Skip to content

Commit

Permalink
Implement connectivity state property and observers (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
cvanderschuere committed Mar 26, 2018
1 parent 5ebf7a3 commit 0bd14d8
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 36 deletions.
6 changes: 6 additions & 0 deletions Sources/CgRPC/shim/cgrpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ cgrpc_completion_queue *cgrpc_channel_completion_queue(cgrpc_channel *channel);

grpc_connectivity_state cgrpc_channel_check_connectivity_state(
cgrpc_channel *channel, int try_to_connect);
void cgrpc_channel_watch_connectivity_state(cgrpc_channel *channel,
cgrpc_completion_queue * completion_queue,
grpc_connectivity_state last_observed_state,
double deadline,
void *tag);

// server support
cgrpc_server *cgrpc_server_create(const char *address);
Expand All @@ -159,6 +164,7 @@ void cgrpc_server_start(cgrpc_server *s);
cgrpc_completion_queue *cgrpc_server_get_completion_queue(cgrpc_server *s);

// completion queues
cgrpc_completion_queue *cgrpc_completion_queue_create_for_next();
grpc_event cgrpc_completion_queue_get_next_event(cgrpc_completion_queue *cq,
double timeout);
void cgrpc_completion_queue_drain(cgrpc_completion_queue *cq);
Expand Down
5 changes: 5 additions & 0 deletions Sources/CgRPC/shim/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,8 @@ cgrpc_completion_queue *cgrpc_channel_completion_queue(cgrpc_channel *channel) {
grpc_connectivity_state cgrpc_channel_check_connectivity_state(cgrpc_channel *channel, int try_to_connect) {
return grpc_channel_check_connectivity_state(channel->channel, try_to_connect);
}

void cgrpc_channel_watch_connectivity_state(cgrpc_channel *channel, cgrpc_completion_queue * completion_queue, grpc_connectivity_state last_observed_state, double deadline, void *tag) {
gpr_timespec deadline_seconds = cgrpc_deadline_in_seconds_from_now(deadline);
return grpc_channel_watch_connectivity_state(channel->channel, last_observed_state, deadline_seconds, completion_queue, tag);
}
4 changes: 4 additions & 0 deletions Sources/CgRPC/shim/completion_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@

#include <stdio.h>

grpc_completion_queue *cgrpc_completion_queue_create_for_next() {
return grpc_completion_queue_create_for_next(NULL);
}

grpc_event cgrpc_completion_queue_get_next_event(grpc_completion_queue *cq, double timeout) {
gpr_timespec deadline = cgrpc_deadline_in_seconds_from_now(timeout);
if (timeout < 0) {
Expand Down
148 changes: 144 additions & 4 deletions Sources/SwiftGRPC/Core/Channel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@ public class Channel {

/// Default host to use for new calls
public var host: String

public var connectivityState: ConnectivityState? {
return ConnectivityState.fromCEnum(cgrpc_channel_check_connectivity_state(underlyingChannel, 0))
}

/// Connectivity state observers
private var observers: [ConnectivityObserver] = []

/// Initializes a gRPC channel
///
Expand Down Expand Up @@ -81,4 +80,145 @@ public class Channel {
let underlyingCall = cgrpc_channel_create_call(underlyingChannel, method, host, timeout)!
return Call(underlyingCall: underlyingCall, owned: true, completionQueue: completionQueue)
}

public func connectivityState(tryToConnect: Bool = false) -> ConnectivityState {
return ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(underlyingChannel, tryToConnect ? 1 : 0))
}

public func subscribe(sourceState: ConnectivityState, tryToConnect: Bool = false, callback: @escaping (ConnectivityState) -> ()) {
var observer = observers.first(where: { $0.state == sourceState })

if observer == nil {
let newObserver = ConnectivityObserver(state: sourceState, underlyingChannel: underlyingChannel, tryToConnect: tryToConnect)
observers.append(newObserver)
observer = newObserver
}

observer?.callbacks.append(callback)
observer?.polling = true
}
}

private extension Channel {
class ConnectivityObserver: Equatable {
let state: ConnectivityState
let queue: CompletionQueue
let underlyingChannel: UnsafeMutableRawPointer
let underlyingCompletionQueue: UnsafeMutableRawPointer
private(set) var tryToConnect: Bool
var callbacks: [(ConnectivityState) -> ()] = []
private var lastState: ConnectivityState

var polling: Bool = false {
didSet {
if polling == true && oldValue == false {
run()
}
}
}

init(state: ConnectivityState, underlyingChannel: UnsafeMutableRawPointer, tryToConnect: Bool) {
self.state = state
self.underlyingChannel = underlyingChannel
self.tryToConnect = tryToConnect
self.underlyingCompletionQueue = cgrpc_completion_queue_create_for_next()
self.queue = CompletionQueue(underlyingCompletionQueue: self.underlyingCompletionQueue, name: "Connectivity State")
self.lastState = ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, 0))
}

deinit {
queue.shutdown()
}

private func run() {
DispatchQueue.global().async { [weak self] in
guard let `self` = self, let underlyingState = self.lastState.underlyingState else { return }

while self.polling {
guard !self.callbacks.isEmpty && !self.tryToConnect else {
self.polling = false
break
}

defer { self.tryToConnect = false }

let deadline: TimeInterval = 0.2
cgrpc_channel_watch_connectivity_state(self.underlyingChannel, self.underlyingCompletionQueue, underlyingState, deadline, nil)
let event = self.queue.wait(timeout: deadline)

if event.success == 1 || self.tryToConnect {
let newState = ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, self.tryToConnect ? 1 : 0))

guard newState != self.lastState else { continue }
defer { self.lastState = newState }

if self.lastState == self.state {
self.callbacks.forEach({ $0(newState) })
}
}
}
}
}

static func == (lhs: ConnectivityObserver, rhs: ConnectivityObserver) -> Bool {
return lhs.state == rhs.state
}
}
}

extension Channel {
public enum ConnectivityState {
/// Channel has just been initialized
case initialized
/// Channel is idle
case idle
/// Channel is connecting
case connecting
/// Channel is ready for work
case ready
/// Channel has seen a failure but expects to recover
case transientFailure
/// Channel has seen a failure that it cannot recover from
case shutdown
/// Channel connectivity state is unknown
case unknown

fileprivate static func connectivityState(_ value: grpc_connectivity_state) -> ConnectivityState {
switch value {
case GRPC_CHANNEL_INIT:
return .initialized
case GRPC_CHANNEL_IDLE:
return .idle
case GRPC_CHANNEL_CONNECTING:
return .connecting
case GRPC_CHANNEL_READY:
return .ready
case GRPC_CHANNEL_TRANSIENT_FAILURE:
return .transientFailure
case GRPC_CHANNEL_SHUTDOWN:
return .shutdown
default:
return .unknown
}
}

fileprivate var underlyingState: grpc_connectivity_state? {
switch self {
case .initialized:
return GRPC_CHANNEL_INIT
case .idle:
return GRPC_CHANNEL_IDLE
case .connecting:
return GRPC_CHANNEL_CONNECTING
case .ready:
return GRPC_CHANNEL_READY
case .transientFailure:
return GRPC_CHANNEL_TRANSIENT_FAILURE
case .shutdown:
return GRPC_CHANNEL_SHUTDOWN
default:
return nil
}
}
}
}
32 changes: 0 additions & 32 deletions Sources/SwiftGRPC/Core/ConnectivityState.swift

This file was deleted.

0 comments on commit 0bd14d8

Please sign in to comment.