Skip to content

Commit

Permalink
Remove optional new/const (#37)
Browse files Browse the repository at this point in the history
- Run `dartfmt --fix`.
- Update version to start working towards next breaking change.
- Update minimum SDK to 2.0.0 stable.
  • Loading branch information
natebosch authored Mar 26, 2019
1 parent e7de9f4 commit 1df5483
Show file tree
Hide file tree
Showing 21 changed files with 159 additions and 163 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 1.6.9

* Require `2.0.0` or newer SDK.
* Drop unnecessary `new` and `const`.

## 1.6.8

* Set max SDK version to `<3.0.0`, and adjust other dependencies.
Expand Down
4 changes: 2 additions & 2 deletions lib/src/close_guarantee_channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ class CloseGuaranteeChannel<T> extends StreamChannelMixin<T> {
bool _disconnected = false;

CloseGuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink) {
_sink = new _CloseGuaranteeSink<T>(innerSink, this);
_stream = new _CloseGuaranteeStream<T>(innerStream, this);
_sink = _CloseGuaranteeSink<T>(innerSink, this);
_stream = _CloseGuaranteeStream<T>(innerStream, this);
}
}

Expand Down
22 changes: 11 additions & 11 deletions lib/src/disconnector.dart
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ class Disconnector<T> implements StreamChannelTransformer<T, T> {
_sinks.clear();
return Future.wait(futures, eagerError: true);
});
final _disconnectMemo = new AsyncMemoizer();
final _disconnectMemo = AsyncMemoizer();

