Add IsolateChannel.connect* constructors.
These constructors use a lightweight protocol to establish a two-way
connection over a previously-one-way connection.
R=rnystrom@google.com
Review URL: https://codereview.chromium.org//1638183002 .
diff --git a/lib/src/isolate_channel.dart b/lib/src/isolate_channel.dart
index 46375c9..a466d87 100644
--- a/lib/src/isolate_channel.dart
+++ b/lib/src/isolate_channel.dart
@@ -5,7 +5,11 @@
import 'dart:async';
import 'dart:isolate';
+import 'package:async/async.dart';
+import 'package:stack_trace/stack_trace.dart';
+
import '../stream_channel.dart';
+import 'isolate_channel/send_port_sink.dart';
/// A [StreamChannel] that communicates over a [ReceivePort]/[SendPort] pair,
/// presumably with another isolate.
@@ -22,125 +26,71 @@
/// ensure that they always close the [sink] of every [IsolateChannel] they use
/// to avoid leaving dangling [ReceivePort]s.
class IsolateChannel<T> extends StreamChannelMixin<T> {
- /// The port that produces incoming messages.
+ final Stream<T> stream;
+ final StreamSink<T> sink;
+
+ /// Connects to a remote channel that was created with
+ /// [IsolateChannel.connectSend].
///
- /// This is wrapped in a [StreamView] to produce [stream].
- final ReceivePort _receivePort;
+ /// These constructors establish a connection using only a single
+ /// [SendPort]/[ReceivePort] pair, as long as each side uses one of the
+ /// connect constructors.
+ ///
+ /// The connection protocol is guaranteed to remain compatible across versions
+ /// at least until the next major version release. If the protocol is
+ /// violated, the resulting channel will emit a single value on its stream and
+ /// then close.
+ factory IsolateChannel.connectReceive(ReceivePort receivePort) {
+ // We can't use a [StreamChannelCompleter] here because we need the return
+ // value to be an [IsolateChannel].
+ var streamCompleter = new StreamCompleter<T>();
+ var sinkCompleter = new StreamSinkCompleter<T>();
+ var channel = new IsolateChannel._(
+ streamCompleter.stream, sinkCompleter.sink);
- /// The port that sends outgoing messages.
- final SendPort _sendPort;
+ // The first message across the ReceivePort should be a SendPort pointing to
+ // the remote end. If it's not, we'll make the stream emit an error
+ // complaining.
+ var subscription;
+ subscription = receivePort.listen((message) {
+ if (message is SendPort) {
+ streamCompleter.setSourceStream(
+ new SubscriptionStream<T>(subscription));
+ sinkCompleter.setDestinationSink(
+ new SendPortSink<T>(receivePort, message));
+ return;
+ }
- Stream<T> get stream => _stream;
- final Stream<T> _stream;
+ streamCompleter.setError(
+ new StateError('Unexpected Isolate response "$message".'),
+ new Trace.current());
+ sinkCompleter.setDestinationSink(new NullStreamSink<T>());
+ subscription.cancel();
+ });
- StreamSink<T> get sink => _sink;
- _SendPortSink<T> _sink;
+ return channel;
+ }
+
+ /// Connects to a remote channel that was created with
+ /// [IsolateChannel.connectReceive].
+ ///
+ /// These constructors establish a connection using only a single
+ /// [SendPort]/[ReceivePort] pair, as long as each side uses one of the
+ /// connect constructors.
+ ///
+ /// The connection protocol is guaranteed to remain compatible across versions
+ /// at least until the next major version release.
+ factory IsolateChannel.connectSend(SendPort sendPort) {
+ var receivePort = new ReceivePort();
+ sendPort.send(receivePort.sendPort);
+ return new IsolateChannel(receivePort, sendPort);
+ }
/// Creates a stream channel that receives messages from [receivePort] and
/// sends them over [sendPort].
- IsolateChannel(ReceivePort receivePort, this._sendPort)
- : _receivePort = receivePort,
- _stream = new StreamView<T>(receivePort) {
- _sink = new _SendPortSink<T>(this);
- }
-}
+ IsolateChannel(ReceivePort receivePort, SendPort sendPort)
+ : stream = new StreamView<T>(receivePort),
+ sink = new SendPortSink<T>(receivePort, sendPort);
-/// The sink for [IsolateChannel].
-///
-/// [SendPort] doesn't natively implement any sink API, so this adds that API as
-/// a wrapper. Closing this just closes the [ReceivePort].
-class _SendPortSink<T> implements StreamSink<T> {
- /// The channel that this sink is for.
- final IsolateChannel _channel;
-
- Future get done => _doneCompleter.future;
- final _doneCompleter = new Completer();
-
- /// Whether [done] has been completed.
- ///
- /// This is distinct from [_closed] because [done] can complete with an error
- /// without the user explicitly calling [close].
- bool get _isDone => _doneCompleter.isCompleted;
-
- /// Whether the user has called [close].
- bool _closed = false;
-
- /// Whether we're currently adding a stream with [addStream].
- bool _inAddStream = false;
-
- _SendPortSink(this._channel);
-
- void add(T data) {
- if (_closed) throw new StateError("Cannot add event after closing.");
- if (_inAddStream) {
- throw new StateError("Cannot add event while adding stream.");
- }
- if (_isDone) return;
-
- _add(data);
- }
-
- /// A helper for [add] that doesn't check for [StateError]s.
- ///
- /// This is called from [addStream], so it shouldn't check [_inAddStream].
- void _add(T data) {
- _channel._sendPort.send(data);
- }
-
- void addError(error, [StackTrace stackTrace]) {
- if (_closed) throw new StateError("Cannot add event after closing.");
- if (_inAddStream) {
- throw new StateError("Cannot add event while adding stream.");
- }
-
- _close(error, stackTrace);
- }
-
- Future close() {
- if (_inAddStream) {
- throw new StateError("Cannot close sink while adding stream.");
- }
-
- _closed = true;
- return _close();
- }
-
- /// A helper for [close] that doesn't check for [StateError]s.
- ///
- /// This is called from [addStream], so it shouldn't check [_inAddStream]. It
- /// also forwards [error] and [stackTrace] to [done] if they're passed.
- Future _close([error, StackTrace stackTrace]) {
- if (_isDone) return done;
-
- _channel._receivePort.close();
-
- if (error != null) {
- _doneCompleter.completeError(error, stackTrace);
- } else {
- _doneCompleter.complete();
- }
-
- return done;
- }
-
- Future addStream(Stream<T> stream) {
- if (_closed) throw new StateError("Cannot add stream after closing.");
- if (_inAddStream) {
- throw new StateError("Cannot add stream while adding stream.");
- }
- if (_isDone) return new Future.value();
-
- _inAddStream = true;
- var completer = new Completer.sync();
- stream.listen(_add,
- onError: (error, stackTrace) {
- _close(error, stackTrace);
- completer.complete();
- },
- onDone: completer.complete,
- cancelOnError: true);
- return completer.future.then((_) {
- _inAddStream = false;
- });
- }
+ IsolateChannel._(this.stream, this.sink);
}
diff --git a/lib/src/isolate_channel/send_port_sink.dart b/lib/src/isolate_channel/send_port_sink.dart
new file mode 100644
index 0000000..d98f1da
--- /dev/null
+++ b/lib/src/isolate_channel/send_port_sink.dart
@@ -0,0 +1,111 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:isolate';
+
+/// The sink for [IsolateChannel].
+///
+/// [SendPort] doesn't natively implement any sink API, so this adds that API as
+/// a wrapper. Closing this just closes the [ReceivePort].
+class SendPortSink<T> implements StreamSink<T> {
+ /// The port that produces incoming messages.
+ ///
+ /// This is wrapped in a [StreamView] to produce [stream].
+ final ReceivePort _receivePort;
+
+ /// The port that sends outgoing messages.
+ final SendPort _sendPort;
+
+ Future get done => _doneCompleter.future;
+ final _doneCompleter = new Completer();
+
+ /// Whether [done] has been completed.
+ ///
+ /// This is distinct from [_closed] because [done] can complete with an error
+ /// without the user explicitly calling [close].
+ bool get _isDone => _doneCompleter.isCompleted;
+
+ /// Whether the user has called [close].
+ bool _closed = false;
+
+ /// Whether we're currently adding a stream with [addStream].
+ bool _inAddStream = false;
+
+ SendPortSink(this._receivePort, this._sendPort);
+
+ void add(T data) {
+ if (_closed) throw new StateError("Cannot add event after closing.");
+ if (_inAddStream) {
+ throw new StateError("Cannot add event while adding stream.");
+ }
+ if (_isDone) return;
+
+ _add(data);
+ }
+
+ /// A helper for [add] that doesn't check for [StateError]s.
+ ///
+ /// This is called from [addStream], so it shouldn't check [_inAddStream].
+ void _add(T data) {
+ _sendPort.send(data);
+ }
+
+ void addError(error, [StackTrace stackTrace]) {
+ if (_closed) throw new StateError("Cannot add event after closing.");
+ if (_inAddStream) {
+ throw new StateError("Cannot add event while adding stream.");
+ }
+
+ _close(error, stackTrace);
+ }
+
+ Future close() {
+ if (_inAddStream) {
+ throw new StateError("Cannot close sink while adding stream.");
+ }
+
+ _closed = true;
+ return _close();
+ }
+
+ /// A helper for [close] that doesn't check for [StateError]s.
+ ///
+ /// This is called from [addStream], so it shouldn't check [_inAddStream]. It
+ /// also forwards [error] and [stackTrace] to [done] if they're passed.
+ Future _close([error, StackTrace stackTrace]) {
+ if (_isDone) return done;
+
+ _receivePort.close();
+
+ if (error != null) {
+ _doneCompleter.completeError(error, stackTrace);
+ } else {
+ _doneCompleter.complete();
+ }
+
+ return done;
+ }
+
+ Future addStream(Stream<T> stream) {
+ if (_closed) throw new StateError("Cannot add stream after closing.");
+ if (_inAddStream) {
+ throw new StateError("Cannot add stream while adding stream.");
+ }
+ if (_isDone) return new Future.value();
+
+ _inAddStream = true;
+ var completer = new Completer.sync();
+ stream.listen(_add,
+ onError: (error, stackTrace) {
+ _close(error, stackTrace);
+ completer.complete();
+ },
+ onDone: completer.complete,
+ cancelOnError: true);
+ return completer.future.then((_) {
+ _inAddStream = false;
+ });
+ }
+}
diff --git a/pubspec.yaml b/pubspec.yaml
index 97057ab..a8e26cd 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -7,5 +7,6 @@
sdk: '>=1.8.0 <2.0.0'
dependencies:
async: '^1.8.0'
+ stack_trace: '^1.0.0'
dev_dependencies:
test: '^0.12.0'
diff --git a/test/isolate_channel_test.dart b/test/isolate_channel_test.dart
index 9e4fddc..fa4d8d5 100644
--- a/test/isolate_channel_test.dart
+++ b/test/isolate_channel_test.dart
@@ -123,4 +123,39 @@
channel.sink.add(1);
});
});
+
+ group("connect constructors", () {
+ var connectPort;
+ setUp(() {
+ connectPort = new ReceivePort();
+ });
+
+ tearDown(() {
+ connectPort.close();
+ });
+
+ test("create a connected pair of channels", () {
+ var channel1 = new IsolateChannel.connectReceive(connectPort);
+ var channel2 = new IsolateChannel.connectSend(connectPort.sendPort);
+
+ channel1.sink.add(1);
+ channel1.sink.add(2);
+ channel1.sink.add(3);
+ expect(channel2.stream.take(3).toList(), completion(equals([1, 2, 3])));
+
+ channel2.sink.add(4);
+ channel2.sink.add(5);
+ channel2.sink.add(6);
+ expect(channel1.stream.take(3).toList(), completion(equals([4, 5, 6])));
+ });
+
+ test("the receiving channel produces an error if it gets the wrong message",
+ () {
+ var connectedChannel = new IsolateChannel.connectReceive(connectPort);
+ connectPort.sendPort.send("wrong value");
+
+ expect(connectedChannel.stream.toList(), throwsStateError);
+ expect(connectedChannel.sink.done, completes);
+ });
+ });
}