Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@ commons-math3 = "org.apache.commons:commons-math3:3.6.1"
conscrypt = "org.conscrypt:conscrypt-openjdk-uber:2.5.2"
cronet-api = "org.chromium.net:cronet-api:119.6045.31"
cronet-embedded = "org.chromium.net:cronet-embedded:119.6045.31"
dev-cel-compiler = "dev.cel:compiler:0.9.1-proto3"
dev-cel-protobuf = "dev.cel:protobuf:0.9.1-proto3"
dev-cel-runtime = "dev.cel:runtime:0.9.1-proto3"
# error-prone 2.31.0+ blocked on https://github.com/grpc/grpc-java/issues/10152
# It breaks Bazel (ArrayIndexOutOfBoundsException in turbine) and Dexing ("D8:
# java.lang.NullPointerException"). We can trivially upgrade the Bazel CI to
# 6.3.0+ (https://github.com/bazelbuild/bazel/issues/18743).
errorprone-annotations = "com.google.errorprone:error_prone_annotations:2.30.0"
errorprone-annotations = "com.google.errorprone:error_prone_annotations:2.36.0"
# error-prone 2.32.0+ require Java 17+
errorprone-core = "com.google.errorprone:error_prone_core:2.31.0"
google-api-protos = "com.google.api.grpc:proto-google-common-protos:2.51.0"
Expand Down
5 changes: 4 additions & 1 deletion xds/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ dependencies {
project(':grpc-services'),
project(':grpc-auth'),
project(path: ':grpc-alts', configuration: 'shadow'),
libraries.dev.cel.runtime,
libraries.dev.cel.protobuf,
libraries.guava,
libraries.gson,
libraries.re2j,
Expand All @@ -72,7 +74,8 @@ dependencies {
compileOnly libraries.netty.transport.epoll

testImplementation project(':grpc-testing'),
project(':grpc-testing-proto')
project(':grpc-testing-proto'),
libraries.dev.cel.compiler
testImplementation (libraries.netty.transport.epoll) {
artifact {
classifier = "linux-x86_64"
Expand Down
3 changes: 2 additions & 1 deletion xds/src/main/java/io/grpc/xds/FilterRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ static synchronized FilterRegistry getDefaultRegistry() {
instance = newRegistry().register(
new FaultFilter.Provider(),
new RouterFilter.Provider(),
new RbacFilter.Provider());
new RbacFilter.Provider(),
new RlqsFilter.Provider());
}
return instance;
}
Expand Down
9 changes: 9 additions & 0 deletions xds/src/main/java/io/grpc/xds/MessagePrinter.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.grpc.xds;

import com.github.xds.type.matcher.v3.CelMatcher;
import com.github.xds.type.matcher.v3.HttpAttributesCelMatchInput;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
Expand All @@ -28,6 +30,8 @@
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
import io.envoyproxy.envoy.extensions.clusters.aggregate.v3.ClusterConfig;
import io.envoyproxy.envoy.extensions.filters.http.fault.v3.HTTPFault;
import io.envoyproxy.envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaFilterConfig;
import io.envoyproxy.envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaOverride;
import io.envoyproxy.envoy.extensions.filters.http.rbac.v3.RBAC;
import io.envoyproxy.envoy.extensions.filters.http.rbac.v3.RBACPerRoute;
import io.envoyproxy.envoy.extensions.filters.http.router.v3.Router;
Expand Down Expand Up @@ -58,6 +62,11 @@ private static JsonFormat.Printer newPrinter() {
.add(RBAC.getDescriptor())
.add(RBACPerRoute.getDescriptor())
.add(Router.getDescriptor())
// RLQS
.add(RateLimitQuotaFilterConfig.getDescriptor())
.add(RateLimitQuotaOverride.getDescriptor())
.add(HttpAttributesCelMatchInput.getDescriptor())
.add(CelMatcher.getDescriptor())
// UpstreamTlsContext and DownstreamTlsContext in v3 are not transitively imported
// by top-level resource types.
.add(UpstreamTlsContext.getDescriptor())
Expand Down
309 changes: 309 additions & 0 deletions xds/src/main/java/io/grpc/xds/RlqsFilter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,309 @@
/*
* Copyright 2024 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.xds;

import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.xds.client.XdsResourceType.ResourceInvalidException;
import static io.grpc.xds.client.XdsResourceType.unpackAny;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import io.envoyproxy.envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaBucketSettings;
import io.envoyproxy.envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaFilterConfig;
import io.envoyproxy.envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaOverride;
import io.grpc.ChannelCredentials;
import io.grpc.InsecureChannelCredentials;
import io.grpc.InternalLogId;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.SynchronizationContext;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.SharedResourceHolder;
import io.grpc.xds.client.Bootstrapper.RemoteServerInfo;
import io.grpc.xds.client.XdsLogger;
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
import io.grpc.xds.internal.datatype.GrpcService;
import io.grpc.xds.internal.matchers.HttpMatchInput;
import io.grpc.xds.internal.matchers.Matcher;
import io.grpc.xds.internal.matchers.MatcherList;
import io.grpc.xds.internal.matchers.OnMatch;
import io.grpc.xds.internal.rlqs.RlqsBucketSettings;
import io.grpc.xds.internal.rlqs.RlqsFilterState;
import io.grpc.xds.internal.rlqs.RlqsRateLimitResult;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Logger;
import javax.annotation.Nullable;

/** RLQS xDS HTTP filter implementation. */
// TODO(sergiitk): introduce a layer between the filter and interceptor.
// lds has filter names and the names are unique - even for server instances.
final class RlqsFilter implements Filter {
private final XdsLogger logger;

static final boolean enabled = GrpcUtil.getFlag("GRPC_EXPERIMENTAL_XDS_ENABLE_RLQS", false);

// TODO(sergiitk): [IMPL] remove
// Do do not fail on parsing errors, only log requests.
static final boolean dryRun = GrpcUtil.getFlag("GRPC_EXPERIMENTAL_RLQS_DRY_RUN", false);

static final String TYPE_URL = "type.googleapis.com/"
+ "envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaFilterConfig";
static final String TYPE_URL_OVERRIDE_CONFIG = "type.googleapis.com/"
+ "envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaOverride";

private final String name;
private final SynchronizationContext syncContext;
// TODO(sergiitk): [QUESTION] always in sync context?
private volatile boolean shutdown = false;
// TODO(sergiitk): [IMPL] figure out what to use here.
// TODO(sergiitk): [IMPL] scheduler - consider using GrpcUtil.TIMER_SERVICE.
// TODO(sergiitk): [IMPL] note that the scheduler has a finite lifetime.
private final ScheduledExecutorService scheduler =
SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE);

// Filter state is unique per parsed filter config.
private final ConcurrentMap<RlqsFilterConfig, RlqsFilterState> filterStateCache =
new ConcurrentHashMap<>();

public RlqsFilter(String name) {
this.name = name;
logger = XdsLogger.withLogId(InternalLogId.allocate(this.getClass(), null));
logger.log(XdsLogLevel.DEBUG,
"Created RlqsFilter name='%s' with enabled=%s, dryRun=%s", name, enabled, dryRun);

syncContext = new SynchronizationContext((thread, error) -> {
String message = "Uncaught exception in RlqsFilter SynchronizationContext. Panic!";
// TODO(sergiitk): update to the new signature when ready.
logger.log(XdsLogLevel.DEBUG,
message + " {0} \nTrace:\n {1}", error, Throwables.getStackTraceAsString(error));
// TODO(sergiitk): [IMPL] do we need separate RlqsCacheSynchronizationException?
throw new RuntimeException(message, error);
});
}

static final class Provider implements Filter.Provider {
private static final Logger logger = Logger.getLogger(Provider.class.getName());

@Override
public String[] typeUrls() {
return new String[]{TYPE_URL, TYPE_URL_OVERRIDE_CONFIG};
}

@Override
public boolean isServerFilter() {
return true;
}

@Override
public RlqsFilter newInstance(String name) {
return new RlqsFilter(name);
}

@Override
public ConfigOrError<RlqsFilterConfig> parseFilterConfig(Message rawProtoMessage) {
try {
RlqsFilterConfig rlqsFilterConfig =
parseRlqsFilter(unpackAny(rawProtoMessage, RateLimitQuotaFilterConfig.class));
return ConfigOrError.fromConfig(rlqsFilterConfig);
} catch (InvalidProtocolBufferException e) {
return ConfigOrError.fromError("Can't unpack RateLimitQuotaFilterConfig proto: " + e);
} catch (ResourceInvalidException e) {
return ConfigOrError.fromError(e.getMessage());
}
}

@Override
public ConfigOrError<RlqsFilterConfig> parseFilterConfigOverride(Message rawProtoMessage) {
try {
RlqsFilterConfig rlqsFilterConfig =
parseRlqsFilterOverride(unpackAny(rawProtoMessage, RateLimitQuotaOverride.class));
return ConfigOrError.fromConfig(rlqsFilterConfig);
} catch (InvalidProtocolBufferException e) {
return ConfigOrError.fromError("Can't unpack RateLimitQuotaOverride proto: " + e);
} catch (ResourceInvalidException e) {
return ConfigOrError.fromError(e.getMessage());
}
}

@VisibleForTesting
RlqsFilterConfig parseRlqsFilter(RateLimitQuotaFilterConfig rlqsFilterProto)
throws ResourceInvalidException, InvalidProtocolBufferException {
RlqsFilterConfig.Builder builder = RlqsFilterConfig.builder();
if (rlqsFilterProto.getDomain().isEmpty()) {
throw new ResourceInvalidException("RateLimitQuotaFilterConfig domain is required");
}
builder.domain(rlqsFilterProto.getDomain())
.rlqsService(GrpcService.fromEnvoyProto(rlqsFilterProto.getRlqsServer()));

// TODO(sergiitk): [IMPL] Remove
if (dryRun) {
logger.finest("RLQS DRY RUN: not parsing matchers");
return builder.build();
}

// TODO(sergiitk): [IMPL] actually parse, move to RlqsBucketSettings.fromProto()
RateLimitQuotaBucketSettings fallbackBucketSettingsProto = unpackAny(
rlqsFilterProto.getBucketMatchers().getOnNoMatch().getAction().getTypedConfig(),
RateLimitQuotaBucketSettings.class);
RlqsBucketSettings fallbackBucket = RlqsBucketSettings.create(
ImmutableMap.of("bucket_id", headers -> "hello"),
fallbackBucketSettingsProto.getReportingInterval());

// TODO(sergiitk): [IMPL] actually parse, move to Matcher.fromProto()
Matcher<HttpMatchInput, RlqsBucketSettings> bucketMatchers = new RlqsMatcher(fallbackBucket);

return builder.bucketMatchers(bucketMatchers).build();
}
}

@Override
public void close() {
// TODO(sergiitk): [DESIGN] besides shutting down everything, should there
// be per-route interceptor destructors?
if (shutdown) {
return;
}

syncContext.execute(() -> {
shutdown = true;
logger.log(XdsLogLevel.DEBUG, "Shutting down RlqsFilter name='%s'", name);
for (RlqsFilterConfig rlqsFilterConfig : filterStateCache.keySet()) {
filterStateCache.get(rlqsFilterConfig).shutdown();
}
filterStateCache.clear();
shutdown = false;
});
}

// @Override
public boolean isEnabled() {
return enabled;
}

@Nullable
@Override
public ServerInterceptor buildServerInterceptor(
FilterConfig config, @Nullable FilterConfig overrideConfig) {
if (shutdown) {
return null;
}

// Called when we get an xds update - when the LRS or RLS changes.
RlqsFilterConfig rlqsFilterConfig = (RlqsFilterConfig) checkNotNull(config, "config");
checkNotNull(rlqsFilterConfig.rlqsService(), "config.rlqsService");

// Per-route and per-host configuration overrides.
if (overrideConfig != null) {
RlqsFilterConfig rlqsFilterOverride = (RlqsFilterConfig) overrideConfig;
// All fields are inherited from the main config, unless overridden.
RlqsFilterConfig.Builder overrideBuilder = rlqsFilterConfig.toBuilder();
if (!rlqsFilterOverride.domain().isEmpty()) {
overrideBuilder.domain(rlqsFilterOverride.domain());
}
if (rlqsFilterOverride.bucketMatchers() != null) {
overrideBuilder.bucketMatchers(rlqsFilterOverride.bucketMatchers());
}
// Override bucket matchers if not null.
rlqsFilterConfig = overrideBuilder.build();
}

RlqsFilterState rlqsFilterState = getOrCreateFilterState(rlqsFilterConfig);
return new RlqsServerInterceptor(rlqsFilterState);
}

private RlqsFilterState getOrCreateFilterState(final RlqsFilterConfig config) {
// TODO(sergiitk): [IMPL] get channel creds from the bootstrap.
return filterStateCache.computeIfAbsent(config, (k) -> {
ChannelCredentials creds = InsecureChannelCredentials.create();
return new RlqsFilterState(
RemoteServerInfo.create(config.rlqsService().targetUri(), creds),
config.domain(),
config.bucketMatchers(),
config.hashCode(),
scheduler);
});
}

static class RlqsMatcher extends Matcher<HttpMatchInput, RlqsBucketSettings> {
private final RlqsBucketSettings fallbackBucket;

RlqsMatcher(RlqsBucketSettings fallbackBucket) {
this.fallbackBucket = fallbackBucket;
}

@Nullable
@Override
public MatcherList<HttpMatchInput, RlqsBucketSettings> matcherList() {
return null;
}

@Override
public OnMatch<HttpMatchInput, RlqsBucketSettings> onNoMatch() {
return OnMatch.ofAction(fallbackBucket);
}

@Override
public RlqsBucketSettings match(HttpMatchInput input) {
return null;
}
}

@VisibleForTesting
static RlqsFilterConfig parseRlqsFilterOverride(RateLimitQuotaOverride rlqsFilterProtoOverride)
throws ResourceInvalidException {
RlqsFilterConfig.Builder builder = RlqsFilterConfig.builder();
// TODO(sergiitk): [IMPL] bucket_matchers.

return builder.domain(rlqsFilterProtoOverride.getDomain()).build();
}

private static class RlqsServerInterceptor implements ServerInterceptor {
private final RlqsFilterState rlqsFilterState;

public RlqsServerInterceptor(RlqsFilterState rlqsFilterState) {
this.rlqsFilterState = rlqsFilterState;
}

@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
HttpMatchInput httpMatchInput = HttpMatchInput.create(headers, call);

// TODO(sergiitk): [IMPL] Remove
if (dryRun) {
// logger.log(XdsLogLevel.INFO, "RLQS DRY RUN: request <<" + httpMatchInput + ">>");
return next.startCall(call, headers);
}

RlqsRateLimitResult result = rlqsFilterState.rateLimit(httpMatchInput);
if (result.isAllowed()) {
return next.startCall(call, headers);
}
RlqsRateLimitResult.DenyResponse denyResponse = result.denyResponse().get();
call.close(denyResponse.status(), denyResponse.headersToAdd());
return new Listener<ReqT>(){};
}
}
}
Loading