# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import argparse
import os
import subprocess
import sys
import unittest

from expect_tests import pipeline
from expect_tests import listing
from expect_tests.type_definitions import Test, MultiTest

SCRIPT_DIR = os.path.abspath(os.path.dirname(__file__))
DATA_DIR = os.path.join(SCRIPT_DIR, 'data')
ROOT_DIR = os.path.dirname(os.path.dirname(SCRIPT_DIR))

class ParsingAndConfigTest(unittest.TestCase):
  def test_get_config(self):
    """Testing that reading the config file works.

    Tests requires a specific content for data/.expect_tests.cfg"""
    black_list = listing.get_config(DATA_DIR)
    self.assertEqual(black_list,
                     set(['directory1', 'directory2', 'ignored']))

  def test_parse_test_glob(self):
    self.assertEqual(listing.parse_test_glob('a/b/c'),
                     (os.path.abspath('a/b/c'), ('*',)))
    self.assertEqual(listing.parse_test_glob('a/b/c:'),
                     (os.path.abspath('a/b/c'), ('*',)))
    self.assertEqual(listing.parse_test_glob('a/b/c:Test'),
                     (os.path.abspath('a/b/c'), ('Test',)))
    self.assertEqual(listing.parse_test_glob('a/b/c:Test.Name'),
                     (os.path.abspath('a/b/c'), ('Test.Name',)))
    self.assertRaises(ValueError, listing.parse_test_glob, 'a:b:c',)
    self.assertRaises(ValueError, listing.parse_test_glob, 'a:b/c',)


