dartfmt (with 1.23.x)
diff --git a/lib/src/async_cache.dart b/lib/src/async_cache.dart
index 96e1a50..af52cd0 100644
--- a/lib/src/async_cache.dart
+++ b/lib/src/async_cache.dart
@@ -42,8 +42,7 @@
   /// An ephemeral cache guarantees that a callback function will only be
   /// executed at most once concurrently. This is useful for requests for which
   /// data is updated frequently but stale data is acceptable.
-  factory AsyncCache.ephemeral() =>
-      new AsyncCache(Duration.ZERO);
+  factory AsyncCache.ephemeral() => new AsyncCache(Duration.ZERO);
 
   /// Creates a cache that invalidates its contents after [duration] has passed.
   ///
diff --git a/lib/src/byte_collector.dart b/lib/src/byte_collector.dart
index a8d2dbf..3c1d49d 100644
--- a/lib/src/byte_collector.dart
+++ b/lib/src/byte_collector.dart
@@ -29,9 +29,10 @@
 /// an eight-bit unsigned value in the resulting list.
 CancelableOperation<Uint8List> collectBytesCancelable(
     Stream<List<int>> source) {
-  return _collectBytes(source, (subscription, result) =>
-      new CancelableOperation.fromFuture(result, onCancel: subscription.cancel)
-  );
+  return _collectBytes(
+      source,
+      (subscription, result) => new CancelableOperation.fromFuture(result,
+          onCancel: subscription.cancel));
 }
 
 /// Generalization over [collectBytes] and [collectBytesCancelable].
@@ -41,8 +42,8 @@
 /// so it can cancel the operation.
 T _collectBytes<T>(
     Stream<List<int>> source,
-    T result(StreamSubscription<List<int>> subscription,
-        Future<Uint8List> result)) {
+    T result(
+        StreamSubscription<List<int>> subscription, Future<Uint8List> result)) {
   var byteLists = <List<int>>[];
   var length = 0;
   var completer = new Completer<Uint8List>.sync();
diff --git a/lib/src/cancelable_operation.dart b/lib/src/cancelable_operation.dart
index a9663c1..2bf0f3b 100644
--- a/lib/src/cancelable_operation.dart
+++ b/lib/src/cancelable_operation.dart
@@ -44,8 +44,8 @@
   /// This is like `value.asStream()`, but if a subscription to the stream is
   /// canceled, this is as well.
   Stream<T> asStream() {
-    var controller = new StreamController<T>(
-        sync: true, onCancel: _completer._cancel);
+    var controller =
+        new StreamController<T>(sync: true, onCancel: _completer._cancel);
 
     value.then((value) {
       controller.add(value);
diff --git a/lib/src/delegate/future.dart b/lib/src/delegate/future.dart
index f746190..129ad49 100644
--- a/lib/src/delegate/future.dart
+++ b/lib/src/delegate/future.dart
@@ -25,13 +25,13 @@
   Stream<T> asStream() => _future.asStream();
 
   Future<T> catchError(Function onError, {bool test(Object error)}) =>
-    _future.catchError(onError, test: test);
+      _future.catchError(onError, test: test);
 
   Future<S> then<S>(FutureOr<S> onValue(T value), {Function onError}) =>
-    _future.then(onValue, onError: onError);
+      _future.then(onValue, onError: onError);
 
   Future<T> whenComplete(action()) => _future.whenComplete(action);
 
   Future<T> timeout(Duration timeLimit, {onTimeout()}) =>
-    _future.timeout(timeLimit, onTimeout: onTimeout);
+      _future.timeout(timeLimit, onTimeout: onTimeout);
 }
diff --git a/lib/src/delegate/stream_subscription.dart b/lib/src/delegate/stream_subscription.dart
index 0823412..e7575d8 100644
--- a/lib/src/delegate/stream_subscription.dart
+++ b/lib/src/delegate/stream_subscription.dart
@@ -23,8 +23,7 @@
   /// regardless of its original generic type, by asserting that its events are
   /// instances of `T` whenever they're provided. If they're not, the
   /// subscription throws a [CastError].
-  static StreamSubscription<T> typed<T>(
-          StreamSubscription subscription) =>
+  static StreamSubscription<T> typed<T>(StreamSubscription subscription) =>
       subscription is StreamSubscription<T>
           ? subscription
           : new TypeSafeStreamSubscription<T>(subscription);
@@ -51,8 +50,7 @@
 
   Future cancel() => _source.cancel();
 
-  Future<E> asFuture<E>([E futureValue]) =>
-      _source.asFuture(futureValue);
+  Future<E> asFuture<E>([E futureValue]) => _source.asFuture(futureValue);
 
   bool get isPaused => _source.isPaused;
 }
diff --git a/lib/src/future_group.dart b/lib/src/future_group.dart
index 9b85c2f..0bf3158 100644
--- a/lib/src/future_group.dart
+++ b/lib/src/future_group.dart
@@ -47,6 +47,7 @@
     }
     return _onIdleController.stream;
   }
+
   StreamController _onIdleController;
 
   /// The values emitted by the futures that have been added to the group, in
@@ -93,4 +94,3 @@
     _completer.complete(_values);
   }
 }
-
diff --git a/lib/src/lazy_stream.dart b/lib/src/lazy_stream.dart
index d565b4d..f07c387 100644
--- a/lib/src/lazy_stream.dart
+++ b/lib/src/lazy_stream.dart
@@ -26,9 +26,7 @@
   }
 
   StreamSubscription<T> listen(void onData(T event),
-                               {Function onError,
-                                void onDone(),
-                                bool cancelOnError}) {
+      {Function onError, void onDone(), bool cancelOnError}) {
     if (_callback == null) {
       throw new StateError("Stream has already been listened to.");
     }
diff --git a/lib/src/result.dart b/lib/src/result.dart
index 7c45846..49457c3 100644
--- a/lib/src/result.dart
+++ b/lib/src/result.dart
@@ -78,8 +78,7 @@
   /// Errors have been converted to an [ErrorResult] value.
   static Future<Result<T>> capture<T>(Future<T> future) {
     return future.then((value) => new ValueResult(value),
-        onError: (error, stackTrace) =>
-            new ErrorResult<T>(error, stackTrace));
+        onError: (error, stackTrace) => new ErrorResult<T>(error, stackTrace));
   }
 
   /// Release the result of a captured future.
@@ -115,8 +114,7 @@
   /// result with the value is returned.
   static Result<T> flatten<T>(Result<Result<T>> result) {
     if (result.isValue) return result.asValue.value;
-    return new ErrorResult<T>(
-        result.asError.error, result.asError.stackTrace);
+    return new ErrorResult<T>(result.asError.error, result.asError.stackTrace);
   }
 
   /// Whether this result is a value result.
diff --git a/lib/src/result/future.dart b/lib/src/result/future.dart
index 209e8b1..749b101 100644
--- a/lib/src/result/future.dart
+++ b/lib/src/result/future.dart
@@ -29,6 +29,5 @@
     return resultFuture;
   }
 
-  ResultFuture._(Future<T> future)
-      : super(future);
+  ResultFuture._(Future<T> future) : super(future);
 }
diff --git a/lib/src/single_subscription_transformer.dart b/lib/src/single_subscription_transformer.dart
index e01efac..fcd6b06 100644
--- a/lib/src/single_subscription_transformer.dart
+++ b/lib/src/single_subscription_transformer.dart
@@ -18,8 +18,8 @@
 
   Stream<T> bind(Stream<S> stream) {
     var subscription;
-    var controller = new StreamController<T>(sync: true,
-        onCancel: () => subscription.cancel());
+    var controller = new StreamController<T>(
+        sync: true, onCancel: () => subscription.cancel());
     subscription = stream.listen((value) {
       // TODO(nweiz): When we release a new major version, get rid of the second
       // type parameter and avoid this conversion.
diff --git a/lib/src/stream_completer.dart b/lib/src/stream_completer.dart
index 2bff6a5..4311de5 100644
--- a/lib/src/stream_completer.dart
+++ b/lib/src/stream_completer.dart
@@ -36,8 +36,7 @@
   /// instead contain just that error.
   static Stream<T> fromFuture<T>(Future<Stream<T>> streamFuture) {
     var completer = new StreamCompleter<T>();
-    streamFuture.then(completer.setSourceStream,
-        onError: completer.setError);
+    streamFuture.then(completer.setSourceStream, onError: completer.setError);
     return completer.stream;
   }
 
@@ -118,23 +117,21 @@
   Stream<T> _sourceStream;
 
   StreamSubscription<T> listen(onData(T data),
-                               {Function onError,
-                                void onDone(),
-                                bool cancelOnError}) {
+      {Function onError, void onDone(), bool cancelOnError}) {
     if (_controller == null) {
       if (_sourceStream != null && !_sourceStream.isBroadcast) {
         // If the source stream is itself single subscription,
         // just listen to it directly instead of creating a controller.
-        return _sourceStream.listen(onData, onError: onError, onDone: onDone,
-                                    cancelOnError: cancelOnError);
+        return _sourceStream.listen(onData,
+            onError: onError, onDone: onDone, cancelOnError: cancelOnError);
       }
       _createController();
       if (_sourceStream != null) {
         _linkStreamToController();
       }
     }
-    return _controller.stream.listen(onData, onError: onError, onDone: onDone,
-                                     cancelOnError: cancelOnError);
+    return _controller.stream.listen(onData,
+        onError: onError, onDone: onDone, cancelOnError: cancelOnError);
   }
 
   /// Whether a source stream has been set.
@@ -161,8 +158,9 @@
   void _linkStreamToController() {
     assert(_controller != null);
     assert(_sourceStream != null);
-    _controller.addStream(_sourceStream, cancelOnError: false)
-               .whenComplete(_controller.close);
+    _controller
+        .addStream(_sourceStream, cancelOnError: false)
+        .whenComplete(_controller.close);
   }
 
   /// Sets an empty source stream.
@@ -174,7 +172,7 @@
     if (_controller == null) {
       _createController();
     }
-    _sourceStream = _controller.stream;  // Mark stream as set.
+    _sourceStream = _controller.stream; // Mark stream as set.
     _controller.close();
   }
 
diff --git a/lib/src/stream_group.dart b/lib/src/stream_group.dart
index 4aa448e..6361a5c 100644
--- a/lib/src/stream_group.dart
+++ b/lib/src/stream_group.dart
@@ -73,9 +73,7 @@
   /// Creates a new stream group where [stream] is a broadcast stream.
   StreamGroup.broadcast() {
     _controller = new StreamController<T>.broadcast(
-        onListen: _onListen,
-        onCancel: _onCancelBroadcast,
-        sync: true);
+        onListen: _onListen, onCancel: _onCancelBroadcast, sync: true);
   }
 
   /// Adds [stream] as a member of this group.
@@ -193,10 +191,8 @@
   ///
   /// This will pause the resulting subscription if [this] is paused.
   StreamSubscription<T> _listenToStream(Stream<T> stream) {
-    var subscription = stream.listen(
-        _controller.add,
-        onError: _controller.addError,
-        onDone: () => remove(stream));
+    var subscription = stream.listen(_controller.add,
+        onError: _controller.addError, onDone: () => remove(stream));
     if (_state == _StreamGroupState.paused) subscription.pause();
     return subscription;
   }
diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart
index f578081..b9023ab 100644
--- a/lib/src/stream_queue.dart
+++ b/lib/src/stream_queue.dart
@@ -131,7 +131,6 @@
     throw _failClosed();
   }
 
-
   /// Look at the next [count] data events without consuming them.
   ///
   /// Works like [take] except that the events are left in the queue.
@@ -495,7 +494,6 @@
   }
 }
 
-
 /// The default implementation of [StreamQueue].
 ///
 /// This queue gets its events from a stream which is listened
@@ -523,18 +521,14 @@
   void _ensureListening() {
     if (_isDone) return;
     if (_subscription == null) {
-      _subscription =
-          _sourceStream.listen(
-              (data) {
-                _addResult(new Result.value(data));
-              },
-              onError: (error, StackTrace stackTrace) {
-                _addResult(new Result.error(error, stackTrace));
-              },
-              onDone: () {
-                _subscription = null;
-                this._close();
-              });
+      _subscription = _sourceStream.listen((data) {
+        _addResult(new Result.value(data));
+      }, onError: (error, StackTrace stackTrace) {
+        _addResult(new Result.error(error, stackTrace));
+      }, onDone: () {
+        _subscription = null;
+        this._close();
+      });
     } else {
       _subscription.resume();
     }
@@ -650,8 +644,8 @@
       queue._cancel();
     }
 
-    assert((_parent._requestQueue.first as _TransactionRequest)
-        .transaction == this);
+    assert((_parent._requestQueue.first as _TransactionRequest).transaction ==
+        this);
     _parent._requestQueue.removeFirst();
     _parent._updateRequests();
   }
@@ -722,15 +716,14 @@
       return true;
     }
     if (isDone) {
-      _completer.completeError(new StateError("No elements"),
-                               StackTrace.current);
+      _completer.completeError(
+          new StateError("No elements"), StackTrace.current);
       return true;
     }
     return false;
   }
 }
 
