Skip to content

Commit de910cc

Browse files
committed
Add ChannelBuilderCustomizer for custom gRPC Channel handling
1 parent 400daf0 commit de910cc

File tree

8 files changed

+170
-9
lines changed

8 files changed

+170
-9
lines changed

docs/src/main/asciidoc/grpc-service-consumption.adoc

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,50 @@ Don't forget to replace it with the name you used in the `@GrpcClient` annotatio
217217

218218
IMPORTANT: When you enable `quarkus.grpc.clients."client-name".xds.enabled`, it's the xDS that should handle most of the configuration above.
219219

220+
=== Custom Channel building
221+
222+
When Quarkus builds a gRPC Channel instance (the way gRPC clients communicate with gRPC services on a lower network level), users can apply their own Channel(Builder) customizers. The customizers are applied by `priority`, the higher the number the later customizer is applied. The customizers are applied before Quarkus applies user's client configuration; e.g. ideal for some initial defaults per all clients.
223+
224+
There are two `customize` methods, the first one uses gRPC's `ManagedChannelBuilder` as a parameter - to be used with Quarkus' legacy gRPC support, where the other uses `GrpcClientOptions` - to be used with the new Vert.x gRPC support. User should implement the right `customize` method per gRPC support type usage, or both if the customizer is gRPC type neutral.
225+
226+
[source, java]
227+
----
228+
public interface ChannelBuilderCustomizer<T extends ManagedChannelBuilder<T>> {
229+
230+
/**
231+
* Customize a ManagedChannelBuilder instance.
232+
*
233+
* @param name gRPC client name
234+
* @param config client's configuration
235+
* @param builder Channel builder instance
236+
* @return map of config properties to be used as default service config against the builder
237+
*/
238+
default Map<String, Object> customize(String name, GrpcClientConfiguration config, T builder) {
239+
return Map.of();
240+
}
241+
242+
/**
243+
* Customize a GrpcClientOptions instance.
244+
*
245+
* @param name gRPC client name
246+
* @param config client's configuration
247+
* @param options GrpcClientOptions instance
248+
*/
249+
default void customize(String name, GrpcClientConfiguration config, GrpcClientOptions options) {
250+
}
251+
252+
/**
253+
* Priority by which the customizers are applied.
254+
* Higher priority is applied later.
255+
*
256+
* @return the priority
257+
*/
258+
default int priority() {
259+
return 0;
260+
}
261+
}
262+
----
263+
220264
=== Enabling TLS
221265

222266
To enable TLS, use the following configuration.

extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcClientProcessor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -437,6 +437,11 @@ UnremovableBeanBuildItem unremovableClientInterceptors() {
437437
return UnremovableBeanBuildItem.beanTypes(GrpcDotNames.CLIENT_INTERCEPTOR);
438438
}
439439

