Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A bit of cleanup #191

Merged
merged 6 commits into from
Mar 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ all:
project:
swift package generate-xcodeproj
# Optional: set the generated project's indentation settings.
-ruby fix-indentation-settings.rb
-ruby fix-indentation-settings.rb || echo "Consider running 'sudo gem install xcodeproj' to automatically set correct indentation settings for the generated project."

test: all
swift test -v $(CFLAGS)
Expand Down
2 changes: 0 additions & 2 deletions Sources/CgRPC/shim/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ cgrpc_channel *cgrpc_channel_create_secure(const char *address,
void cgrpc_channel_destroy(cgrpc_channel *c) {
grpc_channel_destroy(c->channel);
c->channel = NULL;

grpc_completion_queue_shutdown(c->completion_queue);
free(c);
}

Expand Down
1 change: 0 additions & 1 deletion Sources/CgRPC/shim/handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ cgrpc_handler *cgrpc_handler_create_with_server(cgrpc_server *server) {
}

void cgrpc_handler_destroy(cgrpc_handler *h) {
grpc_completion_queue_shutdown(h->completion_queue);
grpc_metadata_array_destroy(&(h->request_metadata_recv));
grpc_call_details_destroy(&(h->call_details));
if (h->server_call) {
Expand Down
2 changes: 0 additions & 2 deletions Sources/CgRPC/shim/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,6 @@ void cgrpc_server_destroy(cgrpc_server *server) {
}
grpc_server_destroy(server->server);
server->server = NULL;

grpc_completion_queue_shutdown(server->completion_queue);
}

void cgrpc_server_start(cgrpc_server *server) {
Expand Down
15 changes: 9 additions & 6 deletions Sources/SwiftGRPC/Core/Call.swift
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,15 @@ public class Call {
/// - Returns: the result of initiating the call
/// - Throws: `CallError` if fails to call.
func perform(_ operations: OperationGroup) throws {
completionQueue.register(operations)
Call.callMutex.lock()
let error = cgrpc_call_perform(underlyingCall, operations.underlyingOperations, operations.tag)
Call.callMutex.unlock()
if error != GRPC_CALL_OK {
throw CallError.callError(grpcCallError: error)
try completionQueue.register(operations) {
Call.callMutex.lock()
// We need to do the perform *inside* the `completionQueue.register` call, to ensure that the queue can't get
// shutdown in between registering the operation group and calling `cgrpc_call_perform`.
let error = cgrpc_call_perform(underlyingCall, operations.underlyingOperations, operations.tag)
Call.callMutex.unlock()
if error != GRPC_CALL_OK {
throw CallError.callError(grpcCallError: error)
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions Sources/SwiftGRPC/Core/CallError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public enum CallError: Error {
case batchTooBig
case payloadTypeMismatch

case completionQueueShutdown

static func callError(grpcCallError error: grpc_call_error) -> CallError {
switch error {
case GRPC_CALL_OK:
Expand Down
1 change: 1 addition & 0 deletions Sources/SwiftGRPC/Core/Channel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class Channel {

deinit {
cgrpc_channel_destroy(underlyingChannel)
completionQueue.shutdown()
}

/// Constructs a Call object to make a gRPC API call
Expand Down
36 changes: 21 additions & 15 deletions Sources/SwiftGRPC/Core/CompletionQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ class CompletionQueue {
}

deinit {
operationGroupsMutex.synchronize {
hasBeenShutdown = true
}
cgrpc_completion_queue_shutdown(underlyingCompletionQueue)
cgrpc_completion_queue_drain(underlyingCompletionQueue)
grpc_completion_queue_destroy(underlyingCompletionQueue)
}
Expand All @@ -92,21 +96,15 @@ class CompletionQueue {
return CompletionQueueEvent(event)
}

/// Register an operation group for handling upon completion
/// Register an operation group for handling upon completion. Will throw if the queue has been shutdown already.
///
/// - Parameter operationGroup: the operation group to handle
func register(_ operationGroup: OperationGroup) {
operationGroupsMutex.synchronize {
if !hasBeenShutdown {
operationGroups[operationGroup.tag] = operationGroup
} else {
// The queue has been shut down already, so there's no spinloop to call the operation group's completion handler
// on. To guarantee that the completion handler gets called, we'll enqueue it right now.
DispatchQueue.global().async {
operationGroup.success = false
operationGroup.completion?(operationGroup)
}
}
func register(_ operationGroup: OperationGroup, onSuccess: () throws -> Void) throws {
try operationGroupsMutex.synchronize {
guard !hasBeenShutdown
else { throw CallError.completionQueueShutdown }
operationGroups[operationGroup.tag] = operationGroup
try onSuccess()
}
}

Expand Down Expand Up @@ -138,7 +136,6 @@ class CompletionQueue {
self.operationGroupsMutex.lock()
let currentOperationGroups = self.operationGroups
self.operationGroups = [:]
self.hasBeenShutdown = true
self.operationGroupsMutex.unlock()

for operationGroup in currentOperationGroups.values {
Expand All @@ -165,6 +162,15 @@ class CompletionQueue {

/// Shuts down a completion queue
func shutdown() {
cgrpc_completion_queue_shutdown(underlyingCompletionQueue)
var needsShutdown = false
operationGroupsMutex.synchronize {
if !hasBeenShutdown {
needsShutdown = true
hasBeenShutdown = true
}
}
if needsShutdown {
cgrpc_completion_queue_shutdown(underlyingCompletionQueue)
}
}
}
3 changes: 3 additions & 0 deletions Sources/SwiftGRPC/Core/Handler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ public class Handler {
}

deinit {
// Technically unnecessary, because the handler only gets released once the completion queue has already been
// shut down, but it doesn't hurt to keep this here.
completionQueue.shutdown()
cgrpc_handler_destroy(self.underlyingHandler)
}

Expand Down
7 changes: 3 additions & 4 deletions Sources/SwiftGRPC/Core/Mutex.swift
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,8 @@ public class Mutex {
/// Runs a block within a locked mutex
///
/// Parameter block: the code to run while the mutex is locked
public func synchronize(block: () throws -> Void) rethrows {
lock()
try block()
unlock()
public func synchronize<T>(block: () throws -> T) rethrows -> T {
lock(); defer { unlock() }
return try block()
}
}
1 change: 1 addition & 0 deletions Sources/SwiftGRPC/Core/Server.swift
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class Server {

deinit {
cgrpc_server_destroy(underlyingServer)
completionQueue.shutdown()
}

/// Run the server
Expand Down
1 change: 1 addition & 0 deletions Tests/LinuxMain.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ XCTMain([
testCase(ClientCancellingTests.allTests),
testCase(ClientTestExample.allTests),
testCase(ClientTimeoutTests.allTests),
testCase(CompletionQueueTests.allTests),
testCase(ConnectionFailureTests.allTests),
testCase(EchoTests.allTests),
testCase(ServerCancellingTests.allTests),
Expand Down
65 changes: 65 additions & 0 deletions Tests/SwiftGRPCTests/CompletionQueueTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2018, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import Dispatch
import Foundation
@testable import SwiftGRPC
import XCTest

fileprivate class ClosingProvider: Echo_EchoProvider {
var doneExpectation: XCTestExpectation!

func get(request: Echo_EchoRequest, session: Echo_EchoGetSession) throws -> Echo_EchoResponse {
return Echo_EchoResponse()
}

func expand(request: Echo_EchoRequest, session: Echo_EchoExpandSession) throws {
let closeSem = DispatchSemaphore(value: 0)
try! session.close(withStatus: .ok) {
closeSem.signal()
}
XCTAssertThrowsError(try session.send(Echo_EchoResponse()))
doneExpectation.fulfill()
}

func collect(session: Echo_EchoCollectSession) throws { }

func update(session: Echo_EchoUpdateSession) throws { }
}

class CompletionQueueTests: BasicEchoTestCase {
static var allTests: [(String, (CompletionQueueTests) -> () throws -> Void)] {
return [
("testCompletionQueueThrowsAfterShutdown", testCompletionQueueThrowsAfterShutdown)
]
}

override func makeProvider() -> Echo_EchoProvider { return ClosingProvider() }
}

extension CompletionQueueTests {
func testCompletionQueueThrowsAfterShutdown() {
(self.provider as! ClosingProvider).doneExpectation = expectation(description: "end of server-side request handler reached")

let completionHandlerExpectation = expectation(description: "completion handler called")
_ = try! client.expand(Echo_EchoRequest(text: "foo bar baz")) { callResult in
XCTAssertEqual(.ok, callResult.statusCode)
XCTAssertEqual("OK", callResult.statusMessage)
completionHandlerExpectation.fulfill()
}

waitForExpectations(timeout: defaultTimeout)
}
}
4 changes: 2 additions & 2 deletions Tests/SwiftGRPCTests/EchoTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ extension EchoTests {
}

func testUnaryLotsOfRequests() {
// No need to spam the log with 50k lines.
// No need to spam the log with 10k lines.
server.shouldLogRequests = false
// Sending that many requests at once can sometimes trip things up, it seems.
client.timeout = 5.0
let clockStart = clock()
let numberOfRequests = 50_000
let numberOfRequests = 10_000
for i in 0..<numberOfRequests {
if i % 1_000 == 0 && i > 0 {
print("\(i) requests sent so far, elapsed time: \(Double(clock() - clockStart) / Double(CLOCKS_PER_SEC))")
Expand Down