Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add additional NIOAsyncChannel config #406

Merged
merged 4 commits into from
Jul 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions Sources/NIOHTTP2/HTTP2ChannelHandler+InlineStreamMultiplexer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ extension NIOHTTP2Handler {
/// Create a stream channel initialized with the provided closure and return it wrapped within a `NIOAsyncChannel`.
///
/// - Parameters:
/// - backpressureStrategy: The backpressure strategy of the ``NIOAsyncChannel`` wrapping the HTTP/2 stream channel.
/// - isOutboundHalfClosureEnabled: If outbound half closure should be enabled for the ``NIOAsyncChannel`` wrapping the HTTP/2 stream channel.
/// - inboundType: The ``NIOAsyncChannel/inboundStream`` message type for the created channel.
/// This type must match the `InboundOut` type of the final handler added to the stream channel by the `initializer`
/// or ``HTTP2Frame/FramePayload`` if there are none.
Expand All @@ -255,15 +257,21 @@ extension NIOHTTP2Handler {
/// or ``HTTP2Frame/FramePayload`` if there are none.
/// - initializer: A callback that will be invoked to allow you to configure the
/// `ChannelPipeline` for the newly created channel.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@_spi(AsyncChannel)
public func createStreamChannel<Inbound, Outbound>(
inboundType: Inbound.Type,
outboundType: Outbound.Type,
backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
isOutboundHalfClosureEnabled: Bool = false,
inboundType: Inbound.Type = Inbound.self,
outboundType: Outbound.Type = Outbound.self,
initializer: @escaping NIOHTTP2Handler.StreamInitializer
) async throws -> NIOAsyncChannel<Inbound, Outbound> {
return try await self.createStreamChannel { channel in
initializer(channel).flatMapThrowing { _ in
return try NIOAsyncChannel(
synchronouslyWrapping: channel,
backpressureStrategy: backpressureStrategy,
isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled,
inboundType: Inbound.self,
outboundType: Outbound.self
)
Expand Down
42 changes: 34 additions & 8 deletions Sources/NIOHTTP2/HTTP2PipelineHelpers.swift
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,8 @@ extension Channel {
/// - streamConfiguration: The settings that will be used when establishing new streams. These mainly pertain to flow control.
/// - streamDelegate: The delegate to be notified in the event of stream creation and close.
/// - position: The position in the pipeline into which to insert the `NIOHTTP2Handler`.
/// - inboundStreamBackpressureStrategy: The backpressure strategy of the ``NIOAsyncChannel``s wrapping inbound HTTP/2 streams.
/// - isInboundStreamOutboundHalfClosureEnabled: If outbound half closure should be enabled for the ``NIOAsyncChannel``s wrapping inbound HTTP/2 streams.
/// - streamInboundType: The ``NIOAsyncChannel/inboundStream`` message type for inbound stream channels.
/// This type must match the `InboundOut` type of the final handler added to the stream channel by the `inboundStreamInitializer`
/// or ``HTTP2Frame/FramePayload`` if there are none.
Expand All @@ -294,8 +296,10 @@ extension Channel {
streamConfiguration: NIOHTTP2Handler.StreamConfiguration,
streamDelegate: NIOHTTP2StreamDelegate? = nil,
position: ChannelPipeline.Position = .last,
streamInboundType: StreamInbound.Type,
streamOutboundType: StreamOutbound.Type,
inboundStreamBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
isInboundStreamOutboundHalfClosureEnabled: Bool = false,
streamInboundType: StreamInbound.Type = StreamInbound.self,
streamOutboundType: StreamOutbound.Type = StreamOutbound.self,
inboundStreamInitializer: @escaping NIOHTTP2Handler.StreamInitializer
) throws -> EventLoopFuture<NIOHTTP2Handler.AsyncStreamMultiplexer<NIOAsyncChannel<StreamInbound, StreamOutbound>>> {
if self.eventLoop.inEventLoop {
Expand All @@ -306,6 +310,8 @@ extension Channel {
streamConfiguration: streamConfiguration,
streamDelegate: streamDelegate,
position: position,
inboundStreamBackpressureStrategy: inboundStreamBackpressureStrategy,
isInboundStreamOutboundHalfClosureEnabled: isInboundStreamOutboundHalfClosureEnabled,
streamInboundType: streamInboundType,
streamOutboundType: streamOutboundType,
inboundStreamInitializer: inboundStreamInitializer
Expand All @@ -319,6 +325,8 @@ extension Channel {
streamConfiguration: streamConfiguration,
streamDelegate: streamDelegate,
position: position,
inboundStreamBackpressureStrategy: inboundStreamBackpressureStrategy,
isInboundStreamOutboundHalfClosureEnabled: isInboundStreamOutboundHalfClosureEnabled,
streamInboundType: streamInboundType,
streamOutboundType: streamOutboundType,
inboundStreamInitializer: inboundStreamInitializer
Expand Down Expand Up @@ -505,10 +513,14 @@ extension Channel {
/// - streamConfiguration: The settings that will be used when establishing new streams. These mainly pertain to flow control.
/// - streamDelegate: The delegate to be notified in the event of stream creation and close.
/// - position: The position in the pipeline into which to insert the `NIOHTTP2Handler`.
/// - connectionBackpressureStrategy: The backpressure strategy of the ``NIOAsyncChannel`` wrapping the HTTP/2 connection channel.
/// - isConnectionOutboundHalfClosureEnabled: If outbound half closure should be enabled for the ``NIOAsyncChannel`` wrapping the HTTP/2 connection channel.
/// - connectionInboundType: The ``NIOAsyncChannel/inboundStream`` message type for the HTTP/2 connection channel.
/// This type must match the `InboundOut` type of the final handler in the connection channel.
/// - connectionOutboundType: The ``NIOAsyncChannel/outboundWriter`` message type for the HTTP/2 connection channel.
/// This type must match the `OutboundIn` type of the final handler in the connection channel.
/// - inboundStreamBackpressureStrategy: The backpressure strategy of the ``NIOAsyncChannel``s wrapping inbound HTTP/2 streams.
/// - isInboundStreamOutboundHalfClosureEnabled: If outbound half closure should be enabled for the ``NIOAsyncChannel``s wrapping inbound HTTP/2 streams.
/// - streamInboundType: The ``NIOAsyncChannel/inboundStream`` message type for inbound stream channels.
/// This type must match the `InboundOut` type of the final handler added to the stream channel by the `inboundStreamInitializer`
/// or ``HTTP2Frame/FramePayload`` if there are none.
Expand All @@ -527,10 +539,14 @@ extension Channel {
connectionConfiguration: NIOHTTP2Handler.ConnectionConfiguration,
streamConfiguration: NIOHTTP2Handler.StreamConfiguration,
streamDelegate: NIOHTTP2StreamDelegate? = nil,
connectionInboundType: ConnectionInbound.Type,
connectionOutboundType: ConnectionOutbound.Type,
streamInboundType: StreamInbound.Type,
streamOutboundType: StreamOutbound.Type,
connectionBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
isConnectionOutboundHalfClosureEnabled: Bool = false,
connectionInboundType: ConnectionInbound.Type = ConnectionInbound.self,
connectionOutboundType: ConnectionOutbound.Type = ConnectionOutbound.self,
inboundStreamBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
isInboundStreamOutboundHalfClosureEnabled: Bool = false,
streamInboundType: StreamInbound.Type = StreamInbound.self,
streamOutboundType: StreamOutbound.Type = StreamOutbound.self,
connectionInitializer: @escaping NIOHTTP2Handler.ConnectionInitializer,
inboundStreamInitializer: @escaping NIOHTTP2Handler.StreamInitializer
) throws -> EventLoopFuture<(
Expand All @@ -542,13 +558,17 @@ extension Channel {
connectionConfiguration: connectionConfiguration,
streamConfiguration: streamConfiguration,
streamDelegate: streamDelegate,
inboundStreamBackpressureStrategy: inboundStreamBackpressureStrategy,
isInboundStreamOutboundHalfClosureEnabled: isInboundStreamOutboundHalfClosureEnabled,
streamInboundType: streamInboundType,
streamOutboundType: streamOutboundType,
inboundStreamInitializer: inboundStreamInitializer
).flatMap { multiplexer in
return connectionInitializer(self).flatMapThrowing { _ in
let connectionAsyncChannel = try NIOAsyncChannel(
synchronouslyWrapping: self,
backpressureStrategy: connectionBackpressureStrategy,
isOutboundHalfClosureEnabled: isConnectionOutboundHalfClosureEnabled,
inboundType: ConnectionInbound.self,
outboundType: ConnectionOutbound.self
)
Expand Down Expand Up @@ -660,6 +680,8 @@ extension ChannelPipeline.SynchronousOperations {
/// - streamConfiguration: The settings that will be used when establishing new streams. These mainly pertain to flow control.
/// - streamDelegate: The delegate to be notified in the event of stream creation and close.
/// - position: The position in the pipeline into which to insert the `NIOHTTP2Handler`.
/// - inboundStreamBackpressureStrategy: The backpressure strategy of the ``NIOAsyncChannel``s wrapping inbound HTTP/2 streams.
/// - isInboundStreamOutboundHalfClosureEnabled: If outbound half closure should be enabled for the ``NIOAsyncChannel``s wrapping inbound HTTP/2 streams.
/// - streamInboundType: The ``NIOAsyncChannel/inboundStream`` message type for inbound stream channels.
/// This type must match the `InboundOut` type of the final handler added to the stream channel by the `inboundStreamInitializer`
/// or ``HTTP2Frame/FramePayload`` if there are none.
Expand All @@ -678,8 +700,10 @@ extension ChannelPipeline.SynchronousOperations {
streamConfiguration: NIOHTTP2Handler.StreamConfiguration,
streamDelegate: NIOHTTP2StreamDelegate? = nil,
position: ChannelPipeline.Position = .last,
streamInboundType: StreamInbound.Type,
streamOutboundType: StreamOutbound.Type,
inboundStreamBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
isInboundStreamOutboundHalfClosureEnabled: Bool = false,
streamInboundType: StreamInbound.Type = StreamInbound.self,
streamOutboundType: StreamOutbound.Type = StreamOutbound.self,
inboundStreamInitializer: @escaping NIOHTTP2Handler.StreamInitializer
) throws -> NIOHTTP2Handler.AsyncStreamMultiplexer<NIOAsyncChannel<StreamInbound, StreamOutbound>> {
return try self.configureAsyncHTTP2Pipeline(
Expand All @@ -692,6 +716,8 @@ extension ChannelPipeline.SynchronousOperations {
inboundStreamInitializer(channel).flatMapThrowing { _ in
return try NIOAsyncChannel(
synchronouslyWrapping: channel,
backpressureStrategy: inboundStreamBackpressureStrategy,
isOutboundHalfClosureEnabled: isInboundStreamOutboundHalfClosureEnabled,
inboundType: StreamInbound.self,
outboundType: StreamOutbound.self
)
Expand Down