Use a named param for StreamQueue.cancelImmediately.

R=lrn@google.com

Review URL: https://codereview.chromium.org//1223423002 .
diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart
index 53ed142..7d78ac5 100644
--- a/lib/src/stream_queue.dart
+++ b/lib/src/stream_queue.dart
@@ -91,7 +91,7 @@
 
   /// Whether a closing operation has been performed on the stream queue.
   ///
-  /// Closing operations are [cancel], [cancelImmediately], and [rest].
+  /// Closing operations are [cancel] and [rest].
   bool _isClosed = false;
 
   /// Queue of events not used by a request yet.
@@ -218,9 +218,13 @@
 
   /// Cancels the underlying stream subscription.
   ///
-  /// The cancel operation waits until all previously requested
-  /// events have been processed, then it cancels the subscription
-  /// providing the events.
+  /// If [immediate] is `false` (the default), the cancel operation waits until
+  /// all previously requested events have been processed, then it cancels the
+  /// subscription providing the events.
+  ///
+  /// If [immediate] is `true`, the subscription is instead canceled
+  /// immediately. Any pending events are completed as though the underlying
+  /// stream had closed.
   ///
   /// The returned future completes with the result of calling
   /// `cancel`.
@@ -228,29 +232,15 @@
   /// After calling `cancel`, no further events can be requested.
   /// None of [next], [rest], [skip], [take] or [cancel] may be
   /// called again.
-  Future cancel() {
-    if (!_isClosed) {
-      _isClosed = true;
+  Future cancel({bool immediate: false}) {
+    if (_isClosed) throw _failClosed();
+    _isClosed = true;
+
+    if (!immediate) {
       var request = new _CancelRequest(this);
       _addRequest(request);
       return request.future;
     }
-    throw _failClosed();
-  }
-
-  /// Cancels the underlying stream subscription immediately.
-  ///
-  /// Any pending events will complete as though the stream had closed when
-  /// [cancel] was called.
-  ///
-  /// The returned future completes with the result of calling
-  /// `StreamSubscription.cancel`.
-  ///
-  /// After calling `cancelImmediately`, no further events can be requested.
-  /// None of [next], [rest], [skip], [take] or [cancel] may be called again.
-  Future cancelImmediately() {
-    if (_isClosed) throw _failClosed();
-    _isClosed = true;
 
     if (_isDone) return new Future.value();
     if (_subscription == null) _subscription = _sourceStream.listen(null);
@@ -262,7 +252,7 @@
   /// Returns an error for when a request is made after cancel.
   ///
   /// Returns a [StateError] with a message saying that either
-  /// [cancel], [cancelImmediately], or [rest] have already been called.
+  /// [cancel] or [rest] have already been called.
   Error _failClosed() {
     return new StateError("Already cancelled");
   }
diff --git a/test/stream_queue_test.dart b/test/stream_queue_test.dart
index dae8ed2..cad0ad5 100644
--- a/test/stream_queue_test.dart
+++ b/test/stream_queue_test.dart
@@ -352,59 +352,60 @@
       expect(await events.next, 1);
       expect(await events.cancel(), 42);
     });
-  });
 
-  group("cancelImmediately()", () async {
-    test("closes the events, prevents any other operation", () async {
-      var events = new StreamQueue<int>(createStream());
-      await events.cancelImmediately();
-      expect(() => events.next, throwsStateError);
-      expect(() => events.skip(1), throwsStateError);
-      expect(() => events.take(1), throwsStateError);
-      expect(() => events.rest, throwsStateError);
-      expect(() => events.cancel(), throwsStateError);
-    });
+    group("with immediate: true", () async {
+      test("closes the events, prevents any other operation", () async {
+        var events = new StreamQueue<int>(createStream());
+        await events.cancel(immediate: true);
+        expect(() => events.next, throwsStateError);
+        expect(() => events.skip(1), throwsStateError);
+        expect(() => events.take(1), throwsStateError);
+        expect(() => events.rest, throwsStateError);
+        expect(() => events.cancel(), throwsStateError);
+      });
 
-    test("cancels the underlying subscription immediately", () async {
-      var controller = new StreamController();
-      controller.add(1);
+      test("cancels the underlying subscription immediately", () async {
+        var controller = new StreamController();
+        controller.add(1);
 
-      var events = new StreamQueue<int>(controller.stream);
-      expect(await events.next, 1);
-      expect(controller.hasListener, isTrue);
+        var events = new StreamQueue<int>(controller.stream);
+        expect(await events.next, 1);
+        expect(controller.hasListener, isTrue);
 
-      events.cancelImmediately();
-      await expect(controller.hasListener, isFalse);
-    });
+        events.cancel(immediate: true);
+        await expect(controller.hasListener, isFalse);
+      });
 
-    test("closes pending requests", () async {
-      var events = new StreamQueue<int>(createStream());
-      expect(await events.next, 1);
-      expect(events.next, throwsStateError);
-      expect(events.hasNext, completion(isFalse));
+      test("closes pending requests", () async {
+        var events = new StreamQueue<int>(createStream());
+        expect(await events.next, 1);
+        expect(events.next, throwsStateError);
+        expect(events.hasNext, completion(isFalse));
 
-      await events.cancelImmediately();
-    });
+        await events.cancel(immediate: true);
+      });
 
-    test("returns the result of closing the underlying subscription", () async {
-      var controller = new StreamController(
-          onCancel: () => new Future.value(42));
-      var events = new StreamQueue<int>(controller.stream);
-      expect(await events.cancelImmediately(), 42);
-    });
+      test("returns the result of closing the underlying subscription",
+          () async {
+        var controller = new StreamController(
+            onCancel: () => new Future.value(42));
+        var events = new StreamQueue<int>(controller.stream);
+        expect(await events.cancel(immediate: true), 42);
+      });
 
-    test("listens and then cancels a stream that hasn't been listened to yet",
-        () async {
-      var wasListened = false;
-      var controller = new StreamController(
-          onListen: () => wasListened = true);
-      var events = new StreamQueue<int>(controller.stream);
-      expect(wasListened, isFalse);
-      expect(controller.hasListener, isFalse);
+      test("listens and then cancels a stream that hasn't been listened to yet",
+          () async {
+        var wasListened = false;
+        var controller = new StreamController(
+            onListen: () => wasListened = true);
+        var events = new StreamQueue<int>(controller.stream);
+        expect(wasListened, isFalse);
+        expect(controller.hasListener, isFalse);
 
-      await events.cancelImmediately();
-      expect(wasListened, isTrue);
-      expect(controller.hasListener, isFalse);
+        await events.cancel(immediate: true);
+        expect(wasListened, isTrue);
+        expect(controller.hasListener, isFalse);
+      });
     });
   });