440+
@BuildStep
441+
UnremovableBeanBuildItem unremovableChannelBuilderCustomizers() {
442+
return UnremovableBeanBuildItem.beanTypes(GrpcDotNames.CHANNEL_BUILDER_CUSTOMIZER);
443+
}
444+
440445
Set<String> getRegisteredInterceptors(InjectionPointInfo injectionPoint) {
441446
Set<AnnotationInstance> qualifiers = injectionPoint.getRequiredQualifiers();
442447
if (qualifiers.size() <= 1) {

extensions/grpc/deployment/src/main/java/io/quarkus/grpc/deployment/GrpcDotNames.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.quarkus.grpc.RegisterClientInterceptor;
2525
import io.quarkus.grpc.RegisterInterceptor;
2626
import io.quarkus.grpc.RegisterInterceptors;
27+
import io.quarkus.grpc.api.ChannelBuilderCustomizer;
2728
import io.quarkus.grpc.runtime.supports.Channels;
2829
import io.quarkus.grpc.runtime.supports.GrpcClientConfigProvider;
2930
import io.smallrye.common.annotation.Blocking;
@@ -58,6 +59,8 @@ public class GrpcDotNames {
5859
.createSimple(RegisterClientInterceptor.List.class.getName());
5960
public static final DotName CLIENT_INTERCEPTOR = DotName.createSimple(ClientInterceptor.class.getName());
6061

62+
public static final DotName CHANNEL_BUILDER_CUSTOMIZER = DotName.createSimple(ChannelBuilderCustomizer.class.getName());
63+
6164
static final MethodDescriptor CREATE_CHANNEL_METHOD = MethodDescriptor.ofMethod(Channels.class, "createChannel",
6265
Channel.class, String.class, Set.class);
6366
static final MethodDescriptor RETRIEVE_CHANNEL_METHOD = MethodDescriptor.ofMethod(Channels.class, "retrieveChannel",
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package io.quarkus.grpc.api;
2+
3+
import java.util.Map;
4+
5+
import io.grpc.ManagedChannelBuilder;
6+
import io.quarkus.grpc.runtime.config.GrpcClientConfiguration;
7+
import io.vertx.grpc.client.GrpcClientOptions;
8+
9+
/**
10+
* Allow for customization of Channel building.
11+
* Implement the customize method, depending on which Channel implementation you're going to use,
12+
* e.g. Vert.x or Netty.
13+
* This is an experimental API, subject to change.
14+
*/
15+
public interface ChannelBuilderCustomizer<T extends ManagedChannelBuilder<T>> {
16+
17+
/**
18+
* Customize a ManagedChannelBuilder instance.
19+
*
20+
* @param name gRPC client name
21+
* @param config client's configuration
22+
* @param builder Channel builder instance
23+
* @return map of config properties to be used as default service config against the builder
24+
*/
25+
default Map<String, Object> customize(String name, GrpcClientConfiguration config, T builder) {
26+
return Map.of();
27+
}
28+
29+
/**
30+
* Customize a GrpcClientOptions instance.
31+
*
32+
* @param name gRPC client name
33+
* @param config client's configuration
34+
* @param options GrpcClientOptions instance
35+
*/
36+
default void customize(String name, GrpcClientConfiguration config, GrpcClientOptions options) {
37+
}
38+
39+
/**
40+
* Priority by which the customizers are applied.
41+
* Higher priority is applied later.
42+
*
43+
* @return the priority
44+
*/
45+
default int priority() {
46+
return 0;
47+
}
48+
}

extensions/grpc/runtime/src/main/java/io/quarkus/grpc/runtime/supports/Channels.java

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.nio.file.Path;
1616
import java.time.Duration;
1717
import java.util.ArrayList;
18+
import java.util.Comparator;
1819
import java.util.HashSet;
1920
import java.util.List;
2021
import java.util.Map;
@@ -27,6 +28,8 @@
2728
import java.util.concurrent.TimeoutException;
2829

2930
import jakarta.enterprise.context.spi.CreationalContext;
31+
import jakarta.enterprise.inject.Any;
32+
import jakarta.enterprise.util.TypeLiteral;
3033

3134
import org.eclipse.microprofile.context.ManagedExecutor;
3235
import org.jboss.logging.Logger;
@@ -50,6 +53,7 @@
5053
import io.quarkus.arc.InstanceHandle;
5154
import io.quarkus.grpc.GrpcClient;
5255
import io.quarkus.grpc.RegisterClientInterceptor;
56+
import io.quarkus.grpc.api.ChannelBuilderCustomizer;
5357
import io.quarkus.grpc.runtime.ClientInterceptorStorage;
5458
import io.quarkus.grpc.runtime.GrpcClientInterceptorContainer;
5559
import io.quarkus.grpc.runtime.config.GrpcClientConfiguration;
@@ -145,6 +149,13 @@ public static Channel createChannel(String name, Set<String> perClientIntercepto
145149
}
146150
}
147151

152+
List<ChannelBuilderCustomizer<?>> channelBuilderCustomizers = container
153+
.select(new TypeLiteral<ChannelBuilderCustomizer<?>>() {
154+
}, Any.Literal.INSTANCE)
155+
.stream()
156+
.sorted(Comparator.<ChannelBuilderCustomizer<?>, Integer> comparing(ChannelBuilderCustomizer::priority))
157+
.toList();
158+
148159
boolean plainText = config.ssl().trustStore().isEmpty();
149160
Optional<Boolean> usePlainText = config.plainText();
150161
if (usePlainText.isPresent()) {
@@ -187,8 +198,17 @@ public static Channel createChannel(String name, Set<String> perClientIntercepto
187198
if (provider != null) {
188199
builder = provider.createChannelBuilder(config, target);
189200
} else {
190-
builder = NettyChannelBuilder
191-
.forTarget(target)
201+
builder = NettyChannelBuilder.forTarget(target);
202+
}
203+
204+
for (ChannelBuilderCustomizer customizer : channelBuilderCustomizers) {
205+
Map<String, Object> map = customizer.customize(name, config, builder);
206+
builder.defaultServiceConfig(map);
207+
}
208+
209+
if (builder instanceof NettyChannelBuilder) {
210+
NettyChannelBuilder ncBuilder = (NettyChannelBuilder) builder;
211+
builder = ncBuilder
192212
// clients are intercepted using the IOThreadClientInterceptor interceptor which will decide on which
193213
// thread the messages should be processed.
194214
.directExecutor() // will use I/O thread - must not be blocked.
@@ -201,6 +221,10 @@ public static Channel createChannel(String name, Set<String> perClientIntercepto
201221
.maxInboundMetadataSize(config.maxInboundMetadataSize().orElse(DEFAULT_MAX_HEADER_LIST_SIZE))
202222
.maxInboundMessageSize(config.maxInboundMessageSize().orElse(DEFAULT_MAX_MESSAGE_SIZE))
203223
.negotiationType(NegotiationType.valueOf(config.negotiationType().toUpperCase()));
224+
225+
if (context != null) {
226+
ncBuilder.sslContext(context);
227+
}
204228
}
205229

206230
if (config.retry()) {
@@ -242,10 +266,6 @@ public static Channel createChannel(String name, Set<String> perClientIntercepto
242266
if (plainText && provider == null) {
243267
builder.usePlaintext();
244268
}
245-
if (context != null && (builder instanceof NettyChannelBuilder)) {
246-
NettyChannelBuilder ncBuilder = (NettyChannelBuilder) builder;
247-
ncBuilder.sslContext(context);
248-
}
249269

250270
interceptorContainer.getSortedPerServiceInterceptors(perClientInterceptors).forEach(builder::intercept);
251271
interceptorContainer.getSortedGlobalInterceptors().forEach(builder::intercept);
@@ -259,6 +279,15 @@ public static Channel createChannel(String name, Set<String> perClientIntercepto
259279
HttpClientOptions options = new HttpClientOptions();
260280
options.setHttp2ClearTextUpgrade(false); // this fixes i30379
261281

282+
// Start with almost empty options and default max msg size ...
283+
GrpcClientOptions clientOptions = new GrpcClientOptions()
284+
.setTransportOptions(options)
285+
.setMaxMessageSize(config.maxInboundMessageSize().orElse(DEFAULT_MAX_MESSAGE_SIZE));
286+
287+
for (ChannelBuilderCustomizer customizer : channelBuilderCustomizers) {
288+
customizer.customize(name, config, clientOptions);
289+
}
290+
262291
if (!plainText) {
263292
TlsConfigurationRegistry registry = Arc.container().select(TlsConfigurationRegistry.class).get();
264293

@@ -330,9 +359,10 @@ public static Channel createChannel(String name, Set<String> perClientIntercepto
330359
options.setMetricsName("grpc|" + name);
331360

332361
Vertx vertx = container.instance(Vertx.class).get();
333-
io.vertx.grpc.client.GrpcClient client = io.vertx.grpc.client.GrpcClient.client(vertx,
334-
new GrpcClientOptions().setTransportOptions(options)
335-
.setMaxMessageSize(config.maxInboundMessageSize().orElse(DEFAULT_MAX_MESSAGE_SIZE)));
362+
io.vertx.grpc.client.GrpcClient client = io.vertx.grpc.client.GrpcClient.client(
363+
vertx,
364+
clientOptions);
365+
336366
Channel channel;
337367
if (stork) {
338368
ManagedExecutor executor = container.instance(ManagedExecutor.class).get();

integration-tests/grpc-interceptors/src/main/java/io/quarkus/grpc/examples/interceptors/HelloWorldEndpoint.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public Response helloBlocking(@PathParam("name") String name) {
3232

3333
return Response.ok(helloReply.getMessage())
3434
.header("interceptors", String.join(",", invoked))
35+
.header("used", MyCBC.USED.get() + "")
3536
.build();
3637
}
3738

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package io.quarkus.grpc.examples.interceptors;
2+
3+
import java.util.Map;
4+
import java.util.concurrent.atomic.AtomicBoolean;
5+
6+
import jakarta.enterprise.context.ApplicationScoped;
7+
8+
import io.grpc.netty.NettyChannelBuilder;
9+
import io.quarkus.grpc.api.ChannelBuilderCustomizer;
10+
import io.quarkus.grpc.runtime.config.GrpcClientConfiguration;
11+
import io.vertx.grpc.client.GrpcClientOptions;
12+
13+
@ApplicationScoped
14+
public class MyCBC implements ChannelBuilderCustomizer<NettyChannelBuilder> {
15+
public static final AtomicBoolean USED = new AtomicBoolean(false);
16+
17+
@Override
18+
public Map<String, Object> customize(String name, GrpcClientConfiguration config, NettyChannelBuilder builder) {
19+
USED.set(true);
20+
return Map.of();
21+
}
22+
23+
@Override
24+
public void customize(String name, GrpcClientConfiguration config, GrpcClientOptions options) {
25+
USED.set(true);
26+
}
27+
}

integration-tests/grpc-interceptors/src/test/java/io/quarkus/grpc/example/interceptors/HelloWorldEndpointTestBase.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ public void testHelloWorldServiceUsingBlockingStub() {
2626
"io.quarkus.grpc.examples.interceptors.ServerInterceptors$MethodTarget");
2727

2828
ensureThatMetricsAreProduced();
29+
30+
String used = response.getHeader("used");
31+
assertThat(Boolean.parseBoolean(used)).isTrue();
2932
}
3033

3134
@Test

0 commit comments

Comments
 (0)