blob: d15adcf94b43d95d47eacd50224caf6144515217 [file] [log] [blame]
// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'package:async/async.dart';
import '../stream_channel.dart';
/// A [channel] where the source and destination are provided later.
///
/// The [channel] is a normal channel that can be listened to and that events
/// can be added to immediately, but until [setChannel] is called it won't emit
/// any events and all events added to it will be buffered.
class StreamChannelCompleter<T> {
/// The completer for this channel's stream.
final _streamCompleter = new StreamCompleter<T>();
/// The completer for this channel's sink.
final _sinkCompleter = new StreamSinkCompleter<T>();
/// The channel for this completer.
StreamChannel<T> get channel => _channel;
StreamChannel<T> _channel;
/// Whether [setChannel] has been called.
bool _set = false;
/// Convert a `Future<StreamChannel>` to a `StreamChannel`.
///
/// This creates a channel using a channel completer, and sets the source
/// channel to the result of the future when the future completes.
///
/// If the future completes with an error, the returned channel's stream will
/// instead contain just that error. The sink will silently discard all
/// events.
static StreamChannel fromFuture(Future<StreamChannel> channelFuture) {
var completer = new StreamChannelCompleter();
channelFuture.then(completer.setChannel, onError: completer.setError);
return completer.channel;
}
StreamChannelCompleter() {
_channel = new StreamChannel<T>(
_streamCompleter.stream, _sinkCompleter.sink);
}
/// Set a channel as the source and destination for [channel].
///
/// A channel may be set at most once.
///
/// Either [setChannel] or [setError] may be called at most once. Trying to
/// call either of them again will fail.
void setChannel(StreamChannel<T> channel) {
if (_set) throw new StateError("The channel has already been set.");
_set = true;
_streamCompleter.setSourceStream(channel.stream);
_sinkCompleter.setDestinationSink(channel.sink);
}
/// Indicates that there was an error connecting the channel.
///
/// This makes the stream emit [error] and close. It makes the sink discard
/// all its events.
///
/// Either [setChannel] or [setError] may be called at most once. Trying to
/// call either of them again will fail.
void setError(error, [StackTrace stackTrace]) {
if (_set) throw new StateError("The channel has already been set.");
_set = true;
_streamCompleter.setError(error, stackTrace);
_sinkCompleter.setDestinationSink(new NullStreamSink());
}
}