Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Server handler state machine #1396

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2022, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#if compiler(>=5.6)
import NIOHPACK

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension ServerHandlerStateMachine {
enum HandleMetadataAction {
/// Invoke the user handler.
case invokeHandler(Ref<UserInfo>, CallHandlerContext)
/// Cancel the RPC, the metadata was not expected.
case cancel
}

enum HandleMessageAction: Hashable {
/// Forward the message to the interceptors, via the interceptor state machine.
case forward
/// Cancel the RPC, the message was not expected.
case cancel
}

/// The same as 'HandleMessageAction.
typealias HandleEndAction = HandleMessageAction

enum SendMessageAction: Equatable {
/// Intercept the message, but first intercept the headers if they are non-nil. Must go via
/// the interceptor state machine first.
case intercept(headers: HPACKHeaders?)
/// Drop the message.
case drop
}

enum SendStatusAction: Equatable {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these not Hashable for a reason?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only because HPACKHeaders is not Hashable.

We don't really need Equatable or Hashable here, but it's useful for testing.

/// Intercept the status, providing the given trailers.
case intercept(trailers: HPACKHeaders)
/// Drop the status.
case drop
}

enum CancelAction: Hashable {
/// Cancel and nil out the handler 'bits'.
case cancelAndNilOutHandlerComponents
/// Don't do anything.
case none
}
}
#endif // compiler(>=5.6)
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2022, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#if compiler(>=5.6)
import NIOHPACK

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension ServerHandlerStateMachine {
/// In the 'Draining' state the user handler has been invoked and the request stream has been
/// closed (i.e. we have seen 'end' but it has not necessarily been consumed by the user handler).
/// We can transition to a new state either by sending the end of the response stream or by
/// cancelling.
internal struct Draining {
typealias NextStateAndOutput<Output> =
ServerHandlerStateMachine.NextStateAndOutput<
ServerHandlerStateMachine.Draining.NextState,
Output
>

/// Whether the response headers have been written yet.
private var headersWritten: Bool
internal let context: GRPCAsyncServerCallContext

init(from state: ServerHandlerStateMachine.Handling) {
self.headersWritten = state.headersWritten
self.context = state.context
}

mutating func handleMetadata() -> Self.NextStateAndOutput<HandleMetadataAction> {
// We're already draining, i.e. the inbound stream is closed, cancel the RPC.
return .init(nextState: .draining(self), output: .cancel)
}

mutating func handleMessage() -> Self.NextStateAndOutput<HandleMessageAction> {
// We're already draining, i.e. the inbound stream is closed, cancel the RPC.
return .init(nextState: .draining(self), output: .cancel)
}

mutating func handleEnd() -> Self.NextStateAndOutput<HandleEndAction> {
// We're already draining, i.e. the inbound stream is closed, cancel the RPC.
return .init(nextState: .draining(self), output: .cancel)
}

mutating func sendMessage() -> Self.NextStateAndOutput<SendMessageAction> {
let headers: HPACKHeaders?

if self.headersWritten {
headers = nil
} else {
self.headersWritten = true
headers = self.context.initialResponseMetadata
}

return .init(nextState: .draining(self), output: .intercept(headers: headers))
}

mutating func sendStatus() -> Self.NextStateAndOutput<SendStatusAction> {
let trailers = self.context.trailingResponseMetadata
return .init(nextState: .finished(from: self), output: .intercept(trailers: trailers))
}

mutating func cancel() -> Self.NextStateAndOutput<CancelAction> {
return .init(nextState: .finished(from: self), output: .cancelAndNilOutHandlerComponents)
}
}
}
#endif // compiler(>=5.6)
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2022, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#if compiler(>=5.6)

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension ServerHandlerStateMachine {
internal struct Finished {
typealias NextStateAndOutput<Output> = ServerHandlerStateMachine.NextStateAndOutput<
ServerHandlerStateMachine.Finished.NextState,
Output
>

internal init(from state: ServerHandlerStateMachine.Idle) {}
internal init(from state: ServerHandlerStateMachine.Handling) {}
internal init(from state: ServerHandlerStateMachine.Draining) {}

mutating func handleMetadata() -> Self.NextStateAndOutput<HandleMetadataAction> {
return .init(nextState: .finished(self), output: .cancel)
}

mutating func handleMessage() -> Self.NextStateAndOutput<HandleMessageAction> {
return .init(nextState: .finished(self), output: .cancel)
}

mutating func handleEnd() -> Self.NextStateAndOutput<HandleEndAction> {
return .init(nextState: .finished(self), output: .cancel)
}

mutating func sendMessage() -> Self.NextStateAndOutput<SendMessageAction> {
return .init(nextState: .finished(self), output: .drop)
}

mutating func sendStatus() -> Self.NextStateAndOutput<SendStatusAction> {
return .init(nextState: .finished(self), output: .drop)
}

mutating func cancel() -> Self.NextStateAndOutput<CancelAction> {
return .init(nextState: .finished(self), output: .cancelAndNilOutHandlerComponents)
}
}
}
#endif // compiler(>=5.6)
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2022, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#if compiler(>=5.6)
import NIOHPACK

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension ServerHandlerStateMachine {
/// In the 'Handling' state the user handler has been invoked and the request stream is open (but
/// the request metadata has already been seen). We can transition to a new state either by
/// receiving the end of the request stream or by closing the response stream. Cancelling also
/// moves us to the finished state.
internal struct Handling {
typealias NextStateAndOutput<Output> = ServerHandlerStateMachine.NextStateAndOutput<
ServerHandlerStateMachine.Handling.NextState,
Output
>

/// Whether response headers have been written (they are written lazily rather than on receipt
/// of the request headers).
internal private(set) var headersWritten: Bool

/// A context held by user handler which may be used to alter the response headers or trailers.
internal let context: GRPCAsyncServerCallContext

/// Transition from the 'Idle' state.
init(from state: ServerHandlerStateMachine.Idle, context: GRPCAsyncServerCallContext) {
self.headersWritten = false
self.context = context
}

mutating func handleMetadata() -> Self.NextStateAndOutput<HandleMetadataAction> {
// We are in the 'Handling' state because we received metadata. If we receive it again we
// should cancel the RPC.
return .init(nextState: .handling(self), output: .cancel)
}

mutating func handleMessage() -> Self.NextStateAndOutput<HandleMessageAction> {
// We can always forward a message since receiving the end of the request stream causes a
// transition to the 'draining' state.
return .init(nextState: .handling(self), output: .forward)
}

mutating func handleEnd() -> Self.NextStateAndOutput<HandleEndAction> {
// The request stream is finished: move to the draining state so the user handler can finish
// executing.
return .init(nextState: .draining(from: self), output: .forward)
}

mutating func sendMessage() -> Self.NextStateAndOutput<SendMessageAction> {
let headers: HPACKHeaders?

// We send headers once, lazily, when the first message is sent back.
if self.headersWritten {
headers = nil
} else {
self.headersWritten = true
headers = self.context.initialResponseMetadata
}

return .init(nextState: .handling(self), output: .intercept(headers: headers))
}

mutating func sendStatus() -> Self.NextStateAndOutput<SendStatusAction> {
// Sending the status is the final action taken by the user handler. We can always send
// them from this state and doing so means the user handler has completed.
let trailers = self.context.trailingResponseMetadata
return .init(nextState: .finished(from: self), output: .intercept(trailers: trailers))
}

mutating func cancel() -> Self.NextStateAndOutput<CancelAction> {
return .init(nextState: .finished(from: self), output: .cancelAndNilOutHandlerComponents)
}
}
}
#endif // compiler(>=5.6)
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2022, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#if compiler(>=5.6)

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension ServerHandlerStateMachine {
/// In the 'Idle' state nothing has happened. To advance we must either receive metadata (i.e.
/// the request headers) and invoke the handler, or we are cancelled.
@usableFromInline
internal struct Idle {
typealias NextStateAndOutput<Output> = ServerHandlerStateMachine.NextStateAndOutput<
ServerHandlerStateMachine.Idle.NextState,
Output
>

/// A ref to the `UserInfo`. We hold on to this until we're ready to invoke the handler.
let userInfoRef: Ref<UserInfo>
/// A bag of bits required to construct a context passed to the user handler when it is invoked.
let callHandlerContext: CallHandlerContext

/// The state of the inbound stream, i.e. the request stream.
internal private(set) var inboundState: ServerInterceptorStateMachine.InboundStreamState

init(userInfoRef: Ref<UserInfo>, context: CallHandlerContext) {
self.userInfoRef = userInfoRef
self.callHandlerContext = context
self.inboundState = .idle
}

mutating func handleMetadata() -> Self.NextStateAndOutput<HandleMetadataAction> {
let action: HandleMetadataAction

switch self.inboundState.receiveMetadata() {
case .accept:
// We tell the caller to invoke the handler immediately: they should then call
// 'handlerInvoked' on the state machine which will cause a transition to the next state.
action = .invokeHandler(self.userInfoRef, self.callHandlerContext)
case .reject:
action = .cancel
}

return .init(nextState: .idle(self), output: action)
}

mutating func handleMessage() -> Self.NextStateAndOutput<HandleMessageAction> {
// We can't receive a message before the metadata, doing so is a protocol violation.
return .init(nextState: .idle(self), output: .cancel)
}

mutating func handleEnd() -> Self.NextStateAndOutput<HandleEndAction> {
// Receiving 'end' before we start is odd but okay, just cancel.
return .init(nextState: .idle(self), output: .cancel)
}

mutating func handlerInvoked(
context: GRPCAsyncServerCallContext
) -> Self.NextStateAndOutput<Void> {
// The handler was invoked as a result of receiving metadata. Move to the next state.
return .init(nextState: .handling(from: self, context: context))
}

mutating func cancel() -> Self.NextStateAndOutput<CancelAction> {
// There's no handler to cancel. Move straight to finished.
return .init(nextState: .finished(from: self), output: .none)
}
}
}
#endif // compiler(>=5.6)
Loading