diff --git a/CHANGELOG.md b/CHANGELOG.md index 8932188..c78f64f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,9 @@ -## 2.1.2-dev +## 2.1.2 * Require Dart 2.19 * Add an example. +* Fix a race condition in `IsolateChannel.connectReceive()` where the channel + could hang forever if its sink was closed before the connection was established. ## 2.1.1 diff --git a/lib/src/isolate_channel.dart b/lib/src/isolate_channel.dart index 55c9814..3fbd46d 100644 --- a/lib/src/isolate_channel.dart +++ b/lib/src/isolate_channel.dart @@ -43,16 +43,26 @@ class IsolateChannel extends StreamChannelMixin { factory IsolateChannel.connectReceive(ReceivePort receivePort) { // We can't use a [StreamChannelCompleter] here because we need the return // value to be an [IsolateChannel]. + var isCompleted = false; var streamCompleter = StreamCompleter(); var sinkCompleter = StreamSinkCompleter(); - var channel = - IsolateChannel._(streamCompleter.stream, sinkCompleter.sink); + + var channel = IsolateChannel._(streamCompleter.stream, sinkCompleter.sink + .transform(StreamSinkTransformer.fromHandlers(handleDone: (sink) { + if (!isCompleted) { + receivePort.close(); + streamCompleter.setSourceStream(Stream.empty()); + sinkCompleter.setDestinationSink(NullStreamSink()); + } + sink.close(); + }))); // The first message across the ReceivePort should be a SendPort pointing to // the remote end. If it's not, we'll make the stream emit an error // complaining. late StreamSubscription subscription; subscription = receivePort.listen((message) { + isCompleted = true; if (message is SendPort) { var controller = StreamChannelController(allowForeignErrors: false, sync: true); diff --git a/pubspec.yaml b/pubspec.yaml index 5eb57ae..0b4f62d 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,5 +1,5 @@ name: stream_channel -version: 2.1.2-dev +version: 2.1.2 description: >- An abstraction for two-way communication channels based on the Dart Stream class. diff --git a/test/isolate_channel_test.dart b/test/isolate_channel_test.dart index 1850664..10f1fe5 100644 --- a/test/isolate_channel_test.dart +++ b/test/isolate_channel_test.dart @@ -162,5 +162,13 @@ void main() { expect(connectedChannel.stream.toList(), throwsStateError); expect(connectedChannel.sink.done, completes); }); + + test('the receiving channel closes gracefully without a connection', + () async { + var connectedChannel = IsolateChannel.connectReceive(connectPort); + await connectedChannel.sink.close(); + await expectLater(connectedChannel.stream.toList(), completion(isEmpty)); + await expectLater(connectedChannel.sink.done, completes); + }); }); }