Skip to content

Commit 5beae3a

Browse files
authored
Updating ServerInterceptors.java to support different marshallers for Request and Response messages. (#9877)
Fixes #9870
1 parent f6a0028 commit 5beae3a

File tree

2 files changed

+101
-2
lines changed

2 files changed

+101
-2
lines changed

api/src/main/java/io/grpc/ServerInterceptors.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,15 +184,40 @@ private static final class KnownLengthBufferedInputStream extends BufferedInputS
184184
public static <T> ServerServiceDefinition useMarshalledMessages(
185185
final ServerServiceDefinition serviceDef,
186186
final MethodDescriptor.Marshaller<T> marshaller) {
187+
return useMarshalledMessages(serviceDef, marshaller, marshaller);
188+
}
189+
190+
/**
191+
* Create a new {@code ServerServiceDefinition} with {@link MethodDescriptor} for deserializing
192+
* requests and separate {@link MethodDescriptor} for serializing responses. The {@code
193+
* ServerCallHandler} created will automatically convert back to the original types for request
194+
* and response before calling the existing {@code ServerCallHandler}. Calling this method
195+
* combined with the intercept methods will allow the developer to choose whether to intercept
196+
* messages of ReqT/RespT, or the modeled types of their application. This can also be chained
197+
* to allow for interceptors to handle messages as multiple different ReqT/RespT types within
198+
* the chain if the added cost of serialization is not a concern.
199+
*
200+
* @param serviceDef the sevice definition to add request and response marshallers to.
201+
* @param requestMarshaller request marshaller
202+
* @param responseMarshaller response marshaller
203+
* @param <ReqT> the request payload type
204+
* @param <RespT> the response payload type.
205+
* @return a wrapped version of {@code serviceDef} with the ReqT and RespT conversion applied.
206+
*/
207+
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/9870")
208+
public static <ReqT, RespT> ServerServiceDefinition useMarshalledMessages(
209+
final ServerServiceDefinition serviceDef,
210+
final MethodDescriptor.Marshaller<ReqT> requestMarshaller,
211+
final MethodDescriptor.Marshaller<RespT> responseMarshaller) {
187212
List<ServerMethodDefinition<?, ?>> wrappedMethods =
188213
new ArrayList<>();
189214
List<MethodDescriptor<?, ?>> wrappedDescriptors =
190215
new ArrayList<>();
191216
// Wrap the descriptors
192217
for (final ServerMethodDefinition<?, ?> definition : serviceDef.getMethods()) {
193218
final MethodDescriptor<?, ?> originalMethodDescriptor = definition.getMethodDescriptor();
194-
final MethodDescriptor<T, T> wrappedMethodDescriptor =
195-
originalMethodDescriptor.toBuilder(marshaller, marshaller).build();
219+
final MethodDescriptor<ReqT, RespT> wrappedMethodDescriptor =
220+
originalMethodDescriptor.toBuilder(requestMarshaller, responseMarshaller).build();
196221
wrappedDescriptors.add(wrappedMethodDescriptor);
197222
wrappedMethods.add(wrapMethod(definition, wrappedMethodDescriptor));
198223
}

api/src/test/java/io/grpc/ServerInterceptorsTest.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,80 @@ public void onMessage(ReqT message) {
425425
order);
426426
}
427427

428+
/**
429+
* Tests the ServerInterceptors#useMarshalledMessages()} with two marshallers. Makes sure that
430+
* on incoming request the request marshaller's stream method is called and on response the
431+
* response marshaller's parse method is called
432+
*/
433+
@Test
434+
@SuppressWarnings("unchecked")
435+
public void distinctMarshallerForRequestAndResponse() {
436+
final List<String> requestFlowOrder = new ArrayList<>();
437+
438+
final Marshaller<String> requestMarshaller = new Marshaller<String>() {
439+
@Override
440+
public InputStream stream(String value) {
441+
requestFlowOrder.add("RequestStream");
442+
return null;
443+
}
444+
445+
@Override
446+
public String parse(InputStream stream) {
447+
requestFlowOrder.add("RequestParse");
448+
return null;
449+
}
450+
};
451+
final Marshaller<String> responseMarshaller = new Marshaller<String>() {
452+
@Override
453+
public InputStream stream(String value) {
454+
requestFlowOrder.add("ResponseStream");
455+
return null;
456+
}
457+
458+
@Override
459+
public String parse(InputStream stream) {
460+
requestFlowOrder.add("ResponseParse");
461+
return null;
462+
}
463+
};
464+
final Marshaller<Holder> dummyMarshaller = new Marshaller<Holder>() {
465+
@Override
466+
public InputStream stream(Holder value) {
467+
return value.get();
468+
}
469+
470+
@Override
471+
public Holder parse(InputStream stream) {
472+
return new Holder(stream);
473+
}
474+
};
475+
ServerCallHandler<Holder, Holder> handler = (call, headers) -> new Listener<Holder>() {
476+
@Override
477+
public void onMessage(Holder message) {
478+
requestFlowOrder.add("handler");
479+
call.sendMessage(message);
480+
}
481+
};
482+
483+
MethodDescriptor<Holder, Holder> wrappedMethod = MethodDescriptor.<Holder, Holder>newBuilder()
484+
.setType(MethodType.UNKNOWN)
485+
.setFullMethodName("basic/wrapped")
486+
.setRequestMarshaller(dummyMarshaller)
487+
.setResponseMarshaller(dummyMarshaller)
488+
.build();
489+
ServerServiceDefinition serviceDef = ServerServiceDefinition.builder(
490+
new ServiceDescriptor("basic", wrappedMethod))
491+
.addMethod(wrappedMethod, handler).build();
492+
ServerServiceDefinition intercepted = ServerInterceptors.useMarshalledMessages(serviceDef,
493+
requestMarshaller, responseMarshaller);
494+
ServerMethodDefinition<String, String> serverMethod =
495+
(ServerMethodDefinition<String, String>) intercepted.getMethod("basic/wrapped");
496+
ServerCall<String, String> serverCall = new NoopServerCall<>();
497+
serverMethod.getServerCallHandler().startCall(serverCall, headers).onMessage("TestMessage");
498+
499+
assertEquals(Arrays.asList("RequestStream", "handler", "ResponseParse"), requestFlowOrder);
500+
}
501+
428502
@SuppressWarnings("unchecked")
429503
private static ServerMethodDefinition<String, Integer> getSoleMethod(
430504
ServerServiceDefinition serviceDef) {

0 commit comments

Comments
 (0)