class PathManipulationTest(unittest.TestCase):
  """Tests for all path-manipulating functions.

  This set uses checked-out files in the present repository to avoid mocking
  the I/O functions.
  """

  def test_get_python_root(self):
    """This function uses the directory structure under data/"""

    cases = [
      # The root of a directory with no __init__.py file is that directory
      (DATA_DIR, DATA_DIR),
      # The root of a package is the parent directory
      (os.path.join(DATA_DIR, 'package1'), DATA_DIR),
      # The root of a subpackage is the parent directory of the root package.
      (os.path.join(DATA_DIR, 'package1', 'subpackage1_1'), DATA_DIR)
      ]
    for path, result in cases:
      self.assertEqual(listing.get_python_root(path), result)

    # When the path does not exist, you get an error.
    self.assertRaises(ValueError, listing.get_python_root,
                      '____non-existing-path____')

  def test_single_dir_runtime_context(self):
    """Computing RuntimeContext from a single directory path."""
    test_globs = [DATA_DIR]
    contexts = listing.get_runtime_contexts(test_globs)
    self.assertEqual(len(contexts), 1)
    self.assertEqual(contexts[0].cwd, DATA_DIR)
    for testing_c in contexts[0].testing_contexts:
      self.assertNotEqual(testing_c.package_name, 'ignored')

  def test_single_package_runtime_context(self):
    """Computing RuntimeContext from a single package path."""
    test_globs = [os.path.join(DATA_DIR, 'package1')]
    contexts = listing.get_runtime_contexts(test_globs)
    self.assertEqual(len(contexts), 1)
    self.assertEqual(contexts[0].cwd, DATA_DIR)
    testing_c = contexts[0].testing_contexts
    self.assertEqual(len(testing_c), 1)
    self.assertEqual(testing_c[0].package_name, 'package1')

  def test_two_packages_runtime_context(self):
    """Computing RuntimeContext from two package paths."""
    test_globs = [os.path.join(DATA_DIR, 'package1'),
                  os.path.join(DATA_DIR, 'package2')]
    contexts = listing.get_runtime_contexts(test_globs)
    self.assertEqual(len(contexts), 1)
    self.assertEqual(contexts[0].cwd, DATA_DIR)
    testing_c = contexts[0].testing_contexts
    self.assertEqual(len(testing_c), 2)
    package_names = set()
    for testing_c in contexts[0].testing_contexts:
      package_names.add(testing_c.package_name)

    self.assertEqual(package_names, set(('package1', 'package2')))

  def test_package_and_directory_runtime_context(self):
    """Computing RuntimeContext from a package and a directory paths.
    """
    # 'package1' is specified both explicit and implicitly through DATA_DIR
    # We check that it is accounted for only once.
    test_globs = [DATA_DIR, os.path.join(DATA_DIR, 'package1')]
    contexts = listing.get_runtime_contexts(test_globs)
    self.assertEqual(len(contexts), 1)
    # 2 is the number of packages under DATA_DIR, not counting
    # the ignored ones.
    self.assertEqual(len(contexts[0].testing_contexts), 2)

    test_globs = [DATA_DIR, os.path.join(DATA_DIR, 'package1'),
                  os.path.join(DATA_DIR, 'package1', 'subpackage1_1')]
    contexts = listing.get_runtime_contexts(test_globs)
    self.assertEqual(len(contexts), 1)
    self.assertEqual(len(contexts[0].testing_contexts), 2)

  def test_package_testing_context_from_path(self):
    """Test the PackageTestingContext class"""
    package_name = 'package1'
    package1 = os.path.join(SCRIPT_DIR, 'data', package_name)

    context = listing.PackageTestingContext.from_path(package1)
    self.assertTrue(os.path.isabs(context.cwd))
    self.assertTrue(len(context.package_name) > 0)
    self.assertEqual(len(context.filters), 1)
    self.assertEqual(context.filters[0][1], '*')

    # Testing with an empty tuple.
    context = listing.PackageTestingContext.from_path(package1,
                                                       filters=())
    self.assertTrue(os.path.isabs(context.cwd))
    self.assertTrue(len(context.package_name) > 0)
    self.assertEqual(len(context.filters), 1)
    self.assertEqual(context.filters[0][1], '*')

    context = listing.PackageTestingContext.from_path(package1,
                                                       filters=('a*', 'b*'))
    self.assertTrue(os.path.isabs(context.cwd))
    self.assertTrue(len(context.package_name) > 0)
    self.assertEqual(len(context.filters), 2)
    self.assertEqual(context.filters[0][1], 'a*')
    self.assertEqual(context.filters[1][1], 'b*')

    self.assertRaises(ValueError,
                      listing.PackageTestingContext.from_path,
                      package1, filters=None)

  def test_merging_package_testing_context(self):
    """Merging PackageTestingContexts pointing at the same package.
    """
    package_name = 'package1'
    package1 = os.path.join(SCRIPT_DIR, 'data', package_name)
    package2 = os.path.join(SCRIPT_DIR, 'data', 'package2')
    other_package1 = os.path.join(SCRIPT_DIR, 'data', 'other', package_name)

    # from_context_list
    c1 = listing.PackageTestingContext.from_path(package1, filters=('a',))
    c2 = listing.PackageTestingContext.from_path(package1,
                                                      filters=('b','c'))
    context = listing.PackageTestingContext.from_context_list((c1, c2))
    self.assertEqual(len(context.filters), 3)
    self.assertEqual(set(filt[1] for filt in context.filters),
                     set(('a', 'b', 'c')))

    c1 = listing.PackageTestingContext.from_path(package1, filters=('a',))
    c2 = listing.PackageTestingContext.from_path(package2,
                                                      filters=('b','c'))
    self.assertRaises(ValueError,
                      listing.PackageTestingContext.from_context_list,
                      (c1, c2))

    # Same package name, different paths.
    c1 = listing.PackageTestingContext.from_path(package1, filters=('a',))
    c2 = listing.PackageTestingContext.from_path(other_package1,
                                                  filters=('b','c'))
    self.assertRaises(ValueError,
                      listing.PackageTestingContext.from_context_list,
                      (c1, c2))

    # Subpackage
    subpackage_path = 'subpackage1_1'
    subpackage1 = os.path.join(package1, subpackage_path)
    c1 = listing.PackageTestingContext.from_path(subpackage1)
    self.assertEqual(c1.package_name, 'package1')
    self.assertEqual(c1.filters, [(subpackage_path, '*')])

  def test_filter_glob_manipulation(self):
    """globs to filter tests are modified if they don't end with a *."""
    package_name = 'package1'
    package1 = os.path.join(SCRIPT_DIR, 'data', package_name)
    subpackage_path = 'subpackage1_1'
    subpackage1 = os.path.join(package1, subpackage_path)
    c1 = listing.PackageTestingContext.from_path(subpackage1,
                                                  filters=('a*',))
    self.assertEqual(c1.package_name, 'package1')
    self.assertEqual(c1.filters, [(subpackage_path, 'a*')])

    for subpath, matcher in c1.itermatchers():
      self.assertIsNotNone(matcher.match('a'))
      self.assertIsNotNone(matcher.match('ab'))
      self.assertIsNone(matcher.match('ba'))
      self.assertIsNone(matcher.match('b'))
      self.assertEqual(subpath, subpackage_path)

    # Test that a star is added to the filter.
    c1 = listing.PackageTestingContext.from_path(subpackage1,
                                                      filters=('a',))
    self.assertEqual(c1.package_name, 'package1')
    self.assertEqual(c1.filters, [(subpackage_path, 'a')])

    for subpath, matcher in c1.itermatchers():
      self.assertIsNotNone(matcher.match('a'))
      self.assertIsNotNone(matcher.match('ab'))
      self.assertIsNone(matcher.match('ba'))
      self.assertIsNone(matcher.match('b'))
      self.assertEqual(subpath, subpackage_path)


  def test_processing_context(self):
    """Test the ProcessingContext class"""
    package_name = 'package1'
    package1 = os.path.join(SCRIPT_DIR, 'data', package_name)
    subpackage1 = os.path.join(SCRIPT_DIR, 'data',
                               package_name, 'subpackage1_1')
    package2 = os.path.join(SCRIPT_DIR, 'data', 'package2')
    other_package1 = os.path.join(SCRIPT_DIR, 'data', 'other', package_name)

    c0 = listing.PackageTestingContext.from_path(package1)
    c1 = listing.PackageTestingContext.from_path(package1, filters=('a',))
    c2 = listing.PackageTestingContext.from_path(subpackage1,
                                                      filters=('d',))
    c3 = listing.PackageTestingContext.from_path(package2,
                                                      filters=('b','c'))
    c4 = listing.PackageTestingContext.from_path(other_package1)

    # A processing context is a cwd + testing contexts.
    # A testing context is cwd + one package name.
    context = listing.ProcessingContext((c1, c2))
    self.assertEqual(len(context.testing_contexts), 1)
    self.assertEqual(set(filt[1]
                         for filt in context.testing_contexts[0].filters),
                     set(('a', 'd')))

    context = listing.ProcessingContext((c0, c1, c2))
    self.assertEqual(len(context.testing_contexts), 1)
    self.assertEqual(set(filt[1]
                         for filt in context.testing_contexts[0].filters),
                     set(('*', 'a', 'd')))


    context = listing.ProcessingContext((c1, c2, c3))
    self.assertEqual(len(context.testing_contexts), 2)

    # Fails because there are two different cwd.
    self.assertRaises(ValueError, listing.ProcessingContext, (c1, c4))


