blob: b89845d0557615f0a839ba7381caf2d3c32bb784 [file] [log] [blame]
// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'package:async/async.dart';
import 'src/guarantee_channel.dart';
import 'src/stream_channel_transformer.dart';
export 'src/delegating_stream_channel.dart';
export 'src/disconnector.dart';
export 'src/isolate_channel.dart';
export 'src/json_document_transformer.dart';
export 'src/multi_channel.dart';
export 'src/stream_channel_completer.dart';
export 'src/stream_channel_controller.dart';
export 'src/stream_channel_transformer.dart';
/// An abstract class representing a two-way communication channel.
///
/// Users should consider the [stream] emitting a "done" event to be the
/// canonical indicator that the channel has closed. If they wish to close the
/// channel, they should close the [sink]—canceling the stream subscription is
/// not sufficient. Protocol errors may be emitted through the stream or through
/// [Sink.done], depending on their underlying cause. Note that the sink may
/// silently drop events if the channel closes before [Sink.close] is called.
///
/// Implementations are strongly encouraged to mix in or extend
/// [StreamChannelMixin] to get default implementations of the various instance
/// methods. Adding new methods to this interface will not be considered a
/// breaking change if implementations are also added to [StreamChannelMixin].
///
/// Implementations must provide the following guarantees:
///
/// * The stream is single-subscription, and must follow all the guarantees of
/// single-subscription streams.
///
/// * Closing the sink causes the stream to close before it emits any more
/// events.
///
/// * After the stream closes, the sink is automatically closed. If this
/// happens, sink methods should silently drop their arguments until
/// [Sink.close] is called.
///
/// * If the stream closes before it has a listener, the sink should silently
/// drop events if possible.
///
/// * Canceling the stream's subscription has no effect on the sink. The channel
/// must still be able to respond to the other endpoint closing the channel
/// even after the subscription has been canceled.
///
/// * The sink *either* forwards errors to the other endpoint *or* closes as
/// soon as an error is added and forwards that error to the [Sink.done]
/// future.
///
/// These guarantees allow users to interact uniformly with all implementations,
/// and ensure that either endpoint closing the stream produces consistent
/// behavior.
abstract class StreamChannel<T> {
/// The single-subscription stream that emits values from the other endpoint.
Stream<T> get stream;
/// The sink for sending values to the other endpoint.
StreamSink<T> get sink;
/// Creates a new [StreamChannel] that communicates over [stream] and [sink].
///
/// Note that this stream/sink pair must provide the guarantees listed in the
/// [StreamChannel] documentation. If they don't do so natively, [new
/// StreamChannel.withGuarantees] should be used instead.
factory StreamChannel(Stream<T> stream, StreamSink<T> sink) =>
new _StreamChannel<T>(stream, sink);
/// Creates a new [StreamChannel] that communicates over [stream] and [sink].
///
/// Unlike [new StreamChannel], this enforces the guarantees listed in the
/// [StreamChannel] documentation. This makes it somewhat less efficient than
/// just wrapping a stream and a sink directly, so [new StreamChannel] should
/// be used when the guarantees are provided natively.
///
/// If [allowSinkErrors] is `false`, errors are not allowed to be passed to
/// [sink]. If any are, the connection will close and the error will be
/// forwarded to [Sink.done].
factory StreamChannel.withGuarantees(Stream<T> stream, StreamSink<T> sink,
{bool allowSinkErrors: true}) =>
new GuaranteeChannel(stream, sink, allowSinkErrors: allowSinkErrors);
/// Connects [this] to [other], so that any values emitted by either are sent
/// directly to the other.
void pipe(StreamChannel<T> other);
/// Transforms [this] using [transformer].
///
/// This is identical to calling `transformer.bind(channel)`.
StreamChannel transform(StreamChannelTransformer<T, dynamic> transformer);
/// Transforms only the [stream] component of [this] using [transformer].
StreamChannel<T> transformStream(StreamTransformer<T, T> transformer);
/// Transforms only the [sink] component of [this] using [transformer].
StreamChannel<T> transformSink(StreamSinkTransformer<T, T> transformer);
/// Returns a copy of [this] with [stream] replaced by [change]'s return
/// value.
StreamChannel<T> changeStream(Stream<T> change(Stream<T> stream));
/// Returns a copy of [this] with [sink] replaced by [change]'s return
/// value.
StreamChannel<T> changeSink(StreamSink<T> change(StreamSink<T> sink));
}
/// An implementation of [StreamChannel] that simply takes a stream and a sink
/// as parameters.
///
/// This is distinct from [StreamChannel] so that it can use
/// [StreamChannelMixin].
class _StreamChannel<T> extends StreamChannelMixin<T> {
final Stream<T> stream;
final StreamSink<T> sink;
_StreamChannel(this.stream, this.sink);
}
/// A mixin that implements the instance methods of [StreamChannel] in terms of
/// [stream] and [sink].
abstract class StreamChannelMixin<T> implements StreamChannel<T> {
void pipe(StreamChannel<T> other) {
stream.pipe(other.sink);
other.stream.pipe(sink);
}
StreamChannel transform(StreamChannelTransformer<T, dynamic> transformer) =>
transformer.bind(this);
StreamChannel<T> transformStream(StreamTransformer<T, T> transformer) =>
changeStream(transformer.bind);
StreamChannel<T> transformSink(StreamSinkTransformer<T, T> transformer) =>
changeSink(transformer.bind);
StreamChannel<T> changeStream(Stream<T> change(Stream<T> stream)) =>
new StreamChannel(change(stream), sink);
StreamChannel<T> changeSink(StreamSink<T> change(StreamSink<T> sink)) =>
new StreamChannel(stream, change(sink));
}