diff --git a/IntegrationTests/tests_01_allocation_counters/test_01_resources/test_client_server_request_response.swift b/IntegrationTests/tests_01_allocation_counters/test_01_resources/test_client_server_request_response.swift new file mode 100644 index 00000000..4c888741 --- /dev/null +++ b/IntegrationTests/tests_01_allocation_counters/test_01_resources/test_client_server_request_response.swift @@ -0,0 +1,125 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2019 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIO +import NIOHPACK +import NIOHTTP1 +import NIOHTTP2 + +/// Have two `EmbeddedChannel` objects send and receive data from each other until +/// they make no forward progress. +func interactInMemory(_ first: EmbeddedChannel, _ second: EmbeddedChannel) throws { + var operated: Bool + + func readBytesFromChannel(_ channel: EmbeddedChannel) throws -> ByteBuffer? { + return try channel.readOutbound(as: ByteBuffer.self) + } + + repeat { + operated = false + first.embeddedEventLoop.run() + + if let data = try readBytesFromChannel(first) { + operated = true + try second.writeInbound(data) + } + if let data = try readBytesFromChannel(second) { + operated = true + try first.writeInbound(data) + } + } while operated +} + +final class ServerHandler: ChannelInboundHandler { + typealias InboundIn = HTTP2Frame + typealias OutboundOut = HTTP2Frame + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + let data = self.unwrapInboundIn(data) + switch data.payload { + case .headers(let headers) where headers.endStream: + break + case .data(let data) where data.endStream: + break + default: + // Ignore this frame + return + } + + // We got END_STREAM. Let's send a response. + let headers = HPACKHeaders([(":status", "200")]) + let responseFrame = HTTP2Frame(streamID: data.streamID, payload: .headers(.init(headers: headers, endStream: true))) + context.writeAndFlush(self.wrapOutboundOut(responseFrame), promise: nil) + } +} + + +final class ClientHandler: ChannelInboundHandler { + typealias InboundIn = HTTP2Frame + typealias OutboundOut = HTTP2Frame + + let streamID: HTTP2StreamID + + init(streamID: HTTP2StreamID) { + self.streamID = streamID + } + + func channelActive(context: ChannelHandlerContext) { + // Send a request. + let headers = HPACKHeaders([(":path", "/"), + (":authority", "localhost"), + (":method", "GET"), + (":scheme", "https")]) + let requestFrame = HTTP2Frame(streamID: self.streamID, payload: .headers(.init(headers: headers, endStream: true))) + context.writeAndFlush(self.wrapOutboundOut(requestFrame), promise: nil) + } +} + +func run(identifier: String) { + let loop = EmbeddedEventLoop() + + measure(identifier: identifier) { + var sumOfStreamIDs = 0 + + for _ in 0..<1000 { + let clientChannel = EmbeddedChannel(loop: loop) + let serverChannel = EmbeddedChannel(loop: loop) + + let clientMultiplexer = try! clientChannel.configureHTTP2Pipeline(mode: .client).wait() + _ = try! serverChannel.configureHTTP2Pipeline(mode: .server) { (channel, streamID) in + return channel.pipeline.addHandler(ServerHandler()) + }.wait() + try! clientChannel.connect(to: SocketAddress(ipAddress: "1.2.3.4", port: 5678)).wait() + try! serverChannel.connect(to: SocketAddress(ipAddress: "1.2.3.4", port: 5678)).wait() + + let promise = clientChannel.eventLoop.makePromise(of: Channel.self) + clientMultiplexer.createStreamChannel(promise: promise) { (channel, streamID) in + return channel.pipeline.addHandler(ClientHandler(streamID: streamID)) + } + clientChannel.embeddedEventLoop.run() + let child = try! promise.futureResult.wait() + let streamID = try! Int(child.getOption(HTTP2StreamChannelOptions.streamID).wait()) + + sumOfStreamIDs += streamID + try! interactInMemory(clientChannel, serverChannel) + try! child.closeFuture.wait() + + try! clientChannel.close().wait() + try! serverChannel.close().wait() + } + + return sumOfStreamIDs + } +} + diff --git a/Sources/NIOHTTP2/HTTP2StreamChannel.swift b/Sources/NIOHTTP2/HTTP2StreamChannel.swift index 4ee2616d..943f90bd 100644 --- a/Sources/NIOHTTP2/HTTP2StreamChannel.swift +++ b/Sources/NIOHTTP2/HTTP2StreamChannel.swift @@ -549,13 +549,13 @@ internal extension HTTP2StreamChannel { /// - frame: The `HTTP2Frame` to send to the network. /// - promise: The promise associated with the frame write. private func receiveOutboundFrame(_ frame: HTTP2Frame, promise: EventLoopPromise?) { - guard let parent = self.parent, self.state != .closed else { + guard self.state != .closed else { let error = ChannelError.alreadyClosed promise?.fail(error) self.errorEncountered(error: error) return } - parent.write(frame, promise: promise) + self.multiplexer.childChannelWrite(frame, promise: promise) } /// Called when a stream closure is received from the network. @@ -576,7 +576,7 @@ internal extension HTTP2StreamChannel { let frame = HTTP2Frame(streamID: self.streamID, payload: .windowUpdate(windowSizeIncrement: increment)) self.receiveOutboundFrame(frame, promise: nil) // This flush should really go away, but we need it for now until we sort out window management. - self.parent?.flush() + self.multiplexer.childChannelFlush() } } @@ -585,7 +585,7 @@ internal extension HTTP2StreamChannel { let frame = HTTP2Frame(streamID: self.streamID, payload: .windowUpdate(windowSizeIncrement: increment)) self.receiveOutboundFrame(frame, promise: nil) // This flush should really go away, but we need it for now until we sort out window management. - self.parent?.flush() + self.multiplexer.childChannelFlush() } } } diff --git a/Sources/NIOHTTP2/HTTP2StreamMultiplexer.swift b/Sources/NIOHTTP2/HTTP2StreamMultiplexer.swift index ed61a114..09b335d1 100644 --- a/Sources/NIOHTTP2/HTTP2StreamMultiplexer.swift +++ b/Sources/NIOHTTP2/HTTP2StreamMultiplexer.swift @@ -30,6 +30,7 @@ public final class HTTP2StreamMultiplexer: ChannelInboundHandler, ChannelOutboun private var streams: [HTTP2StreamID: HTTP2StreamChannel] = [:] private let inboundStreamStateInitializer: ((Channel, HTTP2StreamID) -> EventLoopFuture)? private let channel: Channel + private var context: ChannelHandlerContext! private var nextOutboundStreamID: HTTP2StreamID private var connectionFlowControlManager: InboundWindowManager @@ -37,6 +38,11 @@ public final class HTTP2StreamMultiplexer: ChannelInboundHandler, ChannelOutboun // We now need to check that we're on the same event loop as the one we were originally given. // If we weren't, this is a hard failure, as there is a thread-safety issue here. self.channel.eventLoop.preconditionInEventLoop() + self.context = context + } + + public func handlerRemoved(context: ChannelHandlerContext) { + self.context = nil } public func channelRead(context: ChannelHandlerContext, data: NIOAny) { @@ -122,10 +128,6 @@ public final class HTTP2StreamMultiplexer: ChannelInboundHandler, ChannelOutboun context.fireUserInboundEventTriggered(event) } - internal func childChannelClosed(streamID: HTTP2StreamID) { - self.streams.removeValue(forKey: streamID) - } - private func newConnectionWindowSize(newSize: Int, context: ChannelHandlerContext) { guard let increment = self.connectionFlowControlManager.newWindowSize(newSize) else { return @@ -189,3 +191,19 @@ extension HTTP2StreamMultiplexer { } } } + + +// MARK:- Child to parent calls +extension HTTP2StreamMultiplexer { + internal func childChannelClosed(streamID: HTTP2StreamID) { + self.streams.removeValue(forKey: streamID) + } + + internal func childChannelWrite(_ frame: HTTP2Frame, promise: EventLoopPromise?) { + self.context.write(self.wrapOutboundOut(frame), promise: promise) + } + + internal func childChannelFlush() { + self.context.flush() + } +} diff --git a/docker/docker-compose.1804.50.yaml b/docker/docker-compose.1804.50.yaml index 0fc107c1..18986c33 100644 --- a/docker/docker-compose.1804.50.yaml +++ b/docker/docker-compose.1804.50.yaml @@ -15,6 +15,7 @@ services: environment: - MAX_ALLOCS_ALLOWED_create_client_stream_channel=70010 - MAX_ALLOCS_ALLOWED_hpack_decoding=5050 + - MAX_ALLOCS_ALLOWED_client_server_request_response=360000 test: image: swift-nio-http2:18.04-5.0 @@ -22,6 +23,7 @@ services: environment: - MAX_ALLOCS_ALLOWED_create_client_stream_channel=70010 - MAX_ALLOCS_ALLOWED_hpack_decoding=5050 + - MAX_ALLOCS_ALLOWED_client_server_request_response=360000 h2spec: image: swift-nio-http2:18.04-5.0