-
Notifications
You must be signed in to change notification settings - Fork 420
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Server handler state machine #1396
Merged
glbrntt
merged 1 commit into
grpc:1.7.1-async-await
from
glbrntt:gb-async-handler-handler-state-machine
May 4, 2022
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
60 changes: 60 additions & 0 deletions
60
...port/AsyncServerHandler/ServerHandlerStateMachine/ServerHandlerStateMachine+Actions.swift
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
/* | ||
* Copyright 2022, gRPC Authors All rights reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
#if compiler(>=5.6) | ||
import NIOHPACK | ||
|
||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) | ||
extension ServerHandlerStateMachine { | ||
enum HandleMetadataAction { | ||
/// Invoke the user handler. | ||
case invokeHandler(Ref<UserInfo>, CallHandlerContext) | ||
/// Cancel the RPC, the metadata was not expected. | ||
case cancel | ||
} | ||
|
||
enum HandleMessageAction: Hashable { | ||
/// Forward the message to the interceptors, via the interceptor state machine. | ||
case forward | ||
/// Cancel the RPC, the message was not expected. | ||
case cancel | ||
} | ||
|
||
/// The same as 'HandleMessageAction. | ||
typealias HandleEndAction = HandleMessageAction | ||
|
||
enum SendMessageAction: Equatable { | ||
/// Intercept the message, but first intercept the headers if they are non-nil. Must go via | ||
/// the interceptor state machine first. | ||
case intercept(headers: HPACKHeaders?) | ||
/// Drop the message. | ||
case drop | ||
} | ||
|
||
enum SendStatusAction: Equatable { | ||
/// Intercept the status, providing the given trailers. | ||
case intercept(trailers: HPACKHeaders) | ||
/// Drop the status. | ||
case drop | ||
} | ||
|
||
enum CancelAction: Hashable { | ||
/// Cancel and nil out the handler 'bits'. | ||
case cancelAndNilOutHandlerComponents | ||
/// Don't do anything. | ||
case none | ||
} | ||
} | ||
#endif // compiler(>=5.6) |
79 changes: 79 additions & 0 deletions
79
...ort/AsyncServerHandler/ServerHandlerStateMachine/ServerHandlerStateMachine+Draining.swift
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
/* | ||
* Copyright 2022, gRPC Authors All rights reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
#if compiler(>=5.6) | ||
import NIOHPACK | ||
|
||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) | ||
extension ServerHandlerStateMachine { | ||
/// In the 'Draining' state the user handler has been invoked and the request stream has been | ||
/// closed (i.e. we have seen 'end' but it has not necessarily been consumed by the user handler). | ||
/// We can transition to a new state either by sending the end of the response stream or by | ||
/// cancelling. | ||
internal struct Draining { | ||
typealias NextStateAndOutput<Output> = | ||
ServerHandlerStateMachine.NextStateAndOutput< | ||
ServerHandlerStateMachine.Draining.NextState, | ||
Output | ||
> | ||
|
||
/// Whether the response headers have been written yet. | ||
private var headersWritten: Bool | ||
internal let context: GRPCAsyncServerCallContext | ||
|
||
init(from state: ServerHandlerStateMachine.Handling) { | ||
self.headersWritten = state.headersWritten | ||
self.context = state.context | ||
} | ||
|
||
mutating func handleMetadata() -> Self.NextStateAndOutput<HandleMetadataAction> { | ||
// We're already draining, i.e. the inbound stream is closed, cancel the RPC. | ||
return .init(nextState: .draining(self), output: .cancel) | ||
} | ||
|
||
mutating func handleMessage() -> Self.NextStateAndOutput<HandleMessageAction> { | ||
// We're already draining, i.e. the inbound stream is closed, cancel the RPC. | ||
return .init(nextState: .draining(self), output: .cancel) | ||
} | ||
|
||
mutating func handleEnd() -> Self.NextStateAndOutput<HandleEndAction> { | ||
// We're already draining, i.e. the inbound stream is closed, cancel the RPC. | ||
return .init(nextState: .draining(self), output: .cancel) | ||
} | ||
|
||
mutating func sendMessage() -> Self.NextStateAndOutput<SendMessageAction> { | ||
let headers: HPACKHeaders? | ||
|
||
if self.headersWritten { | ||
headers = nil | ||
} else { | ||
self.headersWritten = true | ||
headers = self.context.initialResponseMetadata | ||
} | ||
|
||
return .init(nextState: .draining(self), output: .intercept(headers: headers)) | ||
} | ||
|
||
mutating func sendStatus() -> Self.NextStateAndOutput<SendStatusAction> { | ||
let trailers = self.context.trailingResponseMetadata | ||
return .init(nextState: .finished(from: self), output: .intercept(trailers: trailers)) | ||
} | ||
|
||
mutating func cancel() -> Self.NextStateAndOutput<CancelAction> { | ||
return .init(nextState: .finished(from: self), output: .cancelAndNilOutHandlerComponents) | ||
} | ||
} | ||
} | ||
#endif // compiler(>=5.6) |
55 changes: 55 additions & 0 deletions
55
...ort/AsyncServerHandler/ServerHandlerStateMachine/ServerHandlerStateMachine+Finished.swift
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
/* | ||
* Copyright 2022, gRPC Authors All rights reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
#if compiler(>=5.6) | ||
|
||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) | ||
extension ServerHandlerStateMachine { | ||
internal struct Finished { | ||
typealias NextStateAndOutput<Output> = ServerHandlerStateMachine.NextStateAndOutput< | ||
ServerHandlerStateMachine.Finished.NextState, | ||
Output | ||
> | ||
|
||
internal init(from state: ServerHandlerStateMachine.Idle) {} | ||
internal init(from state: ServerHandlerStateMachine.Handling) {} | ||
internal init(from state: ServerHandlerStateMachine.Draining) {} | ||
|
||
mutating func handleMetadata() -> Self.NextStateAndOutput<HandleMetadataAction> { | ||
return .init(nextState: .finished(self), output: .cancel) | ||
} | ||
|
||
mutating func handleMessage() -> Self.NextStateAndOutput<HandleMessageAction> { | ||
return .init(nextState: .finished(self), output: .cancel) | ||
} | ||
|
||
mutating func handleEnd() -> Self.NextStateAndOutput<HandleEndAction> { | ||
return .init(nextState: .finished(self), output: .cancel) | ||
} | ||
|
||
mutating func sendMessage() -> Self.NextStateAndOutput<SendMessageAction> { | ||
return .init(nextState: .finished(self), output: .drop) | ||
} | ||
|
||
mutating func sendStatus() -> Self.NextStateAndOutput<SendStatusAction> { | ||
return .init(nextState: .finished(self), output: .drop) | ||
} | ||
|
||
mutating func cancel() -> Self.NextStateAndOutput<CancelAction> { | ||
return .init(nextState: .finished(self), output: .cancelAndNilOutHandlerComponents) | ||
} | ||
} | ||
} | ||
#endif // compiler(>=5.6) |
88 changes: 88 additions & 0 deletions
88
...ort/AsyncServerHandler/ServerHandlerStateMachine/ServerHandlerStateMachine+Handling.swift
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
/* | ||
* Copyright 2022, gRPC Authors All rights reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
#if compiler(>=5.6) | ||
import NIOHPACK | ||
|
||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) | ||
extension ServerHandlerStateMachine { | ||
/// In the 'Handling' state the user handler has been invoked and the request stream is open (but | ||
/// the request metadata has already been seen). We can transition to a new state either by | ||
/// receiving the end of the request stream or by closing the response stream. Cancelling also | ||
/// moves us to the finished state. | ||
internal struct Handling { | ||
typealias NextStateAndOutput<Output> = ServerHandlerStateMachine.NextStateAndOutput< | ||
ServerHandlerStateMachine.Handling.NextState, | ||
Output | ||
> | ||
|
||
/// Whether response headers have been written (they are written lazily rather than on receipt | ||
/// of the request headers). | ||
internal private(set) var headersWritten: Bool | ||
|
||
/// A context held by user handler which may be used to alter the response headers or trailers. | ||
internal let context: GRPCAsyncServerCallContext | ||
|
||
/// Transition from the 'Idle' state. | ||
init(from state: ServerHandlerStateMachine.Idle, context: GRPCAsyncServerCallContext) { | ||
self.headersWritten = false | ||
self.context = context | ||
} | ||
|
||
mutating func handleMetadata() -> Self.NextStateAndOutput<HandleMetadataAction> { | ||
// We are in the 'Handling' state because we received metadata. If we receive it again we | ||
// should cancel the RPC. | ||
return .init(nextState: .handling(self), output: .cancel) | ||
} | ||
|
||
mutating func handleMessage() -> Self.NextStateAndOutput<HandleMessageAction> { | ||
// We can always forward a message since receiving the end of the request stream causes a | ||
// transition to the 'draining' state. | ||
return .init(nextState: .handling(self), output: .forward) | ||
} | ||
|
||
mutating func handleEnd() -> Self.NextStateAndOutput<HandleEndAction> { | ||
// The request stream is finished: move to the draining state so the user handler can finish | ||
// executing. | ||
return .init(nextState: .draining(from: self), output: .forward) | ||
} | ||
|
||
mutating func sendMessage() -> Self.NextStateAndOutput<SendMessageAction> { | ||
let headers: HPACKHeaders? | ||
|
||
// We send headers once, lazily, when the first message is sent back. | ||
if self.headersWritten { | ||
headers = nil | ||
} else { | ||
self.headersWritten = true | ||
headers = self.context.initialResponseMetadata | ||
} | ||
|
||
return .init(nextState: .handling(self), output: .intercept(headers: headers)) | ||
} | ||
|
||
mutating func sendStatus() -> Self.NextStateAndOutput<SendStatusAction> { | ||
// Sending the status is the final action taken by the user handler. We can always send | ||
// them from this state and doing so means the user handler has completed. | ||
let trailers = self.context.trailingResponseMetadata | ||
return .init(nextState: .finished(from: self), output: .intercept(trailers: trailers)) | ||
} | ||
|
||
mutating func cancel() -> Self.NextStateAndOutput<CancelAction> { | ||
return .init(nextState: .finished(from: self), output: .cancelAndNilOutHandlerComponents) | ||
} | ||
} | ||
} | ||
#endif // compiler(>=5.6) |
81 changes: 81 additions & 0 deletions
81
...Support/AsyncServerHandler/ServerHandlerStateMachine/ServerHandlerStateMachine+Idle.swift
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
/* | ||
* Copyright 2022, gRPC Authors All rights reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
#if compiler(>=5.6) | ||
|
||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) | ||
extension ServerHandlerStateMachine { | ||
/// In the 'Idle' state nothing has happened. To advance we must either receive metadata (i.e. | ||
/// the request headers) and invoke the handler, or we are cancelled. | ||
@usableFromInline | ||
internal struct Idle { | ||
typealias NextStateAndOutput<Output> = ServerHandlerStateMachine.NextStateAndOutput< | ||
ServerHandlerStateMachine.Idle.NextState, | ||
Output | ||
> | ||
|
||
/// A ref to the `UserInfo`. We hold on to this until we're ready to invoke the handler. | ||
let userInfoRef: Ref<UserInfo> | ||
/// A bag of bits required to construct a context passed to the user handler when it is invoked. | ||
let callHandlerContext: CallHandlerContext | ||
|
||
/// The state of the inbound stream, i.e. the request stream. | ||
internal private(set) var inboundState: ServerInterceptorStateMachine.InboundStreamState | ||
|
||
init(userInfoRef: Ref<UserInfo>, context: CallHandlerContext) { | ||
self.userInfoRef = userInfoRef | ||
self.callHandlerContext = context | ||
self.inboundState = .idle | ||
} | ||
|
||
mutating func handleMetadata() -> Self.NextStateAndOutput<HandleMetadataAction> { | ||
let action: HandleMetadataAction | ||
|
||
switch self.inboundState.receiveMetadata() { | ||
case .accept: | ||
// We tell the caller to invoke the handler immediately: they should then call | ||
// 'handlerInvoked' on the state machine which will cause a transition to the next state. | ||
action = .invokeHandler(self.userInfoRef, self.callHandlerContext) | ||
case .reject: | ||
action = .cancel | ||
} | ||
|
||
return .init(nextState: .idle(self), output: action) | ||
} | ||
|
||
mutating func handleMessage() -> Self.NextStateAndOutput<HandleMessageAction> { | ||
// We can't receive a message before the metadata, doing so is a protocol violation. | ||
return .init(nextState: .idle(self), output: .cancel) | ||
} | ||
|
||
mutating func handleEnd() -> Self.NextStateAndOutput<HandleEndAction> { | ||
// Receiving 'end' before we start is odd but okay, just cancel. | ||
return .init(nextState: .idle(self), output: .cancel) | ||
} | ||
|
||
mutating func handlerInvoked( | ||
context: GRPCAsyncServerCallContext | ||
) -> Self.NextStateAndOutput<Void> { | ||
// The handler was invoked as a result of receiving metadata. Move to the next state. | ||
return .init(nextState: .handling(from: self, context: context)) | ||
} | ||
|
||
mutating func cancel() -> Self.NextStateAndOutput<CancelAction> { | ||
// There's no handler to cancel. Move straight to finished. | ||
return .init(nextState: .finished(from: self), output: .none) | ||
} | ||
} | ||
} | ||
#endif // compiler(>=5.6) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are these not
Hashable
for a reason?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only because
HPACKHeaders
is notHashable
.We don't really need
Equatable
orHashable
here, but it's useful for testing.