Skip to content

Commit

Permalink
Merge pull request #196 from postmates/connectivity_state
Browse files Browse the repository at this point in the history
Implement connectivity state property and observers
  • Loading branch information
timburks authored May 26, 2018
2 parents 8941a85 + 0d25ef8 commit b1bba67
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 41 deletions.
6 changes: 6 additions & 0 deletions Sources/CgRPC/shim/cgrpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ cgrpc_completion_queue *cgrpc_channel_completion_queue(cgrpc_channel *channel);

grpc_connectivity_state cgrpc_channel_check_connectivity_state(
cgrpc_channel *channel, int try_to_connect);
void cgrpc_channel_watch_connectivity_state(cgrpc_channel *channel,
cgrpc_completion_queue *completion_queue,
grpc_connectivity_state last_observed_state,
double deadline,
void *tag);

// server support
cgrpc_server *cgrpc_server_create(const char *address);
Expand All @@ -190,6 +195,7 @@ void cgrpc_server_start(cgrpc_server *s);
cgrpc_completion_queue *cgrpc_server_get_completion_queue(cgrpc_server *s);

// completion queues
cgrpc_completion_queue *cgrpc_completion_queue_create_for_next();
grpc_event cgrpc_completion_queue_get_next_event(cgrpc_completion_queue *cq,
double timeout);
void cgrpc_completion_queue_drain(cgrpc_completion_queue *cq);
Expand Down
5 changes: 5 additions & 0 deletions Sources/CgRPC/shim/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,8 @@ cgrpc_completion_queue *cgrpc_channel_completion_queue(cgrpc_channel *channel) {
grpc_connectivity_state cgrpc_channel_check_connectivity_state(cgrpc_channel *channel, int try_to_connect) {
return grpc_channel_check_connectivity_state(channel->channel, try_to_connect);
}

void cgrpc_channel_watch_connectivity_state(cgrpc_channel *channel, cgrpc_completion_queue *completion_queue, grpc_connectivity_state last_observed_state, double deadline, void *tag) {
gpr_timespec deadline_seconds = cgrpc_deadline_in_seconds_from_now(deadline);
return grpc_channel_watch_connectivity_state(channel->channel, last_observed_state, deadline_seconds, completion_queue, tag);
}
4 changes: 4 additions & 0 deletions Sources/CgRPC/shim/completion_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@

#include <stdio.h>

grpc_completion_queue *cgrpc_completion_queue_create_for_next() {
return grpc_completion_queue_create_for_next(NULL);
}

grpc_event cgrpc_completion_queue_get_next_event(grpc_completion_queue *cq, double timeout) {
gpr_timespec deadline = cgrpc_deadline_in_seconds_from_now(timeout);
if (timeout < 0) {
Expand Down
147 changes: 138 additions & 9 deletions Sources/SwiftGRPC/Core/Channel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
#if SWIFT_PACKAGE
import CgRPC
import Dispatch
#endif
import Foundation

Expand All @@ -31,10 +32,9 @@ public class Channel {

/// Default host to use for new calls
public var host: String

public var connectivityState: ConnectivityState? {
return ConnectivityState.fromCEnum(cgrpc_channel_check_connectivity_state(underlyingChannel, 0))
}

/// Connectivity state observers
private var connectivityObservers: [ConnectivityObserver] = []

/// Initializes a gRPC channel
///
Expand All @@ -52,8 +52,7 @@ public class Channel {
} else {
underlyingChannel = cgrpc_channel_create(address, &argumentValues, Int32(arguments.count))
}
completionQueue = CompletionQueue(
underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client")
completionQueue = CompletionQueue(underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client")
completionQueue.run() // start a loop that watches the channel's completion queue
}

Expand All @@ -64,17 +63,17 @@ public class Channel {
/// - Parameter arguments: list of channel configuration options
public init(address: String, certificates: String, arguments: [Argument] = []) {
gRPC.initialize()
self.host = address
host = address
let argumentWrappers = arguments.map { $0.toCArg() }
var argumentValues = argumentWrappers.map { $0.wrapped }

underlyingChannel = cgrpc_channel_create_secure(address, certificates, &argumentValues, Int32(arguments.count))
completionQueue = CompletionQueue(
underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client")
completionQueue = CompletionQueue(underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client")
completionQueue.run() // start a loop that watches the channel's completion queue
}

deinit {
connectivityObservers.forEach { $0.shutdown() }
cgrpc_channel_destroy(underlyingChannel)
completionQueue.shutdown()
}
Expand All @@ -90,4 +89,134 @@ public class Channel {
let underlyingCall = cgrpc_channel_create_call(underlyingChannel, method, host, timeout)!
return Call(underlyingCall: underlyingCall, owned: true, completionQueue: completionQueue)
}

/// Check the current connectivity state
///
/// - Parameter tryToConnect: boolean value to indicate if should try to connect if channel's connectivity state is idle
/// - Returns: a ConnectivityState value representing the current connectivity state of the channel
public func connectivityState(tryToConnect: Bool = false) -> ConnectivityState {
return ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(underlyingChannel, tryToConnect ? 1 : 0))
}

/// Subscribe to connectivity state changes
///
/// - Parameter callback: block executed every time a new connectivity state is detected
public func subscribe(callback: @escaping (ConnectivityState) -> Void) {
connectivityObservers.append(ConnectivityObserver(underlyingChannel: underlyingChannel, currentState: connectivityState(), callback: callback))
}
}

private extension Channel {
class ConnectivityObserver {
private let completionQueue: CompletionQueue
private let underlyingChannel: UnsafeMutableRawPointer
private let underlyingCompletionQueue: UnsafeMutableRawPointer
private let callback: (ConnectivityState) -> Void
private var lastState: ConnectivityState

init(underlyingChannel: UnsafeMutableRawPointer, currentState: ConnectivityState, callback: @escaping (ConnectivityState) -> ()) {
self.underlyingChannel = underlyingChannel
self.underlyingCompletionQueue = cgrpc_completion_queue_create_for_next()
self.completionQueue = CompletionQueue(underlyingCompletionQueue: self.underlyingCompletionQueue, name: "Connectivity State")
self.callback = callback
self.lastState = currentState
run()
}

deinit {
shutdown()
}

private func run() {
let spinloopThreadQueue = DispatchQueue(label: "SwiftGRPC.ConnectivityObserver.run.spinloopThread")

spinloopThreadQueue.async {
while true {
guard let underlyingState = self.lastState.underlyingState else { return }

let deadline: TimeInterval = 0.2
cgrpc_channel_watch_connectivity_state(self.underlyingChannel, self.underlyingCompletionQueue, underlyingState, deadline, nil)
let event = self.completionQueue.wait(timeout: deadline)

switch event.type {
case .complete:
let newState = ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, 0))

if newState != self.lastState {
self.callback(newState)
}

self.lastState = newState
case .queueTimeout:
continue
case .queueShutdown:
return
default:
continue
}
}
}
}

func shutdown() {
completionQueue.shutdown()
}
}
}

extension Channel {
public enum ConnectivityState {
/// Channel has just been initialized
case initialized
/// Channel is idle
case idle
/// Channel is connecting
case connecting
/// Channel is ready for work
case ready
/// Channel has seen a failure but expects to recover
case transientFailure
/// Channel has seen a failure that it cannot recover from
case shutdown
/// Channel connectivity state is unknown
case unknown

fileprivate static func connectivityState(_ value: grpc_connectivity_state) -> ConnectivityState {
switch value {
case GRPC_CHANNEL_INIT:
return .initialized
case GRPC_CHANNEL_IDLE:
return .idle
case GRPC_CHANNEL_CONNECTING:
return .connecting
case GRPC_CHANNEL_READY:
return .ready
case GRPC_CHANNEL_TRANSIENT_FAILURE:
return .transientFailure
case GRPC_CHANNEL_SHUTDOWN:
return .shutdown
default:
return .unknown
}
}

fileprivate var underlyingState: grpc_connectivity_state? {
switch self {
case .initialized:
return GRPC_CHANNEL_INIT
case .idle:
return GRPC_CHANNEL_IDLE
case .connecting:
return GRPC_CHANNEL_CONNECTING
case .ready:
return GRPC_CHANNEL_READY
case .transientFailure:
return GRPC_CHANNEL_TRANSIENT_FAILURE
case .shutdown:
return GRPC_CHANNEL_SHUTDOWN
default:
return nil
}
}
}
}
32 changes: 0 additions & 32 deletions Sources/SwiftGRPC/Core/ConnectivityState.swift

This file was deleted.

0 comments on commit b1bba67

Please sign in to comment.