diff --git a/Sources/NIOHTTP2/HTTP2ChannelHandler+InlineStreamMultiplexer.swift b/Sources/NIOHTTP2/HTTP2ChannelHandler+InlineStreamMultiplexer.swift index f1b12fac..65fc442b 100644 --- a/Sources/NIOHTTP2/HTTP2ChannelHandler+InlineStreamMultiplexer.swift +++ b/Sources/NIOHTTP2/HTTP2ChannelHandler+InlineStreamMultiplexer.swift @@ -247,6 +247,8 @@ extension NIOHTTP2Handler { /// Create a stream channel initialized with the provided closure and return it wrapped within a `NIOAsyncChannel`. /// /// - Parameters: + /// - backpressureStrategy: The backpressure strategy of the ``NIOAsyncChannel`` wrapping the HTTP/2 stream channel. + /// - isOutboundHalfClosureEnabled: If outbound half closure should be enabled for the ``NIOAsyncChannel`` wrapping the HTTP/2 stream channel. /// - inboundType: The ``NIOAsyncChannel/inboundStream`` message type for the created channel. /// This type must match the `InboundOut` type of the final handler added to the stream channel by the `initializer` /// or ``HTTP2Frame/FramePayload`` if there are none. @@ -255,15 +257,21 @@ extension NIOHTTP2Handler { /// or ``HTTP2Frame/FramePayload`` if there are none. /// - initializer: A callback that will be invoked to allow you to configure the /// `ChannelPipeline` for the newly created channel. + @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) + @_spi(AsyncChannel) public func createStreamChannel( - inboundType: Inbound.Type, - outboundType: Outbound.Type, + backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, + isOutboundHalfClosureEnabled: Bool = false, + inboundType: Inbound.Type = Inbound.self, + outboundType: Outbound.Type = Outbound.self, initializer: @escaping NIOHTTP2Handler.StreamInitializer ) async throws -> NIOAsyncChannel { return try await self.createStreamChannel { channel in initializer(channel).flatMapThrowing { _ in return try NIOAsyncChannel( synchronouslyWrapping: channel, + backpressureStrategy: backpressureStrategy, + isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled, inboundType: Inbound.self, outboundType: Outbound.self ) diff --git a/Sources/NIOHTTP2/HTTP2PipelineHelpers.swift b/Sources/NIOHTTP2/HTTP2PipelineHelpers.swift index cd14c5db..e61a7840 100644 --- a/Sources/NIOHTTP2/HTTP2PipelineHelpers.swift +++ b/Sources/NIOHTTP2/HTTP2PipelineHelpers.swift @@ -276,6 +276,8 @@ extension Channel { /// - streamConfiguration: The settings that will be used when establishing new streams. These mainly pertain to flow control. /// - streamDelegate: The delegate to be notified in the event of stream creation and close. /// - position: The position in the pipeline into which to insert the `NIOHTTP2Handler`. + /// - inboundStreamBackpressureStrategy: The backpressure strategy of the ``NIOAsyncChannel``s wrapping inbound HTTP/2 streams. + /// - isInboundStreamOutboundHalfClosureEnabled: If outbound half closure should be enabled for the ``NIOAsyncChannel``s wrapping inbound HTTP/2 streams. /// - streamInboundType: The ``NIOAsyncChannel/inboundStream`` message type for inbound stream channels. /// This type must match the `InboundOut` type of the final handler added to the stream channel by the `inboundStreamInitializer` /// or ``HTTP2Frame/FramePayload`` if there are none. @@ -294,8 +296,10 @@ extension Channel { streamConfiguration: NIOHTTP2Handler.StreamConfiguration, streamDelegate: NIOHTTP2StreamDelegate? = nil, position: ChannelPipeline.Position = .last, - streamInboundType: StreamInbound.Type, - streamOutboundType: StreamOutbound.Type, + inboundStreamBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, + isInboundStreamOutboundHalfClosureEnabled: Bool = false, + streamInboundType: StreamInbound.Type = StreamInbound.self, + streamOutboundType: StreamOutbound.Type = StreamOutbound.self, inboundStreamInitializer: @escaping NIOHTTP2Handler.StreamInitializer ) throws -> EventLoopFuture>> { if self.eventLoop.inEventLoop { @@ -306,6 +310,8 @@ extension Channel { streamConfiguration: streamConfiguration, streamDelegate: streamDelegate, position: position, + inboundStreamBackpressureStrategy: inboundStreamBackpressureStrategy, + isInboundStreamOutboundHalfClosureEnabled: isInboundStreamOutboundHalfClosureEnabled, streamInboundType: streamInboundType, streamOutboundType: streamOutboundType, inboundStreamInitializer: inboundStreamInitializer @@ -319,6 +325,8 @@ extension Channel { streamConfiguration: streamConfiguration, streamDelegate: streamDelegate, position: position, + inboundStreamBackpressureStrategy: inboundStreamBackpressureStrategy, + isInboundStreamOutboundHalfClosureEnabled: isInboundStreamOutboundHalfClosureEnabled, streamInboundType: streamInboundType, streamOutboundType: streamOutboundType, inboundStreamInitializer: inboundStreamInitializer @@ -505,10 +513,14 @@ extension Channel { /// - streamConfiguration: The settings that will be used when establishing new streams. These mainly pertain to flow control. /// - streamDelegate: The delegate to be notified in the event of stream creation and close. /// - position: The position in the pipeline into which to insert the `NIOHTTP2Handler`. + /// - connectionBackpressureStrategy: The backpressure strategy of the ``NIOAsyncChannel`` wrapping the HTTP/2 connection channel. + /// - isConnectionOutboundHalfClosureEnabled: If outbound half closure should be enabled for the ``NIOAsyncChannel`` wrapping the HTTP/2 connection channel. /// - connectionInboundType: The ``NIOAsyncChannel/inboundStream`` message type for the HTTP/2 connection channel. /// This type must match the `InboundOut` type of the final handler in the connection channel. /// - connectionOutboundType: The ``NIOAsyncChannel/outboundWriter`` message type for the HTTP/2 connection channel. /// This type must match the `OutboundIn` type of the final handler in the connection channel. + /// - inboundStreamBackpressureStrategy: The backpressure strategy of the ``NIOAsyncChannel``s wrapping inbound HTTP/2 streams. + /// - isInboundStreamOutboundHalfClosureEnabled: If outbound half closure should be enabled for the ``NIOAsyncChannel``s wrapping inbound HTTP/2 streams. /// - streamInboundType: The ``NIOAsyncChannel/inboundStream`` message type for inbound stream channels. /// This type must match the `InboundOut` type of the final handler added to the stream channel by the `inboundStreamInitializer` /// or ``HTTP2Frame/FramePayload`` if there are none. @@ -527,10 +539,14 @@ extension Channel { connectionConfiguration: NIOHTTP2Handler.ConnectionConfiguration, streamConfiguration: NIOHTTP2Handler.StreamConfiguration, streamDelegate: NIOHTTP2StreamDelegate? = nil, - connectionInboundType: ConnectionInbound.Type, - connectionOutboundType: ConnectionOutbound.Type, - streamInboundType: StreamInbound.Type, - streamOutboundType: StreamOutbound.Type, + connectionBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, + isConnectionOutboundHalfClosureEnabled: Bool = false, + connectionInboundType: ConnectionInbound.Type = ConnectionInbound.self, + connectionOutboundType: ConnectionOutbound.Type = ConnectionOutbound.self, + inboundStreamBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, + isInboundStreamOutboundHalfClosureEnabled: Bool = false, + streamInboundType: StreamInbound.Type = StreamInbound.self, + streamOutboundType: StreamOutbound.Type = StreamOutbound.self, connectionInitializer: @escaping NIOHTTP2Handler.ConnectionInitializer, inboundStreamInitializer: @escaping NIOHTTP2Handler.StreamInitializer ) throws -> EventLoopFuture<( @@ -542,6 +558,8 @@ extension Channel { connectionConfiguration: connectionConfiguration, streamConfiguration: streamConfiguration, streamDelegate: streamDelegate, + inboundStreamBackpressureStrategy: inboundStreamBackpressureStrategy, + isInboundStreamOutboundHalfClosureEnabled: isInboundStreamOutboundHalfClosureEnabled, streamInboundType: streamInboundType, streamOutboundType: streamOutboundType, inboundStreamInitializer: inboundStreamInitializer @@ -549,6 +567,8 @@ extension Channel { return connectionInitializer(self).flatMapThrowing { _ in let connectionAsyncChannel = try NIOAsyncChannel( synchronouslyWrapping: self, + backpressureStrategy: connectionBackpressureStrategy, + isOutboundHalfClosureEnabled: isConnectionOutboundHalfClosureEnabled, inboundType: ConnectionInbound.self, outboundType: ConnectionOutbound.self ) @@ -660,6 +680,8 @@ extension ChannelPipeline.SynchronousOperations { /// - streamConfiguration: The settings that will be used when establishing new streams. These mainly pertain to flow control. /// - streamDelegate: The delegate to be notified in the event of stream creation and close. /// - position: The position in the pipeline into which to insert the `NIOHTTP2Handler`. + /// - inboundStreamBackpressureStrategy: The backpressure strategy of the ``NIOAsyncChannel``s wrapping inbound HTTP/2 streams. + /// - isInboundStreamOutboundHalfClosureEnabled: If outbound half closure should be enabled for the ``NIOAsyncChannel``s wrapping inbound HTTP/2 streams. /// - streamInboundType: The ``NIOAsyncChannel/inboundStream`` message type for inbound stream channels. /// This type must match the `InboundOut` type of the final handler added to the stream channel by the `inboundStreamInitializer` /// or ``HTTP2Frame/FramePayload`` if there are none. @@ -678,8 +700,10 @@ extension ChannelPipeline.SynchronousOperations { streamConfiguration: NIOHTTP2Handler.StreamConfiguration, streamDelegate: NIOHTTP2StreamDelegate? = nil, position: ChannelPipeline.Position = .last, - streamInboundType: StreamInbound.Type, - streamOutboundType: StreamOutbound.Type, + inboundStreamBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, + isInboundStreamOutboundHalfClosureEnabled: Bool = false, + streamInboundType: StreamInbound.Type = StreamInbound.self, + streamOutboundType: StreamOutbound.Type = StreamOutbound.self, inboundStreamInitializer: @escaping NIOHTTP2Handler.StreamInitializer ) throws -> NIOHTTP2Handler.AsyncStreamMultiplexer> { return try self.configureAsyncHTTP2Pipeline( @@ -692,6 +716,8 @@ extension ChannelPipeline.SynchronousOperations { inboundStreamInitializer(channel).flatMapThrowing { _ in return try NIOAsyncChannel( synchronouslyWrapping: channel, + backpressureStrategy: inboundStreamBackpressureStrategy, + isOutboundHalfClosureEnabled: isInboundStreamOutboundHalfClosureEnabled, inboundType: StreamInbound.self, outboundType: StreamOutbound.self )