dartfmt (with 1.23.x)
diff --git a/lib/src/async_cache.dart b/lib/src/async_cache.dart
index 96e1a50..af52cd0 100644
--- a/lib/src/async_cache.dart
+++ b/lib/src/async_cache.dart
@@ -42,8 +42,7 @@
/// An ephemeral cache guarantees that a callback function will only be
/// executed at most once concurrently. This is useful for requests for which
/// data is updated frequently but stale data is acceptable.
- factory AsyncCache.ephemeral() =>
- new AsyncCache(Duration.ZERO);
+ factory AsyncCache.ephemeral() => new AsyncCache(Duration.ZERO);
/// Creates a cache that invalidates its contents after [duration] has passed.
///
diff --git a/lib/src/byte_collector.dart b/lib/src/byte_collector.dart
index a8d2dbf..3c1d49d 100644
--- a/lib/src/byte_collector.dart
+++ b/lib/src/byte_collector.dart
@@ -29,9 +29,10 @@
/// an eight-bit unsigned value in the resulting list.
CancelableOperation<Uint8List> collectBytesCancelable(
Stream<List<int>> source) {
- return _collectBytes(source, (subscription, result) =>
- new CancelableOperation.fromFuture(result, onCancel: subscription.cancel)
- );
+ return _collectBytes(
+ source,
+ (subscription, result) => new CancelableOperation.fromFuture(result,
+ onCancel: subscription.cancel));
}
/// Generalization over [collectBytes] and [collectBytesCancelable].
@@ -41,8 +42,8 @@
/// so it can cancel the operation.
T _collectBytes<T>(
Stream<List<int>> source,
- T result(StreamSubscription<List<int>> subscription,
- Future<Uint8List> result)) {
+ T result(
+ StreamSubscription<List<int>> subscription, Future<Uint8List> result)) {
var byteLists = <List<int>>[];
var length = 0;
var completer = new Completer<Uint8List>.sync();
diff --git a/lib/src/cancelable_operation.dart b/lib/src/cancelable_operation.dart
index a9663c1..2bf0f3b 100644
--- a/lib/src/cancelable_operation.dart
+++ b/lib/src/cancelable_operation.dart
@@ -44,8 +44,8 @@
/// This is like `value.asStream()`, but if a subscription to the stream is
/// canceled, this is as well.
Stream<T> asStream() {
- var controller = new StreamController<T>(
- sync: true, onCancel: _completer._cancel);
+ var controller =
+ new StreamController<T>(sync: true, onCancel: _completer._cancel);
value.then((value) {
controller.add(value);
diff --git a/lib/src/delegate/future.dart b/lib/src/delegate/future.dart
index f746190..129ad49 100644
--- a/lib/src/delegate/future.dart
+++ b/lib/src/delegate/future.dart
@@ -25,13 +25,13 @@
Stream<T> asStream() => _future.asStream();
Future<T> catchError(Function onError, {bool test(Object error)}) =>
- _future.catchError(onError, test: test);
+ _future.catchError(onError, test: test);
Future<S> then<S>(FutureOr<S> onValue(T value), {Function onError}) =>
- _future.then(onValue, onError: onError);
+ _future.then(onValue, onError: onError);
Future<T> whenComplete(action()) => _future.whenComplete(action);
Future<T> timeout(Duration timeLimit, {onTimeout()}) =>
- _future.timeout(timeLimit, onTimeout: onTimeout);
+ _future.timeout(timeLimit, onTimeout: onTimeout);
}
diff --git a/lib/src/delegate/stream_subscription.dart b/lib/src/delegate/stream_subscription.dart
index 0823412..e7575d8 100644
--- a/lib/src/delegate/stream_subscription.dart
+++ b/lib/src/delegate/stream_subscription.dart
@@ -23,8 +23,7 @@
/// regardless of its original generic type, by asserting that its events are
/// instances of `T` whenever they're provided. If they're not, the
/// subscription throws a [CastError].
- static StreamSubscription<T> typed<T>(
- StreamSubscription subscription) =>
+ static StreamSubscription<T> typed<T>(StreamSubscription subscription) =>
subscription is StreamSubscription<T>
? subscription
: new TypeSafeStreamSubscription<T>(subscription);
@@ -51,8 +50,7 @@
Future cancel() => _source.cancel();
- Future<E> asFuture<E>([E futureValue]) =>
- _source.asFuture(futureValue);
+ Future<E> asFuture<E>([E futureValue]) => _source.asFuture(futureValue);
bool get isPaused => _source.isPaused;
}
diff --git a/lib/src/future_group.dart b/lib/src/future_group.dart
index 9b85c2f..0bf3158 100644
--- a/lib/src/future_group.dart
+++ b/lib/src/future_group.dart
@@ -47,6 +47,7 @@
}
return _onIdleController.stream;
}
+
StreamController _onIdleController;
/// The values emitted by the futures that have been added to the group, in
@@ -93,4 +94,3 @@
_completer.complete(_values);
}
}
-
diff --git a/lib/src/lazy_stream.dart b/lib/src/lazy_stream.dart
index d565b4d..f07c387 100644
--- a/lib/src/lazy_stream.dart
+++ b/lib/src/lazy_stream.dart
@@ -26,9 +26,7 @@
}
StreamSubscription<T> listen(void onData(T event),
- {Function onError,
- void onDone(),
- bool cancelOnError}) {
+ {Function onError, void onDone(), bool cancelOnError}) {
if (_callback == null) {
throw new StateError("Stream has already been listened to.");
}
diff --git a/lib/src/result.dart b/lib/src/result.dart
index 7c45846..49457c3 100644
--- a/lib/src/result.dart
+++ b/lib/src/result.dart
@@ -78,8 +78,7 @@
/// Errors have been converted to an [ErrorResult] value.
static Future<Result<T>> capture<T>(Future<T> future) {
return future.then((value) => new ValueResult(value),
- onError: (error, stackTrace) =>
- new ErrorResult<T>(error, stackTrace));
+ onError: (error, stackTrace) => new ErrorResult<T>(error, stackTrace));
}
/// Release the result of a captured future.
@@ -115,8 +114,7 @@
/// result with the value is returned.
static Result<T> flatten<T>(Result<Result<T>> result) {
if (result.isValue) return result.asValue.value;
- return new ErrorResult<T>(
- result.asError.error, result.asError.stackTrace);
+ return new ErrorResult<T>(result.asError.error, result.asError.stackTrace);
}
/// Whether this result is a value result.
diff --git a/lib/src/result/future.dart b/lib/src/result/future.dart
index 209e8b1..749b101 100644
--- a/lib/src/result/future.dart
+++ b/lib/src/result/future.dart
@@ -29,6 +29,5 @@
return resultFuture;
}
- ResultFuture._(Future<T> future)
- : super(future);
+ ResultFuture._(Future<T> future) : super(future);
}
diff --git a/lib/src/single_subscription_transformer.dart b/lib/src/single_subscription_transformer.dart
index e01efac..fcd6b06 100644
--- a/lib/src/single_subscription_transformer.dart
+++ b/lib/src/single_subscription_transformer.dart
@@ -18,8 +18,8 @@
Stream<T> bind(Stream<S> stream) {
var subscription;
- var controller = new StreamController<T>(sync: true,
- onCancel: () => subscription.cancel());
+ var controller = new StreamController<T>(
+ sync: true, onCancel: () => subscription.cancel());
subscription = stream.listen((value) {
// TODO(nweiz): When we release a new major version, get rid of the second
// type parameter and avoid this conversion.
diff --git a/lib/src/stream_completer.dart b/lib/src/stream_completer.dart
index 2bff6a5..4311de5 100644
--- a/lib/src/stream_completer.dart
+++ b/lib/src/stream_completer.dart
@@ -36,8 +36,7 @@
/// instead contain just that error.
static Stream<T> fromFuture<T>(Future<Stream<T>> streamFuture) {
var completer = new StreamCompleter<T>();
- streamFuture.then(completer.setSourceStream,
- onError: completer.setError);
+ streamFuture.then(completer.setSourceStream, onError: completer.setError);
return completer.stream;
}
@@ -118,23 +117,21 @@
Stream<T> _sourceStream;
StreamSubscription<T> listen(onData(T data),
- {Function onError,
- void onDone(),
- bool cancelOnError}) {
+ {Function onError, void onDone(), bool cancelOnError}) {
if (_controller == null) {
if (_sourceStream != null && !_sourceStream.isBroadcast) {
// If the source stream is itself single subscription,
// just listen to it directly instead of creating a controller.
- return _sourceStream.listen(onData, onError: onError, onDone: onDone,
- cancelOnError: cancelOnError);
+ return _sourceStream.listen(onData,
+ onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
_createController();
if (_sourceStream != null) {
_linkStreamToController();
}
}
- return _controller.stream.listen(onData, onError: onError, onDone: onDone,
- cancelOnError: cancelOnError);
+ return _controller.stream.listen(onData,
+ onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
/// Whether a source stream has been set.
@@ -161,8 +158,9 @@
void _linkStreamToController() {
assert(_controller != null);
assert(_sourceStream != null);
- _controller.addStream(_sourceStream, cancelOnError: false)
- .whenComplete(_controller.close);
+ _controller
+ .addStream(_sourceStream, cancelOnError: false)
+ .whenComplete(_controller.close);
}
/// Sets an empty source stream.
@@ -174,7 +172,7 @@
if (_controller == null) {
_createController();
}
- _sourceStream = _controller.stream; // Mark stream as set.
+ _sourceStream = _controller.stream; // Mark stream as set.
_controller.close();
}
diff --git a/lib/src/stream_group.dart b/lib/src/stream_group.dart
index 4aa448e..6361a5c 100644
--- a/lib/src/stream_group.dart
+++ b/lib/src/stream_group.dart
@@ -73,9 +73,7 @@
/// Creates a new stream group where [stream] is a broadcast stream.
StreamGroup.broadcast() {
_controller = new StreamController<T>.broadcast(
- onListen: _onListen,
- onCancel: _onCancelBroadcast,
- sync: true);
+ onListen: _onListen, onCancel: _onCancelBroadcast, sync: true);
}
/// Adds [stream] as a member of this group.
@@ -193,10 +191,8 @@
///
/// This will pause the resulting subscription if [this] is paused.
StreamSubscription<T> _listenToStream(Stream<T> stream) {
- var subscription = stream.listen(
- _controller.add,
- onError: _controller.addError,
- onDone: () => remove(stream));
+ var subscription = stream.listen(_controller.add,
+ onError: _controller.addError, onDone: () => remove(stream));
if (_state == _StreamGroupState.paused) subscription.pause();
return subscription;
}
diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart
index f578081..b9023ab 100644
--- a/lib/src/stream_queue.dart
+++ b/lib/src/stream_queue.dart
@@ -131,7 +131,6 @@
throw _failClosed();
}
-
/// Look at the next [count] data events without consuming them.
///
/// Works like [take] except that the events are left in the queue.
@@ -495,7 +494,6 @@
}
}
-
/// The default implementation of [StreamQueue].
///
/// This queue gets its events from a stream which is listened
@@ -523,18 +521,14 @@
void _ensureListening() {
if (_isDone) return;
if (_subscription == null) {
- _subscription =
- _sourceStream.listen(
- (data) {
- _addResult(new Result.value(data));
- },
- onError: (error, StackTrace stackTrace) {
- _addResult(new Result.error(error, stackTrace));
- },
- onDone: () {
- _subscription = null;
- this._close();
- });
+ _subscription = _sourceStream.listen((data) {
+ _addResult(new Result.value(data));
+ }, onError: (error, StackTrace stackTrace) {
+ _addResult(new Result.error(error, stackTrace));
+ }, onDone: () {
+ _subscription = null;
+ this._close();
+ });
} else {
_subscription.resume();
}
@@ -650,8 +644,8 @@
queue._cancel();
}
- assert((_parent._requestQueue.first as _TransactionRequest)
- .transaction == this);
+ assert((_parent._requestQueue.first as _TransactionRequest).transaction ==
+ this);
_parent._requestQueue.removeFirst();
_parent._updateRequests();
}
@@ -722,15 +716,14 @@
return true;
}
if (isDone) {
- _completer.completeError(new StateError("No elements"),
- StackTrace.current);
+ _completer.completeError(
+ new StateError("No elements"), StackTrace.current);
return true;
}
return false;
}
}
-
/// Request for a [StreamQueue.peek] call.
///
/// Completes the returned future when receiving the first event,
@@ -749,15 +742,14 @@
return true;
}
if (isDone) {
- _completer.completeError(new StateError("No elements"),
- StackTrace.current);
+ _completer.completeError(
+ new StateError("No elements"), StackTrace.current);
return true;
}
return false;
}
}
-
/// Request for a [StreamQueue.skip] call.
class _SkipRequest<T> implements _EventRequest<T> {
/// Completer for the future returned by the skip call.
@@ -815,7 +807,6 @@
Future<List<T>> get future => _completer.future;
}
-
/// Request for a [StreamQueue.take] call.
class _TakeRequest<T> extends _ListRequest<T> {
_TakeRequest(int eventsToTake) : super(eventsToTake);
@@ -839,7 +830,6 @@
}
}
-
/// Request for a [StreamQueue.lookAhead] call.
class _LookAheadRequest<T> extends _ListRequest<T> {
_LookAheadRequest(int eventsToTake) : super(eventsToTake);
@@ -862,7 +852,6 @@
}
}
-
/// Request for a [StreamQueue.cancel] call.
///
/// The request needs no events, it just waits in the request queue
@@ -871,6 +860,7 @@
class _CancelRequest<T> implements _EventRequest<T> {
/// Completer for the future returned by the `cancel` call.
final _completer = new Completer();
+
///
/// When the event is completed, it needs to cancel the active subscription
/// of the `StreamQueue` object, if any.
@@ -926,8 +916,9 @@
for (var event in events) {
event.addTo(controller);
}
- controller.addStream(_streamQueue._extractStream(), cancelOnError: false)
- .whenComplete(controller.close);
+ controller
+ .addStream(_streamQueue._extractStream(), cancelOnError: false)
+ .whenComplete(controller.close);
_completer.setSourceStream(controller.stream);
}
return true;
diff --git a/lib/src/stream_sink_completer.dart b/lib/src/stream_sink_completer.dart
index f8056c7..4219126 100644
--- a/lib/src/stream_sink_completer.dart
+++ b/lib/src/stream_sink_completer.dart
@@ -36,11 +36,9 @@
///
/// If the future completes with an error, the returned sink will instead
/// be closed. Its [Sink.done] future will contain the error.
- static StreamSink<T> fromFuture<T>(
- Future<StreamSink<T>> sinkFuture) {
+ static StreamSink<T> fromFuture<T>(Future<StreamSink<T>> sinkFuture) {
var completer = new StreamSinkCompleter<T>();
- sinkFuture.then(completer.setDestinationSink,
- onError: completer.setError);
+ sinkFuture.then(completer.setDestinationSink, onError: completer.setError);
return completer.sink;
}
diff --git a/lib/src/stream_sink_transformer.dart b/lib/src/stream_sink_transformer.dart
index 65adda6..503d28a 100644
--- a/lib/src/stream_sink_transformer.dart
+++ b/lib/src/stream_sink_transformer.dart
@@ -24,8 +24,7 @@
/// This is equivalent to piping all events from the outer sink through a
/// stream transformed by [transformer] and from there into the inner sink.
const factory StreamSinkTransformer.fromStreamTransformer(
- StreamTransformer<S, T> transformer) =
- StreamTransformerWrapper<S, T>;
+ StreamTransformer<S, T> transformer) = StreamTransformerWrapper<S, T>;
/// Creates a [StreamSinkTransformer] that delegates events to the given
/// handlers.
@@ -34,8 +33,8 @@
/// They're called for each incoming event, and any actions on the sink
/// they're passed are forwarded to the inner sink. If a handler is omitted,
/// the event is passed through unaltered.
- factory StreamSinkTransformer.fromHandlers({
- void handleData(S data, EventSink<T> sink),
+ factory StreamSinkTransformer.fromHandlers(
+ {void handleData(S data, EventSink<T> sink),
void handleError(Object error, StackTrace stackTrace, EventSink<T> sink),
void handleDone(EventSink<T> sink)}) {
return new HandlerTransformer<S, T>(handleData, handleError, handleDone);
diff --git a/lib/src/stream_sink_transformer/handler_transformer.dart b/lib/src/stream_sink_transformer/handler_transformer.dart
index 8cc3d01..dba8240 100644
--- a/lib/src/stream_sink_transformer/handler_transformer.dart
+++ b/lib/src/stream_sink_transformer/handler_transformer.dart
@@ -28,8 +28,7 @@
/// The handler for done events.
final HandleDone<T> _handleDone;
- HandlerTransformer(
- this._handleData, this._handleError, this._handleDone);
+ HandlerTransformer(this._handleData, this._handleError, this._handleDone);
StreamSink<S> bind(StreamSink<T> sink) => new _HandlerSink<S, T>(this, sink);
}
diff --git a/lib/src/stream_sink_transformer/stream_transformer_wrapper.dart b/lib/src/stream_sink_transformer/stream_transformer_wrapper.dart
index 83d2b19..32ac648 100644
--- a/lib/src/stream_sink_transformer/stream_transformer_wrapper.dart
+++ b/lib/src/stream_sink_transformer/stream_transformer_wrapper.dart
@@ -30,17 +30,16 @@
Future get done => _inner.done;
- _StreamTransformerWrapperSink(StreamTransformer<S, T> transformer,
- this._inner) {
- _controller.stream.transform(transformer).listen(
- _inner.add,
- onError: _inner.addError,
- onDone: () {
- // Ignore any errors that come from this call to [_inner.close]. The
- // user can access them through [done] or the value returned from
- // [this.close], and we don't want them to get top-leveled.
- _inner.close().catchError((_) {});
- });
+ _StreamTransformerWrapperSink(
+ StreamTransformer<S, T> transformer, this._inner) {
+ _controller.stream
+ .transform(transformer)
+ .listen(_inner.add, onError: _inner.addError, onDone: () {
+ // Ignore any errors that come from this call to [_inner.close]. The
+ // user can access them through [done] or the value returned from
+ // [this.close], and we don't want them to get top-leveled.
+ _inner.close().catchError((_) {});
+ });
}
void add(S event) {
diff --git a/lib/src/stream_splitter.dart b/lib/src/stream_splitter.dart
index 3d8f994..6cec98c 100644
--- a/lib/src/stream_splitter.dart
+++ b/lib/src/stream_splitter.dart
@@ -57,8 +57,7 @@
///
/// [count] defaults to 2. This is the same as creating [count] branches and
/// then closing the [StreamSplitter].
- static List<Stream<T>> splitFrom<T>(Stream<T> stream,
- [int count]) {
+ static List<Stream<T>> splitFrom<T>(Stream<T> stream, [int count]) {
if (count == null) count = 2;
var splitter = new StreamSplitter<T>(stream);
var streams = new List<Stream>.generate(count, (_) => splitter.split());
@@ -77,9 +76,7 @@
}
var controller = new StreamController<T>(
- onListen: _onListen,
- onPause: _onPause,
- onResume: _onResume);
+ onListen: _onListen, onPause: _onPause, onResume: _onResume);
controller.onCancel = () => _onCancel(controller);
for (var result in _buffer) {
@@ -147,8 +144,8 @@
// wasn't paused, this will be a no-op.
_subscription.resume();
} else {
- _subscription = _stream.listen(
- _onData, onError: _onError, onDone: _onDone);
+ _subscription =
+ _stream.listen(_onData, onError: _onError, onDone: _onDone);
}
}
diff --git a/lib/src/stream_subscription_transformer.dart b/lib/src/stream_subscription_transformer.dart
index ad54539..1443b18 100644
--- a/lib/src/stream_subscription_transformer.dart
+++ b/lib/src/stream_subscription_transformer.dart
@@ -35,12 +35,14 @@
return new _TransformedSubscription(
stream.listen(null, cancelOnError: cancelOnError),
handleCancel ?? (inner) => inner.cancel(),
- handlePause ?? (inner) {
- inner.pause();
- },
- handleResume ?? (inner) {
- inner.resume();
- });
+ handlePause ??
+ (inner) {
+ inner.pause();
+ },
+ handleResume ??
+ (inner) {
+ inner.resume();
+ });
});
}
@@ -61,8 +63,8 @@
bool get isPaused => _inner?.isPaused ?? false;
- _TransformedSubscription(this._inner, this._handleCancel, this._handlePause,
- this._handleResume);
+ _TransformedSubscription(
+ this._inner, this._handleCancel, this._handlePause, this._handleResume);
void onData(void handleData(T data)) {
_inner?.onData(handleData);
@@ -77,15 +79,15 @@
}
Future cancel() => _cancelMemoizer.runOnce(() {
- var inner = _inner;
- _inner.onData(null);
- _inner.onDone(null);
+ var inner = _inner;
+ _inner.onData(null);
+ _inner.onDone(null);
- // Setting onError to null will cause errors to be top-leveled.
- _inner.onError((_, __) {});
- _inner = null;
- return _handleCancel(inner);
- });
+ // Setting onError to null will cause errors to be top-leveled.
+ _inner.onError((_, __) {});
+ _inner = null;
+ return _handleCancel(inner);
+ });
final _cancelMemoizer = new AsyncMemoizer();
void pause([Future resumeFuture]) {
diff --git a/lib/src/stream_zip.dart b/lib/src/stream_zip.dart
index a65840c..3d5a811 100644
--- a/lib/src/stream_zip.dart
+++ b/lib/src/stream_zip.dart
@@ -17,10 +17,8 @@
StreamZip(Iterable<Stream<T>> streams) : _streams = streams;
- StreamSubscription<List<T>> listen(void onData(List<T> data), {
- Function onError,
- void onDone(),
- bool cancelOnError}) {
+ StreamSubscription<List<T>> listen(void onData(List<T> data),
+ {Function onError, void onDone(), bool cancelOnError}) {
cancelOnError = identical(true, cancelOnError);
var subscriptions = <StreamSubscription<T>>[];
StreamController<List<T>> controller;
@@ -72,8 +70,9 @@
try {
for (var stream in _streams) {
int index = subscriptions.length;
- subscriptions.add(stream.listen(
- (data) { handleData(index, data); },
+ subscriptions.add(stream.listen((data) {
+ handleData(index, data);
+ },
onError: cancelOnError ? handleError : handleErrorCancel,
onDone: handleDone,
cancelOnError: cancelOnError));
@@ -87,34 +86,28 @@
current = new List(subscriptions.length);
- controller = new StreamController<List<T>>(
- onPause: () {
- for (int i = 0; i < subscriptions.length; i++) {
- // This may pause some subscriptions more than once.
- // These will not be resumed by onResume below, but must wait for the
- // next round.
- subscriptions[i].pause();
- }
- },
- onResume: () {
- for (int i = 0; i < subscriptions.length; i++) {
- subscriptions[i].resume();
- }
- },
- onCancel: () {
- for (int i = 0; i < subscriptions.length; i++) {
- // Canceling more than once is safe.
- subscriptions[i].cancel();
- }
+ controller = new StreamController<List<T>>(onPause: () {
+ for (int i = 0; i < subscriptions.length; i++) {
+ // This may pause some subscriptions more than once.
+ // These will not be resumed by onResume below, but must wait for the
+ // next round.
+ subscriptions[i].pause();
}
- );
+ }, onResume: () {
+ for (int i = 0; i < subscriptions.length; i++) {
+ subscriptions[i].resume();
+ }
+ }, onCancel: () {
+ for (int i = 0; i < subscriptions.length; i++) {
+ // Canceling more than once is safe.
+ subscriptions[i].cancel();
+ }
+ });
if (subscriptions.isEmpty) {
controller.close();
}
return controller.stream.listen(onData,
- onError: onError,
- onDone: onDone,
- cancelOnError: cancelOnError);
+ onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
}
diff --git a/lib/src/subscription_stream.dart b/lib/src/subscription_stream.dart
index 50ca81b..a235663 100644
--- a/lib/src/subscription_stream.dart
+++ b/lib/src/subscription_stream.dart
@@ -30,7 +30,7 @@
/// stream. That may be an issue if `subscription` was made to cancel on
/// an error.
SubscriptionStream(StreamSubscription<T> subscription)
- : _source = subscription {
+ : _source = subscription {
_source.pause();
// Clear callbacks to avoid keeping them alive unnecessarily.
_source.onData(null);
@@ -39,9 +39,7 @@
}
StreamSubscription<T> listen(void onData(T event),
- {Function onError,
- void onDone(),
- bool cancelOnError}) {
+ {Function onError, void onDone(), bool cancelOnError}) {
if (_source == null) {
throw new StateError("Stream has already been listened to.");
}
diff --git a/lib/src/typed/stream.dart b/lib/src/typed/stream.dart
index 235b205..5b1f2ff 100644
--- a/lib/src/typed/stream.dart
+++ b/lib/src/typed/stream.dart
@@ -30,11 +30,11 @@
onListen: onListen == null
? null
: (subscription) =>
- onListen(new TypeSafeStreamSubscription<T>(subscription)),
+ onListen(new TypeSafeStreamSubscription<T>(subscription)),
onCancel: onCancel == null
? null
: (subscription) =>
- onCancel(new TypeSafeStreamSubscription<T>(subscription))));
+ onCancel(new TypeSafeStreamSubscription<T>(subscription))));
}
Stream<E> asyncExpand<E>(Stream<E> convert(T event)) =>
@@ -48,8 +48,7 @@
? null
: (previous, next) => equals(previous as T, next as T)));
- Future<E> drain<E>([E futureValue]) =>
- _stream.drain(futureValue);
+ Future<E> drain<E>([E futureValue]) => _stream.drain(futureValue);
Stream<S> expand<S>(Iterable<S> convert(T value)) =>
_stream.expand(_validateType(convert));
@@ -63,10 +62,9 @@
Future<T> singleWhere(bool test(T element)) async =>
(await _stream.singleWhere(_validateType(test))) as T;
- Future<S> fold<S>(S initialValue,
- S combine(S previous, T element)) =>
- _stream.fold(initialValue,
- (previous, element) => combine(previous, element as T));
+ Future<S> fold<S>(S initialValue, S combine(S previous, T element)) =>
+ _stream.fold(
+ initialValue, (previous, element) => combine(previous, element as T));
Future forEach(void action(T element)) =>
_stream.forEach(_validateType(action));
@@ -79,8 +77,7 @@
new TypeSafeStreamSubscription<T>(_stream.listen(_validateType(onData),
onError: onError, onDone: onDone, cancelOnError: cancelOnError));
- Stream<S> map<S>(S convert(T event)) =>
- _stream.map(_validateType(convert));
+ Stream<S> map<S>(S convert(T event)) => _stream.map(_validateType(convert));
// Don't forward to `_stream.pipe` because we want the consumer to see the
// type-asserted stream.
@@ -88,8 +85,8 @@
consumer.addStream(this).then((_) => consumer.close());
Future<T> reduce(T combine(T previous, T element)) async {
- var result = await _stream.reduce(
- (previous, element) => combine(previous as T, element as T));
+ var result = await _stream
+ .reduce((previous, element) => combine(previous as T, element as T));
return result as T;
}
@@ -100,20 +97,17 @@
new TypeSafeStream<T>(_stream.takeWhile(_validateType(test)));
Stream<T> timeout(Duration timeLimit, {void onTimeout(EventSink<T> sink)}) =>
- new TypeSafeStream<T>(_stream.timeout(
- timeLimit,
+ new TypeSafeStream<T>(_stream.timeout(timeLimit,
onTimeout: (sink) => onTimeout(DelegatingEventSink.typed(sink))));
Future<List<T>> toList() async =>
DelegatingList.typed<T>(await _stream.toList());
- Future<Set<T>> toSet() async =>
- DelegatingSet.typed<T>(await _stream.toSet());
+ Future<Set<T>> toSet() async => DelegatingSet.typed<T>(await _stream.toSet());
// Don't forward to `_stream.transform` because we want the transformer to see
// the type-asserted stream.
- Stream<S> transform<S>(
- StreamTransformer<T, S> transformer) =>
+ Stream<S> transform<S>(StreamTransformer<T, S> transformer) =>
transformer.bind(this);
Stream<T> where(bool test(T element)) =>
@@ -132,7 +126,6 @@
/// Returns a version of [function] that asserts that its argument is an
/// instance of `T`.
- UnaryFunction<dynamic, S> _validateType<S>(
- S function(T value)) =>
+ UnaryFunction<dynamic, S> _validateType<S>(S function(T value)) =>
function == null ? null : (value) => function(value as T);
}
diff --git a/lib/src/typed/stream_subscription.dart b/lib/src/typed/stream_subscription.dart
index e02e1c0..0fab039 100644
--- a/lib/src/typed/stream_subscription.dart
+++ b/lib/src/typed/stream_subscription.dart
@@ -33,6 +33,5 @@
Future cancel() => _subscription.cancel();
- Future<E> asFuture<E>([E futureValue]) =>
- _subscription.asFuture(futureValue);
+ Future<E> asFuture<E>([E futureValue]) => _subscription.asFuture(futureValue);
}
diff --git a/pubspec.yaml b/pubspec.yaml
index adb2c82..70c13e9 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,13 +1,13 @@
name: async
-version: 1.13.2
+version: 1.13.3-dev
author: Dart Team <misc@dartlang.org>
description: Utility functions and classes related to the 'dart:async' library.
homepage: https://www.github.com/dart-lang/async
+environment:
+ sdk: ">=1.22.0 <2.0.0"
dependencies:
collection: "^1.5.0"
dev_dependencies:
fake_async: "^0.1.2"
stack_trace: "^1.0.0"
test: "^0.12.0"
-environment:
- sdk: ">=1.22.0 <2.0.0"
diff --git a/test/async_cache_test.dart b/test/async_cache_test.dart
index b81d9f9..747835b 100644
--- a/test/async_cache_test.dart
+++ b/test/async_cache_test.dart
@@ -65,9 +65,11 @@
});
test('should fetch a stream via a callback', () async {
- expect(await cache.fetchStream(expectAsync0(() {
- return new Stream.fromIterable(['1', '2', '3']);
- })).toList(), ['1', '2', '3']);
+ expect(
+ await cache.fetchStream(expectAsync0(() {
+ return new Stream.fromIterable(['1', '2', '3']);
+ })).toList(),
+ ['1', '2', '3']);
});
test('should not fetch stream via callback when a cache exists', () async {
diff --git a/test/byte_collection_test.dart b/test/byte_collection_test.dart
index e3d4f05..b87b074 100644
--- a/test/byte_collection_test.dart
+++ b/test/byte_collection_test.dart
@@ -67,7 +67,7 @@
var sc = new StreamController<List<int>>();
var result = collectBytesCancelable(sc.stream);
// Value never completes.
- result.value.whenComplete(expectAsync0((){}, count: 0));
+ result.value.whenComplete(expectAsync0(() {}, count: 0));
expect(sc.hasListener, isTrue);
sc.add([1, 2]);
@@ -77,7 +77,7 @@
await nextTimerTick();
expect(sc.hasListener, isTrue);
result.cancel();
- expect(sc.hasListener, isFalse); // Cancelled immediately.
+ expect(sc.hasListener, isFalse); // Cancelled immediately.
var replacement = await result.valueOrCancellation();
expect(replacement, isNull);
await nextTimerTick();
@@ -87,4 +87,4 @@
});
}
-Future nextTimerTick() => new Future((){});
+Future nextTimerTick() => new Future(() {});
diff --git a/test/cancelable_operation_test.dart b/test/cancelable_operation_test.dart
index f30c134..bded402 100644
--- a/test/cancelable_operation_test.dart
+++ b/test/cancelable_operation_test.dart
@@ -13,8 +13,8 @@
group("without being canceled", () {
var completer;
setUp(() {
- completer = new CancelableCompleter(
- onCancel: expectAsync0(() {}, count: 0));
+ completer =
+ new CancelableCompleter(onCancel: expectAsync0(() {}, count: 0));
});
test("sends values to the future", () {
@@ -74,8 +74,8 @@
test("successfully then with a future", () {
completer.complete(1);
- expect(() => completer.complete(new Completer().future),
- throwsStateError);
+ expect(
+ () => completer.complete(new Completer().future), throwsStateError);
});
test("with a future then successfully", () {
@@ -85,8 +85,8 @@
test("with a future twice", () {
completer.complete(new Completer().future);
- expect(() => completer.complete(new Completer().future),
- throwsStateError);
+ expect(
+ () => completer.complete(new Completer().future), throwsStateError);
});
});
@@ -97,8 +97,8 @@
});
test("forwards errors", () {
- var operation = new CancelableOperation.fromFuture(
- new Future.error("error"));
+ var operation =
+ new CancelableOperation.fromFuture(new Future.error("error"));
expect(operation.value, throwsA("error"));
});
});
@@ -148,24 +148,26 @@
});
test("doesn't call onCancel if the completer has completed", () {
- var completer = new CancelableCompleter(
- onCancel: expectAsync0(() {}, count: 0));
+ var completer =
+ new CancelableCompleter(onCancel: expectAsync0(() {}, count: 0));
completer.complete(1);
expect(completer.operation.value, completion(equals(1)));
expect(completer.operation.cancel(), completes);
});
- test("does call onCancel if the completer has completed to an unfired "
+ test(
+ "does call onCancel if the completer has completed to an unfired "
"Future", () {
var completer = new CancelableCompleter(onCancel: expectAsync0(() {}));
completer.complete(new Completer().future);
expect(completer.operation.cancel(), completes);
});
- test("doesn't call onCancel if the completer has completed to a fired "
+ test(
+ "doesn't call onCancel if the completer has completed to a fired "
"Future", () async {
- var completer = new CancelableCompleter(
- onCancel: expectAsync0(() {}, count: 0));
+ var completer =
+ new CancelableCompleter(onCancel: expectAsync0(() {}, count: 0));
completer.complete(new Future.value(1));
await completer.operation.value;
expect(completer.operation.cancel(), completes);
@@ -195,7 +197,8 @@
test("valueOrCancellation waits on the onCancel future", () async {
var innerCompleter = new Completer();
- var completer = new CancelableCompleter(onCancel: () => innerCompleter.future);
+ var completer =
+ new CancelableCompleter(onCancel: () => innerCompleter.future);
var fired = false;
completer.operation.valueOrCancellation().then((_) {
@@ -229,8 +232,8 @@
test("cancels the completer when the subscription is canceled", () {
var completer = new CancelableCompleter(onCancel: expectAsync0(() {}));
- var sub = completer.operation.asStream()
- .listen(expectAsync1((_) {}, count: 0));
+ var sub =
+ completer.operation.asStream().listen(expectAsync1((_) {}, count: 0));
completer.operation.value.whenComplete(expectAsync0(() {}, count: 0));
sub.cancel();
expect(completer.isCanceled, isTrue);
diff --git a/test/restartable_timer_test.dart b/test/restartable_timer_test.dart
index b612f6e..4f59f62 100644
--- a/test/restartable_timer_test.dart
+++ b/test/restartable_timer_test.dart
@@ -100,8 +100,7 @@
test("only runs the callback once if the timer isn't reset", () {
new FakeAsync().run((async) {
new RestartableTimer(
- new Duration(seconds: 5),
- expectAsync0(() {}, count: 1));
+ new Duration(seconds: 5), expectAsync0(() {}, count: 1));
async.elapse(new Duration(seconds: 10));
});
});
diff --git a/test/result_test.dart b/test/result_test.dart
index 21e251c..210ae3f 100644
--- a/test/result_test.dart
+++ b/test/result_test.dart
@@ -58,54 +58,60 @@
test("complete with value", () {
Result<int> result = new ValueResult<int>(42);
var c = new Completer<int>();
- c.future.then(expectAsync1((int v) { expect(v, equals(42)); }),
- onError: (e, s) { fail("Unexpected error"); });
+ c.future.then(expectAsync1((int v) {
+ expect(v, equals(42));
+ }), onError: (e, s) {
+ fail("Unexpected error");
+ });
result.complete(c);
});
test("complete with error", () {
Result<bool> result = new ErrorResult("BAD", stack);
var c = new Completer<bool>();
- c.future.then((bool v) { fail("Unexpected value $v"); },
- onError: expectAsync2((e, s) {
- expect(e, equals("BAD"));
- expect(s, same(stack));
- }));
+ c.future.then((bool v) {
+ fail("Unexpected value $v");
+ }, onError: expectAsync2((e, s) {
+ expect(e, equals("BAD"));
+ expect(s, same(stack));
+ }));
result.complete(c);
});
test("add sink value", () {
var result = new ValueResult<int>(42);
- EventSink<int> sink = new TestSink(
- onData: expectAsync1((v) { expect(v, equals(42)); })
- );
+ EventSink<int> sink = new TestSink(onData: expectAsync1((v) {
+ expect(v, equals(42));
+ }));
result.addTo(sink);
});
test("add sink error", () {
Result<bool> result = new ErrorResult("BAD", stack);
- EventSink<bool> sink = new TestSink(
- onError: expectAsync2((e, s) {
- expect(e, equals("BAD"));
- expect(s, same(stack));
- })
- );
+ EventSink<bool> sink = new TestSink(onError: expectAsync2((e, s) {
+ expect(e, equals("BAD"));
+ expect(s, same(stack));
+ }));
result.addTo(sink);
});
test("value as future", () {
Result<int> result = new ValueResult<int>(42);
- result.asFuture.then(expectAsync1((int v) { expect(v, equals(42)); }),
- onError: (e, s) { fail("Unexpected error"); });
+ result.asFuture.then(expectAsync1((int v) {
+ expect(v, equals(42));
+ }), onError: (e, s) {
+ fail("Unexpected error");
+ });
});
test("error as future", () {
Result<bool> result = new ErrorResult("BAD", stack);
- result.asFuture.then((bool v) { fail("Unexpected value $v"); },
- onError: expectAsync2((e, s) {
- expect(e, equals("BAD"));
- expect(s, same(stack));
- }));
+ result.asFuture.then((bool v) {
+ fail("Unexpected value $v");
+ }, onError: expectAsync2((e, s) {
+ expect(e, equals("BAD"));
+ expect(s, same(stack));
+ }));
});
test("capture future value", () {
@@ -169,17 +175,19 @@
test("capture stream", () {
StreamController<int> c = new StreamController<int>();
Stream<Result> stream = Result.captureStream(c.stream);
- var expectedList = new Queue.from([new Result.value(42),
- new Result.error("BAD", stack),
- new Result.value(37)]);
+ var expectedList = new Queue.from([
+ new Result.value(42),
+ new Result.error("BAD", stack),
+ new Result.value(37)
+ ]);
void listener(Result actual) {
expect(expectedList.isEmpty, isFalse);
expectResult(actual, expectedList.removeFirst());
}
- stream.listen(expectAsync1(listener, count: 3),
- onError: (e, s) { fail("Unexpected error: $e"); },
- onDone: expectAsync0((){}),
- cancelOnError: true);
+
+ stream.listen(expectAsync1(listener, count: 3), onError: (e, s) {
+ fail("Unexpected error: $e");
+ }, onDone: expectAsync0(() {}), cancelOnError: true);
c.add(42);
c.addError("BAD", stack);
c.add(37);
@@ -189,12 +197,14 @@
test("release stream", () {
StreamController<Result<int>> c = new StreamController<Result<int>>();
Stream<int> stream = Result.releaseStream(c.stream);
- var events = [new Result<int>.value(42),
- new Result<int>.error("BAD", stack),
- new Result<int>.value(37)];
+ var events = [
+ new Result<int>.value(42),
+ new Result<int>.error("BAD", stack),
+ new Result<int>.value(37)
+ ];
// Expect the data events, and an extra error event.
var expectedList = new Queue.from(events)
- ..add(new Result.error("BAD2", stack));
+ ..add(new Result.error("BAD2", stack));
void dataListener(int v) {
expect(expectedList.isEmpty, isFalse);
@@ -212,32 +222,32 @@
}
stream.listen(expectAsync1(dataListener, count: 2),
- onError: expectAsync2(errorListener, count: 2),
- onDone: expectAsync0((){}));
+ onError: expectAsync2(errorListener, count: 2),
+ onDone: expectAsync0(() {}));
for (Result<int> result in events) {
- c.add(result); // Result value or error in data line.
+ c.add(result); // Result value or error in data line.
}
- c.addError("BAD2", stack); // Error in error line.
+ c.addError("BAD2", stack); // Error in error line.
c.close();
});
test("release stream cancel on error", () {
StreamController<Result<int>> c = new StreamController<Result<int>>();
Stream<int> stream = Result.releaseStream(c.stream);
- stream.listen(expectAsync1((v) { expect(v, equals(42)); }),
- onError: expectAsync2((e, s) {
- expect(e, equals("BAD"));
- expect(s, same(stack));
- }),
- onDone: () { fail("Unexpected done event"); },
- cancelOnError: true);
+ stream.listen(expectAsync1((v) {
+ expect(v, equals(42));
+ }), onError: expectAsync2((e, s) {
+ expect(e, equals("BAD"));
+ expect(s, same(stack));
+ }), onDone: () {
+ fail("Unexpected done event");
+ }, cancelOnError: true);
c.add(new Result.value(42));
c.add(new Result.error("BAD", stack));
c.add(new Result.value(37));
c.close();
});
-
test("flatten error 1", () {
Result<int> error = new Result<int>.error("BAD", stack);
Result<int> flattened =
@@ -292,18 +302,12 @@
test("handle neither unary nor binary", () {
ErrorResult result = new Result.error("error", stack);
- expect(() => result.handle(() => fail("unreachable")),
- throws);
- expect(() => result.handle((a, b, c) => fail("unreachable")),
- throws);
- expect(() => result.handle((a, b, {c}) => fail("unreachable")),
- throws);
- expect(() => result.handle((a, {b}) => fail("unreachable")),
- throws);
- expect(() => result.handle(({a, b}) => fail("unreachable")),
- throws);
- expect(() => result.handle(({a}) => fail("unreachable")),
- throws);
+ expect(() => result.handle(() => fail("unreachable")), throws);
+ expect(() => result.handle((a, b, c) => fail("unreachable")), throws);
+ expect(() => result.handle((a, b, {c}) => fail("unreachable")), throws);
+ expect(() => result.handle((a, {b}) => fail("unreachable")), throws);
+ expect(() => result.handle(({a, b}) => fail("unreachable")), throws);
+ expect(() => result.handle(({a}) => fail("unreachable")), throws);
});
}
@@ -323,17 +327,32 @@
final Function onError;
final Function onDone;
- TestSink({void this.onData(T data) : _nullData,
- void this.onError(e, StackTrace s) : _nullError,
- void this.onDone() : _nullDone });
+ TestSink(
+ {void this.onData(T data): _nullData,
+ void this.onError(e, StackTrace s): _nullError,
+ void this.onDone(): _nullDone});
- void add(T value) { onData(value); }
- void addError(error, [StackTrace stack]) { onError(error, stack); }
- void close() { onDone(); }
+ void add(T value) {
+ onData(value);
+ }
- static void _nullData(value) { fail("Unexpected sink add: $value"); }
+ void addError(error, [StackTrace stack]) {
+ onError(error, stack);
+ }
+
+ void close() {
+ onDone();
+ }
+
+ static void _nullData(value) {
+ fail("Unexpected sink add: $value");
+ }
+
static void _nullError(e, StackTrace s) {
fail("Unexpected sink addError: $e");
}
- static void _nullDone() { fail("Unepxected sink close"); }
+
+ static void _nullDone() {
+ fail("Unepxected sink close");
+ }
}
diff --git a/test/single_subscription_transformer_test.dart b/test/single_subscription_transformer_test.dart
index f21d4e9..74e462b 100644
--- a/test/single_subscription_transformer_test.dart
+++ b/test/single_subscription_transformer_test.dart
@@ -12,8 +12,8 @@
void main() {
test("buffers events as soon as it's bound", () async {
var controller = new StreamController.broadcast();
- var stream = controller.stream.transform(
- const SingleSubscriptionTransformer());
+ var stream =
+ controller.stream.transform(const SingleSubscriptionTransformer());
// Add events before [stream] has a listener to be sure it buffers them.
controller.add(1);
@@ -34,8 +34,8 @@
var controller = new StreamController.broadcast(onCancel: () {
canceled = true;
});
- var stream = controller.stream.transform(
- const SingleSubscriptionTransformer());
+ var stream =
+ controller.stream.transform(const SingleSubscriptionTransformer());
await flushMicrotasks();
expect(canceled, isFalse);
diff --git a/test/stream_completer_test.dart b/test/stream_completer_test.dart
index dea73d7..2e9ff9b 100644
--- a/test/stream_completer_test.dart
+++ b/test/stream_completer_test.dart
@@ -27,9 +27,9 @@
test("cancel before linking a stream doesn't listen on stream", () async {
var completer = new StreamCompleter();
var subscription = completer.stream.listen(null);
- subscription.pause(); // Should be ignored.
+ subscription.pause(); // Should be ignored.
subscription.cancel();
- completer.setSourceStream(new UnusableStream()); // Doesn't throw.
+ completer.setSourceStream(new UnusableStream()); // Doesn't throw.
});
test("listen and pause before linking stream", () async {
@@ -78,14 +78,13 @@
var lastEvent = -1;
var controller = new StreamController();
var subscription;
- subscription = completer.stream.listen(
- (value) {
- expect(value, lessThan(3));
- lastEvent = value;
- if (value == 2) {
- subscription.cancel();
- }
- },
+ subscription = completer.stream.listen((value) {
+ expect(value, lessThan(3));
+ lastEvent = value;
+ if (value == 2) {
+ subscription.cancel();
+ }
+ },
onError: unreachable("error"),
onDone: unreachable("done"),
cancelOnError: true);
@@ -110,20 +109,16 @@
var completer = new StreamCompleter();
completer.setEmpty();
var done = new Completer();
- completer.stream.listen(
- unreachable("data"),
- onError: unreachable("error"),
- onDone: done.complete);
+ completer.stream.listen(unreachable("data"),
+ onError: unreachable("error"), onDone: done.complete);
await done.future;
});
test("complete with setEmpty after listening", () async {
var completer = new StreamCompleter();
var done = new Completer();
- completer.stream.listen(
- unreachable("data"),
- onError: unreachable("error"),
- onDone: done.complete);
+ completer.stream.listen(unreachable("data"),
+ onError: unreachable("error"), onDone: done.complete);
completer.setEmpty();
await done.future;
});
@@ -147,17 +142,13 @@
var completer = new StreamCompleter();
var lastEvent = -1;
var controller = new StreamController();
- completer.stream.listen(
- (value) {
- expect(value, lessThan(3));
- lastEvent = value;
- },
- onError: (value) {
- expect(value, "3");
- lastEvent = value;
- },
- onDone: unreachable("done"),
- cancelOnError: true);
+ completer.stream.listen((value) {
+ expect(value, lessThan(3));
+ lastEvent = value;
+ }, onError: (value) {
+ expect(value, "3");
+ lastEvent = value;
+ }, onDone: unreachable("done"), cancelOnError: true);
completer.setSourceStream(controller.stream);
expect(controller.hasListener, isTrue);
@@ -188,17 +179,13 @@
controller.add(1);
expect(controller.hasListener, isFalse);
- completer.stream.listen(
- (value) {
- expect(value, lessThan(3));
- lastEvent = value;
- },
- onError: (value) {
- expect(value, "3");
- lastEvent = value;
- },
- onDone: unreachable("done"),
- cancelOnError: true);
+ completer.stream.listen((value) {
+ expect(value, lessThan(3));
+ lastEvent = value;
+ }, onError: (value) {
+ expect(value, "3");
+ lastEvent = value;
+ }, onDone: unreachable("done"), cancelOnError: true);
expect(controller.hasListener, isTrue);
@@ -324,8 +311,8 @@
test("asFuture with error accross setting stream", () async {
var completer = new StreamCompleter();
var controller = new StreamController();
- var subscription = completer.stream.listen(unreachable("data"),
- cancelOnError: false);
+ var subscription =
+ completer.stream.listen(unreachable("data"), cancelOnError: false);
var done = subscription.asFuture();
expect(controller.hasListener, isFalse);
completer.setSourceStream(controller.stream);
@@ -341,12 +328,10 @@
group("setError()", () {
test("produces a stream that emits a single error", () {
var completer = new StreamCompleter();
- completer.stream.listen(
- unreachable("data"),
+ completer.stream.listen(unreachable("data"),
onError: expectAsync2((error, stackTrace) {
- expect(error, equals("oh no"));
- }),
- onDone: expectAsync0(() {}));
+ expect(error, equals("oh no"));
+ }), onDone: expectAsync0(() {}));
completer.setError("oh no");
});
@@ -357,12 +342,10 @@
completer.setError("oh no");
await flushMicrotasks();
- completer.stream.listen(
- unreachable("data"),
+ completer.stream.listen(unreachable("data"),
onError: expectAsync2((error, stackTrace) {
- expect(error, equals("oh no"));
- }),
- onDone: expectAsync0(() {}));
+ expect(error, equals("oh no"));
+ }), onDone: expectAsync0(() {}));
});
});
}
diff --git a/test/stream_group_test.dart b/test/stream_group_test.dart
index 078dc3c..65ddb42 100644
--- a/test/stream_group_test.dart
+++ b/test/stream_group_test.dart
@@ -75,22 +75,24 @@
new StreamTransformer.fromHandlers(
handleData: (data, sink) => sink.add("data: $data"),
handleError: (error, _, sink) => sink.add("error: $error")));
- expect(transformed.toList(), completion(equals([
- "data: first",
- "error: second",
- "data: third",
- "error: fourth",
- "error: fifth",
- "data: sixth"
- ])));
+ expect(
+ transformed.toList(),
+ completion(equals([
+ "data: first",
+ "error: second",
+ "data: third",
+ "error: fourth",
+ "error: fifth",
+ "data: sixth"
+ ])));
});
test("emits events once there's a listener", () {
var controller = new StreamController<String>();
streamGroup.add(controller.stream);
- expect(streamGroup.stream.toList(),
- completion(equals(["first", "second"])));
+ expect(
+ streamGroup.stream.toList(), completion(equals(["first", "second"])));
controller.add("first");
controller.add("second");
@@ -137,8 +139,8 @@
var controller = new StreamController<String>.broadcast();
streamGroup.add(controller.stream);
- expect(streamGroup.stream.toList(),
- completion(equals(["first", "second"])));
+ expect(
+ streamGroup.stream.toList(), completion(equals(["first", "second"])));
controller.add("first");
controller.add("second");
@@ -150,8 +152,8 @@
test("forwards cancel errors", () async {
var subscription = streamGroup.stream.listen(null);
- var controller = new StreamController<String>(
- onCancel: () => throw "error");
+ var controller =
+ new StreamController<String>(onCancel: () => throw "error");
streamGroup.add(controller.stream);
await flushMicrotasks();
@@ -162,8 +164,8 @@
var subscription = streamGroup.stream.listen(null);
var completer = new Completer();
- var controller = new StreamController<String>(
- onCancel: () => completer.future);
+ var controller =
+ new StreamController<String>(onCancel: () => completer.future);
streamGroup.add(controller.stream);
await flushMicrotasks();
@@ -178,15 +180,15 @@
expect(fired, isTrue);
});
- test("add() while active pauses the stream if the group is paused, then "
+ test(
+ "add() while active pauses the stream if the group is paused, then "
"resumes once the group resumes", () async {
var subscription = streamGroup.stream.listen(null);
await flushMicrotasks();
var paused = false;
var controller = new StreamController<String>(
- onPause: () => paused = true,
- onResume: () => paused = false);
+ onPause: () => paused = true, onResume: () => paused = false);
subscription.pause();
await flushMicrotasks();
@@ -223,16 +225,16 @@
});
test("forwards cancel errors", () {
- var controller = new StreamController<String>(
- onCancel: () => throw "error");
+ var controller =
+ new StreamController<String>(onCancel: () => throw "error");
expect(streamGroup.add(controller.stream), throwsA("error"));
});
test("forwards a cancel future", () async {
var completer = new Completer();
- var controller = new StreamController<String>(
- onCancel: () => completer.future);
+ var controller =
+ new StreamController<String>(onCancel: () => completer.future);
var fired = false;
streamGroup.add(controller.stream).then((_) => fired = true);
@@ -268,8 +270,8 @@
expect(streamGroup.close(), completes);
- expect(streamGroup.stream.toList(),
- completion(equals(["first", "second"])));
+ expect(
+ streamGroup.stream.toList(), completion(equals(["first", "second"])));
});
test("emits events from multiple sources once there's a listener", () {
@@ -279,8 +281,8 @@
var controller2 = new StreamController<String>();
streamGroup.add(controller2.stream);
- expect(streamGroup.stream.toList(),
- completion(equals(["first", "second"])));
+ expect(
+ streamGroup.stream.toList(), completion(equals(["first", "second"])));
controller1.add("first");
controller2.add("second");
@@ -325,8 +327,8 @@
var controller = new StreamController<String>.broadcast();
streamGroup.add(controller.stream);
- expect(streamGroup.stream.toList(),
- completion(equals(["first", "second"])));
+ expect(
+ streamGroup.stream.toList(), completion(equals(["first", "second"])));
controller.add("first");
controller.add("second");
@@ -356,8 +358,8 @@
test("never cancels single-subscription streams", () async {
var subscription = streamGroup.stream.listen(null);
- var controller = new StreamController<String>(
- onCancel: expectAsync0(() {}, count: 0));
+ var controller =
+ new StreamController<String>(onCancel: expectAsync0(() {}, count: 0));
streamGroup.add(controller.stream);
await flushMicrotasks();
@@ -568,8 +570,8 @@
});
test("forwards cancel errors", () async {
- var controller = new StreamController<String>(
- onCancel: () => throw "error");
+ var controller =
+ new StreamController<String>(onCancel: () => throw "error");
streamGroup.add(controller.stream);
streamGroup.stream.listen(null);
@@ -580,8 +582,8 @@
test("forwards cancel futures", () async {
var completer = new Completer();
- var controller = new StreamController<String>(
- onCancel: () => completer.future);
+ var controller =
+ new StreamController<String>(onCancel: () => completer.future);
streamGroup.stream.listen(null);
await flushMicrotasks();
@@ -641,7 +643,8 @@
expect(streamGroup.stream.toList(), completion(isEmpty));
});
- test("if there are streams, closes the group once those streams close "
+ test(
+ "if there are streams, closes the group once those streams close "
"and there's a listener", () async {
var controller1 = new StreamController<String>();
var controller2 = new StreamController<String>();
diff --git a/test/stream_queue_test.dart b/test/stream_queue_test.dart
index f487e65..2e4805b 100644
--- a/test/stream_queue_test.dart
+++ b/test/stream_queue_test.dart
@@ -113,9 +113,9 @@
test("with bad arguments throws", () async {
var events = new StreamQueue<int>(createStream());
expect(() => events.lookAhead(-1), throwsArgumentError);
- expect(await events.next, 1); // Did not consume event.
+ expect(await events.next, 1); // Did not consume event.
expect(() => events.lookAhead(-1), throwsArgumentError);
- expect(await events.next, 2); // Did not consume event.
+ expect(await events.next, 2); // Did not consume event.
await events.cancel();
});
@@ -153,8 +153,8 @@
test("multiple requests at the same time", () async {
var events = new StreamQueue<int>(createStream());
- var result = await Future.wait(
- [events.next, events.next, events.next, events.next]);
+ var result = await Future
+ .wait([events.next, events.next, events.next, events.next]);
expect(result, [1, 2, 3, 4]);
await events.cancel();
});
@@ -183,9 +183,9 @@
expect(() => events.skip(-1), throwsArgumentError);
// A non-int throws either a type error or an argument error,
// depending on whether it's checked mode or not.
- expect(await events.next, 1); // Did not consume event.
+ expect(await events.next, 1); // Did not consume event.
expect(() => events.skip(-1), throwsArgumentError);
- expect(await events.next, 2); // Did not consume event.
+ expect(await events.next, 2); // Did not consume event.
await events.cancel();
});
@@ -251,14 +251,16 @@
var index = 0;
// Check that futures complete in order.
Func1Required<int> sequence(expectedValue, sequenceIndex) => (value) {
- expect(value, expectedValue);
- expect(index, sequenceIndex);
- index++;
- };
- await Future.wait([skip1.then(sequence(0, 0)),
- skip2.then(sequence(0, 1)),
- skip3.then(sequence(1, 2)),
- skip4.then(sequence(1, 3))]);
+ expect(value, expectedValue);
+ expect(index, sequenceIndex);
+ index++;
+ };
+ await Future.wait([
+ skip1.then(sequence(0, 0)),
+ skip2.then(sequence(0, 1)),
+ skip3.then(sequence(1, 2)),
+ skip4.then(sequence(1, 3))
+ ]);
await events.cancel();
});
});
@@ -291,9 +293,9 @@
test("with bad arguments throws", () async {
var events = new StreamQueue<int>(createStream());
expect(() => events.take(-1), throwsArgumentError);
- expect(await events.next, 1); // Did not consume event.
+ expect(await events.next, 1); // Did not consume event.
expect(() => events.take(-1), throwsArgumentError);
- expect(await events.next, 2); // Did not consume event.
+ expect(await events.next, 2); // Did not consume event.
await events.cancel();
});
@@ -522,7 +524,8 @@
test("cancels the underlying subscription when called before any event",
() async {
var cancelFuture = new Future.value(42);
- var controller = new StreamController<int>(onCancel: () => cancelFuture);
+ var controller =
+ new StreamController<int>(onCancel: () => cancelFuture);
var events = new StreamQueue<int>(controller.stream);
expect(await events.cancel(immediate: true), 42);
@@ -539,8 +542,8 @@
test("returns the result of closing the underlying subscription",
() async {
- var controller = new StreamController<int>(
- onCancel: () => new Future.value(42));
+ var controller =
+ new StreamController<int>(onCancel: () => new Future.value(42));
var events = new StreamQueue<int>(controller.stream);
expect(await events.cancel(immediate: true), 42);
});
@@ -548,8 +551,8 @@
test("listens and then cancels a stream that hasn't been listened to yet",
() async {
var wasListened = false;
- var controller = new StreamController<int>(
- onListen: () => wasListened = true);
+ var controller =
+ new StreamController<int>(onListen: () => wasListened = true);
var events = new StreamQueue<int>(controller.stream);
expect(wasListened, isFalse);
expect(controller.hasListener, isFalse);
@@ -608,7 +611,9 @@
var events = new StreamQueue<int>(controller.stream);
var hasNext;
- events.hasNext.then((result) { hasNext = result; });
+ events.hasNext.then((result) {
+ hasNext = result;
+ });
await flushMicrotasks();
expect(hasNext, isNull);
controller.add(42);
@@ -622,7 +627,9 @@
var events = new StreamQueue<int>(controller.stream);
var hasNext;
- events.hasNext.then((result) { hasNext = result; });
+ events.hasNext.then((result) {
+ hasNext = result;
+ });
await flushMicrotasks();
expect(hasNext, isNull);
controller.addError("BAD");
@@ -976,7 +983,8 @@
}));
});
- test("the parent queue continues from the child position if it returns "
+ test(
+ "the parent queue continues from the child position if it returns "
"true", () async {
await events.withTransaction(expectAsync1((queue) async {
expect(await queue.next, 2);
@@ -986,7 +994,8 @@
expect(await events.next, 3);
});
- test("the parent queue continues from its original position if it returns "
+ test(
+ "the parent queue continues from its original position if it returns "
"false", () async {
await events.withTransaction(expectAsync1((queue) async {
expect(await queue.next, 2);
@@ -1036,12 +1045,15 @@
expect(await events.next, 3);
});
- test("the parent queue continues from the child position if an error is "
+ test(
+ "the parent queue continues from the child position if an error is "
"thrown", () async {
- expect(events.cancelable(expectAsync1((queue) async {
- expect(await queue.next, 2);
- throw "oh no";
- })).value, throwsA("oh no"));
+ expect(
+ events.cancelable(expectAsync1((queue) async {
+ expect(await queue.next, 2);
+ throw "oh no";
+ })).value,
+ throwsA("oh no"));
expect(events.next, completion(3));
});
@@ -1057,10 +1069,12 @@
});
test("forwards the value from the callback", () async {
- expect(await events.cancelable(expectAsync1((queue) async {
- expect(await queue.next, 2);
- return "value";
- })).value, "value");
+ expect(
+ await events.cancelable(expectAsync1((queue) async {
+ expect(await queue.next, 2);
+ return "value";
+ })).value,
+ "value");
});
});
@@ -1088,8 +1102,9 @@
// `take(10)`.
takeTest(startIndex) {
expect(events.take(10),
- completion(new List.generate(10, (i) => startIndex + i)));
+ completion(new List.generate(10, (i) => startIndex + i)));
}
+
var tests = [nextTest, skipTest, takeTest];
int counter = 0;
@@ -1103,7 +1118,7 @@
}
// Then expect 20 more events as a `rest` call.
expect(events.rest.toList(),
- completion(new List.generate(20, (i) => counter + i)));
+ completion(new List.generate(20, (i) => counter + i)));
});
}
diff --git a/test/stream_sink_completer_test.dart b/test/stream_sink_completer_test.dart
index c7ce1ea..3c8b576 100644
--- a/test/stream_sink_completer_test.dart
+++ b/test/stream_sink_completer_test.dart
@@ -289,9 +289,9 @@
test("doesn't allow the destination sink to be set multiple times", () {
completer.setDestinationSink(new TestSink());
- expect(() => completer.setDestinationSink(new TestSink()),
- throwsStateError);
- expect(() => completer.setDestinationSink(new TestSink()),
- throwsStateError);
+ expect(
+ () => completer.setDestinationSink(new TestSink()), throwsStateError);
+ expect(
+ () => completer.setDestinationSink(new TestSink()), throwsStateError);
});
}
diff --git a/test/stream_sink_transformer_test.dart b/test/stream_sink_transformer_test.dart
index 6be9f9e..208a03a 100644
--- a/test/stream_sink_transformer_test.dart
+++ b/test/stream_sink_transformer_test.dart
@@ -19,8 +19,8 @@
test("transforms data events", () {
var transformer = new StreamSinkTransformer.fromStreamTransformer(
new StreamTransformer.fromHandlers(handleData: (i, sink) {
- sink.add(i * 2);
- }));
+ sink.add(i * 2);
+ }));
var sink = transformer.bind(controller.sink);
var results = [];
@@ -38,18 +38,17 @@
var transformer = new StreamSinkTransformer.fromStreamTransformer(
new StreamTransformer.fromHandlers(
handleError: (i, stackTrace, sink) {
- sink.addError((i as num) * 2, stackTrace);
- }));
+ sink.addError((i as num) * 2, stackTrace);
+ }));
var sink = transformer.bind(controller.sink);
var results = [];
controller.stream.listen(expectAsync1((_) {}, count: 0),
onError: (error, stackTrace) {
- results.add(error);
- },
- onDone: expectAsync0(() {
- expect(results, equals([2, 4, 6]));
- }));
+ results.add(error);
+ }, onDone: expectAsync0(() {
+ expect(results, equals([2, 4, 6]));
+ }));
sink.addError(1);
sink.addError(2);
@@ -59,11 +58,10 @@
test("transforms done events", () {
var transformer = new StreamSinkTransformer.fromStreamTransformer(
- new StreamTransformer.fromHandlers(
- handleDone: (sink) {
- sink.add(1);
- sink.close();
- }));
+ new StreamTransformer.fromHandlers(handleDone: (sink) {
+ sink.add(1);
+ sink.close();
+ }));
var sink = transformer.bind(controller.sink);
var results = [];
@@ -90,7 +88,7 @@
expect(doneResult.isComplete, isFalse);
expect(closeResult.isComplete, isFalse);
- // Once the inner sink is completed, the futures should fire.
+ // Once the inner sink is completed, the futures should fire.
innerSink.completer.complete();
await flushMicrotasks();
expect(doneResult.isComplete, isTrue);
@@ -100,8 +98,8 @@
test("doesn't top-level the future from inner.close", () async {
var transformer = new StreamSinkTransformer.fromStreamTransformer(
new StreamTransformer.fromHandlers(handleData: (_, sink) {
- sink.close();
- }));
+ sink.close();
+ }));
var innerSink = new CompleterStreamSink();
var sink = transformer.bind(innerSink);
@@ -119,10 +117,10 @@
group("fromHandlers", () {
test("transforms data events", () {
- var transformer = new StreamSinkTransformer.fromHandlers(
- handleData: (i, sink) {
- sink.add(i * 2);
- });
+ var transformer =
+ new StreamSinkTransformer.fromHandlers(handleData: (i, sink) {
+ sink.add(i * 2);
+ });
var sink = transformer.bind(controller.sink);
var results = [];
@@ -139,18 +137,17 @@
test("transforms error events", () {
var transformer = new StreamSinkTransformer.fromHandlers(
handleError: (i, stackTrace, sink) {
- sink.addError((i as num) * 2, stackTrace);
- });
+ sink.addError((i as num) * 2, stackTrace);
+ });
var sink = transformer.bind(controller.sink);
var results = [];
controller.stream.listen(expectAsync1((_) {}, count: 0),
onError: (error, stackTrace) {
- results.add(error);
- },
- onDone: expectAsync0(() {
- expect(results, equals([2, 4, 6]));
- }));
+ results.add(error);
+ }, onDone: expectAsync0(() {
+ expect(results, equals([2, 4, 6]));
+ }));
sink.addError(1);
sink.addError(2);
@@ -159,11 +156,11 @@
});
test("transforms done events", () {
- var transformer = new StreamSinkTransformer.fromHandlers(
- handleDone: (sink) {
- sink.add(1);
- sink.close();
- });
+ var transformer =
+ new StreamSinkTransformer.fromHandlers(handleDone: (sink) {
+ sink.add(1);
+ sink.close();
+ });
var sink = transformer.bind(controller.sink);
var results = [];
@@ -189,7 +186,7 @@
expect(doneResult.isComplete, isFalse);
expect(closeResult.isComplete, isFalse);
- // Once the inner sink is completed, the futures should fire.
+ // Once the inner sink is completed, the futures should fire.
innerSink.completer.complete();
await flushMicrotasks();
expect(doneResult.isComplete, isTrue);
@@ -197,10 +194,10 @@
});
test("doesn't top-level the future from inner.close", () async {
- var transformer = new StreamSinkTransformer.fromHandlers(
- handleData: (_, sink) {
- sink.close();
- });
+ var transformer =
+ new StreamSinkTransformer.fromHandlers(handleData: (_, sink) {
+ sink.close();
+ });
var innerSink = new CompleterStreamSink();
var sink = transformer.bind(innerSink);
diff --git a/test/stream_splitter_test.dart b/test/stream_splitter_test.dart
index c562305..68a6b74 100644
--- a/test/stream_splitter_test.dart
+++ b/test/stream_splitter_test.dart
@@ -47,11 +47,12 @@
controller.close();
var count = 0;
- branch.listen(expectAsync1((value) {
- expect(count, anyOf(0, 2));
- expect(value, equals(count + 1));
- count++;
- }, count: 2), onError: expectAsync1((error) {
+ branch.listen(
+ expectAsync1((value) {
+ expect(count, anyOf(0, 2));
+ expect(value, equals(count + 1));
+ count++;
+ }, count: 2), onError: expectAsync1((error) {
expect(count, equals(1));
expect(error, equals("error"));
count++;
@@ -60,7 +61,8 @@
}));
});
- test("a branch that's created in the middle of a stream replays it", () async {
+ test("a branch that's created in the middle of a stream replays it",
+ () async {
controller.add(1);
controller.add(2);
await flushMicrotasks();
@@ -104,12 +106,12 @@
controller.add(1);
controller.add(2);
await flushMicrotasks();
-
+
var branch2 = splitter.split();
controller.add(3);
controller.close();
await flushMicrotasks();
-
+
var branch3 = splitter.split();
splitter.close();
@@ -207,7 +209,8 @@
expect(controller.isPaused, isFalse);
});
- test("the source stream is canceled when it's closed after all branches have "
+ test(
+ "the source stream is canceled when it's closed after all branches have "
"been canceled", () async {
var branch1 = splitter.split();
var branch2 = splitter.split();
@@ -233,7 +236,8 @@
expect(controller.hasListener, isFalse);
});
- test("the source stream is canceled when all branches are canceled after it "
+ test(
+ "the source stream is canceled when all branches are canceled after it "
"has been closed", () async {
var branch1 = splitter.split();
var branch2 = splitter.split();
@@ -257,7 +261,8 @@
expect(controller.hasListener, isFalse);
});
- test("a splitter that's closed before any branches are added never listens "
+ test(
+ "a splitter that's closed before any branches are added never listens "
"to the source stream", () {
splitter.close();
@@ -265,7 +270,8 @@
controller.stream.listen(null);
});
- test("splitFrom splits a source stream into the designated number of "
+ test(
+ "splitFrom splits a source stream into the designated number of "
"branches", () {
var branches = StreamSplitter.splitFrom(controller.stream, 5);
diff --git a/test/stream_zip_test.dart b/test/stream_zip_test.dart
index 958d1f2..71d8eee 100644
--- a/test/stream_zip_test.dart
+++ b/test/stream_zip_test.dart
@@ -45,40 +45,87 @@
}
test("Basic", () {
- testZip([mks([1, 2, 3]), mks([4, 5, 6]), mks([7, 8, 9])],
- [[1, 4, 7], [2, 5, 8], [3, 6, 9]]);
+ testZip([
+ mks([1, 2, 3]),
+ mks([4, 5, 6]),
+ mks([7, 8, 9])
+ ], [
+ [1, 4, 7],
+ [2, 5, 8],
+ [3, 6, 9]
+ ]);
});
test("Uneven length 1", () {
- testZip([mks([1, 2, 3, 99, 100]), mks([4, 5, 6]), mks([7, 8, 9])],
- [[1, 4, 7], [2, 5, 8], [3, 6, 9]]);
+ testZip([
+ mks([1, 2, 3, 99, 100]),
+ mks([4, 5, 6]),
+ mks([7, 8, 9])
+ ], [
+ [1, 4, 7],
+ [2, 5, 8],
+ [3, 6, 9]
+ ]);
});
test("Uneven length 2", () {
- testZip([mks([1, 2, 3]), mks([4, 5, 6, 99, 100]), mks([7, 8, 9])],
- [[1, 4, 7], [2, 5, 8], [3, 6, 9]]);
+ testZip([
+ mks([1, 2, 3]),
+ mks([4, 5, 6, 99, 100]),
+ mks([7, 8, 9])
+ ], [
+ [1, 4, 7],
+ [2, 5, 8],
+ [3, 6, 9]
+ ]);
});
test("Uneven length 3", () {
- testZip([mks([1, 2, 3]), mks([4, 5, 6]), mks([7, 8, 9, 99, 100])],
- [[1, 4, 7], [2, 5, 8], [3, 6, 9]]);
+ testZip([
+ mks([1, 2, 3]),
+ mks([4, 5, 6]),
+ mks([7, 8, 9, 99, 100])
+ ], [
+ [1, 4, 7],
+ [2, 5, 8],
+ [3, 6, 9]
+ ]);
});
test("Uneven length 4", () {
- testZip([mks([1, 2, 3, 98]), mks([4, 5, 6]), mks([7, 8, 9, 99, 100])],
- [[1, 4, 7], [2, 5, 8], [3, 6, 9]]);
+ testZip([
+ mks([1, 2, 3, 98]),
+ mks([4, 5, 6]),
+ mks([7, 8, 9, 99, 100])
+ ], [
+ [1, 4, 7],
+ [2, 5, 8],
+ [3, 6, 9]
+ ]);
});
test("Empty 1", () {
- testZip([mks([]), mks([4, 5, 6]), mks([7, 8, 9])], []);
+ testZip([
+ mks([]),
+ mks([4, 5, 6]),
+ mks([7, 8, 9])
+ ], []);
});
test("Empty 2", () {
- testZip([mks([1, 2, 3]), mks([]), mks([7, 8, 9])], []);
+ testZip([
+ mks([1, 2, 3]),
+ mks([]),
+ mks([7, 8, 9])
+ ], []);
});
test("Empty 3", () {
- testZip([mks([1, 2, 3]), mks([4, 5, 6]), mks([])], []);
+ testZip([
+ mks([1, 2, 3]),
+ mks([4, 5, 6]),
+ mks([])
+ ], []);
});
test("Empty source", () {
@@ -86,57 +133,88 @@
});
test("Single Source", () {
- testZip([mks([1, 2, 3])], [[1], [2], [3]]);
+ testZip([
+ mks([1, 2, 3])
+ ], [
+ [1],
+ [2],
+ [3]
+ ]);
});
test("Other-streams", () {
Stream st1 = mks([1, 2, 3, 4, 5, 6]).where((x) => x < 4);
- Stream st2 = new Stream.periodic(const Duration(milliseconds: 5),
- (x) => x + 4).take(3);
+ Stream st2 =
+ new Stream.periodic(const Duration(milliseconds: 5), (x) => x + 4)
+ .take(3);
StreamController c = new StreamController.broadcast();
Stream st3 = c.stream;
- testZip([st1, st2, st3],
- [[1, 4, 7], [2, 5, 8], [3, 6, 9]]);
- c..add(7)..add(8)..add(9)..close();
+ testZip([
+ st1,
+ st2,
+ st3
+ ], [
+ [1, 4, 7],
+ [2, 5, 8],
+ [3, 6, 9]
+ ]);
+ c
+ ..add(7)
+ ..add(8)
+ ..add(9)
+ ..close();
});
test("Error 1", () {
- expect(new StreamZip([streamError(mks([1, 2, 3]), 2, "BAD-1"),
- mks([4, 5, 6]),
- mks([7, 8, 9])]).toList(),
- throwsA(equals("BAD-1")));
+ expect(
+ new StreamZip([
+ streamError(mks([1, 2, 3]), 2, "BAD-1"),
+ mks([4, 5, 6]),
+ mks([7, 8, 9])
+ ]).toList(),
+ throwsA(equals("BAD-1")));
});
test("Error 2", () {
- expect(new StreamZip([mks([1, 2, 3]),
- streamError(mks([4, 5, 6]), 5, "BAD-2"),
- mks([7, 8, 9])]).toList(),
- throwsA(equals("BAD-2")));
+ expect(
+ new StreamZip([
+ mks([1, 2, 3]),
+ streamError(mks([4, 5, 6]), 5, "BAD-2"),
+ mks([7, 8, 9])
+ ]).toList(),
+ throwsA(equals("BAD-2")));
});
test("Error 3", () {
- expect(new StreamZip([mks([1, 2, 3]),
- mks([4, 5, 6]),
- streamError(mks([7, 8, 9]), 8, "BAD-3")]).toList(),
- throwsA(equals("BAD-3")));
+ expect(
+ new StreamZip([
+ mks([1, 2, 3]),
+ mks([4, 5, 6]),
+ streamError(mks([7, 8, 9]), 8, "BAD-3")
+ ]).toList(),
+ throwsA(equals("BAD-3")));
});
test("Error at end", () {
- expect(new StreamZip([mks([1, 2, 3]),
- streamError(mks([4, 5, 6]), 6, "BAD-4"),
- mks([7, 8, 9])]).toList(),
- throwsA(equals("BAD-4")));
+ expect(
+ new StreamZip([
+ mks([1, 2, 3]),
+ streamError(mks([4, 5, 6]), 6, "BAD-4"),
+ mks([7, 8, 9])
+ ]).toList(),
+ throwsA(equals("BAD-4")));
});
test("Error before first end", () {
// StreamControllers' streams with no "close" called will never be done,
// so the fourth event of the first stream is guaranteed to come first.
- expect(new StreamZip(
- [streamError(mks([1, 2, 3, 4]), 4, "BAD-5"),
- (new StreamController()..add(4)..add(5)..add(6)).stream,
- (new StreamController()..add(7)..add(8)..add(9)).stream]
- ).toList(),
- throwsA(equals("BAD-5")));
+ expect(
+ new StreamZip([
+ streamError(mks([1, 2, 3, 4]), 4, "BAD-5"),
+ (new StreamController()..add(4)..add(5)..add(6)).stream,
+ (new StreamController()..add(7)..add(8)..add(9)).stream
+ ]).toList(),
+ throwsA(equals("BAD-5")));
});
test("Error after first end", () {
@@ -144,40 +222,43 @@
controller..add(7)..add(8)..add(9);
// Transformer that puts error into controller when one of the first two
// streams have sent a done event.
- StreamTransformer trans = new StreamTransformer.fromHandlers(
- handleDone: (EventSink s) {
- Timer.run(() { controller.addError("BAD-6"); });
+ StreamTransformer trans =
+ new StreamTransformer.fromHandlers(handleDone: (EventSink s) {
+ Timer.run(() {
+ controller.addError("BAD-6");
+ });
s.close();
});
- testZip([mks([1, 2, 3]).transform(trans),
- mks([4, 5, 6]).transform(trans),
- controller.stream],
- [[1, 4, 7], [2, 5, 8], [3, 6, 9]]);
+ testZip([
+ mks([1, 2, 3]).transform(trans),
+ mks([4, 5, 6]).transform(trans),
+ controller.stream
+ ], [
+ [1, 4, 7],
+ [2, 5, 8],
+ [3, 6, 9]
+ ]);
});
test("Pause/Resume", () {
int sc1p = 0;
- StreamController c1 = new StreamController(
- onPause: () {
- sc1p++;
- },
- onResume: () {
- sc1p--;
- });
+ StreamController c1 = new StreamController(onPause: () {
+ sc1p++;
+ }, onResume: () {
+ sc1p--;
+ });
int sc2p = 0;
- StreamController c2 = new StreamController(
- onPause: () {
- sc2p++;
- },
- onResume: () {
- sc2p--;
- });
+ StreamController c2 = new StreamController(onPause: () {
+ sc2p++;
+ }, onResume: () {
+ sc2p--;
+ });
- var done = expectAsync0((){
+ var done = expectAsync0(() {
expect(sc1p, equals(1));
expect(sc2p, equals(0));
- }); // Call to complete test.
+ }); // Call to complete test.
Stream zip = new StreamZip([c1.stream, c2.stream]);
@@ -198,7 +279,9 @@
}).then((hasMore) {
expect(hasMore, isTrue);
expect(it.current, equals([5, 6]));
- new Future.delayed(ms25).then((_) { c2.add(8); });
+ new Future.delayed(ms25).then((_) {
+ c2.add(8);
+ });
return it.moveNext();
}).then((hasMore) {
expect(hasMore, isTrue);
@@ -210,7 +293,12 @@
done();
});
- c1..add(1)..add(3)..add(5)..add(7)..close();
+ c1
+ ..add(1)
+ ..add(3)
+ ..add(5)
+ ..add(7)
+ ..close();
c2..add(2)..add(4);
});
diff --git a/test/stream_zip_zone_test.dart b/test/stream_zip_zone_test.dart
index af4d94a..a0773a6 100644
--- a/test/stream_zip_zone_test.dart
+++ b/test/stream_zip_zone_test.dart
@@ -9,22 +9,22 @@
// listen occurred.
main() {
- StreamController controller;
- controller = new StreamController();
- testStream("singlesub-async", controller, controller.stream);
- controller = new StreamController.broadcast();
- testStream("broadcast-async", controller, controller.stream);
- controller = new StreamController();
- testStream("asbroadcast-async", controller,
- controller.stream.asBroadcastStream());
+ StreamController controller;
+ controller = new StreamController();
+ testStream("singlesub-async", controller, controller.stream);
+ controller = new StreamController.broadcast();
+ testStream("broadcast-async", controller, controller.stream);
+ controller = new StreamController();
+ testStream(
+ "asbroadcast-async", controller, controller.stream.asBroadcastStream());
- controller = new StreamController(sync: true);
- testStream("singlesub-sync", controller, controller.stream);
- controller = new StreamController.broadcast(sync: true);
- testStream("broadcast-sync", controller, controller.stream);
- controller = new StreamController(sync: true);
- testStream("asbroadcast-sync", controller,
- controller.stream.asBroadcastStream());
+ controller = new StreamController(sync: true);
+ testStream("singlesub-sync", controller, controller.stream);
+ controller = new StreamController.broadcast(sync: true);
+ testStream("broadcast-sync", controller, controller.stream);
+ controller = new StreamController(sync: true);
+ testStream(
+ "asbroadcast-sync", controller, controller.stream.asBroadcastStream());
}
void testStream(String name, StreamController controller, Stream stream) {
diff --git a/test/subscription_stream_test.dart b/test/subscription_stream_test.dart
index 699babe..6e2c9d5 100644
--- a/test/subscription_stream_test.dart
+++ b/test/subscription_stream_test.dart
@@ -73,13 +73,13 @@
for (var sourceCancels in [false, true]) {
group("${sourceCancels ? "yes" : "no"}:", () {
var subscriptionStream;
- var onCancel; // Completes if source stream is canceled before done.
+ var onCancel; // Completes if source stream is canceled before done.
setUp(() {
var cancelCompleter = new Completer();
var source = createErrorStream(cancelCompleter);
onCancel = cancelCompleter.future;
- var sourceSubscription = source.listen(null,
- cancelOnError: sourceCancels);
+ var sourceSubscription =
+ source.listen(null, cancelOnError: sourceCancels);
subscriptionStream = new SubscriptionStream<int>(sourceSubscription);
});
@@ -87,15 +87,15 @@
var done = new Completer();
var events = [];
subscriptionStream.listen(events.add,
- onError: events.add,
- onDone: done.complete,
- cancelOnError: false);
+ onError: events.add, onDone: done.complete, cancelOnError: false);
var expected = [1, 2, "To err is divine!"];
if (sourceCancels) {
await onCancel;
// And [done] won't complete at all.
bool isDone = false;
- done.future.then((_) { isDone = true; });
+ done.future.then((_) {
+ isDone = true;
+ });
await new Future.delayed(const Duration(milliseconds: 5));
expect(isDone, false);
} else {
@@ -109,12 +109,12 @@
var completer = new Completer();
var events = [];
subscriptionStream.listen(events.add,
- onError: (value) {
- events.add(value);
- completer.complete();
- },
- onDone: () => throw "should not happen",
- cancelOnError: true);
+ onError: (value) {
+ events.add(value);
+ completer.complete();
+ },
+ onDone: () => throw "should not happen",
+ cancelOnError: true);
await completer.future;
await flushMicrotasks();
expect(events, [1, 2, "To err is divine!"]);
@@ -128,8 +128,7 @@
var stream = createStream();
var sourceSubscription =
stream.listen(null, cancelOnError: cancelOnError);
- var subscriptionStream =
- new SubscriptionStream(sourceSubscription);
+ var subscriptionStream = new SubscriptionStream(sourceSubscription);
var subscription =
subscriptionStream.listen(null, cancelOnError: cancelOnError);
expect(subscription.asFuture(42), completion(42));
@@ -137,10 +136,9 @@
test("- error goes to asFuture", () async {
var stream = createErrorStream();
- var sourceSubscription = stream.listen(null,
- cancelOnError: cancelOnError);
- var subscriptionStream =
- new SubscriptionStream(sourceSubscription);
+ var sourceSubscription =
+ stream.listen(null, cancelOnError: cancelOnError);
+ var subscriptionStream = new SubscriptionStream(sourceSubscription);
var subscription =
subscriptionStream.listen(null, cancelOnError: cancelOnError);
diff --git a/test/subscription_transformer_test.dart b/test/subscription_transformer_test.dart
index cdac4f8..dbbf597 100644
--- a/test/subscription_transformer_test.dart
+++ b/test/subscription_transformer_test.dart
@@ -86,13 +86,11 @@
var controller = new StreamController(onCancel: expectAsync0(() {
isCanceled = true;
}));
- var subscription = controller.stream
- .transform(subscriptionTransformer(
- handleCancel: expectAsync1((inner) {
- callbackInvoked = true;
- inner.cancel();
- })))
- .listen(expectAsync1((_) {}, count: 0));
+ var subscription = controller.stream.transform(
+ subscriptionTransformer(handleCancel: expectAsync1((inner) {
+ callbackInvoked = true;
+ inner.cancel();
+ }))).listen(expectAsync1((_) {}, count: 0));
await flushMicrotasks();
expect(callbackInvoked, isFalse);
@@ -103,7 +101,7 @@
expect(callbackInvoked, isTrue);
expect(isCanceled, isTrue);
});
-
+
test("invokes the callback once and caches its result", () async {
var completer = new Completer();
var controller = new StreamController();
@@ -138,7 +136,8 @@
var pauseCount = 0;
var controller = new StreamController();
var subscription = controller.stream
- .transform(subscriptionTransformer(handlePause: expectAsync1((inner) {
+ .transform(subscriptionTransformer(
+ handlePause: expectAsync1((inner) {
pauseCount++;
inner.pause();
}, count: 3)))
@@ -187,9 +186,9 @@
var subscription = controller.stream
.transform(subscriptionTransformer(
handleResume: expectAsync1((inner) {
- resumeCount++;
- inner.resume();
- }, count: 3)))
+ resumeCount++;
+ inner.resume();
+ }, count: 3)))
.listen(expectAsync1((_) {}, count: 0));
await flushMicrotasks();
@@ -216,13 +215,11 @@
test("invokes the callback when a resume future completes", () async {
var resumed = false;
var controller = new StreamController();
- var subscription = controller.stream
- .transform(subscriptionTransformer(
- handleResume: expectAsync1((inner) {
- resumed = true;
- inner.resume();
- })))
- .listen(expectAsync1((_) {}, count: 0));
+ var subscription = controller.stream.transform(
+ subscriptionTransformer(handleResume: expectAsync1((inner) {
+ resumed = true;
+ inner.resume();
+ }))).listen(expectAsync1((_) {}, count: 0));
var completer = new Completer();
subscription.pause(completer.future);
@@ -255,8 +252,7 @@
var controller = new StreamController();
subscription = controller.stream
.transform(subscriptionTransformer(handleCancel: (_) {}))
- .listen(
- expectAsync1((_) {}, count: 0),
+ .listen(expectAsync1((_) {}, count: 0),
onError: expectAsync2((_, __) {}, count: 0),
onDone: expectAsync0(() {}, count: 0));
subscription.cancel();
diff --git a/test/typed_wrapper/future_test.dart b/test/typed_wrapper/future_test.dart
index b928e05..0c8b00a 100644
--- a/test/typed_wrapper/future_test.dart
+++ b/test/typed_wrapper/future_test.dart
@@ -32,20 +32,22 @@
test: expectAsync1((_) {}, count: 0)),
completion(equals(12)));
- expect(errorWrapper.catchError(expectAsync1((error) {
- expect(error, equals("oh no"));
- return 42;
- }), test: expectAsync1((error) {
- expect(error, equals("oh no"));
- return true;
- })), completion(equals(42)));
+ expect(
+ errorWrapper.catchError(expectAsync1((error) {
+ expect(error, equals("oh no"));
+ return 42;
+ }), test: expectAsync1((error) {
+ expect(error, equals("oh no"));
+ return true;
+ })),
+ completion(equals(42)));
});
test("then()", () {
- expect(wrapper.then((value) => value.toString()),
- completion(equals("12")));
- expect(errorWrapper.then(expectAsync1((_) {}, count: 0)),
- throwsA("oh no"));
+ expect(
+ wrapper.then((value) => value.toString()), completion(equals("12")));
+ expect(
+ errorWrapper.then(expectAsync1((_) {}, count: 0)), throwsA("oh no"));
});
test("whenComplete()", () {
@@ -96,11 +98,11 @@
expect(wrapper.timeout(new Duration(seconds: 3)).then((_) {}),
throwsCastError);
- expect(
- new TypeSafeFuture<int>(new Completer<Object>().future)
- .timeout(Duration.ZERO, onTimeout: expectAsync0(() => "foo"))
- .then((_) {}),
- throwsCastError);
+ expect(
+ new TypeSafeFuture<int>(new Completer<Object>().future)
+ .timeout(Duration.ZERO, onTimeout: expectAsync0(() => "foo"))
+ .then((_) {}),
+ throwsCastError);
});
});
});
diff --git a/test/typed_wrapper/stream_subscription_test.dart b/test/typed_wrapper/stream_subscription_test.dart
index 07a8bd2..f52abe7 100644
--- a/test/typed_wrapper/stream_subscription_test.dart
+++ b/test/typed_wrapper/stream_subscription_test.dart
@@ -18,8 +18,8 @@
controller = new StreamController<Object>(onCancel: () {
isCanceled = true;
});
- wrapper = new TypeSafeStreamSubscription<int>(
- controller.stream.listen(null));
+ wrapper =
+ new TypeSafeStreamSubscription<int>(controller.stream.listen(null));
});
test("onData()", () {
@@ -75,8 +75,8 @@
controller = new StreamController<Object>(onCancel: () {
isCanceled = true;
});
- wrapper = new TypeSafeStreamSubscription<int>(
- controller.stream.listen(null));
+ wrapper =
+ new TypeSafeStreamSubscription<int>(controller.stream.listen(null));
});
group("throws a CastError for", () {
diff --git a/test/typed_wrapper/stream_test.dart b/test/typed_wrapper/stream_test.dart
index 12a130a..88fbee6 100644
--- a/test/typed_wrapper/stream_test.dart
+++ b/test/typed_wrapper/stream_test.dart
@@ -18,14 +18,19 @@
var errorWrapper;
setUp(() {
controller = new StreamController<Object>()
- ..add(1)..add(2)..add(3)..add(4)..add(5)..close();
+ ..add(1)
+ ..add(2)
+ ..add(3)
+ ..add(4)
+ ..add(5)
+ ..close();
// TODO(nweiz): Use public methods when test#414 is fixed and we can run
// this on DDC.
wrapper = new TypeSafeStream<int>(controller.stream);
emptyWrapper = new TypeSafeStream<int>(new Stream<Object>.empty());
- singleWrapper = new TypeSafeStream<int>(
- new Stream<Object>.fromIterable([1]));
+ singleWrapper =
+ new TypeSafeStream<int>(new Stream<Object>.fromIterable([1]));
errorWrapper = new TypeSafeStream<int>(
new Stream<Object>.fromFuture(new Future.error("oh no")));
});
@@ -70,8 +75,8 @@
});
test("with onListen", () {
- var broadcast = wrapper.asBroadcastStream(
- onListen: expectAsync1((subscription) {
+ var broadcast =
+ wrapper.asBroadcastStream(onListen: expectAsync1((subscription) {
expect(subscription, new isInstanceOf<StreamSubscription<int>>());
subscription.pause();
}));
@@ -81,8 +86,8 @@
});
test("with onCancel", () {
- var broadcast = wrapper.asBroadcastStream(
- onCancel: expectAsync1((subscription) {
+ var broadcast =
+ wrapper.asBroadcastStream(onCancel: expectAsync1((subscription) {
expect(subscription, new isInstanceOf<StreamSubscription<int>>());
subscription.pause();
}));
@@ -105,13 +110,14 @@
group("distinct()", () {
test("without equals", () {
- expect(wrapper.distinct().toList(),
- completion(equals([1, 2, 3, 4, 5])));
+ expect(
+ wrapper.distinct().toList(), completion(equals([1, 2, 3, 4, 5])));
expect(
new TypeSafeStream<int>(
new Stream<Object>.fromIterable([1, 1, 2, 2, 3, 3]))
- .distinct().toList(),
+ .distinct()
+ .toList(),
completion(equals([1, 2, 3])));
});
@@ -133,8 +139,7 @@
});
test("expand()", () {
- expect(
- wrapper.expand((i) => [i, i]).toList(),
+ expect(wrapper.expand((i) => [i, i]).toList(),
completion(equals([1, 1, 2, 2, 3, 3, 4, 4, 5, 5])));
});
@@ -197,26 +202,32 @@
group("handleError()", () {
test("without a test", () {
- expect(errorWrapper.handleError(expectAsync1((error) {
- expect(error, equals("oh no"));
- })).toList(), completion(isEmpty));
+ expect(
+ errorWrapper.handleError(expectAsync1((error) {
+ expect(error, equals("oh no"));
+ })).toList(),
+ completion(isEmpty));
});
test("with a matching test", () {
- expect(errorWrapper.handleError(expectAsync1((error) {
- expect(error, equals("oh no"));
- }), test: expectAsync1((error) {
- expect(error, equals("oh no"));
- return true;
- })).toList(), completion(isEmpty));
+ expect(
+ errorWrapper.handleError(expectAsync1((error) {
+ expect(error, equals("oh no"));
+ }), test: expectAsync1((error) {
+ expect(error, equals("oh no"));
+ return true;
+ })).toList(),
+ completion(isEmpty));
});
test("with a matching test", () {
- expect(errorWrapper.handleError(expectAsync1((_) {}, count: 0),
- test: expectAsync1((error) {
- expect(error, equals("oh no"));
- return false;
- })).toList(), throwsA("oh no"));
+ expect(
+ errorWrapper.handleError(expectAsync1((_) {}, count: 0),
+ test: expectAsync1((error) {
+ expect(error, equals("oh no"));
+ return false;
+ })).toList(),
+ throwsA("oh no"));
});
});
@@ -260,24 +271,23 @@
});
test("takeWhile()", () {
- expect(wrapper.takeWhile((i) => i < 3).toList(),
- completion(equals([1, 2])));
+ expect(
+ wrapper.takeWhile((i) => i < 3).toList(), completion(equals([1, 2])));
});
test("toSet()", () {
expect(wrapper.toSet(), completion(unorderedEquals([1, 2, 3, 4, 5])));
expect(
new TypeSafeStream<int>(
- new Stream<Object>.fromIterable([1, 1, 2, 2, 3, 3]))
- .toSet(),
+ new Stream<Object>.fromIterable([1, 1, 2, 2, 3, 3])).toSet(),
completion(unorderedEquals([1, 2, 3])));
});
test("transform()", () {
- var transformer = new StreamTransformer<int, String>.fromHandlers(
- handleData: (data, sink) {
- sink.add(data.toString());
- });
+ var transformer = new StreamTransformer<int, String>.fromHandlers(
+ handleData: (data, sink) {
+ sink.add(data.toString());
+ });
expect(wrapper.transform(transformer).toList(),
completion(equals(["1", "2", "3", "4", "5"])));
@@ -385,8 +395,8 @@
setUp(() {
wrapper = new TypeSafeStream<int>(
new Stream<Object>.fromIterable(["foo", "bar", "baz"]));
- singleWrapper = new TypeSafeStream<int>(
- new Stream<Object>.fromIterable(["foo"]));
+ singleWrapper =
+ new TypeSafeStream<int>(new Stream<Object>.fromIterable(["foo"]));
});
group("throws a CastError for", () {
@@ -439,8 +449,8 @@
});
test("lastWhere()", () {
- expect(wrapper.lastWhere(expectAsync1((_) {}, count: 0)),
- throwsCastError);
+ expect(
+ wrapper.lastWhere(expectAsync1((_) {}, count: 0)), throwsCastError);
});
test("singleWhere()", () {
@@ -454,7 +464,8 @@
});
test("forEach()", () async {
- expect(wrapper.forEach(expectAsync1((_) {}, count: 0)), throwsCastError);
+ expect(
+ wrapper.forEach(expectAsync1((_) {}, count: 0)), throwsCastError);
});
test("handleError()", () {
@@ -468,8 +479,8 @@
});
test("map()", () {
- expect(wrapper.map(expectAsync1((_) {}, count: 0)).first,
- throwsCastError);
+ expect(
+ wrapper.map(expectAsync1((_) {}, count: 0)).first, throwsCastError);
});
test("reduce()", () {
diff --git a/test/utils.dart b/test/utils.dart
index 8499eb2..9270886 100644
--- a/test/utils.dart
+++ b/test/utils.dart
@@ -17,23 +17,24 @@
///
/// Returns a function that fails the test if it is ever called.
OptionalArgAction unreachable(String name) =>
- ([a, b]) => fail("Unreachable: $name");
+ ([a, b]) => fail("Unreachable: $name");
// TODO(nweiz): Use the version of this in test when test#418 is fixed.
/// A matcher that runs a callback in its own zone and asserts that that zone
/// emits an error that matches [matcher].
Matcher throwsZoned(matcher) => predicate((callback) {
- var firstError = true;
- runZoned(callback, onError: expectAsync2((error, stackTrace) {
- if (firstError) {
- expect(error, matcher);
- firstError = false;
- } else {
- registerException(error, stackTrace);
- }
- }, max: -1));
- return true;
-});
+ var firstError = true;
+ runZoned(callback,
+ onError: expectAsync2((error, stackTrace) {
+ if (firstError) {
+ expect(error, matcher);
+ firstError = false;
+ } else {
+ registerException(error, stackTrace);
+ }
+ }, max: -1));
+ return true;
+ });
/// A matcher that runs a callback in its own zone and asserts that that zone
/// emits a [CastError].