Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public enum DisconnectReason {
CLOSE_CONNECTION_COMMAND("close_connection_command"),
CLEAN_UP("clean_up"),
CONNECTION_MODE_CHANGED("connection_mode_changed"),
RENEW_GLOBAL_SESSION_IN_RO_MODE("renew a global session in readonly mode"),
// Below reasons are NettyServerCnxnFactory only
CHANNEL_DISCONNECTED("channel disconnected"),
CHANNEL_CLOSED_EXCEPTION("channel_closed_exception"),
Expand Down Expand Up @@ -298,7 +299,7 @@ void disableRecv() {

protected ZooKeeperSaslServer zooKeeperSaslServer = null;

protected static class CloseRequestException extends IOException {
public static class CloseRequestException extends IOException {

private static final long serialVersionUID = -7854505709816442681L;
private DisconnectReason reason;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1413,13 +1413,13 @@ public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer)
connReq.getTimeOut(),
cnxn.getRemoteSocketAddress());
} else {
long clientSessionId = connReq.getSessionId();
LOG.debug(
"Client attempting to renew session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
Long.toHexString(clientSessionId),
Long.toHexString(connReq.getLastZxidSeen()),
connReq.getTimeOut(),
cnxn.getRemoteSocketAddress());
validateSession(cnxn, sessionId);
LOG.debug(
"Client attempting to renew session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
Long.toHexString(sessionId),
Long.toHexString(connReq.getLastZxidSeen()),
connReq.getTimeOut(),
cnxn.getRemoteSocketAddress());
if (serverCnxnFactory != null) {
serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
}
Expand All @@ -1433,6 +1433,17 @@ public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer)
}
}

/**
* Validate if a particular session can be reestablished.
*
* @param cnxn
* @param sessionId
*/
protected void validateSession(ServerCnxn cnxn, long sessionId)
throws IOException {
// do nothing
}

public boolean shouldThrottle(long outStandingCount) {
int globalOutstandingLimit = getGlobalOutstandingLimit();
if (globalOutstandingLimit < getInflight() || globalOutstandingLimit < getInProcess()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,14 @@ public void run() {
case OpCode.setACL:
case OpCode.multi:
case OpCode.check:
ReplyHeader hdr = new ReplyHeader(
request.cxid,
zks.getZKDatabase().getDataTreeLastProcessedZxid(),
Code.NOTREADONLY.intValue());
try {
request.cnxn.sendResponse(hdr, null, null);
} catch (IOException e) {
LOG.error("IO exception while sending response", e);
}
sendErrorResponse(request);
continue;
case OpCode.closeSession:
case OpCode.createSession:
if (!request.isLocalSession()) {
sendErrorResponse(request);
continue;
}
}

// proceed to the next processor
Expand All @@ -109,6 +107,18 @@ public void run() {
LOG.info("ReadOnlyRequestProcessor exited loop!");
}

private void sendErrorResponse(Request request) {
ReplyHeader hdr = new ReplyHeader(
request.cxid,
zks.getZKDatabase().getDataTreeLastProcessedZxid(),
Code.NOTREADONLY.intValue());
try {
request.cnxn.sendResponse(hdr, null, null);
} catch (IOException e) {
LOG.error("IO exception while sending response", e);
}
}

@Override
public void processRequest(Request request) {
if (!finished) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.zookeeper.server.quorum;

import java.io.IOException;
import java.io.PrintWriter;
import java.util.Objects;
import java.util.stream.Collectors;
Expand All @@ -26,6 +27,7 @@
import org.apache.zookeeper.server.FinalRequestProcessor;
import org.apache.zookeeper.server.PrepRequestProcessor;
import org.apache.zookeeper.server.RequestProcessor;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.ZooKeeperServerBean;
Expand Down Expand Up @@ -80,6 +82,15 @@ public synchronized void startup() {
LOG.info("Read-only server started");
}

@Override
protected void validateSession(ServerCnxn cnxn, long sessionId) throws IOException {
if (((LearnerSessionTracker) sessionTracker).isGlobalSession(sessionId)) {
String msg = "Refusing global session reconnection in RO mode " + cnxn.getRemoteSocketAddress();
LOG.info(msg);
throw new ServerCnxn.CloseRequestException(msg, ServerCnxn.DisconnectReason.RENEW_GLOBAL_SESSION_IN_RO_MODE);
}
}

@Override
protected void registerJMX() {
// register with JMX
Expand Down