Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public enum DisconnectReason {
CLOSE_CONNECTION_COMMAND("close_connection_command"),
CLEAN_UP("clean_up"),
CONNECTION_MODE_CHANGED("connection_mode_changed"),
RENEW_GLOBAL_SESSION_IN_RO_MODE("renew a global session in readonly mode"),
// Below reasons are NettyServerCnxnFactory only
CHANNEL_DISCONNECTED("channel disconnected"),
CHANNEL_CLOSED_EXCEPTION("channel_closed_exception"),
Expand Down Expand Up @@ -298,7 +299,7 @@ void disableRecv() {

protected ZooKeeperSaslServer zooKeeperSaslServer = null;

protected static class CloseRequestException extends IOException {
public static class CloseRequestException extends IOException {

private static final long serialVersionUID = -7854505709816442681L;
private DisconnectReason reason;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1413,13 +1413,13 @@ public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer)
connReq.getTimeOut(),
cnxn.getRemoteSocketAddress());
} else {
long clientSessionId = connReq.getSessionId();
LOG.debug(
"Client attempting to renew session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
Long.toHexString(clientSessionId),
Long.toHexString(connReq.getLastZxidSeen()),
connReq.getTimeOut(),
cnxn.getRemoteSocketAddress());
validateSession(cnxn, sessionId);
LOG.debug(
"Client attempting to renew session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
Long.toHexString(sessionId),
Long.toHexString(connReq.getLastZxidSeen()),
connReq.getTimeOut(),
cnxn.getRemoteSocketAddress());
if (serverCnxnFactory != null) {
serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
}
Expand All @@ -1433,6 +1433,17 @@ public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer)
}
}

/**
* Validate if a particular session can be reestablished.
*
* @param cnxn
* @param sessionId
*/
protected void validateSession(ServerCnxn cnxn, long sessionId)
throws IOException {
// do nothing
}

