Revert "Add StreamQueue.fork and ForkableStream."

This reverts commit 312d39641225b64b275e57d167b57a87b335654a.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d9d85d8..39d204b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,9 +10,6 @@
 - Added `SubscriptionStream` which creates a single-subscription stream
   from an existing stream subscription.
 
-- Added `ForkableStream` which wraps a stream and allows independent forks to be
-  created that emit the same events as the original.
-
 - Added a `ResultFuture` class for synchronously accessing the result of a
   wrapped future.
 
diff --git a/lib/async.dart b/lib/async.dart
index 6cfd6e1..71a1849 100644
--- a/lib/async.dart
+++ b/lib/async.dart
@@ -12,7 +12,6 @@
 export "src/delegate/stream_consumer.dart";
 export "src/delegate/stream_sink.dart";
 export "src/delegate/stream_subscription.dart";
-export "src/forkable_stream.dart";
 export "src/future_group.dart";
 export "src/result_future.dart";
 export "src/stream_completer.dart";
diff --git a/lib/src/forkable_stream.dart b/lib/src/forkable_stream.dart
deleted file mode 100644
index bb8f465..0000000
--- a/lib/src/forkable_stream.dart
+++ /dev/null
@@ -1,166 +0,0 @@
-// Copyright (c) 2015, the Dart project authors.  Please see the AUTHORS file
-// for details. All rights reserved. Use of this source code is governed by a
-// BSD-style license that can be found in the LICENSE file.
-
-library async.forkable_stream;
-
-import 'dart:async';
-
-import 'stream_completer.dart';
-
-/// A single-subscription stream from which other streams may be forked off at
-/// the current position.
-///
-/// This adds an operation, [fork], which produces a new stream that
-/// independently emits the same events as this stream. Unlike the branches
-/// produced by [StreamSplitter], a fork only emits events that arrive *after*
-/// the call to [fork].
-///
-/// Each fork can be paused or canceled independently of one another and of this
-/// stream. The underlying stream will be listened to once any branch is
-/// listened to. It will be paused when all branches are paused or not yet
-/// listened to. It will be canceled when all branches have been listened to and
-/// then canceled.
-class ForkableStream<T> extends StreamView<T> {
-  /// The underlying stream.
-  final Stream _sourceStream;
-
-  /// The subscription to [_sourceStream].
-  ///
-  /// This will be `null` until this stream or any of its forks are listened to.
-  StreamSubscription _subscription;
-
-  /// Whether this has been cancelled and no more forks may be created.
-  bool _isCanceled = false;
-
-  /// The controllers for any branches that have not yet been canceled.
-  ///
-  /// This includes a controller for this stream, until that has been cancelled.
-  final _controllers = new Set<StreamController<T>>();
-
-  /// Creates a new forkable stream wrapping [sourceStream].
-  ForkableStream(Stream sourceStream)
-      // Use a completer here so that we can provide its stream to the
-      // superclass constructor while also adding the stream controller to
-      // [_controllers].
-      : this._(sourceStream, new StreamCompleter());
-
-  ForkableStream._(this._sourceStream, StreamCompleter completer)
-      : super(completer.stream) {
-    completer.setSourceStream(_fork(primary: true));
-  }
-
-  /// Creates a new fork of this stream.
-  ///
-  /// From this point forward, the fork will emit the same events as this
-  /// stream. It will *not* emit any events that have already been emitted by
-  /// this stream. The fork is independent of this stream, which means each one
-  /// may be paused or canceled without affecting the other.
-  ///
-  /// If this stream is done or its subscription has been canceled, this returns
-  /// an empty stream.
-  Stream<T> fork() => _fork(primary: false);
-
-  /// Creates a stream forwarding [_sourceStream].
-  ///
-  /// If [primary] is true, this is the stream underlying this object;
-  /// otherwise, it's a fork. The only difference is that when the primary
-  /// stream is canceled, [fork] starts throwing [StateError]s.
-  Stream<T> _fork({bool primary: false}) {
-    if (_isCanceled) {
-      var controller = new StreamController<T>()..close();
-      return controller.stream;
-    }
-
-    var controller;
-    controller = new StreamController<T>(
-        onListen: () => _onListenOrResume(controller),
-        onCancel: () => _onCancel(controller, primary: primary),
-        onPause: () => _onPause(controller),
-        onResume: () => _onListenOrResume(controller),
-        sync: true);
-
-    _controllers.add(controller);
-
-    return controller.stream;
-  }
-
-  /// The callback called when `onListen` or `onResume` is called for the branch
-  /// managed by [controller].
-  ///
-  /// This ensures that we're subscribed to [_sourceStream] and that the
-  /// subscription isn't paused.
-  void _onListenOrResume(StreamController<T> controller) {
-    if (controller.isClosed) return;
-    if (_subscription == null) {
-      _subscription =
-          _sourceStream.listen(_onData, onError: _onError, onDone: _onDone);
-    } else {
-      _subscription.resume();
-    }
-  }
-
-  /// The callback called when `onCancel` is called for the branch managed by
-  /// [controller].
-  ///
-  /// This cancels or pauses the underlying subscription as necessary. If
-  /// [primary] is true, it also ensures that future calls to [fork] throw
-  /// [StateError]s.
-  Future _onCancel(StreamController<T> controller, {bool primary: false}) {
-    if (primary) _isCanceled = true;
-
-    if (controller.isClosed) return null;
-    _controllers.remove(controller);
-
-    if (_controllers.isEmpty) return _subscription.cancel();
-
-    _onPause(controller);
-    return null;
-  }
-
-  /// The callback called when `onPause` is called for the branch managed by
-  /// [controller].
-  ///
-  /// This pauses the underlying subscription if necessary.
-  void _onPause(StreamController<T> controller) {
-    if (controller.isClosed) return;
-    if (_subscription.isPaused) return;
-    if (_controllers.any((controller) =>
-        controller.hasListener && !controller.isPaused)) {
-      return;
-    }
-
-    _subscription.pause();
-  }
-
-  /// Forwards data events to all branches.
-  void _onData(value) {
-    // Don't iterate directly over the set because [controller.add] might cause
-    // it to be modified synchronously.
-    for (var controller in _controllers.toList()) {
-      controller.add(value);
-    }
-  }
-
-  /// Forwards error events to all branches.
-  void _onError(error, StackTrace stackTrace) {
-    // Don't iterate directly over the set because [controller.addError] might
-    // cause it to be modified synchronously.
-    for (var controller in _controllers.toList()) {
-      controller.addError(error, stackTrace);
-    }
-  }
-
-  /// Forwards close events to all branches.
-  void _onDone() {
-    _isCanceled = true;
-
-    // Don't iterate directly over the set because [controller.close] might
-    // cause it to be modified synchronously.
-    for (var controller in _controllers.toList()) {
-      controller.close();
-    }
-    _controllers.clear();
-  }
-}
-
diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart
index b8941e1..e4b3b64 100644
--- a/lib/src/stream_queue.dart
+++ b/lib/src/stream_queue.dart
@@ -7,7 +7,6 @@
 import 'dart:async';
 import 'dart:collection';
 
