// Copyright (c) 2013, the Dart project authors.  Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

library barback.test.stream_pool_test;

import 'dart:async';

import 'package:barback/src/utils.dart';
import 'package:barback/src/utils/stream_pool.dart';
import 'package:scheduled_test/scheduled_test.dart';

import 'utils.dart';

/// Registers the [StreamPool] test suite.
///
/// The suite has three parts:
///
/// * "buffered": a pool created with the default constructor is expected to
///   replay events and errors that its inputs emitted before the pool's
///   stream was listened to.
/// * "broadcast": a pool created with [StreamPool.broadcast] is expected to
///   drop events and errors that were emitted before the pool's stream was
///   listened to.
/// * Expectations shared by both kinds of pool, registered once per kind.
main() {
  // Test-runner setup. NOTE(review): [initConfig] is not defined in this
  // file; presumably it comes from the local utils.dart.
  initConfig();

  group("buffered", () {
    test("buffers events from multiple inputs", () {
      var pool = new StreamPool<String>();

      var controller1 = new StreamController<String>();
      pool.add(controller1.stream);
      controller1.add("first");

      var controller2 = new StreamController<String>();
      pool.add(controller2.stream);
      controller2.add("second");

      // Call [toList] asynchronously to be sure that the events have been
      // buffered beforehand and aren't just being received unbuffered.
      expect(newFuture(() => pool.stream.toList()),
          completion(equals(["first", "second"])));

      // [toList] only completes once the pool's stream is done, so close the
      // pool after the event queue has been pumped.
      pumpEventQueue().then((_) => pool.close());
    });

    test("buffers errors from multiple inputs", () {
      var pool = new StreamPool<String>();

      var controller1 = new StreamController<String>();
      pool.add(controller1.stream);
      controller1.add("first");

      var controller2 = new StreamController<String>();
      pool.add(controller2.stream);
      controller2.add("second");
      controller1.addError("third");
      controller2.addError("fourth");
      controller1.add("fifth");

      // Tag every data event and every error so that both kinds can be
      // collected into a single list and their relative order checked.
      expect(newFuture(() {
        return pool.stream.transform(new StreamTransformer.fromHandlers(
            handleData: (data, sink) => sink.add(["data", data]),
            handleError: (error, stackTrace, sink) {
          sink.add(["error", error]);
        })).toList();
      }), completion(equals([
        ["data", "first"],
        ["data", "second"],
        ["error", "third"],
        ["error", "fourth"],
        ["data", "fifth"]
      ])));

      pumpEventQueue().then((_) => pool.close());
    });

    test("buffers inputs from a broadcast stream", () {
      var pool = new StreamPool<String>();
      var controller = new StreamController<String>.broadcast();
      pool.add(controller.stream);
      controller.add("first");
      controller.add("second");

      // Call [toList] asynchronously to be sure that the events have been
      // buffered beforehand and aren't just being received unbuffered.
      expect(newFuture(() => pool.stream.toList()),
          completion(equals(["first", "second"])));

      pumpEventQueue().then((_) => pool.close());
    });
  });

  group("broadcast", () {
    test("doesn't buffer inputs", () {
      var pool = new StreamPool<String>.broadcast();

      var controller1 = new StreamController<String>.broadcast();
      pool.add(controller1.stream);
      controller1.add("first");

      var controller2 = new StreamController<String>.broadcast();
      pool.add(controller2.stream);
      controller2.add("second");

      // Call [toList] asynchronously so that the events above are emitted
      // before anything listens to the pool. A broadcast pool is expected to
      // have dropped them rather than buffered them, so the list is empty.
      expect(newFuture(() => pool.stream.toList()), completion(isEmpty));

      pumpEventQueue().then((_) => pool.close());
    });

    test("doesn't buffer errors", () {
      var pool = new StreamPool<String>.broadcast();

      var controller1 = new StreamController<String>.broadcast();
      pool.add(controller1.stream);
      controller1.addError("first");

      var controller2 = new StreamController<String>.broadcast();
      pool.add(controller2.stream);
      controller2.addError("second");

      // Errors are converted into data events so that any error that did get
      // through would show up in the list instead of failing [toList].
      expect(newFuture(() {
        return pool.stream.transform(new StreamTransformer.fromHandlers(
            handleData: (data, sink) => sink.add(data),
            handleError: (error, stackTrace, sink) {
          sink.add(error);
        })).toList();
      }), completion(isEmpty));

      pumpEventQueue().then((_) => pool.close());
    });

    test("doesn't buffer inputs from a buffered stream", () {
      var pool = new StreamPool<String>.broadcast();
      var controller = new StreamController<String>();
      pool.add(controller.stream);
      controller.add("first");
      controller.add("second");

      // Only start listening after the event queue has been pumped, i.e.
      // after the events above have had a chance to pass through the pool.
      expect(pumpEventQueue().then((_) => pool.stream.toList()),
          completion(isEmpty));

      pumpEventQueue().then((_) => pool.close());
    });
  });

  // Expectations that hold regardless of whether the pool is buffered or
  // broadcast. Each test below is registered once per pool type.
  for (var type in ["buffered", "broadcast"]) {
    group(type, () {
      var pool;
      var bufferedController;
      var bufferedSyncController;
      var broadcastController;
      var broadcastSyncController;

      setUp(() {
        if (type == "buffered") {
          pool = new StreamPool<String>();
        } else {
          pool = new StreamPool<String>.broadcast();
        }

        // Feed the pool with one input of each flavor: single-subscription
        // and broadcast controllers, each in an async and a sync variant.
        bufferedController = new StreamController<String>();
        pool.add(bufferedController.stream);

        bufferedSyncController = new StreamController<String>(sync: true);
        pool.add(bufferedSyncController.stream);

        broadcastController = new StreamController<String>.broadcast();
        pool.add(broadcastController.stream);

        broadcastSyncController =
            new StreamController<String>.broadcast(sync: true);
        pool.add(broadcastSyncController.stream);
      });

      test("emits events to a listener", () {
        // Listen first, then emit, so the result doesn't depend on buffering.
        expect(pool.stream.toList(), completion(equals(["first", "second"])));

        bufferedController.add("first");
        broadcastController.add("second");
        pumpEventQueue().then((_) => pool.close());
      });

      test("emits sync events synchronously", () {
        var events = [];
        pool.stream.listen(events.add);

        // Events from sync controllers are expected to reach the pool's
        // listener before [add] returns, so no pumping happens in between.
        bufferedSyncController.add("first");
        expect(events, equals(["first"]));

        broadcastSyncController.add("second");
        expect(events, equals(["first", "second"]));
      });

      test("emits async events asynchronously", () {
        var events = [];
        pool.stream.listen(events.add);

        bufferedController.add("first");
        broadcastController.add("second");
        // Nothing should have been delivered yet...
        expect(events, isEmpty);

        // ...but both events should have arrived once the queue is pumped.
        expect(pumpEventQueue().then((_) => events),
            completion(equals(["first", "second"])));
      });

      test("doesn't emit events from removed streams", () {
        expect(pool.stream.toList(), completion(equals(["first", "third"])));

        // "first" is emitted while its stream is still in the pool.
        bufferedController.add("first");
        expect(pumpEventQueue().then((_) {
          // "second" is emitted after its stream was removed, so it should
          // not reach the pool.
          pool.remove(bufferedController.stream);
          bufferedController.add("second");
        }).then((_) {
          broadcastController.add("third");
          return pumpEventQueue();
        }).then((_) {
          // Likewise, "fourth" comes from a stream that was just removed.
          pool.remove(broadcastController.stream);
          broadcastController.add("fourth");
          pool.close();
        }), completes);
      });
    });
  }
}
