| # Copyright 2017 Google Inc. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """This script reads test data from csv files. |
| """ |
| |
| import collections |
| import csv |
| from google.protobuf import text_format |
| import itertools |
| import os |
| |
| from appium_util import input_formatter |
| from protos import actions_pb2 |
| from protos import test_plan_pb2 |
| import step as step_module |
| |
| |
| FILE_FORMAT_AUTO='auto' |
| FILE_FORMAT_CSV='csv' |
| FILE_FORMAT_PROTO='proto' |
| |
| |
| class TestDataReaderException(Exception): |
| """Franky-specific exception while reading test data files.""" |
| pass |
| |
| |
| class TestCase(object): |
| """Defines a test case object. |
| |
| Fields: |
| description: (string) A short human-readable test description. |
| device_type: (string) Applicable device type for this test. |
| device_major_versions: (iterable of int) Applicable major device versions. |
| disabled: (bool) Whether the test should be skipped. |
| crbug_id: (int) crbug.com/<id> for a disabled test. |
| steps: (list of step.Step) List of test steps. |
| """ |
| def __init__(self, description, device_type=None, |
| device_major_versions=None, disabled=False, crbug_id=None, |
| steps=None): |
| self.description = description |
| self.device_type = device_type |
| self.device_major_versions = device_major_versions or set() |
| self.disabled = disabled |
| self.crbug_id = crbug_id |
| self.steps = steps or [] |
| |
| def to_dict(self): |
| return { |
| 'description': self.description, |
| 'device_type': self.device_type, |
| 'device_major_versions': self.device_major_versions, |
| 'disabled': self.disabled, |
| 'crbug_id': self.crbug_id, |
| 'steps': self.steps, |
| } |
| |
| def __repr__(self): |
| return repr(self.to_dict()) |
| |
| def __eq__(self, other): |
| return self.to_dict() == other.to_dict() |
| |
| def copy(self): |
| """Make a deep copy of the TestCase instance.""" |
| t = TestCase(self.description) |
| t.device_type = self.device_type |
| t.device_major_versions = set(self.device_major_versions) |
| t.disabled = self.disabled |
| t.crbug_id = self.crbug_id |
| t.steps = list(self.steps) |
| return t |
| |
| def to_rows(self): |
| """Return a list of CSV-compatible rows for the test.""" |
| device_type = self.device_type or '' |
| # TODO(sergeyberezin): Currently, tests are disabled by adding the |
| # keyword "DISABLE" to the device_type column. We should have a |
| # separate column for that. |
| if self.disabled: |
| device_type = 'DISABLED' |
| header = [ |
| self.description or '', '', 'StartTest', '', '', '', '', '', '', |
| device_type, |
| ','.join(str(v) for v in sorted(self.device_major_versions)), |
| ] |
| return [header] + [s.to_row() for s in self.steps] |
| |
| |
| def ReadTestSuites(files, file_format=FILE_FORMAT_AUTO): |
| """Reads test suites from csv or proto files in the current working directory. |
| |
| Args: |
| files: A list of test suite file names (strings). |
| file_format: A string representing the file format |
| ('csv', 'proto' or 'auto'). |
| |
| Returns: |
| A dictionary mapping a test suite name to the list of TestCase objects: |
| { |
| 'test_suite_name1' : [TestCase11, TestCase12, ...], |
| 'test_suite_name2' : [TestCase21, TestCase22, ...], |
| } |
| """ |
| all_test_suites = {} |
| for filename in files: |
| suite_name, _ = os.path.splitext(filename) |
| all_test_suites[suite_name] = _ReadFile(filename, file_format) |
| return all_test_suites |
| |
| |
| def FilterTestSuites(test_suites, device_type, device_major_version): |
| """Keep only valid tests and steps in all the test suites. |
| |
| Args: |
| test_suites: (dict) mapping of {'test suite name' => [TestCase, ...]} |
| device_type: (string) the connected device type. |
| device_major_version: (int) the connected device major version. |
| |
| Returns: |
| test_suites (dict) with only those tests and steps that match the |
| connected device type. |
| """ |
| filtered = {} |
| for name, tests in test_suites.iteritems(): |
| filtered[name] = _FilterValidTestCases(tests, device_type, |
| device_major_version) |
| return filtered |
| |
| |
| def EnumerateTestSuites(test_suites): |
| """Update test names with sequential numbers in each test suite.""" |
| numbered = {} |
| for suite_name, testcases in test_suites.iteritems(): |
| numbered[suite_name] = _EnumerateTests(testcases) |
| return numbered |
| |
| |
| def _ReadFile(filename, file_format): |
| if file_format == FILE_FORMAT_AUTO: |
| if filename.endswith('.csv'): |
| file_format = FILE_FORMAT_CSV |
| else: |
| file_format = FILE_FORMAT_PROTO |
| reader = { |
| FILE_FORMAT_CSV: _ReadTestCasesFromCSVFile, |
| FILE_FORMAT_PROTO: _ReadTestCasesFromProtoFile, |
| }.get(file_format) |
| if not reader: |
| raise TestDataReaderException( |
| 'test_data_reader.ReadTestSuites: unsupported format %s' % file_format) |
| return reader(filename) |
| |
| |
| def _EnumerateTests(testcases): |
| new_tests = [] |
| for n, test in enumerate(testcases): |
| new_test = test.copy() |
| new_test.description = ' (Test No. %d) %s' % (n + 1, test.description) |
| new_tests.append(new_test) |
| return new_tests |
| |
| |
| def _IsValidTestStep(step, device_type, device_major_version): |
| """Returns True if a test step should be run on the device. |
| |
| Args: |
| step: A Step object. |
| device_type: A string representing the connected device type. |
| device_major_version: An int representing the connected device major version. |
| |
| Returns: |
| A Boolean True if the test step should be run, or False if not. |
| """ |
| if step.device_versions: |
| device_major_versions = {input_formatter.GetDeviceMajorVersion(version) |
| for version in step.device_versions} |
| if device_major_version not in device_major_versions: |
| return False |
| return not step.device_type or step.device_type == device_type |
| |
| |
| def _ReadTestCasesFromCSVFile(file_path): |
| """Reads test cases from csv file. |
| |
| Args: |
| file_path: A string representing the file path to read data from. |
| |
| Returns: |
| A list of TestCase objects. |
| """ |
| with open(file_path, 'rb') as csv_file: |
| csv_reader = csv.reader(csv_file) |
| # Remove the first line of the csv file which are the attribute names. |
| test_data = itertools.islice(csv_reader, 1, None) |
| steps = [step_module.Step.from_row(*row) for row in test_data] |
| return _RowsToTestCases(steps) |
| |
| |
| def _ReadTestCasesFromProtoFile(file_path): |
| """Reads test cases from text proto file. |
| |
| Args: |
| file_path: A string representing the file path to read data from. |
| device_type: A string representing the connected device type. |
| device_major_version: An int representing the device major version. |
| |
| Returns: |
| A list TestCase objects. |
| """ |
| testplan = test_plan_pb2.TestPlan() |
| with open(file_path) as f: |
| text_format.Merge(f.read(), testplan) |
| |
| return [_Proto2TestCase(t) for t in testplan.test] |
| |
| |
| def _Proto2TestCase(proto): |
| """Convert proto representation of a test case to TestCase.""" |
| t = TestCase( |
| proto.description, |
| device_type=_Proto2DeviceType(proto.device_type), |
| device_major_versions=set(str(v) for v in proto.device_major_versions)) |
| t.steps = [_Proto2Step(step) for step in proto.step] |
| return t |
| |
| |
| def _ProtoActionName(action_type, action): |
| """Get the action name from its action_type and enum action ID.""" |
| action_class = { |
| 'alert': actions_pb2.Action.AlertAction, |
| 'element': actions_pb2.Action.ElementAction, |
| 'input': actions_pb2.Action.InputAction, |
| 'menu': actions_pb2.Action.MenuAction, |
| 'omnibox': actions_pb2.Action.OmniboxAction, |
| 'onboarding': actions_pb2.Action.OnboardingAction, |
| 'scroll': actions_pb2.Action.ScrollAction, |
| 'settings': actions_pb2.Action.SettingsAction, |
| 'system': actions_pb2.Action.SystemAction, |
| 'tap': actions_pb2.Action.TapAction, |
| 'timeout': actions_pb2.Action.TimeoutAction, |
| }.get(action_type) |
| if not action_class: |
| raise TestDataReaderException('Invalid action_type: %s' % action_type) |
| return action_class.Name(action) |
| |
| |
| def _ProtoBy(by): |
| """Convert the LocateBy enum value to a string representation.""" |
| by = { |
| test_plan_pb2.Step.UNSET_BY: None, |
| test_plan_pb2.Step.UIAUTOMATION: step_module.UIAUTOMATION, |
| test_plan_pb2.Step.XPATH: step_module.XPATH, |
| test_plan_pb2.Step.ID: step_module.ID, |
| test_plan_pb2.Step.CLASSNAME: step_module.CLASS_NAME, |
| }.get(by, 'INVALID') |
| if by == 'INVALID': |
| raise TestDataReaderException('Invalid "by" value: %s' % by) |
| return by |
| |
| |
| def _Proto2DeviceType(device_type): |
| dt = { |
| test_plan_pb2.Test.UNSET_DEVICE_TYPE: None, |
| test_plan_pb2.Test.PHONE: 'phone', |
| test_plan_pb2.Test.TABLET: 'tablet', |
| }.get(device_type, 'INVALID') |
| if dt == 'INVALID': |
| raise TestDataReaderException('Invalid device_type: %s' % device_type) |
| return dt |
| |
| |
| def _Proto2Step(step): |
| """Convert a Step proto message into a Step object.""" |
| action_type = step.action.WhichOneof('action_type') |
| action = _ProtoActionName(action_type, getattr(step.action, action_type)) |
| start_coordinate = None |
| if step.HasField('start_coordinate'): |
| start_coordinate = step_module.Point(step.start_coordinate.x, |
| step.start_coordinate.y) |
| end_coordinate = None |
| if step.HasField('end_coordinate'): |
| end_coordinate = step_module.Point(step.end_coordinate.x, |
| step.end_coordinate.y) |
| return step_module.Step( |
| description=step.description, |
| action_type=action_type, |
| action=action, |
| by=_ProtoBy(step.by), |
| path=step.path or None, |
| value=step.value or None, |
| duration=step.duration or None, |
| start_coordinate=start_coordinate, |
| end_coordinate=end_coordinate, |
| device_type=_Proto2DeviceType(step.device_type), |
| device_versions=set(str(v) for v in step.device_major_versions), |
| ) |
| |
| |
| def _RowsToTestCases(steps): |
| """Parses a sequence of rows from a spreadsheet into test cases. |
| |
| Args: |
| steps: list of Steps. Each 'step' represents one row in a spreadsheet, some |
| 'steps' being test case headers. |
| Returns: |
| List of TestCase objects. |
| """ |
| # If the first step in a test case is not 'StartTest' then all succeedding |
| # test steps are going to be ignored until a step with 'StartTest' is found. |
| test_plan = [] |
| test_case = None |
| for step in steps: |
| # 'StartTest' keyword marks the beginning of a new test which also |
| # means the end of the previous test if any. |
| if step.action == 'StartTest': |
| # A new test case marks the end of a previous test case if any. |
| if test_case: |
| test_plan.append(test_case) |
| device_major_versions = {input_formatter.GetDeviceMajorVersion(version) |
| for version in step.device_versions} |
| disabled = step.device_type == 'DISABLED' |
| device_type = None if disabled else step.device_type |
| test_case = TestCase(step.description, device_type=device_type, |
| device_major_versions=device_major_versions, |
| disabled=disabled) |
| continue |
| |
| if test_case: |
| test_case.steps.append(step) |
| # Add the last test case, if any. |
| if test_case: |
| test_plan.append(test_case) |
| return test_plan |
| |
| |
| def _IsValidTestCase(test_case, device_type, device_major_version): |
| """Returns True if a test case should be run on the device. |
| |
| Args: |
| test_case: A TestCase object. |
| device_type: A string representing the connected device type. |
| device_major_version: An int representing the connected device major version. |
| |
| Returns: |
| A Boolean True if the test step should be run, or False if not. |
| """ |
| if test_case.disabled: |
| return False |
| if test_case.device_type and test_case.device_type != device_type: |
| return False |
| if not test_case.steps: |
| return False |
| if not test_case.device_major_versions: |
| return True |
| return device_major_version in test_case.device_major_versions |
| |
| |
| def _FilterValidSteps(testcase, device_type, device_major_version): |
| """Create a copy of the test case with only matching steps for the device.""" |
| t = testcase.copy() |
| t.steps = [s for s in t.steps if _IsValidTestStep(s, device_type, |
| device_major_version)] |
| return t |
| |
| |
| def _FilterValidTestCases(test_plan, device_type, device_major_version): |
| """Keep only the tests that should be run on the current device.""" |
| def filterValidSteps(t): |
| return _FilterValidSteps(t, device_type, device_major_version) |
| def isValidTestCase(t): |
| return _IsValidTestCase(t, device_type, device_major_version) |
| |
| return [filterValidSteps(t) for t in test_plan if isValidTestCase(t)] |