-import "forkable_stream.dart";
 import "subscription_stream.dart";
 import "stream_completer.dart";
 import "../result.dart";
@@ -79,7 +78,7 @@
   // by the content of the fifth event.
 
   /// Source of events.
-  final ForkableStream _sourceStream;
+  final Stream _sourceStream;
 
   /// Subscription on [_sourceStream] while listening for events.
   ///
@@ -105,9 +104,7 @@
 
   /// Create a `StreamQueue` of the events of [source].
   StreamQueue(Stream source)
-      : _sourceStream = source is ForkableStream
-          ? source
-          : new ForkableStream(source);
+      : _sourceStream = source;
 
   /// Asks if the stream has any more events.
   ///
@@ -219,22 +216,6 @@
     throw _failClosed();
   }
 
-  /// Creates a new stream queue in the same position as this one.
-  ///
-  /// The fork is subscribed to the same underlying stream as this queue, but
-  /// it's otherwise wholly independent. If requests are made on one, they don't
-  /// move the other forward; if one is closed, the other is still open.
-  ///
-  /// The underlying stream will only be paused when all forks have no
-  /// outstanding requests, and only canceled when all forks are canceled.
-  StreamQueue<T> fork() {
-    if (_isClosed) throw _failClosed();
-
-    var request = new _ForkRequest<T>(this);
-    _addRequest(request);
-    return request.queue;
-  }
-
   /// Cancels the underlying stream subscription.
   ///
   /// If [immediate] is `false` (the default), the cancel operation waits until
@@ -255,15 +236,14 @@
     if (_isClosed) throw _failClosed();
     _isClosed = true;
 
-    if (_isDone) return new Future.value();
-    if (_subscription == null) _subscription = _sourceStream.listen(null);
-
     if (!immediate) {
       var request = new _CancelRequest(this);
       _addRequest(request);
       return request.future;
     }
 
+    if (_isDone) return new Future.value();
+    if (_subscription == null) _subscription = _sourceStream.listen(null);
     var future = _subscription.cancel();
     _onDone();
     return future;
@@ -353,7 +333,6 @@
         return;
       }
     }
-
     if (!_isDone) {
       _subscription.pause();
     }
@@ -649,50 +628,3 @@
     _completer.complete(false);
   }
 }
