# blob: e04241d6f62d3896b58b111c00600ccd901fdfb1 [file] [log] [blame]
# Copyright 2017 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This script reads test data from csv files.
"""
import collections
import csv
from google.protobuf import text_format
import itertools
import os
from appium_util import input_formatter
from protos import actions_pb2
from protos import test_plan_pb2
import step as step_module
# Accepted values for the `file_format` argument of ReadTestSuites.
# With 'auto' the reader is picked from the file name (see _ReadFile).
FILE_FORMAT_AUTO = 'auto'
FILE_FORMAT_CSV = 'csv'
FILE_FORMAT_PROTO = 'proto'
class TestDataReaderException(Exception):
  """Franky-specific error raised while reading test data files.

  Raised by this module for an unsupported file format and for proto enum
  values that cannot be translated.
  """
class TestCase(object):
  """Defines a test case object.

  Fields:
    description: (string) A short human-readable test description.
    device_type: (string) Applicable device type for this test.
    device_major_versions: (iterable of int) Applicable major device versions.
    disabled: (bool) Whether the test should be skipped.
    crbug_id: (int) crbug.com/<id> for a disabled test.
    steps: (list of step.Step) List of test steps.
  """

  def __init__(self, description, device_type=None,
               device_major_versions=None, disabled=False, crbug_id=None,
               steps=None):
    self.description = description
    self.device_type = device_type
    # Falsy arguments are replaced with fresh containers so that instances
    # never share a mutable default.
    self.device_major_versions = device_major_versions or set()
    self.disabled = disabled
    self.crbug_id = crbug_id
    self.steps = steps or []

  def to_dict(self):
    """Return all fields as a dict (used by __repr__ and __eq__)."""
    return {
        'description': self.description,
        'device_type': self.device_type,
        'device_major_versions': self.device_major_versions,
        'disabled': self.disabled,
        'crbug_id': self.crbug_id,
        'steps': self.steps,
    }

  def __repr__(self):
    return repr(self.to_dict())

  def __eq__(self, other):
    """Field-by-field equality; defers to `other` if it is not a TestCase."""
    if not isinstance(other, TestCase):
      # Let Python try the reflected comparison instead of failing with
      # AttributeError on objects that have no to_dict().
      return NotImplemented
    return self.to_dict() == other.to_dict()

  def __ne__(self, other):
    """Inverse of __eq__ (Python 2 does not derive it automatically)."""
    equal = self.__eq__(other)
    if equal is NotImplemented:
      return equal
    return not equal

  def copy(self):
    """Make a copy of the TestCase instance.

    The version set and the step list are new containers, so adding or
    removing entries on the copy does not affect the original. The Step
    objects inside the list are shared, not duplicated.
    """
    return TestCase(
        self.description,
        device_type=self.device_type,
        device_major_versions=set(self.device_major_versions),
        disabled=self.disabled,
        crbug_id=self.crbug_id,
        steps=list(self.steps))

  def to_rows(self):
    """Return a list of CSV-compatible rows for the test.

    The first row is the 'StartTest' header; it is followed by one row per
    step, as produced by each step's to_row(). crbug_id is not written.
    """
    device_type = self.device_type or ''
    # TODO(sergeyberezin): Currently, tests are disabled by adding the
    # keyword "DISABLED" to the device_type column. We should have a
    # separate column for that.
    if self.disabled:
      # The keyword replaces the device type, which is therefore not written.
      device_type = 'DISABLED'
    header = [
        self.description or '', '', 'StartTest', '', '', '', '', '', '',
        device_type,
        ','.join(str(v) for v in sorted(self.device_major_versions)),
    ]
    return [header] + [s.to_row() for s in self.steps]
def ReadTestSuites(files, file_format=FILE_FORMAT_AUTO):
  """Loads every listed test suite file into a list of TestCase objects.

  Args:
    files: A list of test suite file names (strings).
    file_format: A string naming the file format ('csv', 'proto' or 'auto').

  Returns:
    A dictionary keyed by suite name, i.e. the file name with its extension
    removed, whose values are the TestCase lists read from each file:
      {
        'test_suite_name1' : [TestCase11, TestCase12, ...],
        'test_suite_name2' : [TestCase21, TestCase22, ...],
      }
  """
  # Each pair is built name-first, then contents, one file at a time.
  return dict(
      (os.path.splitext(path)[0], _ReadFile(path, file_format))
      for path in files)
def FilterTestSuites(test_suites, device_type, device_major_version):
  """Keep only valid tests and steps in all the test suites.

  Args:
    test_suites: (dict) mapping of {'test suite name' => [TestCase, ...]}
    device_type: (string) the connected device type.
    device_major_version: (int) the connected device major version.

  Returns:
    A new dict with the same suite names, holding only those tests and steps
    that match the connected device. The input dict is not modified.
  """
  # items() exists on both Python 2 and 3; iteritems() is Python-2-only.
  return {
      name: _FilterValidTestCases(tests, device_type, device_major_version)
      for name, tests in test_suites.items()
  }
def EnumerateTestSuites(test_suites):
  """Update test names with sequential numbers in each test suite.

  Args:
    test_suites: (dict) mapping of {'test suite name' => [TestCase, ...]}

  Returns:
    A new dict with the same suite names whose values are the lists returned
    by _EnumerateTests for each suite.
  """
  # items() exists on both Python 2 and 3; iteritems() is Python-2-only.
  return {
      suite_name: _EnumerateTests(testcases)
      for suite_name, testcases in test_suites.items()
  }
def _ReadFile(filename, file_format):
  """Reads one test suite file with the reader matching its format.

  Args:
    filename: A string with the path of the file to read.
    file_format: One of the FILE_FORMAT_* values. With FILE_FORMAT_AUTO, names
      ending in '.csv' are read as CSV and everything else as proto.

  Returns:
    Whatever the selected reader returns for the file.

  Raises:
    TestDataReaderException: if file_format is not a supported format.
  """
  if file_format == FILE_FORMAT_AUTO:
    file_format = (
        FILE_FORMAT_CSV if filename.endswith('.csv') else FILE_FORMAT_PROTO)
  readers = {
      FILE_FORMAT_CSV: _ReadTestCasesFromCSVFile,
      FILE_FORMAT_PROTO: _ReadTestCasesFromProtoFile,
  }
  try:
    read = readers[file_format]
  except KeyError:
    raise TestDataReaderException(
        'test_data_reader.ReadTestSuites: unsupported format %s' % file_format)
  return read(filename)
def _EnumerateTests(testcases):
  """Returns copies of the tests with a 1-based number in each description.

  Args:
    testcases: An iterable of objects providing copy() and a description.

  Returns:
    A list of copies whose description is prefixed with ' (Test No. <n>) '.
    The objects passed in are left untouched.
  """
  def numbered_copy(position, original):
    clone = original.copy()
    clone.description = ' (Test No. %d) %s' % (position, original.description)
    return clone

  return [numbered_copy(position, original)
          for position, original in enumerate(testcases, 1)]
def _IsValidTestStep(step, device_type, device_major_version):
  """Decides whether a test step applies to the connected device.

  Args:
    step: A Step object.
    device_type: A string representing the connected device type.
    device_major_version: An int representing the connected device major
      version.

  Returns:
    False when the step lists device versions and none of their major
    versions matches; otherwise True when the step has no device type or its
    device type equals device_type.
  """
  versions = step.device_versions
  if versions:
    # Every listed version is converted before the membership test.
    supported = set(map(input_formatter.GetDeviceMajorVersion, versions))
    if device_major_version not in supported:
      return False
  if not step.device_type:
    # A step without a device type runs on any device.
    return True
  return step.device_type == device_type
def _ReadTestCasesFromCSVFile(file_path):
  """Reads test cases from csv file.

  The first line of the file is treated as the column header and skipped.
  Every other row is converted with step_module.Step.from_row and the
  resulting steps are grouped into test cases by _RowsToTestCases.

  Args:
    file_path: A string representing the file path to read data from.

  Returns:
    A list of TestCase objects.
  """
  import sys  # Local import: only needed to pick the file mode below.

  # The csv module expects a binary file on Python 2, but a text file opened
  # with newline='' on Python 3 (a binary file makes csv.reader fail there).
  if sys.version_info[0] >= 3:
    csv_file = open(file_path, 'r', newline='')
  else:
    csv_file = open(file_path, 'rb')
  with csv_file:
    csv_reader = csv.reader(csv_file)
    # Remove the first line of the csv file which are the attribute names.
    test_data = itertools.islice(csv_reader, 1, None)
    # The reader is lazy, so rows must be consumed while the file is open.
    steps = [step_module.Step.from_row(*row) for row in test_data]
  return _RowsToTestCases(steps)
def _ReadTestCasesFromProtoFile(file_path):
  """Reads test cases from a text-format TestPlan proto file.

  Args:
    file_path: A string representing the file path to read data from.

  Returns:
    A list of TestCase objects, one per `test` entry of the parsed TestPlan.
  """
  plan = test_plan_pb2.TestPlan()
  with open(file_path) as proto_file:
    contents = proto_file.read()
  # Parsing happens after the file is closed; the text is already in memory.
  text_format.Merge(contents, plan)
  return [_Proto2TestCase(test) for test in plan.test]
def _Proto2TestCase(proto):
  """Builds a TestCase from the proto representation of a test case.

  Args:
    proto: A test message taken from TestPlan.test.

  Returns:
    A TestCase carrying the proto's description, device type, major versions
    (as strings) and converted steps.
  """
  device_type = _Proto2DeviceType(proto.device_type)
  # NOTE(review): versions are stored as strings here, while the CSV path
  # stores whatever input_formatter.GetDeviceMajorVersion returns. Confirm
  # both compare correctly against the version given to _IsValidTestCase.
  major_versions = {str(version) for version in proto.device_major_versions}
  steps = [_Proto2Step(step) for step in proto.step]
  return TestCase(
      proto.description,
      device_type=device_type,
      device_major_versions=major_versions,
      steps=steps)
def _ProtoActionName(action_type, action):
  """Looks up the name of an action from its type and enum action ID.

  Args:
    action_type: A string naming the action group, e.g. 'tap' or 'scroll'.
    action: The enum value of the action within that group.

  Returns:
    The enum name for `action`, as reported by the group's enum type.

  Raises:
    TestDataReaderException: if action_type is not a known action group.
  """
  enums = actions_pb2.Action
  enum_by_type = {
      'alert': enums.AlertAction,
      'element': enums.ElementAction,
      'input': enums.InputAction,
      'menu': enums.MenuAction,
      'omnibox': enums.OmniboxAction,
      'onboarding': enums.OnboardingAction,
      'scroll': enums.ScrollAction,
      'settings': enums.SettingsAction,
      'system': enums.SystemAction,
      'tap': enums.TapAction,
      'timeout': enums.TimeoutAction,
  }
  enum_type = enum_by_type.get(action_type)
  if enum_type:
    return enum_type.Name(action)
  raise TestDataReaderException('Invalid action_type: %s' % action_type)
def _ProtoBy(by):
  """Convert the LocateBy enum value to a string representation.

  Args:
    by: A test_plan_pb2.Step locator enum value.

  Returns:
    The matching step_module locator constant, or None for UNSET_BY.

  Raises:
    TestDataReaderException: if `by` is not one of the known enum values.
  """
  locators = {
      test_plan_pb2.Step.UNSET_BY: None,
      test_plan_pb2.Step.UIAUTOMATION: step_module.UIAUTOMATION,
      test_plan_pb2.Step.XPATH: step_module.XPATH,
      test_plan_pb2.Step.ID: step_module.ID,
      test_plan_pb2.Step.CLASSNAME: step_module.CLASS_NAME,
  }
  # The result goes into its own variable so that the error message reports
  # the offending input rather than the 'INVALID' sentinel. A sentinel is
  # needed because None is a legitimate mapped value.
  locator = locators.get(by, 'INVALID')
  if locator == 'INVALID':
    raise TestDataReaderException('Invalid "by" value: %s' % by)
  return locator
def _Proto2DeviceType(device_type):
  """Translates a proto device type enum value into its string form.

  Args:
    device_type: A test_plan_pb2.Test device type enum value.

  Returns:
    'phone', 'tablet', or None for UNSET_DEVICE_TYPE.

  Raises:
    TestDataReaderException: if device_type is not a known enum value.
  """
  names = {
      test_plan_pb2.Test.UNSET_DEVICE_TYPE: None,
      test_plan_pb2.Test.PHONE: 'phone',
      test_plan_pb2.Test.TABLET: 'tablet',
  }
  if device_type not in names:
    raise TestDataReaderException('Invalid device_type: %s' % device_type)
  return names[device_type]
def _Proto2Step(step):
  """Convert a Step proto message into a Step object.

  Args:
    step: A Step proto message.

  Returns:
    A step_module.Step built from the message. Empty `path`, `value` and
    `duration` fields, and coordinates that are not set, become None.

  Raises:
    TestDataReaderException: if the step has no action set, or if its action
      type, "by" value or device type cannot be translated.
  """
  action_type = step.action.WhichOneof('action_type')
  if action_type is None:
    # WhichOneof returns None when no member of the oneof is set. Report that
    # as a data error instead of letting getattr() fail with a TypeError.
    raise TestDataReaderException(
        'Step has no action set: %s' % step.description)
  action = _ProtoActionName(action_type, getattr(step.action, action_type))

  def point_or_none(field_name):
    """Return the named coordinate field as a Point, or None if unset."""
    if not step.HasField(field_name):
      return None
    coordinate = getattr(step, field_name)
    return step_module.Point(coordinate.x, coordinate.y)

  start_coordinate = point_or_none('start_coordinate')
  end_coordinate = point_or_none('end_coordinate')
  return step_module.Step(
      description=step.description,
      action_type=action_type,
      action=action,
      by=_ProtoBy(step.by),
      path=step.path or None,
      value=step.value or None,
      duration=step.duration or None,
      start_coordinate=start_coordinate,
      end_coordinate=end_coordinate,
      device_type=_Proto2DeviceType(step.device_type),
      device_versions=set(str(v) for v in step.device_major_versions),
  )
def _RowsToTestCases(steps):
  """Groups spreadsheet rows into test cases.

  Args:
    steps: list of Steps. Each 'step' represents one row in a spreadsheet, some
      'steps' being test case headers.

  Returns:
    List of TestCase objects, in the order their headers appear.
  """
  test_plan = []
  for row in steps:
    if row.action == 'StartTest':
      # A 'StartTest' row is a header: it opens a new test case, which also
      # closes the previous one, if any.
      major_versions = {input_formatter.GetDeviceMajorVersion(version)
                        for version in row.device_versions}
      # 'DISABLED' in the device type column marks the test as disabled
      # rather than naming a device type.
      is_disabled = row.device_type == 'DISABLED'
      test_plan.append(TestCase(
          row.description,
          device_type=None if is_disabled else row.device_type,
          device_major_versions=major_versions,
          disabled=is_disabled))
    elif test_plan:
      # An ordinary row belongs to the most recently opened test case.
      test_plan[-1].steps.append(row)
    # Rows that come before the first 'StartTest' header are ignored.
  return test_plan
def _IsValidTestCase(test_case, device_type, device_major_version):
  """Decides whether a test case should be run on the connected device.

  Args:
    test_case: A TestCase object.
    device_type: A string representing the connected device type.
    device_major_version: An int representing the connected device major
      version.

  Returns:
    False if the test case is disabled, targets a different device type, or
    has no steps. Otherwise True when it lists no major versions, or when
    device_major_version is one of the listed versions.
  """
  if test_case.disabled:
    return False
  wrong_device = (
      test_case.device_type and test_case.device_type != device_type)
  if wrong_device or not test_case.steps:
    return False
  versions = test_case.device_major_versions
  # An empty version list means the test applies to every version.
  return not versions or device_major_version in versions
def _FilterValidSteps(testcase, device_type, device_major_version):
  """Returns a copy of the test case keeping only steps valid for the device.

  Args:
    testcase: A TestCase object; it is copied, not modified.
    device_type: A string representing the connected device type.
    device_major_version: The connected device major version.

  Returns:
    The copy, whose steps are those accepted by _IsValidTestStep.
  """
  trimmed = testcase.copy()
  kept = []
  for candidate in trimmed.steps:
    if _IsValidTestStep(candidate, device_type, device_major_version):
      kept.append(candidate)
  trimmed.steps = kept
  return trimmed
def _FilterValidTestCases(test_plan, device_type, device_major_version):
  """Selects the tests to run on the current device and trims their steps.

  Args:
    test_plan: An iterable of TestCase objects.
    device_type: A string representing the connected device type.
    device_major_version: The connected device major version.

  Returns:
    A list with a step-filtered copy of every test case accepted by
    _IsValidTestCase, in the original order.
  """
  runnable = []
  for candidate in test_plan:
    if not _IsValidTestCase(candidate, device_type, device_major_version):
      continue
    runnable.append(
        _FilterValidSteps(candidate, device_type, device_major_version))
  return runnable