@@ -74,7 +74,6 @@ class PooledClusterConnectionProvider<K, V>
7474
7575 private static final InternalLogger logger = InternalLoggerFactory .getInstance (PooledClusterConnectionProvider .class );
7676
77- // Contains NodeId-identified and HostAndPort-identified connections.
7877 private final Lock stateLock = new ReentrantLock ();
7978
8079 private final boolean debugEnabled = logger .isDebugEnabled ();
@@ -157,44 +156,39 @@ public CompletableFuture<StatefulRedisConnection<K, V>> getConnectionAsync(Conne
157156 }
158157
159158 private CompletableFuture <StatefulRedisConnection <K , V >> getWriteConnection (int slot ) {
160- if (writers [slot ] == null ) {
161- stateLock .lock ();
162- try {
163- if (writers [slot ] == null ) {
164- RedisClusterNode master = partitions .getMasterBySlot (slot );
165- if (master == null ) {
166- clusterEventListener .onUncoveredSlot (slot );
167- return Futures .failed (new PartitionSelectorException (
168- "Cannot determine a partition for slot " + slot + "." , partitions .clone ()));
169- }
170159
171- // Use always host and port for slot-oriented operations. We don't want to get reconnected on a different
172- // host because the nodeId can be handled by a different host.
173- RedisURI uri = master . getUri () ;
174- ConnectionKey key = new ConnectionKey ( ConnectionIntent . WRITE , uri . getHost (), uri . getPort ());
160+ CompletableFuture < StatefulRedisConnection < K , V >> writer = writers [ slot ];
161+ if ( writer != null ) {
162+ return writer ;
163+ }
175164
176- ConnectionFuture <StatefulRedisConnection <K , V >> future = getConnectionAsync (key );
165+ RedisClusterNode master = partitions .getMasterBySlot (slot );
166+ if (master == null ) {
167+ clusterEventListener .onUncoveredSlot (slot );
168+ return Futures .failed (
169+ new PartitionSelectorException ("Cannot determine a partition for slot " + slot + "." , partitions .clone ()));
170+ }
177171
178- return future .thenApply (connection -> {
172+ // Use always host and port for slot-oriented operations. We don't want to get reconnected on a different
173+ // host because the nodeId can be handled by a different host.
174+ RedisURI uri = master .getUri ();
175+ ConnectionKey key = new ConnectionKey (ConnectionIntent .WRITE , uri .getHost (), uri .getPort ());
179176
180- stateLock .lock ();
181- try {
182- if (writers [slot ] == null ) {
183- writers [slot ] = CompletableFuture .completedFuture (connection );
184- }
185- } finally {
186- stateLock .unlock ();
187- }
177+ ConnectionFuture <StatefulRedisConnection <K , V >> future = getConnectionAsync (key );
178+
179+ return future .thenApply (connection -> {
188180
189- return connection ;
190- }).toCompletableFuture ();
181+ stateLock .lock ();
182+ try {
183+ if (writers [slot ] == null ) {
184+ writers [slot ] = CompletableFuture .completedFuture (connection );
191185 }
192186 } finally {
193187 stateLock .unlock ();
194188 }
195- }
196189
197- return writers [slot ];
190+ return connection ;
191+ }).toCompletableFuture ();
198192 }
199193
200194 private CompletableFuture <StatefulRedisConnection <K , V >> getReadConnection (int slot ) {
@@ -651,7 +645,6 @@ public ReadFrom getReadFrom() {
651645 }
652646
653647 /**
654- *
655648 * @return number of connections.
656649 */
657650 long getConnectionCount () {
@@ -682,8 +675,8 @@ private static RuntimeException connectionAttemptRejected(String message) {
682675 }
683676
684677 private boolean validateClusterNodeMembership () {
685- return redisClusterClient .getClusterClientOptions () == null
686- || redisClusterClient . getClusterClientOptions () .isValidateClusterNodeMembership ();
678+ return redisClusterClient .getClusterClientOptions () == null || redisClusterClient . getClusterClientOptions ()
679+ .isValidateClusterNodeMembership ();
687680 }
688681
689682 /**
0 commit comments