-
-/// Request for a [StreamQueue.fork] call.
-class _ForkRequest<T> implements _EventRequest {
-  /// Completer for the stream used by the queue by the `fork` call.
-  StreamCompleter _completer;
-
-  StreamQueue<T> queue;
-
-  /// The [StreamQueue] object that has this request queued.
-  final StreamQueue _streamQueue;
-
-  _ForkRequest(this._streamQueue) {
-    _completer = new StreamCompleter<T>();
-    queue = new StreamQueue<T>(_completer.stream);
-  }
-
-  bool addEvents(Queue<Result> events) {
-    _completeStream(events);
-    return true;
-  }
-
-  void close(Queue<Result> events) {
-    _completeStream(events);
-  }
-
-  void _completeStream(Queue<Result> events) {
-    if (events.isEmpty) {
-      if (_streamQueue._isDone) {
-        _completer.setEmpty();
-      } else {
-        _completer.setSourceStream(_streamQueue._sourceStream.fork());
-      }
-    } else {
-      // There are prefetched events which need to be added before the
-      // remaining stream.
-      var controller = new StreamController<T>();
-      for (var event in events) {
-        event.addTo(controller);
-      }
-
-      var fork = _streamQueue._sourceStream.fork();
-      controller.addStream(fork, cancelOnError: false)
-          .whenComplete(controller.close);
-      _completer.setSourceStream(controller.stream);
-    }
-  }
-}
diff --git a/test/forkable_stream_test.dart b/test/forkable_stream_test.dart
deleted file mode 100644
index 80242a3..0000000
--- a/test/forkable_stream_test.dart
+++ /dev/null
@@ -1,413 +0,0 @@
-// Copyright (c) 2015, the Dart project authors.  Please see the AUTHORS file
-// for details. All rights reserved. Use of this source code is governed by a
-// BSD-style license that can be found in the LICENSE file.
-
-import 'dart:async';
-
-import 'package:async/async.dart';
-import 'package:test/test.dart';
-
-import 'utils.dart';
-
-void main() {
-  var controller;
-  var stream;
-  setUp(() {
-    var cancelFuture = new Future.value(42);
-    controller = new StreamController<int>(onCancel: () => cancelFuture);
-    stream = new ForkableStream<int>(controller.stream);
-  });
-
-  group("with no forks", () {
-    test("forwards events, errors, and close", () async {
-      var queue = new StreamQueue(stream);
-
-      controller.add(1);
-      expect(await queue.next, equals(1));
-
-      controller.add(2);
-      expect(await queue.next, equals(2));
-
-      controller.addError("error");
-      expect(queue.next, throwsA("error"));
-      await flushMicrotasks();
-
-      controller.add(3);
-      expect(await queue.next, equals(3));
-
-      controller.close();
-      expect(await queue.hasNext, isFalse);
-    });
-
-    test("listens to, pauses, and cancels the controller", () {
-      expect(controller.hasListener, isFalse);
-
-      var sub = stream.listen(null);
-      expect(controller.hasListener, isTrue);
-
-      sub.pause();
-      expect(controller.isPaused, isTrue);
-
-      sub.resume();
-      expect(controller.isPaused, isFalse);
-
-      sub.cancel();
-      expect(controller.hasListener, isFalse);
-    });
-
-    test("unpauses the controller when a fork is listened", () {
-      stream.listen(null).pause();
-      expect(controller.isPaused, isTrue);
-
-      var fork = stream.fork();
-      expect(controller.isPaused, isTrue);
-
-      fork.listen(null);
-      expect(controller.isPaused, isFalse);
-    });
-  });
-
-  group("with a fork created before the stream was listened", () {
-    var fork;
-    setUp(() {
-      fork = stream.fork();
-    });
-
-    test("forwards events, errors, and close to both branches", () async {
-      var queue = new StreamQueue(stream);
-      var forkQueue = new StreamQueue(fork);
-
-      controller.add(1);
-      expect(await queue.next, equals(1));
-      expect(await forkQueue.next, equals(1));
-
-      controller.add(2);
-      expect(await queue.next, equals(2));
-      expect(await forkQueue.next, equals(2));
-
-      controller.addError("error");
-      expect(queue.next, throwsA("error"));
-      expect(forkQueue.next, throwsA("error"));
-      await flushMicrotasks();
-
-      controller.add(3);
-      expect(await queue.next, equals(3));
-      expect(await forkQueue.next, equals(3));
-
-      controller.close();
-      expect(await queue.hasNext, isFalse);
-      expect(await forkQueue.hasNext, isFalse);
-    });
-
-    test('listens to the source when the original is listened', () {
-      expect(controller.hasListener, isFalse);
-      stream.listen(null);
-      expect(controller.hasListener, isTrue);
-    });
-
-    test('listens to the source when the fork is listened', () {
-      expect(controller.hasListener, isFalse);
-      fork.listen(null);
-      expect(controller.hasListener, isTrue);
-    });
-  });
-
-  test("with a fork created after the stream emitted a few events, forwards "
-      "future events, errors, and close to both branches", () async {
-    var queue = new StreamQueue(stream);
-
-    controller.add(1);
-    expect(await queue.next, equals(1));
-
-    controller.add(2);
-    expect(await queue.next, equals(2));
-
-    var fork = stream.fork();
-    var forkQueue = new StreamQueue(fork);
-
-    controller.add(3);
-    expect(await queue.next, equals(3));
-    expect(await forkQueue.next, equals(3));
-
-    controller.addError("error");
-    expect(queue.next, throwsA("error"));
-    expect(forkQueue.next, throwsA("error"));
-    await flushMicrotasks();
-
-    controller.close();
-    expect(await queue.hasNext, isFalse);
-    expect(await forkQueue.hasNext, isFalse);
-  });
-
-  group("with multiple forks", () {
-    var fork1;
-    var fork2;
-    var fork3;
-    var fork4;
-    setUp(() {
-      fork1 = stream.fork();
-      fork2 = stream.fork();
-      fork3 = stream.fork();
-      fork4 = stream.fork();
-    });
-
-    test("forwards events, errors, and close to all branches", () async {
-      var queue1 = new StreamQueue(stream);
-      var queue2 = new StreamQueue(fork1);
-      var queue3 = new StreamQueue(fork2);
-      var queue4 = new StreamQueue(fork3);
-      var queue5 = new StreamQueue(fork4);
-
-      controller.add(1);
-      expect(await queue1.next, equals(1));
-      expect(await queue2.next, equals(1));
-      expect(await queue3.next, equals(1));
-      expect(await queue4.next, equals(1));
-      expect(await queue5.next, equals(1));
-
-      controller.add(2);
-      expect(await queue1.next, equals(2));
-      expect(await queue2.next, equals(2));
-      expect(await queue3.next, equals(2));
-      expect(await queue4.next, equals(2));
-      expect(await queue5.next, equals(2));
-
-      controller.addError("error");
-      expect(queue1.next, throwsA("error"));
-      expect(queue2.next, throwsA("error"));
-      expect(queue3.next, throwsA("error"));
-      expect(queue4.next, throwsA("error"));
-      expect(queue5.next, throwsA("error"));
-      await flushMicrotasks();
-
-      controller.add(3);
-      expect(await queue1.next, equals(3));
-      expect(await queue2.next, equals(3));
-      expect(await queue3.next, equals(3));
-      expect(await queue4.next, equals(3));
-      expect(await queue5.next, equals(3));
-
-      controller.close();
-      expect(await queue1.hasNext, isFalse);
-      expect(await queue2.hasNext, isFalse);
-      expect(await queue3.hasNext, isFalse);
-      expect(await queue4.hasNext, isFalse);
-      expect(await queue5.hasNext, isFalse);
-    });
-
-    test("forwards events in order of forking", () async {
-      var queue1 = new StreamQueue(stream);
-      var queue2 = new StreamQueue(fork1);
-      var queue3 = new StreamQueue(fork2);
-      var queue4 = new StreamQueue(fork3);
-      var queue5 = new StreamQueue(fork4);
-
-      for (var i = 0; i < 4; i++) {
-        controller.add(i);
-
-        var queue1Fired = false;
-        var queue2Fired = false;
-        var queue3Fired = false;
-        var queue4Fired = false;
-        var queue5Fired = false;
-
-        queue5.next.then(expectAsync((_) {
-          queue5Fired = true;
-          expect(queue1Fired, isTrue);
-          expect(queue2Fired, isTrue);
-          expect(queue3Fired, isTrue);
-          expect(queue4Fired, isTrue);
-        }));
-
-        queue1.next.then(expectAsync((_) {
-          queue1Fired = true;
-          expect(queue2Fired, isFalse);
-          expect(queue3Fired, isFalse);
-          expect(queue4Fired, isFalse);
-          expect(queue5Fired, isFalse);
-        }));
-
-        queue4.next.then(expectAsync((_) {
-          queue4Fired = true;
-          expect(queue1Fired, isTrue);
-          expect(queue2Fired, isTrue);
-          expect(queue3Fired, isTrue);
-          expect(queue5Fired, isFalse);
-        }));
-
-        queue2.next.then(expectAsync((_) {
-          queue2Fired = true;
-          expect(queue1Fired, isTrue);
-          expect(queue3Fired, isFalse);
-          expect(queue4Fired, isFalse);
-          expect(queue5Fired, isFalse);
-        }));
-
-        queue3.next.then(expectAsync((_) {
-          queue3Fired = true;
-          expect(queue1Fired, isTrue);
-          expect(queue2Fired, isTrue);
-          expect(queue4Fired, isFalse);
-          expect(queue5Fired, isFalse);
-        }));
-      }
-    });
-
-    test("pauses the source when all forks are paused and/or not listening",
-        () {
-      var sub1 = stream.listen(null);
-      var sub2 = fork1.listen(null);
-      expect(controller.isPaused, isFalse);
-
-      sub1.pause();
-      expect(controller.isPaused, isFalse);
-
-      sub2.pause();
-      expect(controller.isPaused, isTrue);
-
-      var sub3 = fork2.listen(null);
-      expect(controller.isPaused, isFalse);
-
-      sub3.pause();
-      expect(controller.isPaused, isTrue);
-
-      sub2.resume();
-      expect(controller.isPaused, isFalse);
-
-      sub2.cancel();
-      expect(controller.isPaused, isTrue);
-    });
-
-    test("cancels the source when all forks are canceled", () async {
-      var sub1 = stream.listen(null);
-      expect(controller.hasListener, isTrue);
-
-      var sub2 = fork1.listen(null);
-      expect(controller.hasListener, isTrue);
-
-      expect(sub1.cancel(), isNull);
-      await flushMicrotasks();
-      expect(controller.hasListener, isTrue);
-
-      expect(sub2.cancel(), isNull);
-      await flushMicrotasks();
-      expect(controller.hasListener, isTrue);
-
-      expect(fork2.listen(null).cancel(), isNull);
-      await flushMicrotasks();
-      expect(controller.hasListener, isTrue);
-
-      expect(fork3.listen(null).cancel(), isNull);
-      await flushMicrotasks();
-      expect(controller.hasListener, isTrue);
-
-      expect(fork4.listen(null).cancel(), completion(equals(42)));
-      await flushMicrotasks();
-      expect(controller.hasListener, isFalse);
-    });
-  });
-
-  group("modification during dispatch:", () {
-    test("forking during onCancel", () {
-      controller = new StreamController<int>(onCancel: expectAsync(() {
-        expect(stream.fork().toList(), completion(isEmpty));
-      }));
-      stream = new ForkableStream<int>(controller.stream);
-
-      stream.listen(null).cancel();
-    });
-
-    test("forking during onPause", () {
-      controller = new StreamController<int>(onPause: expectAsync(() {
-        stream.fork().listen(null);
-      }));
-      stream = new ForkableStream<int>(controller.stream);
-
-      stream.listen(null).pause();
-
-      // The fork created in onPause should have resumed the stream.
-      expect(controller.isPaused, isFalse);
-    });
-
-    test("forking during onData", () {
-      var sub;
-      sub = stream.listen(expectAsync((value1) {
-        expect(value1, equals(1));
-        stream.fork().listen(expectAsync((value2) {
-          expect(value2, equals(2));
-        }));
-        sub.cancel();
-      }));
-
-      controller.add(1);
-      controller.add(2);
-    });
-
-    test("canceling a fork during onData", () {
-      var fork = stream.fork();
-      var forkSub = fork.listen(expectAsync((_) {}, count: 0));
-
-      stream.listen(expectAsync((_) => forkSub.cancel()));
-      controller.add(null);
-    });
-
-    test("forking during onError", () {
-      var sub;
-      sub = stream.listen(null, onError: expectAsync((error1) {
-        expect(error1, equals("error 1"));
-        stream.fork().listen(null, onError: expectAsync((error2) {
-          expect(error2, equals("error 2"));
-        }));
-        sub.cancel();
-      }));
-
-      controller.addError("error 1");
-      controller.addError("error 2");
-    });
-
-    test("canceling a fork during onError", () {
-      var fork = stream.fork();
-      var forkSub = fork.listen(expectAsync((_) {}, count: 0));
-
-      stream.listen(null, onError: expectAsync((_) => forkSub.cancel()));
-      controller.addError("error");
-    });
-
-    test("forking during onDone", () {
-      stream.listen(null, onDone: expectAsync(() {
-        expect(stream.fork().toList(), completion(isEmpty));
-      }));
-
-      controller.close();
-    });
-
-    test("canceling a fork during onDone", () {
-      var fork = stream.fork();
-      var forkSub = fork.listen(null, onDone: expectAsync(() {}, count: 0));
-
-      stream.listen(null, onDone: expectAsync(() => forkSub.cancel()));
-      controller.close();
-    });
-  });
-
-  group("throws an error when", () {
-    test("a cancelled stream is forked", () {
-      stream.listen(null).cancel();
-      expect(stream.fork().toList(), completion(isEmpty));
-    });
-
-    test("a cancelled stream is forked even when other forks are alive", () {
-      stream.fork().listen(null);
-      stream.listen(null).cancel();
-
-      expect(controller.hasListener, isTrue);
-      expect(stream.fork().toList(), completion(isEmpty));
-    });
-
-    test("a closed stream is forked", () async {
-      controller.close();
-      await stream.listen(null).asFuture();
-      expect(stream.fork().toList(), completion(isEmpty));
-    });
-  });
-}
diff --git a/test/stream_queue_test.dart b/test/stream_queue_test.dart
index 3768bff..228ba8a 100644
--- a/test/stream_queue_test.dart
+++ b/test/stream_queue_test.dart
@@ -629,434 +629,6 @@
     });
   });
 
