| // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
| // for details. All rights reserved. Use of this source code is governed by a |
| // BSD-style license that can be found in the LICENSE file. |
| |
| import 'dart:async'; |
| |
| import 'package:stream_channel/stream_channel.dart'; |
| import 'package:test/test.dart'; |
| |
| import 'utils.dart'; |
| |
void main() {
  // Shared fixtures, rebuilt before every test by the setUp callback below.
  //
  // channel1 wraps the controller's `local` endpoint and channel2 wraps its
  // `foreign` endpoint; the tests in this file treat them as the two ends of
  // a single underlying connection.
  var controller, channel1, channel2;

  setUp(() {
    controller = new StreamChannelController();
    channel1 = new MultiChannel(controller.local);
    channel2 = new MultiChannel(controller.foreign);
  });
| |
// Tests for the default virtual channel: the stream/sink pair exposed directly
// by each MultiChannel, without any call to virtualChannel().
group("the default virtual channel", () {
  test("begins connected", () {
    // Both messages added to channel1's sink must reach channel2's stream, in
    // the order they were added.
    var first = true;
    channel2.stream.listen(expectAsync((message) {
      if (first) {
        expect(message, equals("hello"));
        first = false;
      } else {
        expect(message, equals("world"));
      }
    }, count: 2));

    channel1.sink.add("hello");
    channel1.sink.add("world");
  });

  test("closes the remote virtual channel when it closes", () {
    // Closing channel1's sink should end channel2's stream without any events
    // and complete channel2's sink.
    expect(channel2.stream.toList(), completion(isEmpty));
    expect(channel2.sink.done, completes);

    channel1.sink.close();
  });

  test("closes the local virtual channel when it closes", () {
    // The closing side's own stream and sink should finish as well.
    expect(channel1.stream.toList(), completion(isEmpty));
    expect(channel1.sink.done, completes);

    channel1.sink.close();
  });

  test("doesn't close the local virtual channel when the stream "
      "subscription is canceled", () {
    // With `count: 0` the test fails if this callback ever runs, i.e. if the
    // sink's done future completes.
    channel1.sink.done.then(expectAsync((_) {}, count: 0));

    channel1.stream.listen((_) {}).cancel();

    // Ensure that there's enough time for the channel to close if it's going
    // to.
    return pumpEventQueue();
  });

  test("closes the underlying channel when it closes without any other "
      "virtual channels", () {
    // No virtualChannel() call has been made in this test, so the default
    // channel is the only virtual channel at the time it closes.
    expect(controller.local.sink.done, completes);
    expect(controller.foreign.sink.done, completes);

    channel1.sink.close();
  });

  test("doesn't close the underlying channel when it closes with other "
      "virtual channels", () {
    // Neither underlying sink may complete during this test.
    controller.local.sink.done.then(expectAsync((_) {}, count: 0));
    controller.foreign.sink.done.then(expectAsync((_) {}, count: 0));

    // Establish another virtual connection which should keep the underlying
    // connection open.
    channel2.virtualChannel(channel1.virtualChannel().id);
    channel1.sink.close();

    // Ensure that there's enough time for the underlying channel to complete
    // if it's going to.
    return pumpEventQueue();
  });
});
| |
// Tests that drive virtual1, the virtual channel allocated on channel1 with a
// fresh id; virtual2 is its counterpart, opened on channel2 with that id.
group("a locally-created virtual channel", () {
  var virtual1;
  var virtual2;
  setUp(() {
    virtual1 = channel1.virtualChannel();
    virtual2 = channel2.virtualChannel(virtual1.id);
  });

  test("sends messages only to the other virtual channel", () {
    // virtual2 must receive exactly the two messages, in order.
    var first = true;
    virtual2.stream.listen(expectAsync((message) {
      if (first) {
        expect(message, equals("hello"));
        first = false;
      } else {
        expect(message, equals("world"));
      }
    }, count: 2));

    // No other virtual channels should receive the message.
    for (var i = 0; i < 10; i++) {
      var virtual = channel2.virtualChannel(channel1.virtualChannel().id);
      virtual.stream.listen(expectAsync((_) {}, count: 0));
    }
    // The default channel must not receive it either.
    channel2.stream.listen(expectAsync((_) {}, count: 0));

    virtual1.sink.add("hello");
    virtual1.sink.add("world");
  });

  test("closes the remote virtual channel when it closes", () {
    // Closing virtual1's sink should end virtual2's stream without any events
    // and complete virtual2's sink.
    expect(virtual2.stream.toList(), completion(isEmpty));
    expect(virtual2.sink.done, completes);

    virtual1.sink.close();
  });

  test("closes the local virtual channel when it closes", () {
    // The closing side's own stream and sink should finish as well.
    expect(virtual1.stream.toList(), completion(isEmpty));
    expect(virtual1.sink.done, completes);

    virtual1.sink.close();
  });

  test("doesn't close the local virtual channel when the stream "
      "subscription is canceled", () {
    // With `count: 0` the test fails if this callback ever runs, i.e. if the
    // sink's done future completes.
    virtual1.sink.done.then(expectAsync((_) {}, count: 0));
    virtual1.stream.listen((_) {}).cancel();

    // Ensure that there's enough time for the channel to close if it's going
    // to.
    return pumpEventQueue();
  });

  test("closes the underlying channel when it closes without any other "
      "virtual channels", () async {
    // First close the default channel so we can test the new channel as the
    // last living virtual channel.
    channel1.sink.close();

    // toList() completes only once channel2's default stream has ended, so
    // the default channel's close has been delivered before virtual1 closes.
    await channel2.stream.toList();
    expect(controller.local.sink.done, completes);
    expect(controller.foreign.sink.done, completes);

    virtual1.sink.close();
  });

  test("doesn't close the underlying channel when it closes with other "
      "virtual channels", () {
    // Nothing in this test closes the default channel, so virtual1 is not the
    // last open virtual channel and neither underlying sink may complete.
    controller.local.sink.done.then(expectAsync((_) {}, count: 0));
    controller.foreign.sink.done.then(expectAsync((_) {}, count: 0));

    virtual1.sink.close();

    // Ensure that there's enough time for the underlying channel to complete
    // if it's going to.
    return pumpEventQueue();
  });

  test("doesn't conflict with a remote virtual channel", () {
    // virtual3 is allocated on the opposite side (channel2) and virtual4 is
    // its counterpart on channel1.
    var virtual3 = channel2.virtualChannel();
    var virtual4 = channel1.virtualChannel(virtual3.id);

    // This is an implementation detail, but we assert it here to make sure
    // we're properly testing two channels with the same id.
    expect(virtual1.id, equals(virtual3.id));

    // Each pair must only see the message sent by its own counterpart.
    virtual2.stream.listen(
        expectAsync((message) => expect(message, equals("hello"))));
    virtual4.stream.listen(
        expectAsync((message) => expect(message, equals("goodbye"))));

    virtual1.sink.add("hello");
    virtual3.sink.add("goodbye");
  });
});
| |
// Tests that drive virtual2, the virtual channel opened on channel2 using the
// id that channel1 allocated for virtual1.
group("a remotely-created virtual channel", () {
  var virtual1;
  var virtual2;
  setUp(() {
    virtual1 = channel1.virtualChannel();
    virtual2 = channel2.virtualChannel(virtual1.id);
  });

  test("sends messages only to the other virtual channel", () {
    // virtual1 must receive exactly the two messages, in order.
    var first = true;
    virtual1.stream.listen(expectAsync((message) {
      if (first) {
        expect(message, equals("hello"));
        first = false;
      } else {
        expect(message, equals("world"));
      }
    }, count: 2));

    // No other virtual channels should receive the message.
    for (var i = 0; i < 10; i++) {
      var virtual = channel2.virtualChannel(channel1.virtualChannel().id);
      virtual.stream.listen(expectAsync((_) {}, count: 0));
    }
    // The default channel must not receive it either.
    channel1.stream.listen(expectAsync((_) {}, count: 0));

    virtual2.sink.add("hello");
    virtual2.sink.add("world");
  });

  test("closes the remote virtual channel when it closes", () {
    // Closing virtual2's sink should end virtual1's stream without any events
    // and complete virtual1's sink.
    expect(virtual1.stream.toList(), completion(isEmpty));
    expect(virtual1.sink.done, completes);

    virtual2.sink.close();
  });

  test("closes the local virtual channel when it closes", () {
    // The closing side's own stream and sink should finish as well.
    expect(virtual2.stream.toList(), completion(isEmpty));
    expect(virtual2.sink.done, completes);

    virtual2.sink.close();
  });

  test("doesn't close the local virtual channel when the stream "
      "subscription is canceled", () {
    // With `count: 0` the test fails if this callback ever runs, i.e. if the
    // sink's done future completes.
    virtual2.sink.done.then(expectAsync((_) {}, count: 0));
    virtual2.stream.listen((_) {}).cancel();

    // Ensure that there's enough time for the channel to close if it's going
    // to.
    return pumpEventQueue();
  });

  test("closes the underlying channel when it closes without any other "
      "virtual channels", () async {
    // First close the default channel so we can test the new channel as the
    // last living virtual channel.
    channel2.sink.close();

    // toList() completes only once channel1's default stream has ended, so
    // the default channel's close has been delivered before virtual2 closes.
    await channel1.stream.toList();
    expect(controller.local.sink.done, completes);
    expect(controller.foreign.sink.done, completes);

    virtual2.sink.close();
  });

  test("doesn't close the underlying channel when it closes with other "
      "virtual channels", () {
    // Nothing in this test closes the default channel, so virtual2 is not the
    // last open virtual channel and neither underlying sink may complete.
    controller.local.sink.done.then(expectAsync((_) {}, count: 0));
    controller.foreign.sink.done.then(expectAsync((_) {}, count: 0));

    virtual2.sink.close();

    // Ensure that there's enough time for the underlying channel to complete
    // if it's going to.
    return pumpEventQueue();
  });

  test("doesn't allow another virtual channel with the same id", () {
    // setUp already opened a channel on channel2 with virtual1's id, so a
    // second request for the same id is expected to throw.
    expect(() => channel2.virtualChannel(virtual1.id),
        throwsArgumentError);
  });
});
| |
// How the default and virtual channels react when the underlying channel
// closes or emits an error.
group("when the underlying stream", () {
  var virtual1, virtual2;

  setUp(() {
    virtual1 = channel1.virtualChannel();
    virtual2 = channel2.virtualChannel(virtual1.id);
  });

  test("closes, all virtual channels close", () {
    // Every endpoint, default and virtual alike, should see an empty stream
    // and a completed sink.
    for (var endpoint in [channel1, channel2, virtual1, virtual2]) {
      expect(endpoint.stream.toList(), completion(isEmpty));
      expect(endpoint.sink.done, completes);
    }

    controller.local.sink.close();
  });

  test("closes, more virtual channels are created closed", () async {
    channel2.sink.close();
    virtual2.sink.close();

    // Let the existing channels on the other side emit their done events.
    await channel1.stream.toList();
    await virtual1.stream.toList();

    // Each virtual channel requested from here on should already be closed.
    for (var attempt = 0; attempt < 2; attempt++) {
      var lateChannel = channel1.virtualChannel();
      expect(lateChannel.stream.toList(), completion(isEmpty));
      expect(lateChannel.sink.done, completes);
    }
  });

  test("emits an error, the error is sent only to the default channel", () {
    // The default channel gets the error and no data events.
    var onDefaultData = expectAsync((_) {}, count: 0);
    var onDefaultError = expectAsync((error) {
      expect(error, equals("oh no"));
    });
    channel1.stream.listen(onDefaultData, onError: onDefaultError);

    // The virtual channel gets neither data nor the error.
    var onVirtualData = expectAsync((_) {}, count: 0);
    var onVirtualError = expectAsync((_) {}, count: 0);
    virtual1.stream.listen(onVirtualData, onError: onVirtualError);

    controller.foreign.sink.addError("oh no");
  });
});
| |
// Checks the same four stream/sink rules twice: once against the default
// channel and once against a virtual channel pair.
group("stream channel rules", () {
  group("for the main stream:", () {
    test("closing the sink causes the stream to close before it emits any "
        "more events", () {
      channel1.sink
        ..add(1)
        ..add(2)
        ..add(3);

      // Only the first event may be observed: the listener closes channel2's
      // sink as soon as it arrives.
      channel2.stream.listen(expectAsync((event) {
        expect(event, equals(1));
        channel2.sink.close();
      }, count: 1));
    });

    test("after the stream closes, the sink ignores events", () async {
      channel1.sink.close();

      // toList() completes only after channel2's stream has emitted done.
      await channel2.stream.toList();
      channel2.sink
        ..add(1)
        ..add(2)
        ..add(3)
        ..close();

      // Nothing added above may show up on channel1's stream.
      channel1.stream.listen(expectAsync((_) {}, count: 0));
      await pumpEventQueue();
    });

    test("canceling the stream's subscription has no effect on the sink",
        () async {
      var subscription = channel1.stream.listen(null);
      subscription.cancel();
      await pumpEventQueue();

      // The sink must still deliver everything to the other side.
      channel1.sink
        ..add(1)
        ..add(2)
        ..add(3)
        ..close();
      expect(channel2.stream.toList(), completion(equals([1, 2, 3])));
    });

    test("canceling the stream's subscription doesn't stop a done event",
        () async {
      var subscription = channel1.stream.listen(null);
      subscription.cancel();
      await pumpEventQueue();

      channel2.sink.close();
      await pumpEventQueue();

      channel1.sink
        ..add(1)
        ..add(2)
        ..add(3)
        ..close();

      // channel2 closed the channel, so channel1's sink should drop the
      // events added above.
      channel2.stream.listen(expectAsync((_) {}, count: 0));
      await pumpEventQueue();
    });
  });

  group("for a virtual channel:", () {
    var virtual1, virtual2;

    setUp(() {
      virtual1 = channel1.virtualChannel();
      virtual2 = channel2.virtualChannel(virtual1.id);
    });

    test("closing the sink causes the stream to close before it emits any "
        "more events", () {
      virtual1.sink
        ..add(1)
        ..add(2)
        ..add(3);

      // Only the first event may be observed: the listener closes virtual2's
      // sink as soon as it arrives.
      virtual2.stream.listen(expectAsync((event) {
        expect(event, equals(1));
        virtual2.sink.close();
      }, count: 1));
    });

    test("after the stream closes, the sink ignores events", () async {
      virtual1.sink.close();

      // toList() completes only after virtual2's stream has emitted done.
      await virtual2.stream.toList();
      virtual2.sink
        ..add(1)
        ..add(2)
        ..add(3)
        ..close();

      // Nothing added above may show up on virtual1's stream.
      virtual1.stream.listen(expectAsync((_) {}, count: 0));
      await pumpEventQueue();
    });

    test("canceling the stream's subscription has no effect on the sink",
        () async {
      var subscription = virtual1.stream.listen(null);
      subscription.cancel();
      await pumpEventQueue();

      // The sink must still deliver everything to the other side.
      virtual1.sink
        ..add(1)
        ..add(2)
        ..add(3)
        ..close();
      expect(virtual2.stream.toList(), completion(equals([1, 2, 3])));
    });

    test("canceling the stream's subscription doesn't stop a done event",
        () async {
      var subscription = virtual1.stream.listen(null);
      subscription.cancel();
      await pumpEventQueue();

      virtual2.sink.close();
      await pumpEventQueue();

      virtual1.sink
        ..add(1)
        ..add(2)
        ..add(3)
        ..close();

      // virtual2 closed the channel, so virtual1's sink should drop the
      // events added above.
      virtual2.stream.listen(expectAsync((_) {}, count: 0));
      await pumpEventQueue();
    });
  });
});
| } |