Skip to content

Commit

Permalink
Support protobuf separately to GRPCPayload for the client
Browse files Browse the repository at this point in the history
Motivation:

To support payloads other than `SwiftProtobuf.Message` we required that
all messages conform to `GRPCPayload`. For protobuf messages we added
`GRPCProtobufPayload` which provides a default implemenation of
`GRPCPayload` for protobuf messages. We generated this conformance for
all protobuf messages we saw. This lead to a number issues and
workarounds including: grpc#738, grpc#778, grpc#801, grpc#837, grpc#877, grpc#881.

The intention is to continue to support `GRPCPayload` in addition to
protobuf, however, support for protobuf will not be via the
`GRPCProtobufPayload` protocol.

This PR builds on grpc#886 by increasing the surface area of the client APIs
so that they are not constrained to `GRPCPayload`. The surface API now
has variants for `GRPCPayload` and `SwiftProtobuf.Message`. Internally
the client deals with serializers and deserializers.

Modifications:

- `GRPCClientChannelHandler` and `GRPCClientStateMachine` are no longer
  generic over a request and response type, rather they deal with the
  serialzed version of requests and response (i.e. `ByteBuffer`s) and
  defer the (de/)serialization to a separate handler.
- Added `GRCPClientCodecHandler` to handle (de/)serialization of
  messages
- Clients are no longer constrained to having their request/response
  payloads conform to `GRPCPayload`
- Conformance to `GRPCProtobufPayload` is no longer generated and the
  protocol is deprecated and has no requirements.
- Drop the 'GenerateConformance' option from the codegen since it is no
  longer required
- Reintroduce a filter to the codegen so that we only consider files
  which contain services, this avoids generating empty files
- Regenerate code where necessary

Result:

- `GRPCProtobufPayload` is no longer required
  • Loading branch information
