Clean up `CancelableOperation`. (#204)
* Clean up `CancelableOperation`.
Some clean-ups and simplifications of the code.
Should not change any behavior, but differences in future chaining
can potentially change timing.
But don't change `Future` to `Future<void>` today.
diff --git a/lib/src/cancelable_operation.dart b/lib/src/cancelable_operation.dart
index 58684d6..95332dc 100644
--- a/lib/src/cancelable_operation.dart
+++ b/lib/src/cancelable_operation.dart
@@ -4,10 +4,6 @@
import 'dart:async';
-import 'package:async/async.dart';
-
-import 'utils.dart';
-
/// An asynchronous operation that can be cancelled.
///
/// The value of this operation is exposed as [value]. When this operation is
@@ -16,28 +12,25 @@
class CancelableOperation<T> {
/// The completer that produced this operation.
///
- /// This is canceled when [cancel] is called.
+ /// That completer is canceled when [cancel] is called.
final CancelableCompleter<T> _completer;
CancelableOperation._(this._completer);
- /// Creates a [CancelableOperation] wrapping [inner].
+ /// Creates a [CancelableOperation] with the same result as the [result] future.
///
/// When this operation is canceled, [onCancel] will be called and any value
- /// or error produced by [inner] will be discarded. If [onCancel] returns a
- /// [Future], it will be forwarded to [cancel].
+ /// or error later produced by [result] will be discarded.
+ /// If [onCancel] returns a [Future], it will be returned by [cancel].
///
- /// [onCancel] will be called synchronously when the operation is canceled.
- /// It's guaranteed to only be called once.
+ /// The [onCancel] funcion will be called synchronously
+ /// when the new operation is canceled, and will be called at most once.\
///
- /// Calling this constructor is equivalent to creating a [CancelableCompleter]
- /// and completing it with [inner].
- factory CancelableOperation.fromFuture(Future<T> inner,
- {FutureOr Function()? onCancel}) {
- var completer = CancelableCompleter<T>(onCancel: onCancel);
- completer.complete(inner);
- return completer.operation;
- }
+ /// Calling this constructor is equivalent to creating a
+ /// [CancelableCompleter] and completing it with [result].
+ factory CancelableOperation.fromFuture(Future<T> result,
+ {FutureOr Function()? onCancel}) =>
+ (CancelableCompleter<T>(onCancel: onCancel)..complete(result)).operation;
/// Creates a [CancelableOperation] wrapping [subscription].
///
@@ -58,12 +51,12 @@
return completer.operation;
}
- /// Returns a [CancelableOperation] that completes with the value of the first
+ /// Creates a [CancelableOperation] that completes with the value of the first
/// of [operations] to complete.
///
/// Once any of [operations] completes, its result is forwarded to the
- /// returned [CancelableOperation] and the rest are cancelled. When the
- /// returned operation is cancelled, all the [operations] are cancelled as
+ /// new [CancelableOperation] and the rest are cancelled. If the
+ /// bew operation is cancelled, all the [operations] are cancelled as
/// well.
static CancelableOperation<T> race<T>(
Iterable<CancelableOperation<T>> operations) {
@@ -73,8 +66,8 @@
}
var done = false;
- // Note: if one of the completers has already completed, it's not actually
- // cancelled by this.
+ // Note: if one or more of the completers have already completed,
+ // they're not actually cancelled by this.
Future<void> _cancelAll() {
done = true;
return Future.wait(operations.map((operation) => operation.cancel()));
@@ -83,11 +76,11 @@
var completer = CancelableCompleter<T>(onCancel: _cancelAll);
for (var operation in operations) {
operation.then((value) {
- if (!done) completer.complete(_cancelAll().then((_) => value));
+ if (!done) _cancelAll().whenComplete(() => completer.complete(value));
}, onError: (error, stackTrace) {
if (!done) {
- completer.complete(
- _cancelAll().then((_) => Future.error(error, stackTrace)));
+ _cancelAll()
+ .whenComplete(() => completer.completeError(error, stackTrace));
}
});
}
@@ -95,13 +88,17 @@
return completer.operation;
}
- /// The value returned by the operation.
+ /// The result of this operation, if not cancelled.
+ ///
+ /// This future will not complete if the operation is cancelled.
+ /// Use [valueOrCancellation] for a future which completes
+ /// both if the operation is cancelled and if it isn't.
Future<T> get value => _completer._inner.future;
/// Creates a [Stream] containing the result of this operation.
///
/// This is like `value.asStream()`, but if a subscription to the stream is
- /// canceled, this is as well.
+ /// canceled, this operation is as well.
Stream<T> asStream() {
var controller =
StreamController<T>(sync: true, onCancel: _completer._cancel);
@@ -124,28 +121,31 @@
/// returned by [cancel], then completes to [cancellationValue].
Future<T?> valueOrCancellation([T? cancellationValue]) {
var completer = Completer<T?>.sync();
- value.then((result) => completer.complete(result),
- onError: completer.completeError);
+ value.then(completer.complete, onError: completer.completeError);
- _completer._cancelMemo.future.then((_) {
+ _completer._cancelCompleter.future.then((_) {
completer.complete(cancellationValue);
}, onError: completer.completeError);
return completer.future;
}
- /// Registers callbacks to be called when this operation completes.
+ /// Creates a new cancelable operation to be completed
+ /// when this operation completes or is cancelled.
///
- /// [onValue] and [onError] behave in the same way as [Future.then].
+ /// The [onValue] and [onError] callbacks behave in the same way as
+ /// for [Future.then], and the result of those callbacks is used to complete
+ /// the returned cancelable operation.
///
- /// If [onCancel] is provided, and this operation is canceled, the [onCancel]
- /// callback is called and the returned operation completes with the result.
+ /// If [onCancel] is provided, and the this operation is canceled,
+ /// the [onCancel] callback is called and the returned operation completes
+ /// with the result returned by that call.
///
- /// If [onCancel] is not given, and this operation is canceled, then the
- /// returned operation is canceled.
+ /// If [onCancel] is not provided, and this operation is canceled, then the
+ /// returned operation is also canceled.
///
- /// If [propagateCancel] is `true` and the returned operation is canceled then
- /// this operation is canceled. The default is `false`.
+ /// If [propagateCancel] is `true` and the returned operation is canceled
+ /// then this operation is canceled. The default is `false`.
CancelableOperation<R> then<R>(FutureOr<R> Function(T) onValue,
{FutureOr<R> Function(Object, StackTrace)? onError,
FutureOr<R> Function()? onCancel,
@@ -153,26 +153,19 @@
final completer =
CancelableCompleter<R>(onCancel: propagateCancel ? cancel : null);
- valueOrCancellation().then((T? result) {
- if (!completer.isCanceled) {
- if (isCompleted && !isCanceled) {
- assert(result is T);
- completer.complete(Future.sync(() => onValue(result as T)));
- } else if (onCancel != null) {
- completer.complete(Future.sync(onCancel));
- } else {
- completer._cancel();
- }
- }
- }, onError: (Object error, StackTrace stackTrace) {
- if (!completer.isCanceled) {
- if (onError != null) {
- completer.complete(Future.sync(() => onError(error, stackTrace)));
- } else {
- completer.completeError(error, stackTrace);
- }
+ if (!isCanceled) {
+ value
+ .then(onValue, onError: onError)
+ .then(completer.complete, onError: completer.completeError);
+ }
+ _completer._cancelCompleter.future.then((_) {
+ if (onCancel != null) {
+ completer.complete(Future.sync(onCancel));
+ } else {
+ completer._cancel();
}
});
+
return completer.operation;
}
@@ -196,24 +189,51 @@
/// A completer for a [CancelableOperation].
class CancelableCompleter<T> {
/// The completer for the wrapped future.
+ ///
+ /// At most one of `_inner.future` and `_cancelCompleter.future` will
+ /// ever complete.
final _inner = Completer<T>();
- /// The callback to call if the future is canceled.
- final FutureOrCallback? _onCancel;
+ /// Completed when `cancel` is called.
+ ///
+ /// At most one of `_inner.future` and `_cancelCompleter.future` will
+ /// ever complete.
+ final _cancelCompleter = Completer<void>();
- /// Creates a new completer for a [CancelableOperation].
- ///
- /// When the future operation canceled, as long as the completer hasn't yet
- /// completed, [onCancel] is called. If [onCancel] returns a [Future], it's
- /// forwarded to [CancelableOperation.cancel].
- ///
- /// [onCancel] will be called synchronously when the operation is canceled.
- /// It's guaranteed to only be called once.
- CancelableCompleter({FutureOr Function()? onCancel}) : _onCancel = onCancel;
+ /// The callback to call if the operation is canceled.
+ final FutureOr<void> Function()? _onCancel;
/// The operation controlled by this completer.
late final operation = CancelableOperation<T>._(this);
+ /// Set when [complete] or [completeError] is called.
+ ///
+ /// Completing twice is not allowed.
+ ///
+ /// If [complete] is called with a future, it's still possible to
+ /// cancel the operation until that future completes,
+ /// so this value and [_isCanceled] are not mutually exclusive.
+ bool _isCompleted = false;
+
+ /// Set when [cancel] is called.
+ ///
+ /// Cancelling twice does nothing, nor does completing after cancelling.
+ bool _isCanceled = false;
+
+ /// Creates a new completer for a [CancelableOperation].
+ ///
+ /// The cancelable [operation] can be completed using
+ /// [complete] or [completeError].
+ ///
+ /// The [onCancel] function is called if the [operation] is canceled,
+ /// by calling [CancelableOperation.cancel]
+ /// before the operation has completed.
+ /// If [onCancel] returns a [Future],
+ /// that future is also returned by [CancelableOperation.cancel].
+ ///
+ /// The [onCancel] function will be called at most once.
+ CancelableCompleter({FutureOr Function()? onCancel}) : _onCancel = onCancel;
+
/// Whether the [complete] or [completeError] have been called.
///
/// Once this completer has been completed with either a result or error,
@@ -223,20 +243,16 @@
/// completed before it's [operation] is completed. In that case the
/// [operation] may still be canceled before the result is available.
bool get isCompleted => _isCompleted;
- bool _isCompleted = false;
/// Whether the completer was canceled before the result was ready.
bool get isCanceled => _isCanceled;
- bool _isCanceled = false;
- /// The memoizer for [_cancel].
- final _cancelMemo = AsyncMemoizer();
-
- /// Completes [operation] to [value].
+ /// Completes [operation] with [value].
///
- /// If [value] is a [Future] the [operation] will complete with the result of
- /// that `Future` once it is available.
- /// In that case [isComplete] will be true before the [operation] is complete.
+ /// If [value] is a [Future] the [operation] will complete
+ /// with the result of that `Future` once it is available.
+ /// In that case [isComplete] will be `true` before the [operation]
+ /// is complete.
///
/// If the type [T] is not nullable [value] may be not be omitted or `null`.
///
@@ -247,20 +263,19 @@
if (_isCompleted) throw StateError('Operation already completed');
_isCompleted = true;
- if (value is! Future) {
+ if (value is! Future<T>) {
if (_isCanceled) return;
_inner.complete(value);
return;
}
- final future = value as Future<T>;
if (_isCanceled) {
// Make sure errors from [value] aren't top-leveled.
- future.catchError((_) {});
+ value.ignore();
return;
}
- future.then((result) {
+ value.then((result) {
if (_isCanceled) return;
_inner.complete(result);
}, onError: (Object error, StackTrace stackTrace) {
@@ -282,24 +297,27 @@
_inner.completeError(error, stackTrace);
}
- /// Cancel the operation.
+ /// Cancels the operation.
///
- /// This call is be ignored if the result of the operation is already
- /// available.
+ /// If the operation has already completed, prior to being cancelled,
+ /// this method does nothing.
+ /// If the operation has already been cancelled, this method returns
+ /// the same result as the first call to `_cancel`.
+ ///
/// The result of the operation may only be available some time after
/// the completer has been completed (using [complete] or [completeError],
/// which sets [isCompleted] to true) if completed with a [Future].
/// The completer can be cancelled until the result becomes available,
/// even if [isCompleted] is true.
- ///
- /// This call is ignored if this completer has already been canceled.
- Future _cancel() {
- if (_inner.isCompleted) return Future.value();
+ Future<void> _cancel() {
+ if (_inner.isCompleted) return Future.value(null);
- return _cancelMemo.runOnce(() {
+ if (!_isCanceled) {
_isCanceled = true;
var onCancel = _onCancel;
- if (onCancel != null) return onCancel();
- });
+ _cancelCompleter
+ .complete(onCancel == null ? null : Future.sync(onCancel));
+ }
+ return _cancelCompleter.future;
}
}
diff --git a/lib/src/lazy_stream.dart b/lib/src/lazy_stream.dart
index e6779ee..e0facaa 100644
--- a/lib/src/lazy_stream.dart
+++ b/lib/src/lazy_stream.dart
@@ -5,7 +5,6 @@
import 'dart:async';
import 'stream_completer.dart';
-import 'utils.dart';
/// A [Stream] wrapper that forwards to another [Stream] that's initialized
/// lazily.
@@ -15,7 +14,7 @@
/// produce a `Stream`.
class LazyStream<T> extends Stream<T> {
/// The callback that's called to create the inner stream.
- FutureOrCallback<Stream<T>>? _callback;
+ FutureOr<Stream<T>> Function()? _callback;
/// Creates a single-subscription `Stream` that calls [callback] when it gets
/// a listener and forwards to the returned stream.
diff --git a/lib/src/utils.dart b/lib/src/utils.dart
deleted file mode 100644
index 39e5a8a..0000000
--- a/lib/src/utils.dart
+++ /dev/null
@@ -1,12 +0,0 @@
-// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
-// for details. All rights reserved. Use of this source code is governed by a
-// BSD-style license that can be found in the LICENSE file.
-
-import 'dart:async';
-
-/// A generic typedef for a function that takes one type and returns another.
-typedef UnaryFunction<E, F> = F Function(E argument);
-
-/// A typedef for a function that takes no arguments and returns a Future or a
-/// value.
-typedef FutureOrCallback<T> = FutureOr<T> Function();