Skip to content
This repository has been archived by the owner on Jan 6, 2025. It is now read-only.

Support transparent reconnects on the server #19

Merged
merged 21 commits into from
Jan 6, 2020
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 76 additions & 20 deletions lib/server/sse_handler.dart
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,37 @@ class SseConnection extends StreamChannelMixin<String> {
/// Outgoing messages to the Browser client.
final _outgoingController = StreamController<String>();

final Sink _sink;
Sink _sink;

/// How long to wait after a connection drops before considering it closed.
final Duration _keepAlive;

/// Whether the connection is in the timeout period waiting for a reconnect.
bool _isTimingOut = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of holding onto a boolean reference we can probably hold onto the underlying timer and cancel as necessary.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done! However it's slightly weird in that after cancelling I also need to set it to null so I can check for nulls elsewhere (otherwise we'd probably need a getter that wraps up checking that it is either null or cancelled - which would look a lot like this boolean - but let me know if you think that would be better).


/// The subscription that passes messages outgoing messages to the sink. This
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment reads odd to me. Also new line after the first sentence.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, there was an extra "messages" there. Fixed - let me know if it still doesn't look right.

/// will be paused during the timeout/reconnect period.
StreamSubscription _outgoingStreamSubscription;

final _closedCompleter = Completer<void>();

