| # Copyright 2014 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| |
| from __future__ import absolute_import |
| import configparser |
| import glob |
| import os |
| import re |
| import sys |
| |
| |
| CONFIG_FILE_NAME = '.expect_tests.cfg' |
| |
| |
def get_python_root(path):
  """Get the lowest directory with no __init__.py file.

  When ``path`` is pointing inside a Python package, this function returns the
  directory directly containing this package. If ``path`` points outside of
  a Python package, then it returns ``path``.

  Args:
    path (str): arbitrary path. It must exist on disk.

  Returns:
    root (str): ``path`` itself, or its closest ancestor directory, with no
      __init__.py file in it.

  Raises:
    ValueError: if ``path`` does not exist.
    IOError: if the walk towards the top of ``path`` ends without finding a
      directory lacking an __init__.py file.
  """
  if not os.path.exists(path):
    raise ValueError('path must exist: %s' % path)

  # Walk on a copy so that the error below can still report the path the
  # caller passed in.
  candidate = path
  # The walk ends once os.path.dirname() no longer shortens the path, which is
  # what happens at the filesystem root.
  while candidate != os.path.dirname(candidate):
    if not os.path.exists(os.path.join(candidate, '__init__.py')):
      return candidate
    candidate = os.path.dirname(candidate)

  # This is not supposed to happen, but in case somebody adds a __init__.py
  # at the filesystem root ...
  raise IOError("Unable to find a python root for %s" % path)
| |
| |
def parse_test_glob(test_glob):
  """Split a test glob into an absolute path and a test filter.

  A test glob is composed of a path and a glob expression like:
  '<path>:<glob>'. The path should point to a directory or a file inside
  a Python package (it can be the root directory of that package). It can use
  native path separator (e.g. '\\' on Windows) or '/'.
  The glob is a Python name used to filter tests.

  A test name is the name of the method prepended with the class name.

  Example:
    'my/nice/package/test1/:TestA*', the package root being 'my/nice/package':
    this matches all tests whose class name starts with 'TestA' inside all
    files matching test1/*_test.py, like
    ``TestABunchOfStuff.testCatFoodDeployment``.

  Args:
    test_glob (str): a test glob

  Returns:
    (path, test_filter): absolute path, and a 1-tuple holding the test filter
      glob. The filter is '*' when the glob part is missing or empty.

  Raises:
    ValueError: if ``test_glob`` contains more than one colon (not counting a
      Windows drive letter), or if the filter contains a path separator.
  """
  parts = test_glob.replace('/', os.sep).split(':')

  # On Windows test_glob may contain drive letter (e.g C:\blah\...). Put it back
  # into parts[0]. parts[1] already starts with a backslash, so only the colon
  # removed by split() has to be restored.
  if (sys.platform == 'win32' and
      len(parts) >= 2 and
      len(parts[0]) == 1 and
      parts[1] and parts[1][0] == '\\'):
    parts[1] = parts[0] + ':' + parts[1]
    parts = parts[1:]

  if len(parts) > 2:
    raise ValueError('A test_glob should contain at most one colon (got %s)'
                     % test_glob)

  path = parts[0]
  test_filter = parts[1] if len(parts) == 2 else ''
  if os.sep in test_filter:
    raise ValueError('A test filter cannot contain a slash (got %s)'
                     % test_filter)

  # A missing or empty filter selects every test.
  return os.path.abspath(path), (test_filter or '*',)
