Skip to content

Commit

Permalink
Support shutting down the handler ignoring any keepAlive periods (dar…
Browse files Browse the repository at this point in the history
…t-archive/sse#27)

* Support shutting down the handler ignoring any keepAlive periods
  • Loading branch information
DanTup authored Apr 23, 2020
1 parent c882ac6 commit 51ef2d5
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 5 deletions.
5 changes: 5 additions & 0 deletions pkgs/sse/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 3.5.0

- Add new `shutdown` methods on `SseHandler` and `SseConnection` to allow closing
connections immediately, ignoring any keep-alive periods.

## 3.4.0

- Remove `onClose` from `SseConnection` and ensure the corresponding
Expand Down
24 changes: 20 additions & 4 deletions pkgs/sse/lib/src/server/sse_handler.dart
Original file line number Diff line number Diff line change
Expand Up @@ -115,17 +115,21 @@ class SseConnection extends StreamChannelMixin<String> {
if (_keepAlive == null) {
// Close immediately if we're not keeping alive.
_close();
} else if (!isInKeepAlivePeriod) {
// Otherwise if we didn't already have an active timer, set a timer to
// close after the timeout period. If the connection comes back, this will
// be cancelled and all messages left in the queue tried again.
} else if (!isInKeepAlivePeriod && !_closedCompleter.isCompleted) {
// Otherwise if we didn't already have an active timer and we've not already
// been completely closed, set a timer to close after the timeout period.
// If the connection comes back, this will be cancelled and all messages left
// in the queue tried again.
_keepAliveTimer = Timer(_keepAlive, _close);
}
}

void _close() {
if (!_closedCompleter.isCompleted) {
_closedCompleter.complete();
// Cancel any existing timer in case we were told to explicitly shut down
// to avoid keeping the process alive.
_keepAliveTimer?.cancel();
_sink.close();
if (!_outgoingController.isClosed) {
_outgoingStreamQueue.cancel(immediate: true);
Expand All @@ -134,6 +138,11 @@ class SseConnection extends StreamChannelMixin<String> {
if (!_incomingController.isClosed) _incomingController.close();
}
}

/// Immediately close the connection, ignoring any keepAlive period.
void shutdown() {
_close();
}
}

/// [SseHandler] handles requests on a user defined path to create
Expand Down Expand Up @@ -228,6 +237,13 @@ class SseHandler {
// Firefox does not set header "origin".
// https://bugzilla.mozilla.org/show_bug.cgi?id=1508661
req.headers['origin'] ?? req.headers['host'];

/// Immediately close all connections, ignoring any keepAlive periods.
void shutdown() {
for (final connection in _connections.values) {
connection.shutdown();
}
}
}

void closeSink(SseConnection connection) => connection._sink.close();
2 changes: 1 addition & 1 deletion pkgs/sse/pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: sse
version: 3.4.0
version: 3.5.0
homepage: https://github.com/dart-lang/sse
description: >-
Provides client and server functionality for setting up bi-directional
Expand Down
20 changes: 20 additions & 0 deletions pkgs/sse/test/sse_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,26 @@ void main() {
// Ensure messages arrive in the same order
expect(await connection.stream.take(2).toList(), equals(['one', 'two']));
});

test('Explicit shutdown does not wait for keepAlive', () async {
expect(handler.numberOfClients, 0);
await webdriver.get('http://localhost:${server.port}');
await handler.connections.next;
expect(handler.numberOfClients, 1);

// Close the underlying connection.
handler.shutdown();

// Wait for a short period to allow the connection to close, but not
// long enough that the 30second keep-alive may have expired.
var maxPumps = 50;
while (handler.numberOfClients > 0 && maxPumps-- > 0) {
await pumpEventQueue(times: 1);
}

// Ensure there are not connected clients.
expect(handler.numberOfClients, 0);
});
}, timeout: const Timeout(Duration(seconds: 120)));
}

Expand Down

0 comments on commit 51ef2d5

Please sign in to comment.