Skip to content

Commit d116cc9

Browse files
lukaszx0ejona86
authored andcommitted
netty: Fix NPE in NettyClientTransport
Fixes NPE when keepalive is enabled. * Move creation of keepAliveManager to the bottom of start() * Enable keepAlive in NettyClientTransportTest * Add test cases checking if keepalive is enabled/disabled, specifically. Fixes #2726
1 parent 72923dc commit d116cc9

File tree

2 files changed

+46
-12
lines changed

2 files changed

+46
-12
lines changed

netty/src/main/java/io/grpc/netty/NettyClientTransport.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -165,11 +165,6 @@ public Runnable start(Listener transportListener) {
165165
lifecycleManager = new ClientTransportLifecycleManager(
166166
Preconditions.checkNotNull(transportListener, "listener"));
167167

168-
if (enableKeepAlive) {
169-
keepAliveManager = new KeepAliveManager(this, channel.eventLoop(), keepAliveDelayNanos,
170-
keepAliveTimeoutNanos);
171-
}
172-
173168
handler = newHandler();
174169
HandlerSettings.setAutoWindow(handler);
175170

@@ -234,6 +229,12 @@ public void operationComplete(ChannelFuture future) throws Exception {
234229
Status.INTERNAL.withDescription("Connection closed with unknown cause"));
235230
}
236231
});
232+
233+
if (enableKeepAlive) {
234+
keepAliveManager = new KeepAliveManager(this, channel.eventLoop(), keepAliveDelayNanos,
235+
keepAliveTimeoutNanos);
236+
}
237+
237238
return null;
238239
}
239240

@@ -276,6 +277,11 @@ Channel channel() {
276277
return channel;
277278
}
278279

280+
@VisibleForTesting
281+
KeepAliveManager keepAliveManager() {
282+
return keepAliveManager;
283+
}
284+
279285
/**
280286
* Convert ChannelFuture.cause() to a Status, taking into account that all handlers are removed
281287
* from the pipeline when the channel is closed. Since handlers are removed, you may get an

netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import static org.junit.Assert.assertEquals;
4040
import static org.junit.Assert.assertFalse;
4141
import static org.junit.Assert.assertNotNull;
42+
import static org.junit.Assert.assertNull;
4243
import static org.junit.Assert.assertTrue;
4344
import static org.junit.Assert.fail;
4445

@@ -179,7 +180,7 @@ address, NioSocketChannel.class, channelOptions, group, newNegotiator(),
179180
public void overrideDefaultUserAgent() throws Exception {
180181
startServer();
181182
NettyClientTransport transport = newTransport(newNegotiator(),
182-
DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, "testUserAgent");
183+
DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, "testUserAgent", true);
183184
callMeMaybe(transport.start(clientTransportListener));
184185

185186
new Rpc(transport, new Metadata()).halfClose().waitForResponse();
@@ -196,7 +197,7 @@ public void maxMessageSizeShouldBeEnforced() throws Throwable {
196197
startServer();
197198
// Allow the response payloads of up to 1 byte.
198199
NettyClientTransport transport = newTransport(newNegotiator(),
199-
1, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null);
200+
1, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null, true);
200201
callMeMaybe(transport.start(clientTransportListener));
201202

202203
try {
@@ -278,7 +279,7 @@ public void maxHeaderListSizeShouldBeEnforcedOnClient() throws Exception {
278279
startServer();
279280

280281
NettyClientTransport transport =
281-
newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, 1, null);
282+
newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE, 1, null, true);
282283
callMeMaybe(transport.start(clientTransportListener));
283284

284285
try {
@@ -344,6 +345,30 @@ public void clientStreamGetsAttributes() throws Exception {
344345
assertEquals(address, rpc.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
345346
}
346347

348+
@Test
349+
public void keepAliveEnabled() throws Exception {
350+
startServer();
351+
NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE,
352+
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */, true /* keep alive */);
353+
callMeMaybe(transport.start(clientTransportListener));
354+
Rpc rpc = new Rpc(transport).halfClose();
355+
rpc.waitForResponse();
356+
357+
assertNotNull(transport.keepAliveManager());
358+
}
359+
360+
@Test
361+
public void keepAliveDisabled() throws Exception {
362+
startServer();
363+
NettyClientTransport transport = newTransport(newNegotiator(), DEFAULT_MAX_MESSAGE_SIZE,
364+
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */, false /* keep alive */);
365+
callMeMaybe(transport.start(clientTransportListener));
366+
Rpc rpc = new Rpc(transport).halfClose();
367+
rpc.waitForResponse();
368+
369+
assertNull(transport.keepAliveManager());
370+
}
371+
347372
private Throwable getRootCause(Throwable t) {
348373
if (t.getCause() == null) {
349374
return t;
@@ -359,15 +384,18 @@ private ProtocolNegotiator newNegotiator() throws IOException {
359384
}
360385

361386
private NettyClientTransport newTransport(ProtocolNegotiator negotiator) {
362-
return newTransport(negotiator,
363-
DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, null /* user agent */);
387+
return newTransport(negotiator, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
388+
null /* user agent */, true /* keep alive */);
364389
}
365390

366-
private NettyClientTransport newTransport(
367-
ProtocolNegotiator negotiator, int maxMsgSize, int maxHeaderListSize, String userAgent) {
391+
private NettyClientTransport newTransport(ProtocolNegotiator negotiator, int maxMsgSize,
392+
int maxHeaderListSize, String userAgent, boolean enableKeepAlive) {
368393
NettyClientTransport transport = new NettyClientTransport(
369394
address, NioSocketChannel.class, new HashMap<ChannelOption<?>, Object>(), group, negotiator,
370395
DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize, authority, userAgent);
396+
if (enableKeepAlive) {
397+
transport.enableKeepAlive(true, 1000, 1000);
398+
}
371399
transports.add(transport);
372400
return transport;
373401
}

0 commit comments

Comments
 (0)