| |
| |
class PackageTestingContext(object):
  """Describes which tests to run inside a single Python package."""

  def __init__(self, cwd, package_name, filters):
    """Information to run a set of tests in a single package.

    See also parse_test_glob.

    Args:
      cwd (str): directory containing the package directory.
      package_name (str): name of the package directory inside ``cwd``.
      filters (list): (path, glob) pairs, see the comment on self.filters.
    """
    # TODO(iannucci): let's scan packages too so that <path> can also be a
    # glob. Then expect_tests can use a default of '*:*' when no tests are
    # specified.

    self.cwd = cwd
    self.package_name = package_name
    # list of (path, filter) pairs.
    # The path is a relative path to a subdirectory of
    # os.path.join(self.cwd, self.package_name) in which to look for tests.
    # Only tests whose name matches the glob are kept.
    self.filters = filters

  def itermatchers(self):
    """Iterate over all filters, and yield matchers for each of them.

    A glob without any '*' gets one appended, so that 'TestA' behaves like
    'TestA*'. An empty glob therefore matches every name.

    Yields:
      path (str): restrict test listing to this subpackage.
      matcher (compiled regex): only names it matches are to be kept.
    """
    for subpath, name_glob in self.filters:
      # Implicitly append * to globs
      if '*' not in name_glob:
        name_glob += '*'
      # glob.fnmatch is the standard fnmatch module, reached through glob
      # because this file does not import fnmatch directly.
      yield subpath, re.compile(
          '^(?:%s)$' % glob.fnmatch.translate(name_glob))

  @classmethod
  def from_path(cls, path, filters=('*',)):
    """Build a context for the package found at ``path``.

    Args:
      path (str): existing path. It is made absolute before use.
      filters (list or tuple): globs on test names. When empty, '*' is used.

    Returns:
      PackageTestingContext: ``cwd`` is get_python_root(path), ``package_name``
        is the first component of ``path`` relative to ``cwd``, and every
        filter is paired with ``path`` relative to the package directory.

    Raises:
      ValueError: if ``path`` does not exist, or if ``filters`` is neither a
        list nor a tuple.
    """
    path = os.path.abspath(path)
    if not os.path.exists(path):
      raise ValueError('Path does not exist: %s' % path)
    cwd = get_python_root(path)
    package_name = os.path.relpath(path, cwd).split(os.path.sep)[0]
    # The path in which to look for tests. Only tests whose name matches the
    # glob are kept.
    relpath = os.path.relpath(path, os.path.join(cwd, package_name))

    if not isinstance(filters, (list, tuple)):
      raise ValueError('the "filters" parameter must be a tuple or a list, '
                       'got %s' % type(filters).__name__)

    # No filter at all means "keep every test".
    filters = [(relpath, filt) for filt in (filters or ('*',))]

    return cls(cwd, package_name, filters)

  @classmethod
  def from_context_list(cls, contexts):
    """Merge several PackageTestingContext pointing to the same package.

    Args:
      contexts (list): PackageTestingContext instances. The sequence is
        iterated several times, so it must not be a one-shot iterator.

    Returns:
      PackageTestingContext: shares the common cwd and package_name, and holds
        the filters of all ``contexts`` concatenated in order.

    Raises:
      ValueError: if the contexts do not all share the same cwd, or do not all
        share the same package_name.
    """
    cwd = set(context.cwd for context in contexts)
    if len(cwd) > 1:
      raise ValueError(
          'from_context_list() was given '
          'process contexts containing the following cwds, '
          'but can only process contexts which all share a single cwd: '
          '%s' % str(cwd))

    package_name = set(context.package_name for context in contexts)
    if len(package_name) > 1:
      raise ValueError(
          'from_context_list() was given '
          'process contexts containing the following package_name, '
          'but can only process contexts which all share a single package_name: '
          '%s' % str(package_name))

    filters = []
    for context in contexts:
      filters.extend(context.filters)

    return cls(cwd.pop(), package_name.pop(), filters)
| |
| |
class ProcessingContext(object):
  def __init__(self, testing_contexts):
    """Information to run a set of tasks in a given working directory.

    The given contexts are grouped by package name, and each group is merged
    into a single PackageTestingContext.

    Args:
      testing_contexts (list): list of PackageTestingContext instances. All of
        them must have the same ``cwd`` as the first one.

    Raises:
      ValueError: if one of the contexts has a different ``cwd``.
    """
    shared_cwd = testing_contexts[0].cwd
    self.cwd = shared_cwd

    # Merge testing_contexts by package
    by_package = {}
    for ctx in testing_contexts:
      if ctx.cwd != shared_cwd:
        raise ValueError('All package must have the same value for "cwd"')
      if ctx.package_name not in by_package:
        by_package[ctx.package_name] = []
      by_package[ctx.package_name].append(ctx)

    merged = []
    for package_contexts in by_package.values():
      merged.append(PackageTestingContext.from_context_list(package_contexts))
    self.testing_contexts = merged
| |
| |
def get_config(path):
  """Get configuration values

  Looks for a file named CONFIG_FILE_NAME in ``path`` and returns the content
  of the ``skip`` option of its ``expect_tests`` section, one entry per line.
  See Python ConfigParser for general formatting syntax.

  Example:
    [expect_tests]
    skip=directory1
         directory2
         directory3

  Args:
    path (str): path to a directory.

  Returns:
    black_list (set): blacklisted subdirectories. Empty when the section or
      the option is absent.
  """
  section, option = 'expect_tests', 'skip'

  parser = configparser.ConfigParser()
  parser.read([os.path.join(path, CONFIG_FILE_NAME)])

  if parser.has_section(section) and parser.has_option(section, option):
    return set(parser.get(section, option).splitlines())
  return set()
| |
| |
def get_runtime_contexts(test_globs):
  """Compute the list of packages/filters to get tests from.

  Each glob is parsed with parse_test_glob. When its path holds an __init__.py
  file, the path itself is used as a package. Otherwise every direct
  subdirectory holding an __init__.py file is used, except the names returned
  by get_config for that path.

  Args:
    test_globs (iterable): test globs, as accepted by parse_test_glob.

  Returns:
    list of ProcessingContext, one per distinct working directory.
  """
  # Step 1: compute list of packages + subtree
  package_contexts = []
  for one_glob in test_globs:
    root, name_filter = parse_test_glob(one_glob)

    if os.path.exists(os.path.join(root, '__init__.py')):
      package_contexts.append(
          PackageTestingContext.from_path(root, name_filter))
      continue

    # Look for all packages in path.
    skipped = get_config(root)
    package_dirs = []
    for entry in os.listdir(root):
      if entry in skipped:
        continue
      candidate = os.path.join(root, entry)
      if not os.path.isdir(candidate):
        continue
      if os.path.isfile(os.path.join(candidate, '__init__.py')):
        package_dirs.append(candidate)

    for package_dir in package_dirs:
      package_contexts.append(
          PackageTestingContext.from_path(package_dir, name_filter))

  # Step 2: group by working directory - one process per wd.
  by_cwd = {}
  for ctx in package_contexts:
    if ctx.cwd not in by_cwd:
      by_cwd[ctx.cwd] = []
    by_cwd[ctx.cwd].append(ctx)

  return [ProcessingContext(group) for group in by_cwd.values()]