diff --git a/Sources/NIOHTTP2/HTTP2ChannelHandler+InlineStreamMultiplexer.swift b/Sources/NIOHTTP2/HTTP2ChannelHandler+InlineStreamMultiplexer.swift index bb58488b..f1b12fac 100644 --- a/Sources/NIOHTTP2/HTTP2ChannelHandler+InlineStreamMultiplexer.swift +++ b/Sources/NIOHTTP2/HTTP2ChannelHandler+InlineStreamMultiplexer.swift @@ -12,7 +12,7 @@ // //===----------------------------------------------------------------------===// -import NIOCore +@_spi(AsyncChannel) import NIOCore internal struct InlineStreamMultiplexer { private let context: ChannelHandlerContext @@ -238,8 +238,37 @@ extension NIOHTTP2Handler { self.inbound = inboundStreamChannels } + /// Create a stream channel initialized with the provided closure public func createStreamChannel(_ initializer: @escaping NIOHTTP2Handler.StreamInitializerWithOutput) async throws -> OutboundStreamOutput { return try await self.inlineStreamMultiplexer.createStreamChannel(initializer).get() } + + + /// Create a stream channel initialized with the provided closure and return it wrapped within a `NIOAsyncChannel`. + /// + /// - Parameters: + /// - inboundType: The ``NIOAsyncChannel/inboundStream`` message type for the created channel. + /// This type must match the `InboundOut` type of the final handler added to the stream channel by the `initializer` + /// or ``HTTP2Frame/FramePayload`` if there are none. + /// - outboundType: The ``NIOAsyncChannel/outboundWriter`` message type for the created channel. + /// This type must match the `OutboundIn` type of the final handler added to the stream channel by the `initializer` + /// or ``HTTP2Frame/FramePayload`` if there are none. + /// - initializer: A callback that will be invoked to allow you to configure the + /// `ChannelPipeline` for the newly created channel. + public func createStreamChannel( + inboundType: Inbound.Type, + outboundType: Outbound.Type, + initializer: @escaping NIOHTTP2Handler.StreamInitializer + ) async throws -> NIOAsyncChannel { + return try await self.createStreamChannel { channel in + initializer(channel).flatMapThrowing { _ in + return try NIOAsyncChannel( + synchronouslyWrapping: channel, + inboundType: Inbound.self, + outboundType: Outbound.self + ) + } + } + } } } diff --git a/Sources/NIOHTTP2/HTTP2ChannelHandler.swift b/Sources/NIOHTTP2/HTTP2ChannelHandler.swift index ec7fc0ed..de3ea2f1 100644 --- a/Sources/NIOHTTP2/HTTP2ChannelHandler.swift +++ b/Sources/NIOHTTP2/HTTP2ChannelHandler.swift @@ -990,11 +990,15 @@ extension NIOHTTP2Handler { #if swift(>=5.7) /// The type of all `inboundStreamInitializer` callbacks. public typealias StreamInitializer = @Sendable (Channel) -> EventLoopFuture + /// The type of all `connectionInitializer` callbacks. + public typealias ConnectionInitializer = @Sendable (Channel) -> EventLoopFuture /// The type of `inboundStreamInitializer` callbacks which return non-void results. public typealias StreamInitializerWithOutput = @Sendable (Channel) -> EventLoopFuture #else /// The type of all `inboundStreamInitializer` callbacks. public typealias StreamInitializer = (Channel) -> EventLoopFuture + /// The type of all `connectionInitializer` callbacks. + public typealias ConnectionInitializer = (Channel) -> EventLoopFuture /// The type of `inboundStreamInitializer` callbacks which return non-void results. public typealias StreamInitializerWithOutput = (Channel) -> EventLoopFuture #endif diff --git a/Sources/NIOHTTP2/HTTP2PipelineHelpers.swift b/Sources/NIOHTTP2/HTTP2PipelineHelpers.swift index f23c0a8a..cd14c5db 100644 --- a/Sources/NIOHTTP2/HTTP2PipelineHelpers.swift +++ b/Sources/NIOHTTP2/HTTP2PipelineHelpers.swift @@ -12,7 +12,7 @@ // //===----------------------------------------------------------------------===// -import NIOCore +@_spi(AsyncChannel) import NIOCore import NIOTLS /// The supported ALPN protocol tokens for NIO's HTTP/2 abstraction layer. @@ -212,7 +212,8 @@ extension Channel { /// /// In general this is not entirely useful by itself, as HTTP/2 is a negotiated protocol. This helper does not handle negotiation. /// Instead, this simply adds the handler required to speak HTTP/2 after negotiation has completed, or when agreed by prior knowledge. - /// Use this function to setup a HTTP/2 pipeline if you wish to use async sequence abstractions over inbound and outbound streams, as it allows that pipeline to evolve without breaking your code. + /// Use this function to setup a HTTP/2 pipeline if you wish to use async sequence abstractions over inbound and outbound streams. + /// Using this rather than implementing a similar function yourself allows that pipeline to evolve without breaking your code. /// /// - parameters: /// - mode: The mode this pipeline will operate in, server or client. @@ -222,7 +223,8 @@ extension Channel { /// - streamDelegate: The delegate to be notified in the event of stream creation and close. /// - position: The position in the pipeline into which to insert this handler. /// - inboundStreamInitializer: A closure that will be called whenever the remote peer initiates a new stream. - /// - returns: An `EventLoopFuture` containing the `AsyncStreamMultiplexer` inserted into this pipeline, which can be used to initiate new streams and iterate over inbound HTTP/2 stream channels. + /// - returns: An `EventLoopFuture` containing the `AsyncStreamMultiplexer` inserted into this pipeline, which can + /// be used to initiate new streams and iterate over inbound HTTP/2 stream channels. @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) @_spi(AsyncChannel) public func configureAsyncHTTP2Pipeline( @@ -258,6 +260,73 @@ extension Channel { } } + /// Configures a `ChannelPipeline` to speak HTTP/2 and wraps any created inbound stream channels in `NIOAsyncChannel`s. + /// + /// In general this is not entirely useful by itself, as HTTP/2 is a negotiated protocol. This helper does not handle negotiation. + /// Instead, this simply adds the handler required to speak HTTP/2 after negotiation has completed, or when agreed by prior knowledge. + /// Use this function to setup a HTTP/2 pipeline if you wish to use `NIOAsyncChannel`s to provide async sequence abstractions + /// over inbound and outbound streams whilst handling back-pressure. + /// + /// Using this rather than implementing a similar function yourself allows that pipeline to evolve without breaking your code. + /// + /// - parameters: + /// - mode: The mode this pipeline will operate in, server or client. + /// - connectionConfiguration: The settings that will be used when establishing the connection. These will be sent to the peer as part of the + /// handshake. + /// - streamConfiguration: The settings that will be used when establishing new streams. These mainly pertain to flow control. + /// - streamDelegate: The delegate to be notified in the event of stream creation and close. + /// - position: The position in the pipeline into which to insert the `NIOHTTP2Handler`. + /// - streamInboundType: The ``NIOAsyncChannel/inboundStream`` message type for inbound stream channels. + /// This type must match the `InboundOut` type of the final handler added to the stream channel by the `inboundStreamInitializer` + /// or ``HTTP2Frame/FramePayload`` if there are none. + /// - streamOutboundType: The ``NIOAsyncChannel/outboundWriter`` message type for inbound stream channels. + /// This type must match the `OutboundIn` type of the final handler added to the stream channel by the `inboundStreamInitializer` + /// or ``HTTP2Frame/FramePayload`` if there are none. + /// - inboundStreamInitializer: A closure that will be called whenever the remote peer initiates a new stream. + /// - returns: An `EventLoopFuture` containing the `AsyncStreamMultiplexer` inserted into this pipeline which wraps + /// inbound streams as `NIOAsyncChannels` after initialization. The multiplexer can be used to initiate new streams + /// and iterate over inbound HTTP/2 stream channels. + @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) + @_spi(AsyncChannel) + public func configureAsyncHTTP2Pipeline( + mode: NIOHTTP2Handler.ParserMode, + connectionConfiguration: NIOHTTP2Handler.ConnectionConfiguration, + streamConfiguration: NIOHTTP2Handler.StreamConfiguration, + streamDelegate: NIOHTTP2StreamDelegate? = nil, + position: ChannelPipeline.Position = .last, + streamInboundType: StreamInbound.Type, + streamOutboundType: StreamOutbound.Type, + inboundStreamInitializer: @escaping NIOHTTP2Handler.StreamInitializer + ) throws -> EventLoopFuture>> { + if self.eventLoop.inEventLoop { + return self.eventLoop.makeCompletedFuture { + return try self.pipeline.syncOperations.configureAsyncHTTP2Pipeline( + mode: mode, + connectionConfiguration: connectionConfiguration, + streamConfiguration: streamConfiguration, + streamDelegate: streamDelegate, + position: position, + streamInboundType: streamInboundType, + streamOutboundType: streamOutboundType, + inboundStreamInitializer: inboundStreamInitializer + ) + } + } else { + return self.eventLoop.submit { + return try self.pipeline.syncOperations.configureAsyncHTTP2Pipeline( + mode: mode, + connectionConfiguration: connectionConfiguration, + streamConfiguration: streamConfiguration, + streamDelegate: streamDelegate, + position: position, + streamInboundType: streamInboundType, + streamOutboundType: streamOutboundType, + inboundStreamInitializer: inboundStreamInitializer + ) + } + } + } + /// Configures a channel to perform a HTTP/2 secure upgrade. /// /// HTTP/2 secure upgrade uses the Application Layer Protocol Negotiation TLS extension to @@ -418,6 +487,75 @@ extension Channel { }.map { _ in () } } } + + /// Configures a `ChannelPipeline` to speak HTTP/2 and wraps both the connection channel and any + /// created inbound stream channels in `NIOAsyncChannel`s. + /// + /// In general this is not entirely useful by itself, as HTTP/2 is a negotiated protocol. This helper does not handle negotiation. + /// Instead, this simply adds the handler required to speak HTTP/2 after negotiation has completed, or when agreed by prior knowledge. + /// Use this function to setup a HTTP/2 pipeline if you wish to use `NIOAsyncChannel`s to provide async sequence abstractions + /// over inbound and outbound streams whilst handling back-pressure. + /// + /// Using this rather than implementing a similar function yourself allows that pipeline to evolve without breaking your code. + /// + /// - parameters: + /// - mode: The mode this pipeline will operate in, server or client. + /// - connectionConfiguration: The settings that will be used when establishing the connection. These will be sent to the peer as part of the + /// handshake. + /// - streamConfiguration: The settings that will be used when establishing new streams. These mainly pertain to flow control. + /// - streamDelegate: The delegate to be notified in the event of stream creation and close. + /// - position: The position in the pipeline into which to insert the `NIOHTTP2Handler`. + /// - connectionInboundType: The ``NIOAsyncChannel/inboundStream`` message type for the HTTP/2 connection channel. + /// This type must match the `InboundOut` type of the final handler in the connection channel. + /// - connectionOutboundType: The ``NIOAsyncChannel/outboundWriter`` message type for the HTTP/2 connection channel. + /// This type must match the `OutboundIn` type of the final handler in the connection channel. + /// - streamInboundType: The ``NIOAsyncChannel/inboundStream`` message type for inbound stream channels. + /// This type must match the `InboundOut` type of the final handler added to the stream channel by the `inboundStreamInitializer` + /// or ``HTTP2Frame/FramePayload`` if there are none. + /// - streamOutboundType: The ``NIOAsyncChannel/outboundWriter`` message type for inbound stream channels. + /// This type must match the `OutboundIn` type of the final handler added to the stream channel by the `inboundStreamInitializer` + /// or ``HTTP2Frame/FramePayload`` if there are none. + /// - connectionInitializer: A closure that will be called once the `NIOHTTP2Handler` has been added to the pipeline. + /// - inboundStreamInitializer: A closure that will be called whenever the remote peer initiates a new stream. + /// - returns: An `EventLoopFuture` containing the `AsyncStreamMultiplexer` inserted into this pipeline which wraps + /// inbound streams as `NIOAsyncChannels` after initialization. The multiplexer can be used to initiate new streams + /// and iterate over inbound HTTP/2 stream channels. + @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) + @_spi(AsyncChannel) + public func configureAsyncHTTP2Pipeline( + mode: NIOHTTP2Handler.ParserMode, + connectionConfiguration: NIOHTTP2Handler.ConnectionConfiguration, + streamConfiguration: NIOHTTP2Handler.StreamConfiguration, + streamDelegate: NIOHTTP2StreamDelegate? = nil, + connectionInboundType: ConnectionInbound.Type, + connectionOutboundType: ConnectionOutbound.Type, + streamInboundType: StreamInbound.Type, + streamOutboundType: StreamOutbound.Type, + connectionInitializer: @escaping NIOHTTP2Handler.ConnectionInitializer, + inboundStreamInitializer: @escaping NIOHTTP2Handler.StreamInitializer + ) throws -> EventLoopFuture<( + NIOAsyncChannel, + NIOHTTP2Handler.AsyncStreamMultiplexer> + )> { + return try self.configureAsyncHTTP2Pipeline( + mode: mode, + connectionConfiguration: connectionConfiguration, + streamConfiguration: streamConfiguration, + streamDelegate: streamDelegate, + streamInboundType: streamInboundType, + streamOutboundType: streamOutboundType, + inboundStreamInitializer: inboundStreamInitializer + ).flatMap { multiplexer in + return connectionInitializer(self).flatMapThrowing { _ in + let connectionAsyncChannel = try NIOAsyncChannel( + synchronouslyWrapping: self, + inboundType: ConnectionInbound.self, + outboundType: ConnectionOutbound.self + ) + return (connectionAsyncChannel, multiplexer) + } + } + } } extension ChannelPipeline.SynchronousOperations { @@ -476,8 +614,8 @@ extension ChannelPipeline.SynchronousOperations { /// - streamDelegate: The delegate to be notified in the event of stream creation and close. /// - position: The position in the pipeline into which to insert this handler. /// - inboundStreamInitializer: A closure that will be called whenever the remote peer initiates a new stream. - /// - returns: An `EventLoopFuture` containing the `AsyncStreamMultiplexer` inserted into this pipeline, which can be used to initiate new streams and iterate over inbound HTTP/2 stream channels. - /// inserted into this pipeline, which can be used to initiate new streams. + /// - returns: An `EventLoopFuture` containing the `AsyncStreamMultiplexer` inserted into this pipeline, which can + /// be used to initiate new streams and iterate over inbound HTTP/2 stream channels. @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) @_spi(AsyncChannel) public func configureAsyncHTTP2Pipeline( @@ -504,4 +642,60 @@ extension ChannelPipeline.SynchronousOperations { return try handler.syncAsyncStreamMultiplexer(continuation: continuation, inboundStreamChannels: inboundStreamChannels) } + /// Configures a `ChannelPipeline` to speak HTTP/2 and wraps any created inbound stream channels in `NIOAsyncChannel`s. + /// + /// This operation **must** be called on the event loop. + /// + /// In general this is not entirely useful by itself, as HTTP/2 is a negotiated protocol. This helper does not handle negotiation. + /// Instead, this simply adds the handler required to speak HTTP/2 after negotiation has completed, or when agreed by prior knowledge. + /// Use this function to setup a HTTP/2 pipeline if you wish to use `NIOAsyncChannel`s to provide async sequence abstractions + /// over inbound and outbound streams whilst handling back-pressure. + /// + /// Using this rather than implementing a similar function yourself allows that pipeline to evolve without breaking your code. + /// + /// - parameters: + /// - mode: The mode this pipeline will operate in, server or client. + /// - connectionConfiguration: The settings that will be used when establishing the connection. These will be sent to the peer as part of the + /// handshake. + /// - streamConfiguration: The settings that will be used when establishing new streams. These mainly pertain to flow control. + /// - streamDelegate: The delegate to be notified in the event of stream creation and close. + /// - position: The position in the pipeline into which to insert the `NIOHTTP2Handler`. + /// - streamInboundType: The ``NIOAsyncChannel/inboundStream`` message type for inbound stream channels. + /// This type must match the `InboundOut` type of the final handler added to the stream channel by the `inboundStreamInitializer` + /// or ``HTTP2Frame/FramePayload`` if there are none. + /// - streamOutboundType: The ``NIOAsyncChannel/outboundWriter`` message type for inbound stream channels. + /// This type must match the `OutboundIn` type of the final handler added to the stream channel by the `inboundStreamInitializer` + /// or ``HTTP2Frame/FramePayload`` if there are none. + /// - inboundStreamInitializer: A closure that will be called whenever the remote peer initiates a new stream. + /// - returns: An `EventLoopFuture` containing the `AsyncStreamMultiplexer` inserted into this pipeline which wraps + /// inbound streams as `NIOAsyncChannels` after initialization. The multiplexer can be used to initiate new streams + /// and iterate over inbound HTTP/2 stream channels. + @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) + @_spi(AsyncChannel) + public func configureAsyncHTTP2Pipeline( + mode: NIOHTTP2Handler.ParserMode, + connectionConfiguration: NIOHTTP2Handler.ConnectionConfiguration, + streamConfiguration: NIOHTTP2Handler.StreamConfiguration, + streamDelegate: NIOHTTP2StreamDelegate? = nil, + position: ChannelPipeline.Position = .last, + streamInboundType: StreamInbound.Type, + streamOutboundType: StreamOutbound.Type, + inboundStreamInitializer: @escaping NIOHTTP2Handler.StreamInitializer + ) throws -> NIOHTTP2Handler.AsyncStreamMultiplexer> { + return try self.configureAsyncHTTP2Pipeline( + mode: mode, + connectionConfiguration: connectionConfiguration, + streamConfiguration: streamConfiguration, + streamDelegate: streamDelegate, + position: position + ) { channel in + inboundStreamInitializer(channel).flatMapThrowing { _ in + return try NIOAsyncChannel( + synchronouslyWrapping: channel, + inboundType: StreamInbound.self, + outboundType: StreamOutbound.self + ) + } + } + } } diff --git a/Tests/NIOHTTP2Tests/ConfiguringPipelineAsyncMultiplexerTests.swift b/Tests/NIOHTTP2Tests/ConfiguringPipelineAsyncMultiplexerTests.swift index a9ed50ac..86f15a2c 100644 --- a/Tests/NIOHTTP2Tests/ConfiguringPipelineAsyncMultiplexerTests.swift +++ b/Tests/NIOHTTP2Tests/ConfiguringPipelineAsyncMultiplexerTests.swift @@ -14,7 +14,7 @@ import XCTest -import NIOCore +@_spi(AsyncChannel) import NIOCore import NIOEmbedded import NIOHPACK import NIOHTTP1 @@ -132,6 +132,165 @@ final class ConfiguringPipelineAsyncMultiplexerTests: XCTestCase { serverRecorder.receivedFrames.assertFramePayloadsMatch(Array(repeating: ConfiguringPipelineAsyncMultiplexerTests.requestFramePayload, count: requestCount)) } + + func testNIOAsyncConnectionStreamChannelPipelineCommunicates() async throws { + let requestCount = 100 + + let (clientAsyncChannel, clientMultiplexer) = try await assertNoThrowWithValue( + try await self.clientChannel.configureAsyncHTTP2Pipeline( + mode: .client, + connectionConfiguration: .init(), + streamConfiguration: .init(), + connectionInboundType: HTTP2Frame.self, + connectionOutboundType: HTTP2Frame.self, + streamInboundType: HTTP2Frame.FramePayload.self, + streamOutboundType: HTTP2Frame.FramePayload.self + ) { channel in + channel.eventLoop.makeSucceededVoidFuture() + } inboundStreamInitializer: { channel -> EventLoopFuture in + channel.eventLoop.makeSucceededVoidFuture() + }.get() + ) + + let (serverAsyncChannel, serverMultiplexer) = try await assertNoThrowWithValue( + try await self.serverChannel.configureAsyncHTTP2Pipeline( + mode: .server, + connectionConfiguration: .init(), + streamConfiguration: .init(), + connectionInboundType: HTTP2Frame.self, + connectionOutboundType: HTTP2Frame.self, + streamInboundType: HTTP2Frame.FramePayload.self, + streamOutboundType: HTTP2Frame.FramePayload.self + ) { channel in + channel.eventLoop.makeSucceededVoidFuture() + } inboundStreamInitializer: { channel -> EventLoopFuture in + channel.eventLoop.makeSucceededVoidFuture() + }.get() + ) + + try await assertNoThrow(try await self.assertDoHandshake(client: self.clientChannel, server: self.serverChannel)) + + try await withThrowingTaskGroup(of: Int.self, returning: Void.self) { group in + // server + group.addTask { + var serverInboundChannelCount = 0 + for try await streamChannel in serverMultiplexer.inbound { + for try await receivedFrame in streamChannel.inboundStream { + receivedFrame.assertFramePayloadMatches(this: ConfiguringPipelineAsyncMultiplexerTests.requestFramePayload) + + try await streamChannel.outboundWriter.write(ConfiguringPipelineAsyncMultiplexerTests.responseFramePayload) + streamChannel.outboundWriter.finish() + + try await self.interactInMemory(self.clientChannel, self.serverChannel) + } + serverInboundChannelCount += 1 + } + return serverInboundChannelCount + } + + // client + for _ in 0 ..< requestCount { + let streamChannel = try await clientMultiplexer.createStreamChannel( + inboundType: HTTP2Frame.FramePayload.self, + outboundType: HTTP2Frame.FramePayload.self + ) { channel -> EventLoopFuture in + channel.eventLoop.makeSucceededVoidFuture() + } + // Let's try sending some requests + try await streamChannel.outboundWriter.write(ConfiguringPipelineAsyncMultiplexerTests.requestFramePayload) + streamChannel.outboundWriter.finish() + + try await self.interactInMemory(self.clientChannel, self.serverChannel) + + for try await receivedFrame in streamChannel.inboundStream { + receivedFrame.assertFramePayloadMatches(this: ConfiguringPipelineAsyncMultiplexerTests.responseFramePayload) + } + } + + clientAsyncChannel.outboundWriter.finish() + serverAsyncChannel.outboundWriter.finish() + + try await assertNoThrow(try await self.clientChannel.finish()) + try await assertNoThrow(try await self.serverChannel.finish()) + + let serverInboundChannelCount = try await assertNoThrowWithValue(try await group.next()!) + XCTAssertEqual(serverInboundChannelCount, requestCount, "We should have created one server-side channel as a result of the one HTTP/2 stream used.") + } + } + + func testNIOAsyncStreamChannelPipelineCommunicates() async throws { + let requestCount = 100 + + let clientMultiplexer = try await assertNoThrowWithValue( + try await self.clientChannel.configureAsyncHTTP2Pipeline( + mode: .client, + connectionConfiguration: .init(), + streamConfiguration: .init(), + streamInboundType: HTTP2Frame.FramePayload.self, + streamOutboundType: HTTP2Frame.FramePayload.self + ) { channel -> EventLoopFuture in + channel.eventLoop.makeSucceededVoidFuture() + }.get() + ) + + let serverMultiplexer = try await assertNoThrowWithValue( + try await self.serverChannel.configureAsyncHTTP2Pipeline( + mode: .server, + connectionConfiguration: .init(), + streamConfiguration: .init(), + streamInboundType: HTTP2Frame.FramePayload.self, + streamOutboundType: HTTP2Frame.FramePayload.self + ) { channel -> EventLoopFuture in + channel.eventLoop.makeSucceededVoidFuture() + }.get() + ) + + try await assertNoThrow(try await self.assertDoHandshake(client: self.clientChannel, server: self.serverChannel)) + + try await withThrowingTaskGroup(of: Int.self, returning: Void.self) { group in + // server + group.addTask { + var serverInboundChannelCount = 0 + for try await streamChannel in serverMultiplexer.inbound { + for try await receivedFrame in streamChannel.inboundStream { + receivedFrame.assertFramePayloadMatches(this: ConfiguringPipelineAsyncMultiplexerTests.requestFramePayload) + + try await streamChannel.outboundWriter.write(ConfiguringPipelineAsyncMultiplexerTests.responseFramePayload) + streamChannel.outboundWriter.finish() + + try await self.interactInMemory(self.clientChannel, self.serverChannel) + } + serverInboundChannelCount += 1 + } + return serverInboundChannelCount + } + + // client + for _ in 0 ..< requestCount { + let streamChannel = try await clientMultiplexer.createStreamChannel( + inboundType: HTTP2Frame.FramePayload.self, + outboundType: HTTP2Frame.FramePayload.self + ) { channel -> EventLoopFuture in + channel.eventLoop.makeSucceededVoidFuture() + } + // Let's try sending some requests + try await streamChannel.outboundWriter.write(ConfiguringPipelineAsyncMultiplexerTests.requestFramePayload) + streamChannel.outboundWriter.finish() + + try await self.interactInMemory(self.clientChannel, self.serverChannel) + + for try await receivedFrame in streamChannel.inboundStream { + receivedFrame.assertFramePayloadMatches(this: ConfiguringPipelineAsyncMultiplexerTests.responseFramePayload) + } + } + + try await assertNoThrow(try await self.clientChannel.finish()) + try await assertNoThrow(try await self.serverChannel.finish()) + + let serverInboundChannelCount = try await assertNoThrowWithValue(try await group.next()!) + XCTAssertEqual(serverInboundChannelCount, requestCount, "We should have created one server-side channel as a result of the one HTTP/2 stream used.") + } + } } #if swift(<5.9)