Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to manually shut down channels #384

Merged
merged 5 commits into from
Feb 28, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions Examples/SimpleXcode/Simple/Document.swift
Original file line number Diff line number Diff line change
Expand Up @@ -144,16 +144,17 @@ class Document: NSDocument {
if !self.isRunning() {
break
}
let method = (i < steps) ? "/hello" : "/quit"
let call = self.channel.makeCall(method)

let metadata = try! Metadata([
"x": "xylophone",
"y": "yu",
"z": "zither"
])

do {
let method = (i < steps) ? "/hello" : "/quit"
let call = try self.channel.makeCall(method)

let metadata = try Metadata([
"x": "xylophone",
"y": "yu",
"z": "zither"
])

try call.start(.unary,
metadata: metadata,
message: messageData) { callResult in
Expand Down
2 changes: 1 addition & 1 deletion Sources/Examples/Simple/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func client() throws {

let method = (i < steps - 1) ? "/hello" : "/quit"
print("calling " + method)
let call = c.makeCall(method)
let call = try! c.makeCall(method)
rebello95 marked this conversation as resolved.
Show resolved Hide resolved

let metadata = try Metadata([
"x": "xylophone",
Expand Down
46 changes: 38 additions & 8 deletions Sources/SwiftGRPC/Core/Channel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,31 @@ import Foundation
/// A gRPC Channel
public class Channel {
private let mutex = Mutex()
/// Weak references to API calls using this channel that are in-flight
private let activeCalls = NSHashTable<Call>.weakObjects()
rebello95 marked this conversation as resolved.
Show resolved Hide resolved
/// Pointer to underlying C representation
private let underlyingChannel: UnsafeMutableRawPointer
/// Completion queue for channel call operations
private let completionQueue: CompletionQueue
/// Observer for connectivity state changes. Created lazily if needed
private var connectivityObserver: ConnectivityObserver?
/// Whether the gRPC channel has been shut down
private var hasBeenShutdown = false

/// Timeout for new calls
public var timeout: TimeInterval = 600.0

/// Default host to use for new calls
public var host: String

/// Errors that may be thrown by the channel
enum Error: Swift.Error {
/// Action cannot be performed because the channel has already been shut down
case alreadyShutdown
/// Failed to create a new call within the gRPC stack
case callCreationFailed
}

/// Initializes a gRPC channel
///
/// - Parameter address: the address of the server to be called
Expand Down Expand Up @@ -94,12 +106,21 @@ public class Channel {
completionQueue.run() // start a loop that watches the channel's completion queue
}

deinit {
/// Shut down the channel. No new calls may be made using this channel after it is shut down. Any in-flight calls using this channel will be canceled
public func shutdown() {
self.mutex.synchronize {
guard !self.hasBeenShutdown else { return }

self.hasBeenShutdown = true
self.connectivityObserver?.shutdown()
cgrpc_channel_destroy(self.underlyingChannel)
self.completionQueue.shutdown()
self.activeCalls.allObjects.forEach { $0.cancel() }
}
cgrpc_channel_destroy(self.underlyingChannel)
self.completionQueue.shutdown()
}

deinit {
self.shutdown()
}

/// Constructs a Call object to make a gRPC API call
Expand All @@ -108,11 +129,20 @@ public class Channel {
/// - Parameter host: the gRPC host name for the call. If unspecified, defaults to the Client host
/// - Parameter timeout: a timeout value in seconds
/// - Returns: a Call object that can be used to perform the request
public func makeCall(_ method: String, host: String = "", timeout: TimeInterval? = nil) -> Call {
let host = host.isEmpty ? self.host : host
let timeout = timeout ?? self.timeout
let underlyingCall = cgrpc_channel_create_call(underlyingChannel, method, host, timeout)!
return Call(underlyingCall: underlyingCall, owned: true, completionQueue: completionQueue)
public func makeCall(_ method: String, host: String? = nil, timeout: TimeInterval? = nil) throws -> Call {
guard (self.mutex.synchronize { !self.hasBeenShutdown }) else {
throw Error.alreadyShutdown
}

guard let underlyingCall = cgrpc_channel_create_call(
rebello95 marked this conversation as resolved.
Show resolved Hide resolved
self.underlyingChannel, method, host ?? self.host, timeout ?? self.timeout) else
{
rebello95 marked this conversation as resolved.
Show resolved Hide resolved
throw Error.callCreationFailed
}

let call = Call(underlyingCall: underlyingCall, owned: true, completionQueue: self.completionQueue)
self.mutex.synchronize { self.activeCalls.add(call) }
return call
}

/// Check the current connectivity state
Expand Down
19 changes: 10 additions & 9 deletions Sources/SwiftGRPC/Runtime/ClientCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import Dispatch
import Foundation
import SwiftProtobuf

public protocol ClientCall: class {
static var method: String { get }

/// Cancel the call.
func cancel()
}

open class ClientCallBase: ClientCall {
open class ClientCallBase {
open class var method: String { fatalError("needs to be overridden") }

public let call: Call

/// Create a call.
public init(_ channel: Channel) {
call = channel.makeCall(type(of: self).method)
public init(_ channel: Channel) throws {
self.call = try channel.makeCall(type(of: self).method)
}
}

extension ClientCallBase: ClientCall {
public func cancel() {
self.call.cancel()
}

public func cancel() { call.cancel() }
}
1 change: 1 addition & 0 deletions Tests/LinuxMain.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ XCTMain([
testCase(gRPCTests.allTests),
testCase(ChannelArgumentTests.allTests),
testCase(ChannelConnectivityTests.allTests),
testCase(ChannelShutdownTests.allTests),
testCase(ClientCancellingTests.allTests),
testCase(ClientTestExample.allTests),
testCase(ClientTimeoutTests.allTests),
Expand Down
97 changes: 97 additions & 0 deletions Tests/SwiftGRPCTests/ChannelShutdownTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2018, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@testable import SwiftGRPC
import XCTest

final class ChannelShutdownTests: BasicEchoTestCase {
static var allTests: [(String, (ChannelShutdownTests) -> () throws -> Void)] {
return [
("testThrowsWhenCreatingCallWithAlreadyShutDownChannel", testThrowsWhenCreatingCallWithAlreadyShutDownChannel),
("testCallReceiveThrowsWhenChannelIsShutDown", testCallReceiveThrowsWhenChannelIsShutDown),
("testCallCloseThrowsWhenChannelIsShutDown", testCallCloseThrowsWhenChannelIsShutDown),
("testCallCloseAndReceiveThrowsWhenChannelIsShutDown", testCallCloseAndReceiveThrowsWhenChannelIsShutDown),
("testCallSendThrowsWhenChannelIsShutDown", testCallSendThrowsWhenChannelIsShutDown),
("testCancelsActiveCallWhenShutdownIsCalled", testCancelsActiveCallWhenShutdownIsCalled),
]
}
}

extension ChannelShutdownTests {
func testThrowsWhenCreatingCallWithAlreadyShutDownChannel() {
self.client.channel.shutdown()

XCTAssertThrowsError(try self.client.channel.makeCall("foobar")) { error in
XCTAssertEqual(.alreadyShutdown, error as? Channel.Error)
}
}

func testCallReceiveThrowsWhenChannelIsShutDown() {
let call = try! self.client.channel.makeCall("foo")
self.client.channel.shutdown()

XCTAssertThrowsError(try call.receiveMessage(completion: { _ in })) { error in
rebello95 marked this conversation as resolved.
Show resolved Hide resolved
XCTAssertEqual(.completionQueueShutdown, error as? CallError)
XCTAssertNotNil(self.client.channel)
MrMage marked this conversation as resolved.
Show resolved Hide resolved
}
}

func testCallCloseThrowsWhenChannelIsShutDown() {
let call = try! self.client.channel.makeCall("foo")
self.client.channel.shutdown()

XCTAssertThrowsError(try call.close()) { error in
XCTAssertEqual(.completionQueueShutdown, error as? CallError)
XCTAssertNotNil(self.client.channel)
}
}

func testCallCloseAndReceiveThrowsWhenChannelIsShutDown() {
let call = try! self.client.channel.makeCall("foo")
self.client.channel.shutdown()

XCTAssertThrowsError(try call.closeAndReceiveMessage(completion: { _ in })) { error in
XCTAssertEqual(.completionQueueShutdown, error as? CallError)
XCTAssertNotNil(self.client.channel)
}
}

func testCallSendThrowsWhenChannelIsShutDown() {
let call = try! self.client.channel.makeCall("foo")
self.client.channel.shutdown()

XCTAssertThrowsError(try call.sendMessage(data: Data())) { error in
XCTAssertEqual(.completionQueueShutdown, error as? CallError)
XCTAssertNotNil(self.client.channel)
}
}

func testCancelsActiveCallWhenShutdownIsCalled() {
let errorExpectation = self.expectation(description: "error is returned to call when channel is shut down")
let call = try! self.client.channel.makeCall("foo")

try! call.receiveMessage { result in
XCTAssertFalse(result.success)
errorExpectation.fulfill()
}

self.client.channel.shutdown()
self.waitForExpectations(timeout: 0.1)
MrMage marked this conversation as resolved.
Show resolved Hide resolved

XCTAssertThrowsError(try call.close()) { error in
XCTAssertEqual(.completionQueueShutdown, error as? CallError)
}
}
}
6 changes: 3 additions & 3 deletions Tests/SwiftGRPCTests/GRPCTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func callUnary(channel: Channel) throws {
func callUnaryIndividual(channel: Channel, message: Data, shouldSucceed: Bool) throws {
let sem = DispatchSemaphore(value: 0)
let method = hello
let call = channel.makeCall(method)
let call = try channel.makeCall(method)
let metadata = try Metadata(initialClientMetadata)
try call.start(.unary, metadata: metadata, message: message) {
response in
Expand Down Expand Up @@ -228,7 +228,7 @@ func callServerStream(channel: Channel) throws {

let sem = DispatchSemaphore(value: 0)
let method = helloServerStream
let call = channel.makeCall(method)
let call = try channel.makeCall(method)
try call.start(.serverStreaming, metadata: metadata, message: message) {
response in

Expand Down Expand Up @@ -270,7 +270,7 @@ func callBiDiStream(channel: Channel) throws {

let sem = DispatchSemaphore(value: 0)
let method = helloBiDiStream
let call = channel.makeCall(method)
let call = try channel.makeCall(method)
try call.start(.bidiStreaming, metadata: metadata, message: nil) {
response in

Expand Down