| // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
| // for details. All rights reserved. Use of this source code is governed by a |
| // BSD-style license that can be found in the LICENSE file. |
| |
| import 'dart:async'; |
| |
| import 'package:async/async.dart'; |
| |
| import '../stream_channel.dart'; |
| |
| /// A [StreamChannel] that enforces the stream channel guarantees. |
| /// |
| /// This is exposed via [new StreamChannel.withGuarantees]. |
| class GuaranteeChannel<T> extends StreamChannelMixin<T> { |
| Stream<T> get stream => _streamController.stream; |
| |
| StreamSink<T> get sink => _sink; |
| _GuaranteeSink<T> _sink; |
| |
| /// The controller for [stream]. |
| /// |
| /// This intermediate controller allows us to continue listening for a done |
| /// event even after the user has canceled their subscription, and to send our |
| /// own done event when the sink is closed. |
| StreamController<T> _streamController; |
| |
| /// The subscription to the inner stream. |
| StreamSubscription<T> _subscription; |
| |
| /// Whether the sink has closed, causing the underlying channel to disconnect. |
| bool _disconnected = false; |
| |
| GuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink, |
| {bool allowSinkErrors: true}) { |
| _sink = new _GuaranteeSink<T>(innerSink, this, |
| allowErrors: allowSinkErrors); |
| |
| // Enforce the single-subscription guarantee by changing a broadcast stream |
| // to single-subscription. |
| if (innerStream.isBroadcast) { |
| innerStream = innerStream.transform( |
| const SingleSubscriptionTransformer()); |
| } |
| |
| _streamController = new StreamController<T>(onListen: () { |
| // If the sink has disconnected, we've already called |
| // [_streamController.close]. |
| if (_disconnected) return; |
| |
| _subscription = innerStream.listen(_streamController.add, |
| onError: _streamController.addError, |
| onDone: () { |
| _sink._onStreamDisconnected(); |
| _streamController.close(); |
| }); |
| }, sync: true); |
| } |
| |
| /// Called by [_GuaranteeSink] when the user closes it. |
| /// |
| /// The sink closing indicates that the connection is closed, so the stream |
| /// should stop emitting events. |
| void _onSinkDisconnected() { |
| _disconnected = true; |
| if (_subscription != null) _subscription.cancel(); |
| _streamController.close(); |
| } |
| } |
| |
| /// The sink for [GuaranteeChannel]. |
| /// |
| /// This wraps the inner sink to ignore events and cancel any in-progress |
| /// [addStream] calls when the underlying channel closes. |
| class _GuaranteeSink<T> implements StreamSink<T> { |
| /// The inner sink being wrapped. |
| final StreamSink<T> _inner; |
| |
| /// The [GuaranteeChannel] this belongs to. |
| final GuaranteeChannel<T> _channel; |
| |
| Future get done => _doneCompleter.future; |
| final _doneCompleter = new Completer(); |
| |
| /// Whether connection is disconnected. |
| /// |
| /// This can happen because the stream has emitted a done event, or because |
| /// the user added an error when [_allowErrors] is `false`. |
| bool _disconnected = false; |
| |
| /// Whether the user has called [close]. |
| bool _closed = false; |
| |
| /// The subscription to the stream passed to [addStream], if a stream is |
| /// currently being added. |
| StreamSubscription<T> _addStreamSubscription; |
| |
| /// The completer for the future returned by [addStream], if a stream is |
| /// currently being added. |
| Completer _addStreamCompleter; |
| |
| /// Whether we're currently adding a stream with [addStream]. |
| bool get _inAddStream => _addStreamSubscription != null; |
| |
| /// Whether errors are passed on to the underlying sink. |
| /// |
| /// If this is `false`, any error passed to the sink is piped to [done] and |
| /// the underlying sink is closed. |
| final bool _allowErrors; |
| |
| _GuaranteeSink(this._inner, this._channel, {bool allowErrors: true}) |
| : _allowErrors = allowErrors; |
| |
| void add(T data) { |
| if (_closed) throw new StateError("Cannot add event after closing."); |
| if (_inAddStream) { |
| throw new StateError("Cannot add event while adding stream."); |
| } |
| if (_disconnected) return; |
| |
| _inner.add(data); |
| } |
| |
| void addError(error, [StackTrace stackTrace]) { |
| if (_closed) throw new StateError("Cannot add event after closing."); |
| if (_inAddStream) { |
| throw new StateError("Cannot add event while adding stream."); |
| } |
| if (_disconnected) return; |
| |
| _addError(error, stackTrace); |
| } |
| |
| /// Like [addError], but doesn't check to ensure that an error can be added. |
| /// |
| /// This is called from [addStream], so it shouldn't fail if a stream is being |
| /// added. |
| void _addError(error, [StackTrace stackTrace]) { |
| if (_allowErrors) { |
| _inner.addError(error, stackTrace); |
| return; |
| } |
| |
| _doneCompleter.completeError(error, stackTrace); |
| |
| // Treat an error like both the stream and sink disconnecting. |
| _onStreamDisconnected(); |
| _channel._onSinkDisconnected(); |
| |
| // Ignore errors from the inner sink. We're already surfacing one error, and |
| // if the user handles it we don't want them to have another top-level. |
| _inner.close().catchError((_) {}); |
| } |
| |
| Future addStream(Stream<T> stream) { |
| if (_closed) throw new StateError("Cannot add stream after closing."); |
| if (_inAddStream) { |
| throw new StateError("Cannot add stream while adding stream."); |
| } |
| if (_disconnected) return new Future.value(); |
| |
| _addStreamCompleter = new Completer.sync(); |
| _addStreamSubscription = stream.listen( |
| _inner.add, |
| onError: _addError, |
| onDone: _addStreamCompleter.complete); |
| return _addStreamCompleter.future.then((_) { |
| _addStreamCompleter = null; |
| _addStreamSubscription = null; |
| }); |
| } |
| |
| Future close() { |
| if (_inAddStream) { |
| throw new StateError("Cannot close sink while adding stream."); |
| } |
| |
| if (_closed) return done; |
| _closed = true; |
| |
| if (!_disconnected) { |
| _channel._onSinkDisconnected(); |
| _doneCompleter.complete(_inner.close()); |
| } |
| |
| return done; |
| } |
| |
| /// Called by [GuaranteeChannel] when the stream emits a done event. |
| /// |
| /// The stream being done indicates that the connection is closed, so the |
| /// sink should stop forwarding events. |
| void _onStreamDisconnected() { |
| _disconnected = true; |
| if (!_doneCompleter.isCompleted) _doneCompleter.complete(); |
| |
| if (!_inAddStream) return; |
| _addStreamCompleter.complete(_addStreamSubscription.cancel()); |
| _addStreamCompleter = null; |
| _addStreamSubscription = null; |
| } |
| } |