1616import java .util .concurrent .ConcurrentHashMap ;
1717import java .util .concurrent .locks .Lock ;
1818import java .util .concurrent .locks .ReentrantLock ;
19+ import java .util .concurrent .ScheduledExecutorService ;
20+ import java .util .concurrent .Executors ;
21+ import java .util .concurrent .TimeUnit ;
22+
1923import java .util .function .Consumer ;
2024import java .util .function .Predicate ;
2125
@@ -82,6 +86,12 @@ public class MultiClusterPooledConnectionProvider implements ConnectionProvider
8286
8387 private HealthStatusManager healthStatusManager = new HealthStatusManager ();
8488
89+ // Failback mechanism fields
90+ private final ScheduledExecutorService failbackScheduler = Executors .newSingleThreadScheduledExecutor (r -> {
91+ Thread t = new Thread (r , "failback-scheduler" );
92+ t .setDaemon (true );
93+ return t ;
94+ });
8595 // Store retry and circuit breaker configs for dynamic cluster addition/removal
8696 private RetryConfig retryConfig ;
8797 private CircuitBreakerConfig circuitBreakerConfig ;
@@ -151,6 +161,13 @@ public MultiClusterPooledConnectionProvider(MultiClusterClientConfig multiCluste
151161 /// --- ///
152162
153163 this .fallbackExceptionList = multiClusterClientConfig .getFallbackExceptionList ();
164+
165+ // Start periodic failback checker
166+ if (multiClusterClientConfig .isFailbackSupported ()) {
167+ long failbackInterval = multiClusterClientConfig .getFailbackCheckInterval ();
168+ failbackScheduler .scheduleAtFixedRate (this ::periodicFailbackCheck , failbackInterval , failbackInterval ,
169+ TimeUnit .MILLISECONDS );
170+ }
154171 }
155172
156173 /**
@@ -194,6 +211,7 @@ public void remove(Endpoint endpoint) {
194211 if (multiClusterMap .size () < 2 ) {
195212 throw new JedisValidationException ("Cannot remove the last remaining endpoint" );
196213 }
214+ log .debug ("Removing endpoint {}" , endpoint );
197215
198216 activeClusterIndexLock .lock ();
199217 try {
@@ -251,7 +269,6 @@ private void addClusterInternal(MultiClusterClientConfig multiClusterClientConfi
251269 circuitBreakerEventPublisher .onError (event -> log .error (String .valueOf (event )));
252270 circuitBreakerEventPublisher .onFailureRateExceeded (event -> log .error (String .valueOf (event )));
253271 circuitBreakerEventPublisher .onSlowCallRateExceeded (event -> log .error (String .valueOf (event )));
254- circuitBreakerEventPublisher .onStateTransition (event -> log .warn (String .valueOf (event )));
255272
256273 ConnectionPool pool ;
257274 if (poolConfig != null ) {
@@ -281,20 +298,51 @@ private void handleStatusChange(HealthStatusChangeEvent eventArgs) {
281298
282299 clusterWithHealthChange .setHealthStatus (newStatus );
283300
284- if (newStatus .isHealthy ()) {
285- if (clusterWithHealthChange .isFailbackSupported () && activeCluster != clusterWithHealthChange ) {
286- // lets check if weighted switching is possible
287- Map .Entry <Endpoint , Cluster > failbackCluster = findWeightedHealthyClusterToIterate ();
288- if (failbackCluster == clusterWithHealthChange
289- && clusterWithHealthChange .getWeight () > activeCluster .getWeight ()) {
290- setActiveCluster (clusterWithHealthChange , false );
301+ if (!newStatus .isHealthy ()) {
302+ // Handle failover if this was the active cluster
303+ if (clusterWithHealthChange == activeCluster ) {
304+ clusterWithHealthChange .setGracePeriod ();
305+ if (iterateActiveCluster () != null ) {
306+ this .runClusterFailoverPostProcessor (activeCluster );
291307 }
292308 }
293- } else if (clusterWithHealthChange == activeCluster ) {
294- if (iterateActiveCluster () != null ) {
295- this .runClusterFailoverPostProcessor (activeCluster );
309+ }
310+ }
311+
312+ /**
313+ * Periodic failback checker - runs at configured intervals to check for failback opportunities
314+ */
315+ private void periodicFailbackCheck () {
316+ // Find the best candidate cluster for failback
317+ Cluster bestCandidate = null ;
318+ float bestWeight = activeCluster .getWeight ();
319+
320+ for (Map .Entry <Endpoint , Cluster > entry : multiClusterMap .entrySet ()) {
321+ Cluster cluster = entry .getValue ();
322+
323+ // Skip if this is already the active cluster
324+ if (cluster == activeCluster ) {
325+ continue ;
326+ }
327+
328+ // Skip if cluster is not healthy
329+ if (!cluster .isHealthy ()) {
330+ continue ;
331+ }
332+
333+ // This cluster is a valid candidate
334+ if (cluster .getWeight () > bestWeight ) {
335+ bestCandidate = cluster ;
336+ bestWeight = cluster .getWeight ();
296337 }
297338 }
339+
340+ // Perform failback if we found a better candidate
341+ if (bestCandidate != null ) {
342+ log .info ("Performing failback from {} to {} (higher weight cluster available)" ,
343+ activeCluster .getCircuitBreaker ().getName (), bestCandidate .getCircuitBreaker ().getName ());
344+ setActiveCluster (bestCandidate , true );
345+ }
298346 }
299347
300348 public Endpoint iterateActiveCluster () {
@@ -397,7 +445,21 @@ private boolean setActiveCluster(Cluster cluster, boolean validateConnection) {
397445
398446 @ Override
399447 public void close () {
400- activeCluster .getConnectionPool ().close ();
448+ // Shutdown the failback scheduler
449+ failbackScheduler .shutdown ();
450+ try {
451+ if (!failbackScheduler .awaitTermination (1 , TimeUnit .SECONDS )) {
452+ failbackScheduler .shutdownNow ();
453+ }
454+ } catch (InterruptedException e ) {
455+ failbackScheduler .shutdownNow ();
456+ Thread .currentThread ().interrupt ();
457+ }
458+
459+ // Close all cluster connection pools
460+ for (Cluster cluster : multiClusterMap .values ()) {
461+ cluster .getConnectionPool ().close ();
462+ }
401463 }
402464
403465 @ Override
@@ -425,26 +487,21 @@ public Cluster getCluster() {
425487 }
426488
427489 @ VisibleForTesting
428- public Cluster getCluster (Endpoint multiClusterIndex ) {
429- return multiClusterMap .get (multiClusterIndex );
490+ public Cluster getCluster (Endpoint endpoint ) {
491+ return multiClusterMap .get (endpoint );
430492 }
431493
432494 public CircuitBreaker getClusterCircuitBreaker () {
433495 return activeCluster .getCircuitBreaker ();
434496 }
435497
436- public CircuitBreaker getClusterCircuitBreaker (int multiClusterIndex ) {
437- return activeCluster .getCircuitBreaker ();
438- }
439-
440498 /**
441499 * Indicates the final cluster/database endpoint (connection pool), according to the pre-configured list provided at
442500 * startup via the MultiClusterClientConfig, is unavailable and therefore no further failover is possible. Users can
443501 * manually failback to an available cluster
444502 */
445503 public boolean canIterateOnceMore () {
446504 Map .Entry <Endpoint , Cluster > e = findWeightedHealthyClusterToIterate ();
447-
448505 return e != null ;
449506 }
450507
@@ -472,6 +529,9 @@ public static class Cluster {
472529 private MultiClusterClientConfig multiClusterClientConfig ;
473530 private boolean disabled = false ;
474531
532+ // Grace period tracking
533+ private volatile long gracePeriodEndsAt = 0 ;
534+
475535 public Cluster (ConnectionPool connectionPool , Retry retry , CircuitBreaker circuitBreaker , float weight ,
476536 MultiClusterClientConfig multiClusterClientConfig ) {
477537 this .connectionPool = connectionPool ;
@@ -513,11 +573,14 @@ public float getWeight() {
513573 }
514574
515575 public boolean isCBForcedOpen () {
576+ if (circuitBreaker .getState () == State .FORCED_OPEN && !isInGracePeriod ()) {
577+ circuitBreaker .transitionToClosedState ();
578+ }
516579 return circuitBreaker .getState () == CircuitBreaker .State .FORCED_OPEN ;
517580 }
518581
519582 public boolean isHealthy () {
520- return healthStatus .isHealthy () && !isCBForcedOpen () && !disabled ;
583+ return healthStatus .isHealthy () && !isCBForcedOpen () && !disabled && ! isInGracePeriod () ;
521584 }
522585
523586 public boolean retryOnFailover () {
@@ -532,6 +595,20 @@ public void setDisabled(boolean disabled) {
532595 this .disabled = disabled ;
533596 }
534597
598+ /**
599+ * Checks if the cluster is currently in grace period
600+ */
601+ public boolean isInGracePeriod () {
602+ return System .currentTimeMillis () < gracePeriodEndsAt ;
603+ }
604+
605+ /**
606+ * Sets the grace period for this cluster
607+ */
608+ public void setGracePeriod () {
609+ gracePeriodEndsAt = System .currentTimeMillis () + multiClusterClientConfig .getGracePeriod ();
610+ }
611+
535612 /**
536613 * Whether failback is supported by client
537614 */
0 commit comments