3636import io .grpc .SynchronizationContext ;
3737import io .grpc .SynchronizationContext .ScheduledHandle ;
3838import io .grpc .internal .BackoffPolicy ;
39+ import io .grpc .stub .ClientCallStreamObserver ;
40+ import io .grpc .stub .ClientResponseObserver ;
3941import io .grpc .stub .StreamObserver ;
4042import io .grpc .xds .Bootstrapper .ServerInfo ;
4143import io .grpc .xds .EnvoyProtoData .Node ;
@@ -71,6 +73,7 @@ final class AbstractXdsClient {
7173 private final BackoffPolicy .Provider backoffPolicyProvider ;
7274 private final Stopwatch stopwatch ;
7375 private final Node bootstrapNode ;
76+ private final XdsClient .TimerLaunch timerLaunch ;
7477
7578 // Last successfully applied version_info for each resource type. Starts with empty string.
7679 // A version_info is used to update management server with client's most recent knowledge of
@@ -98,7 +101,8 @@ final class AbstractXdsClient {
98101 timeService ,
99102 SynchronizationContext syncContext ,
100103 BackoffPolicy .Provider backoffPolicyProvider ,
101- Supplier <Stopwatch > stopwatchSupplier ) {
104+ Supplier <Stopwatch > stopwatchSupplier ,
105+ XdsClient .TimerLaunch timerLaunch ) {
102106 this .serverInfo = checkNotNull (serverInfo , "serverInfo" );
103107 this .channel = checkNotNull (xdsChannelFactory , "xdsChannelFactory" ).create (serverInfo );
104108 this .xdsResponseHandler = checkNotNull (xdsResponseHandler , "xdsResponseHandler" );
@@ -108,6 +112,7 @@ final class AbstractXdsClient {
108112 this .timeService = checkNotNull (timeService , "timeService" );
109113 this .syncContext = checkNotNull (syncContext , "syncContext" );
110114 this .backoffPolicyProvider = checkNotNull (backoffPolicyProvider , "backoffPolicyProvider" );
115+ this .timerLaunch = checkNotNull (timerLaunch , "timerLaunch" );
111116 stopwatch = checkNotNull (stopwatchSupplier , "stopwatchSupplier" ).get ();
112117 logId = InternalLogId .allocate ("xds-client" , serverInfo .target ());
113118 logger = XdsLogger .withLogId (logId );
@@ -199,6 +204,22 @@ boolean isInBackoff() {
199204 return rpcRetryTimer != null && rpcRetryTimer .isPending ();
200205 }
201206
207+ boolean isReady () {
208+ return adsStream != null && adsStream .isReady ();
209+ }
210+
211+ /**
212+ * Starts a timer for each requested resource that hasn't been responded to and
213+ * has been waiting for the channel to get ready.
214+ */
215+ void readyHandler () {
216+ if (!isReady ()) {
217+ return ;
218+ }
219+
220+ timerLaunch .startSubscriberTimersIfNeeded (serverInfo );
221+ }
222+
202223 /**
203224 * Establishes the RPC connection by creating a new RPC stream on the given channel for
204225 * xDS protocol communication.
@@ -262,6 +283,8 @@ private abstract class AbstractAdsStream {
262283
263284 abstract void sendError (Exception error );
264285
286+ abstract boolean isReady ();
287+
265288 /**
266289 * Sends a discovery request with the given {@code versionInfo}, {@code nonce} and
267290 * {@code errorDetail}. Used for reacting to a specific discovery response. For
@@ -344,13 +367,26 @@ private void cleanUp() {
344367 private final class AdsStreamV2 extends AbstractAdsStream {
345368 private StreamObserver <io .envoyproxy .envoy .api .v2 .DiscoveryRequest > requestWriter ;
346369
370+ @ Override
371+ public boolean isReady () {
372+ return requestWriter != null && ((ClientCallStreamObserver <?>) requestWriter ).isReady ();
373+ }
374+
347375 @ Override
348376 void start () {
349377 io .envoyproxy .envoy .service .discovery .v2 .AggregatedDiscoveryServiceGrpc
350378 .AggregatedDiscoveryServiceStub stub =
351379 io .envoyproxy .envoy .service .discovery .v2 .AggregatedDiscoveryServiceGrpc .newStub (channel );
352380 StreamObserver <io .envoyproxy .envoy .api .v2 .DiscoveryResponse > responseReaderV2 =
353- new StreamObserver <io .envoyproxy .envoy .api .v2 .DiscoveryResponse >() {
381+ new ClientResponseObserver <io .envoyproxy .envoy .api .v2 .DiscoveryRequest ,
382+ io .envoyproxy .envoy .api .v2 .DiscoveryResponse >() {
383+
384+ @ Override
385+ public void beforeStart (
386+ ClientCallStreamObserver <io .envoyproxy .envoy .api .v2 .DiscoveryRequest > reqStream ) {
387+ reqStream .setOnReadyHandler (AbstractXdsClient .this ::readyHandler );
388+ }
389+
354390 @ Override
355391 public void onNext (final io .envoyproxy .envoy .api .v2 .DiscoveryResponse response ) {
356392 syncContext .execute (new Runnable () {
@@ -427,11 +463,23 @@ void sendError(Exception error) {
427463 private final class AdsStreamV3 extends AbstractAdsStream {
428464 private StreamObserver <DiscoveryRequest > requestWriter ;
429465
466+ @ Override
467+ public boolean isReady () {
468+ return requestWriter != null && ((ClientCallStreamObserver <?>) requestWriter ).isReady ();
469+ }
470+
430471 @ Override
431472 void start () {
432473 AggregatedDiscoveryServiceGrpc .AggregatedDiscoveryServiceStub stub =
433474 AggregatedDiscoveryServiceGrpc .newStub (channel );
434- StreamObserver <DiscoveryResponse > responseReader = new StreamObserver <DiscoveryResponse >() {
475+ StreamObserver <DiscoveryResponse > responseReader =
476+ new ClientResponseObserver <DiscoveryRequest ,DiscoveryResponse >() {
477+
478+ @ Override
479+ public void beforeStart (ClientCallStreamObserver <DiscoveryRequest > requestStream ) {
480+ requestStream .setOnReadyHandler (AbstractXdsClient .this ::readyHandler );
481+ }
482+
435483 @ Override
436484 public void onNext (final DiscoveryResponse response ) {
437485 syncContext .execute (new Runnable () {
0 commit comments