2525import static io .grpc .ConnectivityState .SHUTDOWN ;
2626import static io .grpc .ConnectivityState .TRANSIENT_FAILURE ;
2727
28+ import com .google .common .annotations .VisibleForTesting ;
29+ import com .google .common .base .Joiner ;
2830import com .google .common .base .MoreObjects ;
2931import com .google .common .collect .HashMultiset ;
3032import com .google .common .collect .Multiset ;
3436import io .grpc .EquivalentAddressGroup ;
3537import io .grpc .InternalLogId ;
3638import io .grpc .LoadBalancer ;
39+ import io .grpc .Metadata ;
3740import io .grpc .Status ;
3841import io .grpc .SynchronizationContext ;
3942import io .grpc .util .MultiChildLoadBalancer ;
43+ import io .grpc .xds .ThreadSafeRandom .ThreadSafeRandomImpl ;
4044import io .grpc .xds .client .XdsLogger ;
4145import io .grpc .xds .client .XdsLogger .XdsLogLevel ;
4246import java .net .SocketAddress ;
@@ -69,13 +73,21 @@ final class RingHashLoadBalancer extends MultiChildLoadBalancer {
6973 new LazyLoadBalancer .Factory (pickFirstLbProvider );
7074 private final XdsLogger logger ;
7175 private final SynchronizationContext syncContext ;
76+ private final ThreadSafeRandom random ;
7277 private List <RingEntry > ring ;
78+ @ Nullable private Metadata .Key <String > requestHashHeaderKey ;
7379
7480 RingHashLoadBalancer (Helper helper ) {
81+ this (helper , ThreadSafeRandomImpl .instance );
82+ }
83+
84+ @ VisibleForTesting
85+ RingHashLoadBalancer (Helper helper , ThreadSafeRandom random ) {
7586 super (helper );
7687 syncContext = checkNotNull (helper .getSynchronizationContext (), "syncContext" );
7788 logger = XdsLogger .withLogId (InternalLogId .allocate ("ring_hash_lb" , helper .getAuthority ()));
7889 logger .log (XdsLogLevel .INFO , "Created" );
90+ this .random = checkNotNull (random , "random" );
7991 }
8092
8193 @ Override
@@ -92,6 +104,10 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
92104 if (config == null ) {
93105 throw new IllegalArgumentException ("Missing RingHash configuration" );
94106 }
107+ requestHashHeaderKey =
108+ config .requestHashHeader .isEmpty ()
109+ ? null
110+ : Metadata .Key .of (config .requestHashHeader , Metadata .ASCII_STRING_MARSHALLER );
95111 Map <EquivalentAddressGroup , Long > serverWeights = new HashMap <>();
96112 long totalWeight = 0L ;
97113 for (EquivalentAddressGroup eag : addrList ) {
@@ -197,7 +213,8 @@ protected void updateOverallBalancingState() {
197213 overallState = TRANSIENT_FAILURE ;
198214 }
199215
200- RingHashPicker picker = new RingHashPicker (syncContext , ring , getChildLbStates ());
216+ RingHashPicker picker =
217+ new RingHashPicker (syncContext , ring , getChildLbStates (), requestHashHeaderKey , random );
201218 getHelper ().updateBalancingState (overallState , picker );
202219 this .currentConnectivityState = overallState ;
203220 }
@@ -325,21 +342,32 @@ private static final class RingHashPicker extends SubchannelPicker {
325342 // TODO(chengyuanzhang): can be more performance-friendly with
326343 // IdentityHashMap<Subchannel, ConnectivityStateInfo> and RingEntry contains Subchannel.
327344 private final Map <Endpoint , SubchannelView > pickableSubchannels ; // read-only
345+ @ Nullable private final Metadata .Key <String > requestHashHeaderKey ;
346+ private final ThreadSafeRandom random ;
347+ private final boolean hasEndpointInConnectingState ;
328348
329349 private RingHashPicker (
330350 SynchronizationContext syncContext , List <RingEntry > ring ,
331- Collection <ChildLbState > children ) {
351+ Collection <ChildLbState > children , Metadata .Key <String > requestHashHeaderKey ,
352+ ThreadSafeRandom random ) {
332353 this .syncContext = syncContext ;
333354 this .ring = ring ;
355+ this .requestHashHeaderKey = requestHashHeaderKey ;
356+ this .random = random ;
334357 pickableSubchannels = new HashMap <>(children .size ());
358+ boolean hasConnectingState = false ;
335359 for (ChildLbState childLbState : children ) {
336360 pickableSubchannels .put ((Endpoint )childLbState .getKey (),
337361 new SubchannelView (childLbState , childLbState .getCurrentState ()));
362+ if (childLbState .getCurrentState () == CONNECTING ) {
363+ hasConnectingState = true ;
364+ }
338365 }
366+ this .hasEndpointInConnectingState = hasConnectingState ;
339367 }
340368
341369 // Find the ring entry with hash next to (clockwise) the RPC's hash (binary search).
342- private int getTargetIndex (Long requestHash ) {
370+ private int getTargetIndex (long requestHash ) {
343371 if (ring .size () <= 1 ) {
344372 return 0 ;
345373 }
@@ -365,38 +393,77 @@ private int getTargetIndex(Long requestHash) {
365393
366394 @ Override
367395 public PickResult pickSubchannel (PickSubchannelArgs args ) {
368- Long requestHash = args .getCallOptions ().getOption (XdsNameResolver .RPC_HASH_KEY );
369- if (requestHash == null ) {
370- return PickResult .withError (RPC_HASH_NOT_FOUND );
396+ // Determine request hash.
397+ boolean usingRandomHash = false ;
398+ long requestHash ;
399+ if (requestHashHeaderKey == null ) {
400+ // Set by the xDS config selector.
401+ Long rpcHashFromCallOptions = args .getCallOptions ().getOption (XdsNameResolver .RPC_HASH_KEY );
402+ if (rpcHashFromCallOptions == null ) {
403+ return PickResult .withError (RPC_HASH_NOT_FOUND );
404+ }
405+ requestHash = rpcHashFromCallOptions ;
406+ } else {
407+ Iterable <String > headerValues = args .getHeaders ().getAll (requestHashHeaderKey );
408+ if (headerValues != null ) {
409+ requestHash = hashFunc .hashAsciiString (Joiner .on ("," ).join (headerValues ));
410+ } else {
411+ requestHash = random .nextLong ();
412+ usingRandomHash = true ;
413+ }
371414 }
372415
373416 int targetIndex = getTargetIndex (requestHash );
374417
375- // Per gRFC A61, because of sticky-TF with PickFirst's auto reconnect on TF, we ignore
376- // all TF subchannels and find the first ring entry in READY, CONNECTING or IDLE. If
377- // CONNECTING or IDLE we return a pick with no results. Additionally, if that entry is in
378- // IDLE, we initiate a connection.
379- for (int i = 0 ; i < ring .size (); i ++) {
380- int index = (targetIndex + i ) % ring .size ();
381- SubchannelView subchannelView = pickableSubchannels .get (ring .get (index ).addrKey );
382- ChildLbState childLbState = subchannelView .childLbState ;
383-
384- if (subchannelView .connectivityState == READY ) {
385- return childLbState .getCurrentPicker ().pickSubchannel (args );
418+ if (!usingRandomHash ) {
419+ // Per gRFC A61, because of sticky-TF with PickFirst's auto reconnect on TF, we ignore
420+ // all TF subchannels and find the first ring entry in READY, CONNECTING or IDLE. If
421+ // CONNECTING or IDLE we return a pick with no results. Additionally, if that entry is in
422+ // IDLE, we initiate a connection.
423+ for (int i = 0 ; i < ring .size (); i ++) {
424+ int index = (targetIndex + i ) % ring .size ();
425+ SubchannelView subchannelView = pickableSubchannels .get (ring .get (index ).addrKey );
426+ ChildLbState childLbState = subchannelView .childLbState ;
427+
428+ if (subchannelView .connectivityState == READY ) {
429+ return childLbState .getCurrentPicker ().pickSubchannel (args );
430+ }
431+
432+ // RPCs can be buffered if the next subchannel is pending (per A62). Otherwise, RPCs
433+ // are failed unless there is a READY connection.
434+ if (subchannelView .connectivityState == CONNECTING ) {
435+ return PickResult .withNoResult ();
436+ }
437+
438+ if (subchannelView .connectivityState == IDLE ) {
439+ syncContext .execute (() -> {
440+ childLbState .getLb ().requestConnection ();
441+ });
442+
443+ return PickResult .withNoResult (); // Indicates that this should be retried after backoff
444+ }
386445 }
387-
388- // RPCs can be buffered if the next subchannel is pending (per A62). Otherwise, RPCs
389- // are failed unless there is a READY connection.
390- if (subchannelView .connectivityState == CONNECTING ) {
391- return PickResult .withNoResult ();
446+ } else {
447+ // Using a random hash. Find and use the first READY ring entry, triggering at most one
448+ // entry to attempt connection.
449+ boolean requestedConnection = hasEndpointInConnectingState ;
450+ for (int i = 0 ; i < ring .size (); i ++) {
451+ int index = (targetIndex + i ) % ring .size ();
452+ SubchannelView subchannelView = pickableSubchannels .get (ring .get (index ).addrKey );
453+ ChildLbState childLbState = subchannelView .childLbState ;
454+ if (subchannelView .connectivityState == READY ) {
455+ return childLbState .getCurrentPicker ().pickSubchannel (args );
456+ }
457+ if (!requestedConnection && subchannelView .connectivityState == IDLE ) {
458+ syncContext .execute (
459+ () -> {
460+ childLbState .getLb ().requestConnection ();
461+ });
462+ requestedConnection = true ;
463+ }
392464 }
393-
394- if (subchannelView .connectivityState == IDLE ) {
395- syncContext .execute (() -> {
396- childLbState .getLb ().requestConnection ();
397- });
398-
399- return PickResult .withNoResult (); // Indicates that this should be retried after backoff
465+ if (requestedConnection ) {
466+ return PickResult .withNoResult ();
400467 }
401468 }
402469
@@ -444,20 +511,24 @@ public int compareTo(RingEntry entry) {
444511 static final class RingHashConfig {
445512 final long minRingSize ;
446513 final long maxRingSize ;
514+ final String requestHashHeader ;
447515
448- RingHashConfig (long minRingSize , long maxRingSize ) {
516+ RingHashConfig (long minRingSize , long maxRingSize , String requestHashHeader ) {
449517 checkArgument (minRingSize > 0 , "minRingSize <= 0" );
450518 checkArgument (maxRingSize > 0 , "maxRingSize <= 0" );
451519 checkArgument (minRingSize <= maxRingSize , "minRingSize > maxRingSize" );
520+ checkNotNull (requestHashHeader );
452521 this .minRingSize = minRingSize ;
453522 this .maxRingSize = maxRingSize ;
523+ this .requestHashHeader = requestHashHeader ;
454524 }
455525
456526 @ Override
457527 public String toString () {
458528 return MoreObjects .toStringHelper (this )
459529 .add ("minRingSize" , minRingSize )
460530 .add ("maxRingSize" , maxRingSize )
531+ .add ("requestHashHeader" , requestHashHeader )
461532 .toString ();
462533 }
463534 }
0 commit comments