From 67ed073cbf7ef5d99011718e328bfc903ce34e14 Mon Sep 17 00:00:00 2001 From: Natalie Weizenbaum Date: Tue, 20 Jun 2023 15:13:45 -0700 Subject: [PATCH] Fix a race condition in IsolateChannel.connectReceive() (dart-lang/stream_channel#92) --- pkgs/stream_channel/CHANGELOG.md | 4 +++- pkgs/stream_channel/lib/src/isolate_channel.dart | 14 ++++++++++++-- pkgs/stream_channel/pubspec.yaml | 2 +- pkgs/stream_channel/test/isolate_channel_test.dart | 8 ++++++++ 4 files changed, 24 insertions(+), 4 deletions(-) diff --git a/pkgs/stream_channel/CHANGELOG.md b/pkgs/stream_channel/CHANGELOG.md index 893218875..c78f64f0e 100644 --- a/pkgs/stream_channel/CHANGELOG.md +++ b/pkgs/stream_channel/CHANGELOG.md @@ -1,7 +1,9 @@ -## 2.1.2-dev +## 2.1.2 * Require Dart 2.19 * Add an example. +* Fix a race condition in `IsolateChannel.connectReceive()` where the channel + could hang forever if its sink was closed before the connection was established. ## 2.1.1 diff --git a/pkgs/stream_channel/lib/src/isolate_channel.dart b/pkgs/stream_channel/lib/src/isolate_channel.dart index 55c98143f..3fbd46d23 100644 --- a/pkgs/stream_channel/lib/src/isolate_channel.dart +++ b/pkgs/stream_channel/lib/src/isolate_channel.dart @@ -43,16 +43,26 @@ class IsolateChannel extends StreamChannelMixin { factory IsolateChannel.connectReceive(ReceivePort receivePort) { // We can't use a [StreamChannelCompleter] here because we need the return // value to be an [IsolateChannel]. + var isCompleted = false; var streamCompleter = StreamCompleter(); var sinkCompleter = StreamSinkCompleter(); - var channel = - IsolateChannel._(streamCompleter.stream, sinkCompleter.sink); + + var channel = IsolateChannel._(streamCompleter.stream, sinkCompleter.sink + .transform(StreamSinkTransformer.fromHandlers(handleDone: (sink) { + if (!isCompleted) { + receivePort.close(); + streamCompleter.setSourceStream(Stream.empty()); + sinkCompleter.setDestinationSink(NullStreamSink()); + } + sink.close(); + }))); // The first message across the ReceivePort should be a SendPort pointing to // the remote end. If it's not, we'll make the stream emit an error // complaining. late StreamSubscription subscription; subscription = receivePort.listen((message) { + isCompleted = true; if (message is SendPort) { var controller = StreamChannelController(allowForeignErrors: false, sync: true); diff --git a/pkgs/stream_channel/pubspec.yaml b/pkgs/stream_channel/pubspec.yaml index 5eb57ae41..0b4f62d4e 100644 --- a/pkgs/stream_channel/pubspec.yaml +++ b/pkgs/stream_channel/pubspec.yaml @@ -1,5 +1,5 @@ name: stream_channel -version: 2.1.2-dev +version: 2.1.2 description: >- An abstraction for two-way communication channels based on the Dart Stream class. diff --git a/pkgs/stream_channel/test/isolate_channel_test.dart b/pkgs/stream_channel/test/isolate_channel_test.dart index 1850664cf..10f1fe53e 100644 --- a/pkgs/stream_channel/test/isolate_channel_test.dart +++ b/pkgs/stream_channel/test/isolate_channel_test.dart @@ -162,5 +162,13 @@ void main() { expect(connectedChannel.stream.toList(), throwsStateError); expect(connectedChannel.sink.done, completes); }); + + test('the receiving channel closes gracefully without a connection', + () async { + var connectedChannel = IsolateChannel.connectReceive(connectPort); + await connectedChannel.sink.close(); + await expectLater(connectedChannel.stream.toList(), completion(isEmpty)); + await expectLater(connectedChannel.sink.done, completes); + }); }); }