class TestListingTest(unittest.TestCase):
  """Test functions related to listing tests."""
  def test_walk_package(self):
    """This function uses the directory structure under data/"""
    modules = pipeline.walk_package('package1', DATA_DIR)
    self.assertEqual(sorted(modules),
                     ['package1.file1_test', 'package1.file2_test',
                      'package1.subpackage1_1.file3_test',
                      'package1.subpackage1_1.subpackage1_1_1.file4_test'])

    modules = pipeline.walk_package('package1', DATA_DIR,
                                    subpath='subpackage1_1')
    self.assertEqual(sorted(modules),
                     ['package1.subpackage1_1.file3_test',
                      'package1.subpackage1_1.subpackage1_1_1.file4_test'])

    self.assertRaises(ValueError, pipeline.walk_package,
                      'package1', DATA_DIR, 'non-existing')

  def test_get_test_gens_package(self):
    def get_test_names(tests):
      test_names = []

      for gen in tests:
        for test in gen():
          if isinstance(test, MultiTest):
            subtests = test.tests
          else:
            subtests = [test]

          for subtest in subtests:
            self.assertIsInstance(subtest, Test)
            test_names.append(subtest.name)
      return test_names

    sys.path.insert(0, DATA_DIR)  # Ugh. But won't work otherwise.


    opts = argparse.Namespace(verbose=True)
    # TODO(pgervais): add a MultiTest in a package under data/
    package1 = os.path.join(DATA_DIR, 'package1')
    testing_context = listing.PackageTestingContext.from_path(package1)
    tests = pipeline.get_test_gens_package(testing_context, opts)
    self.assertEqual(
      sorted(get_test_names(tests)),
      ['package1.file1_test.File1Test.test_trivial_1',
       'package1.file2_test.File2Test.test_trivial_2',
       'package1.subpackage1_1.file3_test.File3Test.test_trivial_3',
       'package1.subpackage1_1.subpackage1_1_1.file4_test.' +
           'File4Test.test_trivial_4'])

    tests = pipeline.get_test_gens_package(
      testing_context, opts, subpath='subpackage1_1')
    self.assertEqual(
      sorted(get_test_names(tests)),
      ['package1.subpackage1_1.file3_test.File3Test.test_trivial_3',
       'package1.subpackage1_1.subpackage1_1_1.file4_test.' +
           'File4Test.test_trivial_4'])

    tests = pipeline.get_test_gens_package(
      testing_context, opts, subpath='subpackage1_1/subpackage1_1_1')

    self.assertEqual(
      get_test_names(tests),
      ['package1.subpackage1_1.subpackage1_1_1.file4_test.' +
       'File4Test.test_trivial_4'])


