Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,8 @@
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
Expand All @@ -55,6 +54,7 @@
import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.ConfigUtils;
import org.apache.zookeeper.util.CircularBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -137,17 +137,13 @@ public class QuorumCnxManager {
* Mapping from Peer to Thread number
*/
final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
final ConcurrentHashMap<Long, BlockingQueue<ByteBuffer>> queueSendMap;
final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;

/*
* Reception queue
*/
public final ArrayBlockingQueue<Message> recvQueue;
/*
* Object to synchronize access to recvQueue
*/
private final Object recvQLock = new Object();
public final BlockingQueue<Message> recvQueue;

/*
* Shutdown flag
Expand Down Expand Up @@ -253,10 +249,10 @@ public static InitialMessage parse(Long protocolVersion, DataInputStream din) th
}

public QuorumCnxManager(QuorumPeer self, final long mySid, Map<Long, QuorumPeer.QuorumServer> view, QuorumAuthServer authServer, QuorumAuthLearner authLearner, int socketTimeout, boolean listenOnAllIPs, int quorumCnxnThreadsSize, boolean quorumSaslAuthEnabled) {
this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();
this.recvQueue = new CircularBlockingQueue<>(RECV_CAPACITY);
this.queueSendMap = new ConcurrentHashMap<>();
this.senderWorkerMap = new ConcurrentHashMap<>();
this.lastMessageSent = new ConcurrentHashMap<>();

String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
if (cnxToValue != null) {
Expand Down Expand Up @@ -438,7 +434,8 @@ private boolean startConnection(Socket sock, Long sid) throws IOException {
}

senderWorkerMap.put(sid, sw);
queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));

queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));

sw.start();
rw.start();
Expand Down Expand Up @@ -573,7 +570,7 @@ private void handleConnection(Socket sock, DataInputStream din) throws IOExcepti

senderWorkerMap.put(sid, sw);

queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));

sw.start();
rw.start();
Expand All @@ -598,10 +595,9 @@ public void toSend(Long sid, ByteBuffer b) {
/*
* Start a new connection if doesn't have one already.
*/
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.computeIfAbsent(sid, serverId -> new ArrayBlockingQueue<>(SEND_CAPACITY));
BlockingQueue<ByteBuffer> bq = queueSendMap.computeIfAbsent(sid, serverId -> new CircularBlockingQueue<>(SEND_CAPACITY));
addToSendQueue(bq, b);
connectOne(sid);

}
}