-
 /// Request for a [StreamQueue.peek] call.
 ///
 /// Completes the returned future when receiving the first event,
@@ -749,15 +742,14 @@
       return true;
     }
     if (isDone) {
-      _completer.completeError(new StateError("No elements"),
-                               StackTrace.current);
+      _completer.completeError(
+          new StateError("No elements"), StackTrace.current);
       return true;
     }
     return false;
   }
 }
 
-
 /// Request for a [StreamQueue.skip] call.
 class _SkipRequest<T> implements _EventRequest<T> {
   /// Completer for the future returned by the skip call.
@@ -815,7 +807,6 @@
   Future<List<T>> get future => _completer.future;
 }
 
-
 /// Request for a [StreamQueue.take] call.
 class _TakeRequest<T> extends _ListRequest<T> {
   _TakeRequest(int eventsToTake) : super(eventsToTake);
@@ -839,7 +830,6 @@
   }
 }
 
-
 /// Request for a [StreamQueue.lookAhead] call.
 class _LookAheadRequest<T> extends _ListRequest<T> {
   _LookAheadRequest(int eventsToTake) : super(eventsToTake);
@@ -862,7 +852,6 @@
   }
 }
 
-
 /// Request for a [StreamQueue.cancel] call.
 ///
 /// The request needs no events, it just waits in the request queue
@@ -871,6 +860,7 @@
 class _CancelRequest<T> implements _EventRequest<T> {
   /// Completer for the future returned by the `cancel` call.
   final _completer = new Completer();
+
   ///
   /// When the event is completed, it needs to cancel the active subscription
   /// of the `StreamQueue` object, if any.
@@ -926,8 +916,9 @@
       for (var event in events) {
         event.addTo(controller);
       }
-      controller.addStream(_streamQueue._extractStream(), cancelOnError: false)
-                .whenComplete(controller.close);
+      controller
+          .addStream(_streamQueue._extractStream(), cancelOnError: false)
+          .whenComplete(controller.close);
       _completer.setSourceStream(controller.stream);
     }
     return true;
diff --git a/lib/src/stream_sink_completer.dart b/lib/src/stream_sink_completer.dart
index f8056c7..4219126 100644
--- a/lib/src/stream_sink_completer.dart
+++ b/lib/src/stream_sink_completer.dart
@@ -36,11 +36,9 @@
   ///
   /// If the future completes with an error, the returned sink will instead
   /// be closed. Its [Sink.done] future will contain the error.
-  static StreamSink<T> fromFuture<T>(
-      Future<StreamSink<T>> sinkFuture) {
+  static StreamSink<T> fromFuture<T>(Future<StreamSink<T>> sinkFuture) {
     var completer = new StreamSinkCompleter<T>();
-    sinkFuture.then(completer.setDestinationSink,
-        onError: completer.setError);
+    sinkFuture.then(completer.setDestinationSink, onError: completer.setError);
     return completer.sink;
   }
 
diff --git a/lib/src/stream_sink_transformer.dart b/lib/src/stream_sink_transformer.dart
index 65adda6..503d28a 100644
--- a/lib/src/stream_sink_transformer.dart
+++ b/lib/src/stream_sink_transformer.dart
@@ -24,8 +24,7 @@
   /// This is equivalent to piping all events from the outer sink through a
   /// stream transformed by [transformer] and from there into the inner sink.
   const factory StreamSinkTransformer.fromStreamTransformer(
-          StreamTransformer<S, T> transformer) =
-      StreamTransformerWrapper<S, T>;
+      StreamTransformer<S, T> transformer) = StreamTransformerWrapper<S, T>;
 
   /// Creates a [StreamSinkTransformer] that delegates events to the given
   /// handlers.
@@ -34,8 +33,8 @@
   /// They're called for each incoming event, and any actions on the sink
   /// they're passed are forwarded to the inner sink. If a handler is omitted,
   /// the event is passed through unaltered.
-  factory StreamSinkTransformer.fromHandlers({
-      void handleData(S data, EventSink<T> sink),
+  factory StreamSinkTransformer.fromHandlers(
+      {void handleData(S data, EventSink<T> sink),
       void handleError(Object error, StackTrace stackTrace, EventSink<T> sink),
       void handleDone(EventSink<T> sink)}) {
     return new HandlerTransformer<S, T>(handleData, handleError, handleDone);
diff --git a/lib/src/stream_sink_transformer/handler_transformer.dart b/lib/src/stream_sink_transformer/handler_transformer.dart
index 8cc3d01..dba8240 100644
--- a/lib/src/stream_sink_transformer/handler_transformer.dart
+++ b/lib/src/stream_sink_transformer/handler_transformer.dart
@@ -28,8 +28,7 @@
   /// The handler for done events.
   final HandleDone<T> _handleDone;
 
-  HandlerTransformer(
-      this._handleData, this._handleError, this._handleDone);
+  HandlerTransformer(this._handleData, this._handleError, this._handleDone);
 
   StreamSink<S> bind(StreamSink<T> sink) => new _HandlerSink<S, T>(this, sink);
 }
diff --git a/lib/src/stream_sink_transformer/stream_transformer_wrapper.dart b/lib/src/stream_sink_transformer/stream_transformer_wrapper.dart
index 83d2b19..32ac648 100644
--- a/lib/src/stream_sink_transformer/stream_transformer_wrapper.dart
+++ b/lib/src/stream_sink_transformer/stream_transformer_wrapper.dart
@@ -30,17 +30,16 @@
 
   Future get done => _inner.done;
 
-  _StreamTransformerWrapperSink(StreamTransformer<S, T> transformer,
-      this._inner) {
-    _controller.stream.transform(transformer).listen(
-        _inner.add,
-        onError: _inner.addError,
-        onDone: () {
-          // Ignore any errors that come from this call to [_inner.close]. The
-          // user can access them through [done] or the value returned from
-          // [this.close], and we don't want them to get top-leveled.
-          _inner.close().catchError((_) {});
-        });
+  _StreamTransformerWrapperSink(
+      StreamTransformer<S, T> transformer, this._inner) {
+    _controller.stream
+        .transform(transformer)
+        .listen(_inner.add, onError: _inner.addError, onDone: () {
+      // Ignore any errors that come from this call to [_inner.close]. The
+      // user can access them through [done] or the value returned from
+      // [this.close], and we don't want them to get top-leveled.
+      _inner.close().catchError((_) {});
+    });
   }
 
   void add(S event) {
diff --git a/lib/src/stream_splitter.dart b/lib/src/stream_splitter.dart
index 3d8f994..6cec98c 100644
--- a/lib/src/stream_splitter.dart
+++ b/lib/src/stream_splitter.dart
@@ -57,8 +57,7 @@
   ///
   /// [count] defaults to 2. This is the same as creating [count] branches and
   /// then closing the [StreamSplitter].
-  static List<Stream<T>> splitFrom<T>(Stream<T> stream,
-      [int count]) {
+  static List<Stream<T>> splitFrom<T>(Stream<T> stream, [int count]) {
     if (count == null) count = 2;
     var splitter = new StreamSplitter<T>(stream);
     var streams = new List<Stream>.generate(count, (_) => splitter.split());
@@ -77,9 +76,7 @@
     }
 
     var controller = new StreamController<T>(
-        onListen: _onListen,
-        onPause: _onPause,
-        onResume: _onResume);
+        onListen: _onListen, onPause: _onPause, onResume: _onResume);
     controller.onCancel = () => _onCancel(controller);
 
     for (var result in _buffer) {
@@ -147,8 +144,8 @@
       // wasn't paused, this will be a no-op.
       _subscription.resume();
     } else {
-      _subscription = _stream.listen(
-          _onData, onError: _onError, onDone: _onDone);
+      _subscription =
+          _stream.listen(_onData, onError: _onError, onDone: _onDone);
     }
   }
 
diff --git a/lib/src/stream_subscription_transformer.dart b/lib/src/stream_subscription_transformer.dart
index ad54539..1443b18 100644
--- a/lib/src/stream_subscription_transformer.dart
+++ b/lib/src/stream_subscription_transformer.dart
@@ -35,12 +35,14 @@
     return new _TransformedSubscription(
         stream.listen(null, cancelOnError: cancelOnError),
         handleCancel ?? (inner) => inner.cancel(),
-        handlePause ?? (inner) {
-          inner.pause();
-        },
-        handleResume ?? (inner) {
-          inner.resume();
-        });
+        handlePause ??
+            (inner) {
+              inner.pause();
+            },
+        handleResume ??
+            (inner) {
+              inner.resume();
+            });
   });
 }
 
@@ -61,8 +63,8 @@
 
   bool get isPaused => _inner?.isPaused ?? false;
 
-  _TransformedSubscription(this._inner, this._handleCancel, this._handlePause,
-      this._handleResume);
+  _TransformedSubscription(
+      this._inner, this._handleCancel, this._handlePause, this._handleResume);
 
   void onData(void handleData(T data)) {
     _inner?.onData(handleData);
@@ -77,15 +79,15 @@
   }
 
   Future cancel() => _cancelMemoizer.runOnce(() {
-    var inner = _inner;
-    _inner.onData(null);
-    _inner.onDone(null);
+        var inner = _inner;
+        _inner.onData(null);
+        _inner.onDone(null);
 
-    // Setting onError to null will cause errors to be top-leveled.
-    _inner.onError((_, __) {});
-    _inner = null;
-    return _handleCancel(inner);
-  });
+        // Setting onError to null will cause errors to be top-leveled.
+        _inner.onError((_, __) {});
+        _inner = null;
+        return _handleCancel(inner);
+      });
   final _cancelMemoizer = new AsyncMemoizer();
 
   void pause([Future resumeFuture]) {
diff --git a/lib/src/stream_zip.dart b/lib/src/stream_zip.dart
index a65840c..3d5a811 100644
--- a/lib/src/stream_zip.dart
+++ b/lib/src/stream_zip.dart
@@ -17,10 +17,8 @@
 
   StreamZip(Iterable<Stream<T>> streams) : _streams = streams;
 
-  StreamSubscription<List<T>> listen(void onData(List<T> data), {
-                                  Function onError,
-                                  void onDone(),
-                                  bool cancelOnError}) {
+  StreamSubscription<List<T>> listen(void onData(List<T> data),
+      {Function onError, void onDone(), bool cancelOnError}) {
     cancelOnError = identical(true, cancelOnError);
     var subscriptions = <StreamSubscription<T>>[];
     StreamController<List<T>> controller;
@@ -72,8 +70,9 @@
     try {
       for (var stream in _streams) {
         int index = subscriptions.length;
-        subscriptions.add(stream.listen(
-            (data) { handleData(index, data); },
+        subscriptions.add(stream.listen((data) {
+          handleData(index, data);
+        },
             onError: cancelOnError ? handleError : handleErrorCancel,
             onDone: handleDone,
             cancelOnError: cancelOnError));
@@ -87,34 +86,28 @@
 
     current = new List(subscriptions.length);
 
-    controller = new StreamController<List<T>>(
-      onPause: () {
-        for (int i = 0; i < subscriptions.length; i++) {
-          // This may pause some subscriptions more than once.
-          // These will not be resumed by onResume below, but must wait for the
-          // next round.
-          subscriptions[i].pause();
-        }
-      },
-      onResume: () {
-        for (int i = 0; i < subscriptions.length; i++) {
-          subscriptions[i].resume();
-        }
-      },
-      onCancel: () {
-        for (int i = 0; i < subscriptions.length; i++) {
-          // Canceling more than once is safe.
-          subscriptions[i].cancel();
-        }
+    controller = new StreamController<List<T>>(onPause: () {
+      for (int i = 0; i < subscriptions.length; i++) {
+        // This may pause some subscriptions more than once.
+        // These will not be resumed by onResume below, but must wait for the
+        // next round.
+        subscriptions[i].pause();
       }
-    );
+    }, onResume: () {
+      for (int i = 0; i < subscriptions.length; i++) {
+        subscriptions[i].resume();
+      }
+    }, onCancel: () {
+      for (int i = 0; i < subscriptions.length; i++) {
+        // Canceling more than once is safe.
+        subscriptions[i].cancel();
+      }
+    });
 
     if (subscriptions.isEmpty) {
       controller.close();
     }
     return controller.stream.listen(onData,
-                                    onError: onError,
-                                    onDone: onDone,
-                                    cancelOnError: cancelOnError);
+        onError: onError, onDone: onDone, cancelOnError: cancelOnError);
   }
 }
diff --git a/lib/src/subscription_stream.dart b/lib/src/subscription_stream.dart
index 50ca81b..a235663 100644
--- a/lib/src/subscription_stream.dart
+++ b/lib/src/subscription_stream.dart
@@ -30,7 +30,7 @@
   /// stream. That may be an issue if `subscription` was made to cancel on
   /// an error.
   SubscriptionStream(StreamSubscription<T> subscription)
-       : _source = subscription {
+      : _source = subscription {
     _source.pause();
     // Clear callbacks to avoid keeping them alive unnecessarily.
     _source.onData(null);
@@ -39,9 +39,7 @@
   }
 
   StreamSubscription<T> listen(void onData(T event),
-                               {Function onError,
-                                void onDone(),
-                                bool cancelOnError}) {
+      {Function onError, void onDone(), bool cancelOnError}) {
     if (_source == null) {
       throw new StateError("Stream has already been listened to.");
     }
diff --git a/lib/src/typed/stream.dart b/lib/src/typed/stream.dart
index 235b205..5b1f2ff 100644
--- a/lib/src/typed/stream.dart
+++ b/lib/src/typed/stream.dart
@@ -30,11 +30,11 @@
         onListen: onListen == null
             ? null
             : (subscription) =>
-                  onListen(new TypeSafeStreamSubscription<T>(subscription)),
+                onListen(new TypeSafeStreamSubscription<T>(subscription)),
         onCancel: onCancel == null
             ? null
             : (subscription) =>
-                  onCancel(new TypeSafeStreamSubscription<T>(subscription))));
+                onCancel(new TypeSafeStreamSubscription<T>(subscription))));
   }
 
   Stream<E> asyncExpand<E>(Stream<E> convert(T event)) =>
@@ -48,8 +48,7 @@
           ? null
           : (previous, next) => equals(previous as T, next as T)));
 
