Fix a bug in StreamQueue.cancel().
If cancel was called on the queue before anything else, it wouldn't call
StreamSubscription.cancel.
R=lrn@google.com
Review URL: https://codereview.chromium.org//1239543004 .
diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart
index 7d78ac5..e4b3b64 100644
--- a/lib/src/stream_queue.dart
+++ b/lib/src/stream_queue.dart
@@ -530,9 +530,10 @@
}
void _shutdown() {
- if (_streamQueue._subscription == null) {
+ if (_streamQueue._isDone) {
_completer.complete();
} else {
+ _streamQueue._ensureListening();
_completer.complete(_streamQueue._dispose().cancel());
}
}
diff --git a/test/stream_queue_test.dart b/test/stream_queue_test.dart
index cad0ad5..228ba8a 100644
--- a/test/stream_queue_test.dart
+++ b/test/stream_queue_test.dart
@@ -344,6 +344,14 @@
expect(() => events.cancel(), throwsStateError);
});
+ test("cancels underlying subscription when called before any event",
+ () async {
+ var cancelFuture = new Future.value(42);
+ var controller = new StreamController(onCancel: () => cancelFuture);
+ var events = new StreamQueue<int>(controller.stream);
+ expect(await events.cancel(), 42);
+ });
+
test("cancels underlying subscription, returns result", () async {
var cancelFuture = new Future.value(42);
var controller = new StreamController(onCancel: () => cancelFuture);
@@ -353,7 +361,7 @@
expect(await events.cancel(), 42);
});
- group("with immediate: true", () async {
+ group("with immediate: true", () {
test("closes the events, prevents any other operation", () async {
var events = new StreamQueue<int>(createStream());
await events.cancel(immediate: true);
@@ -376,6 +384,15 @@
await expect(controller.hasListener, isFalse);
});
+ test("cancels the underlying subscription when called before any event",
+ () async {
+ var cancelFuture = new Future.value(42);
+ var controller = new StreamController(onCancel: () => cancelFuture);
+
+ var events = new StreamQueue<int>(controller.stream);
+ expect(await events.cancel(immediate: true), 42);
+ });
+
test("closes pending requests", () async {
var events = new StreamQueue<int>(createStream());
expect(await events.next, 1);