Skip to content

Commit

Permalink
Remove indirection between channel and multiplexer
Browse files Browse the repository at this point in the history
Motivation:

Previously the HTTP2StreamChannel forwarded all frames to the parent
Channel by way of the abstract Channel existential API. While this was
straightforward, it added unnecessary overhead due to the extra pipeline
steps and the extra indirection of the types providing an inlining
barrier.

Given that in #116 we gave the HTTP2StreamChannel a reference to its
parent multiplexer, we may as well take advantage of this and allow
direct calls from the stream channel into the multiplexer.

Modifications:

Avoided calls to parent.write and parent.flush.
Replaced them with a new interface.
Added new allocation test for this path.

Result:

Slightly better performance.
  • Loading branch information
Lukasa committed May 17, 2019
1 parent 8fb84b2 commit 9bedd22
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2019 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import NIO
import NIOHPACK
import NIOHTTP1
import NIOHTTP2

/// Have two `EmbeddedChannel` objects send and receive data from each other until
/// they make no forward progress.
func interactInMemory(_ first: EmbeddedChannel, _ second: EmbeddedChannel) throws {
var operated: Bool

func readBytesFromChannel(_ channel: EmbeddedChannel) throws -> ByteBuffer? {
return try channel.readOutbound(as: ByteBuffer.self)
}

repeat {
operated = false
first.embeddedEventLoop.run()

if let data = try readBytesFromChannel(first) {
operated = true
try second.writeInbound(data)
}
if let data = try readBytesFromChannel(second) {
operated = true
try first.writeInbound(data)
}
} while operated
}

final class ServerHandler: ChannelInboundHandler {
typealias InboundIn = HTTP2Frame
typealias OutboundOut = HTTP2Frame

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let data = self.unwrapInboundIn(data)
switch data.payload {
case .headers(let headers) where headers.endStream:
break
case .data(let data) where data.endStream:
break
default:
// Ignore this frame
return
}

// We got END_STREAM. Let's send a response.
let headers = HPACKHeaders([(":status", "200")])
let responseFrame = HTTP2Frame(streamID: data.streamID, payload: .headers(.init(headers: headers, endStream: true)))
context.writeAndFlush(self.wrapOutboundOut(responseFrame), promise: nil)
}
}


final class ClientHandler: ChannelInboundHandler {
typealias InboundIn = HTTP2Frame
typealias OutboundOut = HTTP2Frame

let streamID: HTTP2StreamID

init(streamID: HTTP2StreamID) {
self.streamID = streamID
}

func channelActive(context: ChannelHandlerContext) {
// Send a request.
let headers = HPACKHeaders([(":path", "/"),
(":authority", "localhost"),
(":method", "GET"),
(":scheme", "https")])
let requestFrame = HTTP2Frame(streamID: self.streamID, payload: .headers(.init(headers: headers, endStream: true)))
context.writeAndFlush(self.wrapOutboundOut(requestFrame), promise: nil)
}
}

func run(identifier: String) {
let loop = EmbeddedEventLoop()

measure(identifier: identifier) {
var sumOfStreamIDs = 0

for _ in 0..<1000 {
let clientChannel = EmbeddedChannel(loop: loop)
let serverChannel = EmbeddedChannel(loop: loop)

let clientMultiplexer = try! clientChannel.configureHTTP2Pipeline(mode: .client).wait()
_ = try! serverChannel.configureHTTP2Pipeline(mode: .server) { (channel, streamID) in
return channel.pipeline.addHandler(ServerHandler())
}.wait()
try! clientChannel.connect(to: SocketAddress(ipAddress: "1.2.3.4", port: 5678)).wait()
try! serverChannel.connect(to: SocketAddress(ipAddress: "1.2.3.4", port: 5678)).wait()

let promise = clientChannel.eventLoop.makePromise(of: Channel.self)
clientMultiplexer.createStreamChannel(promise: promise) { (channel, streamID) in
return channel.pipeline.addHandler(ClientHandler(streamID: streamID))
}
clientChannel.embeddedEventLoop.run()
let child = try! promise.futureResult.wait()
let streamID = try! Int(child.getOption(HTTP2StreamChannelOptions.streamID).wait())

sumOfStreamIDs += streamID
try! interactInMemory(clientChannel, serverChannel)
try! child.closeFuture.wait()

try! clientChannel.close().wait()
try! serverChannel.close().wait()
}

return sumOfStreamIDs
}
}

