| // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| // for details. All rights reserved. Use of this source code is governed by a |
| // BSD-style license that can be found in the LICENSE file. |
| |
| import 'dart:async'; |
| |
| import 'package:async/async.dart'; |
| import 'package:test/test.dart'; |
| |
| import 'utils.dart'; |
| |
| void main() { |
| var controller; |
| var stream; |
| setUp(() { |
| var cancelFuture = new Future.value(42); |
| controller = new StreamController<int>(onCancel: () => cancelFuture); |
| stream = new ForkableStream<int>(controller.stream); |
| }); |
| |
| group("with no forks", () { |
| test("forwards events, errors, and close", () async { |
| var queue = new StreamQueue(stream); |
| |
| controller.add(1); |
| expect(await queue.next, equals(1)); |
| |
| controller.add(2); |
| expect(await queue.next, equals(2)); |
| |
| controller.addError("error"); |
| expect(queue.next, throwsA("error")); |
| await flushMicrotasks(); |
| |
| controller.add(3); |
| expect(await queue.next, equals(3)); |
| |
| controller.close(); |
| expect(await queue.hasNext, isFalse); |
| }); |
| |
| test("listens to, pauses, and cancels the controller", () { |
| expect(controller.hasListener, isFalse); |
| |
| var sub = stream.listen(null); |
| expect(controller.hasListener, isTrue); |
| |
| sub.pause(); |
| expect(controller.isPaused, isTrue); |
| |
| sub.resume(); |
| expect(controller.isPaused, isFalse); |
| |
| sub.cancel(); |
| expect(controller.hasListener, isFalse); |
| }); |
| |
| test("unpauses the controller when a fork is listened", () { |
| stream.listen(null).pause(); |
| expect(controller.isPaused, isTrue); |
| |
| var fork = stream.fork(); |
| expect(controller.isPaused, isTrue); |
| |
| fork.listen(null); |
| expect(controller.isPaused, isFalse); |
| }); |
| }); |
| |
| group("with a fork created before the stream was listened", () { |
| var fork; |
| setUp(() { |
| fork = stream.fork(); |
| }); |
| |
| test("forwards events, errors, and close to both branches", () async { |
| var queue = new StreamQueue(stream); |
| var forkQueue = new StreamQueue(fork); |
| |
| controller.add(1); |
| expect(await queue.next, equals(1)); |
| expect(await forkQueue.next, equals(1)); |
| |
| controller.add(2); |
| expect(await queue.next, equals(2)); |
| expect(await forkQueue.next, equals(2)); |
| |
| controller.addError("error"); |
| expect(queue.next, throwsA("error")); |
| expect(forkQueue.next, throwsA("error")); |
| await flushMicrotasks(); |
| |
| controller.add(3); |
| expect(await queue.next, equals(3)); |
| expect(await forkQueue.next, equals(3)); |
| |
| controller.close(); |
| expect(await queue.hasNext, isFalse); |
| expect(await forkQueue.hasNext, isFalse); |
| }); |
| |
| test('listens to the source when the original is listened', () { |
| expect(controller.hasListener, isFalse); |
| stream.listen(null); |
| expect(controller.hasListener, isTrue); |
| }); |
| |
| test('listens to the source when the fork is listened', () { |
| expect(controller.hasListener, isFalse); |
| fork.listen(null); |
| expect(controller.hasListener, isTrue); |
| }); |
| }); |
| |
| test("with a fork created after the stream emitted a few events, forwards " |
| "future events, errors, and close to both branches", () async { |
| var queue = new StreamQueue(stream); |
| |
| controller.add(1); |
| expect(await queue.next, equals(1)); |
| |
| controller.add(2); |
| expect(await queue.next, equals(2)); |
| |
| var fork = stream.fork(); |
| var forkQueue = new StreamQueue(fork); |
| |
| controller.add(3); |
| expect(await queue.next, equals(3)); |
| expect(await forkQueue.next, equals(3)); |
| |
| controller.addError("error"); |
| expect(queue.next, throwsA("error")); |
| expect(forkQueue.next, throwsA("error")); |
| await flushMicrotasks(); |
| |
| controller.close(); |
| expect(await queue.hasNext, isFalse); |
| expect(await forkQueue.hasNext, isFalse); |
| }); |
| |
| group("with multiple forks", () { |
| var fork1; |
| var fork2; |
| var fork3; |
| var fork4; |
| setUp(() { |
| fork1 = stream.fork(); |
| fork2 = stream.fork(); |
| fork3 = stream.fork(); |
| fork4 = stream.fork(); |
| }); |
| |
| test("forwards events, errors, and close to all branches", () async { |
| var queue1 = new StreamQueue(stream); |
| var queue2 = new StreamQueue(fork1); |
| var queue3 = new StreamQueue(fork2); |
| var queue4 = new StreamQueue(fork3); |
| var queue5 = new StreamQueue(fork4); |
| |
| controller.add(1); |
| expect(await queue1.next, equals(1)); |
| expect(await queue2.next, equals(1)); |
| expect(await queue3.next, equals(1)); |
| expect(await queue4.next, equals(1)); |
| expect(await queue5.next, equals(1)); |
| |
| controller.add(2); |
| expect(await queue1.next, equals(2)); |
| expect(await queue2.next, equals(2)); |
| expect(await queue3.next, equals(2)); |
| expect(await queue4.next, equals(2)); |
| expect(await queue5.next, equals(2)); |
| |
| controller.addError("error"); |
| expect(queue1.next, throwsA("error")); |
| expect(queue2.next, throwsA("error")); |
| expect(queue3.next, throwsA("error")); |
| expect(queue4.next, throwsA("error")); |
| expect(queue5.next, throwsA("error")); |
| await flushMicrotasks(); |
| |
| controller.add(3); |
| expect(await queue1.next, equals(3)); |
| expect(await queue2.next, equals(3)); |
| expect(await queue3.next, equals(3)); |
| expect(await queue4.next, equals(3)); |
| expect(await queue5.next, equals(3)); |
| |
| controller.close(); |
| expect(await queue1.hasNext, isFalse); |
| expect(await queue2.hasNext, isFalse); |
| expect(await queue3.hasNext, isFalse); |
| expect(await queue4.hasNext, isFalse); |
| expect(await queue5.hasNext, isFalse); |
| }); |
| |
| test("forwards events in order of forking", () async { |
| var queue1 = new StreamQueue(stream); |
| var queue2 = new StreamQueue(fork1); |
| var queue3 = new StreamQueue(fork2); |
| var queue4 = new StreamQueue(fork3); |
| var queue5 = new StreamQueue(fork4); |
| |
| for (var i = 0; i < 4; i++) { |
| controller.add(i); |
| |
| var queue1Fired = false; |
| var queue2Fired = false; |
| var queue3Fired = false; |
| var queue4Fired = false; |
| var queue5Fired = false; |
| |
| queue5.next.then(expectAsync((_) { |
| queue5Fired = true; |
| expect(queue1Fired, isTrue); |
| expect(queue2Fired, isTrue); |
| expect(queue3Fired, isTrue); |
| expect(queue4Fired, isTrue); |
| })); |
| |
| queue1.next.then(expectAsync((_) { |
| queue1Fired = true; |
| expect(queue2Fired, isFalse); |
| expect(queue3Fired, isFalse); |
| expect(queue4Fired, isFalse); |
| expect(queue5Fired, isFalse); |
| })); |
| |
| queue4.next.then(expectAsync((_) { |
| queue4Fired = true; |
| expect(queue1Fired, isTrue); |
| expect(queue2Fired, isTrue); |
| expect(queue3Fired, isTrue); |
| expect(queue5Fired, isFalse); |
| })); |
| |
| queue2.next.then(expectAsync((_) { |
| queue2Fired = true; |
| expect(queue1Fired, isTrue); |
| expect(queue3Fired, isFalse); |
| expect(queue4Fired, isFalse); |
| expect(queue5Fired, isFalse); |
| })); |
| |
| queue3.next.then(expectAsync((_) { |
| queue3Fired = true; |
| expect(queue1Fired, isTrue); |
| expect(queue2Fired, isTrue); |
| expect(queue4Fired, isFalse); |
| expect(queue5Fired, isFalse); |
| })); |
| } |
| }); |
| |
| test("pauses the source when all forks are paused and/or not listening", |
| () { |
| var sub1 = stream.listen(null); |
| var sub2 = fork1.listen(null); |
| expect(controller.isPaused, isFalse); |
| |
| sub1.pause(); |
| expect(controller.isPaused, isFalse); |
| |
| sub2.pause(); |
| expect(controller.isPaused, isTrue); |
| |
| var sub3 = fork2.listen(null); |
| expect(controller.isPaused, isFalse); |
| |
| sub3.pause(); |
| expect(controller.isPaused, isTrue); |
| |
| sub2.resume(); |
| expect(controller.isPaused, isFalse); |
| |
| sub2.cancel(); |
| expect(controller.isPaused, isTrue); |
| }); |
| |
| test("cancels the source when all forks are canceled", () async { |
| var sub1 = stream.listen(null); |
| expect(controller.hasListener, isTrue); |
| |
| var sub2 = fork1.listen(null); |
| expect(controller.hasListener, isTrue); |
| |
| expect(sub1.cancel(), isNull); |
| await flushMicrotasks(); |
| expect(controller.hasListener, isTrue); |
| |
| expect(sub2.cancel(), isNull); |
| await flushMicrotasks(); |
| expect(controller.hasListener, isTrue); |
| |
| expect(fork2.listen(null).cancel(), isNull); |
| await flushMicrotasks(); |
| expect(controller.hasListener, isTrue); |
| |
| expect(fork3.listen(null).cancel(), isNull); |
| await flushMicrotasks(); |
| expect(controller.hasListener, isTrue); |
| |
| expect(fork4.listen(null).cancel(), completion(equals(42))); |
| await flushMicrotasks(); |
| expect(controller.hasListener, isFalse); |
| }); |
| }); |
| |
| group("modification during dispatch:", () { |
| test("forking during onCancel", () { |
| controller = new StreamController<int>(onCancel: expectAsync(() { |
| expect(stream.fork().toList(), completion(isEmpty)); |
| })); |
| stream = new ForkableStream<int>(controller.stream); |
| |
| stream.listen(null).cancel(); |
| }); |
| |
| test("forking during onPause", () { |
| controller = new StreamController<int>(onPause: expectAsync(() { |
| stream.fork().listen(null); |
| })); |
| stream = new ForkableStream<int>(controller.stream); |
| |
| stream.listen(null).pause(); |
| |
| // The fork created in onPause should have resumed the stream. |
| expect(controller.isPaused, isFalse); |
| }); |
| |
| test("forking during onData", () { |
| var sub; |
| sub = stream.listen(expectAsync((value1) { |
| expect(value1, equals(1)); |
| stream.fork().listen(expectAsync((value2) { |
| expect(value2, equals(2)); |
| })); |
| sub.cancel(); |
| })); |
| |
| controller.add(1); |
| controller.add(2); |
| }); |
| |
| test("canceling a fork during onData", () { |
| var fork = stream.fork(); |
| var forkSub = fork.listen(expectAsync((_) {}, count: 0)); |
| |
| stream.listen(expectAsync((_) => forkSub.cancel())); |
| controller.add(null); |
| }); |
| |
| test("forking during onError", () { |
| var sub; |
| sub = stream.listen(null, onError: expectAsync((error1) { |
| expect(error1, equals("error 1")); |
| stream.fork().listen(null, onError: expectAsync((error2) { |
| expect(error2, equals("error 2")); |
| })); |
| sub.cancel(); |
| })); |
| |
| controller.addError("error 1"); |
| controller.addError("error 2"); |
| }); |
| |
| test("canceling a fork during onError", () { |
| var fork = stream.fork(); |
| var forkSub = fork.listen(expectAsync((_) {}, count: 0)); |
| |
| stream.listen(null, onError: expectAsync((_) => forkSub.cancel())); |
| controller.addError("error"); |
| }); |
| |
| test("forking during onDone", () { |
| stream.listen(null, onDone: expectAsync(() { |
| expect(stream.fork().toList(), completion(isEmpty)); |
| })); |
| |
| controller.close(); |
| }); |
| |
| test("canceling a fork during onDone", () { |
| var fork = stream.fork(); |
| var forkSub = fork.listen(null, onDone: expectAsync(() {}, count: 0)); |
| |
| stream.listen(null, onDone: expectAsync(() => forkSub.cancel())); |
| controller.close(); |
| }); |
| }); |
| |
| group("throws an error when", () { |
| test("a cancelled stream is forked", () { |
| stream.listen(null).cancel(); |
| expect(stream.fork().toList(), completion(isEmpty)); |
| }); |
| |
| test("a cancelled stream is forked even when other forks are alive", () { |
| stream.fork().listen(null); |
| stream.listen(null).cancel(); |
| |
| expect(controller.hasListener, isTrue); |
| expect(stream.fork().toList(), completion(isEmpty)); |
| }); |
| |
| test("a closed stream is forked", () async { |
| controller.close(); |
| await stream.listen(null).asFuture(); |
| expect(stream.fork().toList(), completion(isEmpty)); |
| }); |
| }); |
| } |