Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emit fewer flushes when handling RPCs on the server #1110

Merged
merged 1 commit into from
Jan 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ public final class BidirectionalStreamingServerHandler<
) {
switch part {
case let .metadata(headers):
self.context.responseWriter.sendMetadata(headers, promise: promise)
self.context.responseWriter.sendMetadata(headers, flush: true, promise: promise)

case let .message(message, metadata):
do {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ public final class ClientStreamingServerHandler<
) {
switch part {
case let .metadata(headers):
self.context.responseWriter.sendMetadata(headers, promise: promise)
self.context.responseWriter.sendMetadata(headers, flush: true, promise: promise)

case let .message(message, metadata):
do {
Expand Down
3 changes: 2 additions & 1 deletion Sources/GRPC/CallHandlers/ServerHandlerProtocol.swift
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ internal protocol GRPCServerResponseWriter {
/// Send the initial response metadata.
/// - Parameters:
/// - metadata: The user-provided metadata to send to the client.
/// - flush: Whether a flush should be emitted after writing the metadata.
/// - promise: A promise to complete once the metadata has been handled.
func sendMetadata(_ metadata: HPACKHeaders, promise: EventLoopPromise<Void>?)
func sendMetadata(_ metadata: HPACKHeaders, flush: Bool, promise: EventLoopPromise<Void>?)

/// Send the serialized bytes of a response message.
/// - Parameters:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ public final class ServerStreamingServerHandler<
) {
switch part {
case let .metadata(headers):
self.context.responseWriter.sendMetadata(headers, promise: promise)
self.context.responseWriter.sendMetadata(headers, flush: true, promise: promise)

case let .message(message, metadata):
do {
Expand Down
3 changes: 2 additions & 1 deletion Sources/GRPC/CallHandlers/UnaryServerHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,8 @@ public final class UnaryServerHandler<
) {
switch part {
case let .metadata(headers):
self.context.responseWriter.sendMetadata(headers, promise: promise)
// We can delay this flush until the end of the RPC.
self.context.responseWriter.sendMetadata(headers, flush: false, promise: promise)

case let .message(message, metadata):
do {
Expand Down
46 changes: 22 additions & 24 deletions Sources/GRPC/HTTP2ToRawGRPCServerCodec.swift
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,9 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServer

switch responsePart {
case let .metadata(headers):
self.sendMetadata(headers, promise: promise)
// We're in 'write' so we're using the old type of RPC handler which emits its own flushes,
// no need to emit an extra one.
self.sendMetadata(headers, flush: false, promise: promise)

case let .message(buffer, metadata):
self.sendMessage(buffer, metadata: metadata, promise: promise)
Expand All @@ -181,13 +183,7 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServer
}

internal func flush(context: ChannelHandlerContext) {
if self.isReading {
// We're already reading; record the flush and emit it when the read completes.
self.flushPending = true
} else {
// Not reading: flush now.
context.flush()
}
self.markFlushPoint()
}

/// Called when the pipeline has finished configuring.
Expand Down Expand Up @@ -280,17 +276,15 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServer

internal func sendMetadata(
_ headers: HPACKHeaders,
flush: Bool,
promise: EventLoopPromise<Void>?
) {
switch self.state.send(headers: headers) {
case let .success(headers):
let payload = HTTP2Frame.FramePayload.headers(.init(headers: headers))
self.context.write(self.wrapOutboundOut(payload), promise: promise)

if self.isReading {
self.flushPending = true
} else {
self.context.flush()
if flush {
self.markFlushPoint()
}

case let .failure(error):
Expand All @@ -313,11 +307,8 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServer
case let .success(buffer):
let payload = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer)))
self.context.write(self.wrapOutboundOut(payload), promise: promise)

if self.isReading {
self.flushPending = true
} else {
self.context.flush()
if metadata.flush {
self.markFlushPoint()
}

case let .failure(error):
Expand All @@ -335,15 +326,22 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServer
// Always end stream for status and trailers.
let payload = HTTP2Frame.FramePayload.headers(.init(headers: trailers, endStream: true))
self.context.write(self.wrapOutboundOut(payload), promise: promise)

if self.isReading {
self.flushPending = true
} else {
self.context.flush()
}
// We'll always flush on end.
self.markFlushPoint()

case let .failure(error):
promise?.fail(error)
}
}

/// Mark a flush as pending - to be emitted once the read completes - if we're currently reading,
/// or emit a flush now if we are not.
private func markFlushPoint() {
if self.isReading {
self.flushPending = true
} else {
self.flushPending = false
self.context.flush()
glbrntt marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
2 changes: 1 addition & 1 deletion Tests/GRPCTests/HTTP2ToRawGRPCStateMachineTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ extension ServerMessageEncoding {
}

class NoOpResponseWriter: GRPCServerResponseWriter {
func sendMetadata(_ metadata: HPACKHeaders, promise: EventLoopPromise<Void>?) {
func sendMetadata(_ metadata: HPACKHeaders, flush: Bool, promise: EventLoopPromise<Void>?) {
promise?.succeed(())
}

Expand Down
2 changes: 1 addition & 1 deletion Tests/GRPCTests/UnaryServerHandlerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ final class ResponseRecorder: GRPCServerResponseWriter {
var status: GRPCStatus?
var trailers: HPACKHeaders?

func sendMetadata(_ metadata: HPACKHeaders, promise: EventLoopPromise<Void>?) {
func sendMetadata(_ metadata: HPACKHeaders, flush: Bool, promise: EventLoopPromise<Void>?) {
XCTAssertNil(self.metadata)
self.metadata = metadata
promise?.succeed(())
Expand Down