Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a crash when hitting a timeout while waiting for the next call. #630

Merged
merged 3 commits into from
Nov 13, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 28 additions & 13 deletions Sources/SwiftGRPC/Core/Server.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,20 @@ public class Server {
/// Completion queue used for server operations
let completionQueue: CompletionQueue

/// Delay for which the spin loop should wait before starting over.
let loopTimeout: TimeInterval

/// Optional callback when server stops serving
public var onCompletion: (() -> Void)?

/// Initializes a Server
///
/// - Parameter address: the address where the server will listen
public init(address: String) {
public init(address: String, loopTimeout: TimeInterval = 600) {
MrMage marked this conversation as resolved.
Show resolved Hide resolved
underlyingServer = cgrpc_server_create(address)
completionQueue = CompletionQueue(
underlyingCompletionQueue: cgrpc_server_get_completion_queue(underlyingServer), name: "Server " + address)
self.loopTimeout = loopTimeout
}

/// Initializes a secure Server
Expand All @@ -51,10 +55,11 @@ public class Server {
/// - Parameter key: the private key for the server's certificates
/// - Parameter certs: the server's certificates
/// - Parameter rootCerts: used to validate client certificates; will enable enforcing valid client certificates when provided
public init(address: String, key: String, certs: String, rootCerts: String? = nil) {
public init(address: String, key: String, certs: String, rootCerts: String? = nil, loopTimeout: TimeInterval = 600) {
MrMage marked this conversation as resolved.
Show resolved Hide resolved
underlyingServer = cgrpc_server_create_secure(address, key, certs, rootCerts, rootCerts == nil ? 0 : 1)
completionQueue = CompletionQueue(
underlyingCompletionQueue: cgrpc_server_get_completion_queue(underlyingServer), name: "Server " + address)
self.loopTimeout = loopTimeout
}

deinit {
Expand All @@ -70,13 +75,17 @@ public class Server {
// run the server on a new background thread
let spinloopThreadQueue = DispatchQueue(label: "SwiftGRPC.CompletionQueue.runToCompletion.spinloopThread")
spinloopThreadQueue.async {
spinloop: while true {
do {
let handler = Handler(underlyingServer: self.underlyingServer)
try handler.requestCall(tag: Server.handlerCallTag)

do {
// Allocate a handler _outside_ the spin loop, as we must use _this particular_ handler to serve the next call
// once we have called `handler.requestCall`. In particular, we need to keep the current handler for the next
// spin loop interation when we hit the `.queueTimeout` case. The handler should only be replaced once it is
// "used up" for serving an incoming call.
var handler = Handler(underlyingServer: self.underlyingServer)
// Tell gRPC to store the next call's information in this handler object.
try handler.requestCall(tag: Server.handlerCallTag)
spinloop: while true {
// block while waiting for an incoming request
let event = self.completionQueue.wait(timeout: 600)
let event = self.completionQueue.wait(timeout: self.loopTimeout)

if event.type == .complete {
if event.tag == Server.handlerCallTag {
Expand All @@ -95,23 +104,29 @@ public class Server {

// Dispatch the handler function on a separate thread.
let handlerDispatchThreadQueue = DispatchQueue(label: "SwiftGRPC.Server.run.dispatchHandlerThread")
// Needs to be copied, because we will change the value of `handler` right after this.
let handlerCopy = handler
handlerDispatchThreadQueue.async {
handlerFunction(handler)
handlerFunction(handlerCopy)
}
}

// This handler has now been "used up" for the current call; replace it with a fresh one for the next
// loop iteration.
handler = Handler(underlyingServer: self.underlyingServer)
try handler.requestCall(tag: Server.handlerCallTag)
} else if event.tag == Server.stopTag || event.tag == Server.destroyTag {
break spinloop
}
} else if event.type == .queueTimeout {
// everything is fine
// Everything is fine, just start over *while continuing to use the existing handler*.
continue
} else if event.type == .queueShutdown {
break spinloop
}
} catch {
print("server call error: \(error)")
break spinloop
}
} catch {
print("server call error: \(error)")
}
self.onCompletion?()
}
Expand Down
12 changes: 6 additions & 6 deletions Sources/SwiftGRPC/Runtime/ServiceServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,23 @@ open class ServiceServer {
fileprivate let servicesByName: [String: ServiceProvider]

/// Create a server that accepts insecure connections.
public init(address: String, serviceProviders: [ServiceProvider]) {
public init(address: String, serviceProviders: [ServiceProvider], loopTimeout: TimeInterval = 600) {
gRPC.initialize()
self.address = address
server = Server(address: address)
server = Server(address: address, loopTimeout: loopTimeout)
servicesByName = Dictionary(uniqueKeysWithValues: serviceProviders.map { ($0.serviceName, $0) })
}

/// Create a server that accepts secure connections.
public init(address: String, certificateString: String, keyString: String, rootCerts: String? = nil, serviceProviders: [ServiceProvider]) {
public init(address: String, certificateString: String, keyString: String, rootCerts: String? = nil, serviceProviders: [ServiceProvider], loopTimeout: TimeInterval = 600) {
gRPC.initialize()
self.address = address
server = Server(address: address, key: keyString, certs: certificateString, rootCerts: rootCerts)
server = Server(address: address, key: keyString, certs: certificateString, rootCerts: rootCerts, loopTimeout: loopTimeout)
servicesByName = Dictionary(uniqueKeysWithValues: serviceProviders.map { ($0.serviceName, $0) })
}

/// Create a server that accepts secure connections.
public init?(address: String, certificateURL: URL, keyURL: URL, rootCertsURL: URL? = nil, serviceProviders: [ServiceProvider]) {
public init?(address: String, certificateURL: URL, keyURL: URL, rootCertsURL: URL? = nil, serviceProviders: [ServiceProvider], loopTimeout: TimeInterval = 600) {
guard let certificate = try? String(contentsOf: certificateURL, encoding: .utf8),
let key = try? String(contentsOf: keyURL, encoding: .utf8)
else { return nil }
Expand All @@ -56,7 +56,7 @@ open class ServiceServer {
}
gRPC.initialize()
self.address = address
server = Server(address: address, key: key, certs: certificate, rootCerts: rootCerts)
server = Server(address: address, key: key, certs: certificate, rootCerts: rootCerts, loopTimeout: loopTimeout)
servicesByName = Dictionary(uniqueKeysWithValues: serviceProviders.map { ($0.serviceName, $0) })
}

Expand Down
39 changes: 39 additions & 0 deletions Tests/SwiftGRPCTests/ServerTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2019, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import Dispatch
import Foundation
@testable import SwiftGRPC
import XCTest

class ServerTests: XCTestCase {
static var allTests: [(String, (ServerTests) -> () throws -> Void)] {
return [
("testDoesNotCrashWhenServerTimesOutWithoutReceivingARequest", testDoesNotCrashWhenServerTimesOutWithoutReceivingARequest)
]
}
}

extension ServerTests {
func testDoesNotCrashWhenServerTimesOutWithoutReceivingARequest() {
let server = ServiceServer(address: address, serviceProviders: [EchoProvider()], loopTimeout: 0.01)
server.start()
Thread.sleep(forTimeInterval: 0.02)
let client = Echo_EchoServiceClient(address: address, secure: false)
client.timeout = 0.1
XCTAssertEqual("Swift echo get: foo", try client.get(Echo_EchoRequest(text: "foo")).text)
server.server.stop()
}
}