-  Future<E> drain<E>([E futureValue]) =>
-      _stream.drain(futureValue);
+  Future<E> drain<E>([E futureValue]) => _stream.drain(futureValue);
 
   Stream<S> expand<S>(Iterable<S> convert(T value)) =>
       _stream.expand(_validateType(convert));
@@ -63,10 +62,9 @@
   Future<T> singleWhere(bool test(T element)) async =>
       (await _stream.singleWhere(_validateType(test))) as T;
 
-  Future<S> fold<S>(S initialValue,
-          S combine(S previous, T element)) =>
-      _stream.fold(initialValue,
-          (previous, element) => combine(previous, element as T));
+  Future<S> fold<S>(S initialValue, S combine(S previous, T element)) =>
+      _stream.fold(
+          initialValue, (previous, element) => combine(previous, element as T));
 
   Future forEach(void action(T element)) =>
       _stream.forEach(_validateType(action));
@@ -79,8 +77,7 @@
       new TypeSafeStreamSubscription<T>(_stream.listen(_validateType(onData),
           onError: onError, onDone: onDone, cancelOnError: cancelOnError));
 
-  Stream<S> map<S>(S convert(T event)) =>
-      _stream.map(_validateType(convert));
+  Stream<S> map<S>(S convert(T event)) => _stream.map(_validateType(convert));
 
   // Don't forward to `_stream.pipe` because we want the consumer to see the
   // type-asserted stream.
@@ -88,8 +85,8 @@
       consumer.addStream(this).then((_) => consumer.close());
 
   Future<T> reduce(T combine(T previous, T element)) async {
-    var result = await _stream.reduce(
-        (previous, element) => combine(previous as T, element as T));
+    var result = await _stream
+        .reduce((previous, element) => combine(previous as T, element as T));
     return result as T;
   }
 
@@ -100,20 +97,17 @@
       new TypeSafeStream<T>(_stream.takeWhile(_validateType(test)));
 
   Stream<T> timeout(Duration timeLimit, {void onTimeout(EventSink<T> sink)}) =>
-      new TypeSafeStream<T>(_stream.timeout(
-          timeLimit,
+      new TypeSafeStream<T>(_stream.timeout(timeLimit,
           onTimeout: (sink) => onTimeout(DelegatingEventSink.typed(sink))));
 
   Future<List<T>> toList() async =>
       DelegatingList.typed<T>(await _stream.toList());
 
-  Future<Set<T>> toSet() async =>
-      DelegatingSet.typed<T>(await _stream.toSet());
+  Future<Set<T>> toSet() async => DelegatingSet.typed<T>(await _stream.toSet());
 
   // Don't forward to `_stream.transform` because we want the transformer to see
   // the type-asserted stream.
-  Stream<S> transform<S>(
-          StreamTransformer<T, S> transformer) =>
+  Stream<S> transform<S>(StreamTransformer<T, S> transformer) =>
       transformer.bind(this);
 
   Stream<T> where(bool test(T element)) =>
@@ -132,7 +126,6 @@
 
   /// Returns a version of [function] that asserts that its argument is an
   /// instance of `T`.
-  UnaryFunction<dynamic, S> _validateType<S>(
-          S function(T value)) =>
+  UnaryFunction<dynamic, S> _validateType<S>(S function(T value)) =>
       function == null ? null : (value) => function(value as T);
 }
diff --git a/lib/src/typed/stream_subscription.dart b/lib/src/typed/stream_subscription.dart
index e02e1c0..0fab039 100644
--- a/lib/src/typed/stream_subscription.dart
+++ b/lib/src/typed/stream_subscription.dart
@@ -33,6 +33,5 @@
 
   Future cancel() => _subscription.cancel();
 
-  Future<E> asFuture<E>([E futureValue]) =>
-      _subscription.asFuture(futureValue);
+  Future<E> asFuture<E>([E futureValue]) => _subscription.asFuture(futureValue);
 }
diff --git a/pubspec.yaml b/pubspec.yaml
index adb2c82..70c13e9 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,13 +1,13 @@
 name: async
-version: 1.13.2
+version: 1.13.3-dev
 author: Dart Team <misc@dartlang.org>
 description: Utility functions and classes related to the 'dart:async' library.
 homepage: https://www.github.com/dart-lang/async
+environment:
+  sdk: ">=1.22.0 <2.0.0"
 dependencies:
   collection: "^1.5.0"
 dev_dependencies:
   fake_async: "^0.1.2"
   stack_trace: "^1.0.0"
   test: "^0.12.0"
-environment:
-  sdk: ">=1.22.0 <2.0.0"
diff --git a/test/async_cache_test.dart b/test/async_cache_test.dart
index b81d9f9..747835b 100644
--- a/test/async_cache_test.dart
+++ b/test/async_cache_test.dart
@@ -65,9 +65,11 @@
   });
 
   test('should fetch a stream via a callback', () async {
-    expect(await cache.fetchStream(expectAsync0(() {
-      return new Stream.fromIterable(['1', '2', '3']);
-    })).toList(), ['1', '2', '3']);
+    expect(
+        await cache.fetchStream(expectAsync0(() {
+          return new Stream.fromIterable(['1', '2', '3']);
+        })).toList(),
+        ['1', '2', '3']);
   });
 
   test('should not fetch stream via callback when a cache exists', () async {
diff --git a/test/byte_collection_test.dart b/test/byte_collection_test.dart
index e3d4f05..b87b074 100644
--- a/test/byte_collection_test.dart
+++ b/test/byte_collection_test.dart
@@ -67,7 +67,7 @@
       var sc = new StreamController<List<int>>();
       var result = collectBytesCancelable(sc.stream);
       // Value never completes.
-      result.value.whenComplete(expectAsync0((){}, count: 0));
+      result.value.whenComplete(expectAsync0(() {}, count: 0));
 
       expect(sc.hasListener, isTrue);
       sc.add([1, 2]);
@@ -77,7 +77,7 @@
       await nextTimerTick();
       expect(sc.hasListener, isTrue);
       result.cancel();
-      expect(sc.hasListener, isFalse);  // Cancelled immediately.
+      expect(sc.hasListener, isFalse); // Cancelled immediately.
       var replacement = await result.valueOrCancellation();
       expect(replacement, isNull);
       await nextTimerTick();
@@ -87,4 +87,4 @@
   });
 }
 
-Future nextTimerTick() => new Future((){});
+Future nextTimerTick() => new Future(() {});
diff --git a/test/cancelable_operation_test.dart b/test/cancelable_operation_test.dart
index f30c134..bded402 100644
--- a/test/cancelable_operation_test.dart
+++ b/test/cancelable_operation_test.dart
@@ -13,8 +13,8 @@
   group("without being canceled", () {
     var completer;
     setUp(() {
-      completer = new CancelableCompleter(
-          onCancel: expectAsync0(() {}, count: 0));
+      completer =
+          new CancelableCompleter(onCancel: expectAsync0(() {}, count: 0));
     });
 
     test("sends values to the future", () {
@@ -74,8 +74,8 @@
 
       test("successfully then with a future", () {
         completer.complete(1);
-        expect(() => completer.complete(new Completer().future),
-            throwsStateError);
+        expect(
+            () => completer.complete(new Completer().future), throwsStateError);
       });
 
       test("with a future then successfully", () {
@@ -85,8 +85,8 @@
 
       test("with a future twice", () {
         completer.complete(new Completer().future);
-        expect(() => completer.complete(new Completer().future),
-            throwsStateError);
+        expect(
+            () => completer.complete(new Completer().future), throwsStateError);
       });
     });
 
@@ -97,8 +97,8 @@
       });
 
       test("forwards errors", () {
-        var operation = new CancelableOperation.fromFuture(
-            new Future.error("error"));
+        var operation =
+            new CancelableOperation.fromFuture(new Future.error("error"));
         expect(operation.value, throwsA("error"));
       });
     });
@@ -148,24 +148,26 @@
     });
 
     test("doesn't call onCancel if the completer has completed", () {
-      var completer = new CancelableCompleter(
-          onCancel: expectAsync0(() {}, count: 0));
+      var completer =
+          new CancelableCompleter(onCancel: expectAsync0(() {}, count: 0));
       completer.complete(1);
       expect(completer.operation.value, completion(equals(1)));
       expect(completer.operation.cancel(), completes);
     });
 
