|
55 | 55 | import io.netty.channel.ChannelFutureListener; |
56 | 56 | import io.netty.channel.ChannelHandlerContext; |
57 | 57 | import io.netty.channel.ChannelOption; |
| 58 | +import io.netty.channel.EventLoop; |
58 | 59 | import io.netty.channel.EventLoopGroup; |
59 | 60 | import io.netty.channel.socket.nio.NioSocketChannel; |
60 | 61 | import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2ChannelClosedException; |
@@ -165,14 +166,21 @@ public ClientStream newStream(MethodDescriptor<?, ?> method, Metadata headers) { |
165 | 166 | public Runnable start(Listener transportListener) { |
166 | 167 | lifecycleManager = new ClientTransportLifecycleManager( |
167 | 168 | Preconditions.checkNotNull(transportListener, "listener")); |
| 169 | + EventLoop eventLoop = group.next(); |
| 170 | + if (enableKeepAlive) { |
| 171 | + keepAliveManager = new KeepAliveManager( |
| 172 | + new ClientKeepAlivePinger(this), eventLoop, keepAliveDelayNanos, keepAliveTimeoutNanos, |
| 173 | + false); |
| 174 | + } |
168 | 175 |
|
169 | | - handler = newHandler(); |
| 176 | + handler = NettyClientHandler.newHandler(lifecycleManager, keepAliveManager, flowControlWindow, |
| 177 | + maxHeaderListSize, Ticker.systemTicker()); |
170 | 178 | HandlerSettings.setAutoWindow(handler); |
171 | 179 |
|
172 | 180 | negotiationHandler = negotiator.newHandler(handler); |
173 | 181 |
|
174 | 182 | Bootstrap b = new Bootstrap(); |
175 | | - b.group(group); |
| 183 | + b.group(eventLoop); |
176 | 184 | b.channel(channelType); |
177 | 185 | if (NioSocketChannel.class.isAssignableFrom(channelType)) { |
178 | 186 | b.option(SO_KEEPALIVE, true); |
@@ -231,10 +239,7 @@ public void operationComplete(ChannelFuture future) throws Exception { |
231 | 239 | } |
232 | 240 | }); |
233 | 241 |
|
234 | | - if (enableKeepAlive) { |
235 | | - keepAliveManager = new KeepAliveManager( |
236 | | - new ClientKeepAlivePinger(this), channel.eventLoop(), keepAliveDelayNanos, |
237 | | - keepAliveTimeoutNanos, false); |
| 242 | + if (keepAliveManager != null) { |
238 | 243 | keepAliveManager.onTransportStarted(); |
239 | 244 | } |
240 | 245 |
|
@@ -307,9 +312,4 @@ private Status statusFromFailedFuture(ChannelFuture f) { |
307 | 312 | } |
308 | 313 | return Utils.statusFromThrowable(t); |
309 | 314 | } |
310 | | - |
311 | | - private NettyClientHandler newHandler() { |
312 | | - return NettyClientHandler.newHandler(lifecycleManager, keepAliveManager, flowControlWindow, |
313 | | - maxHeaderListSize, Ticker.systemTicker()); |
314 | | - } |
315 | 315 | } |
0 commit comments