Clean up `CancelableOperation`. (#204)

* Clean up `CancelableOperation`.

Some clean-ups and simplifications of the code.
Should not change any behavior, but differences in future chaining
can potentially change timing.

But don't change `Future` to `Future<void>` today.
diff --git a/lib/src/cancelable_operation.dart b/lib/src/cancelable_operation.dart
index 58684d6..95332dc 100644
--- a/lib/src/cancelable_operation.dart
+++ b/lib/src/cancelable_operation.dart
@@ -4,10 +4,6 @@
 
 import 'dart:async';
 
-import 'package:async/async.dart';
-
-import 'utils.dart';
-
 /// An asynchronous operation that can be cancelled.
 ///
 /// The value of this operation is exposed as [value]. When this operation is
@@ -16,28 +12,25 @@
 class CancelableOperation<T> {
   /// The completer that produced this operation.
   ///
-  /// This is canceled when [cancel] is called.
+  /// That completer is canceled when [cancel] is called.
   final CancelableCompleter<T> _completer;
 
   CancelableOperation._(this._completer);
 
-  /// Creates a [CancelableOperation] wrapping [inner].
+  /// Creates a [CancelableOperation] with the same result as the [result] future.
   ///
   /// When this operation is canceled, [onCancel] will be called and any value
-  /// or error produced by [inner] will be discarded. If [onCancel] returns a
-  /// [Future], it will be forwarded to [cancel].
+  /// or error later produced by [result] will be discarded.
+  /// If [onCancel] returns a [Future], it will be returned by [cancel].
   ///
-  /// [onCancel] will be called synchronously when the operation is canceled.
-  /// It's guaranteed to only be called once.
+  /// The [onCancel] funcion will be called synchronously
+  /// when the new operation is canceled, and will be called at most once.\
   ///
-  /// Calling this constructor is equivalent to creating a [CancelableCompleter]
-  /// and completing it with [inner].
-  factory CancelableOperation.fromFuture(Future<T> inner,
-      {FutureOr Function()? onCancel}) {
-    var completer = CancelableCompleter<T>(onCancel: onCancel);
-    completer.complete(inner);
-    return completer.operation;
-  }
+  /// Calling this constructor is equivalent to creating a
+  /// [CancelableCompleter] and completing it with [result].
+  factory CancelableOperation.fromFuture(Future<T> result,
+          {FutureOr Function()? onCancel}) =>
+      (CancelableCompleter<T>(onCancel: onCancel)..complete(result)).operation;
 
   /// Creates a [CancelableOperation] wrapping [subscription].
   ///
@@ -58,12 +51,12 @@
     return completer.operation;
   }
 
-  /// Returns a [CancelableOperation] that completes with the value of the first
+  /// Creates a [CancelableOperation] that completes with the value of the first
   /// of [operations] to complete.
   ///
   /// Once any of [operations] completes, its result is forwarded to the
-  /// returned [CancelableOperation] and the rest are cancelled. When the
-  /// returned operation is cancelled, all the [operations] are cancelled as
+  /// new [CancelableOperation] and the rest are cancelled. If the
+  /// bew operation is cancelled, all the [operations] are cancelled as
   /// well.
   static CancelableOperation<T> race<T>(
       Iterable<CancelableOperation<T>> operations) {
@@ -73,8 +66,8 @@
     }
 
     var done = false;
-    // Note: if one of the completers has already completed, it's not actually
-    // cancelled by this.
+    // Note: if one or more of the completers have already completed,
+    // they're not actually cancelled by this.
     Future<void> _cancelAll() {
       done = true;
       return Future.wait(operations.map((operation) => operation.cancel()));
@@ -83,11 +76,11 @@
     var completer = CancelableCompleter<T>(onCancel: _cancelAll);
     for (var operation in operations) {
       operation.then((value) {
-        if (!done) completer.complete(_cancelAll().then((_) => value));
+        if (!done) _cancelAll().whenComplete(() => completer.complete(value));
       }, onError: (error, stackTrace) {
         if (!done) {
-          completer.complete(
-              _cancelAll().then((_) => Future.error(error, stackTrace)));
+          _cancelAll()
+              .whenComplete(() => completer.completeError(error, stackTrace));
         }
       });
     }
@@ -95,13 +88,17 @@
     return completer.operation;
   }
 
-  /// The value returned by the operation.
+  /// The result of this operation, if not cancelled.
+  ///
+  /// This future will not complete if the operation is cancelled.
+  /// Use [valueOrCancellation] for a future which completes
+  /// both if the operation is cancelled and if it isn't.
   Future<T> get value => _completer._inner.future;
 
   /// Creates a [Stream] containing the result of this operation.
   ///
   /// This is like `value.asStream()`, but if a subscription to the stream is
-  /// canceled, this is as well.
+  /// canceled, this operation is as well.
   Stream<T> asStream() {
     var controller =
         StreamController<T>(sync: true, onCancel: _completer._cancel);
@@ -124,28 +121,31 @@
   /// returned by [cancel], then completes to [cancellationValue].
   Future<T?> valueOrCancellation([T? cancellationValue]) {
     var completer = Completer<T?>.sync();
-    value.then((result) => completer.complete(result),
-        onError: completer.completeError);
+    value.then(completer.complete, onError: completer.completeError);
 
-    _completer._cancelMemo.future.then((_) {
+    _completer._cancelCompleter.future.then((_) {
       completer.complete(cancellationValue);
     }, onError: completer.completeError);
 
     return completer.future;
   }
 
-  /// Registers callbacks to be called when this operation completes.
+  /// Creates a new cancelable operation to be completed
+  /// when this operation completes or is cancelled.
   ///
-  /// [onValue] and [onError] behave in the same way as [Future.then].
+  /// The [onValue] and [onError] callbacks behave in the same way as
+  /// for [Future.then], and the result of those callbacks is used to complete
+  /// the returned cancelable operation.
   ///
-  /// If [onCancel] is provided, and this operation is canceled, the [onCancel]
-  /// callback is called and the returned operation completes with the result.
+  /// If [onCancel] is provided, and the this operation is canceled,
+  /// the [onCancel] callback is called and the returned operation completes
+  /// with the result returned by that call.
   ///
-  /// If [onCancel] is not given, and this operation is canceled, then the
-  /// returned operation is canceled.
+  /// If [onCancel] is not provided, and this operation is canceled, then the
+  /// returned operation is also canceled.
   ///
-  /// If [propagateCancel] is `true` and the returned operation is canceled then
-  /// this operation is canceled. The default is `false`.
+  /// If [propagateCancel] is `true` and the returned operation is canceled
+  /// then this operation is canceled. The default is `false`.
   CancelableOperation<R> then<R>(FutureOr<R> Function(T) onValue,
       {FutureOr<R> Function(Object, StackTrace)? onError,
       FutureOr<R> Function()? onCancel,
@@ -153,26 +153,19 @@
     final completer =
         CancelableCompleter<R>(onCancel: propagateCancel ? cancel : null);
 
-    valueOrCancellation().then((T? result) {
-      if (!completer.isCanceled) {
-        if (isCompleted && !isCanceled) {
-          assert(result is T);
-          completer.complete(Future.sync(() => onValue(result as T)));
-        } else if (onCancel != null) {
-          completer.complete(Future.sync(onCancel));
-        } else {
-          completer._cancel();
-        }
-      }
-    }, onError: (Object error, StackTrace stackTrace) {
-      if (!completer.isCanceled) {
-        if (onError != null) {
-          completer.complete(Future.sync(() => onError(error, stackTrace)));
-        } else {
-          completer.completeError(error, stackTrace);
-        }
+    if (!isCanceled) {
+      value
+          .then(onValue, onError: onError)
+          .then(completer.complete, onError: completer.completeError);
+    }
+    _completer._cancelCompleter.future.then((_) {
+      if (onCancel != null) {
+        completer.complete(Future.sync(onCancel));
+      } else {
+        completer._cancel();
       }
     });
+
     return completer.operation;
   }
 
@@ -196,24 +189,51 @@
 /// A completer for a [CancelableOperation].
 class CancelableCompleter<T> {
   /// The completer for the wrapped future.
+  ///
+  /// At most one of `_inner.future` and `_cancelCompleter.future` will
+  /// ever complete.
   final _inner = Completer<T>();
 
-  /// The callback to call if the future is canceled.
-  final FutureOrCallback? _onCancel;
+  /// Completed when `cancel` is called.
+  ///
+  /// At most one of `_inner.future` and `_cancelCompleter.future` will
+  /// ever complete.
+  final _cancelCompleter = Completer<void>();
 
-  /// Creates a new completer for a [CancelableOperation].
-  ///
-  /// When the future operation canceled, as long as the completer hasn't yet
-  /// completed, [onCancel] is called. If [onCancel] returns a [Future], it's
-  /// forwarded to [CancelableOperation.cancel].
-  ///
-  /// [onCancel] will be called synchronously when the operation is canceled.
-  /// It's guaranteed to only be called once.
-  CancelableCompleter({FutureOr Function()? onCancel}) : _onCancel = onCancel;
+  /// The callback to call if the operation is canceled.
+  final FutureOr<void> Function()? _onCancel;
 
   /// The operation controlled by this completer.
   late final operation = CancelableOperation<T>._(this);
 
+  /// Set when [complete] or [completeError] is called.
+  ///
+  /// Completing twice is not allowed.
+  ///
+  /// If [complete] is called with a future, it's still possible to
+  /// cancel the operation until that future completes,
+  /// so this value and [_isCanceled] are not mutually exclusive.
+  bool _isCompleted = false;
+
+  /// Set when [cancel] is called.
+  ///
+  /// Cancelling twice does nothing, nor does completing after cancelling.
+  bool _isCanceled = false;
+
+  /// Creates a new completer for a [CancelableOperation].
+  ///
+  /// The cancelable [operation] can be completed using
+  /// [complete] or [completeError].
+  ///
+  /// The [onCancel] function is called if the [operation] is canceled,
+  /// by calling [CancelableOperation.cancel]
+  /// before the operation has completed.
+  /// If [onCancel] returns a [Future],
+  /// that future is also returned by [CancelableOperation.cancel].
+  ///
+  /// The [onCancel] function will be called at most once.
+  CancelableCompleter({FutureOr Function()? onCancel}) : _onCancel = onCancel;
+
   /// Whether the [complete] or [completeError] have been called.
   ///
   /// Once this completer has been completed with either a result or error,
@@ -223,20 +243,16 @@
   /// completed before it's [operation] is completed. In that case the
   /// [operation] may still be canceled before the result is available.
   bool get isCompleted => _isCompleted;
-  bool _isCompleted = false;
 
   /// Whether the completer was canceled before the result was ready.
   bool get isCanceled => _isCanceled;
-  bool _isCanceled = false;
 
-  /// The memoizer for [_cancel].
-  final _cancelMemo = AsyncMemoizer();
-
-  /// Completes [operation] to [value].
+  /// Completes [operation] with [value].
   ///
-  /// If [value] is a [Future] the [operation] will complete with the result of
-  /// that `Future` once it is available.
-  /// In that case [isComplete] will be true before the [operation] is complete.
+  /// If [value] is a [Future] the [operation] will complete
+  /// with the result of that `Future` once it is available.
+  /// In that case [isComplete] will be `true` before the [operation]
+  /// is complete.
   ///
   /// If the type [T] is not nullable [value] may be not be omitted or `null`.
   ///
@@ -247,20 +263,19 @@
     if (_isCompleted) throw StateError('Operation already completed');
     _isCompleted = true;
 
-    if (value is! Future) {
+    if (value is! Future<T>) {
       if (_isCanceled) return;
       _inner.complete(value);
       return;
     }
 
-    final future = value as Future<T>;
     if (_isCanceled) {
       // Make sure errors from [value] aren't top-leveled.
-      future.catchError((_) {});
+      value.ignore();
       return;
     }
 
-    future.then((result) {
+    value.then((result) {
       if (_isCanceled) return;
       _inner.complete(result);
     }, onError: (Object error, StackTrace stackTrace) {
@@ -282,24 +297,27 @@
     _inner.completeError(error, stackTrace);
   }
 
-  /// Cancel the operation.
+  /// Cancels the operation.
   ///
-  /// This call is be ignored if the result of the operation is already
-  /// available.
+  /// If the operation has already completed, prior to being cancelled,
+  /// this method does nothing.
+  /// If the operation has already been cancelled, this method returns
+  /// the same result as the first call to `_cancel`.
+  ///
   /// The result of the operation may only be available some time after
   /// the completer has been completed (using [complete] or [completeError],
   /// which sets [isCompleted] to true) if completed with a [Future].
   /// The completer can be cancelled until the result becomes available,
   /// even if [isCompleted] is true.
-  ///
-  /// This call is ignored if this completer has already been canceled.
-  Future _cancel() {
-    if (_inner.isCompleted) return Future.value();
+  Future<void> _cancel() {
+    if (_inner.isCompleted) return Future.value(null);
 
-    return _cancelMemo.runOnce(() {
+    if (!_isCanceled) {
       _isCanceled = true;
       var onCancel = _onCancel;
-      if (onCancel != null) return onCancel();
-    });
+      _cancelCompleter
+          .complete(onCancel == null ? null : Future.sync(onCancel));
+    }
+    return _cancelCompleter.future;
   }
 }
diff --git a/lib/src/lazy_stream.dart b/lib/src/lazy_stream.dart
index e6779ee..e0facaa 100644
--- a/lib/src/lazy_stream.dart
+++ b/lib/src/lazy_stream.dart
@@ -5,7 +5,6 @@
 import 'dart:async';
 
 import 'stream_completer.dart';
-import 'utils.dart';
 
 /// A [Stream] wrapper that forwards to another [Stream] that's initialized
 /// lazily.
@@ -15,7 +14,7 @@
 /// produce a `Stream`.
 class LazyStream<T> extends Stream<T> {
   /// The callback that's called to create the inner stream.
-  FutureOrCallback<Stream<T>>? _callback;
+  FutureOr<Stream<T>> Function()? _callback;
 
   /// Creates a single-subscription `Stream` that calls [callback] when it gets
   /// a listener and forwards to the returned stream.
diff --git a/lib/src/utils.dart b/lib/src/utils.dart
deleted file mode 100644
index 39e5a8a..0000000
--- a/lib/src/utils.dart
+++ /dev/null
@@ -1,12 +0,0 @@
-// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
-// for details. All rights reserved. Use of this source code is governed by a
-// BSD-style license that can be found in the LICENSE file.
-
-import 'dart:async';
-
-/// A generic typedef for a function that takes one type and returns another.
-typedef UnaryFunction<E, F> = F Function(E argument);
-
-/// A typedef for a function that takes no arguments and returns a Future or a
-/// value.
-typedef FutureOrCallback<T> = FutureOr<T> Function();