Skip to content

Commit 414a408

Browse files
liran2000eolivelli
authored andcommitted
CURATOR-599: Configurable ZookeeperFactory by ZKClientConfig
Option to use ZooKeeper client config. This seems mandatory for using zookeeper.request.timeout for preventing the potential race condition of hanging indefinitely, as described at the ticket. Author: liran2000 <[email protected]> Reviewers: Enrico Olivelli <[email protected]>, Zili Chen, Cameron McKenzie Closes #391 from liran2000/CURATOR-599
1 parent 4dffb5e commit 414a408

File tree

5 files changed

+217
-42
lines changed

5 files changed

+217
-42
lines changed
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.curator.utils;
21+
22+
import org.apache.zookeeper.Watcher;
23+
import org.apache.zookeeper.ZooKeeper;
24+
import org.apache.zookeeper.admin.ZooKeeperAdmin;
25+
import org.apache.zookeeper.client.ZKClientConfig;
26+
27+
/**
28+
* Configurable ZookeeperFactory, by using org.apache.zookeeper.client.ZKClientConfig.
29+
*
30+
*/
31+
public class ConfigurableZookeeperFactory extends DefaultZookeeperFactory
32+
{
33+
34+
public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
35+
boolean canBeReadOnly, ZKClientConfig zkClientConfig) throws Exception
36+
{
37+
return new ZooKeeperAdmin(connectString, sessionTimeout, watcher, canBeReadOnly, zkClientConfig);
38+
}
39+
}

curator-client/src/main/java/org/apache/curator/utils/ZookeeperFactory.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
import org.apache.zookeeper.Watcher;
2222
import org.apache.zookeeper.ZooKeeper;
23+
import org.apache.zookeeper.admin.ZooKeeperAdmin;
24+
import org.apache.zookeeper.client.ZKClientConfig;
2325

2426
public interface ZookeeperFactory
2527
{
@@ -38,4 +40,26 @@ public interface ZookeeperFactory
3840
* @throws Exception errors
3941
*/
4042
public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception;
43+
44+
/**
45+
* Allocate a new ZooKeeper instance
46+
*
47+
*
48+
* @param connectString the connection string
49+
* @param sessionTimeout session timeout in milliseconds
50+
* @param watcher optional watcher
51+
* @param canBeReadOnly if true, allow ZooKeeper client to enter
52+
* read only mode in case of a network partition. See
53+
* {@link ZooKeeper#ZooKeeper(String, int, Watcher, long, byte[], boolean)}
54+
* for details
55+
* @param zkClientConfig ZooKeeper client config
56+
* @return the instance
57+
* @throws Exception errors
58+
*/
59+
public default ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, ZKClientConfig zkClientConfig) throws Exception {
60+
if (zkClientConfig == null) {
61+
return newZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
62+
}
63+
return new ZooKeeperAdmin(connectString, sessionTimeout, watcher, canBeReadOnly, zkClientConfig);
64+
}
4165
}

curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,16 @@
1919

2020
package org.apache.curator.framework;
2121

22-
import com.google.common.base.Preconditions;
23-
import com.google.common.collect.ImmutableList;
22+
import java.net.InetAddress;
23+
import java.net.UnknownHostException;
24+
import java.util.Arrays;
25+
import java.util.List;
26+
import java.util.Objects;
27+
import java.util.concurrent.Executor;
28+
import java.util.concurrent.ThreadFactory;
29+
import java.util.concurrent.TimeUnit;
30+
31+
import org.apache.curator.CuratorZookeeperClient;
2432
import org.apache.curator.RetryPolicy;
2533
import org.apache.curator.ensemble.EnsembleProvider;
2634
import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
@@ -41,15 +49,10 @@
4149
import org.apache.zookeeper.CreateMode;
4250
import org.apache.zookeeper.Watcher;
4351
import org.apache.zookeeper.ZooKeeper;
44-
import java.net.InetAddress;
45-
import java.net.UnknownHostException;
46-
import java.util.Arrays;
47-
import java.util.List;
48-
import java.util.Objects;
49-
import java.util.concurrent.Executor;
50-
import java.util.concurrent.ThreadFactory;
51-
import java.util.concurrent.TimeUnit;
52-
import org.apache.curator.CuratorZookeeperClient;
52+
import org.apache.zookeeper.client.ZKClientConfig;
53+
54+
import com.google.common.base.Preconditions;
55+
import com.google.common.collect.ImmutableList;
5356