-  group("fork operation", () {
-    test("produces a stream queue with the same events", () async {
-      var queue1 = new StreamQueue<int>(createStream());
-      var queue2 = queue1.fork();
-
-      expect(await queue1.next, 1);
-      expect(await queue1.next, 2);
-      expect(await queue1.next, 3);
-      expect(await queue1.next, 4);
-      expect(await queue1.hasNext, isFalse);
-
-      expect(await queue2.next, 1);
-      expect(await queue2.next, 2);
-      expect(await queue2.next, 3);
-      expect(await queue2.next, 4);
-      expect(await queue2.hasNext, isFalse);
-    });
-
-    test("produces a stream queue with the same errors", () async {
-      var queue1 = new StreamQueue<int>(createErrorStream());
-      var queue2 = queue1.fork();
-
-      expect(await queue1.next, 1);
-      expect(await queue1.next, 2);
-      expect(queue1.next, throwsA("To err is divine!"));
-      expect(await queue1.next, 4);
-      expect(await queue1.hasNext, isFalse);
-
-      expect(await queue2.next, 1);
-      expect(await queue2.next, 2);
-      expect(queue2.next, throwsA("To err is divine!"));
-      expect(await queue2.next, 4);
-      expect(await queue2.hasNext, isFalse);
-    });
-
-    test("forks at the current point in the source queue", () {
-      var queue1 = new StreamQueue<int>(createStream());
-
-      expect(queue1.next, completion(1));
-      expect(queue1.next, completion(2));
-
-      var queue2 = queue1.fork();
-
-      expect(queue1.next, completion(3));
-      expect(queue1.next, completion(4));
-      expect(queue1.hasNext, completion(isFalse));
-
-      expect(queue2.next, completion(3));
-      expect(queue2.next, completion(4));
-      expect(queue2.hasNext, completion(isFalse));
-    });
-
-    test("can be created after there are pending values", () async {
-      var queue1 = new StreamQueue<int>(createStream());
-      await flushMicrotasks();
-
-      var queue2 = queue1.fork();
-      expect(await queue2.next, 1);
-      expect(await queue2.next, 2);
-      expect(await queue2.next, 3);
-      expect(await queue2.next, 4);
-      expect(await queue2.hasNext, isFalse);
-    });
-
-    test("multiple forks can be created at different points", () async {
-      var queue1 = new StreamQueue<int>(createStream());
-
-      var queue2 = queue1.fork();
-      expect(await queue1.next, 1);
-      expect(await queue2.next, 1);
-
-      var queue3 = queue1.fork();
-      expect(await queue1.next, 2);
-      expect(await queue2.next, 2);
-      expect(await queue3.next, 2);
-
-      var queue4 = queue1.fork();
-      expect(await queue1.next, 3);
-      expect(await queue2.next, 3);
-      expect(await queue3.next, 3);
-      expect(await queue4.next, 3);
-
-      var queue5 = queue1.fork();
-      expect(await queue1.next, 4);
-      expect(await queue2.next, 4);
-      expect(await queue3.next, 4);
-      expect(await queue4.next, 4);
-      expect(await queue5.next, 4);
-
-      var queue6 = queue1.fork();
-      expect(await queue1.hasNext, isFalse);
-      expect(await queue2.hasNext, isFalse);
-      expect(await queue3.hasNext, isFalse);
-      expect(await queue4.hasNext, isFalse);
-      expect(await queue5.hasNext, isFalse);
-      expect(await queue6.hasNext, isFalse);
-    });
-
-    test("same-level forks receive data in the order they were created",
-        () async {
-      var queue1 = new StreamQueue<int>(createStream());
-      var queue2 = queue1.fork();
-      var queue3 = queue1.fork();
-      var queue4 = queue1.fork();
-      var queue5 = queue1.fork();
-
-      for (var i = 0; i < 4; i++) {
-        var queue1Fired = false;
-        var queue2Fired = false;
-        var queue3Fired = false;
-        var queue4Fired = false;
-        var queue5Fired = false;
-
-        queue5.next.then(expectAsync((_) {
-          queue5Fired = true;
-          expect(queue1Fired, isTrue);
-          expect(queue2Fired, isTrue);
-          expect(queue3Fired, isTrue);
-          expect(queue4Fired, isTrue);
-        }));
-
-        queue1.next.then(expectAsync((_) {
-          queue1Fired = true;
-          expect(queue2Fired, isFalse);
-          expect(queue3Fired, isFalse);
-          expect(queue4Fired, isFalse);
-          expect(queue5Fired, isFalse);
-        }));
-
-        queue4.next.then(expectAsync((_) {
-          queue4Fired = true;
-          expect(queue1Fired, isTrue);
-          expect(queue2Fired, isTrue);
-          expect(queue3Fired, isTrue);
-          expect(queue5Fired, isFalse);
-        }));
-
-        queue2.next.then(expectAsync((_) {
-          queue2Fired = true;
-          expect(queue1Fired, isTrue);
-          expect(queue3Fired, isFalse);
-          expect(queue4Fired, isFalse);
-          expect(queue5Fired, isFalse);
-        }));
-
-        queue3.next.then(expectAsync((_) {
-          queue3Fired = true;
-          expect(queue1Fired, isTrue);
-          expect(queue2Fired, isTrue);
-          expect(queue4Fired, isFalse);
-          expect(queue5Fired, isFalse);
-        }));
-      }
-    });
-
-    test("forks can be created from forks", () async {
-      var queue1 = new StreamQueue<int>(createStream());
-
-      var queue2 = queue1.fork();
-      expect(await queue1.next, 1);
-      expect(await queue2.next, 1);
-
-      var queue3 = queue2.fork();
-      expect(await queue1.next, 2);
-      expect(await queue2.next, 2);
-      expect(await queue3.next, 2);
-
-      var queue4 = queue3.fork();
-      expect(await queue1.next, 3);
-      expect(await queue2.next, 3);
-      expect(await queue3.next, 3);
-      expect(await queue4.next, 3);
-
-      var queue5 = queue4.fork();
-      expect(await queue1.next, 4);
-      expect(await queue2.next, 4);
-      expect(await queue3.next, 4);
-      expect(await queue4.next, 4);
-      expect(await queue5.next, 4);
-
-      var queue6 = queue5.fork();
-      expect(await queue1.hasNext, isFalse);
-      expect(await queue2.hasNext, isFalse);
-      expect(await queue3.hasNext, isFalse);
-      expect(await queue4.hasNext, isFalse);
-      expect(await queue5.hasNext, isFalse);
-      expect(await queue6.hasNext, isFalse);
-    });
-
-    group("canceling:", () {
-      test("cancelling a fork doesn't cancel its source", () async {
-        var queue1 = new StreamQueue<int>(createStream());
-        var queue2 = queue1.fork();
-
-        queue2.cancel();
-        expect(() => queue2.next, throwsStateError);
-
-        expect(await queue1.next, 1);
-        expect(await queue1.next, 2);
-        expect(await queue1.next, 3);
-        expect(await queue1.next, 4);
-        expect(await queue1.hasNext, isFalse);
-      });
-
-      test("cancelling a source doesn't cancel its unmaterialized fork",
-          () async {
-        var queue1 = new StreamQueue<int>(createStream());
-        var queue2 = queue1.fork();
-
-        queue1.cancel();
-        expect(() => queue1.next, throwsStateError);
-
-        expect(await queue2.next, 1);
-        expect(await queue2.next, 2);
-        expect(await queue2.next, 3);
-        expect(await queue2.next, 4);
-        expect(await queue2.hasNext, isFalse);
-      });
-
-      test("cancelling a source doesn't cancel its materialized fork",
-          () async {
-        var queue1 = new StreamQueue<int>(createStream());
-        var queue2 = queue1.fork();
-
-        expect(await queue1.next, 1);
-
-        queue1.cancel();
-        expect(() => queue1.next, throwsStateError);
-
-        expect(await queue2.next, 1);
-        expect(await queue2.next, 2);
-        expect(await queue2.next, 3);
-        expect(await queue2.next, 4);
-        expect(await queue2.hasNext, isFalse);
-      });
-
-      test("the underlying stream is only canceled once all forks are canceled",
-          () async {
-        var controller = new StreamController();
-        var queue1 = new StreamQueue<int>(controller.stream);
-        var queue2 = queue1.fork();
-
-        await flushMicrotasks();
-        expect(controller.hasListener, isFalse);
-
-        expect(queue1.next, completion(1));
-        await flushMicrotasks();
-        expect(controller.hasListener, isTrue);
-
-        queue2.cancel();
-        await flushMicrotasks();
-        expect(controller.hasListener, isTrue);
-
-        controller.add(1);
-        queue1.cancel();
-        await flushMicrotasks();
-        expect(controller.hasListener, isFalse);
-      });
-
-      group("with immediate,", () {
-        test("cancelling a fork doesn't cancel its source", () async {
-          var queue1 = new StreamQueue<int>(createStream());
-          var queue2 = queue1.fork();
-
-          queue2.cancel(immediate: true);
-          expect(() => queue2.next, throwsStateError);
-
-          expect(await queue1.next, 1);
-          expect(await queue1.next, 2);
-          expect(await queue1.next, 3);
-          expect(await queue1.next, 4);
-          expect(await queue1.hasNext, isFalse);
-        });
-
-        test("cancelling a source doesn't cancel its unmaterialized fork",
-            () async {
-          var queue1 = new StreamQueue<int>(createStream());
-          var queue2 = queue1.fork();
-
-          queue1.cancel(immediate: true);
-          expect(() => queue1.next, throwsStateError);
-
-          expect(await queue2.next, 1);
-          expect(await queue2.next, 2);
-          expect(await queue2.next, 3);
-          expect(await queue2.next, 4);
-          expect(await queue2.hasNext, isFalse);
-        });
-
-        test("cancelling a source doesn't cancel its materialized fork",
-            () async {
-          var queue1 = new StreamQueue<int>(createStream());
-          var queue2 = queue1.fork();
-
-          expect(await queue1.next, 1);
-
-          queue1.cancel(immediate: true);
-          expect(() => queue1.next, throwsStateError);
-
-          expect(await queue2.next, 1);
-          expect(await queue2.next, 2);
-          expect(await queue2.next, 3);
-          expect(await queue2.next, 4);
-          expect(await queue2.hasNext, isFalse);
-        });
-
-        test("the underlying stream is only canceled once all forks are "
-            "canceled", () async {
-          var controller = new StreamController();
-          var queue1 = new StreamQueue<int>(controller.stream);
-          var queue2 = queue1.fork();
-
-          await flushMicrotasks();
-          expect(controller.hasListener, isFalse);
-
-          expect(queue1.next, throwsStateError);
-          await flushMicrotasks();
-          expect(controller.hasListener, isTrue);
-
-          queue2.cancel(immediate: true);
-          await flushMicrotasks();
-          expect(controller.hasListener, isTrue);
-
-          queue1.cancel(immediate: true);
-          await flushMicrotasks();
-          expect(controller.hasListener, isFalse);
-        });
-      });
-    });
-
-    group("pausing:", () {
-      test("the underlying stream is only implicitly paused when no forks are "
-          "awaiting input", () async {
-        var controller = new StreamController();
-        var queue1 = new StreamQueue<int>(controller.stream);
-        var queue2 = queue1.fork();
-
-        controller.add(1);
-        expect(await queue1.next, 1);
-        expect(controller.hasListener, isTrue);
-        expect(controller.isPaused, isTrue);
-
-        expect(queue1.next, completion(2));
-        await flushMicrotasks();
-        expect(controller.isPaused, isFalse);
-
-        controller.add(2);
-        await flushMicrotasks();
-        expect(controller.isPaused, isTrue);
-
-        expect(queue2.next, completion(1));
-        expect(queue2.next, completion(2));
-        expect(queue2.next, completion(3));
-        await flushMicrotasks();
-        expect(controller.isPaused, isFalse);
-
-        controller.add(3);
-        await flushMicrotasks();
-        expect(controller.isPaused, isTrue);
-      });
-
-      test("pausing a fork doesn't pause its source", () async {
-        var queue1 = new StreamQueue<int>(createStream());
-        var queue2 = queue1.fork();
-
-        queue2.rest.listen(expectAsync((_) {}, count: 0)).pause();
-
-        expect(await queue1.next, 1);
-        expect(await queue1.next, 2);
-        expect(await queue1.next, 3);
-        expect(await queue1.next, 4);
-        expect(await queue1.hasNext, isFalse);
-      });
-
-      test("pausing a source doesn't pause its fork", () async {
-        var queue1 = new StreamQueue<int>(createStream());
-        var queue2 = queue1.fork();
-
-        queue1.rest.listen(expectAsync((_) {}, count: 0)).pause();
-
-        expect(await queue2.next, 1);
-        expect(await queue2.next, 2);
-        expect(await queue2.next, 3);
-        expect(await queue2.next, 4);
-        expect(await queue2.hasNext, isFalse);
-      });
-
-      test("the underlying stream is only paused when all forks are paused",
-          () async {
-        var controller = new StreamController();
-        var queue1 = new StreamQueue<int>(controller.stream);
-        var queue2 = queue1.fork();
-
-        await flushMicrotasks();
-        expect(controller.hasListener, isFalse);
-
-        var sub1 = queue1.rest.listen(null);
-        await flushMicrotasks();
-        expect(controller.hasListener, isTrue);
-        expect(controller.isPaused, isFalse);
-
-        sub1.pause();
-        await flushMicrotasks();
-        expect(controller.isPaused, isTrue);
-
-        expect(queue2.next, completion(1));
-        await flushMicrotasks();
-        expect(controller.isPaused, isFalse);
-
-        controller.add(1);
-        await flushMicrotasks();
-        expect(controller.isPaused, isTrue);
-
-        var sub2 = queue2.rest.listen(null);
-        await flushMicrotasks();
-        expect(controller.isPaused, isFalse);
-
-        sub2.pause();
-        await flushMicrotasks();
-        expect(controller.isPaused, isTrue);
-
-        sub1.resume();
-        await flushMicrotasks();
-        expect(controller.isPaused, isFalse);
-      });
-    });
-  });
-
   test("all combinations sequential skip/next/take operations", () async {
     // Takes all combinations of two of next, skip and take, then ends with
     // doing rest. Each of the first rounds do 10 events of each type,