Skip to content

Commit 076adf2

Browse files
committed
Merge branch 'master' of github.com:grpc/grpc-java into impl/xds_timeout_with_max_stream_duration
2 parents 77c9ba2 + ef90da0 commit 076adf2

File tree

15 files changed

+272
-181
lines changed

15 files changed

+272
-181
lines changed

alts/src/main/java/io/grpc/alts/internal/AltsAuthContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public AltsAuthContext(HandshakerResult result) {
3434
.setPeerServiceAccount(result.getPeerIdentity().getServiceAccount())
3535
.setLocalServiceAccount(result.getLocalIdentity().getServiceAccount())
3636
.setPeerRpcVersions(result.getPeerRpcVersions())
37-
.putAllPeerAttributes(result.getPeerIdentity().getAttributes())
37+
.putAllPeerAttributes(result.getPeerIdentity().getAttributesMap())
3838
.build();
3939
}
4040

api/src/main/java/io/grpc/ClientCall.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,10 @@ public abstract class ClientCall<ReqT, RespT> {
104104
* Callbacks for receiving metadata, response messages and completion status from the server.
105105
*
106106
* <p>Implementations are free to block for extended periods of time. Implementations are not
107-
* required to be thread-safe.
107+
* required to be thread-safe, but they must not be thread-hostile. The caller is free to call
108+
* an instance from multiple threads, but only one call simultaneously. A single thread may
109+
* interleave calls to multiple instances, so implementations using ThreadLocals must be careful
110+
* to avoid leaking inappropriate state (e.g., clearing the ThreadLocal before returning).
108111
*/
109112
public abstract static class Listener<T> {
110113

api/src/main/java/io/grpc/ClientInterceptor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@
3232
* <p>Providing authentication credentials is better served by {@link
3333
* CallCredentials}. But a {@code ClientInterceptor} could set the {@code
3434
* CallCredentials} within the {@link CallOptions}.
35+
*
36+
* <p>The interceptor may be called for multiple {@link ClientCall calls} by one or more threads
37+
* without completing the previous ones first. Refer to the
38+
* {@link io.grpc.ClientCall.Listener ClientCall.Listener} docs for more details regarding thread
39+
* safety of the returned listener.
3540
*/
3641
@ThreadSafe
3742
public interface ClientInterceptor {

api/src/main/java/io/grpc/ServerCall.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,10 @@ public abstract class ServerCall<ReqT, RespT> {
4646
* close, which is guaranteed before completion.
4747
*
4848
* <p>Implementations are free to block for extended periods of time. Implementations are not
49-
* required to be thread-safe.
49+
* required to be thread-safe, but they must not be thread-hostile. The caller is free to call
50+
* an instance from multiple threads, but only one call simultaneously. A single thread may
51+
* interleave calls to multiple instances, so implementations using ThreadLocals must be careful
52+
* to avoid leaking inappropriate state (e.g., clearing the ThreadLocal before returning).
5053
*/
5154
// TODO(ejona86): We need to decide what to do in the case of server closing with non-cancellation
5255
// before client half closes. It may be that we treat such a case as an error. If we permit such

api/src/main/java/io/grpc/ServerInterceptor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@
2929
* <li>Logging and monitoring call behavior</li>
3030
* <li>Delegating calls to other servers</li>
3131
* </ul>
32+
*
33+
* <p>The interceptor may be called for multiple {@link ServerCall calls} by one or more threads
34+
* without completing the previous ones first. Refer to the
35+
* {@link io.grpc.ServerCall.Listener ServerCall.Listener} docs for more details regarding thread
36+
* safety of the returned listener.
3237
*/
3338
@ThreadSafe
3439
public interface ServerInterceptor {

interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -61,19 +61,6 @@ public static void main(String[] args) throws Exception {
6161
client.parseArgs(args);
6262
client.setUp();
6363

64-
Runtime.getRuntime().addShutdownHook(new Thread() {
65-
@Override
66-
@SuppressWarnings("CatchAndPrintStackTrace")
67-
public void run() {
68-
System.out.println("Shutting down");
69-
try {
70-
client.tearDown();
71-
} catch (Exception e) {
72-
e.printStackTrace();
73-
}
74-
}
75-
});
76-
7764
try {
7865
client.run();
7966
} finally {

rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import io.grpc.rls.RlsProtoData.RouteLookupResponse;
6262
import io.grpc.rls.Throttler.ThrottledException;
6363
import io.grpc.stub.StreamObserver;
64+
import io.grpc.util.ForwardingLoadBalancerHelper;
6465
import java.util.HashMap;
6566
import java.util.Map;
6667
import java.util.concurrent.ScheduledExecutorService;
@@ -110,7 +111,7 @@ final class CachingRlsLbClient {
110111
private final long staleAgeNanos;
111112
private final long callTimeoutNanos;
112113

113-
private final Helper helper;
114+
private final RlsLbHelper helper;
114115
private final ManagedChannel rlsChannel;
115116
private final RouteLookupServiceStub rlsStub;
116117
private final RlsPicker rlsPicker;
@@ -119,7 +120,7 @@ final class CachingRlsLbClient {
119120
private final ChannelLogger logger;
120121

121122
private CachingRlsLbClient(Builder builder) {
122-
helper = checkNotNull(builder.helper, "helper");
123+
helper = new RlsLbHelper(checkNotNull(builder.helper, "helper"));
123124
scheduledExecutorService = helper.getScheduledExecutorService();
124125
synchronizationContext = helper.getSynchronizationContext();
125126
lbPolicyConfig = checkNotNull(builder.lbPolicyConfig, "lbPolicyConfig");
@@ -200,6 +201,7 @@ public void onError(Throwable t) {
200201
logger.log(ChannelLogLevel.DEBUG, "Error looking up route:", t);
201202
response.setException(t);
202203
throttler.registerBackendResponse(false);
204+
helper.propagateRlsError();
203205
}
204206

205207
@Override
@@ -288,6 +290,41 @@ void requestConnection() {
288290
rlsChannel.getState(true);
289291
}
290292

293+
private static final class RlsLbHelper extends ForwardingLoadBalancerHelper {
294+
295+
final Helper helper;
296+
private ConnectivityState state;
297+
private SubchannelPicker picker;
298+
299+
RlsLbHelper(Helper helper) {
300+
this.helper = helper;
301+
}
302+
303+
@Override
304+
protected Helper delegate() {
305+
return helper;
306+
}
307+
308+
@Override
309+
public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
310+
state = newState;
311+
picker = newPicker;
312+
super.updateBalancingState(newState, newPicker);
313+
}
314+
315+
void propagateRlsError() {
316+
getSynchronizationContext().execute(new Runnable() {
317+
@Override
318+
public void run() {
319+
if (picker != null) {
320+
// Refresh the channel state and let pending RPCs reprocess the picker.
321+
updateBalancingState(state, picker);
322+
}
323+
}
324+
});
325+
}
326+
}
327+
291328
/**
292329
* Viewer class for cached {@link RouteLookupResponse} and associated {@link ChildPolicyWrapper}.
293330
*/

rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java

Lines changed: 56 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import static org.mockito.Mockito.CALLS_REAL_METHODS;
2626
import static org.mockito.Mockito.inOrder;
2727
import static org.mockito.Mockito.mock;
28-
import static org.mockito.Mockito.never;
2928
import static org.mockito.Mockito.times;
3029
import static org.mockito.Mockito.verify;
3130

@@ -41,13 +40,16 @@
4140
import io.grpc.ForwardingChannelBuilder;
4241
import io.grpc.LoadBalancer;
4342
import io.grpc.LoadBalancer.Helper;
43+
import io.grpc.LoadBalancer.PickResult;
4444
import io.grpc.LoadBalancer.SubchannelPicker;
4545
import io.grpc.LoadBalancerProvider;
4646
import io.grpc.ManagedChannel;
4747
import io.grpc.ManagedChannelBuilder;
4848
import io.grpc.Metadata;
4949
import io.grpc.NameResolver;
50+
import io.grpc.NameResolver.ConfigOrError;
5051
import io.grpc.Status;
52+
import io.grpc.Status.Code;
5153
import io.grpc.SynchronizationContext;
5254
import io.grpc.inprocess.InProcessChannelBuilder;
5355
import io.grpc.inprocess.InProcessServerBuilder;
@@ -102,6 +104,7 @@ public class CachingRlsLbClientTest {
102104

103105
private static final RouteLookupConfig ROUTE_LOOKUP_CONFIG = getRouteLookupConfig();
104106
private static final int SERVER_LATENCY_MILLIS = 10;
107+
private static final String DEFAULT_TARGET = "fallback.cloudbigtable.googleapis.com";
105108

106109
@Rule
107110
public final MockitoRule mocks = MockitoJUnit.rule();
@@ -278,7 +281,9 @@ public void get_updatesLbState() throws Exception {
278281
rlsServerImpl.setLookupTable(
279282
ImmutableMap.of(
280283
routeLookupRequest,
281-
new RouteLookupResponse(ImmutableList.of("target"), "header-rls-data-value")));
284+
new RouteLookupResponse(
285+
ImmutableList.of("primary.cloudbigtable.googleapis.com"),
286+
"header-rls-data-value")));
282287

283288
// valid channel
284289
CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequest);
@@ -298,28 +303,41 @@ public void get_updatesLbState() throws Exception {
298303
assertThat(stateCaptor.getAllValues())
299304
.containsExactly(ConnectivityState.CONNECTING, ConnectivityState.READY);
300305
Metadata headers = new Metadata();
301-
pickerCaptor.getValue().pickSubchannel(
306+
PickResult pickResult = pickerCaptor.getValue().pickSubchannel(
302307
new PickSubchannelArgsImpl(
303308
TestMethodDescriptors.voidMethod().toBuilder().setFullMethodName("foo/bar").build(),
304309
headers,
305310
CallOptions.DEFAULT));
311+
assertThat(pickResult.getStatus().isOk()).isTrue();
312+
assertThat(pickResult.getSubchannel()).isNotNull();
306313
assertThat(headers.get(RLS_DATA_KEY)).isEqualTo("header-rls-data-value");
307314

308315
// move backoff further back to only test error behavior
309316
fakeBackoffProvider.nextPolicy = createBackoffPolicy(100, TimeUnit.MILLISECONDS);
310317
// try to get invalid
311318
RouteLookupRequest invalidRouteLookupRequest =
312319
new RouteLookupRequest(
313-
"unknown_server", "/doesn/exists", "grpc", ImmutableMap.<String, String>of());
320+
"service1", "/doesn/exists", "grpc", ImmutableMap.<String, String>of());
314321
CachedRouteLookupResponse errorResp = getInSyncContext(invalidRouteLookupRequest);
315322
assertThat(errorResp.isPending()).isTrue();
316323
fakeTimeProvider.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS);
317324

318325
errorResp = getInSyncContext(invalidRouteLookupRequest);
319326
assertThat(errorResp.hasError()).isTrue();
320327

321-
inOrder.verify(helper, never())
322-
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
328+
// Channel is still READY because the subchannel for method /foo/bar is still READY.
329+
// Method /doesn/exists will use fallback child balancer and fail immediately.
330+
inOrder.verify(helper)
331+
.updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture());
332+
pickResult = pickerCaptor.getValue().pickSubchannel(
333+
new PickSubchannelArgsImpl(
334+
TestMethodDescriptors.voidMethod().toBuilder()
335+
.setFullMethodName("doesn/exists")
336+
.build(),
337+
headers,
338+
CallOptions.DEFAULT));
339+
assertThat(pickResult.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
340+
assertThat(pickResult.getStatus().getDescription()).isEqualTo("fallback not available");
323341
}
324342

325343
@Test
@@ -370,7 +388,7 @@ private static RouteLookupConfig getRouteLookupConfig() {
370388
/* staleAgeInMillis= */ TimeUnit.SECONDS.toMillis(240),
371389
/* cacheSizeBytes= */ 1000,
372390
/* validTargets= */ ImmutableList.of("a valid target"),
373-
/* defaultTarget= */ "us_east_1.cloudbigtable.googleapis.com");
391+
DEFAULT_TARGET);
374392
}
375393

376394
private static BackoffPolicy createBackoffPolicy(final long delay, final TimeUnit unit) {
@@ -395,6 +413,10 @@ public BackoffPolicy get() {
395413
}
396414
}
397415

416+
/**
417+
* A load balancer that immediately goes to READY when using the rls response target and
418+
* immediately fails when using the fallback target.
419+
*/
398420
private static final class TestLoadBalancerProvider extends LoadBalancerProvider {
399421

400422
@Override
@@ -412,21 +434,39 @@ public String getPolicyName() {
412434
return null;
413435
}
414436

437+
@Override
438+
public ConfigOrError parseLoadBalancingPolicyConfig(
439+
Map<String, ?> rawLoadBalancingPolicyConfig) {
440+
return ConfigOrError.fromConfig(rawLoadBalancingPolicyConfig);
441+
}
442+
415443
@Override
416444
public LoadBalancer newLoadBalancer(final Helper helper) {
417445
return new LoadBalancer() {
418446

419447
@Override
420448
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
421-
// TODO: make the picker accessible
422-
helper.updateBalancingState(
423-
ConnectivityState.READY,
424-
new SubchannelPicker() {
425-
@Override
426-
public PickResult pickSubchannel(PickSubchannelArgs args) {
427-
return PickResult.withSubchannel(mock(Subchannel.class));
428-
}
429-
});
449+
Map<?, ?> config = (Map<?, ?>) resolvedAddresses.getLoadBalancingPolicyConfig();
450+
if (DEFAULT_TARGET.equals(config.get("target"))) {
451+
helper.updateBalancingState(
452+
ConnectivityState.TRANSIENT_FAILURE,
453+
new SubchannelPicker() {
454+
@Override
455+
public PickResult pickSubchannel(PickSubchannelArgs args) {
456+
return PickResult.withError(
457+
Status.UNAVAILABLE.withDescription("fallback not available"));
458+
}
459+
});
460+
} else {
461+
helper.updateBalancingState(
462+
ConnectivityState.READY,
463+
new SubchannelPicker() {
464+
@Override
465+
public PickResult pickSubchannel(PickSubchannelArgs args) {
466+
return PickResult.withSubchannel(mock(Subchannel.class));
467+
}
468+
});
469+
}
430470
}
431471

432472
@Override

xds/src/main/java/io/grpc/xds/EnvoyProtoData.java

Lines changed: 6 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -918,16 +918,6 @@ RouteAction getRouteAction() {
918918
return routeAction;
919919
}
920920

921-
// TODO(chengyuanzhang): delete and do not use after routing feature is always ON.
922-
boolean isDefaultRoute() {
923-
// For backward compatibility, all the other matchers are ignored.
924-
String prefix = routeMatch.getPathMatch().getPrefix();
925-
if (prefix != null) {
926-
return prefix.isEmpty() || prefix.equals("/");
927-
}
928-
return false;
929-
}
930-
931921
@Override
932922
public boolean equals(Object o) {
933923
if (this == o) {
@@ -992,17 +982,12 @@ static StructOrError<Route> fromEnvoyProtoRoute(
992982
}
993983

994984
@VisibleForTesting
995-
@SuppressWarnings("deprecation")
996985
@Nullable
997986
static StructOrError<RouteMatch> convertEnvoyProtoRouteMatch(
998987
io.envoyproxy.envoy.config.route.v3.RouteMatch proto) {
999988
if (proto.getQueryParametersCount() != 0) {
1000989
return null;
1001990
}
1002-
if (proto.hasCaseSensitive() && !proto.getCaseSensitive().getValue()) {
1003-
return StructOrError.fromError("Unsupported match option: case insensitive");
1004-
}
1005-
1006991
StructOrError<PathMatcher> pathMatch = convertEnvoyProtoPathMatcher(proto);
1007992
if (pathMatch.getErrorDetail() != null) {
1008993
return StructOrError.fromError(pathMatch.getErrorDetail());
@@ -1032,32 +1017,28 @@ static StructOrError<RouteMatch> convertEnvoyProtoRouteMatch(
10321017
pathMatch.getStruct(), Collections.unmodifiableList(headerMatchers), fractionMatch));
10331018
}
10341019

1035-
@SuppressWarnings("deprecation")
10361020
private static StructOrError<PathMatcher> convertEnvoyProtoPathMatcher(
10371021
io.envoyproxy.envoy.config.route.v3.RouteMatch proto) {
1038-
String path = null;
1039-
String prefix = null;
1040-
Pattern safeRegEx = null;
1022+
boolean caseSensitive = proto.getCaseSensitive().getValue();
10411023
switch (proto.getPathSpecifierCase()) {
10421024
case PREFIX:
1043-
prefix = proto.getPrefix();
1044-
break;
1025+
return StructOrError.fromStruct(
1026+
PathMatcher.fromPrefix(proto.getPrefix(), caseSensitive));
10451027
case PATH:
1046-
path = proto.getPath();
1047-
break;
1028+
return StructOrError.fromStruct(PathMatcher.fromPath(proto.getPath(), caseSensitive));
10481029
case SAFE_REGEX:
10491030
String rawPattern = proto.getSafeRegex().getRegex();
1031+
Pattern safeRegEx;
10501032
try {
10511033
safeRegEx = Pattern.compile(rawPattern);
10521034
} catch (PatternSyntaxException e) {
10531035
return StructOrError.fromError("Malformed safe regex pattern: " + e.getMessage());
10541036
}
1055-
break;
1037+
return StructOrError.fromStruct(PathMatcher.fromRegEx(safeRegEx));
10561038
case PATHSPECIFIER_NOT_SET:
10571039
default:
10581040
return StructOrError.fromError("Unknown path match type");
10591041
}
1060-
return StructOrError.fromStruct(new PathMatcher(path, prefix, safeRegEx));
10611042
}
10621043

10631044
private static StructOrError<FractionMatcher> convertEnvoyProtoFraction(

0 commit comments

Comments
 (0)