Expand Down Expand Up @@ -724,9 +720,10 @@ public void connectAll() {
* Check if all queues are empty, indicating that all messages have been delivered.
*/
boolean haveDelivered() {
for (ArrayBlockingQueue<ByteBuffer> queue : queueSendMap.values()) {
LOG.debug("Queue size: {}", queue.size());
if (queue.size() == 0) {
for (BlockingQueue<ByteBuffer> queue : queueSendMap.values()) {
final int queueSize = queue.size();
LOG.debug("Queue size: {}", queueSize);
if (queueSize == 0) {
return true;
}
}
Expand Down Expand Up @@ -1085,7 +1082,7 @@ public void run() {
* message than that stored in lastMessage. To avoid sending
* stale message, we should send the message in the send queue.
*/
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
BlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
if (bq == null || isSendQueueEmpty(bq)) {
ByteBuffer b = lastMessageSent.get(sid);
if (b != null) {
Expand All @@ -1103,7 +1100,7 @@ public void run() {

ByteBuffer b = null;
try {
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
BlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
if (bq != null) {
b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
} else {
Expand Down Expand Up @@ -1216,37 +1213,19 @@ public void run() {
}

/**
* Inserts an element in the specified queue. If the Queue is full, this
* method removes an element from the head of the Queue and then inserts
* the element at the tail. It can happen that an element is removed
* by another thread in {@link SendWorker#run() }
* method before this method attempts to remove an element from the queue.
* This will cause {@link ArrayBlockingQueue#remove() remove} to throw an
* exception, which is safe to ignore.
*
* Unlike {@link #addToRecvQueue(Message) addToRecvQueue} this method does
* not need to be synchronized since there is only one thread that inserts
* an element in the queue and another thread that reads from the queue.
* Inserts an element in the provided {@link BlockingQueue}. This method
* assumes that if the Queue is full, an element from the head of the Queue is
* removed and the new item is inserted at the tail of the queue. This is done
* to prevent a thread from blocking while inserting an element in the queue.
*
* @param queue
* Reference to the Queue
* @param buffer
* Reference to the buffer to be inserted in the queue
* @param queue Reference to the Queue
* @param buffer Reference to the buffer to be inserted in the queue
*/
private void addToSendQueue(ArrayBlockingQueue<ByteBuffer> queue, ByteBuffer buffer) {
if (queue.remainingCapacity() == 0) {
try {
queue.remove();
} catch (NoSuchElementException ne) {
// element could be removed by poll()
LOG.debug("Trying to remove from an empty Queue. Ignoring exception.", ne);
}
}
try {
queue.add(buffer);
} catch (IllegalStateException ie) {
// This should never happen
LOG.error("Unable to insert an element in the queue ", ie);
private void addToSendQueue(final BlockingQueue<ByteBuffer> queue,
final ByteBuffer buffer) {
final boolean success = queue.offer(buffer);
if (!success) {
throw new RuntimeException("Could not insert into receive queue");
}
}

Expand All @@ -1257,7 +1236,7 @@ private void addToSendQueue(ArrayBlockingQueue<ByteBuffer> queue, ByteBuffer buf
* @return
* true if the specified queue is empty
*/
private boolean isSendQueueEmpty(ArrayBlockingQueue<ByteBuffer> queue) {
private boolean isSendQueueEmpty(final BlockingQueue<ByteBuffer> queue) {
return queue.isEmpty();
}

Expand All @@ -1266,60 +1245,37 @@ private boolean isSendQueueEmpty(ArrayBlockingQueue<ByteBuffer> queue) {
* waiting up to the specified wait time if necessary for an element to
* become available.
*
* {@link ArrayBlockingQueue#poll(long, java.util.concurrent.TimeUnit)}
* {@link BlockingQueue#poll(long, java.util.concurrent.TimeUnit)}
*/
private ByteBuffer pollSendQueue(ArrayBlockingQueue<ByteBuffer> queue, long timeout, TimeUnit unit) throws InterruptedException {
return queue.poll(timeout, unit);
private ByteBuffer pollSendQueue(final BlockingQueue<ByteBuffer> queue,
final long timeout, final TimeUnit unit) throws InterruptedException {
return queue.poll(timeout, unit);
}

/**
* Inserts an element in the {@link #recvQueue}. If the Queue is full, this
* methods removes an element from the head of the Queue and then inserts
* the element at the tail of the queue.
*
* This method is synchronized to achieve fairness between two threads that
* are trying to insert an element in the queue. Each thread checks if the
* queue is full, then removes the element at the head of the queue, and
* then inserts an element at the tail. This three-step process is done to
* prevent a thread from blocking while inserting an element in the queue.
* If we do not synchronize the call to this method, then a thread can grab
* a slot in the queue created by the second thread. This can cause the call
* to insert by the second thread to fail.
* Note that synchronizing this method does not block another thread
* from polling the queue since that synchronization is provided by the
* queue itself.
* methods removes an element from the head of the Queue and then inserts the
* element at the tail of the queue.
*
* @param msg
* Reference to the message to be inserted in the queue
* @param msg Reference to the message to be inserted in the queue
*/
public void addToRecvQueue(Message msg) {
synchronized (recvQLock) {
if (recvQueue.remainingCapacity() == 0) {
try {
recvQueue.remove();
} catch (NoSuchElementException ne) {
// element could be removed by poll()
LOG.debug("Trying to remove from an empty recvQueue. Ignoring exception.", ne);
}
}
try {
recvQueue.add(msg);
} catch (IllegalStateException ie) {
// This should never happen
LOG.error("Unable to insert element in the recvQueue ", ie);
}
}
public void addToRecvQueue(final Message msg) {
final boolean success = this.recvQueue.offer(msg);
if (!success) {
throw new RuntimeException("Could not insert into receive queue");
}
}

/**
* Retrieves and removes a message at the head of this queue,
* waiting up to the specified wait time if necessary for an element to
* become available.
*
* {@link ArrayBlockingQueue#poll(long, java.util.concurrent.TimeUnit)}
* {@link BlockingQueue#poll(long, java.util.concurrent.TimeUnit)}
*/
public Message pollRecvQueue(long timeout, TimeUnit unit) throws InterruptedException {
return recvQueue.poll(timeout, unit);
public Message pollRecvQueue(final long timeout, final TimeUnit unit)
throws InterruptedException {
return this.recvQueue.poll(timeout, unit);
}

public boolean connectedToPeer(long peerSid) {
Expand Down
Loading