Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use new state machines in async server handler #1403

Merged
merged 2 commits into from
May 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ jobs:
matrix:
include:
- image: swift:5.6-focal
swift-test-flags: "--sanitize=thread"
- image: swift:5.5-focal
swift-test-flags: "--sanitize=thread"
- image: swift:5.4-focal
Expand Down Expand Up @@ -106,8 +107,6 @@ jobs:
GRPC_NO_NIO_SSL: 1
timeout-minutes: 20
- name: Test without NIOSSL
# Skip tests on 5.6: https://bugs.swift.org/browse/SR-15955
if: ${{ matrix.image != 'swift:5.6-focal' }}
run: swift test
env:
GRPC_NO_NIO_SSL: 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ import NIOHPACK

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension ServerHandlerStateMachine {
@usableFromInline
enum HandleMetadataAction {
/// Invoke the user handler.
case invokeHandler(Ref<UserInfo>, CallHandlerContext)
/// Cancel the RPC, the metadata was not expected.
case cancel
}

@usableFromInline
enum HandleMessageAction: Hashable {
/// Forward the message to the interceptors, via the interceptor state machine.
case forward
Expand All @@ -33,8 +35,10 @@ extension ServerHandlerStateMachine {
}

/// The same as 'HandleMessageAction.
@usableFromInline
typealias HandleEndAction = HandleMessageAction

@usableFromInline
enum SendMessageAction: Equatable {
/// Intercept the message, but first intercept the headers if they are non-nil. Must go via
/// the interceptor state machine first.
Expand All @@ -43,13 +47,15 @@ extension ServerHandlerStateMachine {
case drop
}

@usableFromInline
enum SendStatusAction: Equatable {
/// Intercept the status, providing the given trailers.
case intercept(trailers: HPACKHeaders)
case intercept(requestHeaders: HPACKHeaders, trailers: HPACKHeaders)
/// Drop the status.
case drop
}

@usableFromInline
enum CancelAction: Hashable {
/// Cancel and nil out the handler 'bits'.
case cancelAndNilOutHandlerComponents
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,37 +22,46 @@ extension ServerHandlerStateMachine {
/// closed (i.e. we have seen 'end' but it has not necessarily been consumed by the user handler).
/// We can transition to a new state either by sending the end of the response stream or by
/// cancelling.
@usableFromInline
internal struct Draining {
@usableFromInline
typealias NextStateAndOutput<Output> =
ServerHandlerStateMachine.NextStateAndOutput<
ServerHandlerStateMachine.Draining.NextState,
Output
>

/// Whether the response headers have been written yet.
private var headersWritten: Bool
@usableFromInline
internal private(set) var headersWritten: Bool
@usableFromInline
internal let context: GRPCAsyncServerCallContext

@inlinable
init(from state: ServerHandlerStateMachine.Handling) {
self.headersWritten = state.headersWritten
self.context = state.context
}

@inlinable
mutating func handleMetadata() -> Self.NextStateAndOutput<HandleMetadataAction> {
// We're already draining, i.e. the inbound stream is closed, cancel the RPC.
return .init(nextState: .draining(self), output: .cancel)
}

@inlinable
mutating func handleMessage() -> Self.NextStateAndOutput<HandleMessageAction> {
// We're already draining, i.e. the inbound stream is closed, cancel the RPC.
return .init(nextState: .draining(self), output: .cancel)
}

@inlinable
mutating func handleEnd() -> Self.NextStateAndOutput<HandleEndAction> {
// We're already draining, i.e. the inbound stream is closed, cancel the RPC.
return .init(nextState: .draining(self), output: .cancel)
}

@inlinable
mutating func sendMessage() -> Self.NextStateAndOutput<SendMessageAction> {
let headers: HPACKHeaders?

Expand All @@ -66,11 +75,16 @@ extension ServerHandlerStateMachine {
return .init(nextState: .draining(self), output: .intercept(headers: headers))
}

@inlinable
mutating func sendStatus() -> Self.NextStateAndOutput<SendStatusAction> {
let trailers = self.context.trailingResponseMetadata
return .init(nextState: .finished(from: self), output: .intercept(trailers: trailers))
return .init(
nextState: .finished(from: self),
output: .intercept(requestHeaders: self.context.requestMetadata, trailers: trailers)
)
}

@inlinable
mutating func cancel() -> Self.NextStateAndOutput<CancelAction> {
return .init(nextState: .finished(from: self), output: .cancelAndNilOutHandlerComponents)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,47 @@

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension ServerHandlerStateMachine {
@usableFromInline
internal struct Finished {
@usableFromInline
typealias NextStateAndOutput<Output> = ServerHandlerStateMachine.NextStateAndOutput<
ServerHandlerStateMachine.Finished.NextState,
Output
>

@inlinable
internal init(from state: ServerHandlerStateMachine.Idle) {}
@inlinable
internal init(from state: ServerHandlerStateMachine.Handling) {}
@inlinable
internal init(from state: ServerHandlerStateMachine.Draining) {}

@inlinable
mutating func handleMetadata() -> Self.NextStateAndOutput<HandleMetadataAction> {
return .init(nextState: .finished(self), output: .cancel)
}

@inlinable
mutating func handleMessage() -> Self.NextStateAndOutput<HandleMessageAction> {
return .init(nextState: .finished(self), output: .cancel)
}

@inlinable
mutating func handleEnd() -> Self.NextStateAndOutput<HandleEndAction> {
return .init(nextState: .finished(self), output: .cancel)
}

@inlinable
mutating func sendMessage() -> Self.NextStateAndOutput<SendMessageAction> {
return .init(nextState: .finished(self), output: .drop)
}

@inlinable
mutating func sendStatus() -> Self.NextStateAndOutput<SendStatusAction> {
return .init(nextState: .finished(self), output: .drop)
}

@inlinable
mutating func cancel() -> Self.NextStateAndOutput<CancelAction> {
return .init(nextState: .finished(self), output: .cancelAndNilOutHandlerComponents)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,43 +22,52 @@ extension ServerHandlerStateMachine {
/// the request metadata has already been seen). We can transition to a new state either by
/// receiving the end of the request stream or by closing the response stream. Cancelling also
/// moves us to the finished state.
@usableFromInline
internal struct Handling {
@usableFromInline
typealias NextStateAndOutput<Output> = ServerHandlerStateMachine.NextStateAndOutput<
ServerHandlerStateMachine.Handling.NextState,
Output
>

/// Whether response headers have been written (they are written lazily rather than on receipt
/// of the request headers).
@usableFromInline
internal private(set) var headersWritten: Bool

/// A context held by user handler which may be used to alter the response headers or trailers.
@usableFromInline
internal let context: GRPCAsyncServerCallContext

/// Transition from the 'Idle' state.
@inlinable
init(from state: ServerHandlerStateMachine.Idle, context: GRPCAsyncServerCallContext) {
self.headersWritten = false
self.context = context
}

@inlinable
mutating func handleMetadata() -> Self.NextStateAndOutput<HandleMetadataAction> {
// We are in the 'Handling' state because we received metadata. If we receive it again we
// should cancel the RPC.
return .init(nextState: .handling(self), output: .cancel)
}

@inlinable
mutating func handleMessage() -> Self.NextStateAndOutput<HandleMessageAction> {
// We can always forward a message since receiving the end of the request stream causes a
// transition to the 'draining' state.
return .init(nextState: .handling(self), output: .forward)
}

@inlinable
mutating func handleEnd() -> Self.NextStateAndOutput<HandleEndAction> {
// The request stream is finished: move to the draining state so the user handler can finish
// executing.
return .init(nextState: .draining(from: self), output: .forward)
}

@inlinable
mutating func sendMessage() -> Self.NextStateAndOutput<SendMessageAction> {
let headers: HPACKHeaders?

Expand All @@ -73,13 +82,18 @@ extension ServerHandlerStateMachine {
return .init(nextState: .handling(self), output: .intercept(headers: headers))
}

@inlinable
mutating func sendStatus() -> Self.NextStateAndOutput<SendStatusAction> {
// Sending the status is the final action taken by the user handler. We can always send
// them from this state and doing so means the user handler has completed.
let trailers = self.context.trailingResponseMetadata
return .init(nextState: .finished(from: self), output: .intercept(trailers: trailers))
return .init(
nextState: .finished(from: self),
output: .intercept(requestHeaders: self.context.requestMetadata, trailers: trailers)
)
}

@inlinable
mutating func cancel() -> Self.NextStateAndOutput<CancelAction> {
return .init(nextState: .finished(from: self), output: .cancelAndNilOutHandlerComponents)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,31 @@ extension ServerHandlerStateMachine {
/// the request headers) and invoke the handler, or we are cancelled.
@usableFromInline
internal struct Idle {
@usableFromInline
typealias NextStateAndOutput<Output> = ServerHandlerStateMachine.NextStateAndOutput<
ServerHandlerStateMachine.Idle.NextState,
Output
>

/// A ref to the `UserInfo`. We hold on to this until we're ready to invoke the handler.
@usableFromInline
let userInfoRef: Ref<UserInfo>
/// A bag of bits required to construct a context passed to the user handler when it is invoked.
@usableFromInline
let callHandlerContext: CallHandlerContext

/// The state of the inbound stream, i.e. the request stream.
@usableFromInline
internal private(set) var inboundState: ServerInterceptorStateMachine.InboundStreamState

@inlinable
init(userInfoRef: Ref<UserInfo>, context: CallHandlerContext) {
self.userInfoRef = userInfoRef
self.callHandlerContext = context
self.inboundState = .idle
}

@inlinable
mutating func handleMetadata() -> Self.NextStateAndOutput<HandleMetadataAction> {
let action: HandleMetadataAction

Expand All @@ -55,23 +61,27 @@ extension ServerHandlerStateMachine {
return .init(nextState: .idle(self), output: action)
}

@inlinable
mutating func handleMessage() -> Self.NextStateAndOutput<HandleMessageAction> {
// We can't receive a message before the metadata, doing so is a protocol violation.
return .init(nextState: .idle(self), output: .cancel)
}

@inlinable
mutating func handleEnd() -> Self.NextStateAndOutput<HandleEndAction> {
// Receiving 'end' before we start is odd but okay, just cancel.
return .init(nextState: .idle(self), output: .cancel)
}

@inlinable
mutating func handlerInvoked(
context: GRPCAsyncServerCallContext
) -> Self.NextStateAndOutput<Void> {
// The handler was invoked as a result of receiving metadata. Move to the next state.
return .init(nextState: .handling(from: self, context: context))
}

@inlinable
mutating func cancel() -> Self.NextStateAndOutput<CancelAction> {
// There's no handler to cancel. Move straight to finished.
return .init(nextState: .finished(from: self), output: .none)
Expand Down
Loading