13
13
14
14
import io .netty .bootstrap .Bootstrap ;
15
15
import io .netty .buffer .ByteBuf ;
16
- import io .netty .channel .Channel ;
17
- import io .netty .channel .ChannelFuture ;
18
- import io .netty .channel .ChannelHandlerContext ;
19
- import io .netty .channel .ChannelInitializer ;
16
+ import io .netty .channel .*;
20
17
import io .netty .channel .nio .NioEventLoopGroup ;
21
18
import io .netty .channel .socket .nio .NioSocketChannel ;
22
19
import io .netty .handler .codec .http2 .AbstractHttp2ConnectionHandlerBuilder ;
44
41
import io .vertx .test .core .Repeat ;
45
42
import org .junit .Test ;
46
43
44
+ import java .io .IOException ;
47
45
import java .net .InetSocketAddress ;
48
46
import java .util .concurrent .CompletableFuture ;
49
47
import java .util .concurrent .TimeUnit ;
@@ -82,7 +80,7 @@ private void testMYR(boolean multiplexImplementation) throws Exception {
82
80
AtomicInteger inflightRequests = new AtomicInteger ();
83
81
AtomicInteger maxInflightRequests = new AtomicInteger ();
84
82
AtomicInteger receivedRstFrames = new AtomicInteger ();
85
- CompletableFuture <Void > goAway = new CompletableFuture <>();
83
+ CompletableFuture <Boolean > goAway = new CompletableFuture <>();
86
84
87
85
server .requestHandler (req -> {
88
86
int val = inflightRequests .incrementAndGet ();
@@ -154,7 +152,7 @@ public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) th
154
152
155
153
@ Override
156
154
public void onGoAwayRead (ChannelHandlerContext ctx , int lastStreamId , long errorCode , ByteBuf debugData ) throws Http2Exception {
157
- goAway .complete (null );
155
+ goAway .complete (true );
158
156
}
159
157
});
160
158
return super .build ();
@@ -164,6 +162,16 @@ public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long error
164
162
Builder clientHandlerBuilder = new Builder ();
165
163
Http2ConnectionHandler clientHandler = clientHandlerBuilder .build ();
166
164
ch .pipeline ().addLast (clientHandler );
165
+ ch .pipeline ().addLast (new ChannelDuplexHandler () {
166
+ @ Override
167
+ public void exceptionCaught (ChannelHandlerContext ctx , Throwable cause ) throws Exception {
168
+ if (cause instanceof IOException && cause .getMessage ().startsWith ("Connection reset" )) {
169
+ goAway .complete (false );
170
+ } else {
171
+ goAway .completeExceptionally (cause );
172
+ }
173
+ }
174
+ });
167
175
}
168
176
};
169
177
}
@@ -192,10 +200,14 @@ public ChannelFuture connect(int port, String host, BiConsumer<ChannelHandlerCon
192
200
chctx .flush ();
193
201
}).sync ();
194
202
195
- goAway .get (10 , TimeUnit .SECONDS );
196
-
197
203
// Check the number of rst frame received before getting a go away
198
- assertEquals (receivedRstFrames .get (), maxRstFramePerWindow + 1 );
204
+ if (goAway .get (20 , TimeUnit .SECONDS )) {
205
+ assertEquals (receivedRstFrames .get (), maxRstFramePerWindow + 1 );
206
+ } else {
207
+ // Mitigate CI behavior
208
+ assertTrue (receivedRstFrames .get () < maxRstFramePerWindow + 1 );
209
+ }
210
+
199
211
assertTrue (maxInflightRequests .get () <= 2 * maxRstFramePerWindow );
200
212
}
201
213
}
0 commit comments