Skip to content

Commit

Permalink
Fix a race condition in IsolateChannel.connectReceive() (dart-lang/st…
Browse files Browse the repository at this point in the history
  • Loading branch information
nex3 authored Jun 20, 2023
1 parent eedb369 commit 67ed073
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 4 deletions.
4 changes: 3 additions & 1 deletion pkgs/stream_channel/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
## 2.1.2-dev
## 2.1.2

* Require Dart 2.19
* Add an example.
* Fix a race condition in `IsolateChannel.connectReceive()` where the channel
could hang forever if its sink was closed before the connection was established.

## 2.1.1

Expand Down
14 changes: 12 additions & 2 deletions pkgs/stream_channel/lib/src/isolate_channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,26 @@ class IsolateChannel<T> extends StreamChannelMixin<T> {
factory IsolateChannel.connectReceive(ReceivePort receivePort) {
// We can't use a [StreamChannelCompleter] here because we need the return
// value to be an [IsolateChannel].
var isCompleted = false;
var streamCompleter = StreamCompleter<T>();
var sinkCompleter = StreamSinkCompleter<T>();
var channel =
IsolateChannel<T>._(streamCompleter.stream, sinkCompleter.sink);

var channel = IsolateChannel<T>._(streamCompleter.stream, sinkCompleter.sink
.transform(StreamSinkTransformer.fromHandlers(handleDone: (sink) {
if (!isCompleted) {
receivePort.close();
streamCompleter.setSourceStream(Stream.empty());
sinkCompleter.setDestinationSink(NullStreamSink<T>());
}
sink.close();
})));

// The first message across the ReceivePort should be a SendPort pointing to
// the remote end. If it's not, we'll make the stream emit an error
// complaining.
late StreamSubscription<dynamic> subscription;
subscription = receivePort.listen((message) {
isCompleted = true;
if (message is SendPort) {
var controller =
StreamChannelController<T>(allowForeignErrors: false, sync: true);
Expand Down
2 changes: 1 addition & 1 deletion pkgs/stream_channel/pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: stream_channel
version: 2.1.2-dev
version: 2.1.2
description: >-
An abstraction for two-way communication channels based on the Dart Stream
class.
Expand Down
8 changes: 8 additions & 0 deletions pkgs/stream_channel/test/isolate_channel_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -162,5 +162,13 @@ void main() {
expect(connectedChannel.stream.toList(), throwsStateError);
expect(connectedChannel.sink.done, completes);
});

test('the receiving channel closes gracefully without a connection',
() async {
var connectedChannel = IsolateChannel.connectReceive(connectPort);
await connectedChannel.sink.close();
await expectLater(connectedChannel.stream.toList(), completion(isEmpty));
await expectLater(connectedChannel.sink.done, completes);
});
});
}

0 comments on commit 67ed073

Please sign in to comment.