Skip to content

Commit

Permalink
Fix Websocket Cancellation Handling (grpc#917)
Browse files Browse the repository at this point in the history
* Continue reading websocket to handle closing

* added websocket transport to cancel propagation test (grpc#904)

(cherry picked from commit 36be481)

* Added cancellation test

Co-authored-by: Wiktor Jurkiewicz <[email protected]>
  • Loading branch information
MarcusLongmuir and watjurk authored Mar 16, 2021
1 parent 5cc8ddf commit 604a83d
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 83 deletions.
10 changes: 10 additions & 0 deletions go/grpcweb/websocket_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,16 @@ func (w *webSocketWrappedReader) Read(p []byte) (int, error) {

// If the frame consists of only a single byte of value 1 then this indicates the client has finished sending
if len(framePayload) == 1 && framePayload[0] == 1 {
go func() {
for {
messageType, _, err := w.wsConn.Read(w.context)
if err == io.EOF || messageType == 0 {
// The client has closed the connection. Indicate to the response writer that it should close
w.cancel()
return
}
}
}()
return 0, io.EOF
}

Expand Down
161 changes: 82 additions & 79 deletions integration_test/ts/src/cancellation.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
} from "../_proto/improbable/grpcweb/test/test_pb";
import {TestService, TestUtilService} from "../_proto/improbable/grpcweb/test/test_pb_service";
import {DEBUG, continueStream} from "./util";
import { runWithHttp1AndHttp2 } from "./testRpcCombinations";
import { runWithHttp1AndHttp2, runWithSupportedTransports } from "./testRpcCombinations";

describe("Cancellation", () => {
runWithHttp1AndHttp2(({testHostUrl}) => {
Expand Down Expand Up @@ -52,91 +52,94 @@ describe("Cancellation", () => {
assert.equal(transportCancelFuncInvoked, true, "transport's cancel func must be invoked");
});

it("should handle aborting a streaming response mid-stream with propagation of the disconnection to the server", (done) => {
let onMessageId = 0;
runWithSupportedTransports((transport) => {
it("should handle aborting a streaming response mid-stream with propagation of the disconnection to the server", (done) => {
let onMessageId = 0;

const streamIdentifier = `rpc-${Math.random()}`;
const streamIdentifier = `rpc-${Math.random()}`;

const ping = new PingRequest();
ping.setValue("hello world");
ping.setResponseCount(100); // Request more messages than the client will accept before cancelling
ping.setStreamIdentifier(streamIdentifier);

let reqObj: grpc.Request;

// Checks are performed every 1s = 15s total wait
const maxAbortChecks = 15;

const numMessagesBeforeAbort = 5;

const doAbort = () => {
DEBUG && debug("doAbort");
reqObj.close();

// To ensure that the transport is successfully closing the connection, poll the server every 1s until
// it confirms the connection was closed. Connection closure is immediate in some browser/transport combinations,
// but can take several seconds in others.
function checkAbort(attempt: number) {
DEBUG && debug("checkAbort", attempt);
continueStream(testHostUrl, streamIdentifier, (status) => {
DEBUG && debug("checkAbort.continueStream.status", status);

const checkStreamClosedRequest = new CheckStreamClosedRequest();
checkStreamClosedRequest.setStreamIdentifier(streamIdentifier);
grpc.unary(TestUtilService.CheckStreamClosed, {
debug: DEBUG,
request: checkStreamClosedRequest,
host: testHostUrl,
onEnd: ({message}) => {
const closed = ( message as CheckStreamClosedResponse ).getClosed();
DEBUG && debug("closed", closed);
if (closed) {
done();
} else {
if (attempt >= maxAbortChecks) {
assert.ok(closed, `server did not observe connection closure within ${maxAbortChecks} seconds`);
const ping = new PingRequest();
ping.setValue("hello world");
ping.setResponseCount(100); // Request more messages than the client will accept before cancelling
ping.setStreamIdentifier(streamIdentifier);

let reqObj: grpc.Request;

// Checks are performed every 1s = 15s total wait
const maxAbortChecks = 15;

const numMessagesBeforeAbort = 5;

const doAbort = () => {
DEBUG && debug("doAbort");
reqObj.close();

// To ensure that the transport is successfully closing the connection, poll the server every 1s until
// it confirms the connection was closed. Connection closure is immediate in some browser/transport combinations,
// but can take several seconds in others.
function checkAbort(attempt: number) {
DEBUG && debug("checkAbort", attempt);
continueStream(testHostUrl, streamIdentifier, (status) => {
DEBUG && debug("checkAbort.continueStream.status", status);

const checkStreamClosedRequest = new CheckStreamClosedRequest();
checkStreamClosedRequest.setStreamIdentifier(streamIdentifier);
grpc.unary(TestUtilService.CheckStreamClosed, {
debug: DEBUG,
request: checkStreamClosedRequest,
host: testHostUrl,
onEnd: ({message}) => {
const closed = ( message as CheckStreamClosedResponse ).getClosed();
DEBUG && debug("closed", closed);
if (closed) {
done();
} else {
setTimeout(() => {
checkAbort(attempt + 1);
}, 1000);
if (attempt >= maxAbortChecks) {
assert.ok(closed, `server did not observe connection closure within ${maxAbortChecks} seconds`);
done();
} else {
setTimeout(() => {
checkAbort(attempt + 1);
}, 1000);
}
}
}
},
})
});
}
},
})
});
}

checkAbort(0);
};
checkAbort(0);
};

reqObj = grpc.invoke(TestService.PingList, {
debug: DEBUG,
request: ping,
host: testHostUrl,
onHeaders: (headers: grpc.Metadata) => {
DEBUG && debug("headers", headers);
},
onMessage: (message: PingResponse) => {
assert.ok(message instanceof PingResponse);
DEBUG && debug("onMessage.message.getCounter()", message.getCounter());
assert.strictEqual(message.getCounter(), onMessageId++);
if (message.getCounter() === numMessagesBeforeAbort) {
// Abort after receiving numMessagesBeforeAbort messages
doAbort();
} else if (message.getCounter() < numMessagesBeforeAbort) {
// Only request the next message if not yet aborted
continueStream(testHostUrl, streamIdentifier, (status) => {
DEBUG && debug("onMessage.continueStream.status", status);
});
reqObj = grpc.invoke(TestService.PingList, {
debug: DEBUG,
request: ping,
host: testHostUrl,
transport: transport,
onHeaders: (headers: grpc.Metadata) => {
DEBUG && debug("headers", headers);
},
onMessage: (message: PingResponse) => {
assert.ok(message instanceof PingResponse);
DEBUG && debug("onMessage.message.getCounter()", message.getCounter());
assert.strictEqual(message.getCounter(), onMessageId++);
if (message.getCounter() === numMessagesBeforeAbort) {
// Abort after receiving numMessagesBeforeAbort messages
doAbort();
} else if (message.getCounter() < numMessagesBeforeAbort) {
// Only request the next message if not yet aborted
continueStream(testHostUrl, streamIdentifier, (status) => {
DEBUG && debug("onMessage.continueStream.status", status);
});
}
},
onEnd: (status: grpc.Code, statusMessage: string, trailers: grpc.Metadata) => {
DEBUG && debug("status", status, "statusMessage", statusMessage, "trailers", trailers);
// onEnd shouldn't be called if abort is called prior to the response ending
assert.fail();
}
},
onEnd: (status: grpc.Code, statusMessage: string, trailers: grpc.Metadata) => {
DEBUG && debug("status", status, "statusMessage", statusMessage, "trailers", trailers);
// onEnd shouldn't be called if abort is called prior to the response ending
assert.fail();
}
});
}, 20000);
});
}, 20000);
})
});
});
109 changes: 105 additions & 4 deletions integration_test/ts/src/client.websocket.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@ import { grpc } from "@improbable-eng/grpc-web";
import { debug } from "../../../client/grpc-web/src/debug";
import { assert } from "chai";
// Generated Test Classes
import { PingRequest, PingResponse } from "../_proto/improbable/grpcweb/test/test_pb";
import { TestService } from "../_proto/improbable/grpcweb/test/test_pb_service";
import { DEBUG, DISABLE_WEBSOCKET_TESTS } from "./util";
import {
CheckStreamClosedRequest,
CheckStreamClosedResponse,
PingRequest,
PingResponse
} from "../_proto/improbable/grpcweb/test/test_pb";
import { TestService, TestUtilService } from "../_proto/improbable/grpcweb/test/test_pb_service";
import { continueStream, DEBUG, DISABLE_WEBSOCKET_TESTS } from "./util";
import { headerTrailerCombos, runWithHttp1AndHttp2 } from "./testRpcCombinations";

if (DISABLE_WEBSOCKET_TESTS) {
Expand Down Expand Up @@ -73,7 +78,7 @@ if (DISABLE_WEBSOCKET_TESTS) {

describe("bidirectional (websockets)", () => {
headerTrailerCombos((withHeaders, withTrailers) => {
it("should make a bidirectional request that is terminated by the client", (done) => {
it("should make a bidirectional request that is ended by the client finishing sending", (done) => {
let didGetOnHeaders = false;
let counter = 1;
let lastMessage = `helloworld:${counter}`;
Expand Down Expand Up @@ -129,6 +134,102 @@ if (DISABLE_WEBSOCKET_TESTS) {
});
});

headerTrailerCombos((withHeaders, withTrailers) => {
it("should make a bidirectional request that is aborted by the client with propagation of the disconnection to the server", (done) => {
let didGetOnHeaders = false;
let counter = 1;
const streamIdentifier = `rpc-${Math.random()}`;
let lastMessage = `helloworld:${counter}`;
const ping = new PingRequest();
ping.setStreamIdentifier(streamIdentifier);
ping.setSendHeaders(withHeaders);
ping.setSendTrailers(withTrailers);
ping.setValue(lastMessage);

const client = grpc.client(TestService.PingPongBidi, {
debug: DEBUG,
host: testHostUrl,
transport: grpc.WebsocketTransport(),
});

// Checks are performed every 1s = 15s total wait
const maxAbortChecks = 15;

const doAbort = () => {
DEBUG && debug("doAbort");
client.close();

// To ensure that the transport is successfully closing the connection, poll the server every 1s until
// it confirms the connection was closed. Connection closure is immediate in some browser/transport combinations,
// but can take several seconds in others.
function checkAbort(attempt: number) {
DEBUG && debug("checkAbort", attempt);
continueStream(testHostUrl, streamIdentifier, (status) => {
DEBUG && debug("checkAbort.continueStream.status", status);

const checkStreamClosedRequest = new CheckStreamClosedRequest();
checkStreamClosedRequest.setStreamIdentifier(streamIdentifier);
grpc.unary(TestUtilService.CheckStreamClosed, {
debug: DEBUG,
request: checkStreamClosedRequest,
host: testHostUrl,
onEnd: ({message}) => {
const closed = ( message as CheckStreamClosedResponse ).getClosed();
DEBUG && debug("closed", closed);
if (closed) {
done();
} else {
if (attempt >= maxAbortChecks) {
assert.ok(closed, `server did not observe connection closure within ${maxAbortChecks} seconds`);
done();
} else {
setTimeout(() => {
checkAbort(attempt + 1);
}, 1000);
}
}
},
})
});
}

checkAbort(0);
};

client.onHeaders((headers: grpc.Metadata) => {
DEBUG && debug("headers", headers);
didGetOnHeaders = true;
if (withHeaders) {
assert.deepEqual(headers.get("HeaderTestKey1"), ["ServerValue1"]);
assert.deepEqual(headers.get("HeaderTestKey2"), ["ServerValue2"]);
}
});
client.onMessage((message: PingResponse) => {
assert.ok(message instanceof PingResponse);
assert.deepEqual(message.getValue(), lastMessage);

if (counter === 10) {
doAbort();
} else {
counter++;
lastMessage = `helloworld:${counter}`;
const ping = new PingRequest();
ping.setValue(lastMessage);
client.send(ping);
}
});
client.onEnd((status: grpc.Code, statusMessage: string, trailers: grpc.Metadata) => {
DEBUG && debug("status", status, "statusMessage", statusMessage);
// onEnd shouldn't be called if abort is called prior to the response ending
assert.fail();
});
client.start();

// send initial message
client.send(ping);
});
});

headerTrailerCombos((withHeaders, withTrailers) => {
it("should make a bidirectional request that is terminated by the server", (done) => {
let didGetOnHeaders = false;
Expand Down

0 comments on commit 604a83d

Please sign in to comment.