Skip to content

Commit 4222f77

Browse files
authored
xds:Move creating the retry timer in handleRpcStreamClosed to as late as possible and call close() (#11776)
* Move creating the retry timer in handleRpcStreamClosed to as late as possible and call `close` so that the `call` is cancelled. Also add some debug logging.
1 parent 6c12c2b commit 4222f77

File tree

3 files changed

+25
-12
lines changed

3 files changed

+25
-12
lines changed

core/src/main/java/io/grpc/internal/AbstractStream.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,17 @@
2727
import io.perfmark.PerfMark;
2828
import io.perfmark.TaskCloseable;
2929
import java.io.InputStream;
30+
import java.util.logging.Level;
31+
import java.util.logging.Logger;
3032
import javax.annotation.concurrent.GuardedBy;
3133

3234
/**
3335
* The stream and stream state as used by the application. Must only be called from the sending
3436
* application thread.
3537
*/
3638
public abstract class AbstractStream implements Stream {
39+
private static final Logger log = Logger.getLogger(AbstractStream.class.getName());
40+
3741
/** The framer to use for sending messages. */
3842
protected abstract Framer framer();
3943

@@ -371,6 +375,12 @@ private void notifyIfReady() {
371375
boolean doNotify;
372376
synchronized (onReadyLock) {
373377
doNotify = isReady();
378+
if (!doNotify && log.isLoggable(Level.FINEST)) {
379+
log.log(Level.FINEST,
380+
"Stream not ready so skip notifying listener.\n"
381+
+ "details: allocated/deallocated:{0}/{3}, sent queued: {1}, ready thresh: {2}",
382+
new Object[] {allocated, numSentBytesQueued, onReadyThreshold, deallocated});
383+
}
374384
}
375385
if (doNotify) {
376386
listener().onReady();

xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -453,16 +453,6 @@ private void handleRpcStreamClosed(Status status) {
453453
stopwatch.reset();
454454
}
455455

456-
// FakeClock in tests isn't thread-safe. Schedule the retry timer before notifying callbacks
457-
// to avoid TSAN races, since tests may wait until callbacks are called but then would run
458-
// concurrently with the stopwatch and schedule.
459-
460-
long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
461-
long delayNanos = Math.max(0, retryBackoffPolicy.nextBackoffNanos() - elapsed);
462-
463-
rpcRetryTimer =
464-
syncContext.schedule(new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService);
465-
466456
Status newStatus = status;
467457
if (responseReceived) {
468458
// A closed ADS stream after a successful response is not considered an error. Servers may
@@ -490,9 +480,17 @@ private void handleRpcStreamClosed(Status status) {
490480
newStatus.getCode(), newStatus.getDescription(), newStatus.getCause());
491481
}
492482

493-
closed = true;
483+
close(newStatus.asException());
484+
485+
// FakeClock in tests isn't thread-safe. Schedule the retry timer before notifying callbacks
486+
// to avoid TSAN races, since tests may wait until callbacks are called but then would run
487+
// concurrently with the stopwatch and schedule.
488+
long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
489+
long delayNanos = Math.max(0, retryBackoffPolicy.nextBackoffNanos() - elapsed);
490+
rpcRetryTimer =
491+
syncContext.schedule(new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService);
492+
494493
xdsResponseHandler.handleStreamClosed(newStatus, !responseReceived);
495-
cleanUp();
496494
}
497495

498496
private void close(Exception error) {

xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -444,6 +444,9 @@ private Set<String> getResourceKeys(XdsResourceType<?> xdsResourceType) {
444444
// cpcForThisStream is null when doing shutdown
445445
private void cleanUpResourceTimers(ControlPlaneClient cpcForThisStream) {
446446
Collection<String> authoritiesForCpc = getActiveAuthorities(cpcForThisStream);
447+
String target = cpcForThisStream == null ? "null" : cpcForThisStream.getServerInfo().target();
448+
logger.log(XdsLogLevel.DEBUG, "Cleaning up resource timers for CPC {0}, authorities {1}",
449+
target, authoritiesForCpc);
447450

448451
for (Map<String, ResourceSubscriber<?>> subscriberMap : resourceSubscribers.values()) {
449452
for (ResourceSubscriber<?> subscriber : subscriberMap.values()) {
@@ -957,6 +960,8 @@ public void handleStreamClosed(Status status, boolean shouldTryFallback) {
957960

958961
ControlPlaneClient cpcClosed = serverCpClientMap.get(serverInfo);
959962
if (cpcClosed == null) {
963+
logger.log(XdsLogLevel.DEBUG,
964+
"Couldn't find closing CPC for {0}, so skipping cleanup and reporting", serverInfo);
960965
return;
961966
}
962967

0 commit comments

Comments
 (0)