8 changes: 4 additions & 4 deletions Sources/NIOHTTP2/HTTP2StreamChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -549,13 +549,13 @@ internal extension HTTP2StreamChannel {
/// - frame: The `HTTP2Frame` to send to the network.
/// - promise: The promise associated with the frame write.
private func receiveOutboundFrame(_ frame: HTTP2Frame, promise: EventLoopPromise<Void>?) {
guard let parent = self.parent, self.state != .closed else {
guard self.state != .closed else {
let error = ChannelError.alreadyClosed
promise?.fail(error)
self.errorEncountered(error: error)
return
}
parent.write(frame, promise: promise)
self.multiplexer.childChannelWrite(frame, promise: promise)
}

/// Called when a stream closure is received from the network.
Expand All @@ -576,7 +576,7 @@ internal extension HTTP2StreamChannel {
let frame = HTTP2Frame(streamID: self.streamID, payload: .windowUpdate(windowSizeIncrement: increment))
self.receiveOutboundFrame(frame, promise: nil)
// This flush should really go away, but we need it for now until we sort out window management.
self.parent?.flush()
self.multiplexer.childChannelFlush()
}
}

Expand All @@ -585,7 +585,7 @@ internal extension HTTP2StreamChannel {
let frame = HTTP2Frame(streamID: self.streamID, payload: .windowUpdate(windowSizeIncrement: increment))
self.receiveOutboundFrame(frame, promise: nil)
// This flush should really go away, but we need it for now until we sort out window management.
self.parent?.flush()
self.multiplexer.childChannelFlush()
}
}
}
Expand Down
26 changes: 22 additions & 4 deletions Sources/NIOHTTP2/HTTP2StreamMultiplexer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,19 @@ public final class HTTP2StreamMultiplexer: ChannelInboundHandler, ChannelOutboun
private var streams: [HTTP2StreamID: HTTP2StreamChannel] = [:]
private let inboundStreamStateInitializer: ((Channel, HTTP2StreamID) -> EventLoopFuture<Void>)?
private let channel: Channel
private var context: ChannelHandlerContext!
private var nextOutboundStreamID: HTTP2StreamID
private var connectionFlowControlManager: InboundWindowManager

public func handlerAdded(context: ChannelHandlerContext) {
// We now need to check that we're on the same event loop as the one we were originally given.
// If we weren't, this is a hard failure, as there is a thread-safety issue here.
self.channel.eventLoop.preconditionInEventLoop()
self.context = context
}

public func handlerRemoved(context: ChannelHandlerContext) {
self.context = nil
}

public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
Expand Down Expand Up @@ -122,10 +128,6 @@ public final class HTTP2StreamMultiplexer: ChannelInboundHandler, ChannelOutboun
context.fireUserInboundEventTriggered(event)
}

internal func childChannelClosed(streamID: HTTP2StreamID) {
self.streams.removeValue(forKey: streamID)
}

private func newConnectionWindowSize(newSize: Int, context: ChannelHandlerContext) {
guard let increment = self.connectionFlowControlManager.newWindowSize(newSize) else {
return
Expand Down Expand Up @@ -189,3 +191,19 @@ extension HTTP2StreamMultiplexer {
}
}
}


// MARK:- Child to parent calls
extension HTTP2StreamMultiplexer {
internal func childChannelClosed(streamID: HTTP2StreamID) {
self.streams.removeValue(forKey: streamID)
}

internal func childChannelWrite(_ frame: HTTP2Frame, promise: EventLoopPromise<Void>?) {
self.context.write(self.wrapOutboundOut(frame), promise: promise)
}

internal func childChannelFlush() {
self.context.flush()
}
}
2 changes: 2 additions & 0 deletions docker/docker-compose.1804.50.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ services:
environment:
- MAX_ALLOCS_ALLOWED_create_client_stream_channel=70010
- MAX_ALLOCS_ALLOWED_hpack_decoding=5050
- MAX_ALLOCS_ALLOWED_client_server_request_response=360000

test:
image: swift-nio-http2:18.04-5.0
command: /bin/bash -cl "swift test -Xswiftc -warnings-as-errors && ./scripts/integration_tests.sh"
environment:
- MAX_ALLOCS_ALLOWED_create_client_stream_channel=70010
- MAX_ALLOCS_ALLOWED_hpack_decoding=5050
- MAX_ALLOCS_ALLOWED_client_server_request_response=360000

h2spec:
image: swift-nio-http2:18.04-5.0
Expand Down

0 comments on commit 9bedd22

Please sign in to comment.