Skip to content

Commit

Permalink
Add scaffolding for client interceptors (#986)
Browse files Browse the repository at this point in the history
Motivation:

To support client interceptors we first need to define a few types to
work with.

Modifications:

- Add client request and response parts.
- Add the client interceptor protocol.
- Add the client interceptor context, which is passed to functions on
  the interceptor protocol in order to allow implementations to invoke
  functions on interceptors either side of it in the pipeline.
- Add a stubbed out client interceptor pipeline.

Result:

We have some base types for client side interceptors.
  • Loading branch information
glbrntt committed Nov 6, 2020
1 parent d43f258 commit c9df463
Show file tree
Hide file tree
Showing 7 changed files with 441 additions and 0 deletions.
25 changes: 25 additions & 0 deletions Sources/GRPC/Array+BoundsCheck.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2020, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

extension Array {
internal subscript(checked index: Index) -> Element? {
if self.indices.contains(index) {
return self[index]
} else {
return nil
}
}
}
40 changes: 40 additions & 0 deletions Sources/GRPC/Interceptor/ClientInterceptor.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2020, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import NIO

internal protocol ClientInterceptor {
associatedtype Request
associatedtype Response

/// Called when the interceptor has received a response part to handle.
func read(
_ part: ClientResponsePart<Response>,
context: ClientInterceptorContext<Request, Response>
)

/// Called when the interceptor has received a request part to handle.
func write(
_ part: ClientRequestPart<Request>,
promise: EventLoopPromise<Void>?,
context: ClientInterceptorContext<Request, Response>
)

/// Called when the interceptor has received a request to cancel the RPC.
func cancel(
promise: EventLoopPromise<Void>?,
context: ClientInterceptorContext<Request, Response>
)
}
127 changes: 127 additions & 0 deletions Sources/GRPC/Interceptor/ClientInterceptorContext.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright 2020, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import Logging
import NIO

public struct ClientInterceptorContext<Request, Response> {
/// The pipeline this context is associated with.
private let pipeline: ClientInterceptorPipeline<Request, Response>

/// The index of this context's interceptor within the pipeline.
private let index: Int

// The next context in the inbound direction, if one exists.
private var nextInbound: ClientInterceptorContext<Request, Response>? {
return self.pipeline.context(atIndex: self.index + 1)
}

// The next context in the outbound direction, if one exists.
private var nextOutbound: ClientInterceptorContext<Request, Response>? {
return self.pipeline.context(atIndex: self.index - 1)
}

/// The `EventLoop` this interceptor pipeline is being executed on.
public var eventLoop: EventLoop {
return self.pipeline.eventLoop
}

/// A logger.
public var logger: Logger {
return self.pipeline.logger
}

/// Construct a `ClientInterceptorContext` for the interceptor at the given index within in
/// interceptor pipeline.
internal init(pipeline: ClientInterceptorPipeline<Request, Response>, index: Int) {
self.pipeline = pipeline
self.index = index
}

/// Forwards the response part to the next inbound interceptor in the pipeline, if there is one.
///
/// - Parameter part: The response part to forward.
/// - Important: This *must* to be called from the `eventLoop`.
public func read(_ part: ClientResponsePart<Response>) {
self._read(part)
}

/// Forwards the request part to the next outbound interceptor in the pipeline, if there is one.
///
/// - Parameters:
/// - part: The request part to forward.
/// - promise: The promise the complete when the part has been written.
/// - Important: This *must* to be called from the `eventLoop`.
public func write(
_ part: ClientRequestPart<Request>,
promise: EventLoopPromise<Void>?
) {
self._write(part, promise: promise)
}

/// Forwards a request to cancel the RPC to the next outbound interceptor in the pipeline.
///
/// - Parameter promise: The promise to complete with the outcome of the cancellation request.
/// - Important: This *must* to be called from the `eventLoop`.
public func cancel(promise: EventLoopPromise<Void>?) {
self._cancel(promise: promise)
}
}

extension ClientInterceptorContext {
private func _read(_ part: ClientResponsePart<Response>) {
self.eventLoop.assertInEventLoop()
self.nextInbound?.invokeRead(part)
}

private func _write(
_ part: ClientRequestPart<Request>,
promise: EventLoopPromise<Void>?
) {
self.eventLoop.assertInEventLoop()

if let outbound = self.nextOutbound {
outbound.invokeWrite(part, promise: promise)
} else {
promise?.fail(GRPCStatus(code: .unavailable, message: "The RPC has already completed"))
}
}

private func _cancel(promise: EventLoopPromise<Void>?) {
self.eventLoop.assertInEventLoop()

if let outbound = self.nextOutbound {
outbound.invokeCancel(promise: promise)
} else {
// The RPC has already been completed. Should cancellation fail?
promise?.succeed(())
}
}

internal func invokeRead(_ part: ClientResponsePart<Response>) {
self.eventLoop.assertInEventLoop()
fatalError("TODO: call the interceptor")
}

internal func invokeWrite(_ part: ClientRequestPart<Request>, promise: EventLoopPromise<Void>?) {
self.eventLoop.assertInEventLoop()
fatalError("TODO: call the interceptor")
}

internal func invokeCancel(promise: EventLoopPromise<Void>?) {
self.eventLoop.assertInEventLoop()
fatalError("TODO: call the interceptor")
}
}
138 changes: 138 additions & 0 deletions Sources/GRPC/Interceptor/ClientInterceptorPipeline.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright 2020, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import Logging
import NIO
import NIOHPACK
import NIOHTTP2

/// A pipeline for intercepting client request and response streams.
///
/// The interceptor pipeline lies between the call object (`UnaryCall`, `ClientStreamingCall`, etc.)
/// and the transport used to send and receive messages from the server (a `NIO.Channel`). It holds
/// a collection of interceptors which may be used to observe or alter messages as the travel
/// through the pipeline.
///
/// ```
/// ┌───────────────────────────────────────────────────────────────────┐
/// │ Call │
/// └────────────────────────────────────────────────────────┬──────────┘
/// │ write(_:promise) /
/// │ cancel(promise:)
/// ┌────────────────────────────────────────────────────────▼──────────┐
/// │ InterceptorPipeline ╎ │
/// │ ╎ │
/// │ ┌──────────────────────────────────────────────────────▼────────┐ │
/// │ │ Tail Interceptor (hands response parts to a callback) │ │
/// │ └────────▲─────────────────────────────────────────────┬────────┘ │
/// │ ╎ ╎ │
/// │ ╎ (More interceptors) ╎ │
/// │ ╎ ╎ │
/// │ ┌────────┴─────────────────────────────────────────────▼────────┐ │
/// │ │ Interceptor 2 │ │
/// │ └────────▲─────────────────────────────────────────────┬────────┘ │
/// │ ┌────────┴─────────────────────────────────────────────▼────────┐ │
/// │ │ Interceptor 1 │ │
/// │ └────────▲─────────────────────────────────────────────┬────────┘ │
/// │ ┌────────┴─────────────────────────────────────────────▼────────┐ │
/// │ │ Head Interceptor (interacts with transport) │ │
/// │ └────────▲─────────────────────────────────────────────┬────────┘ │
/// │ read(_:)╎ │ │
/// └──────────▲─────────────────────────────────────────────┼──────────┘
/// read(_:)│ │ write(_:promise:) /
/// │ │ cancel(promise:)
/// ┌──────────┴─────────────────────────────────────────────▼──────────┐
/// │ ClientTransport │
/// │ (a NIO.ChannelHandler) │
/// ```
internal final class ClientInterceptorPipeline<Request, Response> {
/// A logger.
internal let logger: Logger

/// The `EventLoop` this RPC is being executed on.
internal let eventLoop: EventLoop

/// The contexts associated with the interceptors stored in this pipeline. Context will be removed
/// once the RPC has completed.
private var contexts: [ClientInterceptorContext<Request, Response>]

/// Returns the context for the given index, if one exists.
/// - Parameter index: The index of the `ClientInterceptorContext` to return.
/// - Returns: The `ClientInterceptorContext` or `nil` if one does not exist for the given index.
internal func context(atIndex index: Int) -> ClientInterceptorContext<Request, Response>? {
return self.contexts[checked: index]
}

/// The context closest to the `NIO.Channel`, i.e. where inbound events originate. This will be
/// `nil` once the RPC has completed.
private var head: ClientInterceptorContext<Request, Response>? {
return self.contexts.first
}

/// The context closest to the application, i.e. where outbound events originate. This will be
/// `nil` once the RPC has completed.
private var tail: ClientInterceptorContext<Request, Response>? {
return self.contexts.last
}

internal init() {
fatalError("Not yet implemented.")
}

/// Emit a response part message into the interceptor pipeline.
///
/// This should be called by the transport layer when receiving a response part from the server.
///
/// - Parameter part: The part to emit into the pipeline.
/// - Important: This *must* to be called from the `eventLoop`.
internal func read(_ part: ClientResponsePart<Response>) {
self.eventLoop.assertInEventLoop()
self.head?.invokeRead(part)
}

/// Writes a request message into the interceptor pipeline.
///
/// This should be called by the call object to send requests parts to the transport.
///
/// - Parameters:
/// - part: The request part to write.
/// - promise: A promise to complete when the request part has been successfully written.
/// - Important: This *must* to be called from the `eventLoop`.
internal func write(_ part: ClientRequestPart<Request>, promise: EventLoopPromise<Void>?) {
self.eventLoop.assertInEventLoop()

if let tail = self.tail {
tail.invokeWrite(part, promise: promise)
} else {
promise?.fail(GRPCStatus(code: .unavailable, message: "The RPC has already completed"))
}
}

/// Send a request to cancel the RPC through the interceptor pipeline.
///
/// This should be called by the call object when attempting to cancel the RPC.
///
/// - Parameter promise: A promise to complete when the cancellation request has been handled.
/// - Important: This *must* to be called from the `eventLoop`.
internal func cancel(promise: EventLoopPromise<Void>?) {
self.eventLoop.assertInEventLoop()

if let tail = self.tail {
tail.invokeCancel(promise: promise)
} else {
promise?.fail(GRPCStatus(code: .unavailable, message: "The RPC has already completed"))
}
}
}
58 changes: 58 additions & 0 deletions Sources/GRPC/Interceptor/ClientParts.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2020, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import NIOHPACK

public enum ClientRequestPart<Request> {
/// User provided metadata sent at the start of the request stream.
case metadata(HPACKHeaders)

/// A message to send to the server.
case message(Request, Metadata)

/// End the request stream.
case end

/// Metadata associated with a request message.
public struct Metadata {
/// Whether the message should be compressed. If compression has not been enabled on the RPC
/// then setting is ignored.
public var compress: Bool

/// Whether the underlying transported should be 'flushed' after writing this message. If a batch
/// of messages is to be sent then flushing only after the last message may improve
/// performance.
public var flush: Bool

public init(compress: Bool, flush: Bool) {
self.compress = compress
self.flush = flush
}
}
}

public enum ClientResponsePart<Response> {
/// The initial metadata returned by the server.
case metadata(HPACKHeaders)

/// A response message from the server.
case message(Response)

/// The end of response stream sent by the server.
case end(GRPCStatus, HPACKHeaders)

/// Error.
case error(Error)
}
Loading

0 comments on commit c9df463

Please sign in to comment.