Skip to content

Commit

Permalink
Feature/remove singletons (#7)
Browse files Browse the repository at this point in the history
* Update logger

* Remove singletons
  • Loading branch information
PlugFox authored Jul 25, 2023
1 parent 44d1f0a commit d2b8889
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 88 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
## 1.0.0-pre.5
## 1.0.0-pre.6

- **BREAKING CHANGE**: Change options to separate, platform-specific object
- You can now pass headers and other options to the IO web socket client
Expand Down
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ server-up:
server-down:
@docker compose -f server/docker-compose.yml down --remove-orphans

# dart run coverage:test_with_coverage -fb -o coverage -- --concurrency=6 --platform chrome,vm --coverage=./coverage --reporter=expanded test/ws_test.dart
coverage: get
@dart test --concurrency=6 --platform vm --coverage=coverage test/
@dart test --concurrency=6 --platform chrome,vm --coverage=coverage test/
@dart run coverage:format_coverage --lcov --in=coverage --out=coverage/lcov.info --packages=.packages --report-on=lib
# @mv coverage/lcov.info coverage/lcov.base.info
# @lcov -r coverage/lcov.base.info -o coverage/lcov.base.info "lib/**.freezed.dart" "lib/**.g.dart"
Expand Down
37 changes: 27 additions & 10 deletions lib/src/client/ws_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ final class WebSocketClient implements IWebSocketClient {
WebSocketClient([WebSocketOptions? options])
: _client = $platformWebSocketClient(options),
_options = options ?? WebSocketOptions.common() {
WebSocketMetricsManager.instance.startObserving(this);
_init();
}

/// Creates a [WebSocketClient] from an existing [IWebSocketClient].
Expand All @@ -38,7 +38,7 @@ final class WebSocketClient implements IWebSocketClient {
[WebSocketOptions? options])
: _client = client,
_options = options ?? WebSocketOptions.common() {
WebSocketMetricsManager.instance.startObserving(this);
_init();
}

/// {@macro ws_client}
Expand All @@ -47,6 +47,10 @@ final class WebSocketClient implements IWebSocketClient {

final IWebSocketClient _client;
final WebSocketEventQueue _eventQueue = WebSocketEventQueue();
late final WebSocketMetricsManager _metricsManager =
WebSocketMetricsManager(this);
late final WebSocketConnectionManager _connectionManager =
WebSocketConnectionManager(this);

/// Current options.
/// {@nodoc}
Expand All @@ -57,8 +61,18 @@ final class WebSocketClient implements IWebSocketClient {
bool _isClosed = false;

/// Get the metrics for this client.
WebSocketMetrics get metrics =>
WebSocketMetricsManager.instance.buildMetricFor(this);
WebSocketMetrics get metrics {
final (
:bool active,
:int attempt,
:DateTime? nextReconnectionAttempt,
) = _connectionManager.status;
return _metricsManager.buildMetric(
active: active,
attempt: attempt,
nextReconnectionAttempt: nextReconnectionAttempt,
);
}

@override
WebSocketMessagesStream get stream => _client.stream;
Expand All @@ -69,6 +83,10 @@ final class WebSocketClient implements IWebSocketClient {
@override
WebSocketClientState get state => _client.state;

void _init() {
_metricsManager.startObserving();
}

@override
Future<void> add(Object data) async {
if (_isClosed) return Future<void>.error(const WSClientClosedException());
Expand All @@ -84,15 +102,14 @@ final class WebSocketClient implements IWebSocketClient {
);
}
});
WebSocketMetricsManager.instance.sent(this, data);
_metricsManager.sent(this, data);
}

@override
Future<void> connect(String url) {
if (_isClosed) return Future<void>.error(const WSClientClosedException());
return _eventQueue.push('connect', () async {
WebSocketConnectionManager.instance.startMonitoringConnection(
this,
_connectionManager.startMonitoringConnection(
url,
_options.connectionRetryInterval,
);
Expand All @@ -115,7 +132,7 @@ final class WebSocketClient implements IWebSocketClient {
[int? code = 1000, String? reason = 'NORMAL_CLOSURE']) {
if (_isClosed) return Future<void>.error(const WSClientClosedException());
return _eventQueue.push('disconnect', () async {
WebSocketConnectionManager.instance.stopMonitoringConnection(this);
_connectionManager.stopMonitoringConnection();
try {
await Future<void>.sync(() => _client.disconnect(code, reason))
.timeout(_options.timeout);
Expand All @@ -136,7 +153,7 @@ final class WebSocketClient implements IWebSocketClient {
try {
_isClosed = true;
// Stop monitoring the connection.
WebSocketConnectionManager.instance.stopMonitoringConnection(this);
_connectionManager.stopMonitoringConnection();
// Clear the event queue and prevent new events from being processed.
// Returns when the queue is empty and no new events are being processed.
Future<void>.sync(_eventQueue.close).ignore();
Expand All @@ -154,7 +171,7 @@ final class WebSocketClient implements IWebSocketClient {
// Wait for the next microtask to ensure that the metrics are updated
// from state stream, before stopping observing.
scheduleMicrotask(() {
WebSocketMetricsManager.instance.stopObserving(this);
_metricsManager.stopObserving();
});
}
}
Expand Down
75 changes: 38 additions & 37 deletions lib/src/manager/connection_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,59 +10,60 @@ import 'package:ws/src/util/logger.dart';
@internal
final class WebSocketConnectionManager {
/// {@nodoc}
static final WebSocketConnectionManager instance =
WebSocketConnectionManager._internal();
WebSocketConnectionManager(IWebSocketClient client)
: _client = WeakReference<IWebSocketClient>(client);

/// {@nodoc}
WebSocketConnectionManager._internal();
final WeakReference<IWebSocketClient> _client;

/// {@nodoc}
final Expando<StreamSubscription<void>> _watchers =
Expando<StreamSubscription<void>>();
StreamSubscription<void>? _watcher;

/// {@nodoc}
final Expando<Timer> _timers = Expando<Timer>();
Timer? _timer;

/// {@nodoc}
final Expando<int> _attempts = Expando<int>();
int? _attempt;

/// {@nodoc}
final Expando<DateTime> _nextReconnectionAttempts = Expando<DateTime>();
DateTime? _nextReconnectionAttempt;

/// Recive the current status of reconnection for the client.
/// {@nodoc}
({
int attempt,
bool active,
DateTime? nextReconnectionAttempt,
}) getStatusFor(IWebSocketClient client) => (
attempt: _attempts[client] ?? 0,
active: _timers[client]?.isActive == true,
nextReconnectionAttempt: _nextReconnectionAttempts[client],
}) get status => (
attempt: _attempt ?? 0,
active: _timer?.isActive == true,
nextReconnectionAttempt: _nextReconnectionAttempt,
);

/// {@nodoc}
void startMonitoringConnection(
IWebSocketClient client,
String url,
({Duration max, Duration min})? connectionRetryInterval,
) {
stopMonitoringConnection(client);
if (client.isClosed || connectionRetryInterval == null) return;
stopMonitoringConnection();
final client = _client.target;
if (client == null || client.isClosed || connectionRetryInterval == null) {
return;
}
final stateChangesHandler = _handleStateChange(
client,
url,
connectionRetryInterval.min.inMilliseconds,
connectionRetryInterval.max.inMilliseconds,
);
_watchers[client] =
_watcher =
client.stateChanges.listen(stateChangesHandler, cancelOnError: false);
}

/// {@nodoc}
void stopMonitoringConnection(IWebSocketClient client) {
_stopSubscription(client);
_stopTimer(client);
void stopMonitoringConnection() {
_stopSubscription();
_stopTimer();
}

/// {@nodoc}
Expand All @@ -75,51 +76,51 @@ final class WebSocketConnectionManager {
(state) {
switch (state) {
case WebSocketClientState$Open _:
_stopTimer(client);
_attempts[client] = null; // reset attempt
_nextReconnectionAttempts[client] = null; // reset expected time
_stopTimer();
_attempt = null; // reset attempt
_nextReconnectionAttempt = null; // reset expected time
case WebSocketClientState$Closed _:
_stopTimer(client);
_stopTimer();
if (client.isClosed) return;
final attempt = _attempts[client] ?? 0;
final attempt = _attempt ?? 0;
final delay = backoffDelay(attempt, minMs, maxMs);
if (delay <= Duration.zero) {
config('Reconnecting to $lastUrl immediately.');
Future<void>.sync(() => client.connect(lastUrl)).ignore();
_attempts[client] = attempt + 1;
_attempt = attempt + 1;
return;
}
config('Reconnecting to $lastUrl '
'after ${delay.inMilliseconds} ms.');
_nextReconnectionAttempts[client] = DateTime.now().add(delay);
_timers[client] = Timer(
_nextReconnectionAttempt = DateTime.now().add(delay);
_timer = Timer(
delay,
() {
_nextReconnectionAttempts[client] = null;
_nextReconnectionAttempt = null;
if (client.isClosed) {
_stopTimer(client);
_stopTimer();
} else if (client.state.readyState.isClosed) {
config('Auto reconnecting to $lastUrl '
'after ${delay.inMilliseconds} ms.');
Future<void>.sync(() => client.connect(lastUrl)).ignore();
}
},
);
_attempts[client] = attempt + 1;
_attempt = attempt + 1;
case WebSocketClientState$Connecting _:
case WebSocketClientState$Disconnecting _:
}
};

void _stopSubscription(IWebSocketClient client) {
_watchers[client]?.cancel().ignore();
_watchers[client] = null;
void _stopSubscription() {
_watcher?.cancel().ignore();
_watcher = null;
}

void _stopTimer(IWebSocketClient client) {
_nextReconnectionAttempts[client] = null;
_timers[client]?.cancel();
_timers[client] = null;
void _stopTimer() {
_nextReconnectionAttempt = null;
_timer?.cancel();
_timer = null;
}

/// Full jitter technique.
Expand Down
61 changes: 28 additions & 33 deletions lib/src/manager/metrics_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,49 +5,46 @@ import 'package:ws/src/client/metrics.dart';
import 'package:ws/src/client/state.dart';
import 'package:ws/src/client/web_socket_ready_state.dart';
import 'package:ws/src/client/ws_client_interface.dart';
import 'package:ws/src/manager/connection_manager.dart';

/// {@nodoc}
@internal
final class WebSocketMetricsManager {
/// {@nodoc}
static final WebSocketMetricsManager instance =
WebSocketMetricsManager._internal();
WebSocketMetricsManager(IWebSocketClient client)
: _client = WeakReference<IWebSocketClient>(client);

/// {@nodoc}
WebSocketMetricsManager._internal();
final WeakReference<IWebSocketClient> _client;

/// {@nodoc}
final Expando<StreamSubscription<Object>> _receiveObservers =
Expando<StreamSubscription<Object>>();
StreamSubscription<Object>? _receiveObserver;

/// {@nodoc}
final Expando<StreamSubscription<WebSocketClientState>> _stateObservers =
Expando<StreamSubscription<WebSocketClientState>>();
StreamSubscription<WebSocketClientState>? _stateObserver;

/// {@nodoc}
final Expando<$WebSocketMetrics> _metrics = Expando<$WebSocketMetrics>();
final $WebSocketMetrics _metrics = $WebSocketMetrics();

/// {@nodoc}
void startObserving(IWebSocketClient client) {
stopObserving(client);
final metrics = _getMetrics(client);
_receiveObservers[client] = client.stream.listen(
void startObserving() {
stopObserving();
final metrics = _metrics;
_receiveObserver = _client.target?.stream.listen(
(data) => _onDataReceived(metrics, data),
cancelOnError: false,
);
_stateObservers[client] = client.stateChanges.listen(
_stateObserver = _client.target?.stateChanges.listen(
(state) => _onStateChanged(metrics, state),
cancelOnError: false,
);
}

/// {@nodoc}
void stopObserving(IWebSocketClient client) {
_receiveObservers[client]?.cancel().ignore();
_stateObservers[client]?.cancel().ignore();
_receiveObservers[client] = null;
_stateObservers[client] = null;
void stopObserving() {
_receiveObserver?.cancel().ignore();
_stateObserver?.cancel().ignore();
_receiveObserver = null;
_stateObserver = null;
}

void _onDataReceived($WebSocketMetrics metrics, Object data) {
Expand Down Expand Up @@ -91,13 +88,9 @@ final class WebSocketMetricsManager {
}
}

/// {@nodoc}
$WebSocketMetrics _getMetrics(IWebSocketClient client) =>
_metrics[client] ??= $WebSocketMetrics();

/// {@nodoc}
@internal
void sent(IWebSocketClient client, Object data) => _getMetrics(client)
void sent(IWebSocketClient client, Object data) => _metrics
..transferredCount += BigInt.one
..transferredSize += switch (data) {
String text => BigInt.from(text.length),
Expand All @@ -107,12 +100,15 @@ final class WebSocketMetricsManager {

/// {@nodoc}
@internal
WebSocketMetrics buildMetricFor(IWebSocketClient client) {
final metrics = _getMetrics(client);
final readyState = client.state.readyState;
WebSocketMetrics buildMetric({
required bool active,
required int attempt,
required DateTime? nextReconnectionAttempt,
}) {
final metrics = _metrics;
final readyState =
_client.target?.state.readyState ?? WebSocketReadyState.closed;
final lastDisconnectTime = metrics.lastDisconnectTime;
final reconnectionStatus =
WebSocketConnectionManager.instance.getStatusFor(client);
return WebSocketMetrics(
timestamp: DateTime.now(),
readyState: readyState,
Expand All @@ -126,14 +122,13 @@ final class WebSocketMetricsManager {
lastDisconnectTime: lastDisconnectTime,
lastDisconnect: metrics.lastDisconnect,
lastUrl: metrics.lastUrl,
isReconnectionActive: reconnectionStatus.active,
currentReconnectAttempts: reconnectionStatus.attempt,
isReconnectionActive: active,
currentReconnectAttempts: attempt,
nextReconnectionAttempt: switch (readyState) {
WebSocketReadyState.open => null,
WebSocketReadyState.connecting => DateTime.now(),
WebSocketReadyState.disconnecting => null,
WebSocketReadyState.closed =>
reconnectionStatus.nextReconnectionAttempt,
WebSocketReadyState.closed => nextReconnectionAttempt,
},
);
}
Expand Down
Loading

0 comments on commit d2b8889

Please sign in to comment.