@@ -134,7 +134,8 @@ public void uncaughtException(Thread t, Throwable e) {
134134 private final FakeHelper helperDelegate = new FakeHelper ();
135135 private final Helper helper =
136136 mock (Helper .class , AdditionalAnswers .delegatesTo (helperDelegate ));
137- private final FakeRlsServerImpl fakeRlsServerImpl = new FakeRlsServerImpl ();
137+ private final FakeRlsServerImpl fakeRlsServerImpl = new FakeRlsServerImpl (
138+ fakeClock .getScheduledExecutorService ());
138139 private final Deque <FakeSubchannel > subchannels = new LinkedList <>();
139140 private final FakeThrottler fakeThrottler = new FakeThrottler ();
140141 private final String channelTarget = "channelTarget" ;
@@ -296,6 +297,38 @@ public void lb_working_withDefaultTarget_rlsResponding() throws Exception {
296297 verifyLongCounterAdd ("grpc.lb.rls.target_picks" , 1 , 1 , "wilderness" , "fail" );
297298 }
298299
300+ @ Test
301+ public void fallbackWithDelay_succeeds () throws Exception {
302+ fakeRlsServerImpl .setResponseDelay (100 , TimeUnit .MILLISECONDS );
303+ grpcCleanupRule .register (
304+ InProcessServerBuilder .forName ("fake-bigtable.googleapis.com" )
305+ .addService (ServerServiceDefinition .builder ("com.google" )
306+ .addMethod (fakeSearchMethod , (call , headers ) -> {
307+ call .sendHeaders (new Metadata ());
308+ call .sendMessage (null );
309+ call .close (Status .OK , new Metadata ());
310+ return new ServerCall .Listener <Void >() {};
311+ })
312+ .build ())
313+ .addService (fakeRlsServerImpl )
314+ .directExecutor ()
315+ .build ()
316+ .start ());
317+ ManagedChannel channel = grpcCleanupRule .register (
318+ InProcessChannelBuilder .forName ("fake-bigtable.googleapis.com" )
319+ .defaultServiceConfig (parseJson (getServiceConfigJsonStr ()))
320+ .directExecutor ()
321+ .build ());
322+
323+ StreamRecorder <Void > recorder = StreamRecorder .create ();
324+ StreamObserver <Void > requestObserver = ClientCalls .asyncClientStreamingCall (
325+ channel .newCall (fakeSearchMethod , CallOptions .DEFAULT ), recorder );
326+ requestObserver .onCompleted ();
327+ fakeClock .forwardTime (100 , TimeUnit .MILLISECONDS );
328+ assertThat (recorder .awaitCompletion (10 , TimeUnit .SECONDS )).isTrue ();
329+ assertThat (recorder .getError ()).isNull ();
330+ }
331+
299332 @ Test
300333 public void metricsWithRealChannel () throws Exception {
301334 grpcCleanupRule .register (
@@ -308,6 +341,7 @@ public void metricsWithRealChannel() throws Exception {
308341 return new ServerCall .Listener <Void >() {};
309342 })
310343 .build ())
344+ .addService (fakeRlsServerImpl )
311345 .directExecutor ()
312346 .build ()
313347 .start ());
@@ -761,17 +795,41 @@ private static final class FakeRlsServerImpl
761795 private static final Converter <RouteLookupResponse , io .grpc .lookup .v1 .RouteLookupResponse >
762796 RESPONSE_CONVERTER = new RouteLookupResponseConverter ().reverse ();
763797
798+ private final ScheduledExecutorService scheduler ;
799+ private long delay ;
800+ private TimeUnit delayUnit ;
801+
802+ public FakeRlsServerImpl (ScheduledExecutorService scheduler ) {
803+ this .scheduler = scheduler ;
804+ }
805+
764806 private Map <RouteLookupRequest , RouteLookupResponse > lookupTable = ImmutableMap .of ();
765807
766808 private void setLookupTable (Map <RouteLookupRequest , RouteLookupResponse > lookupTable ) {
767809 this .lookupTable = checkNotNull (lookupTable , "lookupTable" );
768810 }
769811
812+ void setResponseDelay (long delay , TimeUnit unit ) {
813+ this .delay = delay ;
814+ this .delayUnit = unit ;
815+ }
816+
770817 @ Override
818+ @ SuppressWarnings ("FutureReturnValueIgnored" )
771819 public void routeLookup (io .grpc .lookup .v1 .RouteLookupRequest request ,
772820 StreamObserver <io .grpc .lookup .v1 .RouteLookupResponse > responseObserver ) {
773821 RouteLookupResponse response =
774822 lookupTable .get (REQUEST_CONVERTER .convert (request ));
823+ Runnable sendResponse = () -> sendResponse (response , responseObserver );
824+ if (delay != 0 ) {
825+ scheduler .schedule (sendResponse , delay , delayUnit );
826+ } else {
827+ sendResponse .run ();
828+ }
829+ }
830+
831+ private void sendResponse (RouteLookupResponse response ,
832+ StreamObserver <io .grpc .lookup .v1 .RouteLookupResponse > responseObserver ) {
775833 if (response == null ) {
776834 responseObserver .onError (new RuntimeException ("not found" ));
777835 } else {
0 commit comments