blob: ee6f8d2a1e11d8a40aa950aa1cbb1027667e6422 [file] [log] [blame]
// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'package:stream_channel/stream_channel.dart';
import 'package:test/test.dart';
void main() {
late StreamChannelController controller;
late MultiChannel channel1;
late MultiChannel channel2;
setUp(() {
controller = StreamChannelController();
channel1 = MultiChannel<int>(controller.local);
channel2 = MultiChannel<int>(controller.foreign);
});
group('the default virtual channel', () {
test('begins connected', () {
var first = true;
channel2.stream.listen(expectAsync1((message) {
if (first) {
expect(message, equals(1));
first = false;
} else {
expect(message, equals(2));
}
}, count: 2));
channel1.sink.add(1);
channel1.sink.add(2);
});
test('closes the remote virtual channel when it closes', () {
expect(channel2.stream.toList(), completion(isEmpty));
expect(channel2.sink.done, completes);
channel1.sink.close();
});
test('closes the local virtual channel when it closes', () {
expect(channel1.stream.toList(), completion(isEmpty));
expect(channel1.sink.done, completes);
channel1.sink.close();
});
test(
"doesn't closes the local virtual channel when the stream "
'subscription is canceled', () {
channel1.sink.done.then(expectAsync1((_) {}, count: 0));
channel1.stream.listen((_) {}).cancel();
// Ensure that there's enough time for the channel to close if it's going
// to.
return pumpEventQueue();
});
test(
'closes the underlying channel when it closes without any other '
'virtual channels', () {
expect(controller.local.sink.done, completes);
expect(controller.foreign.sink.done, completes);
channel1.sink.close();
});
test(
"doesn't close the underlying channel when it closes with other "
'virtual channels', () {
controller.local.sink.done.then(expectAsync1((_) {}, count: 0));
controller.foreign.sink.done.then(expectAsync1((_) {}, count: 0));
// Establish another virtual connection which should keep the underlying
// connection open.
channel2.virtualChannel(channel1.virtualChannel().id);
channel1.sink.close();
// Ensure that there's enough time for the underlying channel to complete
// if it's going to.
return pumpEventQueue();
});
});
group('a locally-created virtual channel', () {
late VirtualChannel virtual1;
late VirtualChannel virtual2;
setUp(() {
virtual1 = channel1.virtualChannel();
virtual2 = channel2.virtualChannel(virtual1.id);
});
test('sends messages only to the other virtual channel', () {
var first = true;
virtual2.stream.listen(expectAsync1((message) {
if (first) {
expect(message, equals(1));
first = false;
} else {
expect(message, equals(2));
}
}, count: 2));
// No other virtual channels should receive the message.
for (var i = 0; i < 10; i++) {
var virtual = channel2.virtualChannel(channel1.virtualChannel().id);
virtual.stream.listen(expectAsync1((_) {}, count: 0));
}
channel2.stream.listen(expectAsync1((_) {}, count: 0));
virtual1.sink.add(1);
virtual1.sink.add(2);
});
test('closes the remote virtual channel when it closes', () {
expect(virtual2.stream.toList(), completion(isEmpty));
expect(virtual2.sink.done, completes);
virtual1.sink.close();
});
test('closes the local virtual channel when it closes', () {
expect(virtual1.stream.toList(), completion(isEmpty));
expect(virtual1.sink.done, completes);
virtual1.sink.close();
});
test(
"doesn't closes the local virtual channel when the stream "
'subscription is canceled', () {
virtual1.sink.done.then(expectAsync1((_) {}, count: 0));
virtual1.stream.listen((_) {}).cancel();
// Ensure that there's enough time for the channel to close if it's going
// to.
return pumpEventQueue();
});
test(
'closes the underlying channel when it closes without any other '
'virtual channels', () async {
// First close the default channel so we can test the new channel as the
// last living virtual channel.
unawaited(channel1.sink.close());
await channel2.stream.toList();
expect(controller.local.sink.done, completes);
expect(controller.foreign.sink.done, completes);
unawaited(virtual1.sink.close());
});
test(
"doesn't close the underlying channel when it closes with other "
'virtual channels', () {
controller.local.sink.done.then(expectAsync1((_) {}, count: 0));
controller.foreign.sink.done.then(expectAsync1((_) {}, count: 0));
virtual1.sink.close();
// Ensure that there's enough time for the underlying channel to complete
// if it's going to.
return pumpEventQueue();
});
test("doesn't conflict with a remote virtual channel", () {
var virtual3 = channel2.virtualChannel();
var virtual4 = channel1.virtualChannel(virtual3.id);
// This is an implementation detail, but we assert it here to make sure
// we're properly testing two channels with the same id.
expect(virtual1.id, equals(virtual3.id));
virtual2.stream
.listen(expectAsync1((message) => expect(message, equals(1))));
virtual4.stream
.listen(expectAsync1((message) => expect(message, equals(2))));
virtual1.sink.add(1);
virtual3.sink.add(2);
});
});
group('a remotely-created virtual channel', () {
late VirtualChannel virtual1;
late VirtualChannel virtual2;
setUp(() {
virtual1 = channel1.virtualChannel();
virtual2 = channel2.virtualChannel(virtual1.id);
});
test('sends messages only to the other virtual channel', () {
var first = true;
virtual1.stream.listen(expectAsync1((message) {
if (first) {
expect(message, equals(1));
first = false;
} else {
expect(message, equals(2));
}
}, count: 2));
// No other virtual channels should receive the message.
for (var i = 0; i < 10; i++) {
var virtual = channel2.virtualChannel(channel1.virtualChannel().id);
virtual.stream.listen(expectAsync1((_) {}, count: 0));
}
channel1.stream.listen(expectAsync1((_) {}, count: 0));
virtual2.sink.add(1);
virtual2.sink.add(2);
});
test('closes the remote virtual channel when it closes', () {
expect(virtual1.stream.toList(), completion(isEmpty));
expect(virtual1.sink.done, completes);
virtual2.sink.close();
});
test('closes the local virtual channel when it closes', () {
expect(virtual2.stream.toList(), completion(isEmpty));
expect(virtual2.sink.done, completes);
virtual2.sink.close();
});
test(
"doesn't closes the local virtual channel when the stream "
'subscription is canceled', () {
virtual2.sink.done.then(expectAsync1((_) {}, count: 0));
virtual2.stream.listen((_) {}).cancel();
// Ensure that there's enough time for the channel to close if it's going
// to.
return pumpEventQueue();
});
test(
'closes the underlying channel when it closes without any other '
'virtual channels', () async {
// First close the default channel so we can test the new channel as the
// last living virtual channel.
unawaited(channel2.sink.close());
await channel1.stream.toList();
expect(controller.local.sink.done, completes);
expect(controller.foreign.sink.done, completes);
unawaited(virtual2.sink.close());
});
test(
"doesn't close the underlying channel when it closes with other "
'virtual channels', () {
controller.local.sink.done.then(expectAsync1((_) {}, count: 0));
controller.foreign.sink.done.then(expectAsync1((_) {}, count: 0));
virtual2.sink.close();
// Ensure that there's enough time for the underlying channel to complete
// if it's going to.
return pumpEventQueue();
});
test("doesn't allow another virtual channel with the same id", () {
expect(() => channel2.virtualChannel(virtual1.id), throwsArgumentError);
});
test('dispatches events received before the virtual channel is created',
() async {
virtual1 = channel1.virtualChannel();
virtual1.sink.add(1);
await pumpEventQueue();
virtual1.sink.add(2);
await pumpEventQueue();
expect(channel2.virtualChannel(virtual1.id).stream, emitsInOrder([1, 2]));
});
test(
'dispatches close events received before the virtual channel is '
'created', () async {
virtual1 = channel1.virtualChannel();
unawaited(virtual1.sink.close());
await pumpEventQueue();
expect(channel2.virtualChannel(virtual1.id).stream.toList(),
completion(isEmpty));
});
});
group('when the underlying stream', () {
late VirtualChannel virtual1;
late VirtualChannel virtual2;
setUp(() {
virtual1 = channel1.virtualChannel();
virtual2 = channel2.virtualChannel(virtual1.id);
});
test('closes, all virtual channels close', () {
expect(channel1.stream.toList(), completion(isEmpty));
expect(channel1.sink.done, completes);
expect(channel2.stream.toList(), completion(isEmpty));
expect(channel2.sink.done, completes);
expect(virtual1.stream.toList(), completion(isEmpty));
expect(virtual1.sink.done, completes);
expect(virtual2.stream.toList(), completion(isEmpty));
expect(virtual2.sink.done, completes);
controller.local.sink.close();
});
test('closes, more virtual channels are created closed', () async {
unawaited(channel2.sink.close());
unawaited(virtual2.sink.close());
// Wait for the existing channels to emit done events.
await channel1.stream.toList();
await virtual1.stream.toList();
var virtual = channel1.virtualChannel();
expect(virtual.stream.toList(), completion(isEmpty));
expect(virtual.sink.done, completes);
virtual = channel1.virtualChannel();
expect(virtual.stream.toList(), completion(isEmpty));
expect(virtual.sink.done, completes);
});
test('emits an error, the error is sent only to the default channel', () {
channel1.stream.listen(expectAsync1((_) {}, count: 0),
onError: expectAsync1((error) => expect(error, equals('oh no'))));
virtual1.stream.listen(expectAsync1((_) {}, count: 0),
onError: expectAsync1((_) {}, count: 0));
controller.foreign.sink.addError('oh no');
});
});
group('stream channel rules', () {
group('for the main stream:', () {
test(
'closing the sink causes the stream to close before it emits any '
'more events', () {
channel1.sink.add(1);
channel1.sink.add(2);
channel1.sink.add(3);
channel2.stream.listen(expectAsync1((message) {
expect(message, equals(1));
channel2.sink.close();
}, count: 1));
});
test('after the stream closes, the sink ignores events', () async {
unawaited(channel1.sink.close());
// Wait for the done event to be delivered.
await channel2.stream.toList();
channel2.sink.add(1);
channel2.sink.add(2);
channel2.sink.add(3);
unawaited(channel2.sink.close());
// None of our channel.sink additions should make it to the other
// endpoint.
channel1.stream.listen(expectAsync1((_) {}, count: 0));
await pumpEventQueue();
});
test("canceling the stream's subscription has no effect on the sink",
() async {
unawaited(channel1.stream.listen(null).cancel());
await pumpEventQueue();
channel1.sink.add(1);
channel1.sink.add(2);
channel1.sink.add(3);
unawaited(channel1.sink.close());
expect(channel2.stream.toList(), completion(equals([1, 2, 3])));
});
test("canceling the stream's subscription doesn't stop a done event",
() async {
unawaited(channel1.stream.listen(null).cancel());
await pumpEventQueue();
unawaited(channel2.sink.close());
await pumpEventQueue();
channel1.sink.add(1);
channel1.sink.add(2);
channel1.sink.add(3);
unawaited(channel1.sink.close());
// The sink should be ignoring events because the channel closed.
channel2.stream.listen(expectAsync1((_) {}, count: 0));
await pumpEventQueue();
});
});
group('for a virtual channel:', () {
late VirtualChannel virtual1;
late VirtualChannel virtual2;
setUp(() {
virtual1 = channel1.virtualChannel();
virtual2 = channel2.virtualChannel(virtual1.id);
});
test(
'closing the sink causes the stream to close before it emits any '
'more events', () {
virtual1.sink.add(1);
virtual1.sink.add(2);
virtual1.sink.add(3);
virtual2.stream.listen(expectAsync1((message) {
expect(message, equals(1));
virtual2.sink.close();
}, count: 1));
});
test('after the stream closes, the sink ignores events', () async {
unawaited(virtual1.sink.close());
// Wait for the done event to be delivered.
await virtual2.stream.toList();
virtual2.sink.add(1);
virtual2.sink.add(2);
virtual2.sink.add(3);
unawaited(virtual2.sink.close());
// None of our virtual.sink additions should make it to the other
// endpoint.
virtual1.stream.listen(expectAsync1((_) {}, count: 0));
await pumpEventQueue();
});
test("canceling the stream's subscription has no effect on the sink",
() async {
unawaited(virtual1.stream.listen(null).cancel());
await pumpEventQueue();
virtual1.sink.add(1);
virtual1.sink.add(2);
virtual1.sink.add(3);
unawaited(virtual1.sink.close());
expect(virtual2.stream.toList(), completion(equals([1, 2, 3])));
});
test("canceling the stream's subscription doesn't stop a done event",
() async {
unawaited(virtual1.stream.listen(null).cancel());
await pumpEventQueue();
unawaited(virtual2.sink.close());
await pumpEventQueue();
virtual1.sink.add(1);
virtual1.sink.add(2);
virtual1.sink.add(3);
unawaited(virtual1.sink.close());
// The sink should be ignoring events because the stream closed.
virtual2.stream.listen(expectAsync1((_) {}, count: 0));
await pumpEventQueue();
});
});
});
}