Fix a bug in StreamQueue.cancel().

If cancel was called on the queue before anything else, it wouldn't call
StreamSubscription.cancel.

R=lrn@google.com

Review URL: https://codereview.chromium.org//1239543004 .
diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart
index 7d78ac5..e4b3b64 100644
--- a/lib/src/stream_queue.dart
+++ b/lib/src/stream_queue.dart
@@ -530,9 +530,10 @@
   }
 
   void _shutdown() {
-    if (_streamQueue._subscription == null) {
+    if (_streamQueue._isDone) {
       _completer.complete();
     } else {
+      _streamQueue._ensureListening();
       _completer.complete(_streamQueue._dispose().cancel());
     }
   }
diff --git a/test/stream_queue_test.dart b/test/stream_queue_test.dart
index cad0ad5..228ba8a 100644
--- a/test/stream_queue_test.dart
+++ b/test/stream_queue_test.dart
@@ -344,6 +344,14 @@
       expect(() => events.cancel(), throwsStateError);
     });
 
+    test("cancels underlying subscription when called before any event",
+        () async {
+      var cancelFuture = new Future.value(42);
+      var controller = new StreamController(onCancel: () => cancelFuture);
+      var events = new StreamQueue<int>(controller.stream);
+      expect(await events.cancel(), 42);
+    });
+
     test("cancels underlying subscription, returns result", () async {
       var cancelFuture = new Future.value(42);
       var controller = new StreamController(onCancel: () => cancelFuture);
@@ -353,7 +361,7 @@
       expect(await events.cancel(), 42);
     });
 
-    group("with immediate: true", () async {
+    group("with immediate: true", () {
       test("closes the events, prevents any other operation", () async {
         var events = new StreamQueue<int>(createStream());
         await events.cancel(immediate: true);
@@ -376,6 +384,15 @@
         await expect(controller.hasListener, isFalse);
       });
 
+      test("cancels the underlying subscription when called before any event",
+          () async {
+        var cancelFuture = new Future.value(42);
+        var controller = new StreamController(onCancel: () => cancelFuture);
+
+        var events = new StreamQueue<int>(controller.stream);
+        expect(await events.cancel(immediate: true), 42);
+      });
+
       test("closes pending requests", () async {
         var events = new StreamQueue<int>(createStream());
         expect(await events.next, 1);