Skip to content

Commit fbc8679

Browse files
authored
core: Sticky TRANSIENT_FAILURE in PickFirstLoadBalancer (#10106)
When the subchannel is transitioning from TRANSIENT_FAILURE to either IDLE or CONNECTING we will not update the LB state. Additionally, if the subchannel becomes idle we request a new connection so that the subchannel will keep on trying to establish a connection.
1 parent c5b825a commit fbc8679

File tree

2 files changed

+71
-13
lines changed

2 files changed

+71
-13
lines changed

core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
final class PickFirstLoadBalancer extends LoadBalancer {
4040
private final Helper helper;
4141
private Subchannel subchannel;
42+
private ConnectivityState currentState = IDLE;
4243

4344
PickFirstLoadBalancer(Helper helper) {
4445
this.helper = checkNotNull(helper, "helper");
@@ -69,7 +70,7 @@ public void onSubchannelState(ConnectivityStateInfo stateInfo) {
6970

7071
// The channel state does not get updated when doing name resolving today, so for the moment
7172
// let LB report CONNECTION and call subchannel.requestConnection() immediately.
72-
helper.updateBalancingState(CONNECTING, new Picker(PickResult.withSubchannel(subchannel)));
73+
updateBalancingState(CONNECTING, new Picker(PickResult.withSubchannel(subchannel)));
7374
subchannel.requestConnection();
7475
} else {
7576
subchannel.updateAddresses(servers);
@@ -86,20 +87,34 @@ public void handleNameResolutionError(Status error) {
8687
}
8788
// NB(lukaszx0) Whether we should propagate the error unconditionally is arguable. It's fine
8889
// for time being.
89-
helper.updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error)));
90+
updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error)));
9091
}
9192

9293
private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
93-
ConnectivityState currentState = stateInfo.getState();
94-
if (currentState == SHUTDOWN) {
94+
ConnectivityState newState = stateInfo.getState();
95+
if (newState == SHUTDOWN) {
9596
return;
9697
}
97-
if (stateInfo.getState() == TRANSIENT_FAILURE || stateInfo.getState() == IDLE) {
98+
if (newState == TRANSIENT_FAILURE || newState == IDLE) {
9899
helper.refreshNameResolution();
99100
}
100101

102+
// If we are transitioning from a TRANSIENT_FAILURE to CONNECTING or IDLE we ignore this state
103+
// transition and still keep the LB in TRANSIENT_FAILURE state. This is referred to as "sticky
104+
// transient failure". Only a subchannel state change to READY will get the LB out of
105+
// TRANSIENT_FAILURE. If the state is IDLE we additionally request a new connection so that we
106+
// keep retrying for a connection.
107+
if (currentState == TRANSIENT_FAILURE) {
108+
if (newState == CONNECTING) {
109+
return;
110+
} else if (newState == IDLE) {
111+
requestConnection();
112+
return;
113+
}
114+
}
115+
101116
SubchannelPicker picker;
102-
switch (currentState) {
117+
switch (newState) {
103118
case IDLE:
104119
picker = new RequestConnectionPicker(subchannel);
105120
break;
@@ -115,9 +130,15 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo
115130
picker = new Picker(PickResult.withError(stateInfo.getStatus()));
116131
break;
117132
default:
118-
throw new IllegalArgumentException("Unsupported state:" + currentState);
133+
throw new IllegalArgumentException("Unsupported state:" + newState);
119134
}
120-
helper.updateBalancingState(currentState, picker);
135+
136+
updateBalancingState(newState, picker);
137+
}
138+
139+
private void updateBalancingState(ConnectivityState state, SubchannelPicker picker) {
140+
currentState = state;
141+
helper.updateBalancingState(state, picker);
121142
}
122143

123144
@Override

core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -260,17 +260,17 @@ public void pickAfterStateChangeAfterResolution() throws Exception {
260260
reset(mockHelper);
261261
when(mockHelper.getSynchronizationContext()).thenReturn(syncContext);
262262

263+
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE));
264+
inOrder.verify(mockHelper).refreshNameResolution();
265+
inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture());
266+
assertEquals(Status.OK, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());
267+
263268
Status error = Status.UNAVAILABLE.withDescription("boom!");
264269
stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error));
265270
inOrder.verify(mockHelper).refreshNameResolution();
266271
inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
267272
assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());
268273

269-
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE));
270-
inOrder.verify(mockHelper).refreshNameResolution();
271-
inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture());
272-
assertEquals(Status.OK, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());
273-
274274
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY));
275275
inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture());
276276
assertEquals(subchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel());
@@ -279,6 +279,43 @@ public void pickAfterStateChangeAfterResolution() throws Exception {
279279
verifyNoMoreInteractions(mockHelper);
280280
}
281281

282+
@Test
283+
public void pickAfterResolutionAfterTransientValue() throws Exception {
284+
InOrder inOrder = inOrder(mockHelper);
285+
286+
loadBalancer.acceptResolvedAddresses(
287+
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());
288+
inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture());
289+
CreateSubchannelArgs args = createArgsCaptor.getValue();
290+
assertThat(args.getAddresses()).isEqualTo(servers);
291+
verify(mockSubchannel).start(stateListenerCaptor.capture());
292+
SubchannelStateListener stateListener = stateListenerCaptor.getValue();
293+
verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
294+
verify(mockSubchannel).requestConnection();
295+
reset(mockHelper);
296+
when(mockHelper.getSynchronizationContext()).thenReturn(syncContext);
297+
298+
// An error has happened.
299+
Status error = Status.UNAVAILABLE.withDescription("boom!");
300+
stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error));
301+
inOrder.verify(mockHelper).refreshNameResolution();
302+
inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
303+
assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());
304+
305+
// But a subsequent IDLE update should be ignored and the LB state not updated. Additionally,
306+
// a request for a new connection should be made keep the subchannel trying to connect.
307+
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE));
308+
inOrder.verify(mockHelper).refreshNameResolution();
309+
verifyNoMoreInteractions(mockHelper);
310+
assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());
311+
verify(mockSubchannel, times(2)).requestConnection();
312+
313+
// Transition from TRANSIENT_ERROR to CONNECTING should also be ignored.
314+
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));
315+
verifyNoMoreInteractions(mockHelper);
316+
assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());
317+
}
318+
282319
@Test
283320
public void nameResolutionError() throws Exception {
284321
Status error = Status.NOT_FOUND.withDescription("nameResolutionError");

0 commit comments

Comments
 (0)