Skip to content

Commit

Permalink
PR changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Sebastian Thiebaud committed May 21, 2018
1 parent 76faf09 commit 255ce88
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 51 deletions.
2 changes: 1 addition & 1 deletion Sources/CgRPC/shim/cgrpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ cgrpc_completion_queue *cgrpc_channel_completion_queue(cgrpc_channel *channel);
grpc_connectivity_state cgrpc_channel_check_connectivity_state(
cgrpc_channel *channel, int try_to_connect);
void cgrpc_channel_watch_connectivity_state(cgrpc_channel *channel,
cgrpc_completion_queue * completion_queue,
cgrpc_completion_queue *completion_queue,
grpc_connectivity_state last_observed_state,
double deadline,
void *tag);
Expand Down
2 changes: 1 addition & 1 deletion Sources/CgRPC/shim/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ grpc_connectivity_state cgrpc_channel_check_connectivity_state(cgrpc_channel *ch
return grpc_channel_check_connectivity_state(channel->channel, try_to_connect);
}

void cgrpc_channel_watch_connectivity_state(cgrpc_channel *channel, cgrpc_completion_queue * completion_queue, grpc_connectivity_state last_observed_state, double deadline, void *tag) {
void cgrpc_channel_watch_connectivity_state(cgrpc_channel *channel, cgrpc_completion_queue *completion_queue, grpc_connectivity_state last_observed_state, double deadline, void *tag) {
gpr_timespec deadline_seconds = cgrpc_deadline_in_seconds_from_now(deadline);
return grpc_channel_watch_connectivity_state(channel->channel, last_observed_state, deadline_seconds, completion_queue, tag);
}
91 changes: 42 additions & 49 deletions Sources/SwiftGRPC/Core/Channel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class Channel {
public var host: String

/// Connectivity state observers
private var observers: [ConnectivityObserver] = []
private var connectivityObservers: [ConnectivityObserver] = []

/// Initializes a gRPC channel
///
Expand All @@ -47,8 +47,7 @@ public class Channel {
} else {
underlyingChannel = cgrpc_channel_create(address)
}
completionQueue = CompletionQueue(
underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client")
completionQueue = CompletionQueue(underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client")
completionQueue.run() // start a loop that watches the channel's completion queue
}

Expand All @@ -59,13 +58,13 @@ public class Channel {
/// - Parameter host: an optional hostname override
public init(address: String, certificates: String, host: String?) {
self.host = address
underlyingChannel = cgrpc_channel_create_secure(address, certificates, host)
completionQueue = CompletionQueue(
underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client")
underlyingChannel = cgrpc_channel_create_secure(address, certificates, &argumentValues, Int32(arguments.count))
completionQueue = CompletionQueue(underlyingCompletionQueue: cgrpc_channel_completion_queue(underlyingChannel), name: "Client")
completionQueue.run() // start a loop that watches the channel's completion queue
}

deinit {
connectivityObservers.forEach { $0.polling = false }
cgrpc_channel_destroy(underlyingChannel)
completionQueue.shutdown()
}
Expand All @@ -86,83 +85,77 @@ public class Channel {
return ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(underlyingChannel, tryToConnect ? 1 : 0))
}

public func subscribe(sourceState: ConnectivityState, tryToConnect: Bool = false, callback: @escaping (ConnectivityState) -> ()) {
var observer = observers.first(where: { $0.state == sourceState })

if observer == nil {
let newObserver = ConnectivityObserver(state: sourceState, underlyingChannel: underlyingChannel, tryToConnect: tryToConnect)
observers.append(newObserver)
observer = newObserver
}

observer?.callbacks.append(callback)
observer?.polling = true
public func subscribe(callback: @escaping (ConnectivityState) -> ()) {
let observer = ConnectivityObserver(underlyingChannel: underlyingChannel, callback: callback)
observer.polling = true
connectivityObservers.append(observer)
}
}

private extension Channel {
class ConnectivityObserver: Equatable {
let state: ConnectivityState
let queue: CompletionQueue
let underlyingChannel: UnsafeMutableRawPointer
let underlyingCompletionQueue: UnsafeMutableRawPointer
private(set) var tryToConnect: Bool
var callbacks: [(ConnectivityState) -> ()] = []
class ConnectivityObserver {
private let completionQueue: CompletionQueue
private let underlyingChannel: UnsafeMutableRawPointer
private let underlyingCompletionQueue: UnsafeMutableRawPointer
private let callback: (ConnectivityState) -> ()
private var lastState: ConnectivityState

private let queue: OperationQueue

var polling: Bool = false {
didSet {
if polling == true && oldValue == false {
run()
queue.addOperation { [weak self] in
guard let `self` = self else { return }

if self.polling == true && oldValue == false {
self.run()
} else if self.polling == false && oldValue == true {
self.shutdown()
}
}
}
}

init(state: ConnectivityState, underlyingChannel: UnsafeMutableRawPointer, tryToConnect: Bool) {
self.state = state
init(underlyingChannel: UnsafeMutableRawPointer, callback: @escaping (ConnectivityState) -> ()) {
self.underlyingChannel = underlyingChannel
self.tryToConnect = tryToConnect
self.underlyingCompletionQueue = cgrpc_completion_queue_create_for_next()
self.queue = CompletionQueue(underlyingCompletionQueue: self.underlyingCompletionQueue, name: "Connectivity State")
self.completionQueue = CompletionQueue(underlyingCompletionQueue: self.underlyingCompletionQueue, name: "Connectivity State")
self.callback = callback
self.lastState = ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, 0))

queue = OperationQueue()
queue.maxConcurrentOperationCount = 1
queue.qualityOfService = .background
}

deinit {
queue.shutdown()
shutdown()
}

private func run() {
DispatchQueue.global().async { [weak self] in
guard let `self` = self, let underlyingState = self.lastState.underlyingState else { return }
guard let `self` = self else { return }

while self.polling {
guard !self.callbacks.isEmpty && !self.tryToConnect else {
self.polling = false
break
}

defer { self.tryToConnect = false }

guard let underlyingState = self.lastState.underlyingState else { return }

let deadline: TimeInterval = 0.2
cgrpc_channel_watch_connectivity_state(self.underlyingChannel, self.underlyingCompletionQueue, underlyingState, deadline, nil)
let event = self.queue.wait(timeout: deadline)
let event = self.completionQueue.wait(timeout: deadline)

if event.success == 1 || self.tryToConnect {
let newState = ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, self.tryToConnect ? 1 : 0))
if event.success == 1 {
let newState = ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, 1))

guard newState != self.lastState else { continue }
defer { self.lastState = newState }

if self.lastState == self.state {
self.callbacks.forEach({ $0(newState) })
}
self.callback(newState)
}
}
}
}

static func == (lhs: ConnectivityObserver, rhs: ConnectivityObserver) -> Bool {
return lhs.state == rhs.state
private func shutdown() {
completionQueue.shutdown()
}
}
}
Expand Down

0 comments on commit 255ce88

Please sign in to comment.