Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions netty/src/main/java/io/grpc/netty/NettyClientTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,6 @@ public Runnable start(Listener transportListener) {
lifecycleManager = new ClientTransportLifecycleManager(
Preconditions.checkNotNull(transportListener, "listener"));

if (enableKeepAlive) {
keepAliveManager = new KeepAliveManager(this, channel.eventLoop(), keepAliveDelayNanos,
keepAliveTimeoutNanos);
}

handler = newHandler();
HandlerSettings.setAutoWindow(handler);

Expand Down Expand Up @@ -234,6 +229,12 @@ public void operationComplete(ChannelFuture future) throws Exception {
Status.INTERNAL.withDescription("Connection closed with unknown cause"));
}
});

if (enableKeepAlive) {
keepAliveManager = new KeepAliveManager(this, channel.eventLoop(), keepAliveDelayNanos,
keepAliveTimeoutNanos);
}

return null;
}

Expand Down Expand Up @@ -276,6 +277,11 @@ Channel channel() {
return channel;
}

@VisibleForTesting
KeepAliveManager keepAliveManager() {
return keepAliveManager;
}

/**
* Convert ChannelFuture.cause() to a Status, taking into account that all handlers are removed
* from the pipeline when the channel is closed. Since handlers are removed, you may get an
Expand Down
42 changes: 35 additions & 7 deletions netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -179,7 +180,7 @@ address, NioSocketChannel.class, channelOptions, group, newNegotiator(),
public void overrideDefaultUserAgent() throws Exception {
startServer();
NettyClientTransport transport = newTransport(newNegotiator(),
DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, "testUserAgent");
DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, "testUserAgent", true);
callMeMaybe(transport.start(clientTransportListener));

new Rpc(transport, new Metadata()).halfClose().waitForResponse();
Expand All @@ -196,7 +197,7 @@ public void maxMessageSizeShouldBeEnforced() throws Throwable {
startServer();
// Allow the response payloads of up to 1 byte.
NettyClientTransport transport = newTransport(newNegotiator(),
1, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null);
1, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null, true);
callMeMaybe(transport.start(clientTransportListener));

try {
Expand Down Expand Up @@ -278,7 +279,7 @@ public void maxHeaderListSizeShouldBeEnforcedOnClient() throws Exception {
startServer();

NettyClientTransport transport =
newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, 1, null);
newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, 1, null, true);
callMeMaybe(transport.start(clientTransportListener));

try {
Expand Down Expand Up @@ -344,6 +345,30 @@ public void clientStreamGetsAttributes() throws Exception {
assertEquals(address, rpc.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
}

@Test
public void keepAliveEnabled() throws Exception {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you do an RPC in each of these tests and just make sure you get OK? That will check many of the == null checks in the code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

startServer();
NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE,
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */, true /* keep alive */);
callMeMaybe(transport.start(clientTransportListener));
Rpc rpc = new Rpc(transport).halfClose();
rpc.waitForResponse();

assertNotNull(transport.keepAliveManager());
}

@Test
public void keepAliveDisabled() throws Exception {
startServer();
NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE,
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */, false /* keep alive */);
callMeMaybe(transport.start(clientTransportListener));
Rpc rpc = new Rpc(transport).halfClose();
rpc.waitForResponse();

assertNull(transport.keepAliveManager());
}

private Throwable getRootCause(Throwable t) {
if (t.getCause() == null) {
return t;
Expand All @@ -359,15 +384,18 @@ private ProtocolNegotiator newNegotiator() throws IOException {
}

private NettyClientTransport newTransport(ProtocolNegotiator negotiator) {
return newTransport(negotiator,
DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */);
return newTransport(negotiator, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
null /* user agent */, true /* keep alive */);
}

private NettyClientTransport newTransport(
ProtocolNegotiator negotiator, int maxMsgSize, int maxHeaderListSize, String userAgent) {
private NettyClientTransport newTransport(ProtocolNegotiator negotiator, int maxMsgSize,
int maxHeaderListSize, String userAgent, boolean enableKeepAlive) {
NettyClientTransport transport = new NettyClientTransport(
address, NioSocketChannel.class, new HashMap<ChannelOption<?>, Object>(), group, negotiator,
DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize, authority, userAgent);
if (enableKeepAlive) {
transport.enableKeepAlive(true, 1000, 1000);
}
transports.add(transport);
return transport;
}
Expand Down