Make MultiChannel follow the stream channel rules.
It was written before the rules were fully fleshed-out, and I forgot to
update it to comply.
R=tjblasi@google.com
Review URL: https://codereview.chromium.org//1686263002 .
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 05a83e0..b1b7350 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,8 @@
* Make `IsolateChannel` slightly more efficient.
+* Make `MultiChannel` follow the stream channel rules.
+
## 1.3.0
* Add `Disconnector`, a transformer that allows the caller to disconnect the
diff --git a/lib/src/multi_channel.dart b/lib/src/multi_channel.dart
index cba592a..112997e 100644
--- a/lib/src/multi_channel.dart
+++ b/lib/src/multi_channel.dart
@@ -4,6 +4,8 @@
import 'dart:async';
+import 'package:async/async.dart';
+
import '../stream_channel.dart';
/// A class that multiplexes multiple virtual channels across a single
@@ -85,21 +87,15 @@
/// The subscription to [_inner.stream].
StreamSubscription _innerStreamSubscription;
- Stream get stream => _streamController.stream;
- final _streamController = new StreamController(sync: true);
+ Stream get stream => _mainController.foreign.stream;
+ StreamSink get sink => _mainController.foreign.sink;
- StreamSink get sink => _sinkController.sink;
- final _sinkController = new StreamController(sync: true);
+ /// The controller for this channel.
+ final _mainController = new StreamChannelController(sync: true);
- /// A map from virtual channel ids to [StreamController]s that should be used
- /// to write messages received from those channels.
- final _streamControllers = new Map<int, StreamController>();
-
- /// A map from virtual channel ids to [StreamControllers]s that are used
- /// to receive messages to write to those channels.
- ///
- /// Note that this uses the same keys as [_streamControllers].
- final _sinkControllers = new Map<int, StreamController>();
+ /// A map from virtual channel ids to [StreamChannelController]s that should
+ /// be used to communicate over those channels.
+ final _controllers = <int, StreamChannelController>{};
/// The next id to use for a local virtual channel.
///
@@ -125,35 +121,34 @@
_MultiChannel(this._inner) {
// The default connection is a special case which has id 0 on both ends.
// This allows it to begin connected without having to send over an id.
- _streamControllers[0] = _streamController;
- _sinkControllers[0] = _sinkController;
- _sinkController.stream.listen(
+ _controllers[0] = _mainController;
+ _mainController.local.stream.listen(
(message) => _inner.sink.add([0, message]),
onDone: () => _closeChannel(0, 0));
_innerStreamSubscription = _inner.stream.listen((message) {
var id = message[0];
- var sink = _streamControllers[id];
+ var controller = _controllers[id];
- // A sink might not exist if the channel was closed before an incoming
- // message was processed.
- if (sink == null) return;
+ // A controller might not exist if the channel was closed before an
+ // incoming message was processed.
+ if (controller == null) return;
if (message.length > 1) {
- sink.add(message[1]);
+ controller.local.sink.add(message[1]);
return;
}
- // A message without data indicates that the channel has been closed.
- _sinkControllers[id].close();
- }, onDone: _closeInnerChannel,
- onError: _streamController.addError);
+ // A message without data indicates that the channel has been closed. We
+ // can only close the sink here without doing any more cleanup, because
+ // the sink closing will cause the stream to emit a done event which will
+ // trigger more cleanup.
+ controller.local.sink.close();
+ },
+ onDone: _closeInnerChannel,
+ onError: _mainController.local.sink.addError);
}
VirtualChannel virtualChannel([id]) {
- if (_inner == null) {
- throw new StateError("The underlying channel is closed.");
- }
-
var inputId;
var outputId;
if (id != null) {
@@ -171,34 +166,39 @@
_nextId += 2;
}
- if (_streamControllers.containsKey(inputId)) {
+ // If the inner channel has already closed, create new virtual channels in a
+ // closed state.
+ if (_inner == null) {
+ return new VirtualChannel._(
+ this, inputId, new Stream.empty(), new NullStreamSink());
+ }
+
+ if (_controllers.containsKey(inputId)) {
throw new ArgumentError("A virtual channel with id $id already exists.");
}
- var streamController = new StreamController(sync: true);
- var sinkController = new StreamController(sync: true);
- _streamControllers[inputId] = streamController;
- _sinkControllers[inputId] = sinkController;
- sinkController.stream.listen(
+ var controller = new StreamChannelController(sync: true);
+ _controllers[inputId] = controller;
+ controller.local.stream.listen(
(message) => _inner.sink.add([outputId, message]),
onDone: () => _closeChannel(inputId, outputId));
return new VirtualChannel._(
- this, outputId, streamController.stream, sinkController.sink);
+ this, outputId, controller.foreign.stream, controller.foreign.sink);
}
/// Closes the virtual channel for which incoming messages have [inputId] and
/// outgoing messages have [outputId].
void _closeChannel(int inputId, int outputId) {
- _streamControllers.remove(inputId).close();
- _sinkControllers.remove(inputId).close();
+ var controller = _controllers.remove(inputId);
+ controller.local.sink.close();
if (_inner == null) return;
// A message without data indicates that the virtual channel has been
// closed.
_inner.sink.add([outputId]);
- if (_streamControllers.isEmpty) _closeInnerChannel();
+ if (_controllers.isEmpty) _closeInnerChannel();
}
/// Closes the underlying communication channel.
@@ -206,9 +206,13 @@
_inner.sink.close();
_innerStreamSubscription.cancel();
_inner = null;
- for (var controller in _sinkControllers.values.toList()) {
- controller.close();
+
+ // Convert this to a list because the close is dispatched synchronously, and
+ // that could conceivably remove a controller from [_controllers].
+ for (var controller in new List.from(_controllers.values)) {
+ controller.local.sink.close();
}
+ _controllers.clear();
}
}
diff --git a/pubspec.yaml b/pubspec.yaml
index 930f744..c38c167 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: stream_channel
-version: 1.3.1-dev
+version: 1.3.1
description: An abstraction for two-way communication channels.
author: Dart Team <misc@dartlang.org>
homepage: https://github.com/dart-lang/stream_channel
diff --git a/test/multi_channel_test.dart b/test/multi_channel_test.dart
index cc9ed1d..5be2505 100644
--- a/test/multi_channel_test.dart
+++ b/test/multi_channel_test.dart
@@ -10,17 +10,13 @@
import 'utils.dart';
void main() {
- var oneToTwo;
- var twoToOne;
+ var controller;
var channel1;
var channel2;
setUp(() {
- oneToTwo = new StreamController();
- twoToOne = new StreamController();
- channel1 = new MultiChannel(
- new StreamChannel(twoToOne.stream, oneToTwo.sink));
- channel2 = new MultiChannel(
- new StreamChannel(oneToTwo.stream, twoToOne.sink));
+ controller = new StreamChannelController();
+ channel1 = new MultiChannel(controller.local);
+ channel2 = new MultiChannel(controller.foreign);
});
group("the default virtual channel", () {
@@ -66,16 +62,16 @@
test("closes the underlying channel when it closes without any other "
"virtual channels", () {
- expect(oneToTwo.done, completes);
- expect(twoToOne.done, completes);
+ expect(controller.local.sink.done, completes);
+ expect(controller.foreign.sink.done, completes);
channel1.sink.close();
});
test("doesn't close the underlying channel when it closes with other "
"virtual channels", () {
- oneToTwo.done.then(expectAsync((_) {}, count: 0));
- twoToOne.done.then(expectAsync((_) {}, count: 0));
+ controller.local.sink.done.then(expectAsync((_) {}, count: 0));
+ controller.foreign.sink.done.then(expectAsync((_) {}, count: 0));
// Establish another virtual connection which should keep the underlying
// connection open.
@@ -149,16 +145,16 @@
channel1.sink.close();
await channel2.stream.toList();
- expect(oneToTwo.done, completes);
- expect(twoToOne.done, completes);
+ expect(controller.local.sink.done, completes);
+ expect(controller.foreign.sink.done, completes);
virtual1.sink.close();
});
test("doesn't close the underlying channel when it closes with other "
"virtual channels", () {
- oneToTwo.done.then(expectAsync((_) {}, count: 0));
- twoToOne.done.then(expectAsync((_) {}, count: 0));
+ controller.local.sink.done.then(expectAsync((_) {}, count: 0));
+ controller.foreign.sink.done.then(expectAsync((_) {}, count: 0));
virtual1.sink.close();
@@ -246,16 +242,16 @@
channel2.sink.close();
await channel1.stream.toList();
- expect(oneToTwo.done, completes);
- expect(twoToOne.done, completes);
+ expect(controller.local.sink.done, completes);
+ expect(controller.foreign.sink.done, completes);
virtual2.sink.close();
});
test("doesn't close the underlying channel when it closes with other "
"virtual channels", () {
- oneToTwo.done.then(expectAsync((_) {}, count: 0));
- twoToOne.done.then(expectAsync((_) {}, count: 0));
+ controller.local.sink.done.then(expectAsync((_) {}, count: 0));
+ controller.foreign.sink.done.then(expectAsync((_) {}, count: 0));
virtual2.sink.close();
@@ -288,16 +284,24 @@
expect(virtual2.stream.toList(), completion(isEmpty));
expect(virtual2.sink.done, completes);
- oneToTwo.close();
+ controller.local.sink.close();
});
- test("closes, no more virtual channels may be created", () {
- expect(channel1.sink.done.then((_) => channel1.virtualChannel()),
- throwsStateError);
- expect(channel2.sink.done.then((_) => channel2.virtualChannel()),
- throwsStateError);
+ test("closes, more virtual channels are created closed", () async {
+ channel2.sink.close();
+ virtual2.sink.close();
- oneToTwo.close();
+ // Wait for the existing channels to emit done events.
+ await channel1.stream.toList();
+ await virtual1.stream.toList();
+
+ var virtual = channel1.virtualChannel();
+ expect(virtual.stream.toList(), completion(isEmpty));
+ expect(virtual.sink.done, completes);
+
+ virtual = channel1.virtualChannel();
+ expect(virtual.stream.toList(), completion(isEmpty));
+ expect(virtual.sink.done, completes);
});
test("emits an error, the error is sent only to the default channel", () {
@@ -306,7 +310,134 @@
virtual1.stream.listen(expectAsync((_) {}, count: 0),
onError: expectAsync((_) {}, count: 0));
- twoToOne.addError("oh no");
+ controller.foreign.sink.addError("oh no");
+ });
+ });
+
+ group("stream channel rules", () {
+ group("for the main stream:", () {
+ test("closing the sink causes the stream to close before it emits any more "
+ "events", () {
+ channel1.sink.add(1);
+ channel1.sink.add(2);
+ channel1.sink.add(3);
+
+ channel2.stream.listen(expectAsync((message) {
+ expect(message, equals(1));
+ channel2.sink.close();
+ }, count: 1));
+ });
+
+ test("after the stream closes, the sink ignores events", () async {
+ channel1.sink.close();
+
+ // Wait for the done event to be delivered.
+ await channel2.stream.toList();
+ channel2.sink.add(1);
+ channel2.sink.add(2);
+ channel2.sink.add(3);
+ channel2.sink.close();
+
+ // None of our channel.sink additions should make it to the other endpoint.
+ channel1.stream.listen(expectAsync((_) {}, count: 0));
+ await pumpEventQueue();
+ });
+
+ test("canceling the stream's subscription has no effect on the sink",
+ () async {
+ channel1.stream.listen(null).cancel();
+ await pumpEventQueue();
+
+ channel1.sink.add(1);
+ channel1.sink.add(2);
+ channel1.sink.add(3);
+ channel1.sink.close();
+ expect(channel2.stream.toList(), completion(equals([1, 2, 3])));
+ });
+
+ test("canceling the stream's subscription doesn't stop a done event",
+ () async {
+ channel1.stream.listen(null).cancel();
+ await pumpEventQueue();
+
+ channel2.sink.close();
+ await pumpEventQueue();
+
+ channel1.sink.add(1);
+ channel1.sink.add(2);
+ channel1.sink.add(3);
+ channel1.sink.close();
+
+ // The sink should be ignoring events because the channel closed.
+ channel2.stream.listen(expectAsync((_) {}, count: 0));
+ await pumpEventQueue();
+ });
+ });
+
+ group("for a virtual channel:", () {
+ var virtual1;
+ var virtual2;
+ setUp(() {
+ virtual1 = channel1.virtualChannel();
+ virtual2 = channel2.virtualChannel(virtual1.id);
+ });
+
+ test("closing the sink causes the stream to close before it emits any more "
+ "events", () {
+ virtual1.sink.add(1);
+ virtual1.sink.add(2);
+ virtual1.sink.add(3);
+
+ virtual2.stream.listen(expectAsync((message) {
+ expect(message, equals(1));
+ virtual2.sink.close();
+ }, count: 1));
+ });
+
+ test("after the stream closes, the sink ignores events", () async {
+ virtual1.sink.close();
+
+ // Wait for the done event to be delivered.
+ await virtual2.stream.toList();
+ virtual2.sink.add(1);
+ virtual2.sink.add(2);
+ virtual2.sink.add(3);
+ virtual2.sink.close();
+
+ // None of our virtual.sink additions should make it to the other endpoint.
+ virtual1.stream.listen(expectAsync((_) {}, count: 0));
+ await pumpEventQueue();
+ });
+
+ test("canceling the stream's subscription has no effect on the sink",
+ () async {
+ virtual1.stream.listen(null).cancel();
+ await pumpEventQueue();
+
+ virtual1.sink.add(1);
+ virtual1.sink.add(2);
+ virtual1.sink.add(3);
+ virtual1.sink.close();
+ expect(virtual2.stream.toList(), completion(equals([1, 2, 3])));
+ });
+
+ test("canceling the stream's subscription doesn't stop a done event",
+ () async {
+ virtual1.stream.listen(null).cancel();
+ await pumpEventQueue();
+
+ virtual2.sink.close();
+ await pumpEventQueue();
+
+ virtual1.sink.add(1);
+ virtual1.sink.add(2);
+ virtual1.sink.add(3);
+ virtual1.sink.close();
+
+ // The sink should be ignoring events because the stream closed.
+ virtual2.stream.listen(expectAsync((_) {}, count: 0));
+ await pumpEventQueue();
+ });
});
});
}