SseConnection(this._sink) {
_outgoingController.stream.listen((data) {
SseConnection(this._sink, {Duration keepAlive}) : _keepAlive = keepAlive {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a comment indicating what keepAlive means here and what a null value implies.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a comment to the constructor describing this (I presume it's what you had in mind - if not, let me know).

_outgoingStreamSubscription = _outgoingController.stream.listen((data) {
if (!_closedCompleter.isCompleted) {
// JSON encode the message to escape new lines.
_sink.add('data: ${json.encode(data)}\n');
_sink.add('\n');
try {
// JSON encode the message to escape new lines.
_sink.add('data: ${json.encode(data)}\n');
_sink.add('\n');
} catch (StateError) {
if (_keepAlive == null) {
rethrow;
}
// If we got here then the sink may have closed but the stream.onDone
// hasn't fired yet, so pause the subscription, re-queue the message
// and handle the error as a disconnect.
_handleDisconnect();
_outgoingController.add(data);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An alternative would be to use a StreamQueue and use peak / next to conditionally move forward. That should guarantee order.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds good - the order does concern me. However I'm not sure how this would work - without the stream, how would we fire this code?

I see there's a sync argument in StreamController that could fix this (this code would fire on the first event, so there wouldn't be others to get out of order), though I don't know if it'd introduce any other side-effects?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking something like this:

var outgoingStreamQueue = StreamQueue(_outgoingController.stream);
    while (await outgoingStreamQueue.hasNext) {
      var peek = await outgoingStreamQueue.peek;
      try {
        send(peek);
        await outgoingStreamQueue.next;
      } catch (e) {
        // Handle error and don't call `next`
      }
    }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

await outgoingStreamQueue.hasNext

Aha, I missed that - this looks sensible to me. I'll try and make it work Monday (as well as test on GitPod to confirm it actually solves the issue). Thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this code would burn CPU when an exception occurs (if we don't call next, hasNext will return true and just sit in a tight loop). The existing code pauses the stream so it wouldn't enter the handler again.

A fix could be to await Future.delayed(const Duration(milliseconds: x)) though it feels a bit weird (and if the delay is too large, we introduce latency, and too low and we burn more CPU).

Can you think of a better way?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went with a wait period of 200ms for now and added a test for in-order messages. I think this all works, though I don't know how you feel about the 200ms retry (I thought about using exponential backoff, but we could easily end up with a large delay that would blow the keepAlive period and a simple check 5 times per second doesn't seem too wasteful for something we'd expect to occur within a number of seconds).

LMK what you think!

      // If we're in a KeepAlive timeout, there's nowhere to send messages so
      // wait a short period and check again.
      if (isInKeepAlivePeriod) {
        await Future.delayed(const Duration(milliseconds: 200));
        continue;
      }

}
}
});
_outgoingController.onCancel = _close;
Expand All @@ -55,9 +76,35 @@ class SseConnection extends StreamChannelMixin<String> {
@override
Stream<String> get stream => _incomingController.stream;

void _acceptReconnection(Sink sink) {
_isTimingOut = false;
_sink = sink;
_outgoingStreamSubscription.resume();
}

void _handleDisconnect() {
if (_keepAlive == null) {
_close();
} else if (!_isTimingOut) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic will likely be cleaned up if we only have reference to the current timer instead of indirectly making refrerence through the _isTimeOut member.

_outgoingStreamSubscription.pause();
_isTimingOut = true;
// If after the timeout period we're still in this state, we'll close.
Timer(_keepAlive, () {
if (_isTimingOut) {
_isTimingOut = false;
_close();
}
});
}
}

// TODO(dantup): @visibleForTesting?
void closeSink() => _sink.close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a slightly better pattern to this but it will require some work.

Let's move this sse_handler.dart under src. Make this closeSink a top level method that takes an SseConnection. Now export everything under sse_handler.dart but closeSink.

In the test you can import from src without concern and call closeSink. Note you can also annotate that it is only visible for testing.

See this parallel example:
https://github.com/dart-lang/webdev/blob/2958ede70f20c402e869b4169f3d2f97162e51d4/dwds/lib/src/connections/debug_connection.dart#L50

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, nice! I've done that - and I committed the move as its own changeset to try and avoid Git just showing "this file deleted" and "this file added", however the overall diff here seems to have ignored that. This could cause conflicts if anyone else has outstanding changes in that file.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we have any pending PRs so LGTM.


void _close() {
if (!_closedCompleter.isCompleted) {
_closedCompleter.complete();
_outgoingStreamSubscription.cancel();
_sink.close();
if (!_outgoingController.isClosed) _outgoingController.close();
if (!_incomingController.isClosed) _incomingController.close();
Expand All @@ -73,12 +120,13 @@ class SseConnection extends StreamChannelMixin<String> {
class SseHandler {
final _logger = Logger('SseHandler');
final Uri _uri;
final Duration _keepAlive;
final _connections = <String, SseConnection>{};
final _connectionController = StreamController<SseConnection>();

StreamQueue<SseConnection> _connectionsStream;

SseHandler(this._uri);
SseHandler(this._uri, {Duration keepAlive}) : _keepAlive = keepAlive;

StreamQueue<SseConnection> get connections =>
_connectionsStream ??= StreamQueue(_connectionController.stream);
Expand All @@ -92,20 +140,28 @@ class SseHandler {
var sink = utf8.encoder.startChunkedConversion(channel.sink);
sink.add(_sseHeaders(req.headers['origin']));
var clientId = req.url.queryParameters['sseClientId'];
var connection = SseConnection(sink);
_connections[clientId] = connection;
unawaited(connection._closedCompleter.future.then((_) {
_connections.remove(clientId);
}));
// Remove connection when it is remotely closed or the stream is
// cancelled.
channel.stream.listen((_) {
// SSE is unidirectional. Responses are handled through POST requests.
}, onDone: () {
connection._close();
});

_connectionController.add(connection);
// Check if we already have a connection for this ID that is in the process
// of timing out (in which case we can reconnect it transparently).
if (_connections[clientId] != null &&
_connections[clientId]._isTimingOut) {
_connections[clientId]._acceptReconnection(sink);
} else {
var connection = SseConnection(sink, keepAlive: _keepAlive);
_connections[clientId] = connection;
unawaited(connection._closedCompleter.future.then((_) {
_connections.remove(clientId);
}));
// Remove connection when it is remotely closed or the stream is
// cancelled.
channel.stream.listen((_) {
// SSE is unidirectional. Responses are handled through POST requests.
}, onDone: () {
connection._handleDisconnect();
});

_connectionController.add(connection);
}
});
return shelf.Response.notFound('');
}
Expand Down
244 changes: 153 additions & 91 deletions test/sse_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -33,99 +33,161 @@ void main() {
chromeDriver.kill();
});

setUp(() async {
handler = SseHandler(Uri.parse('/test'));

var cascade = shelf.Cascade().add(handler.handler).add(_faviconHandler).add(
createStaticHandler('test/web',
listDirectories: true, defaultDocument: 'index.html'));

server = await io.serve(cascade.handler, 'localhost', 0);
var capabilities = Capabilities.chrome
..addAll({
Capabilities.chromeOptions: {
'args': ['--headless']
}
});
webdriver = await createDriver(desired: capabilities);
group('SSE', () {
setUp(() async {
handler = SseHandler(Uri.parse('/test'));

var cascade = shelf.Cascade()
.add(handler.handler)
.add(_faviconHandler)
.add(createStaticHandler('test/web',
listDirectories: true, defaultDocument: 'index.html'));

server = await io.serve(cascade.handler, 'localhost', 0);
var capabilities = Capabilities.chrome
..addAll({
Capabilities.chromeOptions: {
'args': ['--headless']
}
});
webdriver = await createDriver(desired: capabilities);
});

tearDown(() async {
await webdriver.quit();
await server.close();
});

test('Can round trip messages', () async {
await webdriver.get('http://localhost:${server.port}');
var connection = await handler.connections.next;
connection.sink.add('blah');
expect(await connection.stream.first, 'blah');
});

test('Multiple clients can connect', () async {
var connections = handler.connections;
await webdriver.get('http://localhost:${server.port}');
await connections.next;
await webdriver.get('http://localhost:${server.port}');
await connections.next;
});

test('Routes data correctly', () async {
var connections = handler.connections;
await webdriver.get('http://localhost:${server.port}');
var connectionA = await connections.next;
connectionA.sink.add('foo');
expect(await connectionA.stream.first, 'foo');

await webdriver.get('http://localhost:${server.port}');
var connectionB = await connections.next;
connectionB.sink.add('bar');
expect(await connectionB.stream.first, 'bar');
});

test('Can close from the server', () async {
expect(handler.numberOfClients, 0);
await webdriver.get('http://localhost:${server.port}');
var connection = await handler.connections.next;
expect(handler.numberOfClients, 1);
await connection.sink.close();
await pumpEventQueue();
expect(handler.numberOfClients, 0);
});

test('Client reconnects after being disconnected', () async {
expect(handler.numberOfClients, 0);
await webdriver.get('http://localhost:${server.port}');
var connection = await handler.connections.next;
expect(handler.numberOfClients, 1);
await connection.sink.close();
await pumpEventQueue();
expect(handler.numberOfClients, 0);

// Ensure the client reconnects
await handler.connections.next;
});

test('Can close from the client-side', () async {
expect(handler.numberOfClients, 0);
await webdriver.get('http://localhost:${server.port}');
var connection = await handler.connections.next;
expect(handler.numberOfClients, 1);

var closeButton = await webdriver.findElement(const By.tagName('button'));
await closeButton.click();

// Should complete since the connection is closed.
await connection.stream.toList();
expect(handler.numberOfClients, 0);
});

test('Cancelling the listener closes the connection', () async {
expect(handler.numberOfClients, 0);
await webdriver.get('http://localhost:${server.port}');
var connection = await handler.connections.next;
expect(handler.numberOfClients, 1);

var sub = connection.stream.listen((_) {});
await sub.cancel();
await pumpEventQueue();
expect(handler.numberOfClients, 0);
});

test('Disconnects when navigating away', () async {
await webdriver.get('http://localhost:${server.port}');
expect(handler.numberOfClients, 1);

await webdriver.get('chrome://version/');
expect(handler.numberOfClients, 0);
});
});

tearDown(() async {
await webdriver.quit();
await server.close();
});

test('Can round trip messages', () async {
await webdriver.get('http://localhost:${server.port}');
var connection = await handler.connections.next;
connection.sink.add('blah');
expect(await connection.stream.first, 'blah');
});

test('Multiple clients can connect', () async {
var connections = handler.connections;
await webdriver.get('http://localhost:${server.port}');
await connections.next;
await webdriver.get('http://localhost:${server.port}');
await connections.next;
});

test('Routes data correctly', () async {
var connections = handler.connections;
await webdriver.get('http://localhost:${server.port}');
var connectionA = await connections.next;
connectionA.sink.add('foo');
expect(await connectionA.stream.first, 'foo');

await webdriver.get('http://localhost:${server.port}');
var connectionB = await connections.next;
connectionB.sink.add('bar');
expect(await connectionB.stream.first, 'bar');
});

test('Can close from the server', () async {
expect(handler.numberOfClients, 0);
await webdriver.get('http://localhost:${server.port}');
var connection = await handler.connections.next;
expect(handler.numberOfClients, 1);
await connection.sink.close();
await pumpEventQueue();
expect(handler.numberOfClients, 0);
});

test('Can close from the client-side', () async {
expect(handler.numberOfClients, 0);
await webdriver.get('http://localhost:${server.port}');
var connection = await handler.connections.next;
expect(handler.numberOfClients, 1);

var closeButton = await webdriver.findElement(const By.tagName('button'));
await closeButton.click();

// Should complete since the connection is closed.
await connection.stream.toList();
expect(handler.numberOfClients, 0);
});

test('Cancelling the listener closes the connection', () async {
expect(handler.numberOfClients, 0);
await webdriver.get('http://localhost:${server.port}');
var connection = await handler.connections.next;
expect(handler.numberOfClients, 1);

var sub = connection.stream.listen((_) {});
await sub.cancel();
await pumpEventQueue();
expect(handler.numberOfClients, 0);
});

test('Disconnects when navigating away', () async {
await webdriver.get('http://localhost:${server.port}');
expect(handler.numberOfClients, 1);

await webdriver.get('chrome://version/');
expect(handler.numberOfClients, 0);
});
group('SSE with server keep-alive', () {
setUp(() async {
handler =
SseHandler(Uri.parse('/test'), keepAlive: const Duration(seconds: 5));

var cascade = shelf.Cascade()
.add(handler.handler)
.add(_faviconHandler)
.add(createStaticHandler('test/web',
listDirectories: true, defaultDocument: 'index.html'));

server = await io.serve(cascade.handler, 'localhost', 0);
var capabilities = Capabilities.chrome
..addAll({
Capabilities.chromeOptions: {
'args': ['--headless']
}
});
webdriver = await createDriver(desired: capabilities);
});

tearDown(() async {
await webdriver.quit();
await server.close();
});

test('Client reconnect use the same connection', () async {
expect(handler.numberOfClients, 0);
await webdriver.get('http://localhost:${server.port}');
var connection = await handler.connections.next;
expect(handler.numberOfClients, 1);

// Close the underlying connection.
connection.closeSink();
await pumpEventQueue();

// Ensure there's still a connection.
expect(handler.numberOfClients, 1);

// Ensure we can still round-trip data on the original connection.
connection.sink.add('bar');
expect(await connection.stream.first, 'bar');
});
}, timeout: const Timeout(Duration(seconds: 120)));
}

FutureOr<shelf.Response> _faviconHandler(shelf.Request request) {
Expand Down