glbrntt committed Jul 14, 2020
1 parent b29b16b commit 2b29c28
Show file tree
Hide file tree
Showing 31 changed files with 793 additions and 392 deletions.
19 changes: 11 additions & 8 deletions Sources/GRPC/ClientCalls/BidirectionalStreamingCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@ import Logging
///
/// Messages should be sent via the `sendMessage` and `sendMessages` methods; the stream of messages
/// must be terminated by calling `sendEnd` to indicate the final message has been sent.
public final class BidirectionalStreamingCall<
RequestPayload: GRPCPayload,
ResponsePayload: GRPCPayload
>: StreamingRequestClientCall {
public final class BidirectionalStreamingCall<RequestPayload, ResponsePayload>: StreamingRequestClientCall {
private let transport: ChannelTransport<RequestPayload, ResponsePayload>

/// The options used to make the RPC.
Expand Down Expand Up @@ -147,16 +144,20 @@ public final class BidirectionalStreamingCall<
}

extension BidirectionalStreamingCall {
internal static func makeOnHTTP2Stream(
internal static func makeOnHTTP2Stream<S: MessageSerializer, D: MessageDeserializer>(
multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
serializer: S,
deserializer: D,
callOptions: CallOptions,
errorDelegate: ClientErrorDelegate?,
logger: Logger,
responseHandler: @escaping (ResponsePayload) -> Void
) -> BidirectionalStreamingCall<RequestPayload, ResponsePayload> {
) -> BidirectionalStreamingCall<RequestPayload, ResponsePayload> where S.Input == RequestPayload, D.Output == ResponsePayload {
let eventLoop = multiplexer.eventLoop
let transport = ChannelTransport<RequestPayload, ResponsePayload>(
multiplexer: multiplexer,
serializer: serializer,
deserializer: deserializer,
responseContainer: .init(eventLoop: eventLoop, streamingResponseHandler: responseHandler),
callType: .bidirectionalStreaming,
timeLimit: callOptions.timeLimit,
Expand All @@ -167,12 +168,14 @@ extension BidirectionalStreamingCall {
return BidirectionalStreamingCall(transport: transport, options: callOptions)
}

internal static func make(
internal static func make<S: MessageSerializer, D: MessageDeserializer>(
serializer: S,
deserializer: D,
fakeResponse: FakeStreamingResponse<RequestPayload, ResponsePayload>?,
callOptions: CallOptions,
logger: Logger,
responseHandler: @escaping (ResponsePayload) -> Void
) -> BidirectionalStreamingCall<RequestPayload, ResponsePayload> {
) -> BidirectionalStreamingCall<RequestPayload, ResponsePayload> where S.Input == RequestPayload, D.Output == ResponsePayload {
let eventLoop = fakeResponse?.channel.eventLoop ?? EmbeddedEventLoop()
let responseContainer = ResponsePartContainer(eventLoop: eventLoop, streamingResponseHandler: responseHandler)

Expand Down
8 changes: 4 additions & 4 deletions Sources/GRPC/ClientCalls/ClientCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import SwiftProtobuf
/// Base protocol for a client call to a gRPC service.
public protocol ClientCall {
/// The type of the request message for the call.
associatedtype RequestPayload: GRPCPayload
associatedtype RequestPayload
/// The type of the response message for the call.
associatedtype ResponsePayload: GRPCPayload
associatedtype ResponsePayload

/// The event loop this call is running on.
var eventLoop: EventLoop { get }
Expand Down Expand Up @@ -159,7 +159,7 @@ public protocol UnaryResponseClientCall: ClientCall {
// a NIO HTTP/2 stream channel.

internal protocol ClientCallInbound {
associatedtype Response: GRPCPayload
associatedtype Response
typealias ResponsePart = _GRPCClientResponsePart<Response>

/// Receive an error.
Expand All @@ -170,7 +170,7 @@ internal protocol ClientCallInbound {
}

internal protocol ClientCallOutbound {
associatedtype Request: GRPCPayload
associatedtype Request
typealias RequestPart = _GRPCClientRequestPart<Request>

/// Send a single request part and complete the promise once the part has been sent.
Expand Down
11 changes: 7 additions & 4 deletions Sources/GRPC/ClientCalls/ClientCallTransport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import Logging
///```
///
/// Note: the "main" pipeline provided by the channel in `ClientConnection`.
internal class ChannelTransport<Request: GRPCPayload, Response: GRPCPayload> {
internal class ChannelTransport<Request, Response> {
internal typealias RequestPart = _GRPCClientRequestPart<Request>
internal typealias ResponsePart = _GRPCClientResponsePart<Response>

Expand Down Expand Up @@ -148,14 +148,16 @@ internal class ChannelTransport<Request: GRPCPayload, Response: GRPCPayload> {
channelProvider(self, channelPromise)
}

internal convenience init(
internal convenience init<S: MessageSerializer, D: MessageDeserializer>(
multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
serializer: S,
deserializer: D,
responseContainer: ResponsePartContainer<Response>,
callType: GRPCCallType,
timeLimit: TimeLimit,
errorDelegate: ClientErrorDelegate?,
logger: Logger
) {
) where S.Input == Request, D.Output == Response {
self.init(
eventLoop: multiplexer.eventLoop,
responseContainer: responseContainer,
Expand All @@ -168,7 +170,8 @@ internal class ChannelTransport<Request: GRPCPayload, Response: GRPCPayload> {
case .success(let mux):
mux.createStreamChannel(promise: streamPromise) { stream, streamID in
stream.pipeline.addHandlers([
_GRPCClientChannelHandler<Request, Response>(streamID: streamID, callType: callType, logger: logger),
_GRPCClientChannelHandler(streamID: streamID, callType: callType, logger: logger),
GRPCClientCodecHandler(serializer: serializer, deserializer: deserializer),
GRPCClientCallHandler(call: call)
])
}
Expand Down
19 changes: 11 additions & 8 deletions Sources/GRPC/ClientCalls/ClientStreamingCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@ import Logging
///
/// Messages should be sent via the `sendMessage` and `sendMessages` methods; the stream of messages
/// must be terminated by calling `sendEnd` to indicate the final message has been sent.
public final class ClientStreamingCall<
RequestPayload: GRPCPayload,
ResponsePayload: GRPCPayload
> : StreamingRequestClientCall, UnaryResponseClientCall {
public final class ClientStreamingCall<RequestPayload, ResponsePayload>: StreamingRequestClientCall, UnaryResponseClientCall {
private let transport: ChannelTransport<RequestPayload, ResponsePayload>

/// The options used to make the RPC.
Expand Down Expand Up @@ -152,16 +149,20 @@ public final class ClientStreamingCall<
}

extension ClientStreamingCall {
internal static func makeOnHTTP2Stream(
internal static func makeOnHTTP2Stream<S: MessageSerializer, D: MessageDeserializer>(
multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
serializer: S,
deserializer: D,
callOptions: CallOptions,
errorDelegate: ClientErrorDelegate?,
logger: Logger
) -> ClientStreamingCall<RequestPayload, ResponsePayload> {
) -> ClientStreamingCall<RequestPayload, ResponsePayload> where S.Input == RequestPayload, D.Output == ResponsePayload {
let eventLoop = multiplexer.eventLoop
let responsePromise: EventLoopPromise<ResponsePayload> = eventLoop.makePromise()
let transport = ChannelTransport<RequestPayload, ResponsePayload>(
multiplexer: multiplexer,
serializer: serializer,
deserializer: deserializer,
responseContainer: .init(eventLoop: eventLoop, unaryResponsePromise: responsePromise),
callType: .clientStreaming,
timeLimit: callOptions.timeLimit,
Expand All @@ -171,11 +172,13 @@ extension ClientStreamingCall {
return ClientStreamingCall(response: responsePromise.futureResult, transport: transport, options: callOptions)
}

internal static func make(
internal static func make<S: MessageSerializer, D: MessageDeserializer>(
serializer: S,
deserializer: D,
fakeResponse: FakeUnaryResponse<RequestPayload, ResponsePayload>?,
callOptions: CallOptions,
logger: Logger
) -> ClientStreamingCall<RequestPayload, ResponsePayload> {
) -> ClientStreamingCall<RequestPayload, ResponsePayload> where S.Input == RequestPayload, D.Output == ResponsePayload {
let eventLoop = fakeResponse?.channel.eventLoop ?? EmbeddedEventLoop()
let responsePromise: EventLoopPromise<ResponsePayload> = eventLoop.makePromise()
let responseContainer = ResponsePartContainer(eventLoop: eventLoop, unaryResponsePromise: responsePromise)
Expand Down
2 changes: 1 addition & 1 deletion Sources/GRPC/ClientCalls/GRPCClientCallHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import NIO

/// An inbound channel handler which forwards events and messages to a client call.
internal class GRPCClientCallHandler<Request: GRPCPayload, Response: GRPCPayload>: ChannelInboundHandler {
internal class GRPCClientCallHandler<Request, Response>: ChannelInboundHandler {
typealias InboundIn = _GRPCClientResponsePart<Response>
private var call: ChannelTransport<Request, Response>

Expand Down
2 changes: 1 addition & 1 deletion Sources/GRPC/ClientCalls/ResponsePartContainer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import NIO
import NIOHPACK

/// A container for RPC response parts.
internal struct ResponsePartContainer<Response: GRPCPayload> {
internal struct ResponsePartContainer<Response> {
/// The type of handler for response message part.
enum ResponseHandler {
case unary(EventLoopPromise<Response>)
Expand Down
19 changes: 11 additions & 8 deletions Sources/GRPC/ClientCalls/ServerStreamingCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ import Logging

/// A server-streaming gRPC call. The request is sent on initialization, each response is passed to
/// the provided observer block.
public final class ServerStreamingCall<
RequestPayload: GRPCPayload,
ResponsePayload: GRPCPayload
>: ClientCall {
public final class ServerStreamingCall<RequestPayload, ResponsePayload>: ClientCall {
private let transport: ChannelTransport<RequestPayload, ResponsePayload>

/// The options used to make the RPC.
Expand Down Expand Up @@ -93,16 +90,20 @@ public final class ServerStreamingCall<
}

extension ServerStreamingCall {
internal static func makeOnHTTP2Stream(
internal static func makeOnHTTP2Stream<S: MessageSerializer, D: MessageDeserializer>(
multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
serializer: S,
deserializer: D,
callOptions: CallOptions,
errorDelegate: ClientErrorDelegate?,
logger: Logger,
responseHandler: @escaping (ResponsePayload) -> Void
) -> ServerStreamingCall<RequestPayload, ResponsePayload> {
) -> ServerStreamingCall<RequestPayload, ResponsePayload> where S.Input == RequestPayload, D.Output == ResponsePayload {
let eventLoop = multiplexer.eventLoop
let transport = ChannelTransport<RequestPayload, ResponsePayload>(
multiplexer: multiplexer,
serializer: serializer,
deserializer: deserializer,
responseContainer: .init(eventLoop: eventLoop, streamingResponseHandler: responseHandler),
callType: .serverStreaming,
timeLimit: callOptions.timeLimit,
Expand All @@ -113,12 +114,14 @@ extension ServerStreamingCall {
return ServerStreamingCall(transport: transport, options: callOptions)
}

internal static func make(
internal static func make<S: MessageSerializer, D: MessageDeserializer>(
serializer: S,
deserializer: D,
fakeResponse: FakeStreamingResponse<RequestPayload, ResponsePayload>?,
callOptions: CallOptions,
logger: Logger,
responseHandler: @escaping (ResponsePayload) -> Void
) -> ServerStreamingCall<RequestPayload, ResponsePayload> {
) -> ServerStreamingCall<RequestPayload, ResponsePayload> where S.Input == RequestPayload, D.Output == ResponsePayload {
let eventLoop = fakeResponse?.channel.eventLoop ?? EmbeddedEventLoop()
let responseContainer = ResponsePartContainer(eventLoop: eventLoop, streamingResponseHandler: responseHandler)

Expand Down
19 changes: 11 additions & 8 deletions Sources/GRPC/ClientCalls/UnaryCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@ import NIOHPACK
import Logging

/// A unary gRPC call. The request is sent on initialization.
public final class UnaryCall<
RequestPayload: GRPCPayload,
ResponsePayload: GRPCPayload
>: UnaryResponseClientCall {
public final class UnaryCall<RequestPayload, ResponsePayload>: UnaryResponseClientCall {
private let transport: ChannelTransport<RequestPayload, ResponsePayload>

/// The options used to make the RPC.
Expand Down Expand Up @@ -100,16 +97,20 @@ public final class UnaryCall<
}

extension UnaryCall {
internal static func makeOnHTTP2Stream(
internal static func makeOnHTTP2Stream<S: MessageSerializer, D: MessageDeserializer>(
multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
serializer: S,
deserializer: D,
callOptions: CallOptions,
errorDelegate: ClientErrorDelegate?,
logger: Logger
) -> UnaryCall<RequestPayload, ResponsePayload> {
) -> UnaryCall<RequestPayload, ResponsePayload> where S.Input == RequestPayload, D.Output == ResponsePayload {
let eventLoop = multiplexer.eventLoop
let responsePromise: EventLoopPromise<ResponsePayload> = eventLoop.makePromise()
let transport = ChannelTransport<RequestPayload, ResponsePayload>(
multiplexer: multiplexer,
serializer: serializer,
deserializer: deserializer,
responseContainer: .init(eventLoop: eventLoop, unaryResponsePromise: responsePromise),
callType: .unary,
timeLimit: callOptions.timeLimit,
Expand All @@ -119,11 +120,13 @@ extension UnaryCall {
return UnaryCall(response: responsePromise.futureResult, transport: transport, options: callOptions)
}

internal static func make(
internal static func make<S: MessageSerializer, D: MessageDeserializer>(
serializer: S,
deserializer: D,
fakeResponse: FakeUnaryResponse<RequestPayload, ResponsePayload>?,
callOptions: CallOptions,
logger: Logger
) -> UnaryCall<RequestPayload, ResponsePayload> {
) -> UnaryCall<RequestPayload, ResponsePayload> where S.Input == RequestPayload, D.Output == ResponsePayload {
let eventLoop = fakeResponse?.channel.eventLoop ?? EmbeddedEventLoop()
let responsePromise: EventLoopPromise<ResponsePayload> = eventLoop.makePromise()
let responseContainer = ResponsePartContainer(eventLoop: eventLoop, unaryResponsePromise: responsePromise)
Expand Down
Loading

0 comments on commit 2b29c28

Please sign in to comment.