Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Two important fixes (and possibly more improvements) #188

Merged
merged 5 commits into from
Mar 19, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Sources/CgRPC/shim/call.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
#include <assert.h>

void cgrpc_call_destroy(cgrpc_call *call) {
//grpc_call_destroy(call->call);
if (call->call) {
grpc_call_unref(call->call);
}
free(call);
}

Expand Down
1 change: 1 addition & 0 deletions Sources/CgRPC/shim/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ cgrpc_call *cgrpc_channel_create_call(cgrpc_channel *channel,
// create call
host_slice = grpc_slice_from_copied_string(host);
gpr_timespec deadline = cgrpc_deadline_in_seconds_from_now(timeout);
// The resulting call will have a retain call of +1. We'll release it in `cgrpc_call_destroy()`.
grpc_call *channel_call = grpc_channel_create_call(channel->channel,
NULL,
GRPC_PROPAGATE_DEFAULTS,
Expand Down
13 changes: 10 additions & 3 deletions Sources/CgRPC/shim/handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ void cgrpc_handler_destroy(cgrpc_handler *h) {
grpc_metadata_array_destroy(&(h->request_metadata_recv));
grpc_call_details_destroy(&(h->call_details));
if (h->server_call) {
//grpc_call_destroy(h->server_call);
grpc_call_unref(h->server_call);
}
free(h);
}
Expand Down Expand Up @@ -67,6 +67,10 @@ cgrpc_call *cgrpc_handler_get_call(cgrpc_handler *h) {
cgrpc_call *call = (cgrpc_call *) malloc(sizeof(cgrpc_call));
memset(call, 0, sizeof(cgrpc_call));
call->call = h->server_call;
if (call->call) {
// This retain will be balanced by `cgrpc_call_destroy()`.
grpc_call_ref(call->call);
}
return call;
}

Expand All @@ -77,6 +81,11 @@ cgrpc_completion_queue *cgrpc_handler_get_completion_queue(cgrpc_handler *h) {
grpc_call_error cgrpc_handler_request_call(cgrpc_handler *h,
cgrpc_metadata_array *metadata,
long tag) {
if (h->server_call != NULL) {
return GRPC_CALL_OK;
}
// This fills `h->server_call` with a call with retain count of +1.
// We'll release that retain in `cgrpc_handler_destroy()`.
return grpc_server_request_call(h->server->server,
&(h->server_call),
&(h->call_details),
Expand All @@ -85,5 +94,3 @@ grpc_call_error cgrpc_handler_request_call(cgrpc_handler *h,
h->server->completion_queue,
cgrpc_create_tag(tag));
}


2 changes: 1 addition & 1 deletion Sources/SwiftGRPC/Core/Handler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class Handler {
/// A Call object that can be used to respond to the request
public private(set) lazy var call: Call = {
Call(underlyingCall: cgrpc_handler_get_call(self.underlyingHandler),
owned: false,
owned: true,
completionQueue: self.completionQueue)
}()

Expand Down
6 changes: 3 additions & 3 deletions Sources/SwiftGRPC/Core/Metadata.swift
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,13 @@ public class Metadata: CustomStringConvertible {
}

public var description: String {
var result = ""
var lines: [String] = []
for i in 0..<count() {
let key = self.key(i)
let value = self.value(i)
result += (key ?? "(nil)") + ":" + (value ?? "(nil)") + "\n"
lines.append((key ?? "(nil)") + ":" + (value ?? "(nil)"))
}
return result
return lines.joined(separator: "\n")
}

public func copy() -> Metadata {
Expand Down
16 changes: 10 additions & 6 deletions Sources/SwiftGRPC/Runtime/ServiceServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ open class ServiceServer {
public let address: String
public let server: Server

public var shouldLogRequests = true

/// Create a server that accepts insecure connections.
public init(address: String) {
gRPC.initialize()
Expand Down Expand Up @@ -58,13 +60,15 @@ open class ServiceServer {
return
}

let unwrappedHost = handler.host ?? "(nil)"
let unwrappedMethod = handler.method ?? "(nil)"
let unwrappedCaller = handler.caller ?? "(nil)"
print("Server received request to " + unwrappedHost
+ " calling " + unwrappedMethod
+ " from " + unwrappedCaller
+ " with " + handler.requestMetadata.description)
if strongSelf.shouldLogRequests == true {
let unwrappedHost = handler.host ?? "(nil)"
let unwrappedCaller = handler.caller ?? "(nil)"
print("Server received request to " + unwrappedHost
+ " calling " + unwrappedMethod
+ " from " + unwrappedCaller
+ " with " + handler.requestMetadata.description)
}

do {
if !(try strongSelf.handleMethod(unwrappedMethod, handler: handler, queue: queue)) {
Expand Down
15 changes: 15 additions & 0 deletions Tests/SwiftGRPCTests/EchoTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class EchoTests: BasicEchoTestCase {
static var allTests: [(String, (EchoTests) -> () throws -> Void)] {
return [
("testUnary", testUnary),
("testUnaryLotsOfRequests", testUnaryLotsOfRequests),
("testClientStreaming", testClientStreaming),
("testClientStreamingLotsOfMessages", testClientStreamingLotsOfMessages),
("testServerStreaming", testServerStreaming),
Expand All @@ -48,6 +49,20 @@ extension EchoTests {
XCTAssertEqual("Swift echo get: foo", try! client.get(Echo_EchoRequest(text: "foo")).text)
XCTAssertEqual("Swift echo get: foo", try! client.get(Echo_EchoRequest(text: "foo")).text)
}

func testUnaryLotsOfRequests() {
// No need to spam the log with 50k lines.
server.shouldLogRequests = false
let clockStart = clock()
let numberOfRequests = 50_000
for i in 0..<numberOfRequests {
if i % 1_000 == 0 {
print("\(i) requests sent so far, elapsed time: \(Double(clock() - clockStart) / Double(CLOCKS_PER_SEC))")
}
XCTAssertEqual("Swift echo get: foo \(i)", try client.get(Echo_EchoRequest(text: "foo \(i)")).text)
}
print("total time for \(numberOfRequests) requests: \(Double(clock() - clockStart) / Double(CLOCKS_PER_SEC))")
}
}

extension EchoTests {
Expand Down
124 changes: 81 additions & 43 deletions Tests/SwiftGRPCTests/GRPCTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,21 @@ import Foundation
import XCTest

class gRPCTests: XCTestCase {
// We have seen this test flake out in rare cases fairly often due to race conditions.
// To detect such rare errors, we run the tests several times.
// (By now, all known errors should have been fixed, but we'd still like to detect new ones.)
let testRepetitions = 10

func testConnectivity() {
runTest(useSSL: false)
for _ in 0..<testRepetitions {
runTest(useSSL: false)
}
}

func testConnectivitySecure() {
runTest(useSSL: true)
for _ in 0..<testRepetitions {
runTest(useSSL: true)
}
}

static var allTests: [(String, (gRPCTests) -> () throws -> Void)] {
Expand Down Expand Up @@ -75,11 +84,8 @@ let helloServerStream = "/hello.server-stream"
let helloBiDiStream = "/hello.bidi-stream"

// Return code/message for unary test
let oddStatusCode = StatusCode.ok
let oddStatusMessage = "OK"

let evenStatusCode = StatusCode.notFound
let eventStatusMessage = "Not Found"
let evenStatusMessage = "some other status message"

func runTest(useSSL: Bool) {
gRPC.initialize()
Expand Down Expand Up @@ -141,9 +147,12 @@ func runClient(useSSL: Bool) throws {
}

channel.host = host
try callUnary(channel: channel)
try callServerStream(channel: channel)
try callBiDiStream(channel: channel)
for _ in 0..<10 {
// Send several calls to each server we spin up, to ensure that each individual server can handle many requests.
try callUnary(channel: channel)
try callServerStream(channel: channel)
try callBiDiStream(channel: channel)
}
}

func callUnary(channel: Channel) throws {
Expand All @@ -157,23 +166,32 @@ func callUnary(channel: Channel) throws {
try call.start(.unary, metadata: metadata, message: message) {
response in
// verify the basic response from the server
XCTAssertEqual(response.statusCode, (i % 2 == 0) ? evenStatusCode : oddStatusCode)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means we never check for non .ok status codes which was the original bug filed in #144.

XCTAssertEqual(response.statusMessage, (i % 2 == 0) ? eventStatusMessage : oddStatusMessage)
XCTAssertEqual(response.statusCode, .ok)
XCTAssertEqual(response.statusMessage, (i % 2 == 0) ? evenStatusMessage : oddStatusMessage)

// verify the message from the server
if (i % 2) == 0 {
let resultData = response.resultData!
let messageString = String(data: resultData, encoding: .utf8)
XCTAssertEqual(messageString, serverText)
if let resultData = response.resultData {
let messageString = String(data: resultData, encoding: .utf8)
XCTAssertEqual(messageString, serverText)
} else {
XCTFail("callUnary response missing")
}
}

// verify the initial metadata from the server
let initialMetadata = response.initialMetadata!
verify_metadata(initialMetadata, expected: initialServerMetadata)

if let initialMetadata = response.initialMetadata {
verify_metadata(initialMetadata, expected: initialServerMetadata)
} else {
XCTFail("callUnary initial metadata missing")
}

// verify the trailing metadata from the server
let trailingMetadata = response.trailingMetadata!
verify_metadata(trailingMetadata, expected: trailingServerMetadata)
if let trailingMetadata = response.trailingMetadata {
verify_metadata(trailingMetadata, expected: trailingServerMetadata)
} else {
XCTFail("callUnary trailing metadata missing")
}

// report completion
sem.signal()
Expand All @@ -197,8 +215,11 @@ func callServerStream(channel: Channel) throws {
XCTAssertEqual(response.statusMessage, "Custom Status Message ServerStreaming")

// verify the trailing metadata from the server
let trailingMetadata = response.trailingMetadata!
verify_metadata(trailingMetadata, expected: trailingServerMetadata)
if let trailingMetadata = response.trailingMetadata {
verify_metadata(trailingMetadata, expected: trailingServerMetadata)
} else {
XCTFail("callServerStream trailing metadata missing")
}

sem.signal() // signal call is finished
}
Expand All @@ -224,29 +245,31 @@ let clientPing = "ping"
let serverPong = "pong"

func callBiDiStream(channel: Channel) throws {
let message = clientPing.data(using: .utf8)
let metadata = Metadata(initialClientMetadata)

let sem = DispatchSemaphore(value: 0)
let method = helloBiDiStream
let call = channel.makeCall(method)
try call.start(.bidiStreaming, metadata: metadata, message: message) {
try call.start(.bidiStreaming, metadata: metadata, message: nil) {
response in

XCTAssertEqual(response.statusCode, .ok)
XCTAssertEqual(response.statusMessage, "Custom Status Message BiDi")

// verify the trailing metadata from the server
let trailingMetadata = response.trailingMetadata!
verify_metadata(trailingMetadata, expected: trailingServerMetadata)
if let trailingMetadata = response.trailingMetadata {
verify_metadata(trailingMetadata, expected: trailingServerMetadata)
} else {
XCTFail("callBiDiStream trailing metadata missing")
}

sem.signal() // signal call is finished
}

// Send pings
let message = clientPing.data(using: .utf8)!
for _ in 0..<steps {
let message = clientPing.data(using: .utf8)
try call.sendMessage(data: message!) { (err) in
try call.sendMessage(data: message) { err in
XCTAssertNil(err)
}
call.messageQueueEmpty.wait()
Expand Down Expand Up @@ -313,21 +336,28 @@ func handleUnary(requestHandler: Handler, requestCount: Int) throws {
let initialMetadata = requestHandler.requestMetadata
verify_metadata(initialMetadata, expected: initialClientMetadata)
let initialMetadataToSend = Metadata(initialServerMetadata)
try requestHandler.receiveMessage(initialMetadata: initialMetadataToSend) { messageData in
let messageString = String(data: messageData!, encoding: .utf8)
XCTAssertEqual(messageString, clientText)
try requestHandler.receiveMessage(initialMetadata: initialMetadataToSend) {
if let messageData = $0 {
let messageString = String(data: messageData, encoding: .utf8)
XCTAssertEqual(messageString, clientText)
} else {
XCTFail("handleUnary message missing")
}
}

// We need to return status OK in both cases, as it seems like the server might never send out the last few messages
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a huge problem. We need to assert that the server sends an appropriate status code and if it's not currently then tests should be failing

// once it has been asked to send a non-OK status. Alternatively, we could send a non-OK status here, but then we
// would need to sleep for a few milliseconds before sending the non-OK status.
let replyMessage = serverText.data(using: .utf8)!
if (requestCount % 2) == 0 {
let replyMessage = serverText
let trailingMetadataToSend = Metadata(trailingServerMetadata)
try requestHandler.sendResponse(message: replyMessage.data(using: .utf8)!,
status: ServerStatus(code: evenStatusCode,
message: eventStatusMessage,
try requestHandler.sendResponse(message: replyMessage,
status: ServerStatus(code: .ok,
message: evenStatusMessage,
trailingMetadata: trailingMetadataToSend))
} else {
let trailingMetadataToSend = Metadata(trailingServerMetadata)
try requestHandler.sendStatus(ServerStatus(code: oddStatusCode,
try requestHandler.sendStatus(ServerStatus(code: .ok,
message: oddStatusMessage,
trailingMetadata: trailingMetadataToSend))
}
Expand All @@ -340,14 +370,18 @@ func handleServerStream(requestHandler: Handler) throws {
verify_metadata(initialMetadata, expected: initialClientMetadata)

let initialMetadataToSend = Metadata(initialServerMetadata)
try requestHandler.receiveMessage(initialMetadata: initialMetadataToSend) { messageData in
let messageString = String(data: messageData!, encoding: .utf8)
XCTAssertEqual(messageString, clientText)
try requestHandler.receiveMessage(initialMetadata: initialMetadataToSend) {
if let messageData = $0 {
let messageString = String(data: messageData, encoding: .utf8)
XCTAssertEqual(messageString, clientText)
} else {
XCTFail("handleServerStream message missing")
}
}

let replyMessage = serverText
let replyMessage = serverText.data(using: .utf8)!
for _ in 0..<steps {
try requestHandler.call.sendMessage(data: replyMessage.data(using: .utf8)!) { error in
try requestHandler.call.sendMessage(data: replyMessage) { error in
XCTAssertNil(error)
}
requestHandler.call.messageQueueEmpty.wait()
Expand Down Expand Up @@ -380,8 +414,12 @@ func handleBiDiStream(requestHandler: Handler) throws {
for _ in 0..<steps {
let receiveSem = DispatchSemaphore(value: 0)
try requestHandler.call.receiveMessage { callStatus in
let messageString = String(data: callStatus.resultData!, encoding: .utf8)
XCTAssertEqual(messageString, clientPing)
if let messageData = callStatus.resultData {
let messageString = String(data: messageData, encoding: .utf8)
XCTAssertEqual(messageString, clientPing)
} else {
XCTFail("handleBiDiStream message empty")
}
receiveSem.signal()
}
_ = receiveSem.wait()
Expand Down