Skip to content

Commit 1879e72

Browse files
authored
Finish multiplexer's inbound streams in more cases (#483)
## Motivation Currently, the multiplexer's inbound streams stream is finished only when the channel becomes inactive. There are some scenarios in which the channel may be closed before it has a chance to become active, and the stream will never be finished. This can cause any users iterating over the stream to hang. ## Modifications This PR finishes the inbound streams stream when a connection error is fired, and when the handler is removed. ## Result Fewer bugs.
1 parent bb19976 commit 1879e72

5 files changed

+103
-0
lines changed

Sources/NIOHTTP2/HTTP2ChannelHandler+InboundStreamMultiplexer.swift

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,24 @@ extension NIOHTTP2Handler.InboundStreamMultiplexer {
146146
}
147147
}
148148

149+
func errorCaughtReceived(_ error: any Error) {
150+
switch self {
151+
case .inline(let inlineStreamMultiplexer):
152+
inlineStreamMultiplexer.propagateErrorCaught(error)
153+
case .legacy:
154+
break // do nothing
155+
}
156+
}
157+
158+
func handlerRemovedReceived() {
159+
switch self {
160+
case .inline(let inlineStreamMultiplexer):
161+
inlineStreamMultiplexer.propagateHandlerRemoved()
162+
case .legacy:
163+
break // do nothing
164+
}
165+
}
166+
149167
func processedFrame(_ frame: HTTP2Frame) {
150168
switch self {
151169
case .inline(let inlineStreamMultiplexer):

Sources/NIOHTTP2/HTTP2ChannelHandler+InlineStreamMultiplexer.swift

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,14 @@ extension InlineStreamMultiplexer {
150150
}
151151
}
152152

153+
internal func propagateErrorCaught(_ error: any Error) {
154+
self._commonStreamMultiplexer.propagateErrorCaught(error)
155+
}
156+
157+
internal func propagateHandlerRemoved() {
158+
self._commonStreamMultiplexer.propagateHandlerRemoved()
159+
}
160+
153161
internal func processedFrame(frame: HTTP2Frame) {
154162
self._commonStreamMultiplexer.processedFrame(streamID: frame.streamID, size: frame.payload.flowControlledSize)
155163
}

Sources/NIOHTTP2/HTTP2ChannelHandler.swift

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,7 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler {
414414
public func handlerRemoved(context: ChannelHandlerContext) {
415415
// Any frames we're buffering need to be dropped.
416416
self.outboundBuffer.invalidateBuffer()
417+
self.inboundStreamMultiplexer?.handlerRemovedReceived()
417418
self.inboundStreamMultiplexerState = .deinitialized
418419
}
419420

@@ -550,6 +551,11 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler {
550551
self.inboundStreamMultiplexer?.channelWritabilityChangedReceived()
551552
context.fireChannelWritabilityChanged()
552553
}
554+
555+
public func errorCaught(context: ChannelHandlerContext, error: any Error) {
556+
self.inboundStreamMultiplexer?.errorCaughtReceived(error)
557+
context.fireErrorCaught(error)
558+
}
553559
}
554560

555561
/// Inbound frame handling.

Sources/NIOHTTP2/HTTP2CommonInboundStreamMultiplexer.swift

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,14 @@ extension HTTP2CommonInboundStreamMultiplexer {
213213
self.streamChannelContinuation?.finish()
214214
}
215215

216+
internal func propagateErrorCaught(_ error: any Error) {
217+
self.streamChannelContinuation?.finish(throwing: error)
218+
}
219+
220+
internal func propagateHandlerRemoved() {
221+
self.streamChannelContinuation?.finish()
222+
}
223+
216224
internal func selectivelyPropagateUserInboundEvent(context: ChannelHandlerContext, event: Any) {
217225
func propagateEvent(_ event: Any) {
218226
for channel in self.streams.values {

Tests/NIOHTTP2Tests/SimpleClientServerInlineStreamMultiplexerTests.swift

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -503,4 +503,67 @@ class SimpleClientServerInlineStreamMultiplexerTests: XCTestCase {
503503
XCTAssertNoThrow(try self.clientChannel.finish())
504504
XCTAssertNoThrow(try self.serverChannel.finish())
505505
}
506+
507+
func testChannelInactiveFinishesAsyncStreamMultiplexerInboundStream() async throws {
508+
let asyncClientChannel = NIOAsyncTestingChannel()
509+
let asyncServerChannel = NIOAsyncTestingChannel()
510+
511+
// Setup the connection.
512+
let clientMultiplexer = try await asyncClientChannel.configureAsyncHTTP2Pipeline(mode: .client) { _ in
513+
asyncClientChannel.eventLoop.makeSucceededVoidFuture()
514+
}.get()
515+
516+
let serverMultiplexer = try await asyncServerChannel.configureAsyncHTTP2Pipeline(mode: .server) { _ in
517+
asyncServerChannel.eventLoop.makeSucceededVoidFuture()
518+
}.get()
519+
520+
// Create the stream channel
521+
let stream = try await clientMultiplexer.openStream { $0.eventLoop.makeSucceededFuture($0) }
522+
523+
// Initiate request to open the stream on the server.
524+
let headers = HPACKHeaders([(":path", "/"), (":method", "POST"), (":scheme", "http")])
525+
let frame: HTTP2Frame.FramePayload = .headers(.init(headers: headers))
526+
stream.writeAndFlush(frame, promise: nil)
527+
try await self.interactInMemory(asyncClientChannel, asyncServerChannel)
528+
529+
// Close server to fire channel inactive down the pipeline: it should be propagated.
530+
try await asyncServerChannel.close()
531+
for try await _ in serverMultiplexer.inbound {}
532+
}
533+
534+
enum ErrorCaughtPropagated: Error, Equatable {
535+
case error
536+
}
537+
538+
func testErrorCaughtFinishesAsyncStreamMultiplexerInboundStream() async throws {
539+
let asyncClientChannel = NIOAsyncTestingChannel()
540+
let asyncServerChannel = NIOAsyncTestingChannel()
541+
542+
// Setup the connection.
543+
let clientMultiplexer = try await asyncClientChannel.configureAsyncHTTP2Pipeline(mode: .client) { _ in
544+
asyncClientChannel.eventLoop.makeSucceededVoidFuture()
545+
}.get()
546+
547+
let serverMultiplexer = try await asyncServerChannel.configureAsyncHTTP2Pipeline(mode: .server) { _ in
548+
asyncServerChannel.eventLoop.makeSucceededVoidFuture()
549+
}.get()
550+
551+
// Create the stream channel
552+
let stream = try await clientMultiplexer.openStream { $0.eventLoop.makeSucceededFuture($0) }
553+
554+
// Initiate request to open the stream on the server.
555+
let headers = HPACKHeaders([(":path", "/"), (":method", "POST"), (":scheme", "http")])
556+
let frame: HTTP2Frame.FramePayload = .headers(.init(headers: headers))
557+
stream.writeAndFlush(frame, promise: nil)
558+
try await self.interactInMemory(asyncClientChannel, asyncServerChannel)
559+
560+
// Fire an error down the server pipeline: it should cause the inbound stream to finish with error
561+
asyncServerChannel.pipeline.fireErrorCaught(ErrorCaughtPropagated.error)
562+
do {
563+
for try await _ in serverMultiplexer.inbound {}
564+
XCTFail("Expected error to be thrown")
565+
} catch {
566+
XCTAssert(error is ErrorCaughtPropagated)
567+
}
568+
}
506569
}

0 commit comments

Comments
 (0)