blob: d1f8138d37a7752033f6f8c7f54ab768211bd7da [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright 2017 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Provides unit tests for test_data_reader.
"""
import contextlib
import csv
import os
import shutil
import tempfile
import textwrap
import unittest
import step
import test_data_reader
# TODO(sergeyberezin): move this to a separate package in infra.git.
# We're trying to be compatible with the Python 3 tempfile.TemporaryDirectory
# context manager here, which uses 'dir' as a keyword argument.
# pylint: disable=redefined-builtin
@contextlib.contextmanager
def temporary_directory(suffix="", prefix="tmp", dir=None,
                        keep_directory=False):
  """Context manager that yields a freshly created temporary directory.

  The directory is created with tempfile.mkdtemp and, unless
  keep_directory is set, removed together with everything inside it when
  the context exits. For example:

    with temporary_directory() as tmpdir:
      ...

  Args:
    suffix, prefix, dir: passed through to tempfile.mkdtemp.
    keep_directory (bool): when True, leave the directory on disk after
      the context exits. Useful for debugging.

  Yields:
    str: path of the temporary directory, as returned by mkdtemp.
  """
  # Created before entering the try block: if mkdtemp itself raises there
  # is nothing to clean up and the exception simply propagates.
  created_path = tempfile.mkdtemp(suffix, prefix, dir)
  try:
    yield created_path
  finally:
    should_remove = bool(created_path) and not keep_directory
    if should_remove:  # pragma: no branch
      try:
        # Best-effort removal; rmtree is asked to ignore errors.
        shutil.rmtree(created_path, ignore_errors=True)
      except OSError as cleanup_error:  # pragma: no cover
        print("ERROR: {!r} while cleaning up {!r}".format(
            cleanup_error, created_path))
        raise
def _step(description=None, action_type=None, action=None, by=None, path=None,
          value=None, duration=None, start_coordinate=None,
          end_coordinate=None, device_type=None, device_versions=None,
          channel=None):
  """Builds a step.Step from keyword arguments.

  Every argument defaults to None. The values are forwarded positionally,
  in the order of this signature, to step.Step.from_row.

  Returns:
    Whatever step.Step.from_row returns for the assembled row.
  """
  row = (
      description, action_type, action, by, path, value, duration,
      start_coordinate, end_coordinate, device_type, device_versions,
      channel,
  )
  return step.Step.from_row(*row)
# Sample data for test cases with their respective steps.

# No test-level device settings; the steps carry their own.
TEST_CASE_1 = test_data_reader.TestCase(
    'Test Case 1',
    steps=[
        _step(
            description='Step 1 of Test Case 1',
            action_type='tap',
            action='Tap',
            path='Search',
        ),
        _step(
            description='Step 2 of Test Case 1',
            action_type='input',
            action='Type',
            by='id',
            path='Search',
            value='text',
            device_type=step.TABLET,
            device_versions='10',
        ),
        _step(
            description='Step 3 of Test Case 1',
            action_type='tap',
            action='Tap',
            by='id',
            path='Go',
            device_versions='8,9',
        ),
    ],
)

# A test case constructed without any steps.
TEST_CASE_EMPTY = test_data_reader.TestCase('Test Case Empty')

# Test-level device_type is PHONE; the last step is marked TABLET.
TEST_CASE_3 = test_data_reader.TestCase(
    'Test Case 3',
    device_type=step.PHONE,
    steps=[
        _step(
            description='Step 1 of Test Case 3',
            action_type='input',
            action='Type',
            by='id',
            path='Search',
            value='text',
            device_type=step.PHONE,
        ),
        _step(
            description='Step 2 of Test Case 3',
            action_type='scroll',
            action='SwipeWithCoordinates',
            value='Left',
            start_coordinate='0.5,0.5',
            end_coordinate='1.7,2.8',
        ),
        _step(
            description='Step 3 of Test Case 3',
            action_type='tap',
            action='Tap',
            by='id',
            path='Close',
            device_type=step.TABLET,
        ),
    ],
)

# Constructed with disabled=True.
TEST_CASE_DISABLED = test_data_reader.TestCase(
    'Test Case 4 Disabled',
    disabled=True,
    steps=[
        _step(
            description='Step 1 of Test Case 4',
            action_type='input',
            action='Type',
            by='id',
            path='Search',
            value='text',
        ),
    ],
)

# PHONE test case with device_major_versions 8 and 9.
TEST_CASE_VERSIONED = test_data_reader.TestCase(
    'Test Case 5 Versioned',
    device_type=step.PHONE,
    device_major_versions={8, 9},
    steps=[
        _step(
            description='Step 1 of Test Case 5',
            action_type='input',
            action='Type',
            by='id',
            path='Search',
            value='text',
            device_type=step.PHONE,
        ),
    ],
)

# Uses the string 'device_name' as device_type instead of a step constant.
TEST_CASE_NAMED = test_data_reader.TestCase(
    'Test Case 6 Named',
    device_type='device_name',
    steps=[
        _step(
            description='Step 1 of Test Case 6',
            action_type='input',
            action='Type',
            by='id',
            path='Search',
            value='text',
            device_type='device_name',
        ),
    ],
)

# Rows of every sample test case, concatenated in declaration order.
ROWS = (
    TEST_CASE_1.to_rows()
    + TEST_CASE_EMPTY.to_rows()
    + TEST_CASE_3.to_rows()
    + TEST_CASE_DISABLED.to_rows()
    + TEST_CASE_VERSIONED.to_rows()
    + TEST_CASE_NAMED.to_rows()
)

# One Step per row, rebuilt through step.Step.from_row.
STEPS = [step.Step.from_row(*row) for row in ROWS]
def update_description(test, n):
  """Returns a copy of `test` with its test number prepended to the description.

  Args:
    test: object providing copy() and a `description` attribute.
    n: test number, formatted with %d.

  Returns:
    The object returned by test.copy(), with `description` set to
    ' (Test No. <n>) <original description>'.
  """
  numbered_description = ' (Test No. %d) %s' % (n, test.description)
  numbered = test.copy()
  numbered.description = numbered_description
  return numbered
def number_testcases(testcases):
  """Returns copies of `testcases` with 1-based numbers in their descriptions.

  Each element is passed to update_description together with its position
  in the sequence, counting from 1. This lives in a function only so that
  the comprehension's loop variables stay out of module scope.
  """
  return [update_description(case, number)
          for number, case in enumerate(testcases, 1)]
# All sample test cases, listed in the same order as their rows in ROWS.
TESTCASES = [
    TEST_CASE_1,
    TEST_CASE_EMPTY,
    TEST_CASE_3,
    TEST_CASE_DISABLED,
    TEST_CASE_VERSIONED,
    TEST_CASE_NAMED,
]
# Copies of TESTCASES with ' (Test No. N) ' prepended to each description.
NUMBERED_TESTCASES = number_testcases(TESTCASES)
def update_steps(test, steps):
  """Returns a copy of `test` whose `steps` attribute is replaced.

  Args:
    test: object providing copy().
    steps: value to assign to the copy's `steps` attribute.

  Returns:
    The object returned by test.copy(), with `steps` set to the given value.
  """
  replaced = test.copy()
  replaced.steps = steps
  return replaced
class TestDataReaderTest(unittest.TestCase):
"""Tests test_data_reader.
"""
def testEnumerateTestSuites(self):
numbered = test_data_reader.EnumerateTestSuites({'test': TESTCASES})
self.assertEqual(' (Test No. 1) '+TESTCASES[0].description,
' (Test No. %d) %s' % (numbered['test'][0].test_number,
numbered['test'][0].description))
self.assertEqual(' (Test No. 5) '+TESTCASES[4].description,
' (Test No. %d) %s' % (numbered['test'][4].test_number,
numbered['test'][4].description))
def testGetValidTestCasesForPhone(self):
"""Verifies valid test cases for phone.
"""
t1, _, t3 = TESTCASES[:3]
input_test_suites = {'test suite': TESTCASES}
expected_result = {'test suite': [
update_steps(t1, t1.steps[0:1]),
update_steps(t3, t3.steps[0:2]),
]}
self.maxDiff = None
filtered_tests = test_data_reader.FilterTestSuites(
input_test_suites, step.PHONE, None, 10, None)
self.assertEquals(expected_result, filtered_tests)
def testGetValidTestCasesForTablet(self):
"""Verifies valid test cases for tablet.
"""
t1 = TESTCASES[0]
input_test_suites = {'test suite': TESTCASES}
expected_filtered = {'test suite': [
update_steps(t1, t1.steps[0:2]),
]}
self.maxDiff = None
filtered_tests = test_data_reader.FilterTestSuites(
input_test_suites, step.TABLET, None, 10, None)
self.assertEquals(expected_filtered, filtered_tests)
def testGetValidTestCasesForNamed(self):
"""Verifies valid test cases for named devices.
"""
t1 = TESTCASES[0]
input_test_suites = {'test suite': TESTCASES}
expected_result = {'test suite': [
update_steps(t1, t1.steps[0:1]),
TESTCASES[5],
]}
self.maxDiff = None
filtered_tests = test_data_reader.FilterTestSuites(
input_test_suites, None, 'device_name', 10, None)
self.assertEquals(expected_result, filtered_tests)
def testReadFileBadFormat(self):
with self.assertRaises(test_data_reader.TestDataReaderException):
test_data_reader._ReadFile('', file_format='bad')
def testReadTestSuitesBadFormat(self):
with self.assertRaises(test_data_reader.TestDataReaderException):
test_data_reader.ReadTestSuites(['nonexistent'], file_format='bad')
def testReadTestSuitesCSVEmpty(self):
with temporary_directory() as tmpdir:
filename = os.path.join(tmpdir, 'testcases.csv')
suite_name = filename[:-4] # Filename without the '.csv' suffix.
with open(filename, 'wb') as f:
w = csv.writer(f)
header = ['dummy'] * 11
# Non-start step should be ignored.
stp = _step(description='ignore me').to_row()
w.writerows([header] + [stp])
testsuites = test_data_reader.ReadTestSuites([filename])
self.assertEqual([], testsuites[suite_name])
def testReadTestSuitesCSV(self):
with temporary_directory() as tmpdir:
filename = os.path.join(tmpdir, 'testcases.csv')
suite_name = filename[:-4] # Filename without the '.csv' suffix.
with open(filename, 'wb') as f:
w = csv.writer(f)
header = ['dummy'] * 11
w.writerows([header] + ROWS)
self.maxDiff = None
testsuites = test_data_reader.ReadTestSuites([filename],
file_format='csv')
self.assertEqual(TESTCASES, testsuites[suite_name])
def testProtoActionNameInvalid(self):
with self.assertRaises(test_data_reader.TestDataReaderException):
test_data_reader._ProtoActionName('invalid action type', 'action')
def testProtoByInvalid(self):
with self.assertRaises(test_data_reader.TestDataReaderException):
test_data_reader._ProtoBy(999)
def testProtoDeviceTypeInvalid(self):
with self.assertRaises(test_data_reader.TestDataReaderException):
test_data_reader._Proto2DeviceType(999)
def testReadTestSuitesProto(self):
with temporary_directory() as tmpdir:
filename = os.path.join(tmpdir, 'testcases.txt')
suite_name = filename[:-4] # Filename without the '.txt' suffix.
with open(filename, 'wb') as f:
# TODO(sergeyberezin): dump this file through proto:
# from google.protobuf import text_format
# text_format.MessageToString(proto)
f.write(textwrap.dedent("""\
test {
description: "Test Case 1"
step: {
description: "Step 1 of Test Case 1"
action: { tap: Tap }
path: "Search"
}
step: {
description: "Step 2 of Test Case 1"
action: { input: Type }
by: ID
path: "Search"
value: "text"
device_type: TABLET
device_major_versions: 10
}
step: {
description: "Step 3 of Test Case 1"
action: { tap: Tap }
by: ID
path: "Go"
device_major_versions: 8
device_major_versions: 9
}
}
test {
description: "Test Case Empty"
}
test {
description: "Test Case 3"
device_type: PHONE
step: {
description: "Step 1 of Test Case 3"
device_type: PHONE
action: { input: Type }
by: ID
path: "Search"
value: "text"
}
step: {
description: "Step 2 of Test Case 3"
action: { scroll: SwipeWithCoordinates }
value: "Left"
start_coordinate: { x: 0.5; y: 0.5 }
end_coordinate: { x: 1.7; y: 2.8 }
}
step: {
description: "Step 3 of Test Case 3"
action: { tap: Tap }
by: ID
path: "Close"
device_type: TABLET
}
}
"""))
expected = TESTCASES[:3]
self.maxDiff = None
testsuites = test_data_reader.ReadTestSuites([filename])
self.assertEqual(len(expected), len(testsuites[suite_name]))
self.assertEqual(expected, testsuites[suite_name])
# Run the unit tests when this file is executed directly as a script.
if __name__ == '__main__':
  unittest.main()