-
Notifications
You must be signed in to change notification settings - Fork 24
Support transparent reconnects on the server #19
Changes from 9 commits
c48bbeb
aca4698
a1fb6af
39b7d99
dc5e488
652a016
c3931c1
5d5f4f8
fea430f
0a5827c
254142e
fee2787
bef2937
b5f53f9
8af4a5e
76bd181
1437f08
58fc9e5
611cee9
eac5b4e
a57ed2c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,16 +28,37 @@ class SseConnection extends StreamChannelMixin<String> { | |
/// Outgoing messages to the Browser client. | ||
final _outgoingController = StreamController<String>(); | ||
|
||
final Sink _sink; | ||
Sink _sink; | ||
|
||
/// How long to wait after a connection drops before considering it closed. | ||
final Duration _keepAlive; | ||
|
||
/// Whether the connection is in the timeout period waiting for a reconnect. | ||
bool _isTimingOut = false; | ||
|
||
/// The subscription that passes messages outgoing messages to the sink. This | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This comment reads odd to me. Also new line after the first sentence. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oops, there was an extra "messages" there. Fixed - let me know if it still doesn't look right. |
||
/// will be paused during the timeout/reconnect period. | ||
StreamSubscription _outgoingStreamSubscription; | ||
|
||
final _closedCompleter = Completer<void>(); | ||
|
||
SseConnection(this._sink) { | ||
_outgoingController.stream.listen((data) { | ||
SseConnection(this._sink, {Duration keepAlive}) : _keepAlive = keepAlive { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's add a comment indicating what There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've added a comment to the constructor describing this (I presume it's what you had in mind - if not, let me know). |
||
_outgoingStreamSubscription = _outgoingController.stream.listen((data) { | ||
if (!_closedCompleter.isCompleted) { | ||
// JSON encode the message to escape new lines. | ||
_sink.add('data: ${json.encode(data)}\n'); | ||
_sink.add('\n'); | ||
try { | ||
// JSON encode the message to escape new lines. | ||
_sink.add('data: ${json.encode(data)}\n'); | ||
_sink.add('\n'); | ||
} catch (StateError) { | ||
if (_keepAlive == null) { | ||
rethrow; | ||
} | ||
// If we got here then the sink may have closed but the stream.onDone | ||
// hasn't fired yet, so pause the subscription, re-queue the message | ||
// and handle the error as a disconnect. | ||
_handleDisconnect(); | ||
_outgoingController.add(data); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. An alternative would be to use a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That sounds good - the order does concern me. However I'm not sure how this would work - without the stream, how would we fire this code? I see there's a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm thinking something like this: var outgoingStreamQueue = StreamQueue(_outgoingController.stream);
while (await outgoingStreamQueue.hasNext) {
var peek = await outgoingStreamQueue.peek;
try {
send(peek);
await outgoingStreamQueue.next;
} catch (e) {
// Handle error and don't call `next`
}
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Aha, I missed that - this looks sensible to me. I'll try and make it work Monday (as well as test on GitPod to confirm it actually solves the issue). Thanks! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this code would burn CPU when an exception occurs (if we don't call A fix could be to Can you think of a better way? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I went with a wait period of 200ms for now and added a test for in-order messages. I think this all works, though I don't know how you feel about the 200ms retry (I thought about using exponential backoff, but we could easily end up with a large delay that would blow the keepAlive period and a simple check 5 times per second doesn't seem too wasteful for something we'd expect to occur within a number of seconds). LMK what you think! // If we're in a KeepAlive timeout, there's nowhere to send messages so
// wait a short period and check again.
if (isInKeepAlivePeriod) {
await Future.delayed(const Duration(milliseconds: 200));
continue;
} |
||
} | ||
} | ||
}); | ||
_outgoingController.onCancel = _close; | ||
|
@@ -55,9 +76,35 @@ class SseConnection extends StreamChannelMixin<String> { | |
@override | ||
Stream<String> get stream => _incomingController.stream; | ||
|
||
void _acceptReconnection(Sink sink) { | ||
_isTimingOut = false; | ||
_sink = sink; | ||
_outgoingStreamSubscription.resume(); | ||
} | ||
|
||
void _handleDisconnect() { | ||
if (_keepAlive == null) { | ||
_close(); | ||
} else if (!_isTimingOut) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This logic will likely be cleaned up if we only have reference to the current timer instead of indirectly making refrerence through the _isTimeOut member. |
||
_outgoingStreamSubscription.pause(); | ||
_isTimingOut = true; | ||
// If after the timeout period we're still in this state, we'll close. | ||
Timer(_keepAlive, () { | ||
if (_isTimingOut) { | ||
_isTimingOut = false; | ||
_close(); | ||
} | ||
}); | ||
} | ||
} | ||
|
||
// TODO(dantup): @visibleForTesting? | ||
void closeSink() => _sink.close(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is a slightly better pattern to this but it will require some work. Let's move this In the test you can import from See this parallel example: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, nice! I've done that - and I committed the move as its own changeset to try and avoid Git just showing "this file deleted" and "this file added", however the overall diff here seems to have ignored that. This could cause conflicts if anyone else has outstanding changes in that file. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we have any pending PRs so LGTM. |
||
|
||
void _close() { | ||
if (!_closedCompleter.isCompleted) { | ||
_closedCompleter.complete(); | ||
_outgoingStreamSubscription.cancel(); | ||
_sink.close(); | ||
if (!_outgoingController.isClosed) _outgoingController.close(); | ||
if (!_incomingController.isClosed) _incomingController.close(); | ||
|
@@ -73,12 +120,13 @@ class SseConnection extends StreamChannelMixin<String> { | |
class SseHandler { | ||
final _logger = Logger('SseHandler'); | ||
final Uri _uri; | ||
final Duration _keepAlive; | ||
final _connections = <String, SseConnection>{}; | ||
final _connectionController = StreamController<SseConnection>(); | ||
|
||
StreamQueue<SseConnection> _connectionsStream; | ||
|
||
SseHandler(this._uri); | ||
SseHandler(this._uri, {Duration keepAlive}) : _keepAlive = keepAlive; | ||
|
||
StreamQueue<SseConnection> get connections => | ||
_connectionsStream ??= StreamQueue(_connectionController.stream); | ||
|
@@ -92,20 +140,28 @@ class SseHandler { | |
var sink = utf8.encoder.startChunkedConversion(channel.sink); | ||
sink.add(_sseHeaders(req.headers['origin'])); | ||
var clientId = req.url.queryParameters['sseClientId']; | ||
var connection = SseConnection(sink); | ||
_connections[clientId] = connection; | ||
unawaited(connection._closedCompleter.future.then((_) { | ||
_connections.remove(clientId); | ||
})); | ||
// Remove connection when it is remotely closed or the stream is | ||
// cancelled. | ||
channel.stream.listen((_) { | ||
// SSE is unidirectional. Responses are handled through POST requests. | ||
}, onDone: () { | ||
connection._close(); | ||
}); | ||
|
||
_connectionController.add(connection); | ||
// Check if we already have a connection for this ID that is in the process | ||
// of timing out (in which case we can reconnect it transparently). | ||
if (_connections[clientId] != null && | ||
_connections[clientId]._isTimingOut) { | ||
_connections[clientId]._acceptReconnection(sink); | ||
} else { | ||
var connection = SseConnection(sink, keepAlive: _keepAlive); | ||
_connections[clientId] = connection; | ||
unawaited(connection._closedCompleter.future.then((_) { | ||
_connections.remove(clientId); | ||
})); | ||
// Remove connection when it is remotely closed or the stream is | ||
// cancelled. | ||
channel.stream.listen((_) { | ||
// SSE is unidirectional. Responses are handled through POST requests. | ||
}, onDone: () { | ||
connection._handleDisconnect(); | ||
}); | ||
|
||
_connectionController.add(connection); | ||
} | ||
}); | ||
return shelf.Response.notFound(''); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of holding onto a boolean reference we can probably hold onto the underlying timer and cancel as necessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done! However it's slightly weird in that after cancelling I also need to set it to
null
so I can check for nulls elsewhere (otherwise we'd probably need a getter that wraps up checking that it is either null or cancelled - which would look a lot like this boolean - but let me know if you think that would be better).