diff --git a/Sources/GRPC/CallHandlers/BidirectionalStreamingServerHandler.swift b/Sources/GRPC/CallHandlers/BidirectionalStreamingServerHandler.swift index 038faabf3..e715aa5ce 100644 --- a/Sources/GRPC/CallHandlers/BidirectionalStreamingServerHandler.swift +++ b/Sources/GRPC/CallHandlers/BidirectionalStreamingServerHandler.swift @@ -353,7 +353,7 @@ public final class BidirectionalStreamingServerHandler< ) { switch part { case let .metadata(headers): - self.context.responseWriter.sendMetadata(headers, promise: promise) + self.context.responseWriter.sendMetadata(headers, flush: true, promise: promise) case let .message(message, metadata): do { diff --git a/Sources/GRPC/CallHandlers/ClientStreamingServerHandler.swift b/Sources/GRPC/CallHandlers/ClientStreamingServerHandler.swift index 391cd5b5d..7fcdc5eef 100644 --- a/Sources/GRPC/CallHandlers/ClientStreamingServerHandler.swift +++ b/Sources/GRPC/CallHandlers/ClientStreamingServerHandler.swift @@ -339,7 +339,7 @@ public final class ClientStreamingServerHandler< ) { switch part { case let .metadata(headers): - self.context.responseWriter.sendMetadata(headers, promise: promise) + self.context.responseWriter.sendMetadata(headers, flush: true, promise: promise) case let .message(message, metadata): do { diff --git a/Sources/GRPC/CallHandlers/ServerHandlerProtocol.swift b/Sources/GRPC/CallHandlers/ServerHandlerProtocol.swift index 58fcb0be8..bd425f018 100644 --- a/Sources/GRPC/CallHandlers/ServerHandlerProtocol.swift +++ b/Sources/GRPC/CallHandlers/ServerHandlerProtocol.swift @@ -49,8 +49,9 @@ internal protocol GRPCServerResponseWriter { /// Send the initial response metadata. /// - Parameters: /// - metadata: The user-provided metadata to send to the client. + /// - flush: Whether a flush should be emitted after writing the metadata. /// - promise: A promise to complete once the metadata has been handled. - func sendMetadata(_ metadata: HPACKHeaders, promise: EventLoopPromise?) + func sendMetadata(_ metadata: HPACKHeaders, flush: Bool, promise: EventLoopPromise?) /// Send the serialized bytes of a response message. /// - Parameters: diff --git a/Sources/GRPC/CallHandlers/ServerStreamingServerHandler.swift b/Sources/GRPC/CallHandlers/ServerStreamingServerHandler.swift index 56a8afaed..2e9a91462 100644 --- a/Sources/GRPC/CallHandlers/ServerStreamingServerHandler.swift +++ b/Sources/GRPC/CallHandlers/ServerStreamingServerHandler.swift @@ -279,7 +279,7 @@ public final class ServerStreamingServerHandler< ) { switch part { case let .metadata(headers): - self.context.responseWriter.sendMetadata(headers, promise: promise) + self.context.responseWriter.sendMetadata(headers, flush: true, promise: promise) case let .message(message, metadata): do { diff --git a/Sources/GRPC/CallHandlers/UnaryServerHandler.swift b/Sources/GRPC/CallHandlers/UnaryServerHandler.swift index 8578b5da9..d4ed37a29 100644 --- a/Sources/GRPC/CallHandlers/UnaryServerHandler.swift +++ b/Sources/GRPC/CallHandlers/UnaryServerHandler.swift @@ -262,7 +262,8 @@ public final class UnaryServerHandler< ) { switch part { case let .metadata(headers): - self.context.responseWriter.sendMetadata(headers, promise: promise) + // We can delay this flush until the end of the RPC. + self.context.responseWriter.sendMetadata(headers, flush: false, promise: promise) case let .message(message, metadata): do { diff --git a/Sources/GRPC/HTTP2ToRawGRPCServerCodec.swift b/Sources/GRPC/HTTP2ToRawGRPCServerCodec.swift index b8d29d98f..3dbde9c25 100644 --- a/Sources/GRPC/HTTP2ToRawGRPCServerCodec.swift +++ b/Sources/GRPC/HTTP2ToRawGRPCServerCodec.swift @@ -170,7 +170,9 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServer switch responsePart { case let .metadata(headers): - self.sendMetadata(headers, promise: promise) + // We're in 'write' so we're using the old type of RPC handler which emits its own flushes, + // no need to emit an extra one. + self.sendMetadata(headers, flush: false, promise: promise) case let .message(buffer, metadata): self.sendMessage(buffer, metadata: metadata, promise: promise) @@ -181,13 +183,7 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServer } internal func flush(context: ChannelHandlerContext) { - if self.isReading { - // We're already reading; record the flush and emit it when the read completes. - self.flushPending = true - } else { - // Not reading: flush now. - context.flush() - } + self.markFlushPoint() } /// Called when the pipeline has finished configuring. @@ -280,17 +276,15 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServer internal func sendMetadata( _ headers: HPACKHeaders, + flush: Bool, promise: EventLoopPromise? ) { switch self.state.send(headers: headers) { case let .success(headers): let payload = HTTP2Frame.FramePayload.headers(.init(headers: headers)) self.context.write(self.wrapOutboundOut(payload), promise: promise) - - if self.isReading { - self.flushPending = true - } else { - self.context.flush() + if flush { + self.markFlushPoint() } case let .failure(error): @@ -313,11 +307,8 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServer case let .success(buffer): let payload = HTTP2Frame.FramePayload.data(.init(data: .byteBuffer(buffer))) self.context.write(self.wrapOutboundOut(payload), promise: promise) - - if self.isReading { - self.flushPending = true - } else { - self.context.flush() + if metadata.flush { + self.markFlushPoint() } case let .failure(error): @@ -335,15 +326,22 @@ internal final class HTTP2ToRawGRPCServerCodec: ChannelDuplexHandler, GRPCServer // Always end stream for status and trailers. let payload = HTTP2Frame.FramePayload.headers(.init(headers: trailers, endStream: true)) self.context.write(self.wrapOutboundOut(payload), promise: promise) - - if self.isReading { - self.flushPending = true - } else { - self.context.flush() - } + // We'll always flush on end. + self.markFlushPoint() case let .failure(error): promise?.fail(error) } } + + /// Mark a flush as pending - to be emitted once the read completes - if we're currently reading, + /// or emit a flush now if we are not. + private func markFlushPoint() { + if self.isReading { + self.flushPending = true + } else { + self.flushPending = false + self.context.flush() + } + } } diff --git a/Tests/GRPCTests/HTTP2ToRawGRPCStateMachineTests.swift b/Tests/GRPCTests/HTTP2ToRawGRPCStateMachineTests.swift index 19ef99066..6fe909333 100644 --- a/Tests/GRPCTests/HTTP2ToRawGRPCStateMachineTests.swift +++ b/Tests/GRPCTests/HTTP2ToRawGRPCStateMachineTests.swift @@ -656,7 +656,7 @@ extension ServerMessageEncoding { } class NoOpResponseWriter: GRPCServerResponseWriter { - func sendMetadata(_ metadata: HPACKHeaders, promise: EventLoopPromise?) { + func sendMetadata(_ metadata: HPACKHeaders, flush: Bool, promise: EventLoopPromise?) { promise?.succeed(()) } diff --git a/Tests/GRPCTests/UnaryServerHandlerTests.swift b/Tests/GRPCTests/UnaryServerHandlerTests.swift index 9852aa479..7f34807f3 100644 --- a/Tests/GRPCTests/UnaryServerHandlerTests.swift +++ b/Tests/GRPCTests/UnaryServerHandlerTests.swift @@ -27,7 +27,7 @@ final class ResponseRecorder: GRPCServerResponseWriter { var status: GRPCStatus? var trailers: HPACKHeaders? - func sendMetadata(_ metadata: HPACKHeaders, promise: EventLoopPromise?) { + func sendMetadata(_ metadata: HPACKHeaders, flush: Bool, promise: EventLoopPromise?) { XCTAssertNil(self.metadata) self.metadata = metadata promise?.succeed(())