public boolean shouldThrottle(long outStandingCount) {
int globalOutstandingLimit = getGlobalOutstandingLimit();
if (globalOutstandingLimit < getInflight() || globalOutstandingLimit < getInProcess()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,14 @@ public void run() {
case OpCode.setACL:
case OpCode.multi:
case OpCode.check:
ReplyHeader hdr = new ReplyHeader(
request.cxid,
zks.getZKDatabase().getDataTreeLastProcessedZxid(),
Code.NOTREADONLY.intValue());
try {
request.cnxn.sendResponse(hdr, null, null);
} catch (IOException e) {
LOG.error("IO exception while sending response", e);
}
sendErrorResponse(request);
continue;
case OpCode.closeSession:
case OpCode.createSession:
if (!request.isLocalSession()) {
sendErrorResponse(request);
continue;
}
}

// proceed to the next processor
Expand All @@ -109,6 +107,18 @@ public void run() {
LOG.info("ReadOnlyRequestProcessor exited loop!");
}

private void sendErrorResponse(Request request) {
ReplyHeader hdr = new ReplyHeader(
request.cxid,
zks.getZKDatabase().getDataTreeLastProcessedZxid(),
Code.NOTREADONLY.intValue());
try {
request.cnxn.sendResponse(hdr, null, null);
} catch (IOException e) {
LOG.error("IO exception while sending response", e);
}
}

@Override
public void processRequest(Request request) {
if (!finished) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@

package org.apache.zookeeper.server.quorum;

import java.io.IOException;
import java.io.PrintWriter;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.server.DataTreeBean;
import org.apache.zookeeper.server.FinalRequestProcessor;
import org.apache.zookeeper.server.PrepRequestProcessor;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.ZooKeeperServerBean;
Expand Down Expand Up @@ -80,6 +84,49 @@ public synchronized void startup() {
LOG.info("Read-only server started");
}

@Override
public void createSessionTracker() {
sessionTracker = new LearnerSessionTracker(
this, getZKDatabase().getSessionWithTimeOuts(),
this.tickTime, self.getId(), self.areLocalSessionsEnabled(),
getZooKeeperServerListener());
}

@Override
protected void startSessionTracker() {
((LearnerSessionTracker) sessionTracker).start();
}

@Override
protected void setLocalSessionFlag(Request si) {
switch (si.type) {
case OpCode.createSession:
if (self.areLocalSessionsEnabled()) {
si.setLocalSession(true);
}
break;
case OpCode.closeSession:
if (((UpgradeableSessionTracker) sessionTracker).isLocalSession(si.sessionId)) {
si.setLocalSession(true);
} else {
LOG.warn("Submitting global closeSession request for session 0x{} in ReadOnly mode",
Long.toHexString(si.sessionId));
}
break;
default:
break;
}
}

@Override
protected void validateSession(ServerCnxn cnxn, long sessionId) throws IOException {
if (((LearnerSessionTracker) sessionTracker).isGlobalSession(sessionId)) {
String msg = "Refusing global session reconnection in RO mode " + cnxn.getRemoteSocketAddress();
LOG.info(msg);
throw new ServerCnxn.CloseRequestException(msg, ServerCnxn.DisconnectReason.RENEW_GLOBAL_SESSION_IN_RO_MODE);
}
}

@Override
protected void registerJMX() {
// register with JMX
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.ByteArrayOutputStream;
import java.io.LineNumberReader;
import java.io.StringReader;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import org.apache.log4j.Layout;
import org.apache.log4j.Level;
Expand Down Expand Up @@ -56,7 +57,6 @@ public class ReadOnlyModeTest extends ZKTestCase {
@Before
public void setUp() throws Exception {
System.setProperty("readonlymode.enabled", "true");
qu.startQuorum();
}

@After
Expand All @@ -70,6 +70,9 @@ public void tearDown() throws Exception {
*/
@Test(timeout = 90000)
public void testMultiTransaction() throws Exception {
qu.enableLocalSession(true);
qu.startQuorum();

CountdownWatcher watcher = new CountdownWatcher();
ZooKeeper zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, watcher, true);
watcher.waitForConnected(CONNECTION_TIMEOUT); // ensure zk got connected
Expand All @@ -78,9 +81,12 @@ public void testMultiTransaction() throws Exception {
final String node1 = "/tnode1";
final String node2 = "/tnode2";
zk.create(node1, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.close();
watcher.waitForDisconnected(CONNECTION_TIMEOUT);

watcher.reset();
qu.shutdown(2);
zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, watcher, true);
watcher.waitForConnected(CONNECTION_TIMEOUT);
assertEquals("Should be in r-o mode", States.CONNECTEDREADONLY, zk.getState());

Expand All @@ -107,6 +113,9 @@ public void testMultiTransaction() throws Exception {
*/
@Test(timeout = 90000)
public void testReadOnlyClient() throws Exception {
qu.enableLocalSession(true);
qu.startQuorum();

CountdownWatcher watcher = new CountdownWatcher();
ZooKeeper zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, watcher, true);
watcher.waitForConnected(CONNECTION_TIMEOUT); // ensure zk got connected
Expand Down Expand Up @@ -158,6 +167,9 @@ public void testReadOnlyClient() throws Exception {
*/
@Test(timeout = 90000)
public void testConnectionEvents() throws Exception {
qu.enableLocalSession(true);
qu.startQuorum();

CountdownWatcher watcher = new CountdownWatcher();
ZooKeeper zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, watcher, true);
boolean success = false;
Expand Down Expand Up @@ -198,6 +210,9 @@ public void testConnectionEvents() throws Exception {
*/
@Test(timeout = 90000)
public void testSessionEstablishment() throws Exception {
qu.enableLocalSession(true);
qu.startQuorum();

qu.shutdown(2);

CountdownWatcher watcher = new CountdownWatcher();
Expand Down Expand Up @@ -227,13 +242,53 @@ public void testSessionEstablishment() throws Exception {
zk.close();
}

@Test(timeout = 90000)
public void testGlobalSessionInRO() throws Exception {
qu.startQuorum();

CountdownWatcher watcher = new CountdownWatcher();
ZooKeeper zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, watcher, true);
watcher.waitForConnected(CONNECTION_TIMEOUT);
LOG.info("global session created 0x{}", Long.toHexString(zk.getSessionId()));

watcher.reset();
qu.shutdown(2);
try {
watcher.waitForConnected(CONNECTION_TIMEOUT);
fail("Should not be able to renew a global session");
} catch (TimeoutException e) {
}
zk.close();

watcher.reset();
zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, watcher, true);
try {
watcher.waitForConnected(CONNECTION_TIMEOUT);
fail("Should not be able to create a global session");
} catch (TimeoutException e) {
}
zk.close();

qu.getPeer(1).peer.enableLocalSessions(true);
zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, watcher, true);
try {
watcher.waitForConnected(CONNECTION_TIMEOUT);
} catch (TimeoutException e) {
fail("Should be able to create a local session");
}
zk.close();
}

/**
* Ensures that client seeks for r/w servers while it's connected to r/o
* server.
*/
@SuppressWarnings("deprecation")
@Test(timeout = 90000)
public void testSeekForRwServer() throws Exception {
qu.enableLocalSession(true);
qu.startQuorum();

// setup the logger to capture all logs
Layout layout = Logger.getRootLogger().getAppender("CONSOLE").getLayout();
ByteArrayOutputStream os = new ByteArrayOutputStream();
Expand Down