Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 110 additions & 63 deletions zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.apache.zookeeper.client.HostProvider;
import org.apache.zookeeper.client.StaticHostProvider;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.client.ZooKeeperBuilder;
import org.apache.zookeeper.client.ZooKeeperOptions;
import org.apache.zookeeper.client.ZooKeeperSaslClient;
import org.apache.zookeeper.common.PathUtils;
import org.apache.zookeeper.data.ACL;
Expand Down Expand Up @@ -445,7 +447,9 @@ public boolean isConnected() {
* if an invalid chroot path is specified
*/
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException {
this(connectString, sessionTimeout, watcher, false);
this(new ZooKeeperBuilder(connectString, sessionTimeout)
.withDefaultWatcher(watcher)
.toOptions());
}

/**
Expand Down Expand Up @@ -498,7 +502,10 @@ public ZooKeeper(
int sessionTimeout,
Watcher watcher,
ZKClientConfig conf) throws IOException {
this(connectString, sessionTimeout, watcher, false, conf);
this(new ZooKeeperBuilder(connectString, sessionTimeout)
.withDefaultWatcher(watcher)
.withClientConfig(conf)
.toOptions());
}

/**
Expand Down Expand Up @@ -564,7 +571,11 @@ public ZooKeeper(
Watcher watcher,
boolean canBeReadOnly,
HostProvider aHostProvider) throws IOException {
this(connectString, sessionTimeout, watcher, canBeReadOnly, aHostProvider, null);
this(new ZooKeeperBuilder(connectString, sessionTimeout)
.withDefaultWatcher(watcher)
.withCanBeReadOnly(canBeReadOnly)
.withHostProvider(ignored -> aHostProvider)
.toOptions());
}

/**
Expand Down Expand Up @@ -634,25 +645,12 @@ public ZooKeeper(
HostProvider hostProvider,
ZKClientConfig clientConfig
) throws IOException {
LOG.info(
"Initiating client connection, connectString={} sessionTimeout={} watcher={}",
connectString,
sessionTimeout,
watcher);

this.clientConfig = clientConfig != null ? clientConfig : new ZKClientConfig();
this.hostProvider = hostProvider;
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);

cnxn = createConnection(
connectStringParser.getChrootPath(),
hostProvider,
sessionTimeout,
this.clientConfig,
watcher,
getClientCnxnSocket(),
canBeReadOnly);
cnxn.start();
this(new ZooKeeperBuilder(connectString, sessionTimeout)
.withDefaultWatcher(watcher)
.withCanBeReadOnly(canBeReadOnly)
.withHostProvider(ignored -> hostProvider)
.withClientConfig(clientConfig)
.toOptions());
}

ClientCnxn createConnection(
Expand All @@ -662,6 +660,8 @@ ClientCnxn createConnection(
ZKClientConfig clientConfig,
Watcher defaultWatcher,
ClientCnxnSocket clientCnxnSocket,
long sessionId,
byte[] sessionPasswd,
boolean canBeReadOnly
) throws IOException {
return new ClientCnxn(
Expand All @@ -671,6 +671,8 @@ ClientCnxn createConnection(
clientConfig,
defaultWatcher,
clientCnxnSocket,
sessionId,
sessionPasswd,
canBeReadOnly);
}

Expand Down Expand Up @@ -731,7 +733,10 @@ public ZooKeeper(
int sessionTimeout,
Watcher watcher,
boolean canBeReadOnly) throws IOException {
this(connectString, sessionTimeout, watcher, canBeReadOnly, createDefaultHostProvider(connectString));
this(new ZooKeeperBuilder(connectString, sessionTimeout)
.withDefaultWatcher(watcher)
.withCanBeReadOnly(canBeReadOnly)
.toOptions());
}

/**
Expand Down Expand Up @@ -794,13 +799,11 @@ public ZooKeeper(
Watcher watcher,
boolean canBeReadOnly,
ZKClientConfig conf) throws IOException {
this(
connectString,
sessionTimeout,
watcher,
canBeReadOnly,
createDefaultHostProvider(connectString),
conf);
this(new ZooKeeperBuilder(connectString, sessionTimeout)
.withDefaultWatcher(watcher)
.withCanBeReadOnly(canBeReadOnly)
.withClientConfig(conf)
.toOptions());
}

/**
Expand Down Expand Up @@ -861,7 +864,10 @@ public ZooKeeper(
Watcher watcher,
long sessionId,
byte[] sessionPasswd) throws IOException {
this(connectString, sessionTimeout, watcher, sessionId, sessionPasswd, false);
this(new ZooKeeperBuilder(connectString, sessionTimeout)
.withDefaultWatcher(watcher)
.withSession(sessionId, sessionPasswd)
.toOptions());
}

/**
Expand Down Expand Up @@ -936,15 +942,12 @@ public ZooKeeper(
byte[] sessionPasswd,
boolean canBeReadOnly,
HostProvider aHostProvider) throws IOException {
this(
connectString,
sessionTimeout,
watcher,
sessionId,
sessionPasswd,
canBeReadOnly,
aHostProvider,
null);
this(new ZooKeeperBuilder(connectString, sessionTimeout)
.withDefaultWatcher(watcher)
.withSession(sessionId, sessionPasswd)
.withCanBeReadOnly(canBeReadOnly)
.withHostProvider(ignored -> aHostProvider)
.toOptions());
}

/**
Expand Down Expand Up @@ -1025,20 +1028,72 @@ public ZooKeeper(
boolean canBeReadOnly,
HostProvider hostProvider,
ZKClientConfig clientConfig) throws IOException {
LOG.info(
"Initiating client connection, connectString={} "
+ "sessionTimeout={} watcher={} sessionId=0x{} sessionPasswd={}",
connectString,
sessionTimeout,
watcher,
Long.toHexString(sessionId),
(sessionPasswd == null ? "<null>" : "<hidden>"));
this(new ZooKeeperBuilder(connectString, sessionTimeout)
.withSession(sessionId, sessionPasswd)
.withDefaultWatcher(watcher)
.withCanBeReadOnly(canBeReadOnly)
.withHostProvider(ignored -> hostProvider)
.withClientConfig(clientConfig)
.toOptions());
}

/**
* Create a ZooKeeper client and establish session asynchronously.
*
* <p>This constructor will initiate connection to the server and return
* immediately - potentially (usually) before the session is fully established.
* The watcher from options will be notified of any changes in state. This
* notification can come at any point before or after the constructor call
* has returned.
*
* <p>The instantiated ZooKeeper client object will pick an arbitrary server
* from the connect string and attempt to connect to it. If establishment of
* the connection fails, another server in the connect string will be tried
* (the order is non-deterministic, as we random shuffle the list), until a
* connection is established. The client will continue attempts until the
* session is explicitly closed (or the session is expired by the server).
*
* @param options options for ZooKeeper client
* @throws IOException in cases of IO failure
*/
@InterfaceAudience.Private
public ZooKeeper(ZooKeeperOptions options) throws IOException {
String connectString = options.getConnectString();
int sessionTimeout = options.getSessionTimeout();
long sessionId = options.getSessionId();
byte[] sessionPasswd = sessionId == 0 ? new byte[16] : options.getSessionPasswd();
Watcher watcher = options.getDefaultWatcher();
boolean canBeReadOnly = options.isCanBeReadOnly();

if (sessionId == 0) {
LOG.info(
"Initiating client connection, connectString={} sessionTimeout={} watcher={}",
connectString,
sessionTimeout,
watcher);
} else {
LOG.info(
"Initiating client connection, connectString={} "
+ "sessionTimeout={} watcher={} sessionId=0x{} sessionPasswd={}",
connectString,
sessionTimeout,
watcher,
Long.toHexString(sessionId),
(sessionPasswd == null ? "<null>" : "<hidden>"));
}

ZKClientConfig clientConfig = options.getClientConfig();
this.clientConfig = clientConfig != null ? clientConfig : new ZKClientConfig();
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
HostProvider hostProvider;
if (options.getHostProvider() != null) {
hostProvider = options.getHostProvider().apply(connectStringParser.getServerAddresses());
} else {
hostProvider = new StaticHostProvider(connectStringParser.getServerAddresses());
}
this.hostProvider = hostProvider;

cnxn = new ClientCnxn(
cnxn = createConnection(
connectStringParser.getChrootPath(),
hostProvider,
sessionTimeout,
Expand All @@ -1048,7 +1103,7 @@ public ZooKeeper(
sessionId,
sessionPasswd,
canBeReadOnly);
cnxn.seenRwServerBefore = true; // since user has provided sessionId
cnxn.seenRwServerBefore = sessionId != 0; // since user has provided sessionId
cnxn.start();
}

Expand Down Expand Up @@ -1120,19 +1175,11 @@ public ZooKeeper(
long sessionId,
byte[] sessionPasswd,
boolean canBeReadOnly) throws IOException {
this(
connectString,
sessionTimeout,
watcher,
sessionId,
sessionPasswd,
canBeReadOnly,
createDefaultHostProvider(connectString));
}

// default hostprovider
private static HostProvider createDefaultHostProvider(String connectString) {
return new StaticHostProvider(new ConnectStringParser(connectString).getServerAddresses());
this(new ZooKeeperBuilder(connectString, sessionTimeout)
.withDefaultWatcher(watcher)
.withSession(sessionId, sessionPasswd)
.withCanBeReadOnly(canBeReadOnly)
.toOptions());
}

// VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.client.ZooKeeperOptions;
import org.apache.zookeeper.common.StringUtils;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.GetDataResponse;
Expand All @@ -53,6 +54,11 @@ public class ZooKeeperAdmin extends ZooKeeper {

private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperAdmin.class);

@InterfaceAudience.Private
public ZooKeeperAdmin(ZooKeeperOptions options) throws IOException {
super(options);
}

/**
* Create a ZooKeeperAdmin object which is used to perform dynamic reconfiguration
* operations.
Expand Down
Loading