-    test("does call onCancel if the completer has completed to an unfired "
+    test(
+        "does call onCancel if the completer has completed to an unfired "
         "Future", () {
       var completer = new CancelableCompleter(onCancel: expectAsync0(() {}));
       completer.complete(new Completer().future);
       expect(completer.operation.cancel(), completes);
     });
 
-    test("doesn't call onCancel if the completer has completed to a fired "
+    test(
+        "doesn't call onCancel if the completer has completed to a fired "
         "Future", () async {
-      var completer = new CancelableCompleter(
-          onCancel: expectAsync0(() {}, count: 0));
+      var completer =
+          new CancelableCompleter(onCancel: expectAsync0(() {}, count: 0));
       completer.complete(new Future.value(1));
       await completer.operation.value;
       expect(completer.operation.cancel(), completes);
@@ -195,7 +197,8 @@
 
     test("valueOrCancellation waits on the onCancel future", () async {
       var innerCompleter = new Completer();
-      var completer = new CancelableCompleter(onCancel: () => innerCompleter.future);
+      var completer =
+          new CancelableCompleter(onCancel: () => innerCompleter.future);
 
       var fired = false;
       completer.operation.valueOrCancellation().then((_) {
@@ -229,8 +232,8 @@
 
     test("cancels the completer when the subscription is canceled", () {
       var completer = new CancelableCompleter(onCancel: expectAsync0(() {}));
-      var sub = completer.operation.asStream()
-          .listen(expectAsync1((_) {}, count: 0));
+      var sub =
+          completer.operation.asStream().listen(expectAsync1((_) {}, count: 0));
       completer.operation.value.whenComplete(expectAsync0(() {}, count: 0));
       sub.cancel();
       expect(completer.isCanceled, isTrue);
diff --git a/test/restartable_timer_test.dart b/test/restartable_timer_test.dart
index b612f6e..4f59f62 100644
--- a/test/restartable_timer_test.dart
+++ b/test/restartable_timer_test.dart
@@ -100,8 +100,7 @@
   test("only runs the callback once if the timer isn't reset", () {
     new FakeAsync().run((async) {
       new RestartableTimer(
-          new Duration(seconds: 5),
-          expectAsync0(() {}, count: 1));
+          new Duration(seconds: 5), expectAsync0(() {}, count: 1));
       async.elapse(new Duration(seconds: 10));
     });
   });
diff --git a/test/result_test.dart b/test/result_test.dart
index 21e251c..210ae3f 100644
--- a/test/result_test.dart
+++ b/test/result_test.dart
@@ -58,54 +58,60 @@
   test("complete with value", () {
     Result<int> result = new ValueResult<int>(42);
     var c = new Completer<int>();
-    c.future.then(expectAsync1((int v) { expect(v, equals(42)); }),
-                  onError: (e, s) { fail("Unexpected error"); });
+    c.future.then(expectAsync1((int v) {
+      expect(v, equals(42));
+    }), onError: (e, s) {
+      fail("Unexpected error");
+    });
     result.complete(c);
   });
 
   test("complete with error", () {
     Result<bool> result = new ErrorResult("BAD", stack);
     var c = new Completer<bool>();
-    c.future.then((bool v) { fail("Unexpected value $v"); },
-                  onError: expectAsync2((e, s) {
-                    expect(e, equals("BAD"));
-                    expect(s, same(stack));
-                  }));
+    c.future.then((bool v) {
+      fail("Unexpected value $v");
+    }, onError: expectAsync2((e, s) {
+      expect(e, equals("BAD"));
+      expect(s, same(stack));
+    }));
     result.complete(c);
   });
 
   test("add sink value", () {
     var result = new ValueResult<int>(42);
-    EventSink<int> sink = new TestSink(
-        onData: expectAsync1((v) { expect(v, equals(42)); })
-    );
+    EventSink<int> sink = new TestSink(onData: expectAsync1((v) {
+      expect(v, equals(42));
+    }));
     result.addTo(sink);
   });
 
   test("add sink error", () {
     Result<bool> result = new ErrorResult("BAD", stack);
-    EventSink<bool> sink = new TestSink(
-        onError: expectAsync2((e, s) {
-          expect(e, equals("BAD"));
-          expect(s, same(stack));
-        })
-    );
+    EventSink<bool> sink = new TestSink(onError: expectAsync2((e, s) {
+      expect(e, equals("BAD"));
+      expect(s, same(stack));
+    }));
     result.addTo(sink);
   });
 
   test("value as future", () {
     Result<int> result = new ValueResult<int>(42);
-    result.asFuture.then(expectAsync1((int v) { expect(v, equals(42)); }),
-                         onError: (e, s) { fail("Unexpected error"); });
+    result.asFuture.then(expectAsync1((int v) {
+      expect(v, equals(42));
+    }), onError: (e, s) {
+      fail("Unexpected error");
+    });
   });
 
   test("error as future", () {
     Result<bool> result = new ErrorResult("BAD", stack);
-    result.asFuture.then((bool v) { fail("Unexpected value $v"); },
-                         onError: expectAsync2((e, s) {
-                           expect(e, equals("BAD"));
-                           expect(s, same(stack));
-                         }));
+    result.asFuture.then((bool v) {
+      fail("Unexpected value $v");
+    }, onError: expectAsync2((e, s) {
+      expect(e, equals("BAD"));
+      expect(s, same(stack));
+    }));
   });
 
   test("capture future value", () {
@@ -169,17 +175,19 @@
   test("capture stream", () {
     StreamController<int> c = new StreamController<int>();
     Stream<Result> stream = Result.captureStream(c.stream);
-    var expectedList = new Queue.from([new Result.value(42),
-                                       new Result.error("BAD", stack),
-                                       new Result.value(37)]);
+    var expectedList = new Queue.from([
+      new Result.value(42),
+      new Result.error("BAD", stack),
+      new Result.value(37)
+    ]);
     void listener(Result actual) {
       expect(expectedList.isEmpty, isFalse);
       expectResult(actual, expectedList.removeFirst());
     }
-    stream.listen(expectAsync1(listener, count: 3),
-                  onError: (e, s) { fail("Unexpected error: $e"); },
-                  onDone: expectAsync0((){}),
-                  cancelOnError: true);
+
+    stream.listen(expectAsync1(listener, count: 3), onError: (e, s) {
+      fail("Unexpected error: $e");
+    }, onDone: expectAsync0(() {}), cancelOnError: true);
     c.add(42);
     c.addError("BAD", stack);
     c.add(37);
@@ -189,12 +197,14 @@
   test("release stream", () {
     StreamController<Result<int>> c = new StreamController<Result<int>>();
     Stream<int> stream = Result.releaseStream(c.stream);
-    var events = [new Result<int>.value(42),
-                   new Result<int>.error("BAD", stack),
-                   new Result<int>.value(37)];
+    var events = [
+      new Result<int>.value(42),
+      new Result<int>.error("BAD", stack),
+      new Result<int>.value(37)
+    ];
     // Expect the data events, and an extra error event.
     var expectedList = new Queue.from(events)
-        ..add(new Result.error("BAD2", stack));
+      ..add(new Result.error("BAD2", stack));
 
     void dataListener(int v) {
       expect(expectedList.isEmpty, isFalse);
@@ -212,32 +222,32 @@
     }
 
     stream.listen(expectAsync1(dataListener, count: 2),
-                  onError: expectAsync2(errorListener, count: 2),
-                  onDone: expectAsync0((){}));
+        onError: expectAsync2(errorListener, count: 2),
+        onDone: expectAsync0(() {}));
     for (Result<int> result in events) {
-      c.add(result);  // Result value or error in data line.
+      c.add(result); // Result value or error in data line.
     }
-    c.addError("BAD2", stack);  // Error in error line.
+    c.addError("BAD2", stack); // Error in error line.
     c.close();
   });
 
   test("release stream cancel on error", () {
     StreamController<Result<int>> c = new StreamController<Result<int>>();
     Stream<int> stream = Result.releaseStream(c.stream);
-    stream.listen(expectAsync1((v) { expect(v, equals(42)); }),
-                  onError: expectAsync2((e, s) {
-                    expect(e, equals("BAD"));
-                    expect(s, same(stack));
-                  }),
-                  onDone: () { fail("Unexpected done event"); },
-                  cancelOnError: true);
+    stream.listen(expectAsync1((v) {
+      expect(v, equals(42));
+    }), onError: expectAsync2((e, s) {
+      expect(e, equals("BAD"));
+      expect(s, same(stack));
+    }), onDone: () {
+      fail("Unexpected done event");
+    }, cancelOnError: true);
     c.add(new Result.value(42));
     c.add(new Result.error("BAD", stack));
     c.add(new Result.value(37));
     c.close();
   });
 
-
   test("flatten error 1", () {
     Result<int> error = new Result<int>.error("BAD", stack);
     Result<int> flattened =
@@ -292,18 +302,12 @@
 
   test("handle neither unary nor binary", () {
     ErrorResult result = new Result.error("error", stack);
-    expect(() => result.handle(() => fail("unreachable")),
-           throws);
-    expect(() => result.handle((a, b, c) => fail("unreachable")),
-           throws);
-    expect(() => result.handle((a, b, {c}) => fail("unreachable")),
-           throws);
-    expect(() => result.handle((a, {b}) => fail("unreachable")),
-           throws);
-    expect(() => result.handle(({a, b}) => fail("unreachable")),
-           throws);
-    expect(() => result.handle(({a}) => fail("unreachable")),
-           throws);
+    expect(() => result.handle(() => fail("unreachable")), throws);
+    expect(() => result.handle((a, b, c) => fail("unreachable")), throws);
+    expect(() => result.handle((a, b, {c}) => fail("unreachable")), throws);
+    expect(() => result.handle((a, {b}) => fail("unreachable")), throws);
+    expect(() => result.handle(({a, b}) => fail("unreachable")), throws);
+    expect(() => result.handle(({a}) => fail("unreachable")), throws);
   });
 }
 
@@ -323,17 +327,32 @@
   final Function onError;
   final Function onDone;
 
-  TestSink({void this.onData(T data) : _nullData,
-            void this.onError(e, StackTrace s) : _nullError,
-            void this.onDone() : _nullDone });
+  TestSink(
+      {void this.onData(T data): _nullData,
+      void this.onError(e, StackTrace s): _nullError,
+      void this.onDone(): _nullDone});
 
-  void add(T value) { onData(value); }
-  void addError(error, [StackTrace stack]) { onError(error, stack); }
-  void close() { onDone(); }
+  void add(T value) {
+    onData(value);
+  }
 
-  static void _nullData(value) { fail("Unexpected sink add: $value"); }
+  void addError(error, [StackTrace stack]) {
+    onError(error, stack);
+  }
+
+  void close() {
+    onDone();
+  }
+
+  static void _nullData(value) {
+    fail("Unexpected sink add: $value");
+  }
+
   static void _nullError(e, StackTrace s) {
     fail("Unexpected sink addError: $e");
   }
-  static void _nullDone() { fail("Unepxected sink close"); }
+
+  static void _nullDone() {
+    fail("Unepxected sink close");
+  }
 }
diff --git a/test/single_subscription_transformer_test.dart b/test/single_subscription_transformer_test.dart
index f21d4e9..74e462b 100644
--- a/test/single_subscription_transformer_test.dart
+++ b/test/single_subscription_transformer_test.dart
@@ -12,8 +12,8 @@
 void main() {
   test("buffers events as soon as it's bound", () async {
     var controller = new StreamController.broadcast();
-    var stream = controller.stream.transform(
-        const SingleSubscriptionTransformer());
+    var stream =
+        controller.stream.transform(const SingleSubscriptionTransformer());
 
     // Add events before [stream] has a listener to be sure it buffers them.
     controller.add(1);
@@ -34,8 +34,8 @@
     var controller = new StreamController.broadcast(onCancel: () {
       canceled = true;
     });
-    var stream = controller.stream.transform(
-        const SingleSubscriptionTransformer());
+    var stream =
+        controller.stream.transform(const SingleSubscriptionTransformer());
     await flushMicrotasks();
     expect(canceled, isFalse);
 
diff --git a/test/stream_completer_test.dart b/test/stream_completer_test.dart
index dea73d7..2e9ff9b 100644
--- a/test/stream_completer_test.dart
+++ b/test/stream_completer_test.dart
@@ -27,9 +27,9 @@
   test("cancel before linking a stream doesn't listen on stream", () async {
     var completer = new StreamCompleter();
     var subscription = completer.stream.listen(null);
-    subscription.pause();  // Should be ignored.
+    subscription.pause(); // Should be ignored.
     subscription.cancel();
-    completer.setSourceStream(new UnusableStream());  // Doesn't throw.
+    completer.setSourceStream(new UnusableStream()); // Doesn't throw.
   });
 
   test("listen and pause before linking stream", () async {
@@ -78,14 +78,13 @@
     var lastEvent = -1;
     var controller = new StreamController();
     var subscription;
-    subscription = completer.stream.listen(
-        (value) {
-          expect(value, lessThan(3));
-          lastEvent = value;
-          if (value == 2) {
-            subscription.cancel();
-          }
-        },
+    subscription = completer.stream.listen((value) {
+      expect(value, lessThan(3));
+      lastEvent = value;
+      if (value == 2) {
+        subscription.cancel();
+      }
+    },
         onError: unreachable("error"),
         onDone: unreachable("done"),
         cancelOnError: true);
@@ -110,20 +109,16 @@
     var completer = new StreamCompleter();
     completer.setEmpty();
     var done = new Completer();
-    completer.stream.listen(
-        unreachable("data"),
-        onError: unreachable("error"),
-        onDone: done.complete);
+    completer.stream.listen(unreachable("data"),
+        onError: unreachable("error"), onDone: done.complete);
     await done.future;
   });
 
   test("complete with setEmpty after listening", () async {
     var completer = new StreamCompleter();
     var done = new Completer();
-    completer.stream.listen(
-        unreachable("data"),
-        onError: unreachable("error"),
-        onDone: done.complete);
+    completer.stream.listen(unreachable("data"),
+        onError: unreachable("error"), onDone: done.complete);
     completer.setEmpty();
     await done.future;
   });
@@ -147,17 +142,13 @@
     var completer = new StreamCompleter();
     var lastEvent = -1;
     var controller = new StreamController();
-    completer.stream.listen(
-        (value) {
-          expect(value, lessThan(3));
-          lastEvent = value;
-        },
-        onError: (value) {
-          expect(value, "3");
-          lastEvent = value;
-        },
-        onDone: unreachable("done"),
-        cancelOnError: true);
+    completer.stream.listen((value) {
+      expect(value, lessThan(3));
+      lastEvent = value;
+    }, onError: (value) {
+      expect(value, "3");
+      lastEvent = value;
+    }, onDone: unreachable("done"), cancelOnError: true);
     completer.setSourceStream(controller.stream);
     expect(controller.hasListener, isTrue);
 
@@ -188,17 +179,13 @@
     controller.add(1);
     expect(controller.hasListener, isFalse);
 
-    completer.stream.listen(
-        (value) {
-          expect(value, lessThan(3));
-          lastEvent = value;
-        },
-        onError: (value) {
-          expect(value, "3");
-          lastEvent = value;
-        },
-        onDone: unreachable("done"),
-        cancelOnError: true);
+    completer.stream.listen((value) {
+      expect(value, lessThan(3));
+      lastEvent = value;
+    }, onError: (value) {
+      expect(value, "3");
+      lastEvent = value;
+    }, onDone: unreachable("done"), cancelOnError: true);
 
     expect(controller.hasListener, isTrue);
 
@@ -324,8 +311,8 @@
   test("asFuture with error accross setting stream", () async {
     var completer = new StreamCompleter();
     var controller = new StreamController();
-    var subscription = completer.stream.listen(unreachable("data"),
-                                               cancelOnError: false);
+    var subscription =
+        completer.stream.listen(unreachable("data"), cancelOnError: false);
     var done = subscription.asFuture();
     expect(controller.hasListener, isFalse);
     completer.setSourceStream(controller.stream);
@@ -341,12 +328,10 @@
   group("setError()", () {
     test("produces a stream that emits a single error", () {
       var completer = new StreamCompleter();
-      completer.stream.listen(
-          unreachable("data"),
+      completer.stream.listen(unreachable("data"),
           onError: expectAsync2((error, stackTrace) {
-            expect(error, equals("oh no"));
-          }),
-          onDone: expectAsync0(() {}));
+        expect(error, equals("oh no"));
+      }), onDone: expectAsync0(() {}));
 
       completer.setError("oh no");
     });
@@ -357,12 +342,10 @@
       completer.setError("oh no");
       await flushMicrotasks();
 
-      completer.stream.listen(
-          unreachable("data"),
+      completer.stream.listen(unreachable("data"),
           onError: expectAsync2((error, stackTrace) {
-            expect(error, equals("oh no"));
-          }),
-          onDone: expectAsync0(() {}));
+        expect(error, equals("oh no"));
+      }), onDone: expectAsync0(() {}));
     });
   });
 }
diff --git a/test/stream_group_test.dart b/test/stream_group_test.dart
index 078dc3c..65ddb42 100644
--- a/test/stream_group_test.dart
+++ b/test/stream_group_test.dart
@@ -75,22 +75,24 @@
           new StreamTransformer.fromHandlers(
               handleData: (data, sink) => sink.add("data: $data"),
               handleError: (error, _, sink) => sink.add("error: $error")));
-      expect(transformed.toList(), completion(equals([
-        "data: first",
-        "error: second",
-        "data: third",
-        "error: fourth",
-        "error: fifth",
-        "data: sixth"
-      ])));
+      expect(
+          transformed.toList(),
+          completion(equals([
+            "data: first",
+            "error: second",
+            "data: third",
+            "error: fourth",
+            "error: fifth",
+            "data: sixth"
+          ])));
     });
 
     test("emits events once there's a listener", () {
       var controller = new StreamController<String>();
       streamGroup.add(controller.stream);
 
-      expect(streamGroup.stream.toList(),
-          completion(equals(["first", "second"])));
+      expect(
+          streamGroup.stream.toList(), completion(equals(["first", "second"])));
 
       controller.add("first");
       controller.add("second");
@@ -137,8 +139,8 @@
       var controller = new StreamController<String>.broadcast();
       streamGroup.add(controller.stream);
 
-      expect(streamGroup.stream.toList(),
-          completion(equals(["first", "second"])));
+      expect(
+          streamGroup.stream.toList(), completion(equals(["first", "second"])));
 
       controller.add("first");
       controller.add("second");
@@ -150,8 +152,8 @@
     test("forwards cancel errors", () async {
       var subscription = streamGroup.stream.listen(null);
 
-      var controller = new StreamController<String>(
-          onCancel: () => throw "error");
+      var controller =
+          new StreamController<String>(onCancel: () => throw "error");
       streamGroup.add(controller.stream);
       await flushMicrotasks();
 
@@ -162,8 +164,8 @@
       var subscription = streamGroup.stream.listen(null);
 
       var completer = new Completer();
-      var controller = new StreamController<String>(
-          onCancel: () => completer.future);
+      var controller =
+          new StreamController<String>(onCancel: () => completer.future);
       streamGroup.add(controller.stream);
       await flushMicrotasks();
 
@@ -178,15 +180,15 @@
       expect(fired, isTrue);
     });
 
-    test("add() while active pauses the stream if the group is paused, then "
+    test(
+        "add() while active pauses the stream if the group is paused, then "
         "resumes once the group resumes", () async {
       var subscription = streamGroup.stream.listen(null);
       await flushMicrotasks();
 
       var paused = false;
       var controller = new StreamController<String>(
-          onPause: () => paused = true,
-          onResume: () => paused = false);
+          onPause: () => paused = true, onResume: () => paused = false);
 
       subscription.pause();
       await flushMicrotasks();
@@ -223,16 +225,16 @@
       });
 
       test("forwards cancel errors", () {
-        var controller = new StreamController<String>(
-            onCancel: () => throw "error");
+        var controller =
+            new StreamController<String>(onCancel: () => throw "error");
 
         expect(streamGroup.add(controller.stream), throwsA("error"));
       });
 
       test("forwards a cancel future", () async {
         var completer = new Completer();
-        var controller = new StreamController<String>(
-            onCancel: () => completer.future);
+        var controller =
+            new StreamController<String>(onCancel: () => completer.future);
 
         var fired = false;
         streamGroup.add(controller.stream).then((_) => fired = true);
@@ -268,8 +270,8 @@
 
       expect(streamGroup.close(), completes);
 
-      expect(streamGroup.stream.toList(),
-          completion(equals(["first", "second"])));
+      expect(
+          streamGroup.stream.toList(), completion(equals(["first", "second"])));
     });
 
     test("emits events from multiple sources once there's a listener", () {
@@ -279,8 +281,8 @@
       var controller2 = new StreamController<String>();
       streamGroup.add(controller2.stream);
 
-      expect(streamGroup.stream.toList(),
-          completion(equals(["first", "second"])));
+      expect(
+          streamGroup.stream.toList(), completion(equals(["first", "second"])));
 
       controller1.add("first");
       controller2.add("second");
@@ -325,8 +327,8 @@
       var controller = new StreamController<String>.broadcast();
       streamGroup.add(controller.stream);
 
-      expect(streamGroup.stream.toList(),
-          completion(equals(["first", "second"])));
+      expect(
+          streamGroup.stream.toList(), completion(equals(["first", "second"])));
 
       controller.add("first");
       controller.add("second");
@@ -356,8 +358,8 @@
     test("never cancels single-subscription streams", () async {
       var subscription = streamGroup.stream.listen(null);
 
-      var controller = new StreamController<String>(
-          onCancel: expectAsync0(() {}, count: 0));
+      var controller =
+          new StreamController<String>(onCancel: expectAsync0(() {}, count: 0));
 
       streamGroup.add(controller.stream);
       await flushMicrotasks();
@@ -568,8 +570,8 @@
       });
 
       test("forwards cancel errors", () async {
-        var controller = new StreamController<String>(
-            onCancel: () => throw "error");
+        var controller =
+            new StreamController<String>(onCancel: () => throw "error");
         streamGroup.add(controller.stream);
 
         streamGroup.stream.listen(null);
@@ -580,8 +582,8 @@
 
       test("forwards cancel futures", () async {
         var completer = new Completer();
-        var controller = new StreamController<String>(
-            onCancel: () => completer.future);
+        var controller =
+            new StreamController<String>(onCancel: () => completer.future);
 
         streamGroup.stream.listen(null);
         await flushMicrotasks();
@@ -641,7 +643,8 @@
         expect(streamGroup.stream.toList(), completion(isEmpty));
       });
 
-      test("if there are streams, closes the group once those streams close "
+      test(
+          "if there are streams, closes the group once those streams close "
           "and there's a listener", () async {
         var controller1 = new StreamController<String>();
         var controller2 = new StreamController<String>();
diff --git a/test/stream_queue_test.dart b/test/stream_queue_test.dart
index f487e65..2e4805b 100644
--- a/test/stream_queue_test.dart
+++ b/test/stream_queue_test.dart
@@ -113,9 +113,9 @@
     test("with bad arguments throws", () async {
       var events = new StreamQueue<int>(createStream());
       expect(() => events.lookAhead(-1), throwsArgumentError);
-      expect(await events.next, 1);  // Did not consume event.
+      expect(await events.next, 1); // Did not consume event.
       expect(() => events.lookAhead(-1), throwsArgumentError);
-      expect(await events.next, 2);  // Did not consume event.
+      expect(await events.next, 2); // Did not consume event.
       await events.cancel();
     });
 
@@ -153,8 +153,8 @@
 
     test("multiple requests at the same time", () async {
       var events = new StreamQueue<int>(createStream());
-      var result = await Future.wait(
-          [events.next, events.next, events.next, events.next]);
+      var result = await Future
+          .wait([events.next, events.next, events.next, events.next]);
       expect(result, [1, 2, 3, 4]);
       await events.cancel();
     });
@@ -183,9 +183,9 @@
       expect(() => events.skip(-1), throwsArgumentError);
       // A non-int throws either a type error or an argument error,
       // depending on whether it's checked mode or not.
-      expect(await events.next, 1);  // Did not consume event.
+      expect(await events.next, 1); // Did not consume event.
       expect(() => events.skip(-1), throwsArgumentError);
-      expect(await events.next, 2);  // Did not consume event.
+      expect(await events.next, 2); // Did not consume event.
       await events.cancel();
     });
 
@@ -251,14 +251,16 @@
       var index = 0;
       // Check that futures complete in order.
       Func1Required<int> sequence(expectedValue, sequenceIndex) => (value) {
-        expect(value, expectedValue);
-        expect(index, sequenceIndex);
-        index++;
-      };
-      await Future.wait([skip1.then(sequence(0, 0)),
-                         skip2.then(sequence(0, 1)),
-                         skip3.then(sequence(1, 2)),
-                         skip4.then(sequence(1, 3))]);
+            expect(value, expectedValue);
+            expect(index, sequenceIndex);
+            index++;
+          };
+      await Future.wait([
+        skip1.then(sequence(0, 0)),
+        skip2.then(sequence(0, 1)),
+        skip3.then(sequence(1, 2)),
+        skip4.then(sequence(1, 3))
+      ]);
       await events.cancel();
     });
   });
@@ -291,9 +293,9 @@
     test("with bad arguments throws", () async {
       var events = new StreamQueue<int>(createStream());
       expect(() => events.take(-1), throwsArgumentError);
-      expect(await events.next, 1);  // Did not consume event.
+      expect(await events.next, 1); // Did not consume event.
       expect(() => events.take(-1), throwsArgumentError);
-      expect(await events.next, 2);  // Did not consume event.
+      expect(await events.next, 2); // Did not consume event.
       await events.cancel();
     });
 
@@ -522,7 +524,8 @@
       test("cancels the underlying subscription when called before any event",
           () async {
         var cancelFuture = new Future.value(42);
-        var controller = new StreamController<int>(onCancel: () => cancelFuture);
+        var controller =
+            new StreamController<int>(onCancel: () => cancelFuture);
 
         var events = new StreamQueue<int>(controller.stream);
         expect(await events.cancel(immediate: true), 42);
@@ -539,8 +542,8 @@
 
       test("returns the result of closing the underlying subscription",
           () async {
-        var controller = new StreamController<int>(
-            onCancel: () => new Future.value(42));
+        var controller =
+            new StreamController<int>(onCancel: () => new Future.value(42));
         var events = new StreamQueue<int>(controller.stream);
         expect(await events.cancel(immediate: true), 42);
       });
@@ -548,8 +551,8 @@
       test("listens and then cancels a stream that hasn't been listened to yet",
           () async {
         var wasListened = false;
-        var controller = new StreamController<int>(
-            onListen: () => wasListened = true);
+        var controller =
+            new StreamController<int>(onListen: () => wasListened = true);
         var events = new StreamQueue<int>(controller.stream);
         expect(wasListened, isFalse);
         expect(controller.hasListener, isFalse);
@@ -608,7 +611,9 @@
       var events = new StreamQueue<int>(controller.stream);
 
       var hasNext;
-      events.hasNext.then((result) { hasNext = result; });
+      events.hasNext.then((result) {
+        hasNext = result;
+      });
       await flushMicrotasks();
       expect(hasNext, isNull);
       controller.add(42);
@@ -622,7 +627,9 @@
       var events = new StreamQueue<int>(controller.stream);
 
       var hasNext;
-      events.hasNext.then((result) { hasNext = result; });
+      events.hasNext.then((result) {
+        hasNext = result;
+      });
       await flushMicrotasks();
       expect(hasNext, isNull);
       controller.addError("BAD");
@@ -976,7 +983,8 @@
       }));
     });
 
-    test("the parent queue continues from the child position if it returns "
+    test(
+        "the parent queue continues from the child position if it returns "
         "true", () async {
       await events.withTransaction(expectAsync1((queue) async {
         expect(await queue.next, 2);
@@ -986,7 +994,8 @@
       expect(await events.next, 3);
     });
 
-    test("the parent queue continues from its original position if it returns "
+    test(
+        "the parent queue continues from its original position if it returns "
         "false", () async {
       await events.withTransaction(expectAsync1((queue) async {
         expect(await queue.next, 2);
@@ -1036,12 +1045,15 @@
       expect(await events.next, 3);
     });
 
-    test("the parent queue continues from the child position if an error is "
+    test(
+        "the parent queue continues from the child position if an error is "
         "thrown", () async {
-      expect(events.cancelable(expectAsync1((queue) async {
-        expect(await queue.next, 2);
-        throw "oh no";
-      })).value, throwsA("oh no"));
+      expect(
+          events.cancelable(expectAsync1((queue) async {
+            expect(await queue.next, 2);
+            throw "oh no";
+          })).value,
+          throwsA("oh no"));
 
       expect(events.next, completion(3));
     });
@@ -1057,10 +1069,12 @@
     });
 
     test("forwards the value from the callback", () async {
-      expect(await events.cancelable(expectAsync1((queue) async {
-        expect(await queue.next, 2);
-        return "value";
-      })).value, "value");
+      expect(
+          await events.cancelable(expectAsync1((queue) async {
+            expect(await queue.next, 2);
+            return "value";
+          })).value,
+          "value");
     });
   });
 
@@ -1088,8 +1102,9 @@
     // `take(10)`.
     takeTest(startIndex) {
       expect(events.take(10),
-             completion(new List.generate(10, (i) => startIndex + i)));
+          completion(new List.generate(10, (i) => startIndex + i)));
     }
+
     var tests = [nextTest, skipTest, takeTest];
 
     int counter = 0;
@@ -1103,7 +1118,7 @@
     }
     // Then expect 20 more events as a `rest` call.
     expect(events.rest.toList(),
-           completion(new List.generate(20, (i) => counter + i)));
+        completion(new List.generate(20, (i) => counter + i)));
   });
 }
 
diff --git a/test/stream_sink_completer_test.dart b/test/stream_sink_completer_test.dart
index c7ce1ea..3c8b576 100644
--- a/test/stream_sink_completer_test.dart
+++ b/test/stream_sink_completer_test.dart
@@ -289,9 +289,9 @@
 
   test("doesn't allow the destination sink to be set multiple times", () {
     completer.setDestinationSink(new TestSink());
-    expect(() => completer.setDestinationSink(new TestSink()),
-        throwsStateError);
-    expect(() => completer.setDestinationSink(new TestSink()),
-        throwsStateError);
+    expect(
+        () => completer.setDestinationSink(new TestSink()), throwsStateError);
+    expect(
+        () => completer.setDestinationSink(new TestSink()), throwsStateError);
   });
 }
diff --git a/test/stream_sink_transformer_test.dart b/test/stream_sink_transformer_test.dart
index 6be9f9e..208a03a 100644
--- a/test/stream_sink_transformer_test.dart
+++ b/test/stream_sink_transformer_test.dart
@@ -19,8 +19,8 @@
     test("transforms data events", () {
       var transformer = new StreamSinkTransformer.fromStreamTransformer(
           new StreamTransformer.fromHandlers(handleData: (i, sink) {
-            sink.add(i * 2);
-          }));
+        sink.add(i * 2);
+      }));
       var sink = transformer.bind(controller.sink);
 
       var results = [];
@@ -38,18 +38,17 @@
       var transformer = new StreamSinkTransformer.fromStreamTransformer(
           new StreamTransformer.fromHandlers(
               handleError: (i, stackTrace, sink) {
-                sink.addError((i as num) * 2, stackTrace);
-              }));
+        sink.addError((i as num) * 2, stackTrace);
+      }));
       var sink = transformer.bind(controller.sink);
 
       var results = [];
       controller.stream.listen(expectAsync1((_) {}, count: 0),
           onError: (error, stackTrace) {
-            results.add(error);
-          },
-          onDone: expectAsync0(() {
-            expect(results, equals([2, 4, 6]));
-          }));
+        results.add(error);
+      }, onDone: expectAsync0(() {
+        expect(results, equals([2, 4, 6]));
+      }));
 
       sink.addError(1);
       sink.addError(2);
@@ -59,11 +58,10 @@
 
     test("transforms done events", () {
       var transformer = new StreamSinkTransformer.fromStreamTransformer(
-          new StreamTransformer.fromHandlers(
-              handleDone: (sink) {
-                sink.add(1);
-                sink.close();
-              }));
+          new StreamTransformer.fromHandlers(handleDone: (sink) {
+        sink.add(1);
+        sink.close();
+      }));
       var sink = transformer.bind(controller.sink);
 
       var results = [];
@@ -90,7 +88,7 @@
       expect(doneResult.isComplete, isFalse);
       expect(closeResult.isComplete, isFalse);
 
-      // Once the inner sink is completed, the futures should fire. 
+      // Once the inner sink is completed, the futures should fire.
       innerSink.completer.complete();
       await flushMicrotasks();
       expect(doneResult.isComplete, isTrue);
@@ -100,8 +98,8 @@
     test("doesn't top-level the future from inner.close", () async {
       var transformer = new StreamSinkTransformer.fromStreamTransformer(
           new StreamTransformer.fromHandlers(handleData: (_, sink) {
-            sink.close();
-          }));
+        sink.close();
+      }));
       var innerSink = new CompleterStreamSink();
       var sink = transformer.bind(innerSink);
 
@@ -119,10 +117,10 @@
 
   group("fromHandlers", () {
     test("transforms data events", () {
-      var transformer = new StreamSinkTransformer.fromHandlers(
-          handleData: (i, sink) {
-            sink.add(i * 2);
-          });
+      var transformer =
+          new StreamSinkTransformer.fromHandlers(handleData: (i, sink) {
+        sink.add(i * 2);
+      });
       var sink = transformer.bind(controller.sink);
 
       var results = [];
@@ -139,18 +137,17 @@
     test("transforms error events", () {
       var transformer = new StreamSinkTransformer.fromHandlers(
           handleError: (i, stackTrace, sink) {
-            sink.addError((i as num) * 2, stackTrace);
-          });
+        sink.addError((i as num) * 2, stackTrace);
+      });
       var sink = transformer.bind(controller.sink);
 
       var results = [];
       controller.stream.listen(expectAsync1((_) {}, count: 0),
           onError: (error, stackTrace) {
-            results.add(error);
-          },
-          onDone: expectAsync0(() {
-            expect(results, equals([2, 4, 6]));
-          }));
+        results.add(error);
+      }, onDone: expectAsync0(() {
+        expect(results, equals([2, 4, 6]));
+      }));
 
       sink.addError(1);
       sink.addError(2);
@@ -159,11 +156,11 @@
     });
 
     test("transforms done events", () {
-      var transformer = new StreamSinkTransformer.fromHandlers(
-          handleDone: (sink) {
-            sink.add(1);
-            sink.close();
-          });
+      var transformer =
+          new StreamSinkTransformer.fromHandlers(handleDone: (sink) {
+        sink.add(1);
+        sink.close();
+      });
       var sink = transformer.bind(controller.sink);
 
       var results = [];
@@ -189,7 +186,7 @@
       expect(doneResult.isComplete, isFalse);
       expect(closeResult.isComplete, isFalse);
 
-      // Once the inner sink is completed, the futures should fire. 
+      // Once the inner sink is completed, the futures should fire.
       innerSink.completer.complete();
       await flushMicrotasks();
       expect(doneResult.isComplete, isTrue);
@@ -197,10 +194,10 @@
     });
 
     test("doesn't top-level the future from inner.close", () async {
-      var transformer = new StreamSinkTransformer.fromHandlers(
-          handleData: (_, sink) {
-            sink.close();
-          });
+      var transformer =
+          new StreamSinkTransformer.fromHandlers(handleData: (_, sink) {
+        sink.close();
+      });
       var innerSink = new CompleterStreamSink();
       var sink = transformer.bind(innerSink);
 
diff --git a/test/stream_splitter_test.dart b/test/stream_splitter_test.dart
index c562305..68a6b74 100644
--- a/test/stream_splitter_test.dart
+++ b/test/stream_splitter_test.dart
@@ -47,11 +47,12 @@
     controller.close();
 
     var count = 0;
-    branch.listen(expectAsync1((value) {
-      expect(count, anyOf(0, 2));
-      expect(value, equals(count + 1));
-      count++;
-    }, count: 2), onError: expectAsync1((error) {
+    branch.listen(
+        expectAsync1((value) {
+          expect(count, anyOf(0, 2));
+          expect(value, equals(count + 1));
+          count++;
+        }, count: 2), onError: expectAsync1((error) {
       expect(count, equals(1));
       expect(error, equals("error"));
       count++;
@@ -60,7 +61,8 @@
     }));
   });
 
-  test("a branch that's created in the middle of a stream replays it", () async {
+  test("a branch that's created in the middle of a stream replays it",
+      () async {
     controller.add(1);
     controller.add(2);
     await flushMicrotasks();
@@ -104,12 +106,12 @@
     controller.add(1);
     controller.add(2);
     await flushMicrotasks();
-    
+
     var branch2 = splitter.split();
     controller.add(3);
     controller.close();
     await flushMicrotasks();
-    
+
     var branch3 = splitter.split();
     splitter.close();
 
@@ -207,7 +209,8 @@
     expect(controller.isPaused, isFalse);
   });
 
-  test("the source stream is canceled when it's closed after all branches have "
+  test(
+      "the source stream is canceled when it's closed after all branches have "
       "been canceled", () async {
     var branch1 = splitter.split();
     var branch2 = splitter.split();
@@ -233,7 +236,8 @@
     expect(controller.hasListener, isFalse);
   });
 
-  test("the source stream is canceled when all branches are canceled after it "
+  test(
+      "the source stream is canceled when all branches are canceled after it "
       "has been closed", () async {
     var branch1 = splitter.split();
     var branch2 = splitter.split();
@@ -257,7 +261,8 @@
     expect(controller.hasListener, isFalse);
   });
 
-  test("a splitter that's closed before any branches are added never listens "
+  test(
+      "a splitter that's closed before any branches are added never listens "
       "to the source stream", () {
     splitter.close();
 
@@ -265,7 +270,8 @@
     controller.stream.listen(null);
   });
 
-  test("splitFrom splits a source stream into the designated number of "
+  test(
+      "splitFrom splits a source stream into the designated number of "
       "branches", () {
     var branches = StreamSplitter.splitFrom(controller.stream, 5);
 
diff --git a/test/stream_zip_test.dart b/test/stream_zip_test.dart
index 958d1f2..71d8eee 100644
--- a/test/stream_zip_test.dart
+++ b/test/stream_zip_test.dart
@@ -45,40 +45,87 @@
   }
 
   test("Basic", () {
-    testZip([mks([1, 2, 3]), mks([4, 5, 6]), mks([7, 8, 9])],
-            [[1, 4, 7], [2, 5, 8], [3, 6, 9]]);
+    testZip([
+      mks([1, 2, 3]),
+      mks([4, 5, 6]),
+      mks([7, 8, 9])
+    ], [
+      [1, 4, 7],
+      [2, 5, 8],
+      [3, 6, 9]
+    ]);
   });
 
   test("Uneven length 1", () {
-    testZip([mks([1, 2, 3, 99, 100]), mks([4, 5, 6]), mks([7, 8, 9])],
-            [[1, 4, 7], [2, 5, 8], [3, 6, 9]]);
+    testZip([
+      mks([1, 2, 3, 99, 100]),
+      mks([4, 5, 6]),
+      mks([7, 8, 9])
+    ], [
+      [1, 4, 7],
+      [2, 5, 8],
+      [3, 6, 9]
+    ]);
   });
 
   test("Uneven length 2", () {
-    testZip([mks([1, 2, 3]), mks([4, 5, 6, 99, 100]), mks([7, 8, 9])],
-            [[1, 4, 7], [2, 5, 8], [3, 6, 9]]);
+    testZip([
+      mks([1, 2, 3]),
+      mks([4, 5, 6, 99, 100]),
+      mks([7, 8, 9])
+    ], [
+      [1, 4, 7],
+      [2, 5, 8],
+      [3, 6, 9]
+    ]);
   });
 
   test("Uneven length 3", () {
-    testZip([mks([1, 2, 3]), mks([4, 5, 6]), mks([7, 8, 9, 99, 100])],
-            [[1, 4, 7], [2, 5, 8], [3, 6, 9]]);
+    testZip([
+      mks([1, 2, 3]),
+      mks([4, 5, 6]),
+      mks([7, 8, 9, 99, 100])
+    ], [
+      [1, 4, 7],
+      [2, 5, 8],
+      [3, 6, 9]
+    ]);
   });
 
   test("Uneven length 4", () {
-    testZip([mks([1, 2, 3, 98]), mks([4, 5, 6]), mks([7, 8, 9, 99, 100])],
-            [[1, 4, 7], [2, 5, 8], [3, 6, 9]]);
+    testZip([
+      mks([1, 2, 3, 98]),
+      mks([4, 5, 6]),
+      mks([7, 8, 9, 99, 100])
+    ], [
+      [1, 4, 7],
+      [2, 5, 8],
+      [3, 6, 9]
+    ]);
   });
 
   test("Empty 1", () {
-    testZip([mks([]), mks([4, 5, 6]), mks([7, 8, 9])], []);
+    testZip([
+      mks([]),
+      mks([4, 5, 6]),
+      mks([7, 8, 9])
+    ], []);
   });
 
   test("Empty 2", () {
-    testZip([mks([1, 2, 3]), mks([]), mks([7, 8, 9])], []);
+    testZip([
+      mks([1, 2, 3]),
+      mks([]),
+      mks([7, 8, 9])
+    ], []);
   });
 
   test("Empty 3", () {
-    testZip([mks([1, 2, 3]), mks([4, 5, 6]), mks([])], []);
+    testZip([
+      mks([1, 2, 3]),
+      mks([4, 5, 6]),
+      mks([])
+    ], []);
   });
 
   test("Empty source", () {
@@ -86,57 +133,88 @@
   });
 
   test("Single Source", () {
-    testZip([mks([1, 2, 3])], [[1], [2], [3]]);
+    testZip([
+      mks([1, 2, 3])
+    ], [
+      [1],
+      [2],
+      [3]
+    ]);
   });
 
   test("Other-streams", () {
     Stream st1 = mks([1, 2, 3, 4, 5, 6]).where((x) => x < 4);
-    Stream st2 = new Stream.periodic(const Duration(milliseconds: 5),
-                                     (x) => x + 4).take(3);
+    Stream st2 =
+        new Stream.periodic(const Duration(milliseconds: 5), (x) => x + 4)
+            .take(3);
     StreamController c = new StreamController.broadcast();
     Stream st3 = c.stream;
-    testZip([st1, st2, st3],
-            [[1, 4, 7], [2, 5, 8], [3, 6, 9]]);
-    c..add(7)..add(8)..add(9)..close();
+    testZip([
+      st1,
+      st2,
+      st3
+    ], [
+      [1, 4, 7],
+      [2, 5, 8],
+      [3, 6, 9]
+    ]);
+    c
+      ..add(7)
+      ..add(8)
+      ..add(9)
+      ..close();
   });
 
   test("Error 1", () {
-    expect(new StreamZip([streamError(mks([1, 2, 3]), 2, "BAD-1"),
-                          mks([4, 5, 6]),
-                          mks([7, 8, 9])]).toList(),
-           throwsA(equals("BAD-1")));
+    expect(
+        new StreamZip([
+          streamError(mks([1, 2, 3]), 2, "BAD-1"),
+          mks([4, 5, 6]),
+          mks([7, 8, 9])
+        ]).toList(),
+        throwsA(equals("BAD-1")));
   });
 
   test("Error 2", () {
-    expect(new StreamZip([mks([1, 2, 3]),
-                          streamError(mks([4, 5, 6]), 5, "BAD-2"),
-                          mks([7, 8, 9])]).toList(),
-           throwsA(equals("BAD-2")));
+    expect(
+        new StreamZip([
+          mks([1, 2, 3]),
+          streamError(mks([4, 5, 6]), 5, "BAD-2"),
+          mks([7, 8, 9])
+        ]).toList(),
+        throwsA(equals("BAD-2")));
   });
 
   test("Error 3", () {
-    expect(new StreamZip([mks([1, 2, 3]),
-                          mks([4, 5, 6]),
-                          streamError(mks([7, 8, 9]), 8, "BAD-3")]).toList(),
-           throwsA(equals("BAD-3")));
+    expect(
+        new StreamZip([
+          mks([1, 2, 3]),
+          mks([4, 5, 6]),
+          streamError(mks([7, 8, 9]), 8, "BAD-3")
+        ]).toList(),
+        throwsA(equals("BAD-3")));
   });
 
   test("Error at end", () {
-    expect(new StreamZip([mks([1, 2, 3]),
-                          streamError(mks([4, 5, 6]), 6, "BAD-4"),
-                          mks([7, 8, 9])]).toList(),
-           throwsA(equals("BAD-4")));
+    expect(
+        new StreamZip([
+          mks([1, 2, 3]),
+          streamError(mks([4, 5, 6]), 6, "BAD-4"),
+          mks([7, 8, 9])
+        ]).toList(),
+        throwsA(equals("BAD-4")));
   });
 
   test("Error before first end", () {
     // StreamControllers' streams with no "close" called will never be done,
     // so the fourth event of the first stream is guaranteed to come first.
-    expect(new StreamZip(
-                [streamError(mks([1, 2, 3, 4]), 4, "BAD-5"),
-                 (new StreamController()..add(4)..add(5)..add(6)).stream,
-                 (new StreamController()..add(7)..add(8)..add(9)).stream]
-               ).toList(),
-           throwsA(equals("BAD-5")));
+    expect(
+        new StreamZip([
+          streamError(mks([1, 2, 3, 4]), 4, "BAD-5"),
+          (new StreamController()..add(4)..add(5)..add(6)).stream,
+          (new StreamController()..add(7)..add(8)..add(9)).stream
+        ]).toList(),
+        throwsA(equals("BAD-5")));
   });
 
   test("Error after first end", () {
@@ -144,40 +222,43 @@
     controller..add(7)..add(8)..add(9);
     // Transformer that puts error into controller when one of the first two
     // streams have sent a done event.
-    StreamTransformer trans = new StreamTransformer.fromHandlers(
-        handleDone: (EventSink s) {
-      Timer.run(() { controller.addError("BAD-6"); });
+    StreamTransformer trans =
+        new StreamTransformer.fromHandlers(handleDone: (EventSink s) {
+      Timer.run(() {
+        controller.addError("BAD-6");
+      });
       s.close();
     });
-    testZip([mks([1, 2, 3]).transform(trans),
-             mks([4, 5, 6]).transform(trans),
-             controller.stream],
-           [[1, 4, 7], [2, 5, 8], [3, 6, 9]]);
+    testZip([
+      mks([1, 2, 3]).transform(trans),
+      mks([4, 5, 6]).transform(trans),
+      controller.stream
+    ], [
+      [1, 4, 7],
+      [2, 5, 8],
+      [3, 6, 9]
+    ]);
   });
 
   test("Pause/Resume", () {
     int sc1p = 0;
-    StreamController c1 = new StreamController(
-      onPause: () {
-        sc1p++;
-      },
-      onResume: () {
-        sc1p--;
-      });
+    StreamController c1 = new StreamController(onPause: () {
+      sc1p++;
+    }, onResume: () {
+      sc1p--;
+    });
 
     int sc2p = 0;
-    StreamController c2 = new StreamController(
-      onPause: () {
-        sc2p++;
-      },
-      onResume: () {
-        sc2p--;
-      });
+    StreamController c2 = new StreamController(onPause: () {
+      sc2p++;
+    }, onResume: () {
+      sc2p--;
+    });
 
-    var done = expectAsync0((){
+    var done = expectAsync0(() {
       expect(sc1p, equals(1));
       expect(sc2p, equals(0));
-    });  // Call to complete test.
+    }); // Call to complete test.
 
     Stream zip = new StreamZip([c1.stream, c2.stream]);
 
@@ -198,7 +279,9 @@
     }).then((hasMore) {
       expect(hasMore, isTrue);
       expect(it.current, equals([5, 6]));
-      new Future.delayed(ms25).then((_) { c2.add(8); });
+      new Future.delayed(ms25).then((_) {
+        c2.add(8);
+      });
       return it.moveNext();
     }).then((hasMore) {
       expect(hasMore, isTrue);
@@ -210,7 +293,12 @@
       done();
     });
 
-    c1..add(1)..add(3)..add(5)..add(7)..close();
+    c1
+      ..add(1)
+      ..add(3)
+      ..add(5)
+      ..add(7)
+      ..close();
     c2..add(2)..add(4);
   });
 
diff --git a/test/stream_zip_zone_test.dart b/test/stream_zip_zone_test.dart
index af4d94a..a0773a6 100644
--- a/test/stream_zip_zone_test.dart
+++ b/test/stream_zip_zone_test.dart
@@ -9,22 +9,22 @@
 // listen occurred.
 
 main() {
- StreamController controller;
- controller = new StreamController();
- testStream("singlesub-async", controller, controller.stream);
- controller = new StreamController.broadcast();
- testStream("broadcast-async", controller, controller.stream);
- controller = new StreamController();
- testStream("asbroadcast-async", controller,
-                                 controller.stream.asBroadcastStream());
+  StreamController controller;
+  controller = new StreamController();
+  testStream("singlesub-async", controller, controller.stream);
+  controller = new StreamController.broadcast();
+  testStream("broadcast-async", controller, controller.stream);
+  controller = new StreamController();
+  testStream(
+      "asbroadcast-async", controller, controller.stream.asBroadcastStream());
 
- controller = new StreamController(sync: true);
- testStream("singlesub-sync", controller, controller.stream);
- controller = new StreamController.broadcast(sync: true);
- testStream("broadcast-sync", controller, controller.stream);
- controller = new StreamController(sync: true);
- testStream("asbroadcast-sync", controller,
-                                controller.stream.asBroadcastStream());
+  controller = new StreamController(sync: true);
+  testStream("singlesub-sync", controller, controller.stream);
+  controller = new StreamController.broadcast(sync: true);
+  testStream("broadcast-sync", controller, controller.stream);
+  controller = new StreamController(sync: true);
+  testStream(
+      "asbroadcast-sync", controller, controller.stream.asBroadcastStream());
 }
 
 void testStream(String name, StreamController controller, Stream stream) {
diff --git a/test/subscription_stream_test.dart b/test/subscription_stream_test.dart
index 699babe..6e2c9d5 100644
--- a/test/subscription_stream_test.dart
+++ b/test/subscription_stream_test.dart
@@ -73,13 +73,13 @@
     for (var sourceCancels in [false, true]) {
       group("${sourceCancels ? "yes" : "no"}:", () {
         var subscriptionStream;
-        var onCancel;  // Completes if source stream is canceled before done.
+        var onCancel; // Completes if source stream is canceled before done.
         setUp(() {
           var cancelCompleter = new Completer();
           var source = createErrorStream(cancelCompleter);
           onCancel = cancelCompleter.future;
-          var sourceSubscription = source.listen(null,
-                                                 cancelOnError: sourceCancels);
+          var sourceSubscription =
+              source.listen(null, cancelOnError: sourceCancels);
           subscriptionStream = new SubscriptionStream<int>(sourceSubscription);
         });
 
@@ -87,15 +87,15 @@
           var done = new Completer();
           var events = [];
           subscriptionStream.listen(events.add,
-                                    onError: events.add,
-                                    onDone: done.complete,
-                                    cancelOnError: false);
+              onError: events.add, onDone: done.complete, cancelOnError: false);
           var expected = [1, 2, "To err is divine!"];
           if (sourceCancels) {
             await onCancel;
             // And [done] won't complete at all.
             bool isDone = false;
-            done.future.then((_) { isDone = true; });
+            done.future.then((_) {
+              isDone = true;
+            });
             await new Future.delayed(const Duration(milliseconds: 5));
             expect(isDone, false);
           } else {
@@ -109,12 +109,12 @@
           var completer = new Completer();
           var events = [];
           subscriptionStream.listen(events.add,
-                                    onError: (value) {
-                                      events.add(value);
-                                      completer.complete();
-                                    },
-                                    onDone: () => throw "should not happen",
-                                    cancelOnError: true);
+              onError: (value) {
+                events.add(value);
+                completer.complete();
+              },
+              onDone: () => throw "should not happen",
+              cancelOnError: true);
           await completer.future;
           await flushMicrotasks();
           expect(events, [1, 2, "To err is divine!"]);
@@ -128,8 +128,7 @@
           var stream = createStream();
           var sourceSubscription =
               stream.listen(null, cancelOnError: cancelOnError);
-          var subscriptionStream =
-              new SubscriptionStream(sourceSubscription);
+          var subscriptionStream = new SubscriptionStream(sourceSubscription);
           var subscription =
               subscriptionStream.listen(null, cancelOnError: cancelOnError);
           expect(subscription.asFuture(42), completion(42));
@@ -137,10 +136,9 @@
 
         test("- error goes to asFuture", () async {
           var stream = createErrorStream();
-          var sourceSubscription = stream.listen(null,
-                                                 cancelOnError: cancelOnError);
-          var subscriptionStream =
-              new SubscriptionStream(sourceSubscription);
+          var sourceSubscription =
+              stream.listen(null, cancelOnError: cancelOnError);
+          var subscriptionStream = new SubscriptionStream(sourceSubscription);
 
           var subscription =
               subscriptionStream.listen(null, cancelOnError: cancelOnError);
diff --git a/test/subscription_transformer_test.dart b/test/subscription_transformer_test.dart
index cdac4f8..dbbf597 100644
--- a/test/subscription_transformer_test.dart
+++ b/test/subscription_transformer_test.dart
@@ -86,13 +86,11 @@
       var controller = new StreamController(onCancel: expectAsync0(() {
         isCanceled = true;
       }));
-      var subscription = controller.stream
-          .transform(subscriptionTransformer(
-              handleCancel: expectAsync1((inner) {
-                callbackInvoked = true;
-                inner.cancel();
-              })))
-          .listen(expectAsync1((_) {}, count: 0));
+      var subscription = controller.stream.transform(
+          subscriptionTransformer(handleCancel: expectAsync1((inner) {
+        callbackInvoked = true;
+        inner.cancel();
+      }))).listen(expectAsync1((_) {}, count: 0));
 
       await flushMicrotasks();
       expect(callbackInvoked, isFalse);
@@ -103,7 +101,7 @@
       expect(callbackInvoked, isTrue);
       expect(isCanceled, isTrue);
     });
-    
+
     test("invokes the callback once and caches its result", () async {
       var completer = new Completer();
       var controller = new StreamController();
@@ -138,7 +136,8 @@
       var pauseCount = 0;
       var controller = new StreamController();
       var subscription = controller.stream
-          .transform(subscriptionTransformer(handlePause: expectAsync1((inner) {
+          .transform(subscriptionTransformer(
+              handlePause: expectAsync1((inner) {
             pauseCount++;
             inner.pause();
           }, count: 3)))
@@ -187,9 +186,9 @@
       var subscription = controller.stream
           .transform(subscriptionTransformer(
               handleResume: expectAsync1((inner) {
-                resumeCount++;
-                inner.resume();
-              }, count: 3)))
+            resumeCount++;
+            inner.resume();
+          }, count: 3)))
           .listen(expectAsync1((_) {}, count: 0));
 
       await flushMicrotasks();
@@ -216,13 +215,11 @@
     test("invokes the callback when a resume future completes", () async {
       var resumed = false;
       var controller = new StreamController();
-      var subscription = controller.stream
-          .transform(subscriptionTransformer(
-              handleResume: expectAsync1((inner) {
-                resumed = true;
-                inner.resume();
-              })))
-          .listen(expectAsync1((_) {}, count: 0));
+      var subscription = controller.stream.transform(
+          subscriptionTransformer(handleResume: expectAsync1((inner) {
+        resumed = true;
+        inner.resume();
+      }))).listen(expectAsync1((_) {}, count: 0));
 
       var completer = new Completer();
       subscription.pause(completer.future);
@@ -255,8 +252,7 @@
       var controller = new StreamController();
       subscription = controller.stream
           .transform(subscriptionTransformer(handleCancel: (_) {}))
-          .listen(
-              expectAsync1((_) {}, count: 0),
+          .listen(expectAsync1((_) {}, count: 0),
               onError: expectAsync2((_, __) {}, count: 0),
               onDone: expectAsync0(() {}, count: 0));
       subscription.cancel();
diff --git a/test/typed_wrapper/future_test.dart b/test/typed_wrapper/future_test.dart
index b928e05..0c8b00a 100644
--- a/test/typed_wrapper/future_test.dart
+++ b/test/typed_wrapper/future_test.dart
@@ -32,20 +32,22 @@
               test: expectAsync1((_) {}, count: 0)),
           completion(equals(12)));
 
-      expect(errorWrapper.catchError(expectAsync1((error) {
-        expect(error, equals("oh no"));
-        return 42;
-      }), test: expectAsync1((error) {
-        expect(error, equals("oh no"));
-        return true;
-      })), completion(equals(42)));
+      expect(
+          errorWrapper.catchError(expectAsync1((error) {
+            expect(error, equals("oh no"));
+            return 42;
+          }), test: expectAsync1((error) {
+            expect(error, equals("oh no"));
+            return true;
+          })),
+          completion(equals(42)));
     });
 
     test("then()", () {
-      expect(wrapper.then((value) => value.toString()),
-          completion(equals("12")));
-      expect(errorWrapper.then(expectAsync1((_) {}, count: 0)),
-          throwsA("oh no"));
+      expect(
+          wrapper.then((value) => value.toString()), completion(equals("12")));
+      expect(
+          errorWrapper.then(expectAsync1((_) {}, count: 0)), throwsA("oh no"));
     });
 
     test("whenComplete()", () {
@@ -96,11 +98,11 @@
         expect(wrapper.timeout(new Duration(seconds: 3)).then((_) {}),
             throwsCastError);
 
-      expect(
-          new TypeSafeFuture<int>(new Completer<Object>().future)
-              .timeout(Duration.ZERO, onTimeout: expectAsync0(() => "foo"))
-              .then((_) {}),
-          throwsCastError);
+        expect(
+            new TypeSafeFuture<int>(new Completer<Object>().future)
+                .timeout(Duration.ZERO, onTimeout: expectAsync0(() => "foo"))
+                .then((_) {}),
+            throwsCastError);
       });
     });
   });
diff --git a/test/typed_wrapper/stream_subscription_test.dart b/test/typed_wrapper/stream_subscription_test.dart
index 07a8bd2..f52abe7 100644
--- a/test/typed_wrapper/stream_subscription_test.dart
+++ b/test/typed_wrapper/stream_subscription_test.dart
@@ -18,8 +18,8 @@
       controller = new StreamController<Object>(onCancel: () {
         isCanceled = true;
       });
-      wrapper = new TypeSafeStreamSubscription<int>(
-          controller.stream.listen(null));
+      wrapper =
+          new TypeSafeStreamSubscription<int>(controller.stream.listen(null));
     });
 
     test("onData()", () {
@@ -75,8 +75,8 @@
       controller = new StreamController<Object>(onCancel: () {
         isCanceled = true;
       });
-      wrapper = new TypeSafeStreamSubscription<int>(
-          controller.stream.listen(null));
+      wrapper =
+          new TypeSafeStreamSubscription<int>(controller.stream.listen(null));
     });
 
     group("throws a CastError for", () {
diff --git a/test/typed_wrapper/stream_test.dart b/test/typed_wrapper/stream_test.dart
index 12a130a..88fbee6 100644
--- a/test/typed_wrapper/stream_test.dart
+++ b/test/typed_wrapper/stream_test.dart
@@ -18,14 +18,19 @@
     var errorWrapper;
     setUp(() {
       controller = new StreamController<Object>()
-          ..add(1)..add(2)..add(3)..add(4)..add(5)..close();
+        ..add(1)
+        ..add(2)
+        ..add(3)
+        ..add(4)
+        ..add(5)
+        ..close();
 
       // TODO(nweiz): Use public methods when test#414 is fixed and we can run
       // this on DDC.
       wrapper = new TypeSafeStream<int>(controller.stream);
       emptyWrapper = new TypeSafeStream<int>(new Stream<Object>.empty());
-      singleWrapper = new TypeSafeStream<int>(
-          new Stream<Object>.fromIterable([1]));
+      singleWrapper =
+          new TypeSafeStream<int>(new Stream<Object>.fromIterable([1]));
       errorWrapper = new TypeSafeStream<int>(
           new Stream<Object>.fromFuture(new Future.error("oh no")));
     });
@@ -70,8 +75,8 @@
       });
 
       test("with onListen", () {
-        var broadcast = wrapper.asBroadcastStream(
-            onListen: expectAsync1((subscription) {
+        var broadcast =
+            wrapper.asBroadcastStream(onListen: expectAsync1((subscription) {
           expect(subscription, new isInstanceOf<StreamSubscription<int>>());
           subscription.pause();
         }));
@@ -81,8 +86,8 @@
       });
 
       test("with onCancel", () {
-        var broadcast = wrapper.asBroadcastStream(
-            onCancel: expectAsync1((subscription) {
+        var broadcast =
+            wrapper.asBroadcastStream(onCancel: expectAsync1((subscription) {
           expect(subscription, new isInstanceOf<StreamSubscription<int>>());
           subscription.pause();
         }));
@@ -105,13 +110,14 @@
 
     group("distinct()", () {
       test("without equals", () {
-        expect(wrapper.distinct().toList(),
-            completion(equals([1, 2, 3, 4, 5])));
+        expect(
+            wrapper.distinct().toList(), completion(equals([1, 2, 3, 4, 5])));
 
         expect(
             new TypeSafeStream<int>(
                     new Stream<Object>.fromIterable([1, 1, 2, 2, 3, 3]))
-                .distinct().toList(),
+                .distinct()
+                .toList(),
             completion(equals([1, 2, 3])));
       });
 
@@ -133,8 +139,7 @@
     });
 
     test("expand()", () {
-      expect(
-          wrapper.expand((i) => [i, i]).toList(),
+      expect(wrapper.expand((i) => [i, i]).toList(),
           completion(equals([1, 1, 2, 2, 3, 3, 4, 4, 5, 5])));
     });
 
@@ -197,26 +202,32 @@
 
     group("handleError()", () {
       test("without a test", () {
-        expect(errorWrapper.handleError(expectAsync1((error) {
-          expect(error, equals("oh no"));
-        })).toList(), completion(isEmpty));
+        expect(
+            errorWrapper.handleError(expectAsync1((error) {
+              expect(error, equals("oh no"));
+            })).toList(),
+            completion(isEmpty));
       });
 
       test("with a matching test", () {
-        expect(errorWrapper.handleError(expectAsync1((error) {
-          expect(error, equals("oh no"));
-        }), test: expectAsync1((error) {
-          expect(error, equals("oh no"));
-          return true;
-        })).toList(), completion(isEmpty));
+        expect(
+            errorWrapper.handleError(expectAsync1((error) {
+              expect(error, equals("oh no"));
+            }), test: expectAsync1((error) {
+              expect(error, equals("oh no"));
+              return true;
+            })).toList(),
+            completion(isEmpty));
       });
 
       test("with a matching test", () {
-        expect(errorWrapper.handleError(expectAsync1((_) {}, count: 0),
-            test: expectAsync1((error) {
-          expect(error, equals("oh no"));
-          return false;
-        })).toList(), throwsA("oh no"));
+        expect(
+            errorWrapper.handleError(expectAsync1((_) {}, count: 0),
+                test: expectAsync1((error) {
+              expect(error, equals("oh no"));
+              return false;
+            })).toList(),
+            throwsA("oh no"));
       });
     });
 
@@ -260,24 +271,23 @@
     });
 
     test("takeWhile()", () {
-      expect(wrapper.takeWhile((i) => i < 3).toList(),
-          completion(equals([1, 2])));
+      expect(
+          wrapper.takeWhile((i) => i < 3).toList(), completion(equals([1, 2])));
     });
 
     test("toSet()", () {
       expect(wrapper.toSet(), completion(unorderedEquals([1, 2, 3, 4, 5])));
       expect(
           new TypeSafeStream<int>(
-                  new Stream<Object>.fromIterable([1, 1, 2, 2, 3, 3]))
-              .toSet(),
+              new Stream<Object>.fromIterable([1, 1, 2, 2, 3, 3])).toSet(),
           completion(unorderedEquals([1, 2, 3])));
     });
 
     test("transform()", () {
-     var transformer = new StreamTransformer<int, String>.fromHandlers(
-         handleData: (data, sink) {
-       sink.add(data.toString());
-     });
+      var transformer = new StreamTransformer<int, String>.fromHandlers(
+          handleData: (data, sink) {
+        sink.add(data.toString());
+      });
 
       expect(wrapper.transform(transformer).toList(),
           completion(equals(["1", "2", "3", "4", "5"])));
@@ -385,8 +395,8 @@
     setUp(() {
       wrapper = new TypeSafeStream<int>(
           new Stream<Object>.fromIterable(["foo", "bar", "baz"]));
-      singleWrapper = new TypeSafeStream<int>(
-          new Stream<Object>.fromIterable(["foo"]));
+      singleWrapper =
+          new TypeSafeStream<int>(new Stream<Object>.fromIterable(["foo"]));
     });
 
     group("throws a CastError for", () {
@@ -439,8 +449,8 @@
       });
 
       test("lastWhere()", () {
-        expect(wrapper.lastWhere(expectAsync1((_) {}, count: 0)),
-            throwsCastError);
+        expect(
+            wrapper.lastWhere(expectAsync1((_) {}, count: 0)), throwsCastError);
       });
 
       test("singleWhere()", () {
@@ -454,7 +464,8 @@
       });
 
       test("forEach()", () async {
-        expect(wrapper.forEach(expectAsync1((_) {}, count: 0)), throwsCastError);
+        expect(
+            wrapper.forEach(expectAsync1((_) {}, count: 0)), throwsCastError);
       });
 
       test("handleError()", () {
@@ -468,8 +479,8 @@
       });
 
       test("map()", () {
-        expect(wrapper.map(expectAsync1((_) {}, count: 0)).first,
-            throwsCastError);
+        expect(
+            wrapper.map(expectAsync1((_) {}, count: 0)).first, throwsCastError);
       });
 
       test("reduce()", () {
diff --git a/test/utils.dart b/test/utils.dart
index 8499eb2..9270886 100644
--- a/test/utils.dart
+++ b/test/utils.dart
@@ -17,23 +17,24 @@
 ///
 /// Returns a function that fails the test if it is ever called.
 OptionalArgAction unreachable(String name) =>
-        ([a, b]) => fail("Unreachable: $name");
+    ([a, b]) => fail("Unreachable: $name");
 
 // TODO(nweiz): Use the version of this in test when test#418 is fixed.
 /// A matcher that runs a callback in its own zone and asserts that that zone
 /// emits an error that matches [matcher].
 Matcher throwsZoned(matcher) => predicate((callback) {
-  var firstError = true;
-  runZoned(callback, onError: expectAsync2((error, stackTrace) {
-    if (firstError) {
-      expect(error, matcher);
-      firstError = false;
-    } else {
-      registerException(error, stackTrace);
-    }
-  }, max: -1));
-  return true;
-});
+      var firstError = true;
+      runZoned(callback,
+          onError: expectAsync2((error, stackTrace) {
+            if (firstError) {
+              expect(error, matcher);
+              firstError = false;
+            } else {
+              registerException(error, stackTrace);
+            }
+          }, max: -1));
+      return true;
+    });
 
 /// A matcher that runs a callback in its own zone and asserts that that zone
 /// emits a [CastError].