Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/src/main/java/io/grpc/internal/ClientTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.grpc.InternalInstrumented;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.util.concurrent.Executor;
import javax.annotation.concurrent.ThreadSafe;

Expand Down Expand Up @@ -90,6 +91,6 @@ interface PingCallback {
*
* @param cause the cause of the ping failure
*/
void onFailure(Throwable cause);
void onFailure(Status cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
public void ping(final PingCallback callback, Executor executor) {
executor.execute(new Runnable() {
@Override public void run() {
callback.onFailure(error.asException());
callback.onFailure(error);

Check warning on line 58 in core/src/main/java/io/grpc/internal/FailingClientTransport.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/io/grpc/internal/FailingClientTransport.java#L58

Added line #L58 was not covered by tests
}
});
}
Expand Down
9 changes: 5 additions & 4 deletions core/src/main/java/io/grpc/internal/Http2Ping.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.common.base.Stopwatch;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.grpc.Status;
import io.grpc.internal.ClientTransport.PingCallback;
import java.util.LinkedHashMap;
import java.util.Map;
Expand Down Expand Up @@ -62,7 +63,7 @@ public class Http2Ping {
/**
* If non-null, indicates the ping failed.
*/
@GuardedBy("this") private Throwable failureCause;
@GuardedBy("this") private Status failureCause;

/**
* The round-trip time for the ping, in nanoseconds. This value is only meaningful when
Expand Down Expand Up @@ -144,7 +145,7 @@ public boolean complete() {
*
* @param failureCause the cause of failure
*/
public void failed(Throwable failureCause) {
public void failed(Status failureCause) {
Map<ClientTransport.PingCallback, Executor> callbacks;
synchronized (this) {
if (completed) {
Expand All @@ -167,7 +168,7 @@ public void failed(Throwable failureCause) {
* @param executor the executor used to invoke the callback
* @param cause the cause of failure
*/
public static void notifyFailed(PingCallback callback, Executor executor, Throwable cause) {
public static void notifyFailed(PingCallback callback, Executor executor, Status cause) {
doExecute(executor, asRunnable(callback, cause));
}

Expand Down Expand Up @@ -203,7 +204,7 @@ public void run() {
* failure.
*/
private static Runnable asRunnable(final ClientTransport.PingCallback callback,
final Throwable failureCause) {
final Status failureCause) {
return new Runnable() {
@Override
public void run() {
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/io/grpc/internal/KeepAliveManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ public void ping() {
public void onSuccess(long roundTripTimeNanos) {}

@Override
public void onFailure(Throwable cause) {
public void onFailure(Status cause) {
transport.shutdownNow(Status.UNAVAILABLE.withDescription(
"Keepalive failed. The connection is likely gone"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void clientKeepAlivePinger_pingFailure() {
verify(transport).ping(pingCallbackCaptor.capture(), isA(Executor.class));
ClientTransport.PingCallback pingCallback = pingCallbackCaptor.getValue();

pingCallback.onFailure(new Throwable());
pingCallback.onFailure(Status.UNAVAILABLE);

ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
verify(transport).shutdownNow(statusCaptor.capture());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public void log(ChannelLogLevel level, String messageFormat, Object... args) {}
protected ManagedClientTransport.Listener mockClientTransportListener
= mock(ManagedClientTransport.Listener.class);
protected MockServerListener serverListener = new MockServerListener();
private ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
private ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
protected final TestClientStreamTracer clientStreamTracer1 = new TestHeaderClientStreamTracer();
private final TestClientStreamTracer clientStreamTracer2 = new TestHeaderClientStreamTracer();
protected final ClientStreamTracer[] tracers = new ClientStreamTracer[] {
Expand Down Expand Up @@ -626,8 +626,8 @@ public void ping_afterTermination() throws Exception {
// Transport doesn't support ping, so this neither passes nor fails.
assumeTrue(false);
}
verify(mockPingCallback, timeout(TIMEOUT_MS)).onFailure(throwableCaptor.capture());
Status status = Status.fromThrowable(throwableCaptor.getValue());
verify(mockPingCallback, timeout(TIMEOUT_MS)).onFailure(statusCaptor.capture());
Status status = statusCaptor.getValue();
assertSame(shutdownReason, status);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ public synchronized void ping(final PingCallback callback, Executor executor) {
executor.execute(new Runnable() {
@Override
public void run() {
callback.onFailure(shutdownStatus.asRuntimeException());
callback.onFailure(shutdownStatus);
}
});
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ final class ClientTransportLifecycleManager {
/** null iff !transportShutdown. */
private Status shutdownStatus;
/** null iff !transportShutdown. */
private Throwable shutdownThrowable;
private boolean transportTerminated;

public ClientTransportLifecycleManager(ManagedClientTransport.Listener listener) {
Expand Down Expand Up @@ -72,7 +71,6 @@ public boolean notifyShutdown(Status s) {
return false;
}
shutdownStatus = s;
shutdownThrowable = s.asException();
return true;
}

Expand All @@ -97,7 +95,4 @@ public Status getShutdownStatus() {
return shutdownStatus;
}

public Throwable getShutdownThrowable() {
return shutdownThrowable;
}
}
23 changes: 11 additions & 12 deletions netty/src/main/java/io/grpc/netty/NettyClientHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@
streamStatus = lifecycleManager.getShutdownStatus();
}
try {
cancelPing(lifecycleManager.getShutdownThrowable());
cancelPing(lifecycleManager.getShutdownStatus());
// Report status to the application layer for any open streams
connection().forEachActiveStream(new Http2StreamVisitor() {
@Override
Expand Down Expand Up @@ -593,13 +593,13 @@
*/
private void createStream(CreateStreamCommand command, ChannelPromise promise)
throws Exception {
if (lifecycleManager.getShutdownThrowable() != null) {
if (lifecycleManager.getShutdownStatus() != null) {
command.stream().setNonExistent();
// The connection is going away (it is really the GOAWAY case),
// just terminate the stream now.
command.stream().transportReportStatus(
lifecycleManager.getShutdownStatus(), RpcProgress.MISCARRIED, true, new Metadata());
promise.setFailure(lifecycleManager.getShutdownThrowable());
promise.setFailure(lifecycleManager.getShutdownStatus().asException());
return;
}

Expand Down Expand Up @@ -854,14 +854,13 @@
transportTracer.reportKeepAliveSent();
} else {
Throwable cause = future.cause();
if (cause instanceof ClosedChannelException) {
cause = lifecycleManager.getShutdownThrowable();
if (cause == null) {
cause = Status.UNKNOWN.withDescription("Ping failed but for unknown reason.")
.withCause(future.cause()).asException();
}
Status status = lifecycleManager.getShutdownStatus();

Check warning on line 857 in netty/src/main/java/io/grpc/netty/NettyClientHandler.java

View check run for this annotation

Codecov / codecov/patch

netty/src/main/java/io/grpc/netty/NettyClientHandler.java#L857

Added line #L857 was not covered by tests
if (cause instanceof ClosedChannelException && status == null) {
status = Status.UNKNOWN.withDescription("Ping failed but for unknown reason.")
.withCause(future.cause());
finalPing.failed(status);

Check warning on line 861 in netty/src/main/java/io/grpc/netty/NettyClientHandler.java

View check run for this annotation

Codecov / codecov/patch

netty/src/main/java/io/grpc/netty/NettyClientHandler.java#L859-L861

Added lines #L859 - L861 were not covered by tests
}
finalPing.failed(cause);
// else finalPing.failed(cause); ??
if (ping == finalPing) {
ping = null;
}
Expand Down Expand Up @@ -963,9 +962,9 @@
}
}

private void cancelPing(Throwable t) {
private void cancelPing(Status s) {
if (ping != null) {
ping.failed(t);
ping.failed(s);
ping = null;
}
}
Expand Down
4 changes: 2 additions & 2 deletions netty/src/main/java/io/grpc/netty/NettyClientTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public void ping(final PingCallback callback, final Executor executor) {
executor.execute(new Runnable() {
@Override
public void run() {
callback.onFailure(statusExplainingWhyTheChannelIsNull.asException());
callback.onFailure(statusExplainingWhyTheChannelIsNull);
}
});
return;
Expand All @@ -177,7 +177,7 @@ public void run() {
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
Status s = statusFromFailedFuture(future);
Http2Ping.notifyFailed(callback, executor, s.asException());
Http2Ping.notifyFailed(callback, executor, s);
}
}
};
Expand Down
9 changes: 3 additions & 6 deletions netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import io.grpc.CallOptions;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.internal.AbstractStream;
import io.grpc.internal.ClientStreamListener;
import io.grpc.internal.ClientStreamListener.RpcProgress;
Expand Down Expand Up @@ -812,9 +811,7 @@ public void ping_failsWhenChannelCloses() throws Exception {
handler().channelInactive(ctx());
// ping failed on channel going inactive
assertEquals(1, callback.invocationCount);
assertTrue(callback.failureCause instanceof StatusException);
assertEquals(Status.Code.UNAVAILABLE,
((StatusException) callback.failureCause).getStatus().getCode());
assertEquals(Status.Code.UNAVAILABLE, callback.failureCause.getCode());
// A failed ping is still counted
assertEquals(1, transportTracer.getStats().keepAlivesSent);
}
Expand Down Expand Up @@ -1169,7 +1166,7 @@ private static CreateStreamCommand newCreateStreamCommand(
private static class PingCallbackImpl implements ClientTransport.PingCallback {
int invocationCount;
long roundTripTime;
Throwable failureCause;
Status failureCause;

@Override
public void onSuccess(long roundTripTimeNanos) {
Expand All @@ -1178,7 +1175,7 @@ public void onSuccess(long roundTripTimeNanos) {
}

@Override
public void onFailure(Throwable cause) {
public void onFailure(Status cause) {
invocationCount++;
this.failureCause = cause;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,8 +548,8 @@ public void onSuccess(long roundTripTimeNanos) {
}

@Override
public void onFailure(Throwable cause) {
pingResult.setException(cause);
public void onFailure(Status cause) {
pingResult.setException(cause.asException());
}
};
transport.ping(pingCallback, clock.getScheduledExecutorService());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1062,12 +1062,12 @@
}
}

private Throwable getPingFailure() {
private Status getPingFailure() {
synchronized (lock) {
if (goAwayStatus != null) {
return goAwayStatus.asException();
return goAwayStatus;
} else {
return Status.UNAVAILABLE.withDescription("Connection closed").asException();
return Status.UNAVAILABLE.withDescription("Connection closed");

Check warning on line 1070 in okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java

View check run for this annotation

Codecov / codecov/patch

okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java#L1070

Added line #L1070 was not covered by tests
}
}
}
Expand Down
19 changes: 6 additions & 13 deletions okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusException;
import io.grpc.internal.AbstractStream;
import io.grpc.internal.ClientStreamListener;
import io.grpc.internal.ClientTransport;
Expand Down Expand Up @@ -1664,16 +1663,14 @@ public void ping_failsWhenTransportShutdown() throws Exception {
clientTransport.shutdown(SHUTDOWN_REASON);
// ping failed on channel shutdown
assertEquals(1, callback.invocationCount);
assertTrue(callback.failureCause instanceof StatusException);
assertSame(SHUTDOWN_REASON, ((StatusException) callback.failureCause).getStatus());
assertSame(SHUTDOWN_REASON, callback.failureCause);

// now that handler is in terminal state, all future pings fail immediately
callback = new PingCallbackImpl();
clientTransport.ping(callback, MoreExecutors.directExecutor());
assertEquals(1, getTransportStats(clientTransport).keepAlivesSent);
assertEquals(1, callback.invocationCount);
assertTrue(callback.failureCause instanceof StatusException);
assertSame(SHUTDOWN_REASON, ((StatusException) callback.failureCause).getStatus());
assertSame(SHUTDOWN_REASON, callback.failureCause);
shutdownAndVerify();
}

Expand All @@ -1688,18 +1685,14 @@ public void ping_failsIfTransportFails() throws Exception {
clientTransport.onException(new IOException());
// ping failed on error
assertEquals(1, callback.invocationCount);
assertTrue(callback.failureCause instanceof StatusException);
assertEquals(Status.Code.UNAVAILABLE,
((StatusException) callback.failureCause).getStatus().getCode());
assertEquals(Status.Code.UNAVAILABLE, callback.failureCause.getCode());

// now that handler is in terminal state, all future pings fail immediately
callback = new PingCallbackImpl();
clientTransport.ping(callback, MoreExecutors.directExecutor());
assertEquals(1, getTransportStats(clientTransport).keepAlivesSent);
assertEquals(1, callback.invocationCount);
assertTrue(callback.failureCause instanceof StatusException);
assertEquals(Status.Code.UNAVAILABLE,
((StatusException) callback.failureCause).getStatus().getCode());
assertEquals(Status.Code.UNAVAILABLE, callback.failureCause.getCode());
shutdownAndVerify();
}

Expand Down Expand Up @@ -2385,7 +2378,7 @@ public InputStream getInputStream() {
static class PingCallbackImpl implements ClientTransport.PingCallback {
int invocationCount;
long roundTripTime;
Throwable failureCause;
Status failureCause;

@Override
public void onSuccess(long roundTripTimeNanos) {
Expand All @@ -2394,7 +2387,7 @@ public void onSuccess(long roundTripTimeNanos) {
}

@Override
public void onFailure(Throwable cause) {
public void onFailure(Status cause) {
invocationCount++;
this.failureCause = cause;
}
Expand Down
Loading