StreamChannel<T> bind(StreamChannel<T> channel) {
return channel.changeSink((innerSink) {
var sink = new _DisconnectorSink<T>(innerSink);
var sink = _DisconnectorSink<T>(innerSink);

if (isDisconnected) {
// Ignore errors here, because otherwise there would be no way for the
Expand Down Expand Up @@ -84,33 +84,33 @@ class _DisconnectorSink<T> implements StreamSink<T> {
_DisconnectorSink(this._inner);

void add(T data) {
if (_closed) throw new StateError("Cannot add event after closing.");
if (_closed) throw StateError("Cannot add event after closing.");
if (_inAddStream) {
throw new StateError("Cannot add event while adding stream.");
throw StateError("Cannot add event while adding stream.");
}
if (_isDisconnected) return;

_inner.add(data);
}

void addError(error, [StackTrace stackTrace]) {
if (_closed) throw new StateError("Cannot add event after closing.");
if (_closed) throw StateError("Cannot add event after closing.");
if (_inAddStream) {
throw new StateError("Cannot add event while adding stream.");
throw StateError("Cannot add event while adding stream.");
}
if (_isDisconnected) return;

_inner.addError(error, stackTrace);
}

Future addStream(Stream<T> stream) {
if (_closed) throw new StateError("Cannot add stream after closing.");
if (_closed) throw StateError("Cannot add stream after closing.");
if (_inAddStream) {
throw new StateError("Cannot add stream while adding stream.");
throw StateError("Cannot add stream while adding stream.");
}
if (_isDisconnected) return new Future.value();
if (_isDisconnected) return Future.value();

_addStreamCompleter = new Completer.sync();
_addStreamCompleter = Completer.sync();
_addStreamSubscription = stream.listen(_inner.add,
onError: _inner.addError, onDone: _addStreamCompleter.complete);
return _addStreamCompleter.future.then((_) {
Expand All @@ -121,7 +121,7 @@ class _DisconnectorSink<T> implements StreamSink<T> {

Future close() {
if (_inAddStream) {
throw new StateError("Cannot close sink while adding stream.");
throw StateError("Cannot close sink while adding stream.");
}

_closed = true;
Expand Down
31 changes: 15 additions & 16 deletions lib/src/guarantee_channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,17 @@ class GuaranteeChannel<T> extends StreamChannelMixin<T> {
bool _disconnected = false;

GuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink,
{bool allowSinkErrors: true}) {
_sink =
new _GuaranteeSink<T>(innerSink, this, allowErrors: allowSinkErrors);
{bool allowSinkErrors = true}) {
_sink = _GuaranteeSink<T>(innerSink, this, allowErrors: allowSinkErrors);

// Enforce the single-subscription guarantee by changing a broadcast stream
// to single-subscription.
if (innerStream.isBroadcast) {
innerStream =
innerStream.transform(new SingleSubscriptionTransformer<T, T>());
innerStream.transform(SingleSubscriptionTransformer<T, T>());
}

_streamController = new StreamController<T>(
_streamController = StreamController<T>(
onListen: () {
// If the sink has disconnected, we've already called
// [_streamController.close].
Expand Down Expand Up @@ -80,7 +79,7 @@ class _GuaranteeSink<T> implements StreamSink<T> {
final GuaranteeChannel<T> _channel;

Future get done => _doneCompleter.future;
final _doneCompleter = new Completer();
final _doneCompleter = Completer();

/// Whether connection is disconnected.
///
Expand Down Expand Up @@ -108,23 +107,23 @@ class _GuaranteeSink<T> implements StreamSink<T> {
/// the underlying sink is closed.
final bool _allowErrors;

_GuaranteeSink(this._inner, this._channel, {bool allowErrors: true})
_GuaranteeSink(this._inner, this._channel, {bool allowErrors = true})
: _allowErrors = allowErrors;

void add(T data) {
if (_closed) throw new StateError("Cannot add event after closing.");
if (_closed) throw StateError("Cannot add event after closing.");
if (_inAddStream) {
throw new StateError("Cannot add event while adding stream.");
throw StateError("Cannot add event while adding stream.");
}
if (_disconnected) return;

_inner.add(data);
}

void addError(error, [StackTrace stackTrace]) {
if (_closed) throw new StateError("Cannot add event after closing.");
if (_closed) throw StateError("Cannot add event after closing.");
if (_inAddStream) {
throw new StateError("Cannot add event while adding stream.");
throw StateError("Cannot add event while adding stream.");
}
if (_disconnected) return;

Expand Down Expand Up @@ -153,13 +152,13 @@ class _GuaranteeSink<T> implements StreamSink<T> {
}

Future addStream(Stream<T> stream) {
if (_closed) throw new StateError("Cannot add stream after closing.");
if (_closed) throw StateError("Cannot add stream after closing.");
if (_inAddStream) {
throw new StateError("Cannot add stream while adding stream.");
throw StateError("Cannot add stream while adding stream.");
}
if (_disconnected) return new Future.value();
if (_disconnected) return Future.value();

_addStreamCompleter = new Completer.sync();
_addStreamCompleter = Completer.sync();
_addStreamSubscription = stream.listen(_inner.add,
onError: _addError, onDone: _addStreamCompleter.complete);
return _addStreamCompleter.future.then((_) {
Expand All @@ -170,7 +169,7 @@ class _GuaranteeSink<T> implements StreamSink<T> {

Future close() {
if (_inAddStream) {
throw new StateError("Cannot close sink while adding stream.");
throw StateError("Cannot close sink while adding stream.");
}

if (_closed) return done;
Expand Down
27 changes: 12 additions & 15 deletions lib/src/isolate_channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,20 @@ class IsolateChannel<T> extends StreamChannelMixin<T> {
factory IsolateChannel.connectReceive(ReceivePort receivePort) {
// We can't use a [StreamChannelCompleter] here because we need the return
// value to be an [IsolateChannel].
var streamCompleter = new StreamCompleter<T>();
var sinkCompleter = new StreamSinkCompleter<T>();
var streamCompleter = StreamCompleter<T>();
var sinkCompleter = StreamSinkCompleter<T>();
var channel =
new IsolateChannel<T>._(streamCompleter.stream, sinkCompleter.sink);
IsolateChannel<T>._(streamCompleter.stream, sinkCompleter.sink);

// The first message across the ReceivePort should be a SendPort pointing to
// the remote end. If it's not, we'll make the stream emit an error
// complaining.
StreamSubscription<dynamic> subscription;
subscription = receivePort.listen((message) {
if (message is SendPort) {
var controller = new StreamChannelController<T>(
allowForeignErrors: false, sync: true);
new SubscriptionStream(subscription)
.cast<T>()
.pipe(controller.local.sink);
var controller =
StreamChannelController<T>(allowForeignErrors: false, sync: true);
SubscriptionStream(subscription).cast<T>().pipe(controller.local.sink);
controller.local.stream
.listen((data) => message.send(data), onDone: receivePort.close);

Expand All @@ -66,9 +64,9 @@ class IsolateChannel<T> extends StreamChannelMixin<T> {
}

streamCompleter.setError(
new StateError('Unexpected Isolate response "$message".'),
StateError('Unexpected Isolate response "$message".'),
StackTrace.current);
sinkCompleter.setDestinationSink(new NullStreamSink<T>());
sinkCompleter.setDestinationSink(NullStreamSink<T>());
subscription.cancel();
});

Expand All @@ -85,21 +83,20 @@ class IsolateChannel<T> extends StreamChannelMixin<T> {
/// The connection protocol is guaranteed to remain compatible across versions
/// at least until the next major version release.
factory IsolateChannel.connectSend(SendPort sendPort) {
var receivePort = new ReceivePort();
var receivePort = ReceivePort();
sendPort.send(receivePort.sendPort);
return new IsolateChannel(receivePort, sendPort);
return IsolateChannel(receivePort, sendPort);
}

/// Creates a stream channel that receives messages from [receivePort] and
/// sends them over [sendPort].
factory IsolateChannel(ReceivePort receivePort, SendPort sendPort) {
var controller =
new StreamChannelController<T>(allowForeignErrors: false, sync: true);
StreamChannelController<T>(allowForeignErrors: false, sync: true);
receivePort.cast<T>().pipe(controller.local.sink);
controller.local.stream
.listen((data) => sendPort.send(data), onDone: receivePort.close);
return new IsolateChannel._(
controller.foreign.stream, controller.foreign.sink);
return IsolateChannel._(controller.foreign.stream, controller.foreign.sink);
}

IsolateChannel._(this.stream, this.sink);
Expand Down
8 changes: 4 additions & 4 deletions lib/src/json_document_transformer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import '../stream_channel.dart';
import 'stream_channel_transformer.dart';

/// The canonical instance of [JsonDocumentTransformer].
final jsonDocument = new JsonDocumentTransformer();
final jsonDocument = JsonDocumentTransformer();

/// A [StreamChannelTransformer] that transforms JSON documents—strings that
/// contain individual objects encoded as JSON—into decoded Dart objects.
Expand All @@ -31,16 +31,16 @@ class JsonDocumentTransformer
/// The [reviver] and [toEncodable] arguments work the same way as the
/// corresponding arguments to [new JsonCodec].
JsonDocumentTransformer({reviver(key, value), toEncodable(object)})
: _codec = new JsonCodec(reviver: reviver, toEncodable: toEncodable);
: _codec = JsonCodec(reviver: reviver, toEncodable: toEncodable);

JsonDocumentTransformer._(this._codec);

StreamChannel<Object> bind(StreamChannel<String> channel) {
var stream = channel.stream.map(_codec.decode);
var sink = new StreamSinkTransformer<Object, String>.fromHandlers(
var sink = StreamSinkTransformer<Object, String>.fromHandlers(
handleData: (data, sink) {
sink.add(_codec.encode(data));
}).bind(channel.sink);
return new StreamChannel.withCloseGuarantee(stream, sink);
return StreamChannel.withCloseGuarantee(stream, sink);
}
}
22 changes: 10 additions & 12 deletions lib/src/multi_channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ abstract class MultiChannel<T> implements StreamChannel<T> {
/// [inner].
///
/// The inner channel must take JSON-like objects.
factory MultiChannel(StreamChannel<dynamic> inner) =>
new _MultiChannel<T>(inner);
factory MultiChannel(StreamChannel<dynamic> inner) => _MultiChannel<T>(inner);

/// Creates a new virtual channel.
///
Expand Down Expand Up @@ -93,19 +92,19 @@ class _MultiChannel<T> extends StreamChannelMixin<T>
StreamSink<T> get sink => _mainController.foreign.sink;

/// The controller for this channel.
final _mainController = new StreamChannelController<T>(sync: true);
final _mainController = StreamChannelController<T>(sync: true);

/// A map from input IDs to [StreamChannelController]s that should be used to
/// communicate over those channels.
final _controllers = <int, StreamChannelController<T>>{};

/// Input IDs of controllers in [_controllers] that we've received messages
/// for but that have not yet had a local [virtualChannel] created.
final _pendingIds = new Set<int>();
final _pendingIds = Set<int>();

/// Input IDs of virtual channels that used to exist but have since been
/// closed.
final _closedIds = new Set<int>();
final _closedIds = Set<int>();

/// The next id to use for a local virtual channel.
///
Expand Down Expand Up @@ -149,7 +148,7 @@ class _MultiChannel<T> extends StreamChannelMixin<T>
// counterpart yet, create a controller for it to buffer incoming
// messages for when a local connection is created.
_pendingIds.add(id);
return new StreamChannelController(sync: true);
return StreamChannelController(sync: true);
});

if (message.length > 1) {
Expand Down Expand Up @@ -187,8 +186,7 @@ class _MultiChannel<T> extends StreamChannelMixin<T>
// If the inner channel has already closed, create new virtual channels in a
// closed state.
if (_inner == null) {
return new VirtualChannel._(
this, inputId, new Stream.empty(), new NullStreamSink());
return VirtualChannel._(this, inputId, Stream.empty(), NullStreamSink());
}

StreamChannelController<T> controller;
Expand All @@ -198,16 +196,16 @@ class _MultiChannel<T> extends StreamChannelMixin<T>
controller = _controllers[inputId];
} else if (_controllers.containsKey(inputId) ||
_closedIds.contains(inputId)) {
throw new ArgumentError("A virtual channel with id $id already exists.");
throw ArgumentError("A virtual channel with id $id already exists.");
} else {
controller = new StreamChannelController(sync: true);
controller = StreamChannelController(sync: true);
_controllers[inputId] = controller;
}

controller.local.stream.listen(
(message) => _inner.sink.add([outputId, message]),
onDone: () => _closeChannel(inputId, outputId));
return new VirtualChannel._(
return VirtualChannel._(
this, outputId, controller.foreign.stream, controller.foreign.sink);
}

Expand All @@ -234,7 +232,7 @@ class _MultiChannel<T> extends StreamChannelMixin<T>

// Convert this to a list because the close is dispatched synchronously, and
// that could conceivably remove a controller from [_controllers].
for (var controller in new List.from(_controllers.values)) {
for (var controller in List.from(_controllers.values)) {
controller.local.sink.close();
}
_controllers.clear();
Expand Down
Loading

0 comments on commit 1df5483

Please sign in to comment.