Make MultiChannel follow the stream channel rules.

It was written before the rules were fully fleshed-out, and I forgot to
update it to comply.

R=tjblasi@google.com

Review URL: https://codereview.chromium.org//1686263002 .
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 05a83e0..b1b7350 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,8 @@
 
 * Make `IsolateChannel` slightly more efficient.
 
+* Make `MultiChannel` follow the stream channel rules.
+
 ## 1.3.0
 
 * Add `Disconnector`, a transformer that allows the caller to disconnect the
diff --git a/lib/src/multi_channel.dart b/lib/src/multi_channel.dart
index cba592a..112997e 100644
--- a/lib/src/multi_channel.dart
+++ b/lib/src/multi_channel.dart
@@ -4,6 +4,8 @@
 
 import 'dart:async';
 
+import 'package:async/async.dart';
+
 import '../stream_channel.dart';
 
 /// A class that multiplexes multiple virtual channels across a single
@@ -85,21 +87,15 @@
   /// The subscription to [_inner.stream].
   StreamSubscription _innerStreamSubscription;
 
-  Stream get stream => _streamController.stream;
-  final _streamController = new StreamController(sync: true);
+  Stream get stream => _mainController.foreign.stream;
+  StreamSink get sink => _mainController.foreign.sink;
 
-  StreamSink get sink => _sinkController.sink;
-  final _sinkController = new StreamController(sync: true);
+  /// The controller for this channel.
+  final _mainController = new StreamChannelController(sync: true);
 
-  /// A map from virtual channel ids to [StreamController]s that should be used
-  /// to write messages received from those channels.
-  final _streamControllers = new Map<int, StreamController>();
-
-  /// A map from virtual channel ids to [StreamControllers]s that are used
-  /// to receive messages to write to those channels.
-  ///
-  /// Note that this uses the same keys as [_streamControllers].
-  final _sinkControllers = new Map<int, StreamController>();
+  /// A map from virtual channel ids to [StreamChannelController]s that should
+  /// be used to communicate over those channels.
+  final _controllers = <int, StreamChannelController>{};
 
   /// The next id to use for a local virtual channel.
   ///
@@ -125,35 +121,34 @@
   _MultiChannel(this._inner) {
     // The default connection is a special case which has id 0 on both ends.
     // This allows it to begin connected without having to send over an id.
-    _streamControllers[0] = _streamController;
-    _sinkControllers[0] = _sinkController;
-    _sinkController.stream.listen(
+    _controllers[0] = _mainController;
+    _mainController.local.stream.listen(
         (message) => _inner.sink.add([0, message]),
         onDone: () => _closeChannel(0, 0));
 
     _innerStreamSubscription = _inner.stream.listen((message) {
       var id = message[0];
-      var sink = _streamControllers[id];
+      var controller = _controllers[id];
 
-      // A sink might not exist if the channel was closed before an incoming
-      // message was processed.
-      if (sink == null) return;
+      // A controller might not exist if the channel was closed before an
+      // incoming message was processed.
+      if (controller == null) return;
       if (message.length > 1) {
-        sink.add(message[1]);
+        controller.local.sink.add(message[1]);
         return;
       }
 
-      // A message without data indicates that the channel has been closed.
-      _sinkControllers[id].close();
-    }, onDone: _closeInnerChannel,
-        onError: _streamController.addError);
+      // A message without data indicates that the channel has been closed. We
+      // can only close the sink here without doing any more cleanup, because
+      // the sink closing will cause the stream to emit a done event which will
+      // trigger more cleanup.
+      controller.local.sink.close();
+    },
+        onDone: _closeInnerChannel,
+        onError: _mainController.local.sink.addError);
   }
 
   VirtualChannel virtualChannel([id]) {
-    if (_inner == null) {
-      throw new StateError("The underlying channel is closed.");
-    }
-
     var inputId;
     var outputId;
     if (id != null) {
@@ -171,34 +166,39 @@
       _nextId += 2;
     }
 
-    if (_streamControllers.containsKey(inputId)) {
+    // If the inner channel has already closed, create new virtual channels in a
+    // closed state.
+    if (_inner == null) {
+      return new VirtualChannel._(
+          this, inputId, new Stream.empty(), new NullStreamSink());
+    }
+
+    if (_controllers.containsKey(inputId)) {
       throw new ArgumentError("A virtual channel with id $id already exists.");
     }
 
-    var streamController = new StreamController(sync: true);
-    var sinkController = new StreamController(sync: true);
-    _streamControllers[inputId] = streamController;
-    _sinkControllers[inputId] = sinkController;
-    sinkController.stream.listen(
+    var controller = new StreamChannelController(sync: true);
+    _controllers[inputId] = controller;
+    controller.local.stream.listen(
         (message) => _inner.sink.add([outputId, message]),
         onDone: () => _closeChannel(inputId, outputId));
 
     return new VirtualChannel._(
-        this, outputId, streamController.stream, sinkController.sink);
+        this, outputId, controller.foreign.stream, controller.foreign.sink);
   }
 
   /// Closes the virtual channel for which incoming messages have [inputId] and
   /// outgoing messages have [outputId].
   void _closeChannel(int inputId, int outputId) {
-    _streamControllers.remove(inputId).close();
-    _sinkControllers.remove(inputId).close();
+    var controller = _controllers.remove(inputId);
+    controller.local.sink.close();
 
     if (_inner == null) return;
 
     // A message without data indicates that the virtual channel has been
     // closed.
     _inner.sink.add([outputId]);
-    if (_streamControllers.isEmpty) _closeInnerChannel();
+    if (_controllers.isEmpty) _closeInnerChannel();
   }
 
   /// Closes the underlying communication channel.
@@ -206,9 +206,13 @@
     _inner.sink.close();
     _innerStreamSubscription.cancel();
     _inner = null;
-    for (var controller in _sinkControllers.values.toList()) {
-      controller.close();
+
+    // Convert this to a list because the close is dispatched synchronously, and
+    // that could conceivably remove a controller from [_controllers].
+    for (var controller in new List.from(_controllers.values)) {
+      controller.local.sink.close();
     }
+    _controllers.clear();
   }
 }
 
