@@ -149,11 +149,10 @@ public void uncaughtException(Thread t, Throwable e) {
149149 this .throttle = throttle ;
150150 }
151151
152- @ SuppressWarnings ("GuardedBy" )
152+ @ SuppressWarnings ("GuardedBy" ) // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
153153 @ Nullable // null if already committed
154154 @ CheckReturnValue
155155 private Runnable commit (final Substream winningSubstream ) {
156-
157156 synchronized (lock ) {
158157 if (state .winningSubstream != null ) {
159158 return null ;
@@ -165,10 +164,9 @@ private Runnable commit(final Substream winningSubstream) {
165164 // subtract the share of this RPC from channelBufferUsed.
166165 channelBufferUsed .addAndGet (-perRpcBufferUsed );
167166
167+ final boolean wasCancelled = (scheduledRetry != null ) ? scheduledRetry .isCancelled () : false ;
168168 final Future <?> retryFuture ;
169169 if (scheduledRetry != null ) {
170- // TODO(b/145386688): This access should be guarded by 'this.scheduledRetry.lock'; instead
171- // found: 'this.lock'
172170 retryFuture = scheduledRetry .markCancelled ();
173171 scheduledRetry = null ;
174172 } else {
@@ -177,8 +175,6 @@ private Runnable commit(final Substream winningSubstream) {
177175 // cancel the scheduled hedging if it is scheduled prior to the commitment
178176 final Future <?> hedgingFuture ;
179177 if (scheduledHedging != null ) {
180- // TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead
181- // found: 'this.lock'
182178 hedgingFuture = scheduledHedging .markCancelled ();
183179 scheduledHedging = null ;
184180 } else {
@@ -196,7 +192,21 @@ public void run() {
196192 }
197193 if (retryFuture != null ) {
198194 retryFuture .cancel (false );
195+ if (!wasCancelled && inFlightSubStreams .decrementAndGet () == Integer .MIN_VALUE ) {
196+ assert savedCloseMasterListenerReason != null ;
197+ listenerSerializeExecutor .execute (
198+ new Runnable () {
199+ @ Override
200+ public void run () {
201+ isClosed = true ;
202+ masterListener .closed (savedCloseMasterListenerReason .status ,
203+ savedCloseMasterListenerReason .progress ,
204+ savedCloseMasterListenerReason .metadata );
205+ }
206+ });
207+ }
199208 }
209+
200210 if (hedgingFuture != null ) {
201211 hedgingFuture .cancel (false );
202212 }
@@ -415,7 +425,7 @@ public final void start(ClientStreamListener listener) {
415425 drain (substream );
416426 }
417427
418- @ SuppressWarnings ("GuardedBy" )
428+ @ SuppressWarnings ("GuardedBy" ) // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
419429 private void pushbackHedging (@ Nullable Integer delayMillis ) {
420430 if (delayMillis == null ) {
421431 return ;
@@ -434,8 +444,6 @@ private void pushbackHedging(@Nullable Integer delayMillis) {
434444 return ;
435445 }
436446
437- // TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead
438- // found: 'this.lock'
439447 futureToBeCancelled = scheduledHedging .markCancelled ();
440448 scheduledHedging = future = new FutureCanceller (lock );
441449 }
@@ -469,16 +477,13 @@ public void run() {
469477 }
470478 callExecutor .execute (
471479 new Runnable () {
472- @ SuppressWarnings ("GuardedBy" )
480+ @ SuppressWarnings ("GuardedBy" ) //TODO(b/145386688) lock==ScheduledCancellor.lock so ok
473481 @ Override
474482 public void run () {
475483 boolean cancelled = false ;
476484 FutureCanceller future = null ;
477485
478486 synchronized (lock ) {
479- // TODO(b/145386688): This access should be guarded by
480- // 'HedgingRunnable.this.scheduledHedgingRef.lock'; instead found:
481- // 'RetriableStream.this.lock'
482487 if (scheduledHedgingRef .isCancelled ()) {
483488 cancelled = true ;
484489 } else {
@@ -810,13 +815,11 @@ private boolean hasPotentialHedging(State state) {
810815 && !state .hedgingFrozen ;
811816 }
812817
813- @ SuppressWarnings ("GuardedBy" )
818+ @ SuppressWarnings ("GuardedBy" ) // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
814819 private void freezeHedging () {
815820 Future <?> futureToBeCancelled = null ;
816821 synchronized (lock ) {
817822 if (scheduledHedging != null ) {
818- // TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead
819- // found: 'this.lock'
820823 futureToBeCancelled = scheduledHedging .markCancelled ();
821824 scheduledHedging = null ;
822825 }
@@ -999,9 +1002,19 @@ public void run() {
9991002 synchronized (lock ) {
10001003 scheduledRetry = scheduledRetryCopy = new FutureCanceller (lock );
10011004 }
1005+
10021006 class RetryBackoffRunnable implements Runnable {
10031007 @ Override
1008+ @ SuppressWarnings ("FutureReturnValueIgnored" )
10041009 public void run () {
1010+ synchronized (scheduledRetryCopy .lock ) {
1011+ if (scheduledRetryCopy .isCancelled ()) {
1012+ return ;
1013+ } else {
1014+ scheduledRetryCopy .markCancelled ();
1015+ }
1016+ }
1017+
10051018 callExecutor .execute (
10061019 new Runnable () {
10071020 @ Override
@@ -1563,11 +1576,16 @@ private static final class FutureCanceller {
15631576 }
15641577
15651578 void setFuture (Future <?> future ) {
1579+ boolean wasCancelled ;
15661580 synchronized (lock ) {
1567- if (!cancelled ) {
1581+ wasCancelled = cancelled ;
1582+ if (!wasCancelled ) {
15681583 this .future = future ;
15691584 }
15701585 }
1586+ if (wasCancelled ) {
1587+ future .cancel (false );
1588+ }
15711589 }
15721590
15731591 @ GuardedBy ("lock" )
0 commit comments