5457
/**
5558
* Factory methods for creating framework-style clients
@@ -108,6 +111,29 @@ public static CuratorFramework newClient(String connectString, int sessionTimeou
108111
retryPolicy(retryPolicy).
109112
build();
110113
}
114+
115+
/**
116+
* Create a new client
117+
*
118+
* @param connectString list of servers to connect to
119+
* @param sessionTimeoutMs session timeout
120+
* @param connectionTimeoutMs connection timeout
121+
* @param retryPolicy retry policy to use
122+
* @param zkClientConfig ZKClientConfig
123+
* @return client
124+
*
125+
* @since 5.1.1, supported from ZooKeeper 3.6.1 and above.
126+
*/
127+
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy, ZKClientConfig zkClientConfig)
128+
{
129+
return builder().
130+
connectString(connectString).
131+
sessionTimeoutMs(sessionTimeoutMs).
132+
connectionTimeoutMs(connectionTimeoutMs).
133+
retryPolicy(retryPolicy).
134+
zkClientConfig(zkClientConfig).
135+
build();
136+
}
111137

112138
/**
113139
* Return the local address as bytes that can be used as a node payload
@@ -150,6 +176,7 @@ public static class Builder
150176
private Executor runSafeService = null;
151177
private ConnectionStateListenerManagerFactory connectionStateListenerManagerFactory = ConnectionStateListenerManagerFactory.standard;
152178
private int simulatedSessionExpirationPercent = 100;
179+
private ZKClientConfig zkClientConfig;
153180

154181
/**
155182
* Apply the current values and build a new CuratorFramework
@@ -466,6 +493,11 @@ public Builder simulatedSessionExpirationPercent(int simulatedSessionExpirationP
466493
this.simulatedSessionExpirationPercent = simulatedSessionExpirationPercent;
467494
return this;
468495
}
496+
497+
public Builder zkClientConfig(ZKClientConfig zkClientConfig) {
498+
this.zkClientConfig = zkClientConfig;
499+
return this;
500+
}
469501

470502
/**
471503
* Add an enforced schema set
@@ -584,6 +616,10 @@ public ConnectionStateErrorPolicy getConnectionStateErrorPolicy()
584616
public int getSimulatedSessionExpirationPercent() {
585617
return simulatedSessionExpirationPercent;
586618
}
619+
620+
public ZKClientConfig getZkClientConfig() {
621+
return zkClientConfig;
622+
}
587623

588624
public SchemaSet getSchemaSet()
589625
{

curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java

Lines changed: 56 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,25 @@
1919

2020
package org.apache.curator.framework.imps;
2121

22-
import com.google.common.annotations.VisibleForTesting;
23-
import com.google.common.base.Preconditions;
24-
import com.google.common.collect.ImmutableList;
22+
import java.util.ArrayList;
23+
import java.util.Arrays;
24+
import java.util.Collection;
25+
import java.util.List;
26+
import java.util.concurrent.BlockingQueue;
27+
import java.util.concurrent.Callable;
28+
import java.util.concurrent.CompletableFuture;
29+
import java.util.concurrent.CountDownLatch;
30+
import java.util.concurrent.DelayQueue;
31+
import java.util.concurrent.Executor;
32+
import java.util.concurrent.ExecutorService;
33+
import java.util.concurrent.Executors;
34+
import java.util.concurrent.LinkedBlockingQueue;
35+
import java.util.concurrent.ThreadFactory;
36+
import java.util.concurrent.TimeUnit;
37+
import java.util.concurrent.atomic.AtomicBoolean;
38+
import java.util.concurrent.atomic.AtomicLong;
39+
import java.util.concurrent.atomic.AtomicReference;
40+
2541
import org.apache.curator.CuratorConnectionLossException;
2642
import org.apache.curator.CuratorZookeeperClient;
2743
import org.apache.curator.RetryLoop;
@@ -30,7 +46,25 @@
3046
import org.apache.curator.framework.CuratorFramework;
3147
import org.apache.curator.framework.CuratorFrameworkFactory;
3248
import org.apache.curator.framework.WatcherRemoveCuratorFramework;
33-
import org.apache.curator.framework.api.*;
49+
import org.apache.curator.framework.api.ACLProvider;
50+
import org.apache.curator.framework.api.CompressionProvider;
51+
import org.apache.curator.framework.api.CreateBuilder;
52+
import org.apache.curator.framework.api.CuratorEvent;
53+
import org.apache.curator.framework.api.CuratorEventType;
54+
import org.apache.curator.framework.api.CuratorListener;
55+
import org.apache.curator.framework.api.DeleteBuilder;
56+
import org.apache.curator.framework.api.ExistsBuilder;
57+
import org.apache.curator.framework.api.GetACLBuilder;
58+
import org.apache.curator.framework.api.GetChildrenBuilder;
59+
import org.apache.curator.framework.api.GetConfigBuilder;
60+
import org.apache.curator.framework.api.GetDataBuilder;
61+
import org.apache.curator.framework.api.ReconfigBuilder;
62+
import org.apache.curator.framework.api.RemoveWatchesBuilder;
63+
import org.apache.curator.framework.api.SetACLBuilder;
64+
import org.apache.curator.framework.api.SetDataBuilder;
65+
import org.apache.curator.framework.api.SyncBuilder;
66+
import org.apache.curator.framework.api.UnhandledErrorListener;
67+
import org.apache.curator.framework.api.WatchesBuilder;
3468
import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
3569
import org.apache.curator.framework.api.transaction.CuratorTransaction;
3670
import org.apache.curator.framework.api.transaction.TransactionOp;
@@ -51,17 +85,14 @@
5185
import org.apache.zookeeper.WatchedEvent;
5286
import org.apache.zookeeper.Watcher;
5387
import org.apache.zookeeper.ZooKeeper;
88+
import org.apache.zookeeper.client.ZKClientConfig;
5489
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
5590
import org.slf4j.Logger;
5691
import org.slf4j.LoggerFactory;
57-
import java.util.ArrayList;
58-
import java.util.Arrays;
59-
import java.util.Collection;
60-
import java.util.List;
61-
import java.util.concurrent.*;
62-
import java.util.concurrent.atomic.AtomicBoolean;
63-
import java.util.concurrent.atomic.AtomicLong;
64-
import java.util.concurrent.atomic.AtomicReference;
92+
93+
import com.google.common.annotations.VisibleForTesting;
94+
import com.google.common.base.Preconditions;
95+
import com.google.common.collect.ImmutableList;
6596

6697
public class CuratorFrameworkImpl implements CuratorFramework
6798
{
@@ -108,7 +139,7 @@ interface DebugBackgroundListener
108139

109140
public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
110141
{
111-
ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
142+
ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory(), builder.getZkClientConfig());
112143
this.client = new CuratorZookeeperClient
113144
(
114145
localZookeeperFactory,
@@ -200,23 +231,26 @@ public QuorumVerifier getCurrentConfig()
200231
return (ensembleTracker != null) ? ensembleTracker.getCurrentConfig() : null;
201232
}
202233

203-
private ZookeeperFactory makeZookeeperFactory(final ZookeeperFactory actualZookeeperFactory)
234+
private ZookeeperFactory makeZookeeperFactory(final ZookeeperFactory actualZookeeperFactory, ZKClientConfig zkClientConfig)
204235
{
205-
return new ZookeeperFactory()
236+
return new ZookeeperFactory()
206237
{
207238
@Override
208239
public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
209240
{
210-
ZooKeeper zooKeeper = actualZookeeperFactory.newZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
211-
for ( AuthInfo auth : authInfos )
212-
{
213-
zooKeeper.addAuthInfo(auth.getScheme(), auth.getAuth());
214-
}
215-
241+
ZooKeeper zooKeeper = actualZookeeperFactory.newZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly, zkClientConfig);
242+
addAuthInfos(zooKeeper);
216243
return zooKeeper;
217244
}
218245
};
219246
}
247+
248+
private void addAuthInfos(ZooKeeper zooKeeper) {
249+
for ( AuthInfo auth: authInfos)
250+
{
251+
zooKeeper.addAuthInfo(auth.getScheme(), auth.getAuth());
252+
}
253+
}
220254

221255
private ThreadFactory getThreadFactory(CuratorFrameworkFactory.Builder builder)
222256
{
@@ -1061,4 +1095,5 @@ private void processEvent(final CuratorEvent curatorEvent)
10611095
}
10621096
});
10631097
}
1098+
10641099
}

curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java

Lines changed: 51 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,16 @@
2525
import static org.junit.jupiter.api.Assertions.assertNull;
2626
import static org.junit.jupiter.api.Assertions.assertTrue;
2727
import static org.junit.jupiter.api.Assertions.fail;
28-
import com.google.common.collect.Lists;
28+
29+
import java.io.IOException;
30+
import java.util.ArrayList;
31+
import java.util.List;
32+
import java.util.concurrent.ArrayBlockingQueue;
33+
import java.util.concurrent.BlockingQueue;
34+
import java.util.concurrent.CountDownLatch;
35+
import java.util.concurrent.LinkedBlockingQueue;
36+
import java.util.concurrent.TimeUnit;
37+
2938
import org.apache.curator.framework.AuthInfo;
3039
import org.apache.curator.framework.CuratorFramework;
3140
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -50,26 +59,24 @@
5059
import org.apache.zookeeper.Watcher;
5160
import org.apache.zookeeper.ZooDefs;
5261
import org.apache.zookeeper.ZooKeeper;
62+
import org.apache.zookeeper.client.ZKClientConfig;
5363
import org.apache.zookeeper.data.ACL;
5464
import org.apache.zookeeper.data.Stat;
5565
import org.junit.jupiter.api.AfterEach;
5666
import org.junit.jupiter.api.BeforeEach;
5767
import org.junit.jupiter.api.Tag;
5868
import org.junit.jupiter.api.Test;
69+
import org.slf4j.Logger;
70+
import org.slf4j.LoggerFactory;
5971

60-
import java.io.IOException;
61-
import java.util.ArrayList;
62-
import java.util.List;
63-
import java.util.concurrent.ArrayBlockingQueue;
64-
import java.util.concurrent.BlockingQueue;
65-
import java.util.concurrent.CountDownLatch;
66-
import java.util.concurrent.LinkedBlockingQueue;
67-
import java.util.concurrent.TimeUnit;
72+
import com.google.common.collect.Lists;
6873

6974
@SuppressWarnings("deprecation")
7075
@Tag(CuratorTestBase.zk35TestCompatibilityGroup)
7176
public class TestFramework extends BaseClassForTests
7277
{
78+
private final Logger log = LoggerFactory.getLogger(getClass());
79+
7380
@BeforeEach
7481
@Override
7582
public void setup() throws Exception
@@ -1073,7 +1080,41 @@ public void testCreateModes() throws Exception
10731080
}
10741081
finally
10751082
{
1076-
CloseableUtils.closeQuietly(client);
1083+
CloseableUtils.closeQuietly(client);
1084+
}
1085+
}
1086+
1087+
@Test
1088+
public void testConfigurableZookeeper() throws Exception
1089+
{
1090+
CuratorFramework client = null;
1091+
try
1092+
{
1093+
ZKClientConfig zkClientConfig = new ZKClientConfig();
1094+
String zookeeperRequestTimeout = "30000";
1095+
zkClientConfig.setProperty(ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT, zookeeperRequestTimeout);
1096+
client = CuratorFrameworkFactory.newClient(server.getConnectString(), 30000, 30000, new RetryOneTime(1), zkClientConfig);
1097+
client.start();
1098+
1099+
byte[] writtenBytes = {1, 2, 3};
1100+
client.create().forPath("/test", writtenBytes);
1101+
1102+
byte[] readBytes = client.getData().forPath("/test");
1103+
assertArrayEquals(writtenBytes, readBytes);
1104+
assertEquals(zookeeperRequestTimeout, client.getZookeeperClient().getZooKeeper().getClientConfig().getProperty(ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT));
1105+
1106+
} catch (NoSuchMethodError e) {
1107+
log.debug("NoSuchMethodError: ", e);
1108+
log.info("Got NoSuchMethodError, meaning probably this cannot be used with ZooKeeper version < 3.6.1");
1109+
}
1110+
finally
1111+
{
1112+
try {
1113+
CloseableUtils.closeQuietly(client);
1114+
} catch (NoSuchMethodError e) {
1115+
log.debug("close: NoSuchMethodError: ", e);
1116+
log.info("close: Got NoSuchMethodError, meaning probably this cannot be used with ZooKeeper version < 3.6.1");
1117+
}
10771118
}
10781119
}
10791120

0 commit comments

Comments
 (0)