diff --git a/pubspec.yaml b/pubspec.yaml
index 930f744..c38c167 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: stream_channel
-version: 1.3.1-dev
+version: 1.3.1
 description: An abstraction for two-way communication channels.
 author: Dart Team <misc@dartlang.org>
 homepage: https://github.com/dart-lang/stream_channel
diff --git a/test/multi_channel_test.dart b/test/multi_channel_test.dart
index cc9ed1d..5be2505 100644
--- a/test/multi_channel_test.dart
+++ b/test/multi_channel_test.dart
@@ -10,17 +10,13 @@
 import 'utils.dart';
 
 void main() {
-  var oneToTwo;
-  var twoToOne;
+  var controller;
   var channel1;
   var channel2;
   setUp(() {
-    oneToTwo = new StreamController();
-    twoToOne = new StreamController();
-    channel1 = new MultiChannel(
-        new StreamChannel(twoToOne.stream, oneToTwo.sink));
-    channel2 = new MultiChannel(
-        new StreamChannel(oneToTwo.stream, twoToOne.sink));
+    controller = new StreamChannelController();
+    channel1 = new MultiChannel(controller.local);
+    channel2 = new MultiChannel(controller.foreign);
   });
 
   group("the default virtual channel", () {
@@ -66,16 +62,16 @@
 
     test("closes the underlying channel when it closes without any other "
         "virtual channels", () {
-      expect(oneToTwo.done, completes);
-      expect(twoToOne.done, completes);
+      expect(controller.local.sink.done, completes);
+      expect(controller.foreign.sink.done, completes);
 
       channel1.sink.close();
     });
 
     test("doesn't close the underlying channel when it closes with other "
         "virtual channels", () {
-      oneToTwo.done.then(expectAsync((_) {}, count: 0));
-      twoToOne.done.then(expectAsync((_) {}, count: 0));
+      controller.local.sink.done.then(expectAsync((_) {}, count: 0));
+      controller.foreign.sink.done.then(expectAsync((_) {}, count: 0));
 
       // Establish another virtual connection which should keep the underlying
       // connection open.
@@ -149,16 +145,16 @@
       channel1.sink.close();
 
       await channel2.stream.toList();
-      expect(oneToTwo.done, completes);
-      expect(twoToOne.done, completes);
+      expect(controller.local.sink.done, completes);
+      expect(controller.foreign.sink.done, completes);
 
       virtual1.sink.close();
     });
 
     test("doesn't close the underlying channel when it closes with other "
         "virtual channels", () {
-      oneToTwo.done.then(expectAsync((_) {}, count: 0));
-      twoToOne.done.then(expectAsync((_) {}, count: 0));
+      controller.local.sink.done.then(expectAsync((_) {}, count: 0));
+      controller.foreign.sink.done.then(expectAsync((_) {}, count: 0));
 
       virtual1.sink.close();
 
@@ -246,16 +242,16 @@
       channel2.sink.close();
 
       await channel1.stream.toList();
-      expect(oneToTwo.done, completes);
-      expect(twoToOne.done, completes);
+      expect(controller.local.sink.done, completes);
+      expect(controller.foreign.sink.done, completes);
 
       virtual2.sink.close();
     });
 
     test("doesn't close the underlying channel when it closes with other "
         "virtual channels", () {
-      oneToTwo.done.then(expectAsync((_) {}, count: 0));
-      twoToOne.done.then(expectAsync((_) {}, count: 0));
+      controller.local.sink.done.then(expectAsync((_) {}, count: 0));
+      controller.foreign.sink.done.then(expectAsync((_) {}, count: 0));
 
       virtual2.sink.close();
 
@@ -288,16 +284,24 @@
       expect(virtual2.stream.toList(), completion(isEmpty));
       expect(virtual2.sink.done, completes);
 
-      oneToTwo.close();
+      controller.local.sink.close();
     });
 
-    test("closes, no more virtual channels may be created", () {
-      expect(channel1.sink.done.then((_) => channel1.virtualChannel()),
-          throwsStateError);
-      expect(channel2.sink.done.then((_) => channel2.virtualChannel()),
-          throwsStateError);
+    test("closes, more virtual channels are created closed", () async {
+      channel2.sink.close();
+      virtual2.sink.close();
 
-      oneToTwo.close();
+      // Wait for the existing channels to emit done events.
+      await channel1.stream.toList();
+      await virtual1.stream.toList();
+
+      var virtual = channel1.virtualChannel();
+      expect(virtual.stream.toList(), completion(isEmpty));
+      expect(virtual.sink.done, completes);
+
+      virtual = channel1.virtualChannel();
+      expect(virtual.stream.toList(), completion(isEmpty));
+      expect(virtual.sink.done, completes);
     });
 
     test("emits an error, the error is sent only to the default channel", () {
@@ -306,7 +310,134 @@
       virtual1.stream.listen(expectAsync((_) {}, count: 0),
           onError: expectAsync((_) {}, count: 0));
 
-      twoToOne.addError("oh no");
+      controller.foreign.sink.addError("oh no");
+    });
+  });
+
+  group("stream channel rules", () {
+    group("for the main stream:", () {
+      test("closing the sink causes the stream to close before it emits any more "
+          "events", () {
+        channel1.sink.add(1);
+        channel1.sink.add(2);
+        channel1.sink.add(3);
+
+        channel2.stream.listen(expectAsync((message) {
+          expect(message, equals(1));
+          channel2.sink.close();
+        }, count: 1));
+      });
+
+      test("after the stream closes, the sink ignores events", () async {
+        channel1.sink.close();
+
+        // Wait for the done event to be delivered.
+        await channel2.stream.toList();
+        channel2.sink.add(1);
+        channel2.sink.add(2);
+        channel2.sink.add(3);
+        channel2.sink.close();
+
+        // None of our channel.sink additions should make it to the other endpoint.
+        channel1.stream.listen(expectAsync((_) {}, count: 0));
+        await pumpEventQueue();
+      });
+
+      test("canceling the stream's subscription has no effect on the sink",
+          () async {
+        channel1.stream.listen(null).cancel();
+        await pumpEventQueue();
+
+        channel1.sink.add(1);
+        channel1.sink.add(2);
+        channel1.sink.add(3);
+        channel1.sink.close();
+        expect(channel2.stream.toList(), completion(equals([1, 2, 3])));
+      });
+
+      test("canceling the stream's subscription doesn't stop a done event",
+          () async {
+        channel1.stream.listen(null).cancel();
+        await pumpEventQueue();
+
+        channel2.sink.close();
+        await pumpEventQueue();
+
+        channel1.sink.add(1);
+        channel1.sink.add(2);
+        channel1.sink.add(3);
+        channel1.sink.close();
+
+        // The sink should be ignoring events because the channel closed.
+        channel2.stream.listen(expectAsync((_) {}, count: 0));
+        await pumpEventQueue();
+      });
+    });
+
+    group("for a virtual channel:", () {
+      var virtual1;
+      var virtual2;
+      setUp(() {
+        virtual1 = channel1.virtualChannel();
+        virtual2 = channel2.virtualChannel(virtual1.id);
+      });
+
+      test("closing the sink causes the stream to close before it emits any more "
+          "events", () {
+        virtual1.sink.add(1);
+        virtual1.sink.add(2);
+        virtual1.sink.add(3);
+
+        virtual2.stream.listen(expectAsync((message) {
+          expect(message, equals(1));
+          virtual2.sink.close();
+        }, count: 1));
+      });
+
+      test("after the stream closes, the sink ignores events", () async {
+        virtual1.sink.close();
+
+        // Wait for the done event to be delivered.
+        await virtual2.stream.toList();
+        virtual2.sink.add(1);
+        virtual2.sink.add(2);
+        virtual2.sink.add(3);
+        virtual2.sink.close();
+
+        // None of our virtual.sink additions should make it to the other endpoint.
+        virtual1.stream.listen(expectAsync((_) {}, count: 0));
+        await pumpEventQueue();
+      });
+
+      test("canceling the stream's subscription has no effect on the sink",
+          () async {
+        virtual1.stream.listen(null).cancel();
+        await pumpEventQueue();
+
+        virtual1.sink.add(1);
+        virtual1.sink.add(2);
+        virtual1.sink.add(3);
+        virtual1.sink.close();
+        expect(virtual2.stream.toList(), completion(equals([1, 2, 3])));
+      });
+
+      test("canceling the stream's subscription doesn't stop a done event",
+          () async {
+        virtual1.stream.listen(null).cancel();
+        await pumpEventQueue();
+
+        virtual2.sink.close();
+        await pumpEventQueue();
+
+        virtual1.sink.add(1);
+        virtual1.sink.add(2);
+        virtual1.sink.add(3);
+        virtual1.sink.close();
+
+        // The sink should be ignoring events because the stream closed.
+        virtual2.stream.listen(expectAsync((_) {}, count: 0));
+        await pumpEventQueue();
+      });
     });
   });
 }