-
Notifications
You must be signed in to change notification settings - Fork 420
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
First implementation of gRPC based on SwiftNIO (#281)
* First experiments with a NIO-based gRPC server. Contains the following commits: - Refactor gRPC decoding into dedicated codec classes. - Start work on GRPCServerHandler. - Add a "unary call handler" and use that for the tests. - Refactoring starting a GRPC server into a dedicated class. - Fix sending unary responses. - Add a handler for client-streaming calls. - Also implement bidirectional-streaming calls. - Make sure to flush in server-streaming calls after each sent message. - Add the missing test cases to `allTests`. - Refactor `StatusSendingHandler` into its own class. - Rename `GRPCServerHandler` to `GRPCChannelHandler`. - Remove a FIXME. - Add a few more comments. - Attach the actual call handlers as channel handlers instead of manually forwarding messages to them. Remove SwiftGRPCNIO's dependency on SwiftGRPC and move the responsibility for encoding GRPC statuses to HTTP1ToRawGRPCServerCoded. Temporarily disable two test cases that are failing at the moment. Add SwiftGRPCNIO as an exposed library. Another try at getting CI to work with SwiftGRPCNIO. More dependency fixes. Add `SwiftGRPCNIO.EchoServerTests` to LinuxMain.swift. Fix a string comparison in `.travis-install.sh`. Add nghttp2 to the list of CI dependencies. Another try with installing nghttp2 via brew. Another try at using libnghttp2-dev under Ubuntu 14.04. More Travis fixes. One last try. Disable two more tests for now, as they sometimes fail on CI. Make Carthage debug builds verbose. Only use SwiftGRPC-Carthage.xcodeproj for Carthage builds. * Make `ServerStreamingCallHandler.sendMessage` return a send future as well. * Re-enable two more tests and suppress two warnings. * Unify the interface across the different call handlers. * Rename `...CallHandler.handler` to `.eventObserver`. * Add support for returning custom statuses (e.g. with additional metadata attached) from calls that have a unary response. * Minor argument reordering. * Avoid forcing unary call handlers to return an event loop future. Instead, they should fulfill `handler.responsePromise`. * Add a TODO. * Add codegen support for non-TestStub NIO server code. * Add more properties to GRPCCallHandler. * Store the full `HTTPRequestHead` alongside a gRPC call handler. * Add support for having client-streaming request handlers return a future as their event handler. This allows them to perform some asynchronous work before having to return an event handler (e.g. to authenticate the request asynchronously before providing an event handler that relies on such authentication). * Make `StatusSendingHandler.statusPromise` public. * Convert a few non-blocking calls in tests to blocking ones to simplify things. * Refactoring: pass special `ResponseHandler` objects to NIO server call handlers with a much lower API surface. In addition, these `ResponseHandler` are easier to stub for testing. * Code review fixes, interface improvements. * Rename a few NIO tests. * Add documentation. * Rename "headers" to "request". * Minor performance improvement by avoiding one copy. * Make unary calls take a `StatusOnlyCallContext` instead of `UnaryResponseCallContext`, as suggested by @kevints. * Rename `sendOperationChain` in tests to `endOfSendOperationQueue`. * Review fixes. * Add one more comment to the README. * Oops, fix the tests. * Remove two unnecessary server channel options. * Add some more documentation. * Pin `SwiftNIOHTTP2` for the time being.
- Loading branch information
Showing
30 changed files
with
1,361 additions
and
50 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
import Foundation | ||
import SwiftProtobuf | ||
import NIO | ||
import NIOHTTP1 | ||
|
||
/// Provides a means for decoding incoming gRPC messages into protobuf objects. | ||
/// | ||
/// Calls through to `processMessage` for individual messages it receives, which needs to be implemented by subclasses. | ||
public class BaseCallHandler<RequestMessage: Message, ResponseMessage: Message>: GRPCCallHandler { | ||
public func makeGRPCServerCodec() -> ChannelHandler { return GRPCServerCodec<RequestMessage, ResponseMessage>() } | ||
|
||
/// Called whenever a message has been received. | ||
/// | ||
/// Overridden by subclasses. | ||
public func processMessage(_ message: RequestMessage) { | ||
fatalError("needs to be overridden") | ||
} | ||
|
||
/// Called when the client has half-closed the stream, indicating that they won't send any further data. | ||
/// | ||
/// Overridden by subclasses if the "end-of-stream" event is relevant. | ||
public func endOfStreamReceived() { } | ||
} | ||
|
||
extension BaseCallHandler: ChannelInboundHandler { | ||
public typealias InboundIn = GRPCServerRequestPart<RequestMessage> | ||
public typealias OutboundOut = GRPCServerResponsePart<ResponseMessage> | ||
|
||
public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) { | ||
switch self.unwrapInboundIn(data) { | ||
case .head: preconditionFailure("should not have received headers") | ||
case .message(let message): processMessage(message) | ||
case .end: endOfStreamReceived() | ||
} | ||
} | ||
} |
44 changes: 44 additions & 0 deletions
44
Sources/SwiftGRPCNIO/CallHandlers/BidirectionalStreamingCallHandler.swift
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
import Foundation | ||
import SwiftProtobuf | ||
import NIO | ||
import NIOHTTP1 | ||
|
||
/// Handles bidirectional streaming calls. Forwards incoming messages and end-of-stream events to the observer block. | ||
/// | ||
/// - The observer block is implemented by the framework user and calls `context.sendResponse` as needed. | ||
/// - To close the call and send the status, fulfill `context.statusPromise`. | ||
public class BidirectionalStreamingCallHandler<RequestMessage: Message, ResponseMessage: Message>: BaseCallHandler<RequestMessage, ResponseMessage> { | ||
public typealias EventObserver = (StreamEvent<RequestMessage>) -> Void | ||
private var eventObserver: EventLoopFuture<EventObserver>? | ||
|
||
private var context: StreamingResponseCallContext<ResponseMessage>? | ||
|
||
// We ask for a future of type `EventObserver` to allow the framework user to e.g. asynchronously authenticate a call. | ||
// If authentication fails, they can simply fail the observer future, which causes the call to be terminated. | ||
public init(channel: Channel, request: HTTPRequestHead, eventObserverFactory: (StreamingResponseCallContext<ResponseMessage>) -> EventLoopFuture<EventObserver>) { | ||
super.init() | ||
let context = StreamingResponseCallContextImpl<ResponseMessage>(channel: channel, request: request) | ||
self.context = context | ||
let eventObserver = eventObserverFactory(context) | ||
self.eventObserver = eventObserver | ||
// Terminate the call if no observer is provided. | ||
eventObserver.cascadeFailure(promise: context.statusPromise) | ||
context.statusPromise.futureResult.whenComplete { | ||
// When done, reset references to avoid retain cycles. | ||
self.eventObserver = nil | ||
self.context = nil | ||
} | ||
} | ||
|
||
public override func processMessage(_ message: RequestMessage) { | ||
eventObserver?.whenSuccess { observer in | ||
observer(.message(message)) | ||
} | ||
} | ||
|
||
public override func endOfStreamReceived() { | ||
eventObserver?.whenSuccess { observer in | ||
observer(.end) | ||
} | ||
} | ||
} |
43 changes: 43 additions & 0 deletions
43
Sources/SwiftGRPCNIO/CallHandlers/ClientStreamingCallHandler.swift
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
import Foundation | ||
import SwiftProtobuf | ||
import NIO | ||
import NIOHTTP1 | ||
|
||
/// Handles client-streaming calls. Forwards incoming messages and end-of-stream events to the observer block. | ||
/// | ||
/// - The observer block is implemented by the framework user and fulfills `context.responsePromise` when done. | ||
public class ClientStreamingCallHandler<RequestMessage: Message, ResponseMessage: Message>: BaseCallHandler<RequestMessage, ResponseMessage> { | ||
public typealias EventObserver = (StreamEvent<RequestMessage>) -> Void | ||
private var eventObserver: EventLoopFuture<EventObserver>? | ||
|
||
private var context: UnaryResponseCallContext<ResponseMessage>? | ||
|
||
// We ask for a future of type `EventObserver` to allow the framework user to e.g. asynchronously authenticate a call. | ||
// If authentication fails, they can simply fail the observer future, which causes the call to be terminated. | ||
public init(channel: Channel, request: HTTPRequestHead, eventObserverFactory: (UnaryResponseCallContext<ResponseMessage>) -> EventLoopFuture<EventObserver>) { | ||
super.init() | ||
let context = UnaryResponseCallContextImpl<ResponseMessage>(channel: channel, request: request) | ||
self.context = context | ||
let eventObserver = eventObserverFactory(context) | ||
self.eventObserver = eventObserver | ||
// Terminate the call if no observer is provided. | ||
eventObserver.cascadeFailure(promise: context.responsePromise) | ||
context.responsePromise.futureResult.whenComplete { | ||
// When done, reset references to avoid retain cycles. | ||
self.eventObserver = nil | ||
self.context = nil | ||
} | ||
} | ||
|
||
public override func processMessage(_ message: RequestMessage) { | ||
eventObserver?.whenSuccess { observer in | ||
observer(.message(message)) | ||
} | ||
} | ||
|
||
public override func endOfStreamReceived() { | ||
eventObserver?.whenSuccess { observer in | ||
observer(.end) | ||
} | ||
} | ||
} |
43 changes: 43 additions & 0 deletions
43
Sources/SwiftGRPCNIO/CallHandlers/ServerStreamingCallHandler.swift
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
import Foundation | ||
import SwiftProtobuf | ||
import NIO | ||
import NIOHTTP1 | ||
|
||
/// Handles server-streaming calls. Calls the observer block with the request message. | ||
/// | ||
/// - The observer block is implemented by the framework user and calls `context.sendResponse` as needed. | ||
/// - To close the call and send the status, complete the status future returned by the observer block. | ||
public class ServerStreamingCallHandler<RequestMessage: Message, ResponseMessage: Message>: BaseCallHandler<RequestMessage, ResponseMessage> { | ||
public typealias EventObserver = (RequestMessage) -> EventLoopFuture<GRPCStatus> | ||
private var eventObserver: EventObserver? | ||
|
||
private var context: StreamingResponseCallContext<ResponseMessage>? | ||
|
||
public init(channel: Channel, request: HTTPRequestHead, eventObserverFactory: (StreamingResponseCallContext<ResponseMessage>) -> EventObserver) { | ||
super.init() | ||
let context = StreamingResponseCallContextImpl<ResponseMessage>(channel: channel, request: request) | ||
self.context = context | ||
self.eventObserver = eventObserverFactory(context) | ||
context.statusPromise.futureResult.whenComplete { | ||
// When done, reset references to avoid retain cycles. | ||
self.eventObserver = nil | ||
self.context = nil | ||
} | ||
} | ||
|
||
|
||
public override func processMessage(_ message: RequestMessage) { | ||
guard let eventObserver = self.eventObserver, | ||
let context = self.context else { | ||
//! FIXME: Better handle this error? | ||
print("multiple messages received on unary call") | ||
return | ||
} | ||
|
||
let resultFuture = eventObserver(message) | ||
resultFuture | ||
// Fulfill the status promise with whatever status the framework user has provided. | ||
.cascade(promise: context.statusPromise) | ||
self.eventObserver = nil | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
import Foundation | ||
import SwiftProtobuf | ||
import NIO | ||
import NIOHTTP1 | ||
|
||
/// Handles unary calls. Calls the observer block with the request message. | ||
/// | ||
/// - The observer block is implemented by the framework user and returns a future containing the call result. | ||
/// - To return a response to the client, the framework user should complete that future | ||
/// (similar to e.g. serving regular HTTP requests in frameworks such as Vapor). | ||
public class UnaryCallHandler<RequestMessage: Message, ResponseMessage: Message>: BaseCallHandler<RequestMessage, ResponseMessage> { | ||
public typealias EventObserver = (RequestMessage) -> EventLoopFuture<ResponseMessage> | ||
private var eventObserver: EventObserver? | ||
|
||
private var context: UnaryResponseCallContext<ResponseMessage>? | ||
|
||
public init(channel: Channel, request: HTTPRequestHead, eventObserverFactory: (UnaryResponseCallContext<ResponseMessage>) -> EventObserver) { | ||
super.init() | ||
let context = UnaryResponseCallContextImpl<ResponseMessage>(channel: channel, request: request) | ||
self.context = context | ||
self.eventObserver = eventObserverFactory(context) | ||
context.responsePromise.futureResult.whenComplete { | ||
// When done, reset references to avoid retain cycles. | ||
self.eventObserver = nil | ||
self.context = nil | ||
} | ||
} | ||
|
||
public override func processMessage(_ message: RequestMessage) { | ||
guard let eventObserver = self.eventObserver, | ||
let context = self.context else { | ||
//! FIXME: Better handle this error? | ||
print("multiple messages received on unary call") | ||
return | ||
} | ||
|
||
let resultFuture = eventObserver(message) | ||
resultFuture | ||
// Fulfill the response promise with whatever response (or error) the framework user has provided. | ||
.cascade(promise: context.responsePromise) | ||
self.eventObserver = nil | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
import Foundation | ||
import SwiftProtobuf | ||
import NIO | ||
import NIOHTTP1 | ||
|
||
/// Processes individual gRPC messages and stream-close events on a HTTP2 channel. | ||
public protocol GRPCCallHandler: ChannelHandler { | ||
func makeGRPCServerCodec() -> ChannelHandler | ||
} | ||
|
||
/// Provides `GRPCCallHandler` objects for the methods on a particular service name. | ||
/// | ||
/// Implemented by the generated code. | ||
public protocol CallHandlerProvider: class { | ||
/// The name of the service this object is providing methods for, including the package path. | ||
/// | ||
/// - Example: "io.grpc.Echo.EchoService" | ||
var serviceName: String { get } | ||
|
||
/// Determines, calls and returns the appropriate request handler (`GRPCCallHandler`), depending on the request's | ||
/// method. Returns nil for methods not handled by this service. | ||
func handleMethod(_ methodName: String, request: HTTPRequestHead, serverHandler: GRPCChannelHandler, channel: Channel) -> GRPCCallHandler? | ||
} | ||
|
||
/// Listens on a newly-opened HTTP2 subchannel and yields to the sub-handler matching a call, if available. | ||
/// | ||
/// Once the request headers are available, asks the `CallHandlerProvider` corresponding to the request's service name | ||
/// for an `GRPCCallHandler` object. That object is then forwarded the individual gRPC messages. | ||
public final class GRPCChannelHandler { | ||
private let servicesByName: [String: CallHandlerProvider] | ||
|
||
public init(servicesByName: [String: CallHandlerProvider]) { | ||
self.servicesByName = servicesByName | ||
} | ||
} | ||
|
||
extension GRPCChannelHandler: ChannelInboundHandler { | ||
public typealias InboundIn = RawGRPCServerRequestPart | ||
public typealias OutboundOut = RawGRPCServerResponsePart | ||
|
||
public func channelRead(ctx: ChannelHandlerContext, data: NIOAny) { | ||
let requestPart = self.unwrapInboundIn(data) | ||
switch requestPart { | ||
case .head(let requestHead): | ||
// URI format: "/package.Servicename/MethodName", resulting in the following components separated by a slash: | ||
// - uriComponents[0]: empty | ||
// - uriComponents[1]: service name (including the package name); | ||
// `CallHandlerProvider`s should provide the service name including the package name. | ||
// - uriComponents[2]: method name. | ||
let uriComponents = requestHead.uri.components(separatedBy: "/") | ||
guard uriComponents.count >= 3 && uriComponents[0].isEmpty, | ||
let providerForServiceName = servicesByName[uriComponents[1]], | ||
let callHandler = providerForServiceName.handleMethod(uriComponents[2], request: requestHead, serverHandler: self, channel: ctx.channel) else { | ||
ctx.writeAndFlush(self.wrapOutboundOut(.status(.unimplemented(method: requestHead.uri))), promise: nil) | ||
return | ||
} | ||
|
||
var responseHeaders = HTTPHeaders() | ||
responseHeaders.add(name: "content-type", value: "application/grpc") | ||
ctx.write(self.wrapOutboundOut(.headers(responseHeaders)), promise: nil) | ||
|
||
let codec = callHandler.makeGRPCServerCodec() | ||
ctx.pipeline.add(handler: codec, after: self) | ||
.then { ctx.pipeline.add(handler: callHandler, after: codec) } | ||
//! FIXME(lukasa): Fix the ordering of this with NIO 1.12 and replace with `remove(, promise:)`. | ||
.whenComplete { _ = ctx.pipeline.remove(handler: self) } | ||
|
||
case .message, .end: | ||
preconditionFailure("received \(requestPart), should have been removed as a handler at this point") | ||
} | ||
} | ||
} |
Oops, something went wrong.