Skip to content

Commit

Permalink
PR changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Sebastian Thiebaud committed May 22, 2018
1 parent 4e94ef4 commit 0891482
Showing 1 changed file with 30 additions and 41 deletions.
71 changes: 30 additions & 41 deletions Sources/SwiftGRPC/Core/Channel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ public class Channel {
}

deinit {
connectivityObservers.forEach { $0.polling = false }
cgrpc_channel_destroy(underlyingChannel)
completionQueue.shutdown()
}
Expand All @@ -94,9 +93,8 @@ public class Channel {
return ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(underlyingChannel, tryToConnect ? 1 : 0))
}

public func subscribe(callback: @escaping (ConnectivityState) -> ()) {
let observer = ConnectivityObserver(underlyingChannel: underlyingChannel, callback: callback)
observer.polling = true
public func subscribe(callback: @escaping (ConnectivityState) -> Void) {
let observer = ConnectivityObserver(underlyingChannel: underlyingChannel, currentState: connectivityState(), callback: callback)
connectivityObservers.append(observer)
}
}
Expand All @@ -106,66 +104,57 @@ private extension Channel {
private let completionQueue: CompletionQueue
private let underlyingChannel: UnsafeMutableRawPointer
private let underlyingCompletionQueue: UnsafeMutableRawPointer
private let callback: (ConnectivityState) -> ()
private let callback: (ConnectivityState) -> Void
private var lastState: ConnectivityState
private let queue: OperationQueue

var polling: Bool = false {
didSet {
queue.addOperation { [weak self] in
guard let `self` = self else { return }

if self.polling == true && oldValue == false {
self.run()
} else if self.polling == false && oldValue == true {
self.shutdown()
}
}
}
}

init(underlyingChannel: UnsafeMutableRawPointer, callback: @escaping (ConnectivityState) -> ()) {
init(underlyingChannel: UnsafeMutableRawPointer, currentState: ConnectivityState, callback: @escaping (ConnectivityState) -> ()) {
self.underlyingChannel = underlyingChannel
self.underlyingCompletionQueue = cgrpc_completion_queue_create_for_next()
self.completionQueue = CompletionQueue(underlyingCompletionQueue: self.underlyingCompletionQueue, name: "Connectivity State")
self.callback = callback
self.lastState = ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, 0))

queue = OperationQueue()
queue.maxConcurrentOperationCount = 1
queue.qualityOfService = .background
self.lastState = currentState
run()
}

deinit {
shutdown()
completionQueue.shutdown()
}

private func run() {
DispatchQueue.global().async { [weak self] in
let spinloopThreadQueue = DispatchQueue(label: "SwiftGRPC.ConnectivityObserver.run.spinloopThread")

spinloopThreadQueue.async { [weak self] in
guard let `self` = self else { return }

while self.polling {
guard let underlyingState = self.lastState.underlyingState else { return }
spinloop: while true {
guard let underlyingState = self.lastState.underlyingState else {
print("Couldn't retrieve `underlyingState`")
return
}

let deadline: TimeInterval = 0.2
cgrpc_channel_watch_connectivity_state(self.underlyingChannel, self.underlyingCompletionQueue, underlyingState, deadline, nil)
let event = self.completionQueue.wait(timeout: deadline)

if event.success == 1 {
let newState = ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, 1))

guard newState != self.lastState else { continue }
defer { self.lastState = newState }

self.callback(newState)
switch event.type {
case .complete:
let newState = ConnectivityState.connectivityState(cgrpc_channel_check_connectivity_state(self.underlyingChannel, 0))

if newState != self.lastState {
self.callback(newState)
}

self.lastState = newState
case .queueShutdown:
break spinloop
case .queueTimeout:
continue spinloop
default:
break spinloop
}
}
}
}

private func shutdown() {
completionQueue.shutdown()
}
}
}

Expand Down

0 comments on commit 0891482

Please sign in to comment.