From 223c7a1080a5ddff236000985d102114f031527d Mon Sep 17 00:00:00 2001 From: George Barnett Date: Mon, 25 Jan 2021 16:00:32 +0000 Subject: [PATCH] Remove deprecated/dead code Motivation: We made no commitment to carry API which was deprecated before 1.0.0, so let's remove all shims, deprecated and unreachable code. Modifications: - Remove shims - Remove some call contexts, since they're for use with the old RPC handling Result: Less code. --- .../BidirectionalStreamingCallHandler.swift | 247 ------ .../CallHandlers/CallHandlerFactory.swift | 183 ----- .../ClientStreamingCallHandler.swift | 251 ------ .../ServerStreamingCallHandler.swift | 268 ------- .../GRPC/CallHandlers/UnaryCallHandler.swift | 296 ------- .../GRPC/CallHandlers/_BaseCallHandler.swift | 742 ------------------ .../GRPCServerRequestRoutingHandler.swift | 28 +- .../ServerCallContext.swift | 8 - .../StreamingResponseCallContext.swift | 102 --- .../UnaryResponseCallContext.swift | 94 --- Sources/GRPC/Shims.swift | 467 ----------- Sources/GRPC/TimeLimit.swift | 4 +- 12 files changed, 3 insertions(+), 2687 deletions(-) delete mode 100644 Sources/GRPC/CallHandlers/BidirectionalStreamingCallHandler.swift delete mode 100644 Sources/GRPC/CallHandlers/CallHandlerFactory.swift delete mode 100644 Sources/GRPC/CallHandlers/ClientStreamingCallHandler.swift delete mode 100644 Sources/GRPC/CallHandlers/ServerStreamingCallHandler.swift delete mode 100644 Sources/GRPC/CallHandlers/UnaryCallHandler.swift delete mode 100644 Sources/GRPC/CallHandlers/_BaseCallHandler.swift delete mode 100644 Sources/GRPC/Shims.swift diff --git a/Sources/GRPC/CallHandlers/BidirectionalStreamingCallHandler.swift b/Sources/GRPC/CallHandlers/BidirectionalStreamingCallHandler.swift deleted file mode 100644 index 50f3a5610..000000000 --- a/Sources/GRPC/CallHandlers/BidirectionalStreamingCallHandler.swift +++ /dev/null @@ -1,247 +0,0 @@ -/* - * Copyright 2019, gRPC Authors All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -import Foundation -import Logging -import NIO -import NIOHPACK -import SwiftProtobuf - -/// Handles bidirectional streaming calls. Forwards incoming messages and end-of-stream events to the observer block. -/// -/// - The observer block is implemented by the framework user and calls `context.sendResponse` as needed. -/// If the framework user wants to return a call error (e.g. in case of authentication failure), -/// they can fail the observer block future. -/// - To close the call and send the status, complete `context.statusPromise`. -public class BidirectionalStreamingCallHandler< - RequestDeserializer: MessageDeserializer, - ResponseSerializer: MessageSerializer ->: _BaseCallHandler { - @usableFromInline - internal typealias _Context = StreamingResponseCallContext - @usableFromInline - internal typealias _Observer = EventLoopFuture<(StreamEvent) -> Void> - - @usableFromInline - internal var _callHandlerState: _CallHandlerState - - // See 'UnaryCallHandler.State'. - @usableFromInline - internal enum _CallHandlerState { - case requestIdleResponseIdle((_Context) -> _Observer) - case requestOpenResponseOpen(_Context, _Observer) - case requestClosedResponseOpen(_Context) - case requestClosedResponseClosed - } - - // We ask for a future of type `EventObserver` to allow the framework user to e.g. asynchronously authenticate a call. - // If authentication fails, they can simply fail the observer future, which causes the call to be terminated. - @inlinable - internal init( - serializer: ResponseSerializer, - deserializer: RequestDeserializer, - callHandlerContext: CallHandlerContext, - interceptors: [ServerInterceptor], - eventObserverFactory: @escaping (StreamingResponseCallContext) - -> EventLoopFuture<(StreamEvent) -> Void> - ) { - self._callHandlerState = .requestIdleResponseIdle(eventObserverFactory) - super.init( - callHandlerContext: callHandlerContext, - requestDeserializer: deserializer, - responseSerializer: serializer, - callType: .bidirectionalStreaming, - interceptors: interceptors - ) - } - - override public func channelInactive(context: ChannelHandlerContext) { - super.channelInactive(context: context) - - // Fail any remaining promise. - switch self._callHandlerState { - case .requestIdleResponseIdle, - .requestClosedResponseClosed: - self._callHandlerState = .requestClosedResponseClosed - - case let .requestOpenResponseOpen(context, _), - let .requestClosedResponseOpen(context): - self._callHandlerState = .requestClosedResponseClosed - context.statusPromise.fail(GRPCError.AlreadyComplete()) - } - } - - /// Handle an error from the event observer. - private func handleObserverError(_ error: Error) { - switch self._callHandlerState { - case .requestIdleResponseIdle: - preconditionFailure("Invalid state: request observer hasn't been created") - - case let .requestOpenResponseOpen(context, _), - let .requestClosedResponseOpen(context): - let (status, trailers) = self.processObserverError( - error, - headers: context.headers, - trailers: context.trailers - ) - // This will handle the response promise as well. - self.sendEnd(status: status, trailers: trailers) - - case .requestClosedResponseClosed: - // We hit an error, but we're already closed (because we hit a library error first). - () - } - } - - /// Handle a 'library' error, i.e. an error emanating from the `Channel`. - private func handleLibraryError(_ error: Error) { - switch self._callHandlerState { - case .requestIdleResponseIdle, - .requestOpenResponseOpen: - // We'll never see end of stream, we'll close. - let (status, trailers) = self.processLibraryError(error) - self.sendEnd(status: status, trailers: trailers) - - case .requestClosedResponseOpen: - // We've invoked the observer and seen the end of the request stream. We'll let this one play - // out. - () - - case .requestClosedResponseClosed: - // We're already closed, no need to do anything here. - () - } - } - - // MARK: - Inbound - - override func observeLibraryError(_ error: Error) { - self.handleLibraryError(error) - } - - override internal func observeHeaders(_ headers: HPACKHeaders) { - switch self._callHandlerState { - case let .requestIdleResponseIdle(factory): - let context = _StreamingResponseCallContext( - eventLoop: self.eventLoop, - headers: headers, - logger: self.logger, - userInfoRef: self._userInfoRef, - compressionIsEnabled: self._callHandlerContext.encoding.isEnabled, - sendResponse: self.sendResponse(_:metadata:promise:) - ) - let observer = factory(context) - - // Fully open. We'll send the response headers back in a moment. - self._callHandlerState = .requestOpenResponseOpen(context, observer) - - // Register a failure callback for the observer failing. - observer.whenFailure(self.handleObserverError(_:)) - - // Register actions on the status promise. - context.statusPromise.futureResult.whenComplete { result in - switch result { - case let .success(status): - self.sendEnd(status: status, trailers: context.trailers) - case let .failure(error): - self.handleObserverError(error) - } - } - - // Write back the response headers. - self.sendResponsePartFromObserver(.metadata([:]), promise: nil) - - // The main state machine guards against this. - case .requestOpenResponseOpen, - .requestClosedResponseOpen, - .requestClosedResponseClosed: - preconditionFailure("Invalid state: request headers already received") - } - } - - override internal func observeRequest(_ message: RequestPayload) { - switch self._callHandlerState { - case .requestIdleResponseIdle: - preconditionFailure("Invalid state: request received before headers") - - case let .requestOpenResponseOpen(_, observer): - observer.whenSuccess { - $0(.message(message)) - } - - case .requestClosedResponseOpen, - .requestClosedResponseClosed: - preconditionFailure("Invalid state: the request stream has already been closed") - } - } - - override internal func observeEnd() { - switch self._callHandlerState { - case .requestIdleResponseIdle: - preconditionFailure("Invalid state: no request headers received") - - case let .requestOpenResponseOpen(context, observer): - self._callHandlerState = .requestClosedResponseOpen(context) - observer.whenSuccess { - $0(.end) - } - - case .requestClosedResponseOpen, - .requestClosedResponseClosed: - preconditionFailure("Invalid state: request stream is already closed") - } - } - - // MARK: - Outbound - - private func sendResponse( - _ message: ResponsePayload, - metadata: MessageMetadata, - promise: EventLoopPromise? - ) { - switch self._callHandlerState { - case .requestIdleResponseIdle: - preconditionFailure("Invalid state: can't send response before receiving headers and request") - - case .requestOpenResponseOpen, - .requestClosedResponseOpen: - self.sendResponsePartFromObserver(.message(message, metadata), promise: promise) - - case .requestClosedResponseClosed: - // We're already closed. This isn't a precondition failure because we may have encountered - // an error before the observer block completed. - promise?.fail(GRPCError.AlreadyComplete()) - } - } - - private func sendEnd(status: GRPCStatus, trailers: HPACKHeaders) { - switch self._callHandlerState { - case .requestIdleResponseIdle, - .requestClosedResponseOpen: - self._callHandlerState = .requestClosedResponseClosed - self.sendResponsePartFromObserver(.end(status, trailers), promise: nil) - - case let .requestOpenResponseOpen(context, _): - self._callHandlerState = .requestClosedResponseClosed - self.sendResponsePartFromObserver(.end(status, trailers), promise: nil) - // Fail the promise. - context.statusPromise.fail(status) - - case .requestClosedResponseClosed: - // Already closed, do nothing. - () - } - } -} diff --git a/Sources/GRPC/CallHandlers/CallHandlerFactory.swift b/Sources/GRPC/CallHandlers/CallHandlerFactory.swift deleted file mode 100644 index ca97951f7..000000000 --- a/Sources/GRPC/CallHandlers/CallHandlerFactory.swift +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Copyright 2020, gRPC Authors All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -import NIO -import SwiftProtobuf - -// We can't use a 'where' clause on 'init's to constrain the generic requirements of a type. Instead -// we'll use static methods on this factory. -public enum CallHandlerFactory { - public typealias UnaryContext = UnaryResponseCallContext - public typealias UnaryEventObserver = (Request) -> EventLoopFuture - - @available(*, deprecated, message: "Please regenerate your server code.") - @inlinable - public static func makeUnary( - callHandlerContext: CallHandlerContext, - interceptors: [ServerInterceptor] = [], - eventObserverFactory: @escaping (UnaryContext) - -> UnaryEventObserver - ) -> UnaryCallHandler, ProtobufSerializer> { - return UnaryCallHandler( - serializer: ProtobufSerializer(), - deserializer: ProtobufDeserializer(), - callHandlerContext: callHandlerContext, - interceptors: interceptors, - eventObserverFactory: eventObserverFactory - ) - } - - @available(*, deprecated, message: "Please regenerate your server code.") - @inlinable - public static func makeUnary( - callHandlerContext: CallHandlerContext, - interceptors: [ServerInterceptor] = [], - eventObserverFactory: @escaping (UnaryContext) - -> UnaryEventObserver - ) -> UnaryCallHandler, GRPCPayloadSerializer> { - return UnaryCallHandler( - serializer: GRPCPayloadSerializer(), - deserializer: GRPCPayloadDeserializer(), - callHandlerContext: callHandlerContext, - interceptors: interceptors, - eventObserverFactory: eventObserverFactory - ) - } - - public typealias ClientStreamingContext = UnaryResponseCallContext - public typealias ClientStreamingEventObserver = - EventLoopFuture<(StreamEvent) -> Void> - - @available(*, deprecated, message: "Please regenerate your server code.") - @inlinable - public static func makeClientStreaming( - callHandlerContext: CallHandlerContext, - interceptors: [ServerInterceptor] = [], - eventObserverFactory: @escaping (ClientStreamingContext) - -> ClientStreamingEventObserver - ) -> ClientStreamingCallHandler, ProtobufSerializer> { - return ClientStreamingCallHandler( - serializer: ProtobufSerializer(), - deserializer: ProtobufDeserializer(), - callHandlerContext: callHandlerContext, - interceptors: interceptors, - eventObserverFactory: eventObserverFactory - ) - } - - @available(*, deprecated, message: "Please regenerate your server code.") - @inlinable - public static func makeClientStreaming( - callHandlerContext: CallHandlerContext, - interceptors: [ServerInterceptor] = [], - eventObserverFactory: @escaping (ClientStreamingContext) - -> ClientStreamingEventObserver - ) -> ClientStreamingCallHandler< - GRPCPayloadDeserializer, - GRPCPayloadSerializer - > { - return ClientStreamingCallHandler( - serializer: GRPCPayloadSerializer(), - deserializer: GRPCPayloadDeserializer(), - callHandlerContext: callHandlerContext, - interceptors: interceptors, - eventObserverFactory: eventObserverFactory - ) - } - - public typealias ServerStreamingContext = StreamingResponseCallContext - public typealias ServerStreamingEventObserver = (Request) -> EventLoopFuture - - @available(*, deprecated, message: "Please regenerate your server code.") - @inlinable - public static func makeServerStreaming( - callHandlerContext: CallHandlerContext, - interceptors: [ServerInterceptor] = [], - eventObserverFactory: @escaping (ServerStreamingContext) - -> ServerStreamingEventObserver - ) -> ServerStreamingCallHandler, ProtobufSerializer> { - return ServerStreamingCallHandler( - serializer: ProtobufSerializer(), - deserializer: ProtobufDeserializer(), - callHandlerContext: callHandlerContext, - interceptors: interceptors, - eventObserverFactory: eventObserverFactory - ) - } - - @available(*, deprecated, message: "Please regenerate your server code.") - @inlinable - public static func makeServerStreaming( - callHandlerContext: CallHandlerContext, - interceptors: [ServerInterceptor] = [], - eventObserverFactory: @escaping (ServerStreamingContext) - -> ServerStreamingEventObserver - ) -> ServerStreamingCallHandler< - GRPCPayloadDeserializer, - GRPCPayloadSerializer - > { - return ServerStreamingCallHandler( - serializer: GRPCPayloadSerializer(), - deserializer: GRPCPayloadDeserializer(), - callHandlerContext: callHandlerContext, - interceptors: interceptors, - eventObserverFactory: eventObserverFactory - ) - } - - public typealias BidirectionalStreamingContext = StreamingResponseCallContext - public typealias BidirectionalStreamingEventObserver = - EventLoopFuture<(StreamEvent) -> Void> - - @available(*, deprecated, message: "Please regenerate your server code.") - @inlinable - public static func makeBidirectionalStreaming( - callHandlerContext: CallHandlerContext, - interceptors: [ServerInterceptor] = [], - eventObserverFactory: @escaping (BidirectionalStreamingContext) - -> BidirectionalStreamingEventObserver - ) -> BidirectionalStreamingCallHandler< - ProtobufDeserializer, - ProtobufSerializer - > { - return BidirectionalStreamingCallHandler( - serializer: ProtobufSerializer(), - deserializer: ProtobufDeserializer(), - callHandlerContext: callHandlerContext, - interceptors: interceptors, - eventObserverFactory: eventObserverFactory - ) - } - - @available(*, deprecated, message: "Please regenerate your server code.") - @inlinable - public static func makeBidirectionalStreaming( - callHandlerContext: CallHandlerContext, - interceptors: [ServerInterceptor] = [], - eventObserverFactory: @escaping (BidirectionalStreamingContext) - -> BidirectionalStreamingEventObserver - ) -> BidirectionalStreamingCallHandler< - GRPCPayloadDeserializer, - GRPCPayloadSerializer - > { - return BidirectionalStreamingCallHandler( - serializer: GRPCPayloadSerializer(), - deserializer: GRPCPayloadDeserializer(), - callHandlerContext: callHandlerContext, - interceptors: interceptors, - eventObserverFactory: eventObserverFactory - ) - } -} diff --git a/Sources/GRPC/CallHandlers/ClientStreamingCallHandler.swift b/Sources/GRPC/CallHandlers/ClientStreamingCallHandler.swift deleted file mode 100644 index 4d831e3fa..000000000 --- a/Sources/GRPC/CallHandlers/ClientStreamingCallHandler.swift +++ /dev/null @@ -1,251 +0,0 @@ -/* - * Copyright 2019, gRPC Authors All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -import Foundation -import Logging -import NIO -import NIOHPACK -import NIOHTTP1 -import SwiftProtobuf - -/// Handles client-streaming calls. Forwards incoming messages and end-of-stream events to the observer block. -/// -/// - The observer block is implemented by the framework user and fulfills `context.responsePromise` when done. -/// If the framework user wants to return a call error (e.g. in case of authentication failure), -/// they can fail the observer block future. -/// - To close the call and send the response, complete `context.responsePromise`. -public final class ClientStreamingCallHandler< - RequestDeserializer: MessageDeserializer, - ResponseSerializer: MessageSerializer ->: _BaseCallHandler { - @usableFromInline - internal typealias _Context = UnaryResponseCallContext - @usableFromInline - internal typealias _Observer = EventLoopFuture<(StreamEvent) -> Void> - - @usableFromInline - internal var _callHandlerState: _CallHandlerState - - // See 'UnaryCallHandler.State'. - @usableFromInline - internal enum _CallHandlerState { - case requestIdleResponseIdle((_Context) -> _Observer) - case requestOpenResponseOpen(_Context, _Observer) - case requestClosedResponseOpen(_Context) - case requestClosedResponseClosed - } - - // We ask for a future of type `EventObserver` to allow the framework user to e.g. asynchronously authenticate a call. - // If authentication fails, they can simply fail the observer future, which causes the call to be terminated. - @inlinable - internal init( - serializer: ResponseSerializer, - deserializer: RequestDeserializer, - callHandlerContext: CallHandlerContext, - interceptors: [ServerInterceptor], - eventObserverFactory: @escaping (UnaryResponseCallContext) - -> EventLoopFuture<(StreamEvent) -> Void> - ) { - self._callHandlerState = .requestIdleResponseIdle(eventObserverFactory) - super.init( - callHandlerContext: callHandlerContext, - requestDeserializer: deserializer, - responseSerializer: serializer, - callType: .clientStreaming, - interceptors: interceptors - ) - } - - override public func channelInactive(context: ChannelHandlerContext) { - super.channelInactive(context: context) - - // Fail any remaining promise. - switch self._callHandlerState { - case .requestIdleResponseIdle, - .requestClosedResponseClosed: - self._callHandlerState = .requestClosedResponseClosed - - case let .requestOpenResponseOpen(context, _), - let .requestClosedResponseOpen(context): - self._callHandlerState = .requestClosedResponseClosed - context.responsePromise.fail(GRPCError.AlreadyComplete()) - } - } - - /// Handle an error from the event observer. - private func handleObserverError(_ error: Error) { - switch self._callHandlerState { - case .requestIdleResponseIdle: - preconditionFailure("Invalid state: request observer hasn't been created") - - case let .requestOpenResponseOpen(context, _), - let .requestClosedResponseOpen(context): - let (status, trailers) = self.processObserverError( - error, - headers: context.headers, - trailers: context.trailers - ) - // This will handle the response promise as well. - self.sendEnd(status: status, trailers: trailers) - - case .requestClosedResponseClosed: - // We hit an error, but we're already closed (because we hit a library error first). - () - } - } - - /// Handle a 'library' error, i.e. an error emanating from the `Channel`. - private func handleLibraryError(_ error: Error) { - switch self._callHandlerState { - case .requestIdleResponseIdle, - .requestOpenResponseOpen: - // We'll never see a request message, so just send end. - let (status, trailers) = self.processLibraryError(error) - self.sendEnd(status: status, trailers: trailers) - - case .requestClosedResponseOpen: - // We've invoked the observer and have seen the end of the request stream. We'll let that - // play out. - () - - case .requestClosedResponseClosed: - // We're already closed, no need to do anything here. - () - } - } - - // MARK: - Inbound - - override func observeLibraryError(_ error: Error) { - self.handleLibraryError(error) - } - - override internal func observeHeaders(_ headers: HPACKHeaders) { - switch self._callHandlerState { - case let .requestIdleResponseIdle(factory): - let context = UnaryResponseCallContext( - eventLoop: self.eventLoop, - headers: headers, - logger: self.logger, - userInfoRef: self._userInfoRef - ) - - let observer = factory(context) - - // Fully open. We'll send the response headers back in a moment. - self._callHandlerState = .requestOpenResponseOpen(context, observer) - - // Register a failure callback for the observer failing. - observer.whenFailure(self.handleObserverError(_:)) - - // Register callbacks on the response promise. - context.responsePromise.futureResult.whenComplete { result in - switch result { - case let .success(response): - self.sendResponse(response) - case let .failure(error): - self.handleObserverError(error) - } - } - - // Write back the response headers. - self.sendResponsePartFromObserver(.metadata([:]), promise: nil) - - // The main state machine guards against this. - case .requestOpenResponseOpen, - .requestClosedResponseOpen, - .requestClosedResponseClosed: - preconditionFailure("Invalid state: request headers already received") - } - } - - override internal func observeRequest(_ message: RequestPayload) { - switch self._callHandlerState { - case .requestIdleResponseIdle: - preconditionFailure("Invalid state: request received before headers") - - case let .requestOpenResponseOpen(_, observer): - observer.whenSuccess { - $0(.message(message)) - } - - case .requestClosedResponseOpen, - .requestClosedResponseClosed: - preconditionFailure("Invalid state: the request stream has already been closed") - } - } - - override internal func observeEnd() { - switch self._callHandlerState { - case .requestIdleResponseIdle: - preconditionFailure("Invalid state: no request headers received") - - case let .requestOpenResponseOpen(context, observer): - self._callHandlerState = .requestClosedResponseOpen(context) - observer.whenSuccess { - $0(.end) - } - - case .requestClosedResponseOpen, - .requestClosedResponseClosed: - preconditionFailure("Invalid state: request stream is already closed") - } - } - - // MARK: - Outbound - - private func sendResponse(_ message: ResponsePayload) { - switch self._callHandlerState { - case .requestIdleResponseIdle: - preconditionFailure("Invalid state: can't send response before receiving headers and request") - - case let .requestOpenResponseOpen(context, _), - let .requestClosedResponseOpen(context): - self._callHandlerState = .requestClosedResponseClosed - self.sendResponsePartFromObserver( - .message(message, .init(compress: context.compressionEnabled, flush: false)), - promise: nil - ) - self.sendResponsePartFromObserver( - .end(context.responseStatus, context.trailers), - promise: nil - ) - - case .requestClosedResponseClosed: - // We're already closed. This isn't a precondition failure because we may have encountered - // an error before the observer block completed. - () - } - } - - private func sendEnd(status: GRPCStatus, trailers: HPACKHeaders) { - switch self._callHandlerState { - case .requestIdleResponseIdle, - .requestClosedResponseOpen: - self._callHandlerState = .requestClosedResponseClosed - self.sendResponsePartFromObserver(.end(status, trailers), promise: nil) - - case let .requestOpenResponseOpen(context, _): - self._callHandlerState = .requestClosedResponseClosed - self.sendResponsePartFromObserver(.end(status, trailers), promise: nil) - // Fail the promise. - context.responsePromise.fail(status) - - case .requestClosedResponseClosed: - // Already closed, do nothing. - () - } - } -} diff --git a/Sources/GRPC/CallHandlers/ServerStreamingCallHandler.swift b/Sources/GRPC/CallHandlers/ServerStreamingCallHandler.swift deleted file mode 100644 index d4740e9e6..000000000 --- a/Sources/GRPC/CallHandlers/ServerStreamingCallHandler.swift +++ /dev/null @@ -1,268 +0,0 @@ -/* - * Copyright 2019, gRPC Authors All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -import Foundation -import Logging -import NIO -import NIOHPACK -import NIOHTTP1 -import SwiftProtobuf - -/// Handles server-streaming calls. Calls the observer block with the request message. -/// -/// - The observer block is implemented by the framework user and calls `context.sendResponse` as needed. -/// - To close the call and send the status, complete the status future returned by the observer block. -public final class ServerStreamingCallHandler< - RequestDeserializer: MessageDeserializer, - ResponseSerializer: MessageSerializer ->: _BaseCallHandler { - @usableFromInline - internal typealias _Context = StreamingResponseCallContext - @usableFromInline - internal typealias _Observer = (RequestPayload) -> EventLoopFuture - - @usableFromInline - internal var _callHandlerState: _CallHandlerState - - // See 'UnaryCallHandler.State'. - @usableFromInline - internal enum _CallHandlerState { - case requestIdleResponseIdle((_Context) -> _Observer) - case requestOpenResponseOpen(_Context, ObserverState) - case requestClosedResponseOpen(_Context) - case requestClosedResponseClosed - - @usableFromInline - enum ObserverState { - case notObserved(_Observer) - case observed - } - } - - @inlinable - internal init( - serializer: ResponseSerializer, - deserializer: RequestDeserializer, - callHandlerContext: CallHandlerContext, - interceptors: [ServerInterceptor], - eventObserverFactory: @escaping (StreamingResponseCallContext) - -> (RequestPayload) -> EventLoopFuture - ) { - self._callHandlerState = .requestIdleResponseIdle(eventObserverFactory) - super.init( - callHandlerContext: callHandlerContext, - requestDeserializer: deserializer, - responseSerializer: serializer, - callType: .serverStreaming, - interceptors: interceptors - ) - } - - override public func channelInactive(context: ChannelHandlerContext) { - super.channelInactive(context: context) - - // Fail any remaining promise. - switch self._callHandlerState { - case .requestIdleResponseIdle, - .requestClosedResponseClosed: - self._callHandlerState = .requestClosedResponseClosed - - case let .requestOpenResponseOpen(context, _), - let .requestClosedResponseOpen(context): - self._callHandlerState = .requestClosedResponseClosed - context.statusPromise.fail(GRPCError.AlreadyComplete()) - } - } - - /// Handle an error from the event observer. - private func handleObserverError(_ error: Error) { - switch self._callHandlerState { - case .requestIdleResponseIdle: - preconditionFailure("Invalid state: request observer hasn't been created") - - case .requestOpenResponseOpen(_, .notObserved): - preconditionFailure("Invalid state: request observer hasn't been invoked") - - case let .requestOpenResponseOpen(context, .observed), - let .requestClosedResponseOpen(context): - let (status, trailers) = self.processObserverError( - error, - headers: context.headers, - trailers: context.trailers - ) - // This will handle the response promise as well. - self.sendEnd(status: status, trailers: trailers) - - case .requestClosedResponseClosed: - // We hit an error, but we're already closed (because we hit a library error first). - () - } - } - - /// Handle a 'library' error, i.e. an error emanating from the `Channel`. - private func handleLibraryError(_ error: Error) { - switch self._callHandlerState { - case .requestIdleResponseIdle, - .requestOpenResponseOpen(_, .notObserved): - // We'll never see a request message: send end. - let (status, trailers) = self.processLibraryError(error) - self.sendEnd(status: status, trailers: trailers) - - case .requestOpenResponseOpen(_, .observed), - .requestClosedResponseOpen: - // We've invoked the observer, we expect a response. We'll let this play out. - () - - case .requestClosedResponseClosed: - // We're already closed, no need to do anything here. - () - } - } - - // MARK: - Inbound - - override func observeLibraryError(_ error: Error) { - self.handleLibraryError(error) - } - - override internal func observeHeaders(_ headers: HPACKHeaders) { - switch self._callHandlerState { - case let .requestIdleResponseIdle(factory): - let context = _StreamingResponseCallContext( - eventLoop: self.eventLoop, - headers: headers, - logger: self.logger, - userInfoRef: self._userInfoRef, - compressionIsEnabled: self._callHandlerContext.encoding.isEnabled, - sendResponse: self.sendResponse(_:metadata:promise:) - ) - let observer = factory(context) - - // Fully open. We'll send the response headers back in a moment. - self._callHandlerState = .requestOpenResponseOpen(context, .notObserved(observer)) - - // Register callbacks for the status promise. - context.statusPromise.futureResult.whenComplete { result in - switch result { - case let .success(status): - self.sendEnd(status: status, trailers: context.trailers) - case let .failure(error): - self.handleObserverError(error) - } - } - - // Write back the response headers. - self.sendResponsePartFromObserver(.metadata([:]), promise: nil) - - // The main state machine guards against this. - case .requestOpenResponseOpen, - .requestClosedResponseOpen, - .requestClosedResponseClosed: - preconditionFailure("Invalid state") - } - } - - override internal func observeRequest(_ message: RequestPayload) { - switch self._callHandlerState { - case .requestIdleResponseIdle: - preconditionFailure("Invalid state: request received before headers") - - case let .requestOpenResponseOpen(context, request): - switch request { - case .observed: - // We've already observed the request message. The main state machine doesn't guard against - // too many messages for unary streams. Assuming downstream handlers protect against this - // then this must be an errant interceptor. - () - - case let .notObserved(observer): - self._callHandlerState = .requestOpenResponseOpen(context, .observed) - // Complete the status promise with the observer block. - context.statusPromise.completeWith(observer(message)) - } - - case .requestClosedResponseOpen, - .requestClosedResponseClosed: - preconditionFailure("Invalid state: the request stream has already been closed") - } - } - - override internal func observeEnd() { - switch self._callHandlerState { - case .requestIdleResponseIdle: - preconditionFailure("Invalid state: no request headers received") - - case let .requestOpenResponseOpen(context, request): - switch request { - case .observed: - // Close the request stream. - self._callHandlerState = .requestClosedResponseOpen(context) - - case .notObserved: - // We haven't received a request: this is an empty stream, the observer will never be - // invoked. Fail the response promise (which will have no side effect). - context.statusPromise.fail(GRPCError.StreamCardinalityViolation.request) - } - - case .requestClosedResponseOpen, - .requestClosedResponseClosed: - preconditionFailure("Invalid state: request stream is already closed") - } - } - - // MARK: - Outbound - - private func sendResponse( - _ message: ResponsePayload, - metadata: MessageMetadata, - promise: EventLoopPromise? - ) { - switch self._callHandlerState { - case .requestIdleResponseIdle: - preconditionFailure("Invalid state: can't send response before receiving headers and request") - - case .requestOpenResponseOpen(_, .notObserved): - preconditionFailure("Invalid state: can't send response before receiving request") - - case .requestOpenResponseOpen(_, .observed), - .requestClosedResponseOpen: - self.sendResponsePartFromObserver(.message(message, metadata), promise: promise) - - case .requestClosedResponseClosed: - // We're already closed. This isn't a precondition failure because we may have encountered - // an error before the observer block completed. - promise?.fail(GRPCError.AlreadyComplete()) - } - } - - private func sendEnd(status: GRPCStatus, trailers: HPACKHeaders) { - switch self._callHandlerState { - case .requestIdleResponseIdle, - .requestClosedResponseOpen: - self._callHandlerState = .requestClosedResponseClosed - self.sendResponsePartFromObserver(.end(status, trailers), promise: nil) - - case let .requestOpenResponseOpen(context, _): - self._callHandlerState = .requestClosedResponseClosed - self.sendResponsePartFromObserver(.end(status, trailers), promise: nil) - // Fail the promise. - context.statusPromise.fail(status) - - case .requestClosedResponseClosed: - // Already closed, do nothing. - () - } - } -} diff --git a/Sources/GRPC/CallHandlers/UnaryCallHandler.swift b/Sources/GRPC/CallHandlers/UnaryCallHandler.swift deleted file mode 100644 index 1dc6b5ddd..000000000 --- a/Sources/GRPC/CallHandlers/UnaryCallHandler.swift +++ /dev/null @@ -1,296 +0,0 @@ -/* - * Copyright 2019, gRPC Authors All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -import Foundation -import Logging -import NIO -import NIOHPACK -import NIOHTTP1 -import SwiftProtobuf - -/// Handles unary calls. Calls the observer block with the request message. -/// -/// - The observer block is implemented by the framework user and returns a future containing the call result. -/// - To return a response to the client, the framework user should complete that future -/// (similar to e.g. serving regular HTTP requests in frameworks such as Vapor). -public final class UnaryCallHandler< - RequestDeserializer: MessageDeserializer, - ResponseSerializer: MessageSerializer ->: _BaseCallHandler { - @usableFromInline - internal typealias _Context = UnaryResponseCallContext - @usableFromInline - internal typealias _Observer = (RequestPayload) -> EventLoopFuture - - @usableFromInline - internal var _callHandlerState: _CallHandlerState - - @usableFromInline - internal enum _CallHandlerState { - // We don't have the following states (which we do have in the main state machine): - // - 'requestOpenResponseIdle', - // - 'requestClosedResponseIdle' - // - // We'll send headers back when we transition away from 'requestIdleResponseIdle' so the - // response stream can never be less idle than the request stream. - - /// Fully idle, we haven't seen the request headers yet and we haven't made an event observer - /// yet. - case requestIdleResponseIdle((_Context) -> _Observer) - - /// Received the request headers, created an observer and have sent back response headers. - /// We may or may not have observer the request message yet. - case requestOpenResponseOpen(_Context, ObserverState) - - /// Received the request headers, a message and the end of the request stream. The observer has - /// been invoked but it hasn't yet finished processing the request. - /// - /// Note: we know we've received a message if we're in this state, if we had seen the request - /// headers followed by end we'd fully close. - case requestClosedResponseOpen(_Context) - - /// We're done. - case requestClosedResponseClosed - - /// The state of the event observer. - @usableFromInline - enum ObserverState { - /// We have an event observer, but haven't yet received a request. - case notObserved(_Observer) - /// We've invoked the event observer with a request. - case observed - } - } - - @inlinable - internal init( - serializer: ResponseSerializer, - deserializer: RequestDeserializer, - callHandlerContext: CallHandlerContext, - interceptors: [ServerInterceptor], - eventObserverFactory: @escaping (UnaryResponseCallContext) - -> (RequestPayload) -> EventLoopFuture - ) { - self._callHandlerState = .requestIdleResponseIdle(eventObserverFactory) - super.init( - callHandlerContext: callHandlerContext, - requestDeserializer: deserializer, - responseSerializer: serializer, - callType: .unary, - interceptors: interceptors - ) - } - - override public func channelInactive(context: ChannelHandlerContext) { - super.channelInactive(context: context) - - // Fail any remaining promise. - switch self._callHandlerState { - case .requestIdleResponseIdle, - .requestClosedResponseClosed: - self._callHandlerState = .requestClosedResponseClosed - - case let .requestOpenResponseOpen(context, _), - let .requestClosedResponseOpen(context): - self._callHandlerState = .requestClosedResponseClosed - context.responsePromise.fail(GRPCError.AlreadyComplete()) - } - } - - /// Handle an error from the event observer. - private func handleObserverError(_ error: Error) { - switch self._callHandlerState { - case .requestIdleResponseIdle: - preconditionFailure("Invalid state: request observer hasn't been created") - - case let .requestOpenResponseOpen(context, _), - let .requestClosedResponseOpen(context): - let (status, trailers) = self.processObserverError( - error, - headers: context.headers, - trailers: context.trailers - ) - // This will handle the response promise as well. - self.sendEnd(status: status, trailers: trailers) - - case .requestClosedResponseClosed: - // We hit an error, but we're already closed (i.e. we hit a library error first). Ignore - // the error. - () - } - } - - /// Handle a 'library' error, i.e. an error emanating from the `Channel`. - private func handleLibraryError(_ error: Error) { - switch self._callHandlerState { - case .requestIdleResponseIdle, - .requestOpenResponseOpen(_, .notObserved): - // We haven't seen a message, we'll send end to close the stream. - let (status, trailers) = self.processLibraryError(error) - self.sendEnd(status: status, trailers: trailers) - - case .requestOpenResponseOpen(_, .observed), - .requestClosedResponseOpen: - // We've seen a message, the observer is in flight, we'll let it play out. - () - - case .requestClosedResponseClosed: - // We're already closed, we can just ignore this. - () - } - } - - // MARK: - Inbound - - override internal func observeLibraryError(_ error: Error) { - self.handleLibraryError(error) - } - - override internal func observeHeaders(_ headers: HPACKHeaders) { - switch self._callHandlerState { - case let .requestIdleResponseIdle(factory): - // This allocates a promise, but the observer is provided with 'StatusOnlyCallContext' and - // doesn't get access to the promise. The observer must return a response future instead - // which we cascade to this promise. We can avoid this extra allocation by using a different - // context here. - // - // TODO: provide a new context without a promise. - let context = UnaryResponseCallContext( - eventLoop: self.eventLoop, - headers: headers, - logger: self.logger, - userInfoRef: self._userInfoRef - ) - let observer = factory(context) - - // We're fully open now (we'll send the response headers back in a moment). - self._callHandlerState = .requestOpenResponseOpen(context, .notObserved(observer)) - - // Register callbacks for the response promise. - context.responsePromise.futureResult.whenComplete { result in - switch result { - case let .success(response): - self.sendResponse(response) - case let .failure(error): - self.handleObserverError(error) - } - } - - // Write back the response headers. - self.sendResponsePartFromObserver(.metadata([:]), promise: nil) - - // The main state machine guards against these states. - case .requestOpenResponseOpen, - .requestClosedResponseOpen, - .requestClosedResponseClosed: - preconditionFailure("Invalid state: request headers already received") - } - } - - override internal func observeRequest(_ message: RequestPayload) { - switch self._callHandlerState { - case .requestIdleResponseIdle: - preconditionFailure("Invalid state: request received before headers") - - case let .requestOpenResponseOpen(context, request): - switch request { - case .observed: - // We've already observed the request message. The main state machine doesn't guard against - // too many messages for unary streams. Assuming downstream handlers protect against this - // then this must be an errant interceptor, we'll ignore it. - () - - case let .notObserved(observer): - self._callHandlerState = .requestOpenResponseOpen(context, .observed) - // Complete the promise with the observer block. - context.responsePromise.completeWith(observer(message)) - } - - case .requestClosedResponseOpen, - .requestClosedResponseClosed: - preconditionFailure("Invalid state: the request stream has already been closed") - } - } - - override internal func observeEnd() { - switch self._callHandlerState { - case .requestIdleResponseIdle: - preconditionFailure("Invalid state: no request headers received") - - case let .requestOpenResponseOpen(context, request): - switch request { - case .observed: - // Close the request stream. - self._callHandlerState = .requestClosedResponseOpen(context) - - case .notObserved: - // We haven't received a request: this is an empty stream, the observer will never be - // invoked. - context.responsePromise.fail(GRPCError.StreamCardinalityViolation.request) - } - - case .requestClosedResponseOpen, - .requestClosedResponseClosed: - preconditionFailure("Invalid state: request stream is already closed") - } - } - - // MARK: - Outbound - - private func sendResponse(_ message: ResponsePayload) { - switch self._callHandlerState { - case .requestIdleResponseIdle: - preconditionFailure("Invalid state: can't send response before receiving headers and request") - - case .requestOpenResponseOpen(_, .notObserved): - preconditionFailure("Invalid state: can't send response before receiving request") - - case let .requestOpenResponseOpen(context, .observed), - let .requestClosedResponseOpen(context): - self._callHandlerState = .requestClosedResponseClosed - self.sendResponsePartFromObserver( - .message(message, .init(compress: context.compressionEnabled, flush: false)), - promise: nil - ) - self.sendResponsePartFromObserver( - .end(context.responseStatus, context.trailers), - promise: nil - ) - - case .requestClosedResponseClosed: - // Already closed, do nothing. - () - } - } - - private func sendEnd(status: GRPCStatus, trailers: HPACKHeaders) { - switch self._callHandlerState { - case .requestIdleResponseIdle, - .requestClosedResponseOpen: - self._callHandlerState = .requestClosedResponseClosed - self.sendResponsePartFromObserver(.end(status, trailers), promise: nil) - - case let .requestOpenResponseOpen(context, _): - self._callHandlerState = .requestClosedResponseClosed - self.sendResponsePartFromObserver(.end(status, trailers), promise: nil) - // Fail the promise. - context.responsePromise.fail(status) - - case .requestClosedResponseClosed: - // Already closed, do nothing. - () - } - } -} diff --git a/Sources/GRPC/CallHandlers/_BaseCallHandler.swift b/Sources/GRPC/CallHandlers/_BaseCallHandler.swift deleted file mode 100644 index d6c9d0e96..000000000 --- a/Sources/GRPC/CallHandlers/_BaseCallHandler.swift +++ /dev/null @@ -1,742 +0,0 @@ -/* - * Copyright 2019, gRPC Authors All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -import Foundation -import Logging -import NIO -import NIOHPACK -import SwiftProtobuf - -/// Provides a means for decoding incoming gRPC messages into protobuf objects. -/// -/// Calls through to `processMessage` for individual messages it receives, which needs to be implemented by subclasses. -/// - Important: This is **NOT** part of the public API. -public class _BaseCallHandler< - RequestDeserializer: MessageDeserializer, - ResponseSerializer: MessageSerializer ->: GRPCCallHandler, ChannelInboundHandler { - public typealias RequestPayload = RequestDeserializer.Output - public typealias ResponsePayload = ResponseSerializer.Input - - public typealias InboundIn = GRPCServerRequestPart - public typealias OutboundOut = GRPCServerResponsePart - - /// An interceptor pipeline. - @usableFromInline - internal var _pipeline: ServerInterceptorPipeline? - - /// Our current state. - @usableFromInline - internal var _state: State = .idle - - /// The type of this RPC, e.g. 'unary'. - @usableFromInline - internal let _callType: GRPCCallType - - /// Some context provided to us from the routing handler. - @usableFromInline - internal let _callHandlerContext: CallHandlerContext - - /// A request deserializer. - @usableFromInline - internal let _requestDeserializer: RequestDeserializer - - /// A response serializer. - @usableFromInline - internal let _responseSerializer: ResponseSerializer - - /// The `ChannelHandlerContext`. - @usableFromInline - internal var _context: ChannelHandlerContext? - - /// The event loop this call is being handled on. - @usableFromInline - internal var eventLoop: EventLoop { - return self._callHandlerContext.eventLoop - } - - /// An error delegate. - @usableFromInline - internal var errorDelegate: ServerErrorDelegate? { - return self._callHandlerContext.errorDelegate - } - - /// A logger. - @usableFromInline - internal var logger: Logger { - return self._callHandlerContext.logger - } - - /// A reference to `UserInfo`. - @usableFromInline - internal var _userInfoRef: Ref - - @inlinable - internal init( - callHandlerContext: CallHandlerContext, - requestDeserializer: RequestDeserializer, - responseSerializer: ResponseSerializer, - callType: GRPCCallType, - interceptors: [ServerInterceptor] - ) { - let userInfoRef = Ref(UserInfo()) - self._requestDeserializer = requestDeserializer - self._responseSerializer = responseSerializer - self._callHandlerContext = callHandlerContext - self._callType = callType - self._userInfoRef = userInfoRef - self._pipeline = ServerInterceptorPipeline( - logger: callHandlerContext.logger, - eventLoop: callHandlerContext.eventLoop, - path: callHandlerContext.path, - callType: callType, - remoteAddress: callHandlerContext.remoteAddress, - userInfoRef: userInfoRef, - interceptors: interceptors, - onRequestPart: self._receiveRequestPartFromInterceptors(_:), - onResponsePart: self._sendResponsePartFromInterceptors(_:promise:) - ) - } - - // MARK: - ChannelHandler - - public func handlerAdded(context: ChannelHandlerContext) { - self._state.handlerAdded() - self._context = context - } - - public func handlerRemoved(context: ChannelHandlerContext) { - self._pipeline = nil - self._context = nil - } - - public func channelInactive(context: ChannelHandlerContext) { - self._pipeline = nil - context.fireChannelInactive() - } - - public func errorCaught(context: ChannelHandlerContext, error: Error) { - if self._state.errorCaught() { - self.observeLibraryError(error) - } - } - - public func channelRead(context: ChannelHandlerContext, data: NIOAny) { - let part = self.unwrapInboundIn(data) - - switch part { - case let .metadata(headers): - if self._state.channelReadMetadata() { - self._receiveRequestPartInInterceptors(.metadata(headers)) - } - - case let .message(buffer): - if self._state.channelReadMessage() { - do { - let request = try self._requestDeserializer.deserialize(byteBuffer: buffer) - self._receiveRequestPartInInterceptors(.message(request)) - } catch { - self.errorCaught(context: context, error: error) - } - } - - case .end: - if self._state.channelReadEnd() { - self._receiveRequestPartInInterceptors(.end) - } - } - - // We're the last handler. We don't have anything to forward. - } - - // MARK: - Event Observer - - @inlinable - internal func observeHeaders(_ headers: HPACKHeaders) { - fatalError("must be overridden by subclasses") - } - - @inlinable - internal func observeRequest(_ message: RequestPayload) { - fatalError("must be overridden by subclasses") - } - - @inlinable - internal func observeEnd() { - fatalError("must be overridden by subclasses") - } - - @inlinable - internal func observeLibraryError(_ error: Error) { - fatalError("must be overridden by subclasses") - } - - /// Send a response part to the interceptor pipeline. Called by an event observer. - /// - Parameters: - /// - part: The response part to send. - /// - promise: A promise to complete once the response part has been written. - @inlinable - internal final func sendResponsePartFromObserver( - _ part: GRPCServerResponsePart, - promise: EventLoopPromise? - ) { - let forward: Bool - - switch part { - case .metadata: - forward = self._state.sendResponsePartFromObserver(.metadata) - case .message: - forward = self._state.sendResponsePartFromObserver(.message) - case .end: - forward = self._state.sendResponsePartFromObserver(.end) - } - - if forward { - self._sendResponsePartToInterceptors(part, promise: promise) - } else { - promise?.fail(GRPCError.AlreadyComplete()) - } - } - - /// Processes a library error to form a `GRPCStatus` and trailers to send back to the client. - /// - Parameter error: The error to process. - /// - Returns: The status and trailers to send to the client. - internal func processLibraryError(_ error: Error) -> (GRPCStatus, HPACKHeaders) { - // Observe the error if we have a delegate. - self.errorDelegate?.observeLibraryError(error) - - // What status are we terminating this RPC with? - // - If we have a delegate, try transforming the error. If the delegate returns trailers, merge - // them with any on the call context. - // - If we don't have a delegate, then try to transform the error to a status. - // - Fallback to a generic error. - let status: GRPCStatus - let trailers: HPACKHeaders - - if let transformed = self.errorDelegate?.transformLibraryError(error) { - status = transformed.status - trailers = transformed.trailers ?? [:] - } else if let grpcStatusTransformable = error as? GRPCStatusTransformable { - status = grpcStatusTransformable.makeGRPCStatus() - trailers = [:] - } else { - // Eh... well, we don't what status to use. Use a generic one. - status = .processingError - trailers = [:] - } - - return (status, trailers) - } - - /// Processes an error, transforming it into a 'GRPCStatus' and any trailers to send to the peer. - internal func processObserverError( - _ error: Error, - headers: HPACKHeaders, - trailers: HPACKHeaders - ) -> (GRPCStatus, HPACKHeaders) { - // Observe the error if we have a delegate. - self.errorDelegate?.observeRequestHandlerError(error, headers: headers) - - // What status are we terminating this RPC with? - // - If we have a delegate, try transforming the error. If the delegate returns trailers, merge - // them with any on the call context. - // - If we don't have a delegate, then try to transform the error to a status. - // - Fallback to a generic error. - let status: GRPCStatus - let mergedTrailers: HPACKHeaders - - if let transformed = self.errorDelegate?.transformRequestHandlerError(error, headers: headers) { - status = transformed.status - if var transformedTrailers = transformed.trailers { - // The delegate returned trailers: merge in those from the context as well. - transformedTrailers.add(contentsOf: trailers) - mergedTrailers = transformedTrailers - } else { - mergedTrailers = trailers - } - } else if let grpcStatusTransformable = error as? GRPCStatusTransformable { - status = grpcStatusTransformable.makeGRPCStatus() - mergedTrailers = trailers - } else { - // Eh... well, we don't what status to use. Use a generic one. - status = .processingError - mergedTrailers = trailers - } - - return (status, mergedTrailers) - } -} - -// MARK: - Interceptor API - -extension _BaseCallHandler { - /// Receive a request part from the interceptors pipeline to forward to the event observer. - /// - Parameter part: The request part to forward. - @inlinable - internal func _receiveRequestPartFromInterceptors(_ part: GRPCServerRequestPart) { - let forward: Bool - - switch part { - case .metadata: - forward = self._state.receiveRequestPartFromInterceptors(.metadata) - case .message: - forward = self._state.receiveRequestPartFromInterceptors(.message) - case .end: - forward = self._state.receiveRequestPartFromInterceptors(.end) - } - - if forward { - self._receiveRequestPartInObserver(part) - } - } - - /// Send a response part via the `Channel`. Called once the response part has traversed the - /// interceptor pipeline. - /// - Parameters: - /// - part: The response part to send. - /// - promise: A promise to complete once the response part has been written. - @inlinable - internal func _sendResponsePartFromInterceptors( - _ part: GRPCServerResponsePart, - promise: EventLoopPromise? - ) { - let forward: Bool - - switch part { - case .metadata: - forward = self._state.sendResponsePartFromInterceptors(.metadata) - case .message: - forward = self._state.sendResponsePartFromInterceptors(.message) - case .end: - forward = self._state.sendResponsePartFromInterceptors(.end) - } - - if forward, let context = self._context { - self._writeResponsePartToChannel(context: context, part: part, promise: promise) - } else { - promise?.fail(GRPCError.AlreadyComplete()) - } - } -} - -// MARK: - State - -@usableFromInline -internal enum State { - /// Idle. We're waiting to be added to a pipeline. - case idle - - /// We're in a pipeline and receiving from the client. - case active(ActiveState) - - /// We're done. This state is terminal, all actions are ignored. - case closed -} - -@usableFromInline -internal enum RPCStreamPart { - case metadata - case message - case end -} - -extension State { - /// The state of the request and response streams. - /// - /// We track the stream state twice: between the 'Channel' and interceptor pipeline, and between - /// the interceptor pipeline and event observer. - @usableFromInline - enum StreamState { - case requestIdleResponseIdle - case requestOpenResponseIdle - case requestOpenResponseOpen - case requestClosedResponseIdle - case requestClosedResponseOpen - case requestClosedResponseClosed - - @inlinable - mutating func receiveHeaders() -> Bool { - switch self { - case .requestIdleResponseIdle: - self = .requestOpenResponseIdle - return true - - case .requestOpenResponseIdle, - .requestOpenResponseOpen, - .requestClosedResponseIdle, - .requestClosedResponseOpen, - .requestClosedResponseClosed: - return false - } - } - - @inlinable - func receiveMessage() -> Bool { - switch self { - case .requestOpenResponseIdle, - .requestOpenResponseOpen: - return true - - case .requestIdleResponseIdle, - .requestClosedResponseIdle, - .requestClosedResponseOpen, - .requestClosedResponseClosed: - return false - } - } - - @inlinable - mutating func receiveEnd() -> Bool { - switch self { - case .requestOpenResponseIdle: - self = .requestClosedResponseIdle - return true - - case .requestOpenResponseOpen: - self = .requestClosedResponseOpen - return true - - case .requestIdleResponseIdle, - .requestClosedResponseIdle, - .requestClosedResponseOpen, - .requestClosedResponseClosed: - return false - } - } - - @inlinable - mutating func sendHeaders() -> Bool { - switch self { - case .requestOpenResponseIdle: - self = .requestOpenResponseOpen - return true - - case .requestClosedResponseIdle: - self = .requestClosedResponseOpen - return true - - case .requestIdleResponseIdle, - .requestOpenResponseOpen, - .requestClosedResponseOpen, - .requestClosedResponseClosed: - return false - } - } - - @inlinable - func sendMessage() -> Bool { - switch self { - case .requestOpenResponseOpen, - .requestClosedResponseOpen: - return true - - case .requestIdleResponseIdle, - .requestOpenResponseIdle, - .requestClosedResponseIdle, - .requestClosedResponseClosed: - return false - } - } - - @inlinable - mutating func sendEnd() -> Bool { - switch self { - case .requestIdleResponseIdle: - return false - - case .requestOpenResponseIdle, - .requestOpenResponseOpen, - .requestClosedResponseIdle, - .requestClosedResponseOpen: - self = .requestClosedResponseClosed - return true - - case .requestClosedResponseClosed: - return false - } - } - } - - @usableFromInline - struct ActiveState { - /// The stream state between the 'Channel' and interceptor pipeline. - @usableFromInline - var channelStreamState: StreamState - - /// The stream state between the interceptor pipeline and event observer. - @usableFromInline - var observerStreamState: StreamState - - @inlinable - init() { - self.channelStreamState = .requestIdleResponseIdle - self.observerStreamState = .requestIdleResponseIdle - } - } -} - -extension State { - /// The handler was added to the `ChannelPipeline`: this is the only way to move from the `.idle` - /// state. We only expect this to be called once. - internal mutating func handlerAdded() { - switch self { - case .idle: - // This is the only way we can become active. - self = .active(.init()) - case .active: - preconditionFailure("Invalid state: already active") - case .closed: - () - } - } - - /// Received an error from the `Channel`. - /// - Returns: True if the error should be forwarded to the error observer, or false if it should - /// be dropped. - internal func errorCaught() -> Bool { - switch self { - case .active: - return true - case .idle, .closed: - return false - } - } - - /// Receive a metadata part from the `Channel`. - /// - Returns: True if the part should be forwarded to the interceptor pipeline, false otherwise. - internal mutating func channelReadMetadata() -> Bool { - switch self { - case .idle: - preconditionFailure("Invalid state: the handler isn't in the pipeline yet") - case var .active(state): - let allow = state.channelStreamState.receiveHeaders() - self = .active(state) - return allow - case .closed: - return false - } - } - - /// Receive a message part from the `Channel`. - /// - Returns: True if the part should be forwarded to the interceptor pipeline, false otherwise. - internal func channelReadMessage() -> Bool { - switch self { - case .idle: - preconditionFailure("Invalid state: the handler isn't in the pipeline yet") - case let .active(state): - return state.channelStreamState.receiveMessage() - case .closed: - return false - } - } - - /// Receive an end-stream part from the `Channel`. - /// - Returns: True if the part should be forwarded to the interceptor pipeline, false otherwise. - internal mutating func channelReadEnd() -> Bool { - switch self { - case .idle: - preconditionFailure("Invalid state: the handler isn't in the pipeline yet") - case var .active(state): - let allow = state.channelStreamState.receiveEnd() - self = .active(state) - return allow - case .closed: - return false - } - } - - /// Send a response part from the observer to the interceptors. - /// - Returns: True if the part should be forwarded to the interceptor pipeline, false otherwise. - @inlinable - internal mutating func sendResponsePartFromObserver(_ part: RPCStreamPart) -> Bool { - switch self { - case .idle: - preconditionFailure("Invalid state: the handler isn't in the pipeline yet") - - case var .active(state): - // Avoid CoW-ing 'state'. - self = .idle - - let allow: Bool - - switch part { - case .metadata: - allow = state.observerStreamState.sendHeaders() - case .message: - allow = state.observerStreamState.sendMessage() - case .end: - allow = state.observerStreamState.sendEnd() - } - - // Restore the state. - self = .active(state) - return allow - - case .closed: - return false - } - } - - /// Send a response part from the interceptors to the `Channel`. - /// - Returns: True if the part should be forwarded to the `Channel`, false otherwise. - @inlinable - internal mutating func sendResponsePartFromInterceptors(_ part: RPCStreamPart) -> Bool { - switch self { - case .idle: - preconditionFailure("Invalid state: can't send response on idle call") - - case var .active(state): - // Avoid CoW-ing 'state'. - self = .idle - - let allow: Bool - - switch part { - case .metadata: - allow = state.channelStreamState.sendHeaders() - self = .active(state) - case .message: - allow = state.channelStreamState.sendMessage() - self = .active(state) - case .end: - allow = state.channelStreamState.sendEnd() - // We're sending end, we're no longer active. - self = .closed - } - - return allow - - case .closed: - // We're already closed. - return false - } - } - - /// A request part has traversed the interceptor pipeline, now send it to the observer. - /// - Returns: True if the part should be forwarded to the observer, false otherwise. - @inlinable - internal mutating func receiveRequestPartFromInterceptors(_ part: RPCStreamPart) -> Bool { - switch self { - case .idle: - preconditionFailure("Invalid state: the handler isn't in the pipeline yet") - - case var .active(state): - // Avoid CoW-ing `state`. - self = .idle - - let allow: Bool - - // Does the active state allow us to send this? - switch part { - case .metadata: - allow = state.observerStreamState.receiveHeaders() - case .message: - allow = state.observerStreamState.receiveMessage() - case .end: - allow = state.observerStreamState.receiveEnd() - } - - // Put `state` back. - self = .active(state) - return allow - - case .closed: - // We're closed, just ignore this. - return false - } - } -} - -// MARK: State Actions - -extension _BaseCallHandler { - /// Receives a request part in the interceptor pipeline. - @inlinable - internal func _receiveRequestPartInInterceptors(_ part: GRPCServerRequestPart) { - self._pipeline?.receive(part) - } - - /// Observe a request part. This just farms out to the subclass implementation for the - /// appropriate part. - @inlinable - internal func _receiveRequestPartInObserver(_ part: GRPCServerRequestPart) { - switch part { - case let .metadata(headers): - self.observeHeaders(headers) - case let .message(request): - self.observeRequest(request) - case .end: - self.observeEnd() - } - } - - /// Sends a response part into the interceptor pipeline. - @inlinable - internal func _sendResponsePartToInterceptors( - _ part: GRPCServerResponsePart, - promise: EventLoopPromise? - ) { - if let pipeline = self._pipeline { - pipeline.send(part, promise: promise) - } else { - promise?.fail(GRPCError.AlreadyComplete()) - } - } - - /// Writes a response part to the `Channel`. - @inlinable - internal func _writeResponsePartToChannel( - context: ChannelHandlerContext, - part: GRPCServerResponsePart, - promise: EventLoopPromise? - ) { - let flush: Bool - - switch part { - case let .metadata(headers): - // Only flush if we're not unary: if we're unary we'll wait for the response and end before - // emitting the flush. - flush = self._callType != .unary - context.write(self.wrapOutboundOut(.metadata(headers)), promise: promise) - - case let .message(message, metadata): - do { - let serializedResponse = try self._responseSerializer.serialize( - message, - allocator: context.channel.allocator - ) - context.write( - self.wrapOutboundOut(.message(serializedResponse, metadata)), - promise: promise - ) - // Flush if we've been told to flush. - flush = metadata.flush - } catch { - self.errorCaught(context: context, error: error) - promise?.fail(error) - return - } - - case let .end(status, trailers): - context.write(self.wrapOutboundOut(.end(status, trailers)), promise: promise) - // Always flush on end. - flush = true - } - - if flush { - context.flush() - } - } -} diff --git a/Sources/GRPC/GRPCServerRequestRoutingHandler.swift b/Sources/GRPC/GRPCServerRequestRoutingHandler.swift index fdf131628..dd51fd41e 100644 --- a/Sources/GRPC/GRPCServerRequestRoutingHandler.swift +++ b/Sources/GRPC/GRPCServerRequestRoutingHandler.swift @@ -20,10 +20,7 @@ import NIOHTTP1 import NIOHTTP2 import SwiftProtobuf -/// Processes individual gRPC messages and stream-close events on an HTTP2 channel. -public protocol GRPCCallHandler: ChannelHandler {} - -/// Provides `GRPCCallHandler` objects for the methods on a particular service name. +/// Provides `GRPCServerHandlerProtocol` objects for the methods on a particular service name. /// /// Implemented by the generated code. public protocol CallHandlerProvider: AnyObject { @@ -32,11 +29,6 @@ public protocol CallHandlerProvider: AnyObject { /// - Example: "io.grpc.Echo.EchoService" var serviceName: Substring { get } - /// Determines, calls and returns the appropriate request handler (`GRPCCallHandler`), depending on the request's - /// method. Returns nil for methods not handled by this service. - func handleMethod(_ methodName: Substring, callHandlerContext: CallHandlerContext) - -> GRPCCallHandler? - /// Returns a call handler for the method with the given name, if this service provider implements /// the given method. Returns `nil` if the method is not handled by this provider. /// - Parameters: @@ -45,24 +37,6 @@ public protocol CallHandlerProvider: AnyObject { func handle(method name: Substring, context: CallHandlerContext) -> GRPCServerHandlerProtocol? } -extension CallHandlerProvider { - // TODO: remove this once we've removed 'handleMethod(_:callHandlerContext:)'. - public func handle( - method name: Substring, - context: CallHandlerContext - ) -> GRPCServerHandlerProtocol? { - return nil - } - - // TODO: remove this once we've removed 'handleMethod(_:callHandlerContext:)'. - public func handleMethod( - _ methodName: Substring, - callHandlerContext: CallHandlerContext - ) -> GRPCCallHandler? { - return nil - } -} - // This is public because it will be passed into generated code, all members are `internal` because // the context will get passed from generated code back into gRPC library code and all members should // be considered an implementation detail to the user. diff --git a/Sources/GRPC/ServerCallContexts/ServerCallContext.swift b/Sources/GRPC/ServerCallContexts/ServerCallContext.swift index 477fc0d5d..c850c0aad 100644 --- a/Sources/GRPC/ServerCallContexts/ServerCallContext.swift +++ b/Sources/GRPC/ServerCallContexts/ServerCallContext.swift @@ -90,14 +90,6 @@ open class ServerCallContextBase: ServerCallContext { self.logger = logger } - @available(*, deprecated, renamed: "init(eventLoop:headers:logger:userInfo:)") - public init(eventLoop: EventLoop, request: HTTPRequestHead, logger: Logger) { - self.eventLoop = eventLoop - self.headers = HPACKHeaders(httpHeaders: request.headers, normalizeHTTPHeaders: false) - self.logger = logger - self.userInfoRef = .init(UserInfo()) - } - /// Processes an error, transforming it into a 'GRPCStatus' and any trailers to send to the peer. internal func processObserverError( _ error: Error, diff --git a/Sources/GRPC/ServerCallContexts/StreamingResponseCallContext.swift b/Sources/GRPC/ServerCallContexts/StreamingResponseCallContext.swift index 4cd41a900..d3b7a55f4 100644 --- a/Sources/GRPC/ServerCallContexts/StreamingResponseCallContext.swift +++ b/Sources/GRPC/ServerCallContexts/StreamingResponseCallContext.swift @@ -51,12 +51,6 @@ open class StreamingResponseCallContext: ServerCallContextBase super.init(eventLoop: eventLoop, headers: headers, logger: logger, userInfoRef: userInfoRef) } - @available(*, deprecated, renamed: "init(eventLoop:path:headers:logger:userInfo:)") - override public init(eventLoop: EventLoop, request: HTTPRequestHead, logger: Logger) { - self.statusPromise = eventLoop.makePromise() - super.init(eventLoop: eventLoop, request: request, logger: logger) - } - /// Send a response to the client. /// /// - Parameters: @@ -202,102 +196,6 @@ internal final class _StreamingResponseCallContext: } } -/// Concrete implementation of `StreamingResponseCallContext` used by our generated code. -open class StreamingResponseCallContextImpl: StreamingResponseCallContext { - public let channel: Channel - - /// - Parameters: - /// - channel: The NIO channel the call is handled on. - /// - headers: The headers provided with this call. - /// - errorDelegate: Provides a means for transforming status promise failures to `GRPCStatusTransformable` before - /// sending them to the client. - /// - logger: A logger. - /// - /// Note: `errorDelegate` is not called for status promise that are `succeeded` with a non-OK status. - public init( - channel: Channel, - headers: HPACKHeaders, - errorDelegate: ServerErrorDelegate?, - logger: Logger - ) { - self.channel = channel - super.init( - eventLoop: channel.eventLoop, - headers: headers, - logger: logger, - userInfoRef: Ref(UserInfo()) - ) - - self.statusPromise.futureResult.whenComplete { result in - switch result { - case let .success(status): - self.channel.writeAndFlush( - self.wrap(.end(status, self.trailers)), - promise: nil - ) - - case let .failure(error): - let (status, trailers) = self.processObserverError(error, delegate: errorDelegate) - self.channel.writeAndFlush(self.wrap(.end(status, trailers)), promise: nil) - } - } - } - - /// Wrap the response part in a `NIOAny`. This is useful in order to avoid explicitly spelling - /// out `NIOAny(WrappedResponse(...))`. - private func wrap(_ response: WrappedResponse) -> NIOAny { - return NIOAny(response) - } - - @available(*, deprecated, renamed: "init(channel:headers:errorDelegate:logger:)") - public convenience init( - channel: Channel, - request: HTTPRequestHead, - errorDelegate: ServerErrorDelegate?, - logger: Logger - ) { - self.init( - channel: channel, - headers: HPACKHeaders(httpHeaders: request.headers, normalizeHTTPHeaders: false), - errorDelegate: errorDelegate, - logger: logger - ) - } - - override open func sendResponse( - _ message: ResponsePayload, - compression: Compression = .deferToCallDefault, - promise: EventLoopPromise? - ) { - let compress = compression.isEnabled(callDefault: self.compressionEnabled) - self.channel.write( - self.wrap(.message(message, .init(compress: compress, flush: true))), - promise: promise - ) - } - - override open func sendResponses( - _ messages: Messages, - compression: Compression = .deferToCallDefault, - promise: EventLoopPromise? - ) where ResponsePayload == Messages.Element { - let compress = compression.isEnabled(callDefault: self.compressionEnabled) - - var iterator = messages.makeIterator() - var next = iterator.next() - - while let current = next { - next = iterator.next() - // Attach the promise, if present, to the last message. - let isLast = next == nil - self.channel.write( - self.wrap(.message(current, .init(compress: compress, flush: isLast))), - promise: isLast ? promise : nil - ) - } - } -} - /// Concrete implementation of `StreamingResponseCallContext` used for testing. /// /// Simply records all sent messages. diff --git a/Sources/GRPC/ServerCallContexts/UnaryResponseCallContext.swift b/Sources/GRPC/ServerCallContexts/UnaryResponseCallContext.swift index fbccf628b..4ee927497 100644 --- a/Sources/GRPC/ServerCallContexts/UnaryResponseCallContext.swift +++ b/Sources/GRPC/ServerCallContexts/UnaryResponseCallContext.swift @@ -54,12 +54,6 @@ open class UnaryResponseCallContext: ServerCallContextBase, Sta self.responsePromise = eventLoop.makePromise() super.init(eventLoop: eventLoop, headers: headers, logger: logger, userInfoRef: userInfoRef) } - - @available(*, deprecated, renamed: "init(eventLoop:headers:logger:userInfo:)") - override public init(eventLoop: EventLoop, request: HTTPRequestHead, logger: Logger) { - self.responsePromise = eventLoop.makePromise() - super.init(eventLoop: eventLoop, request: request, logger: logger) - } } /// Protocol variant of `UnaryResponseCallContext` that only exposes the `responseStatus` and `trailingMetadata` @@ -76,94 +70,6 @@ public protocol StatusOnlyCallContext: ServerCallContext { var trailers: HPACKHeaders { get set } } -extension StatusOnlyCallContext { - @available(*, deprecated, renamed: "trailers") - public var trailingMetadata: HTTPHeaders { - get { - return HTTPHeaders(self.trailers.map { ($0.name, $0.value) }) - } - set { - self.trailers = HPACKHeaders(httpHeaders: newValue) - } - } -} - -/// Concrete implementation of `UnaryResponseCallContext` used by our generated code. -open class UnaryResponseCallContextImpl: UnaryResponseCallContext { - public let channel: Channel - - /// - Parameters: - /// - channel: The NIO channel the call is handled on. - /// - headers: The headers provided with this call. - /// - errorDelegate: Provides a means for transforming response promise failures to `GRPCStatusTransformable` before - /// sending them to the client. - /// - logger: A logger. - public init( - channel: Channel, - headers: HPACKHeaders, - errorDelegate: ServerErrorDelegate?, - logger: Logger - ) { - self.channel = channel - super.init( - eventLoop: channel.eventLoop, - headers: headers, - logger: logger, - userInfoRef: .init(UserInfo()) - ) - - self.responsePromise.futureResult.whenComplete { [self, weak errorDelegate] result in - switch result { - case let .success(message): - self.handleResponse(message) - - case let .failure(error): - self.handleError(error, delegate: errorDelegate) - } - } - } - - /// Handle the response from the service provider. - private func handleResponse(_ response: ResponsePayload) { - self.channel.write( - self.wrap(.message(response, .init(compress: self.compressionEnabled, flush: false))), - promise: nil - ) - - self.channel.writeAndFlush( - self.wrap(.end(self.responseStatus, self.trailers)), - promise: nil - ) - } - - /// Handle an error from the service provider. - private func handleError(_ error: Error, delegate: ServerErrorDelegate?) { - let (status, trailers) = self.processObserverError(error, delegate: delegate) - self.channel.writeAndFlush(self.wrap(.end(status, trailers)), promise: nil) - } - - /// Wrap the response part in a `NIOAny`. This is useful in order to avoid explicitly spelling - /// out `NIOAny(WrappedResponse(...))`. - private func wrap(_ response: WrappedResponse) -> NIOAny { - return NIOAny(response) - } - - @available(*, deprecated, renamed: "init(channel:headers:errorDelegate:logger:)") - public convenience init( - channel: Channel, - request: HTTPRequestHead, - errorDelegate: ServerErrorDelegate?, - logger: Logger - ) { - self.init( - channel: channel, - headers: HPACKHeaders(httpHeaders: request.headers, normalizeHTTPHeaders: false), - errorDelegate: errorDelegate, - logger: logger - ) - } -} - /// Concrete implementation of `UnaryResponseCallContext` used for testing. /// /// Only provided to make it clear in tests that no "real" implementation is used. diff --git a/Sources/GRPC/Shims.swift b/Sources/GRPC/Shims.swift deleted file mode 100644 index 04ca89436..000000000 --- a/Sources/GRPC/Shims.swift +++ /dev/null @@ -1,467 +0,0 @@ -/* - * Copyright 2019, gRPC Authors All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -import Logging -import NIO -import NIOHPACK -import NIOHTTP1 -import NIOSSL -import SwiftProtobuf - -// This file contains shims to notify users of API changes between v1.0.0-alpha.1 and v1.0.0. - -// TODO: Remove these shims before v1.0.0 is tagged. - -extension ClientConnection.Configuration { - @available(*, deprecated, message: "use 'tls' and 'ClientConnection.Configuration.TLS'") - public var tlsConfiguration: TLSConfiguration? { - return nil - } -} - -extension Server.Configuration { - @available(*, deprecated, message: "use 'tls' and 'Server.Configuration.TLS'") - public var tlsConfiguration: TLSConfiguration? { - return nil - } -} - -@available(*, deprecated, renamed: "PlatformSupport") -public enum GRPCNIO {} - -extension ClientErrorDelegate { - @available(*, deprecated, message: "Please use 'didCatchError(_:logger:file:line:)' instead") - public func didCatchError(_ error: Error, file: StaticString, line: Int) {} -} - -extension GRPCStatusTransformable { - @available(*, deprecated, renamed: "makeGRPCStatus") - func asGRPCStatus() -> GRPCStatus { - return self.makeGRPCStatus() - } -} - -extension GRPCClient { - @available(*, deprecated, renamed: "channel") - public var connection: GRPCChannel { - return self.channel - } -} - -extension CallOptions { - @available( - *, - deprecated, - renamed: "init(customMetadata:timeLimit:messageEncoding:requestIDProvider:requestIDHeader:cacheable:)" - ) - public init( - customMetadata: HPACKHeaders = HPACKHeaders(), - timeout: GRPCTimeout, - messageEncoding: ClientMessageEncoding = .disabled, - requestIDProvider: RequestIDProvider = .autogenerated, - requestIDHeader: String? = nil, - cacheable: Bool = false - ) { - self.init( - customMetadata: customMetadata, - timeLimit: .timeout(timeout.asNIOTimeAmount), - messageEncoding: messageEncoding, - requestIDProvider: requestIDProvider, - requestIDHeader: requestIDHeader, - cacheable: cacheable - ) - } - - // TODO: `timeLimit.wrapped` can be private when the shims are removed. - @available(*, deprecated, renamed: "timeLimit") - public var timeout: GRPCTimeout { - get { - switch self.timeLimit.wrapped { - case .none: - return .infinite - - case let .timeout(timeout) where timeout.nanoseconds == .max: - return .infinite - - case let .deadline(deadline) where deadline == .distantFuture: - return .infinite - - case let .timeout(timeout): - return GRPCTimeout.nanoseconds(rounding: Int(timeout.nanoseconds)) - - case let .deadline(deadline): - return GRPCTimeout(deadline: deadline) - } - } - set { - self.timeLimit = .timeout(newValue.asNIOTimeAmount) - } - } -} - -extension GRPCTimeout { - /// Creates a new GRPCTimeout for the given amount of hours. - /// - /// `amount` must be positive and at most 8-digits. - /// - /// - Parameter amount: the amount of hours this `GRPCTimeout` represents. - /// - Returns: A `GRPCTimeout` representing the given number of hours. - /// - Throws: `GRPCTimeoutError` if the amount was negative or more than 8 digits long. - @available( - *, - deprecated, - message: "Use 'TimeLimit.timeout(_:)' or 'TimeLimit.deadline(_:)' instead." - ) - public static func hours(_ amount: Int) throws -> GRPCTimeout { - return try makeTimeout(Int64(amount), .hours) - } - - /// Creates a new GRPCTimeout for the given amount of hours. - /// - /// The timeout will be rounded up if it may not be represented in the wire format. - /// - /// - Parameter amount: The number of hours to represent. - @available( - *, - deprecated, - message: "Use 'TimeLimit.timeout(_:)' or 'TimeLimit.deadline(_:)' instead." - ) - public static func hours(rounding amount: Int) -> GRPCTimeout { - return .init(rounding: Int64(amount), unit: .hours) - } - - /// Creates a new GRPCTimeout for the given amount of minutes. - /// - /// `amount` must be positive and at most 8-digits. - /// - /// - Parameter amount: the amount of minutes this `GRPCTimeout` represents. - /// - Returns: A `GRPCTimeout` representing the given number of minutes. - /// - Throws: `GRPCTimeoutError` if the amount was negative or more than 8 digits long. - @available( - *, - deprecated, - message: "Use 'TimeLimit.timeout(_:)' or 'TimeLimit.deadline(_:)' instead." - ) - public static func minutes(_ amount: Int) throws -> GRPCTimeout { - return try makeTimeout(Int64(amount), .minutes) - } - - /// Creates a new GRPCTimeout for the given amount of minutes. - /// - /// The timeout will be rounded up if it may not be represented in the wire format. - /// - /// - Parameter amount: The number of minutes to represent. - @available( - *, - deprecated, - message: "Use 'TimeLimit.timeout(_:)' or 'TimeLimit.deadline(_:)' instead." - ) - public static func minutes(rounding amount: Int) -> GRPCTimeout { - return .init(rounding: Int64(amount), unit: .minutes) - } - - /// Creates a new GRPCTimeout for the given amount of seconds. - /// - /// `amount` must be positive and at most 8-digits. - /// - /// - Parameter amount: the amount of seconds this `GRPCTimeout` represents. - /// - Returns: A `GRPCTimeout` representing the given number of seconds. - /// - Throws: `GRPCTimeoutError` if the amount was negative or more than 8 digits long. - @available( - *, - deprecated, - message: "Use 'TimeLimit.timeout(_:)' or 'TimeLimit.deadline(_:)' instead." - ) - public static func seconds(_ amount: Int) throws -> GRPCTimeout { - return try makeTimeout(Int64(amount), .seconds) - } - - /// Creates a new GRPCTimeout for the given amount of seconds. - /// - /// The timeout will be rounded up if it may not be represented in the wire format. - /// - /// - Parameter amount: The number of seconds to represent. - @available( - *, - deprecated, - message: "Use 'TimeLimit.timeout(_:)' or 'TimeLimit.deadline(_:)' instead." - ) - public static func seconds(rounding amount: Int) -> GRPCTimeout { - return .init(rounding: Int64(amount), unit: .seconds) - } - - /// Creates a new GRPCTimeout for the given amount of milliseconds. - /// - /// `amount` must be positive and at most 8-digits. - /// - /// - Parameter amount: the amount of milliseconds this `GRPCTimeout` represents. - /// - Returns: A `GRPCTimeout` representing the given number of milliseconds. - /// - Throws: `GRPCTimeoutError` if the amount was negative or more than 8 digits long. - @available( - *, - deprecated, - message: "Use 'TimeLimit.timeout(_:)' or 'TimeLimit.deadline(_:)' instead." - ) - public static func milliseconds(_ amount: Int) throws -> GRPCTimeout { - return try makeTimeout(Int64(amount), .milliseconds) - } - - /// Creates a new GRPCTimeout for the given amount of milliseconds. - /// - /// The timeout will be rounded up if it may not be represented in the wire format. - /// - /// - Parameter amount: The number of milliseconds to represent. - @available( - *, - deprecated, - message: "Use 'TimeLimit.timeout(_:)' or 'TimeLimit.deadline(_:)' instead." - ) - public static func milliseconds(rounding amount: Int) -> GRPCTimeout { - return .init(rounding: Int64(amount), unit: .milliseconds) - } - - /// Creates a new GRPCTimeout for the given amount of microseconds. - /// - /// `amount` must be positive and at most 8-digits. - /// - /// - Parameter amount: the amount of microseconds this `GRPCTimeout` represents. - /// - Returns: A `GRPCTimeout` representing the given number of microseconds. - /// - Throws: `GRPCTimeoutError` if the amount was negative or more than 8 digits long. - @available( - *, - deprecated, - message: "Use 'TimeLimit.timeout(_:)' or 'TimeLimit.deadline(_:)' instead." - ) - public static func microseconds(_ amount: Int) throws -> GRPCTimeout { - return try makeTimeout(Int64(amount), .microseconds) - } - - /// Creates a new GRPCTimeout for the given amount of microseconds. - /// - /// The timeout will be rounded up if it may not be represented in the wire format. - /// - /// - Parameter amount: The number of microseconds to represent. - @available( - *, - deprecated, - message: "Use 'TimeLimit.timeout(_:)' or 'TimeLimit.deadline(_:)' instead." - ) - public static func microseconds(rounding amount: Int) -> GRPCTimeout { - return .init(rounding: Int64(amount), unit: .microseconds) - } - - /// Creates a new GRPCTimeout for the given amount of nanoseconds. - /// - /// `amount` must be positive and at most 8-digits. - /// - /// - Parameter amount: the amount of nanoseconds this `GRPCTimeout` represents. - /// - Returns: A `GRPCTimeout` representing the given number of nanoseconds. - /// - Throws: `GRPCTimeoutError` if the amount was negative or more than 8 digits long. - @available( - *, - deprecated, - message: "Use 'TimeLimit.timeout(_:)' or 'TimeLimit.deadline(_:)' instead." - ) - public static func nanoseconds(_ amount: Int) throws -> GRPCTimeout { - return try makeTimeout(Int64(amount), .nanoseconds) - } - - /// Creates a new GRPCTimeout for the given amount of nanoseconds. - /// - /// The timeout will be rounded up if it may not be represented in the wire format. - /// - /// - Parameter amount: The number of nanoseconds to represent. - @available( - *, - deprecated, - message: "Use 'TimeLimit.timeout(_:)' or 'TimeLimit.deadline(_:)' instead." - ) - public static func nanoseconds(rounding amount: Int) -> GRPCTimeout { - return .init(rounding: Int64(amount), unit: .nanoseconds) - } -} - -extension GRPCTimeout { - /// Returns a NIO `TimeAmount` representing the amount of time as this timeout. - @available( - *, - deprecated, - message: "Use 'TimeLimit.timeout(_:)' or 'TimeLimit.deadline(_:)' instead." - ) - public var asNIOTimeAmount: TimeAmount { - return TimeAmount.nanoseconds(numericCast(nanoseconds)) - } - - internal static func makeTimeout(_ amount: Int64, _ unit: GRPCTimeoutUnit) throws -> GRPCTimeout { - // Timeouts must be positive and at most 8-digits. - if amount < 0 { - throw GRPCTimeoutError.negative - } - if amount > GRPCTimeout.maxAmount { - throw GRPCTimeoutError.tooManyDigits - } - return .init(amount: amount, unit: unit) - } -} - -// These will be obsoleted when the shims are removed. - -/// Errors thrown when constructing a timeout. -public struct GRPCTimeoutError: Error, Equatable, CustomStringConvertible { - private enum BaseError { - case negative - case tooManyDigits - } - - private var error: BaseError - - private init(_ error: BaseError) { - self.error = error - } - - public var description: String { - switch self.error { - case .negative: - return "GRPCTimeoutError: time amount must not be negative" - case .tooManyDigits: - return "GRPCTimeoutError: too many digits to represent using the gRPC wire-format" - } - } - - /// The timeout is negative. - public static let negative = GRPCTimeoutError(.negative) - - /// The number of digits in the timeout amount is more than 8-digits and cannot be encoded in - /// the gRPC wire-format. - public static let tooManyDigits = GRPCTimeoutError(.tooManyDigits) -} - -extension UnaryCallHandler { - @available(*, unavailable, message: "Please regenerate your code or use 'CallHandler.makeUnary'") - public convenience init( - callHandlerContext: CallHandlerContext, - eventObserverFactory: @escaping (UnaryResponseCallContext) - -> (RequestPayload) -> EventLoopFuture - ) { - fatalError("Unimplemented: please regenerate your code.") - } -} - -extension ServerStreamingCallHandler { - @available( - *, - unavailable, - message: "Please regenerate your code or use 'CallHandler.makeServerStreaming'" - ) - public convenience init( - callHandlerContext: CallHandlerContext, - eventObserverFactory: @escaping (StreamingResponseCallContext) - -> (RequestPayload) -> EventLoopFuture - ) { - fatalError("Unimplemented: please regenerate your code.") - } -} - -extension ClientStreamingCallHandler { - @available( - *, - unavailable, - message: "Please regenerate your code or use 'CallHandler.makeClientStreaming'" - ) - public convenience init( - callHandlerContext: CallHandlerContext, - eventObserverFactory: @escaping (UnaryResponseCallContext) - -> EventLoopFuture<(StreamEvent) -> Void> - ) { - fatalError("Unimplemented: please regenerate your code.") - } -} - -extension BidirectionalStreamingCallHandler { - @available( - *, - unavailable, - message: "Please regenerate your code or use 'CallHandler.makeBidirectionalStreaming'" - ) - public convenience init( - callHandlerContext: CallHandlerContext, - eventObserverFactory: @escaping (StreamingResponseCallContext) - -> EventLoopFuture<(StreamEvent) -> Void> - ) { - fatalError("Unimplemented: please regenerate your code.") - } -} - -@available(*, deprecated, message: "This protocol is longer required. Please regenerate your code.") -public protocol GRPCProtobufPayload {} - -extension GRPCStatus.Code { - @available(*, deprecated, message: "Use a 'default' branch") - public static let doNotUse = GRPCStatus.Code.unknown -} - -@available(*, deprecated, renamed: "GRPCStatusAndTrailers") -public typealias GRPCStatusAndMetadata = GRPCStatusAndTrailers - -extension GRPCStatusAndTrailers { - @available(*, deprecated, renamed: "init(status:trailers:)") - public init(status: GRPCStatus, metadata: HTTPHeaders?) { - self.status = status - self.trailers = metadata.map { httpHeaders in - HPACKHeaders(httpHeaders: httpHeaders, normalizeHTTPHeaders: false) - } - } - - @available(*, deprecated, renamed: "trailers") - public var metadata: HTTPHeaders? { - get { - return self.trailers.map { - HTTPHeaders($0.map { ($0.name, $0.value) }) - } - } - set { - self.trailers = newValue.map { - // We'll normalize these before sending them. - HPACKHeaders(httpHeaders: $0, normalizeHTTPHeaders: false) - } - } - } -} - -extension ServerCallContext { - /// Generic metadata provided with this request. - @available(*, deprecated, message: "Use 'headers' to get information found in 'request'") - public var request: HTTPRequestHead { - return HTTPRequestHead( - version: .init(major: 2, minor: 0), - method: .POST, - uri: self.headers.first(name: ":path") ?? "", - headers: HTTPHeaders(self.headers.map { ($0.name, $0.value) }) - ) - } -} - -extension ServerCallContextBase { - @available(*, deprecated, renamed: "trailers") - public var trailingMetadata: HTTPHeaders { - get { - return HTTPHeaders(self.trailers.map { ($0.name, $0.value) }) - } - set { - // No need to normalize; we'll do it in the 2-to-1 handler. - self.trailers = HPACKHeaders(httpHeaders: newValue, normalizeHTTPHeaders: false) - } - } -} diff --git a/Sources/GRPC/TimeLimit.swift b/Sources/GRPC/TimeLimit.swift index 574966ec6..26406967b 100644 --- a/Sources/GRPC/TimeLimit.swift +++ b/Sources/GRPC/TimeLimit.swift @@ -26,14 +26,14 @@ import NIO /// may therefore complete with `.deadlineExceeded` even if no time limit was set by the client. public struct TimeLimit: Equatable, CustomStringConvertible { // private but for shimming. - internal enum Wrapped: Equatable { + private enum Wrapped: Equatable { case none case timeout(TimeAmount) case deadline(NIODeadline) } // private but for shimming. - internal var wrapped: Wrapped + private var wrapped: Wrapped private init(_ wrapped: Wrapped) { self.wrapped = wrapped