class TestLauncherScript(unittest.TestCase):
  """Test that expect_tests script actually works, in particular on Windows."""

  def call(self, args, cwd=ROOT_DIR):
    # Make sure our copy of expect_test is in front of the import list (in case
    # there's a system installed expect_tests package).
    pp = os.environ.get('PYTHONPATH', '').split(os.pathsep)
    pp.insert(0, ROOT_DIR)
    env = os.environ.copy()
    env['PYTHONPATH'] = os.pathsep.join(pp)
    proc = subprocess.Popen(
      args=[
        sys.executable, os.path.join(ROOT_DIR, 'scripts', 'expect_tests'),
      ] + list(args),
      executable=sys.executable,
      cwd=cwd,
      env=env,
      stdout=subprocess.PIPE,
      stderr=subprocess.STDOUT)
    out, _ = proc.communicate()
    return proc.returncode, out

  def test_list(self):
    ret, out = self.call(['list', DATA_DIR])
    if ret:
      print out
    self.assertEqual(ret, 0)
    self.assertEqual(
      sorted(out.splitlines()),
      ['package1.file1_test.File1Test.test_trivial_1',
       'package1.file2_test.File2Test.test_trivial_2',
       'package1.subpackage1_1.file3_test.File3Test.test_trivial_3',
       'package1.subpackage1_1.subpackage1_1_1.file4_test.' +
           'File4Test.test_trivial_4'])

  def test_fail_on_caught_import_exception(self):
    ret, out = self.call(
        ['test', '--jobs', '5', 'package'],
        cwd=os.path.join(DATA_DIR, 'exceptional', 'import_raises'))
    if ret == 0:
      print out
    self.assertIn('ImportError: No module named this_package_does_not_exist',
                  out)
    self.assertIn('ABORTED', out)
    self.assertNotEqual(ret, 0)

  def test_fail_on_uncaught_import_exception(self):
    ret, out = self.call(
        ['test', '--jobs', '5', 'package'],
        cwd=os.path.join(DATA_DIR, 'exceptional', 'import_crashes'))
    if ret == 0:
      print out
    self.assertIn('ABORTED', out.splitlines())
    self.assertNotEqual(ret, 0)

  def test_fail_on_uncaught_test_exception(self):
    ret, out = self.call(
        ['test', '--jobs', '5', 'package'],
        cwd=os.path.join(DATA_DIR, 'exceptional', 'test_raises'))
    if ret == 0:
      print out
    self.assertIn(
        '1 or more test runner processes crashed; run with `--jobs 1` to debug',
        out)
    self.assertIn('FAILED (crashed=1, process_crashed_errors=1)', out)
    self.assertNotEqual(ret, 0)
