| // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
| // for details. All rights reserved. Use of this source code is governed by a |
| // BSD-style license that can be found in the LICENSE file. |
| |
| import 'dart:async'; |
| |
| import 'package:async/async.dart'; |
| |
| import '../stream_channel.dart'; |
| |
| /// A [channel] where the source and destination are provided later. |
| /// |
| /// The [channel] is a normal channel that can be listened to and that events |
| /// can be added to immediately, but until [setChannel] is called it won't emit |
| /// any events and all events added to it will be buffered. |
| class StreamChannelCompleter<T> { |
| /// The completer for this channel's stream. |
| final _streamCompleter = new StreamCompleter<T>(); |
| |
| /// The completer for this channel's sink. |
| final _sinkCompleter = new StreamSinkCompleter<T>(); |
| |
| /// The channel for this completer. |
| StreamChannel<T> get channel => _channel; |
| StreamChannel<T> _channel; |
| |
| /// Whether [setChannel] has been called. |
| bool _set = false; |
| |
| /// Convert a `Future<StreamChannel>` to a `StreamChannel`. |
| /// |
| /// This creates a channel using a channel completer, and sets the source |
| /// channel to the result of the future when the future completes. |
| /// |
| /// If the future completes with an error, the returned channel's stream will |
| /// instead contain just that error. The sink will silently discard all |
| /// events. |
| static StreamChannel fromFuture(Future<StreamChannel> channelFuture) { |
| var completer = new StreamChannelCompleter(); |
| channelFuture.then(completer.setChannel, onError: completer.setError); |
| return completer.channel; |
| } |
| |
| StreamChannelCompleter() { |
| _channel = new StreamChannel<T>( |
| _streamCompleter.stream, _sinkCompleter.sink); |
| } |
| |
| /// Set a channel as the source and destination for [channel]. |
| /// |
| /// A channel may be set at most once. |
| /// |
| /// Either [setChannel] or [setError] may be called at most once. Trying to |
| /// call either of them again will fail. |
| void setChannel(StreamChannel<T> channel) { |
| if (_set) throw new StateError("The channel has already been set."); |
| _set = true; |
| |
| _streamCompleter.setSourceStream(channel.stream); |
| _sinkCompleter.setDestinationSink(channel.sink); |
| } |
| |
| /// Indicates that there was an error connecting the channel. |
| /// |
| /// This makes the stream emit [error] and close. It makes the sink discard |
| /// all its events. |
| /// |
| /// Either [setChannel] or [setError] may be called at most once. Trying to |
| /// call either of them again will fail. |
| void setError(error, [StackTrace stackTrace]) { |
| if (_set) throw new StateError("The channel has already been set."); |
| _set = true; |
| |
| _streamCompleter.setError(error, stackTrace); |
| _sinkCompleter.setDestinationSink(new NullStreamSink()); |
| } |
| } |