Make it possible to not complete a request when the event source is done.

Abstract the event source of a stream queue so it doesn't have to be a stream.

This is in anticipation of adding look-ahead which will need to keep the
request alive until the user fast-forwards or rewindes, even after the
original event source is done, and which will need a stream queue fed by
the look-ahead request, not by a stream.

R=nweiz@google.com

Review URL: https://codereview.chromium.org//1305063002 .
diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart
index 36d03ef..09b3a75 100644
--- a/lib/src/stream_queue.dart
+++ b/lib/src/stream_queue.dart
@@ -60,15 +60,17 @@
 ///
 /// When you need no further events the `StreamQueue` should be closed
 /// using [cancel]. This releases the underlying stream subscription.
-class StreamQueue<T> {
+abstract class StreamQueue<T> {
   // This class maintains two queues: one of events and one of requests.
   // The active request (the one in front of the queue) is called with
-  // the current event queue when it becomes active.
+  // the current event queue when it becomes active, every time a
+  // new event arrives, and when the event source closes.
   //
-  // If the request returns true, it's complete and will be removed from the
+  // If the request returns `true`, it's complete and will be removed from the
   // request queue.
-  // If the request returns false, it needs more events, and will be called
-  // again when new events are available.
+  // If the request returns `false`, it needs more events, and will be called
+  // again when new events are available. It may trigger a call itself by
+  // calling [_updateRequests].
   // The request can remove events that it uses, or keep them in the event
   // queue until it has all that it needs.
   //
@@ -77,16 +79,7 @@
   // potentially a request that takes either five or zero events, determined
   // by the content of the fifth event.
 
-  /// Source of events.
-  final Stream _sourceStream;
-
-  /// Subscription on [_sourceStream] while listening for events.
-  ///
-  /// Set to subscription when listening, and set to `null` when the
-  /// subscription is done (and [_isDone] is set to true).
-  StreamSubscription _subscription;
-
-  /// Whether we have listened on [_sourceStream] and the subscription is done.
+  /// Whether the event source is done.
   bool _isDone = false;
 
   /// Whether a closing operation has been performed on the stream queue.
@@ -103,8 +96,9 @@
   final Queue<_EventRequest> _requestQueue = new Queue();
 
   /// Create a `StreamQueue` of the events of [source].
-  StreamQueue(Stream source)
-      : _sourceStream = source;
+  factory StreamQueue(Stream source) = _StreamQueue<T>;
+
+  StreamQueue._();
 
   /// Asks if the stream has any more events.
   ///
@@ -115,6 +109,8 @@
   ///
   /// Can be used before using [next] to avoid getting an error in the
   /// future returned by `next` in the case where there are no more events.
+  /// Another alternative is to use `take(1)` which returns either zero or
+  /// one events.
   Future<bool> get hasNext {
     if (!_isClosed) {
       var hasNextRequest = new _HasNextRequest();
@@ -216,15 +212,15 @@
     throw _failClosed();
   }
 
-  /// Cancels the underlying stream subscription.
+  /// Cancels the underlying event source.
   ///
   /// If [immediate] is `false` (the default), the cancel operation waits until
   /// all previously requested events have been processed, then it cancels the
   /// subscription providing the events.
   ///
-  /// If [immediate] is `true`, the subscription is instead canceled
-  /// immediately. Any pending events complete with a 'closed'-event, as though
-  /// the stream had closed by itself.
+  /// If [immediate] is `true`, the source is instead canceled
+  /// immediately. Any pending events are completed as though the underlying
+  /// stream had closed.
   ///
   /// The returned future completes with the result of calling
   /// `cancel`.
@@ -242,13 +238,84 @@
       return request.future;
     }
 
-    if (_isDone) return new Future.value();
-    if (_subscription == null) _subscription = _sourceStream.listen(null);
-    var future = _subscription.cancel();
-    _onDone();
-    return future;
+    if (_isDone && _eventQueue.isEmpty) return new Future.value();
+    return _cancel();
   }
 
+  // ------------------------------------------------------------------
+  // Methods that may be called from the request implementations to
+  // control the even stream.
+
+  /// Matches events with requests.
+  ///
+  /// Called after receiving an event or when the event source closes.
+  ///
+  /// May be called by requests which have returned `false` (saying they
+  /// are not yet done) so they can be checked again before any new
+  /// events arrive.
+  /// Any request returing `false` from `update` when `isDone` is `true`
+  /// *must* call `_updateRequests` when they are ready to continue
+  /// (since no further events will trigger the call).
+  void _updateRequests() {
+    while (_requestQueue.isNotEmpty) {
+      if (_requestQueue.first.update(_eventQueue, _isDone)) {
+        _requestQueue.removeFirst();
+      } else {
+        return;
+      }
+    }
+
+    if (!_isDone) {
+      _pause();
+    }
+  }
+
+  /// Extracts a stream from the event source and makes this stream queue
+  /// unusable.
+  ///
+  /// Can only be used by the very last request (the stream queue must
+  /// be closed by that request).
+  /// Only used by [rest].
+  Stream _extractStream();
+
+  /// Requests that the event source pauses events.
+  ///
+  /// This is called automatically when the request queue is empty.
+  ///
+  /// The event source is restarted by the next call to [_ensureListening].
+  void _pause();
+
+  /// Ensures that we are listening on events from the event source.
+  ///
+  /// Starts listening for the first time or resumes after a [_pause].
+  ///
+  /// Is called automatically if a request requires more events.
+  void _ensureListening();
+
+  /// Cancels the underlying event source.
+  Future _cancel();
+
+  // ------------------------------------------------------------------
+  // Methods called by the event source to add events or say that it's
+  // done.
+
+  /// Called when the event source adds a new data or error event.
+  /// Always calls [_updateRequests] after adding.
+  void _addResult(Result result) {
+    _eventQueue.add(result);
+    _updateRequests();
+  }
+
+  /// Called when the event source is done.
+  /// Always calls [_updateRequests] after adding.
+  void _close() {
+    _isDone = true;
+    _updateRequests();
+  }
+
+  // ------------------------------------------------------------------
+  // Internal helper methods.
+
   /// Returns an error for when a request is made after cancel.
   ///
   /// Returns a [StateError] with a message saying that either
@@ -257,99 +324,92 @@
     return new StateError("Already cancelled");
   }
 
