Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
Original file line number Diff line number Diff line change
Expand Up @@ -1211,7 +1211,8 @@ of servers -- that is, when deploying clusters of servers.
<a name="id_multi_address"></a>
Since ZooKeeper 3.6.0 it is possible to specify **multiple addresses** for each
ZooKeeper server (see [ZOOKEEPER-3188](https://issues.apache.org/jira/projects/ZOOKEEPER/issues/ZOOKEEPER-3188)).
This helps to increase availability and adds network level
To enable this feature, you must set the *multiAddress.enabled* configuration property
to *true*. This helps to increase availability and adds network level
resiliency to ZooKeeper. When multiple physical network interfaces are used
for the servers, ZooKeeper is able to bind on all interfaces and runtime switching
to a working interface in case a network error. The different addresses can be specified
Expand All @@ -1220,7 +1221,18 @@ of servers -- that is, when deploying clusters of servers.
server.1=zoo1-net1:2888:3888|zoo1-net2:2889:3889
server.2=zoo2-net1:2888:3888|zoo2-net2:2889:3889
server.3=zoo3-net1:2888:3888|zoo3-net2:2889:3889



###### Note
>By enabling this feature, the Quorum protocol (ZooKeeper Server-Server protocol) will change.
The users will not notice this and when anyone starts a ZooKeeper cluster with the new config,
everything will work normally. However, it's not possible to enable this feature and specify
multiple addresses during a rolling upgrade if the old ZooKeeper cluster didn't support the
*multiAddress* feature (and the new Quorum protocol). In case if you need this feature but you
also need to perform a rolling upgrade from a ZooKeeper cluster older than *3.6.0*, then you
first need to do the rolling upgrade without enabling the MultiAddress feature and later make
a separate rolling restart with the new configuration where **multiAddress.enabled** is set
to **true** and multiple addresses are provided.

* *syncLimit* :
(No Java system property)
Expand Down Expand Up @@ -1584,6 +1596,16 @@ and [SASL authentication for ZooKeeper](https://cwiki.apache.org/confluence/disp
(e.g. the zk/[email protected] client principal will be authenticated in ZooKeeper as zk/myhost)
Default: false

* *multiAddress.enabled* :
(Java system property: **zookeeper.multiAddress.enabled**)
**New in 3.6.0:**
Since ZooKeeper 3.6.0 you can also [specify multiple addresses](#id_multi_address)
for each ZooKeeper server instance (this can increase availability when multiple physical
network interfaces can be used parallel in the cluster). Setting this parameter to
**true** will enable this feature. Please note, that you can not enable this feature
during a rolling upgrade if the version of the old ZooKeeper cluster is prior to 3.6.0.
The default value is **false**.

* *multiAddress.reachabilityCheckTimeoutMs* :
(Java system property: **zookeeper.multiAddress.reachabilityCheckTimeoutMs**)
**New in 3.6.0:**
Expand All @@ -1596,7 +1618,8 @@ and [SASL authentication for ZooKeeper](https://cwiki.apache.org/confluence/disp
in parallel for the different addresses, so the timeout you set here is the maximum time will be taken
by checking the reachability of all addresses.
The default value is **1000**.


This parameter has no effect, unless you enable the MultiAddress feature by setting *multiAddress.enabled=true*.

<a name="Experimental+Options%2FFeatures"></a>

Expand Down Expand Up @@ -1688,6 +1711,7 @@ the variable does.
Please note, disabling the reachability check will cause the cluster not to be able to reconfigure
itself properly during network problems, so the disabling is advised only during testing.

This parameter has no effect, unless you enable the MultiAddress feature by setting *multiAddress.enabled=true*.

<a name="Disabling+data+directory+autocreation"></a>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import java.nio.channels.UnresolvedAddressException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashSet;
Expand Down Expand Up @@ -113,9 +115,12 @@ public class QuorumCnxManager {
private AtomicLong observerCounter = new AtomicLong(-1);

/*
* Protocol identifier used among peers
* Protocol identifier used among peers (must be a negative number for backward compatibility reasons)
*/
public static final long PROTOCOL_VERSION = -65535L;
// the following protocol version was sent in every connection initiation message since ZOOKEEPER-107 released in 3.5.0
public static final long PROTOCOL_VERSION_V1 = -65536L;
// ZOOKEEPER-3188 introduced multiple addresses in the connection initiation message, released in 3.6.0
public static final long PROTOCOL_VERSION_V2 = -65535L;

/*
* Max buffer size to be read from the network.
Expand Down Expand Up @@ -218,7 +223,7 @@ public static class InitialMessageException extends Exception {
public static InitialMessage parse(Long protocolVersion, DataInputStream din) throws InitialMessageException, IOException {
Long sid;

if (protocolVersion != PROTOCOL_VERSION) {
if (protocolVersion != PROTOCOL_VERSION_V1 && protocolVersion != PROTOCOL_VERSION_V2) {
throw new InitialMessageException("Got unrecognized protocol version %s", protocolVersion);
}

Expand All @@ -236,6 +241,8 @@ public static InitialMessage parse(Long protocolVersion, DataInputStream din) th
throw new InitialMessageException("Read only %s bytes out of %s sent by server %s", num_read, remaining, sid);
}

// in PROTOCOL_VERSION_V1 we expect to get a single address here represented as a 'host:port' string
// in PROTOCOL_VERSION_V2 we expect to get multiple addresses like: 'host1:port1|host2:port2|...'
String[] addressStrings = new String(b).split("\\|");
List<InetSocketAddress> addresses = new ArrayList<>(addressStrings.length);
for (String addr : addressStrings) {
Expand Down Expand Up @@ -416,10 +423,20 @@ private boolean startConnection(Socket sock, Long sid) throws IOException {

// Sending id and challenge

// represents protocol version (in other words - message type)
dout.writeLong(PROTOCOL_VERSION);
// First sending the protocol version (in other words - message type).
// For backward compatibility reasons we stick to the old protocol version, unless the MultiAddress
// feature is enabled. During rolling upgrade, we must make sure that all the servers can
// understand the protocol version we use to avoid multiple partitions. see ZOOKEEPER-3720
long protocolVersion = self.isMultiAddressEnabled() ? PROTOCOL_VERSION_V2 : PROTOCOL_VERSION_V1;
dout.writeLong(protocolVersion);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case of new protocol (v2) I would like to spend here a 64bit long of 'flags' (filled with zeros in 3.6.0) in order to introduce a more futureproof 'feature detection' that is better than using protocol version numbers.

We can then add 'multiaddress' as an enabled feature.

We should do more reasoning and design steps but this way of coding the initial handshake is quite common and it will open up the way for future improvements.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I really like this idea with the 'feature flags', I will change accordingly.

The only constraint is that the version number must be negative. I think I saw in the git history, that some years ago, when the protocol version was not introduced yet, this was the way how someone solved the introduction of versioning. The old code sent some positive sid or message size, and the new code sent a negative version number. This way enforcing the old code to break and throw away the message, while the new code was able to verify if it got the message in the new format based on the negative number. But I need to double-check this tomorrow, maybe I remember wrong.

And also there can be maybe 1-2 negative numbers already used as protocol versions, so I will check which bits are still available.

Although I think it is enough to support the rolling upgrade from the first stable 3.4 release, which can simplify things, and we don't need to go back in time in the code too much.

Copy link
Contributor Author

@symat symat Feb 13, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I did some deeper digging in the git history, this is what I found:

The different version numbers / behaviors we had before:

  • protocol version originally introduced by ZOOKEEPER-107 in 3.5.0
  • before 3.5.0, always a positive SID was sent in the beginning of the connection initiation message and there were no address information there. Even 3.4.14 still only sends a positive SID.
  • since ZOOKEEPER-107 (in 3.5.0+): version -65536L (0xffff0000) was sent, and on the receiver side:
    • if any positive (incl. 0) Long number received, then accept the number as SID and get the address from the own config
    • if -65536L was received, then parsing the message with the new format and use the address provided there
    • for any other negative numbers, we fail
  • in ZOOKEEPER-1633 (3.4.7), the receiver logic was making more clever to expect the SID as the second Long in the message, if the first Long was negative. So from 3.4.7 all the 3.4 servers can at least not die due to the new message formats, and rolling upgrades become possible without multiple partitions. However, all 3.4.X still sends only the positive SID and will not use any address information from the initial message (just reads the SID).

This also means, that we can choose any negative number as new protocol version for the MultiAddress feature, and then the rolling upgrade from 3.4.7-3.4.14 to 3.6.0 could work theoretically even with MultiAddress enabled, as long as we always send the SID right after the protocol version.

But the value of the protocol version is actually checked in 3.5.0, so we definitely need to disable MultiAddress to be able to get the rolling upgrade to work from 3.5.

Long story short: we only have a single protocol version, -65536 (0xffff0000) as of now, plus the case when positive numbers are sent. The -65535 (0xffff0001) seems to me a good protocol version candidate for the multiaddress compatible protocol, since it is also only a single bit difference between the two. So if later we will introduce the 'feature flags' we can already say that the last bit represented the multiaddress feature.

What do you think? (maybe I misunderstood your proposal here)

But I just don't know how this whole thing should look in the later releases. Also this whole protocol version is now only about the initial message. We don't know if there is any other incompatibility between the other messages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eolivelli what do you think?

should we add the 'feature flags' implementation here in 3.6.0? Or is it enough if we choose a new optional version number that differs only in a single bit to the current version number, so that we are keeping this possibility of 'feature flags' open for the future? (currently we have only a single 'feature' anyway)

I am not sure if it worths to push for a more generic solution, given that we don't really know yet how to design this in the future.

dout.writeLong(self.getId());
String addr = self.getElectionAddress().getAllAddresses().stream()

// now we send our election address. For the new protocol version, we can send multiple addresses.
Collection<InetSocketAddress> addressesToSend = protocolVersion == PROTOCOL_VERSION_V2
? self.getElectionAddress().getAllAddresses()
: Arrays.asList(self.getElectionAddress().getOne());

String addr = addressesToSend.stream()
.map(NetUtils::formatInetAddr).collect(Collectors.joining("|"));
byte[] addr_bytes = addr.getBytes();
dout.writeInt(addr_bytes.length);
Expand Down Expand Up @@ -639,7 +656,7 @@ public void toSend(Long sid, ByteBuffer b) {
synchronized boolean connectOne(long sid, MultipleAddresses electionAddr) {
if (senderWorkerMap.get(sid) != null) {
LOG.debug("There is a connection already for server {}", sid);
if (electionAddr.size() > 1 && self.isMultiAddressReachabilityCheckEnabled()) {
if (self.isMultiAddressEnabled() && electionAddr.size() > 1 && self.isMultiAddressReachabilityCheckEnabled()) {
// since ZOOKEEPER-3188 we can use multiple election addresses to reach a server. It is possible, that the
// one we are using is already dead and we need to clean-up, so when we will create a new connection
// then we will choose an other one, which is actually reachable
Expand Down Expand Up @@ -710,7 +727,7 @@ synchronized boolean connectOne(long sid, MultipleAddresses electionAddr) {
synchronized void connectOne(long sid) {
if (senderWorkerMap.get(sid) != null) {
LOG.debug("There is a connection already for server {}", sid);
if (self.isMultiAddressReachabilityCheckEnabled()) {
if (self.isMultiAddressEnabled() && self.isMultiAddressReachabilityCheckEnabled()) {
// since ZOOKEEPER-3188 we can use multiple election addresses to reach a server. It is possible, that the
// one we are using is already dead and we need to clean-up, so when we will create a new connection
// then we will choose an other one, which is actually reachable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,22 @@ public void setObserverMasterPort(int observerMasterPort) {
this.observerMasterPort = observerMasterPort;
}

private int multiAddressReachabilityCheckTimeoutMs = (int) MultipleAddresses.DEFAULT_TIMEOUT.toMillis();
public static final String CONFIG_KEY_MULTI_ADDRESS_ENABLED = "zookeeper.multiAddress.enabled";
public static final String CONFIG_DEFAULT_MULTI_ADDRESS_ENABLED = "false";

private boolean multiAddressEnabled = true;
public boolean isMultiAddressEnabled() {
return multiAddressEnabled;
}

public void setMultiAddressEnabled(boolean multiAddressEnabled) {
this.multiAddressEnabled = multiAddressEnabled;
LOG.info("multiAddress.enabled set to {}", multiAddressEnabled);
}

public static final String CONFIG_KEY_MULTI_ADDRESS_REACHABILITY_CHECK_TIMEOUT_MS = "zookeeper.multiAddress.reachabilityCheckTimeoutMs";

private int multiAddressReachabilityCheckTimeoutMs = (int) MultipleAddresses.DEFAULT_TIMEOUT.toMillis();
public int getMultiAddressReachabilityCheckTimeoutMs() {
return multiAddressReachabilityCheckTimeoutMs;
}
Expand All @@ -172,6 +186,8 @@ public void setMultiAddressReachabilityCheckTimeoutMs(int multiAddressReachabili
LOG.info("multiAddress.reachabilityCheckTimeoutMs set to {}", multiAddressReachabilityCheckTimeoutMs);
}

public static final String CONFIG_KEY_MULTI_ADDRESS_REACHABILITY_CHECK_ENABLED = "zookeeper.multiAddress.reachabilityCheckEnabled";

private boolean multiAddressReachabilityCheckEnabled = true;

public boolean isMultiAddressReachabilityCheckEnabled() {
Expand Down Expand Up @@ -274,6 +290,12 @@ public QuorumServer(long sid, String addressStr) throws ConfigException {
}
}

boolean multiAddressEnabled = Boolean.parseBoolean(
System.getProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_ENABLED, QuorumPeer.CONFIG_DEFAULT_MULTI_ADDRESS_ENABLED));
if (!multiAddressEnabled && serverAddresses.length > 1) {
throw new ConfigException("Multiple address feature is disabled, but multiple addresses were specified for sid " + sid);
}

for (String serverAddress : serverAddresses) {
String serverParts[] = ConfigUtils.getHostAndPort(serverAddress);
if ((serverClientParts.length > 2) || (serverParts.length < 3)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,13 @@ public class QuorumPeerConfig {
protected int quorumCnxnThreadsSize;

// multi address related configs
private boolean multiAddressEnabled = Boolean.parseBoolean(
System.getProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_ENABLED, QuorumPeer.CONFIG_DEFAULT_MULTI_ADDRESS_ENABLED));
private boolean multiAddressReachabilityCheckEnabled =
Boolean.parseBoolean(System.getProperty("zookeeper.multiAddress.reachabilityCheckEnabled",
"true"));
Boolean.parseBoolean(System.getProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_REACHABILITY_CHECK_ENABLED, "true"));
private int multiAddressReachabilityCheckTimeoutMs =
Integer.parseInt(System.getProperty("zookeeper.multiAddress.reachabilityCheckTimeoutMs",
String.valueOf(MultipleAddresses.DEFAULT_TIMEOUT.toMillis())));

Integer.parseInt(System.getProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_REACHABILITY_CHECK_TIMEOUT_MS,
String.valueOf(MultipleAddresses.DEFAULT_TIMEOUT.toMillis())));

/**
* Minimum snapshot retain count.
Expand Down Expand Up @@ -398,6 +398,8 @@ public void parseProperties(Properties zkProp) throws IOException, ConfigExcepti
} else if (key.startsWith("metricsProvider.")) {
String keyForMetricsProvider = key.substring(16);
metricsProviderConfiguration.put(keyForMetricsProvider, value);
} else if (key.equals("multiAddress.enabled")) {
multiAddressEnabled = Boolean.parseBoolean(value);
} else if (key.equals("multiAddress.reachabilityCheckTimeoutMs")) {
multiAddressReachabilityCheckTimeoutMs = Integer.parseInt(value);
} else if (key.equals("multiAddress.reachabilityCheckEnabled")) {
Expand Down Expand Up @@ -939,6 +941,10 @@ public Boolean getQuorumListenOnAllIPs() {
return quorumListenOnAllIPs;
}

public boolean isMultiAddressEnabled() {
return multiAddressEnabled;
}

public boolean isMultiAddressReachabilityCheckEnabled() {
return multiAddressReachabilityCheckEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServ
if (config.sslQuorumReloadCertFiles) {
quorumPeer.getX509Util().enableCertFileReloading();
}
quorumPeer.setMultiAddressEnabled(config.isMultiAddressEnabled());
quorumPeer.setMultiAddressReachabilityCheckEnabled(config.isMultiAddressReachabilityCheckEnabled());
quorumPeer.setMultiAddressReachabilityCheckTimeoutMs(config.getMultiAddressReachabilityCheckTimeoutMs());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ public void testCnxManagerSpinLock() throws Exception {

InetSocketAddress otherAddr = peers.get(2L).electionAddr.getReachableOrOne();
DataOutputStream dout = new DataOutputStream(sc.socket().getOutputStream());
dout.writeLong(QuorumCnxManager.PROTOCOL_VERSION);
dout.writeLong(QuorumCnxManager.PROTOCOL_VERSION_V1);
dout.writeLong(2);
String addr = otherAddr.getHostString() + ":" + otherAddr.getPort();
byte[] addr_bytes = addr.getBytes();
Expand Down Expand Up @@ -604,7 +604,7 @@ public void testInitialMessage() throws Exception {
dout.writeBytes(hostport);

din = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
msg = InitialMessage.parse(QuorumCnxManager.PROTOCOL_VERSION, din);
msg = InitialMessage.parse(QuorumCnxManager.PROTOCOL_VERSION_V1, din);
fail("long message accepted");
} catch (InitialMessage.InitialMessageException ex) {
}
Expand All @@ -620,7 +620,7 @@ public void testInitialMessage() throws Exception {
dout.writeBytes(hostport);

din = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
msg = InitialMessage.parse(QuorumCnxManager.PROTOCOL_VERSION, din);
msg = InitialMessage.parse(QuorumCnxManager.PROTOCOL_VERSION_V1, din);
fail("bad hostport accepted");
} catch (InitialMessage.InitialMessageException ex) {
}
Expand All @@ -637,7 +637,7 @@ public void testInitialMessage() throws Exception {

// now parse it
din = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
msg = InitialMessage.parse(QuorumCnxManager.PROTOCOL_VERSION, din);
msg = InitialMessage.parse(QuorumCnxManager.PROTOCOL_VERSION_V1, din);
assertEquals(Long.valueOf(5), msg.sid);
assertEquals(Arrays.asList(new InetSocketAddress("10.0.0.2", 3888)), msg.electionAddr);
} catch (InitialMessage.InitialMessageException ex) {
Expand All @@ -656,7 +656,7 @@ public void testInitialMessage() throws Exception {

// now parse it
din = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
msg = InitialMessage.parse(QuorumCnxManager.PROTOCOL_VERSION, din);
msg = InitialMessage.parse(QuorumCnxManager.PROTOCOL_VERSION_V2, din);
assertEquals(Long.valueOf(5), msg.sid);
assertEquals(Arrays.asList(new InetSocketAddress("1.1.1.1", 9999),
new InetSocketAddress("2.2.2.2", 8888),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.zookeeper.txn.CreateTxn;
import org.apache.zookeeper.txn.TxnHeader;
import org.apache.zookeeper.util.ServiceUtils;
import org.junit.After;
import org.junit.Test;

public class LearnerTest extends ZKTestCase {
Expand Down Expand Up @@ -134,6 +135,11 @@ protected Socket createSocket() throws X509Exception, IOException {
}
}

@After
public void cleanup() {
System.clearProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_ENABLED);
}

@Test(expected = IOException.class)
public void connectionRetryTimeoutTest() throws Exception {
Learner learner = new TestLearner();
Expand Down Expand Up @@ -178,6 +184,7 @@ public void connectionInitLimitTimeoutTest() throws Exception {

@Test
public void shouldTryMultipleAddresses() throws Exception {
System.setProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_ENABLED, "true");
TestLearner learner = new TestLearner();
learner.self = new QuorumPeer();
learner.self.setTickTime(2000);
Expand Down Expand Up @@ -205,6 +212,7 @@ public void shouldTryMultipleAddresses() throws Exception {

@Test
public void multipleAddressesSomeAreFailing() throws Exception {
System.setProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_ENABLED, "true");
TestLearner learner = new TestLearner();
learner.self = new QuorumPeer();
learner.self.setTickTime(2000);
Expand Down
Loading