Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions xds/src/main/java/io/grpc/xds/ThreadSafeRandom.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
interface ThreadSafeRandom {
int nextInt(int bound);

long nextLong();

final class ThreadSafeRandomImpl implements ThreadSafeRandom {

static final ThreadSafeRandom instance = new ThreadSafeRandomImpl();
Expand All @@ -33,5 +35,10 @@ private ThreadSafeRandomImpl() {}
public int nextInt(int bound) {
return ThreadLocalRandom.current().nextInt(bound);
}

@Override
public long nextLong() {
return ThreadLocalRandom.current().nextLong();
}
}
}
104 changes: 69 additions & 35 deletions xds/src/main/java/io/grpc/xds/XdsNameResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import io.grpc.xds.VirtualHost.Route;
import io.grpc.xds.VirtualHost.Route.RouteAction;
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
import io.grpc.xds.VirtualHost.Route.RouteAction.HashPolicy;
import io.grpc.xds.VirtualHost.Route.RouteMatch;
import io.grpc.xds.XdsClient.LdsResourceWatcher;
import io.grpc.xds.XdsClient.LdsUpdate;
Expand Down Expand Up @@ -95,6 +96,8 @@ final class XdsNameResolver extends NameResolver {

static final CallOptions.Key<String> CLUSTER_SELECTION_KEY =
CallOptions.Key.create("io.grpc.xds.CLUSTER_SELECTION_KEY");
static final CallOptions.Key<Long> RPC_HASH_KEY =
CallOptions.Key.create("io.grpc.xds.RPC_HASH_KEY");
@VisibleForTesting
static boolean enableTimeout =
Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"));
Expand All @@ -119,13 +122,15 @@ final class XdsNameResolver extends NameResolver {
@VisibleForTesting
static AtomicLong activeFaultInjectedStreamCounter = new AtomicLong();

private final InternalLogId logId;
private final XdsLogger logger;
private final String authority;
private final ServiceConfigParser serviceConfigParser;
private final SynchronizationContext syncContext;
private final ScheduledExecutorService scheduler;
private final XdsClientPoolFactory xdsClientPoolFactory;
private final ThreadSafeRandom random;
private final XxHash64 hashFunc = XxHash64.INSTANCE;
private final ConcurrentMap<String, AtomicInteger> clusterRefs = new ConcurrentHashMap<>();
private final ConfigSelector configSelector = new ConfigSelector();

Expand All @@ -152,7 +157,8 @@ final class XdsNameResolver extends NameResolver {
this.scheduler = checkNotNull(scheduler, "scheduler");
this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory");
this.random = checkNotNull(random, "random");
logger = XdsLogger.withLogId(InternalLogId.allocate("xds-resolver", name));
logId = InternalLogId.allocate("xds-resolver", name);
logger = XdsLogger.withLogId(logId);
logger.log(XdsLogLevel.INFO, "Created resolver for {0}", name);
}

Expand Down Expand Up @@ -347,26 +353,33 @@ static boolean matchHostName(String hostName, String pattern) {
private final class ConfigSelector extends InternalConfigSelector {
@Override
public Result selectConfig(PickSubchannelArgs args) {
// Index ASCII headers by keys.
Map<String, Iterable<String>> asciiHeaders = new HashMap<>();
// Index ASCII headers by key, multi-value headers are concatenated for matching purposes.
Map<String, String> asciiHeaders = new HashMap<>();
Metadata headers = args.getHeaders();
for (String headerName : headers.keys()) {
if (headerName.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
continue;
}
Metadata.Key<String> key = Metadata.Key.of(headerName, Metadata.ASCII_STRING_MARSHALLER);
asciiHeaders.put(headerName, headers.getAll(key));
Iterable<String> values = headers.getAll(key);
if (values != null) {
asciiHeaders.put(headerName, Joiner.on(",").join(values));
}
}
// Special hack for exposing headers: "content-type".
asciiHeaders.put("content-type", "application/grpc");
String cluster = null;
Route selectedRoute = null;
HttpFault selectedFaultConfig;
RoutingConfig routingCfg;
do {
selectedFaultConfig = routingConfig.faultConfig;
for (Route route : routingConfig.routes) {
routingCfg = routingConfig;
selectedFaultConfig = routingCfg.faultConfig;
for (Route route : routingCfg.routes) {
if (matchRoute(route.routeMatch(), "/" + args.getMethodDescriptor().getFullMethodName(),
asciiHeaders, random)) {
selectedRoute = route;
if (routingConfig.applyFaultInjection && route.httpFault() != null) {
if (routingCfg.applyFaultInjection && route.httpFault() != null) {
selectedFaultConfig = route.httpFault();
}
break;
Expand All @@ -390,7 +403,7 @@ public Result selectConfig(PickSubchannelArgs args) {
accumulator += weightedCluster.weight();
if (select < accumulator) {
cluster = weightedCluster.name();
if (routingConfig.applyFaultInjection && weightedCluster.httpFault() != null) {
if (routingCfg.applyFaultInjection && weightedCluster.httpFault() != null) {
selectedFaultConfig = weightedCluster.httpFault();
}
break;
Expand All @@ -403,7 +416,7 @@ public Result selectConfig(PickSubchannelArgs args) {
if (enableTimeout) {
Long timeoutNano = selectedRoute.routeAction().timeoutNano();
if (timeoutNano == null) {
timeoutNano = routingConfig.fallbackTimeoutNano;
timeoutNano = routingCfg.fallbackTimeoutNano;
}
if (timeoutNano > 0) {
rawServiceConfig = generateServiceConfigWithMethodTimeoutConfig(timeoutNano);
Expand All @@ -417,7 +430,6 @@ public Result selectConfig(PickSubchannelArgs args) {
parsedServiceConfig.getError().augmentDescription(
"Failed to parse service config (method config)"));
}
final String finalCluster = cluster;
if (selectedFaultConfig != null && selectedFaultConfig.maxActiveFaults() != null
&& activeFaultInjectedStreamCounter.get() >= selectedFaultConfig.maxActiveFaults()) {
selectedFaultConfig = null;
Expand Down Expand Up @@ -447,15 +459,18 @@ public Result selectConfig(PickSubchannelArgs args) {
abortStatus = determineFaultAbortStatus(selectedFaultConfig.faultAbort(), headers);
}
}
final String finalCluster = cluster;
final Long finalDelayNanos = delayNanos;
final Status finalAbortStatus = abortStatus;
final long hash = generateHash(selectedRoute.routeAction().hashPolicies(), asciiHeaders);
class ClusterSelectionInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
final MethodDescriptor<ReqT, RespT> method, CallOptions callOptions,
final Channel next) {
final CallOptions callOptionsForCluster =
callOptions.withOption(CLUSTER_SELECTION_KEY, finalCluster);
callOptions.withOption(CLUSTER_SELECTION_KEY, finalCluster)
.withOption(RPC_HASH_KEY, hash);
Supplier<ClientCall<ReqT, RespT>> configApplyingCallSupplier =
new Supplier<ClientCall<ReqT, RespT>>() {
@Override
Expand Down Expand Up @@ -553,6 +568,36 @@ public void run() {
}
}

private long generateHash(List<HashPolicy> hashPolicies, Map<String, String> headers) {
Long hash = null;
for (HashPolicy policy : hashPolicies) {
Long newHash = null;
if (policy.type() == HashPolicy.Type.HEADER) {
if (headers.containsKey(policy.headerName())) {
String value = headers.get(policy.headerName());
if (policy.regEx() != null && policy.regExSubstitution() != null) {
value = policy.regEx().matcher(value).replaceAll(policy.regExSubstitution());
}
newHash = hashFunc.hashAsciiString(value);
}
} else if (policy.type() == HashPolicy.Type.CHANNEL_ID) {
newHash = hashFunc.hashLong(logId.getId());
}
if (newHash != null ) {
// Rotating the old value prevents duplicate hash rules from cancelling each other out
// and preserves all of the entropy.
long oldHash = hash != null ? ((hash << 1L) | (hash >> 63L)) : 0;
hash = oldHash ^ newHash;
}
// If the policy is a terminal policy and a hash has been generated, ignore
// the rest of the hash policies.
if (policy.isTerminal() && hash != null) {
break;
}
}
return hash == null ? random.nextLong() : hash;
}

@Nullable
private Long determineFaultDelayNanos(FaultDelay faultDelay, Metadata headers) {
Long delayNanos;
Expand Down Expand Up @@ -748,7 +793,7 @@ public void sendMessage(ReqT message) {}

@VisibleForTesting
static boolean matchRoute(RouteMatch routeMatch, String fullMethodName,
Map<String, Iterable<String>> headers, ThreadSafeRandom random) {
Map<String, String> headers, ThreadSafeRandom random) {
if (!matchPath(routeMatch.pathMatcher(), fullMethodName)) {
return false;
}
Expand All @@ -774,52 +819,41 @@ static boolean matchPath(PathMatcher pathMatcher, String fullMethodName) {
}

private static boolean matchHeaders(
List<HeaderMatcher> headerMatchers, Map<String, Iterable<String>> headers) {
List<HeaderMatcher> headerMatchers, Map<String, String> headers) {
for (HeaderMatcher headerMatcher : headerMatchers) {
Iterable<String> headerValues = headers.get(headerMatcher.name());
// Special cases for hiding headers: "grpc-previous-rpc-attempts".
if (headerMatcher.name().equals("grpc-previous-rpc-attempts")) {
headerValues = null;
}
// Special case for exposing headers: "content-type".
if (headerMatcher.name().equals("content-type")) {
headerValues = Collections.singletonList("application/grpc");
}
if (!matchHeader(headerMatcher, headerValues)) {
if (!matchHeader(headerMatcher, headers.get(headerMatcher.name()))) {
return false;
}
}
return true;
}

@VisibleForTesting
static boolean matchHeader(HeaderMatcher headerMatcher,
@Nullable Iterable<String> headerValues) {
static boolean matchHeader(HeaderMatcher headerMatcher, @Nullable String value) {
if (headerMatcher.present() != null) {
return (headerValues == null) == headerMatcher.present().equals(headerMatcher.inverted());
return (value == null) == headerMatcher.present().equals(headerMatcher.inverted());
}
if (headerValues == null) {
if (value == null) {
return false;
}
String valueStr = Joiner.on(",").join(headerValues);
boolean baseMatch;
if (headerMatcher.exactValue() != null) {
baseMatch = headerMatcher.exactValue().equals(valueStr);
baseMatch = headerMatcher.exactValue().equals(value);
} else if (headerMatcher.safeRegEx() != null) {
baseMatch = headerMatcher.safeRegEx().matches(valueStr);
baseMatch = headerMatcher.safeRegEx().matches(value);
} else if (headerMatcher.range() != null) {
long numValue;
try {
numValue = Long.parseLong(valueStr);
numValue = Long.parseLong(value);
baseMatch = numValue >= headerMatcher.range().start()
&& numValue <= headerMatcher.range().end();
} catch (NumberFormatException ignored) {
baseMatch = false;
}
} else if (headerMatcher.prefix() != null) {
baseMatch = valueStr.startsWith(headerMatcher.prefix());
baseMatch = value.startsWith(headerMatcher.prefix());
} else {
baseMatch = valueStr.endsWith(headerMatcher.suffix());
baseMatch = value.endsWith(headerMatcher.suffix());
}
return baseMatch != headerMatcher.inverted();
}
Expand Down Expand Up @@ -1033,7 +1067,7 @@ public void run() {
}

/**
* Grouping of the list of usable routes and their corresponding fallback timeout value.
* VirtualHost-level configuration for request routing.
*/
private static class RoutingConfig {
private final long fallbackTimeoutNano;
Expand All @@ -1042,7 +1076,7 @@ private static class RoutingConfig {
@Nullable
private final HttpFault faultConfig;

private static RoutingConfig empty =
private static final RoutingConfig empty =
new RoutingConfig(0L, Collections.<Route>emptyList(), false, null);

private RoutingConfig(
Expand Down
5 changes: 5 additions & 0 deletions xds/src/test/java/io/grpc/xds/WeightedRandomPickerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ public int nextInt(int bound) {
assertThat(nextInt).isLessThan(bound);
return nextInt;
}

@Override
public long nextLong() {
throw new UnsupportedOperationException("Should not be called");
}
}

private final FakeRandom fakeRandom = new FakeRandom();
Expand Down
Loading