blob: 1e78e6c3cc88dd1f8a7e9088582dbc223457f43f [file] [log] [blame]
#!/usr/bin/env python3
# Copyright 2021 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Parsing logic to read files for the Web App testing framework.
"""
from collections import defaultdict
import logging
import os
import re
from typing import Dict, List, Set, Tuple
from models import Action
from models import ActionCoverage
from models import ActionType
from models import ActionsByName
from models import CoverageTest
from models import PartialAndFullCoverageByBaseName
from models import TestIdsByPlatform
from models import TestIdsByPlatformSet
from models import TestPartitionDescription
from models import TestPlatform
MIN_COLUMNS_ACTIONS_FILE = 9
MIN_COLUMNS_SUPPORTED_ACTIONS_FILE = 5
MIN_COLUMNS_UNPROCESSED_COVERAGE_FILE = 6
def human_friendly_name_to_canonical_action_name(
human_friendly_action_name: str,
action_base_name_to_default_mode: Dict[str, str]):
"""
Converts a human-friendly action name (used in the spreadsheet) and turns
into the format compatible with this testing framework. This does two
things:
1) Resolving specified modes, from "()" format into a "_". For example,
"action(with_mode)" turns into "action_with_mode".
2) Resolving modeless actions (that have a default mode) to
include the default mode. For example, if
|action_base_name_to_default_mode| contains an entry for
|human_friendly_action_name|, then that entry is appended to the action.
"action" and {"action": "default_mode"} will respectively will return
"action_default_mode".
If neither of those cases apply, then the |human_friendly_action_name| is
returned.
"""
human_friendly_action_name = human_friendly_action_name.strip()
if human_friendly_action_name in action_base_name_to_default_mode:
# Handle default mode.
human_friendly_action_name += "_" + action_base_name_to_default_mode[
human_friendly_action_name]
elif '(' in human_friendly_action_name:
# Handle mode being specified.
human_friendly_action_name = human_friendly_action_name.replace(
"(", "_").rstrip(")")
return human_friendly_action_name
def read_platform_supported_actions(csv_file
) -> PartialAndFullCoverageByBaseName:
"""Reads the action base names and coverage from the given csv file.
Args:
csv_file: The comma-separated-values file which lists action base names
and whether it is fully or partially supported.
Returns:
A dictionary of action base name to a set of partially supported
and fully supported platforms.
"""
actions_base_name_to_coverage: PartialAndFullCoverageByBaseName = {}
column_offset_to_platform = {
0: TestPlatform.MAC,
1: TestPlatform.WINDOWS,
2: TestPlatform.LINUX,
3: TestPlatform.CHROME_OS
}
for i, row in enumerate(csv_file):
if not row:
continue
if row[0].startswith("#"):
continue
if len(row) < MIN_COLUMNS_SUPPORTED_ACTIONS_FILE:
raise ValueError(f"Row {i} does not contain enough entries. "
f"Got {row}.")
action_base_name: str = row[0].strip()
if action_base_name in actions_base_name_to_coverage:
raise ValueError(f"Action base name '{action_base_name}' on "
f"row {i} is already specified.")
if not re.fullmatch(r'[a-z_]+', action_base_name):
raise ValueError(
f"Invald action base name '{action_base_name}' on "
f"row {i}. Please use snake_case.")
fully_supported_platforms: Set[TestPlatform] = set()
partially_supported_platforms: Set[TestPlatform] = set()
for j, value in enumerate(row[1:5]):
value = value.strip()
if not value:
continue
if value == "🌕":
fully_supported_platforms.add(column_offset_to_platform[j])
elif value == "🌓":
partially_supported_platforms.add(column_offset_to_platform[j])
elif value != "🌑":
raise ValueError(f"Invalid coverage '{value}' on row {i}. "
f"Please use '🌕', '🌓', or '🌑'.")
actions_base_name_to_coverage[action_base_name] = (
partially_supported_platforms, fully_supported_platforms)
return actions_base_name_to_coverage
def read_actions_file(
actions_csv_file,
supported_platform_actions: PartialAndFullCoverageByBaseName
) -> Tuple[ActionsByName, Dict[str, str]]:
"""Reads the actions comma-separated-values file.
If modes are specified for an action in the file, then one action is
added to the results dictionary per action_base_name + mode
parameterized. A mode marked with a "*" is considered the default
mode for that action.
If output actions are specified for an action, then it will be a
PARAMETERIZED action and the output actions will be resolved into the
`Action.output_actions` field.
See the README.md for more information about actions and action templates.
Args:
actions_csv_file: The comma-separated-values file read to parse all
actions.
supported_platform_actions: A dictionary of platform to the actions that
are fully or partially covered on that
platform.
Returns (actions_by_name,
action_base_name_to_default_mode):
actions_by_name:
Index of all actions by action name.
action_base_name_to_default_mode:
Index of action base names to the default mode. Only populated
for actions with default modes.
Raises:
ValueError: The input file is invalid.
"""
actions_by_name: Dict[str, Action] = {}
action_base_name_to_default_mode: Dict[str, str] = {}
action_base_names: Set[str] = set()
all_output_action_names: List[str] = []
all_short_name: Set[str] = set()
for i, row in enumerate(actions_csv_file):
if not row:
continue
if row[0].startswith("#"):
continue
if len(row) < MIN_COLUMNS_ACTIONS_FILE:
raise ValueError(f"Row {i!r} does not contain enough entries. "
f"Got {row}.")
if row[8] == "Abandoned":
continue
action_base_name = row[0].strip()
action_base_names.add(action_base_name)
if not re.fullmatch(r'[a-z_]+', action_base_name):
raise ValueError(f"Invald action base name {action_base_name} on "
f"row {i!r}. Please use snake_case.")
short_name_base = row[3].strip()
if not short_name_base or short_name_base in all_short_name:
raise ValueError(
f"Short name '{short_name_base}' on line {i!r} is "
f"not populated or already used.")
type = ActionType.STATE_CHANGE
if action_base_name.startswith("check_"):
type = ActionType.STATE_CHECK
output_action_names = []
output_actions_str = row[2].strip()
if output_actions_str:
type = ActionType.PARAMETERIZED
# Output actions for parameterized actions can also specify (or
# assume default) action modes (e.g. `do_action(Mode1)`) if the
# parameterized action doesn't have a mode. However, they cannot be
# fully resolved yet without reading all actions. So the resolution
# must happen later.
output_action_names = [
output.strip() for output in output_actions_str.split("&")
]
# Keep track of all specified output actions for error checking.
# Resolve any parameters if they are specified.
all_output_action_names.extend([
name.replace("(", "_").rstrip(")")
for name in output_action_names
])
(partially_supported_platforms,
fully_supported_platforms) = supported_platform_actions.get(
action_base_name, (set(), set()))
modes = [mode.strip() for mode in row[1].split("|")]
if not modes:
modes = [""]
for mode in modes:
if not re.fullmatch(r'([A-Z]\w*\*?)|', mode):
raise ValueError(f"Invald action mode name {mode!r}) on row "
f"{i!r}. Please use PascalCase.")
if "*" in mode:
action_base_name_to_default_mode[
action_base_name] = mode.rstrip("*")
mode = mode.rstrip("*")
name = action_base_name
# Convert the `cpp_method` to Pascal-case
cpp_method = ''.join(word.title()
for word in action_base_name.split('_'))
output_action_names_with_mode: List[str] = []
short_name: str = ""
if mode != "":
# If the action has a real mode, then modify the name, short
# name, output actions, and cpp method.
name += "_" + mode
short_name = short_name_base + mode
output_action_names_with_mode = [
action_name + "_" + mode
for action_name in output_action_names
]
cpp_method += "(\"" + mode + "\")"
else:
output_action_names_with_mode = output_action_names
short_name = short_name_base
cpp_method += "()"
if name in actions_by_name:
raise ValueError(f"Cannot add duplicate action {name} on row "
f"{i!r}")
action = Action(name, action_base_name, short_name, cpp_method,
type, fully_supported_platforms,
partially_supported_platforms)
if output_action_names_with_mode:
action._output_action_names = output_action_names_with_mode
actions_by_name[action.name] = action
unused_supported_actions = set(
supported_platform_actions.keys()).difference(action_base_names)
if unused_supported_actions:
raise ValueError(f"Actions specified as suppored that are not in "
f"the actions list: {unused_supported_actions}.")
# Filter out empty strings from the output_action_base_names.
all_output_action_names = list(filter(len, all_output_action_names))
# Make sure all output actions are either resolvable or are base names.
for output_action_name in all_output_action_names:
if (output_action_name not in actions_by_name
and output_action_name not in action_base_names):
raise ValueError(f"Could not find action for "
f"specified output action {output_action_name}.")
# Resolve the output actions
for action in actions_by_name.values():
if action.type is not ActionType.PARAMETERIZED:
continue
assert (action._output_action_names)
for output_action_name in action._output_action_names:
# Output actions can specify a mode or assume default action modes
# if the parameterized action doesn't have a mode.
canonical_name = human_friendly_name_to_canonical_action_name(
output_action_name, action_base_name_to_default_mode)
if canonical_name in actions_by_name:
action.output_actions.append(actions_by_name[canonical_name])
else:
# Having this lookup fail is a feature, it allows a
# parameterized action to reference output actions that might
# not all support every mode of the parameterized action. When
# that mode is specified in a test case, then that action would
# be excluded & one less test case would be generated.
logging.info(f"Output action {canonical_name} not found for "
f"parameterized action {action.name}.")
assert (action.output_actions)
return (actions_by_name, action_base_name_to_default_mode)
def read_unprocessed_coverage_tests_file(
coverage_csv_file, actions_by_name: ActionsByName,
action_base_name_to_default_mode: Dict[str, str]
) -> List[CoverageTest]:
"""Reads the coverage tests comma-separated-values file.
The coverage tests file can have blank entries in the test row, and does not
have test names.
Args:
coverage_csv_file: The comma-separated-values file with all coverage
tests.
actions_by_name: An index of action name to Action
action_base_name_to_default_mode: An index of action base name to
default mode, if there is one.
Returns:
A list of CoverageTests read from the file.
Raises:
ValueError: The input file is invalid.
"""
missing_actions = []
required_coverage_tests = []
for i, row in enumerate(coverage_csv_file):
if not row:
continue
if row[0].strip().startswith("#"):
continue
if len(row) < MIN_COLUMNS_UNPROCESSED_COVERAGE_FILE:
raise ValueError(f"Row {i!r} does not have test actions: {row}")
platforms = TestPlatform.get_platforms_from_chars(row[0])
actions = []
for action_name in row[4:]:
action_name = action_name.strip()
if "," in action_name:
raise ValueError(f"Actions on row {i!r} cannot have "
f"multiple modes: {action_name}")
if action_name == "":
continue
action_name = human_friendly_name_to_canonical_action_name(
action_name, action_base_name_to_default_mode)
if action_name not in actions_by_name:
missing_actions.append(action_name)
logging.error(f"Could not find action on row {i!r}: "
f"{action_name}")
continue
actions.append(actions_by_name[action_name])
coverage_test = CoverageTest(actions, platforms)
required_coverage_tests.append(coverage_test)
if missing_actions:
raise ValueError(f"Actions missing from actions dictionary: "
f"{', '.join(missing_actions)}")
return required_coverage_tests
def get_tests_in_browsertest(file: str) -> Dict[str, Set[TestPlatform]]:
"""
Returns a dictionary of all test ids found to the set of detected platforms
the test is enabled on.
For reference, this is what a disabled test by a sheriff typically looks
like:
TEST_F(WebAppIntegrationTestBase, DISABLED_NavSiteA_InstallIconShown) {
...
}
In the above case, the test will be considered disabled on all platforms.
This is what a test disabled by a sheriff on a specific platform looks like:
#if defined(OS_WIN)
#define MAYBE_NavSiteA_InstallIconShown DISABLED_NavSiteA_InstallIconShown
#else
#define MAYBE_NavSiteA_InstallIconShown NavSiteA_InstallIconShown
#endif
TEST_F(WebAppIntegrationTestBase, MAYBE_NavSiteA_InstallIconShown) {
...
}
In the above case, the test will be considered disabled on
`TestPlatform.WINDOWS` and thus enabled on {`TestPlatform.MAC`,
`TestPlatform.CHROME_OS`, and `TestPlatform.LINUX`}.
"""
tests: Dict[str, Set[TestPlatform]] = {}
# Attempts to only match test test name in a test declaration, where the
# name contains the test id prefix. Purposefully allows any prefixes on
# the test name (like MAYBE_ or DISABLED_).
for match in re.finditer(fr'{CoverageTest.TEST_ID_PREFIX}(\w+)\)', file):
test_id = match.group(1)
tests[test_id] = set(TestPlatform)
test_name = f"{CoverageTest.TEST_ID_PREFIX}{test_id}"
if f"DISABLED_{test_name}" not in file:
continue
enabled_platforms: Set[TestPlatform] = tests[test_id]
for platform in TestPlatform:
# Search for macro that specifies the given platform before
# the string "DISABLED_<test_name>".
macro_for_regex = re.escape(platform.macro)
# This pattern ensures that there aren't any '{' or '}' characters
# between the macro and the disabled test name, which ensures that
# the macro is applying to the correct test.
if re.search(fr"{macro_for_regex}[^{{}}]+DISABLED_{test_name}",
file):
enabled_platforms.remove(platform)
return tests
def find_existing_and_disabled_tests(
test_partitions: List[TestPartitionDescription]
) -> Tuple[TestIdsByPlatformSet, TestIdsByPlatform]:
"""
Returns a dictionary of platform set to test id, and a dictionary of
platform to disabled test ids.
"""
existing_tests: TestIdsByPlatformSet = defaultdict(lambda: set())
disabled_tests: TestIdsByPlatform = defaultdict(lambda: set())
for partition in test_partitions:
for file in os.listdir(partition.browsertest_dir):
if not file.startswith(partition.test_file_prefix):
continue
platforms = frozenset(
TestPlatform.get_platforms_from_browsertest_filename(file))
filename = os.path.join(partition.browsertest_dir, file)
with open(filename) as f:
file = f.read()
tests = get_tests_in_browsertest(file)
for test_id in tests.keys():
if test_id in existing_tests[platforms]:
raise ValueError(f"Already found test {test_id}. "
f"Duplicate test in {filename}")
existing_tests[platforms].add(test_id)
for platform in platforms:
for test_id, enabled_platforms in tests.items():
if platform not in enabled_platforms:
disabled_tests[platform].add(test_id)
logging.info(f"Found tests in {filename}:\n{tests.keys()}")
return (existing_tests, disabled_tests)