-  // Callbacks receiving the events of the source stream.
-
-  void _onData(T data) {
-    _eventQueue.add(new Result.value(data));
-    _checkQueues();
-  }
-
-  void _onError(error, StackTrace stack) {
-    _eventQueue.add(new Result.error(error, stack));
-    _checkQueues();
-  }
-
-  void _onDone() {
-    _subscription = null;
-    _isDone = true;
-    _closeAllRequests();
-  }
-
-  // Request queue management.
-
   /// Adds a new request to the queue.
+  ///
+  /// If the request queue is empty and the request can be completed
+  /// immediately, it skips the queue.
   void _addRequest(_EventRequest request) {
-    if (_isDone) {
-      assert(_requestQueue.isEmpty);
-      if (!request.addEvents(_eventQueue)) {
-        request.close(_eventQueue);
-      }
-      return;
-    }
     if (_requestQueue.isEmpty) {
-      if (request.addEvents(_eventQueue)) return;
+      if (request.update(_eventQueue, _isDone)) return;
       _ensureListening();
     }
     _requestQueue.add(request);
   }
+}
 
-  /// Ensures that we are listening on events from [_sourceStream].
+
+/// The default implementation of [StreamQueue].
+///
+/// This queue gets its events from a stream which is listened
+/// to when a request needs events.
+class _StreamQueue<T> extends StreamQueue<T> {
+  /// Source of events.
+  final Stream _sourceStream;
+
+  /// Subscription on [_sourceStream] while listening for events.
   ///
-  /// Resumes subscription on [_sourceStream], or creates it if necessary.
+  /// Set to subscription when listening, and set to `null` when the
+  /// subscription is done (and [_isDone] is set to true).
+  StreamSubscription _subscription;
+
+  _StreamQueue(this._sourceStream) : super._();
+
+  Future _cancel() {
+    if (_isDone) return null;
+    if (_subscription == null) _subscription = _sourceStream.listen(null);
+    var future = _subscription.cancel();
+    _close();
+    return future;
+  }
+
   void _ensureListening() {
     assert(!_isDone);
     if (_subscription == null) {
       _subscription =
-          _sourceStream.listen(_onData, onError: _onError, onDone: _onDone);
+          _sourceStream.listen(
+              (data) {
+                _addResult(new Result.value(data));
+              },
+              onError: (error, StackTrace stackTrace) {
+                _addResult(new Result.error(error, stackTrace));
+              },
+              onDone: () {
+                _subscription = null;
+                this._close();
+              });
     } else {
       _subscription.resume();
     }
   }
 
-  /// Removes all requests and closes them.
-  ///
-  /// Used when the source stream is done.
-  /// After this, no further requests will be added to the queue,
-  /// requests are immediately served entirely by events already in the event
-  /// queue, if any.
-  void _closeAllRequests() {
-    assert(_isDone);
-    while (_requestQueue.isNotEmpty) {
-      var request = _requestQueue.removeFirst();
-      if (!request.addEvents(_eventQueue)) {
-        request.close(_eventQueue);
-      }
-    }
+  void _pause() {
+    _subscription.pause();
   }
 
-  /// Matches events with requests.
-  ///
-  /// Called after receiving an event.
-  void _checkQueues() {
-    while (_requestQueue.isNotEmpty) {
-      if (_requestQueue.first.addEvents(_eventQueue)) {
-        _requestQueue.removeFirst();
-      } else {
-        return;
-      }
-    }
-    if (!_isDone) {
-      _subscription.pause();
-    }
-  }
-
-  /// Extracts the subscription and makes this stream queue unusable.
-  ///
-  /// Can only be used by the very last request.
-  StreamSubscription _dispose() {
+  Stream<T> _extractStream() {
     assert(_isClosed);
+    if (_isDone) {
+      return new Stream<T>.empty();
+    }
+
+    if (_subscription == null) {
+      return _sourceStream;
+    }
+
     var subscription = _subscription;
     _subscription = null;
     _isDone = true;
-    return subscription;
+
+    var wasPaused = subscription.isPaused;
+    var result = new SubscriptionStream<T>(subscription);
+    // Resume after creating stream because that pauses the subscription too.
+    // This way there won't be a short resumption in the middle.
+    if (wasPaused) subscription.resume();
+    return result;
   }
 }
 
+
 /// Request object that receives events when they arrive, until fulfilled.
 ///
 /// Each request that cannot be fulfilled immediately is represented by
@@ -367,7 +427,7 @@
 abstract class _EventRequest {
   /// Handle available events.
   ///
-  /// The available events are provided as a queue. The `addEvents` function
+  /// The available events are provided as a queue. The `update` function
   /// should only remove events from the front of the event queue, e.g.,
   /// using [removeFirst].
   ///
@@ -382,22 +442,10 @@
   /// This method is called when a request reaches the front of the request
   /// queue, and if it returns `false`, it's called again every time a new event
   /// becomes available, or when the stream closes.
-  bool addEvents(Queue<Result> events);
-
-  /// Complete the request.
-  ///
-  /// This is called when the source stream is done before the request
-  /// had a chance to receive all its events. That is, after a call
-  /// to [addEvents] has returned `false`.
-  /// If there are any unused events available, they are in the [events] queue.
-  /// No further events will become available.
-  ///
-  /// The queue should only remove events from the front of the event queue,
-  /// e.g., using [removeFirst].
-  ///
-  /// If the request kept events in the queue after an [addEvents] call,
-  /// this is the last chance to use them.
-  void close(Queue<Result> events);
+  /// If the function returns `false` when the stream has already closed
+  /// ([isDone] is true), then the request must call
+  /// [StreamQueue._updateRequests] itself when it's ready to continue.
+  bool update(Queue<Result> events, bool isDone);
 }
 
 /// Request for a [StreamQueue.next] call.
@@ -412,16 +460,18 @@
 
   Future<T> get future => _completer.future;
 
-  bool addEvents(Queue<Result> events) {
-    if (events.isEmpty) return false;
-    events.removeFirst().complete(_completer);
-    return true;
-  }
-
-  void close(Queue<Result> events) {
-    var errorFuture =
-        new Future.sync(() => throw new StateError("No elements"));
-    _completer.complete(errorFuture);
+  bool update(Queue<Result> events, bool isDone) {
+    if (events.isNotEmpty) {
+      events.removeFirst().complete(_completer);
+      return true;
+    }
+    if (isDone) {
+      var errorFuture =
+          new Future.sync(() => throw new StateError("No elements"));
+      _completer.complete(errorFuture);
+      return true;
+    }
+    return false;
   }
 }
 
@@ -443,22 +493,22 @@
   /// The future completed when the correct number of events have been skipped.
   Future get future => _completer.future;
 
-  bool addEvents(Queue<Result> events) {
+  bool update(Queue<Result> events, bool isDone) {
     while (_eventsToSkip > 0) {
-      if (events.isEmpty) return false;
+      if (events.isEmpty) {
+        if (isDone) break;
+        return false;
+      }
       _eventsToSkip--;
+
       var event = events.removeFirst();
       if (event.isError) {
         event.complete(_completer);
         return true;
       }
     }
-    _completer.complete(0);
-    return true;
-  }
-
-  void close(Queue<Result> events) {
     _completer.complete(_eventsToSkip);
+    return true;
   }
 }
 
@@ -481,9 +531,13 @@
   /// The future completed when the correct number of events have been captured.
   Future get future => _completer.future;
 
-  bool addEvents(Queue<Result> events) {
+  bool update(Queue<Result> events, bool isDone) {
     while (_list.length < _eventsToTake) {
-      if (events.isEmpty) return false;
+      if (events.isEmpty) {
+        if (isDone) break;
+        return false;
+      }
+
       var result = events.removeFirst();
       if (result.isError) {
         result.complete(_completer);
@@ -494,10 +548,6 @@
     _completer.complete(_list);
     return true;
   }
-
-  void close(Queue<Result> events) {
-    _completer.complete(_list);
-  }
 }
 
 /// Request for a [StreamQueue.cancel] call.
@@ -520,22 +570,14 @@
   /// The future completed when the cancel request is completed.
   Future get future => _completer.future;
 
-  bool addEvents(Queue<Result> events) {
-    _shutdown();
-    return true;
-  }
-
-  void close(_) {
-    _shutdown();
-  }
-
-  void _shutdown() {
+  bool update(Queue<Result> events, bool isDone) {
     if (_streamQueue._isDone) {
       _completer.complete();
     } else {
       _streamQueue._ensureListening();
-      _completer.complete(_streamQueue._dispose().cancel());
+      _completer.complete(_streamQueue._extractStream().listen(null).cancel());
     }
+    return true;
   }
 }
 
@@ -559,21 +601,12 @@
   /// The stream which will contain the remaining events of [_streamQueue].
   Stream<T> get stream => _completer.stream;
 
-  bool addEvents(Queue<Result> events) {
-    _completeStream(events);
-    return true;
-  }
-
-  void close(Queue<Result> events) {
-    _completeStream(events);
-  }
-
-  void _completeStream(Queue<Result> events) {
+  bool update(Queue<Result> events, bool isDone) {
     if (events.isEmpty) {
       if (_streamQueue._isDone) {
         _completer.setEmpty();
       } else {
-        _completer.setSourceStream(_getRestStream());
+        _completer.setSourceStream(_streamQueue._extractStream());
       }
     } else {
       // There are prefetched events which needs to be added before the
@@ -582,26 +615,11 @@
       for (var event in events) {
         event.addTo(controller);
       }
-      controller.addStream(_getRestStream(), cancelOnError: false)
+      controller.addStream(_streamQueue._extractStream(), cancelOnError: false)
                 .whenComplete(controller.close);
       _completer.setSourceStream(controller.stream);
     }
-  }
-
-  /// Create a stream from the rest of [_streamQueue]'s subscription.
-  Stream _getRestStream() {
-    if (_streamQueue._isDone) {
-      var controller = new StreamController<T>()..close();
-      return controller.stream;
-      // TODO(lrn). Use the following when 1.11 is released.
-      // return new Stream<T>.empty();
-    }
-    if (_streamQueue._subscription == null) {
-      return _streamQueue._sourceStream;
-    }
-    var subscription = _streamQueue._dispose();
-    subscription.resume();
-    return new SubscriptionStream<T>(subscription);
+    return true;
   }
 }
 
@@ -616,15 +634,15 @@
 
   Future<bool> get future => _completer.future;
 
-  bool addEvents(Queue<Result> events) {
+  bool update(Queue<Result> events, bool isDone) {
     if (events.isNotEmpty) {
       _completer.complete(true);
       return true;
     }
+    if (isDone) {
+      _completer.complete(false);
+      return true;
+    }
     return false;
   }
-
-  void close(_) {
-    _completer.complete(false);
-  }
 }