Skip to content

Commit

Permalink
Remove the requirement that messages conform to GRPCPayload on the se…
Browse files Browse the repository at this point in the history
…rver (grpc#886)

Motivation:

To support payloads other than `SwiftProtobuf.Message` we required that
all messages conform to `GRPCPayload`. For protobuf messages we added
`GRPCProtobufPayload` which provides a default implemenation of
`GRPCPayload` for protobuf messages. We generated this conformance for
all protobuf messages we saw. This lead to a number issues and
workarounds including: grpc#738, grpc#778, grpc#801, grpc#837, grpc#877, grpc#881.

The intention is to continue to support `GRPCPayload` in addition to
protobuf, however, support for protobuf will not be via the
`GRPCProtobufPayload` protocol.

This PR adjusts server components such that they are not constrained to
`GRPCPayload`. At the moment only `SwiftProtobuf.Message` is supported.
Once the client side has had the same treatment and
`GRPCProtobufPayload` no longer inherits from `SwiftProtobuf.Message`,
support for `GRPCPayload` will be added back.

Modifications:

- The `HTTP1ToGRPCServerCodec` has had the message encoding and decoding
  removed. It now deals in `ByteBuffer`s rather than request/response
  messages.
- An additional `GRPCServerCodecHandler` which sits between the
  `HTTP1ToGRPCServerCodec` and `_BaseCallHandler` has been added which
  serializes/deserializes messages.
- Custom payload tests have been commented out. They will return when
  the transition has completed.

Result:

- Servers only support SwiftProtobuf
- Genertic constraints on the server have been removed; the constraints
  are place on the `init` of public handlers instead.
- `GRPCProtobufPayload` is no longer required on the server.
  • Loading branch information
glbrntt authored Jul 13, 2020
1 parent 9b9d3db commit 6fb9826
Show file tree
Hide file tree
Showing 31 changed files with 855 additions and 382 deletions.
8 changes: 4 additions & 4 deletions Sources/Examples/Echo/Model/echo.grpc.swift
Original file line number Diff line number Diff line change
Expand Up @@ -278,26 +278,26 @@ extension Echo_EchoProvider {
public func handleMethod(_ methodName: String, callHandlerContext: CallHandlerContext) -> GRPCCallHandler? {
switch methodName {
case "Get":
return UnaryCallHandler(callHandlerContext: callHandlerContext) { context in
return CallHandlerFactory.makeUnary(callHandlerContext: callHandlerContext) { context in
return { request in
self.get(request: request, context: context)
}
}

case "Expand":
return ServerStreamingCallHandler(callHandlerContext: callHandlerContext) { context in
return CallHandlerFactory.makeServerStreaming(callHandlerContext: callHandlerContext) { context in
return { request in
self.expand(request: request, context: context)
}
}

case "Collect":
return ClientStreamingCallHandler(callHandlerContext: callHandlerContext) { context in
return CallHandlerFactory.makeClientStreaming(callHandlerContext: callHandlerContext) { context in
return self.collect(context: context)
}

case "Update":
return BidirectionalStreamingCallHandler(callHandlerContext: callHandlerContext) { context in
return CallHandlerFactory.makeBidirectionalStreaming(callHandlerContext: callHandlerContext) { context in
return self.update(context: context)
}

Expand Down
2 changes: 1 addition & 1 deletion Sources/Examples/HelloWorld/Model/helloworld.grpc.swift
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ extension Helloworld_GreeterProvider {
public func handleMethod(_ methodName: String, callHandlerContext: CallHandlerContext) -> GRPCCallHandler? {
switch methodName {
case "SayHello":
return UnaryCallHandler(callHandlerContext: callHandlerContext) { context in
return CallHandlerFactory.makeUnary(callHandlerContext: callHandlerContext) { context in
return { request in
self.sayHello(request: request, context: context)
}
Expand Down
8 changes: 4 additions & 4 deletions Sources/Examples/RouteGuide/Model/route_guide.grpc.swift
Original file line number Diff line number Diff line change
Expand Up @@ -195,26 +195,26 @@ extension Routeguide_RouteGuideProvider {
public func handleMethod(_ methodName: String, callHandlerContext: CallHandlerContext) -> GRPCCallHandler? {
switch methodName {
case "GetFeature":
return UnaryCallHandler(callHandlerContext: callHandlerContext) { context in
return CallHandlerFactory.makeUnary(callHandlerContext: callHandlerContext) { context in
return { request in
self.getFeature(request: request, context: context)
}
}

case "ListFeatures":
return ServerStreamingCallHandler(callHandlerContext: callHandlerContext) { context in
return CallHandlerFactory.makeServerStreaming(callHandlerContext: callHandlerContext) { context in
return { request in
self.listFeatures(request: request, context: context)
}
}

case "RecordRoute":
return ClientStreamingCallHandler(callHandlerContext: callHandlerContext) { context in
return CallHandlerFactory.makeClientStreaming(callHandlerContext: callHandlerContext) { context in
return self.recordRoute(context: context)
}

case "RouteChat":
return BidirectionalStreamingCallHandler(callHandlerContext: callHandlerContext) { context in
return CallHandlerFactory.makeBidirectionalStreaming(callHandlerContext: callHandlerContext) { context in
return self.routeChat(context: context)
}

Expand Down
19 changes: 9 additions & 10 deletions Sources/GRPC/CallHandlers/BidirectionalStreamingCallHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ import Logging
/// If the framework user wants to return a call error (e.g. in case of authentication failure),
/// they can fail the observer block future.
/// - To close the call and send the status, complete `context.statusPromise`.
public class BidirectionalStreamingCallHandler<
RequestPayload: GRPCPayload,
ResponsePayload: GRPCPayload
>: _BaseCallHandler<RequestPayload, ResponsePayload> {
public class BidirectionalStreamingCallHandler<RequestPayload, ResponsePayload>: _BaseCallHandler<RequestPayload, ResponsePayload> {
public typealias Context = StreamingResponseCallContext<ResponsePayload>
public typealias EventObserver = (StreamEvent<RequestPayload>) -> Void
public typealias EventObserverFactory = (Context) -> EventLoopFuture<EventObserver>
Expand All @@ -39,15 +36,17 @@ public class BidirectionalStreamingCallHandler<

// We ask for a future of type `EventObserver` to allow the framework user to e.g. asynchronously authenticate a call.
// If authentication fails, they can simply fail the observer future, which causes the call to be terminated.
public init(
internal init<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
serializer: Serializer,
deserializer: Deserializer,
callHandlerContext: CallHandlerContext,
eventObserverFactory: @escaping (StreamingResponseCallContext<ResponsePayload>) -> EventLoopFuture<EventObserver>
) {
// Delay the creation of the event observer until we actually get a request head, otherwise it
// would be possible for the observer to write into the pipeline (by completing the status
// promise) before the pipeline is configured.
) where Serializer.Input == ResponsePayload, Deserializer.Output == RequestPayload {
self.eventObserverFactory = eventObserverFactory
super.init(callHandlerContext: callHandlerContext)
super.init(
callHandlerContext: callHandlerContext,
codec: GRPCServerCodecHandler(serializer: serializer, deserializer: deserializer)
)
}

internal override func processHead(_ head: HTTPRequestHead, context: ChannelHandlerContext) {
Expand Down
81 changes: 81 additions & 0 deletions Sources/GRPC/CallHandlers/CallHandlerFactory.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2020, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import NIO
import SwiftProtobuf

// We can't use a 'where' clause on 'init's to constrain the generic requirements of a type. Instead
// we'll use static methods on this factory.
public enum CallHandlerFactory {
public typealias UnaryContext<Response> = UnaryResponseCallContext<Response>
public typealias UnaryEventObserver<Request, Response> = (Request) -> EventLoopFuture<Response>

public static func makeUnary<Request: Message, Response: Message>(
callHandlerContext: CallHandlerContext,
eventObserverFactory: @escaping (UnaryContext<Response>) -> UnaryEventObserver<Request, Response>
) -> UnaryCallHandler<Request, Response> {
return UnaryCallHandler(
serializer: ProtobufSerializer(),
deserializer: ProtobufDeserializer(),
callHandlerContext: callHandlerContext,
eventObserverFactory: eventObserverFactory
)
}

public typealias ClientStreamingContext<Response> = UnaryResponseCallContext<Response>
public typealias ClientStreamingEventObserver<Request> = EventLoopFuture<(StreamEvent<Request>) -> Void>

public static func makeClientStreaming<Request: Message, Response: Message>(
callHandlerContext: CallHandlerContext,
eventObserverFactory: @escaping (ClientStreamingContext<Response>) -> ClientStreamingEventObserver<Request>
) -> ClientStreamingCallHandler<Request, Response> {
return ClientStreamingCallHandler(
serializer: ProtobufSerializer(),
deserializer: ProtobufDeserializer(),
callHandlerContext: callHandlerContext,
eventObserverFactory: eventObserverFactory
)
}

public typealias ServerStreamingContext<Response> = StreamingResponseCallContext<Response>
public typealias ServerStreamingEventObserver<Request> = (Request) -> EventLoopFuture<GRPCStatus>

public static func makeServerStreaming<Request: Message, Response: Message>(
callHandlerContext: CallHandlerContext,
eventObserverFactory: @escaping (ServerStreamingContext<Response>) -> ServerStreamingEventObserver<Request>
) -> ServerStreamingCallHandler<Request, Response> {
return ServerStreamingCallHandler(
serializer: ProtobufSerializer(),
deserializer: ProtobufDeserializer(),
callHandlerContext: callHandlerContext,
eventObserverFactory: eventObserverFactory
)
}

public typealias BidirectionalStreamingContext<Response> = StreamingResponseCallContext<Response>
public typealias BidirectionalStreamingEventObserver<Request> = EventLoopFuture<(StreamEvent<Request>) -> Void>

public static func makeBidirectionalStreaming<Request: Message, Response: Message>(
callHandlerContext: CallHandlerContext,
eventObserverFactory: @escaping (BidirectionalStreamingContext<Response>) -> BidirectionalStreamingEventObserver<Request>
) -> BidirectionalStreamingCallHandler<Request, Response> {
return BidirectionalStreamingCallHandler(
serializer: ProtobufSerializer(),
deserializer: ProtobufDeserializer(),
callHandlerContext: callHandlerContext,
eventObserverFactory: eventObserverFactory
)
}
}
17 changes: 11 additions & 6 deletions Sources/GRPC/CallHandlers/ClientStreamingCallHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ import Logging
/// If the framework user wants to return a call error (e.g. in case of authentication failure),
/// they can fail the observer block future.
/// - To close the call and send the response, complete `context.responsePromise`.
public final class ClientStreamingCallHandler<
RequestPayload: GRPCPayload,
ResponsePayload: GRPCPayload
>: _BaseCallHandler<RequestPayload, ResponsePayload> {
public final class ClientStreamingCallHandler<RequestPayload, ResponsePayload>: _BaseCallHandler<RequestPayload, ResponsePayload> {
public typealias Context = UnaryResponseCallContext<ResponsePayload>
public typealias EventObserver = (StreamEvent<RequestPayload>) -> Void
public typealias EventObserverFactory = (Context) -> EventLoopFuture<EventObserver>
Expand All @@ -39,9 +36,17 @@ public final class ClientStreamingCallHandler<

// We ask for a future of type `EventObserver` to allow the framework user to e.g. asynchronously authenticate a call.
// If authentication fails, they can simply fail the observer future, which causes the call to be terminated.
public init(callHandlerContext: CallHandlerContext, eventObserverFactory: @escaping EventObserverFactory) {
internal init<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
serializer: Serializer,
deserializer: Deserializer,
callHandlerContext: CallHandlerContext,
eventObserverFactory: @escaping EventObserverFactory
) where Serializer.Input == ResponsePayload, Deserializer.Output == RequestPayload {
self.eventObserverFactory = eventObserverFactory
super.init(callHandlerContext: callHandlerContext)
super.init(
callHandlerContext: callHandlerContext,
codec: GRPCServerCodecHandler(serializer: serializer, deserializer: deserializer)
)
}

internal override func processHead(_ head: HTTPRequestHead, context: ChannelHandlerContext) {
Expand Down
19 changes: 9 additions & 10 deletions Sources/GRPC/CallHandlers/ServerStreamingCallHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,24 @@ import Logging
///
/// - The observer block is implemented by the framework user and calls `context.sendResponse` as needed.
/// - To close the call and send the status, complete the status future returned by the observer block.
public final class ServerStreamingCallHandler<
RequestPayload: GRPCPayload,
ResponsePayload: GRPCPayload
>: _BaseCallHandler<RequestPayload, ResponsePayload> {
public final class ServerStreamingCallHandler<RequestPayload, ResponsePayload>: _BaseCallHandler<RequestPayload, ResponsePayload> {
public typealias EventObserver = (RequestPayload) -> EventLoopFuture<GRPCStatus>

private var eventObserver: EventObserver?
private var callContext: StreamingResponseCallContext<ResponsePayload>?
private let eventObserverFactory: (StreamingResponseCallContext<ResponsePayload>) -> EventObserver

public init(
internal init<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
serializer: Serializer,
deserializer: Deserializer,
callHandlerContext: CallHandlerContext,
eventObserverFactory: @escaping (StreamingResponseCallContext<ResponsePayload>) -> EventObserver
) {
// Delay the creation of the event observer until we actually get a request head, otherwise it
// would be possible for the observer to write into the pipeline (by completing the status
// promise) before the pipeline is configured.
) where Serializer.Input == ResponsePayload, Deserializer.Output == RequestPayload {
self.eventObserverFactory = eventObserverFactory
super.init(callHandlerContext: callHandlerContext)
super.init(
callHandlerContext: callHandlerContext,
codec: GRPCServerCodecHandler(serializer: serializer, deserializer: deserializer)
)
}

override internal func processHead(_ head: HTTPRequestHead, context: ChannelHandlerContext) {
Expand Down
16 changes: 9 additions & 7 deletions Sources/GRPC/CallHandlers/UnaryCallHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,23 @@ import Logging
/// - The observer block is implemented by the framework user and returns a future containing the call result.
/// - To return a response to the client, the framework user should complete that future
/// (similar to e.g. serving regular HTTP requests in frameworks such as Vapor).
public final class UnaryCallHandler<
RequestPayload: GRPCPayload,
ResponsePayload: GRPCPayload
>: _BaseCallHandler<RequestPayload, ResponsePayload> {
public final class UnaryCallHandler<RequestPayload, ResponsePayload>: _BaseCallHandler<RequestPayload, ResponsePayload> {
public typealias EventObserver = (RequestPayload) -> EventLoopFuture<ResponsePayload>
private var eventObserver: EventObserver?
private var callContext: UnaryResponseCallContext<ResponsePayload>?
private let eventObserverFactory: (UnaryResponseCallContext<ResponsePayload>) -> EventObserver

public init(
internal init<Serializer: MessageSerializer, Deserializer: MessageDeserializer>(
serializer: Serializer,
deserializer: Deserializer,
callHandlerContext: CallHandlerContext,
eventObserverFactory: @escaping (UnaryResponseCallContext<ResponsePayload>) -> EventObserver
) {
) where Serializer.Input == ResponsePayload, Deserializer.Output == RequestPayload {
self.eventObserverFactory = eventObserverFactory
super.init(callHandlerContext: callHandlerContext)
super.init(
callHandlerContext: callHandlerContext,
codec: GRPCServerCodecHandler(serializer: serializer, deserializer: deserializer)
)
}

internal override func processHead(_ head: HTTPRequestHead, context: ChannelHandlerContext) {
Expand Down
23 changes: 11 additions & 12 deletions Sources/GRPC/CallHandlers/_BaseCallHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,8 @@ import Logging
///
/// Calls through to `processMessage` for individual messages it receives, which needs to be implemented by subclasses.
/// - Important: This is **NOT** part of the public API.
public class _BaseCallHandler<RequestPayload: GRPCPayload, ResponsePayload: GRPCPayload>: GRPCCallHandler {
public func makeGRPCServerCodec() -> ChannelHandler {
return HTTP1ToGRPCServerCodec<RequestPayload, ResponsePayload>(
encoding: self.callHandlerContext.encoding,
logger: self.logger
)
}
public class _BaseCallHandler<Request, Response>: GRPCCallHandler {
public let _codec: ChannelHandler

/// Called when the request head has been received.
///
Expand All @@ -41,7 +36,7 @@ public class _BaseCallHandler<RequestPayload: GRPCPayload, ResponsePayload: GRPC
/// Called whenever a message has been received.
///
/// Overridden by subclasses.
internal func processMessage(_ message: RequestPayload) throws {
internal func processMessage(_ message: Request) throws {
fatalError("needs to be overridden")
}

Expand Down Expand Up @@ -69,13 +64,17 @@ public class _BaseCallHandler<RequestPayload: GRPCPayload, ResponsePayload: GRPC
return self.callHandlerContext.logger
}

internal init(callHandlerContext: CallHandlerContext) {
internal init(
callHandlerContext: CallHandlerContext,
codec: ChannelHandler
) {
self.callHandlerContext = callHandlerContext
self._codec = codec
}
}

extension _BaseCallHandler: ChannelInboundHandler {
public typealias InboundIn = _GRPCServerRequestPart<RequestPayload>
public typealias InboundIn = _GRPCServerRequestPart<Request>

/// Passes errors to the user-provided `errorHandler`. After an error has been received an
/// appropriate status is written. Errors which don't conform to `GRPCStatusTransformable`
Expand Down Expand Up @@ -122,8 +121,8 @@ extension _BaseCallHandler: ChannelInboundHandler {
}

extension _BaseCallHandler: ChannelOutboundHandler {
public typealias OutboundIn = _GRPCServerResponsePart<ResponsePayload>
public typealias OutboundOut = _GRPCServerResponsePart<ResponsePayload>
public typealias OutboundIn = _GRPCServerResponsePart<Response>
public typealias OutboundOut = _GRPCServerResponsePart<Response>

public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
guard self.serverCanWrite else {
Expand Down
Loading

0 comments on commit 6fb9826

Please sign in to comment.