| #!/usr/bin/env python |
| # |
| # Copyright 2017 Google Inc. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """Provides unit tests for test_data_reader. |
| """ |
| |
| import contextlib |
| import csv |
| import os |
| import shutil |
| import tempfile |
| import textwrap |
| import unittest |
| |
| import step |
| import test_data_reader |
| |
| |
| # TODO(sergeyberezin): move this to a separate package in infra.git. |
| |
| # We're trying to be compatible with Python3 tempfile.TemporaryDirectory |
| # context manager here. And they used 'dir' as a keyword argument. |
| # pylint: disable=redefined-builtin |
@contextlib.contextmanager
def temporary_directory(suffix="", prefix="tmp", dir=None,
                        keep_directory=False):
  """Context manager that yields a freshly created temporary directory.

  The directory is created with tempfile.mkdtemp. Typical use:

    with temporary_directory() as tmpdir:
      ...

  When the context exits (normally or through an exception), the directory
  and everything inside it are removed, unless keep_directory is set.

  Args:
    suffix, prefix, dir: forwarded unchanged to tempfile.mkdtemp.
    keep_directory (bool): when True, leave the directory on disk on exit.
      Handy for debugging.

  Yields:
    tempdir (str): full path to the temporary directory.
  """
  # If mkdtemp itself raises there is nothing to clean up, so the call sits
  # outside the try block.
  tempdir = tempfile.mkdtemp(suffix, prefix, dir)
  try:
    yield tempdir
  finally:
    if not keep_directory:  # pragma: no branch
      try:
        # Best-effort removal: failures to delete are ignored by rmtree.
        shutil.rmtree(tempdir, ignore_errors=True)
      except OSError as ex:  # pragma: no cover
        print("ERROR: {!r} while cleaning up {!r}".format(ex, tempdir))
        raise
| |
| |
def _step(description=None, action_type=None, action=None, by=None, path=None,
          value=None, duration=None, start_coordinate=None,
          end_coordinate=None, device_type=None, device_versions=None,
          channel=None):
  """Builds a step via step.Step.from_row, allowing keyword arguments.

  Every argument defaults to None. The values are forwarded positionally to
  step.Step.from_row in the order they are declared here.

  Returns:
    Whatever step.Step.from_row returns for the assembled row.
  """
  row = (
      description, action_type, action, by, path, value, duration,
      start_coordinate, end_coordinate, device_type, device_versions, channel,
  )
  return step.Step.from_row(*row)
| |
| |
# Sample test cases, each with its respective steps.

# Three steps with differing device_type / device_versions restrictions.
TEST_CASE_1 = test_data_reader.TestCase(
    'Test Case 1',
    steps=[
        _step(
            description='Step 1 of Test Case 1',
            action_type='tap',
            action='Tap',
            path='Search',
        ),
        _step(
            description='Step 2 of Test Case 1',
            action_type='input',
            action='Type',
            by='id',
            path='Search',
            value='text',
            device_type=step.TABLET,
            device_versions='10',
        ),
        _step(
            description='Step 3 of Test Case 1',
            action_type='tap',
            action='Tap',
            by='id',
            path='Go',
            device_versions='8,9',
        ),
    ],
)

# A test case without any steps.
TEST_CASE_EMPTY = test_data_reader.TestCase('Test Case Empty')

# A test case declared for step.PHONE, including a coordinate-based swipe.
TEST_CASE_3 = test_data_reader.TestCase(
    'Test Case 3',
    device_type=step.PHONE,
    steps=[
        _step(
            description='Step 1 of Test Case 3',
            action_type='input',
            action='Type',
            by='id',
            path='Search',
            value='text',
            device_type=step.PHONE,
        ),
        _step(
            description='Step 2 of Test Case 3',
            action_type='scroll',
            action='SwipeWithCoordinates',
            value='Left',
            start_coordinate='0.5,0.5',
            end_coordinate='1.7,2.8',
        ),
        _step(
            description='Step 3 of Test Case 3',
            action_type='tap',
            action='Tap',
            by='id',
            path='Close',
            device_type=step.TABLET,
        ),
    ],
)

# A test case flagged as disabled.
TEST_CASE_DISABLED = test_data_reader.TestCase(
    'Test Case 4 Disabled',
    disabled=True,
    steps=[
        _step(
            description='Step 1 of Test Case 4',
            action_type='input',
            action='Type',
            by='id',
            path='Search',
            value='text',
        ),
    ],
)

# A test case declared for step.PHONE with major versions 8 and 9.
TEST_CASE_VERSIONED = test_data_reader.TestCase(
    'Test Case 5 Versioned',
    device_type=step.PHONE,
    device_major_versions={8, 9},
    steps=[
        _step(
            description='Step 1 of Test Case 5',
            action_type='input',
            action='Type',
            by='id',
            path='Search',
            value='text',
            device_type=step.PHONE,
        ),
    ],
)

# A test case whose device_type is a device name rather than a step constant.
TEST_CASE_NAMED = test_data_reader.TestCase(
    'Test Case 6 Named',
    device_type='device_name',
    steps=[
        _step(
            description='Step 1 of Test Case 6',
            action_type='input',
            action='Type',
            by='id',
            path='Search',
            value='text',
            device_type='device_name',
        ),
    ],
)

# Rows of all the test cases above, concatenated in declaration order.
ROWS = (
    TEST_CASE_1.to_rows()
    + TEST_CASE_EMPTY.to_rows()
    + TEST_CASE_3.to_rows()
    + TEST_CASE_DISABLED.to_rows()
    + TEST_CASE_VERSIONED.to_rows()
    + TEST_CASE_NAMED.to_rows()
)
# One step object rebuilt from each row.
STEPS = [step.Step.from_row(*row) for row in ROWS]
| |
| |
def update_description(test, n):
  """Returns a copy of `test` with its number prepended to the description.

  Args:
    test: object providing copy() and a `description` attribute.
    n: number formatted with %d into the ' (Test No. N) ' prefix.

  Returns:
    The object returned by test.copy(), carrying the new description.
  """
  numbered = test.copy()
  numbered.description = ' (Test No. %d) %s' % (n, test.description)
  return numbered
| |
def number_testcases(testcases):
  """Returns numbered copies of `testcases`, counting from 1.

  Each element goes through update_description together with its 1-based
  position. This lives in a function only so that the comprehension's loop
  variables stay out of the module namespace.
  """
  return [update_description(case, number)
          for number, case in enumerate(testcases, 1)]
| |
| |
# All sample test cases, in the same order as their rows appear in ROWS.
TESTCASES = [
    TEST_CASE_1,
    TEST_CASE_EMPTY,
    TEST_CASE_3,
    TEST_CASE_DISABLED,
    TEST_CASE_VERSIONED,
    TEST_CASE_NAMED,
]
# Copies of the above with ' (Test No. N) ' prepended to each description.
NUMBERED_TESTCASES = number_testcases(TESTCASES)
| |
| |
def update_steps(test, steps):
  """Returns a copy of `test` whose `steps` attribute is set to `steps`.

  Args:
    test: object providing copy().
    steps: value assigned to the copy's `steps` attribute.

  Returns:
    The object returned by test.copy(), carrying the new steps.
  """
  replaced = test.copy()
  replaced.steps = steps
  return replaced
| |
| |
class TestDataReaderTest(unittest.TestCase):
  """Tests test_data_reader."""

  # Never truncate assertion diffs: the compared test case structures are
  # large, and a truncated diff hides the interesting part.
  maxDiff = None

  def testEnumerateTestSuites(self):
    """Verifies test_number and description of enumerated test cases.

    Checks the first and the fifth test case of the suite, which are expected
    to be numbered 1 and 5 and to keep their original descriptions.
    """
    numbered = test_data_reader.EnumerateTestSuites({'test': TESTCASES})
    for index in (0, 4):
      actual = numbered['test'][index]
      self.assertEqual(
          ' (Test No. %d) %s' % (index + 1, TESTCASES[index].description),
          ' (Test No. %d) %s' % (actual.test_number, actual.description))

  def testGetValidTestCasesForPhone(self):
    """Verifies valid test cases for phone."""
    t1, _, t3 = TESTCASES[:3]
    expected = {'test suite': [
        update_steps(t1, t1.steps[0:1]),
        update_steps(t3, t3.steps[0:2]),
    ]}
    filtered = test_data_reader.FilterTestSuites(
        {'test suite': TESTCASES}, step.PHONE, None, 10, None)
    self.assertEqual(expected, filtered)

  def testGetValidTestCasesForTablet(self):
    """Verifies valid test cases for tablet."""
    t1 = TESTCASES[0]
    expected = {'test suite': [
        update_steps(t1, t1.steps[0:2]),
    ]}
    filtered = test_data_reader.FilterTestSuites(
        {'test suite': TESTCASES}, step.TABLET, None, 10, None)
    self.assertEqual(expected, filtered)

  def testGetValidTestCasesForNamed(self):
    """Verifies valid test cases for named devices."""
    t1 = TESTCASES[0]
    expected = {'test suite': [
        update_steps(t1, t1.steps[0:1]),
        TESTCASES[5],
    ]}
    filtered = test_data_reader.FilterTestSuites(
        {'test suite': TESTCASES}, None, 'device_name', 10, None)
    self.assertEqual(expected, filtered)

  def testReadFileBadFormat(self):
    """_ReadFile raises TestDataReaderException for an unknown file format."""
    with self.assertRaises(test_data_reader.TestDataReaderException):
      test_data_reader._ReadFile('', file_format='bad')

  def testReadTestSuitesBadFormat(self):
    """ReadTestSuites raises TestDataReaderException for an unknown format."""
    with self.assertRaises(test_data_reader.TestDataReaderException):
      test_data_reader.ReadTestSuites(['nonexistent'], file_format='bad')

  def testReadTestSuitesCSVEmpty(self):
    """A CSV file holding only a non-start step yields an empty suite."""
    with temporary_directory() as tmpdir:
      filename = os.path.join(tmpdir, 'testcases.csv')
      # The suite is keyed by the file name without the '.csv' suffix.
      suite_name = os.path.splitext(filename)[0]
      # NOTE(review): binary mode for csv.writer is the Python 2 convention;
      # Python 3 would need mode 'w' with newline=''.
      with open(filename, 'wb') as f:
        header = ['dummy'] * 11
        # Non-start step should be ignored.
        stp = _step(description='ignore me').to_row()
        csv.writer(f).writerows([header] + [stp])

      testsuites = test_data_reader.ReadTestSuites([filename])
      self.assertEqual([], testsuites[suite_name])

  def testReadTestSuitesCSV(self):
    """Reading back the CSV dump of ROWS reproduces TESTCASES."""
    with temporary_directory() as tmpdir:
      filename = os.path.join(tmpdir, 'testcases.csv')
      # The suite is keyed by the file name without the '.csv' suffix.
      suite_name = os.path.splitext(filename)[0]
      # NOTE(review): binary mode for csv.writer is the Python 2 convention;
      # Python 3 would need mode 'w' with newline=''.
      with open(filename, 'wb') as f:
        header = ['dummy'] * 11
        csv.writer(f).writerows([header] + ROWS)

      testsuites = test_data_reader.ReadTestSuites([filename],
                                                   file_format='csv')
      self.assertEqual(TESTCASES, testsuites[suite_name])

  def testProtoActionNameInvalid(self):
    """_ProtoActionName raises for an unknown action type."""
    with self.assertRaises(test_data_reader.TestDataReaderException):
      test_data_reader._ProtoActionName('invalid action type', 'action')

  def testProtoByInvalid(self):
    """_ProtoBy raises for an unknown value."""
    with self.assertRaises(test_data_reader.TestDataReaderException):
      test_data_reader._ProtoBy(999)

  def testProtoDeviceTypeInvalid(self):
    """_Proto2DeviceType raises for an unknown value."""
    with self.assertRaises(test_data_reader.TestDataReaderException):
      test_data_reader._Proto2DeviceType(999)

  def testReadTestSuitesProto(self):
    """A text-format proto file is read back as the first three TESTCASES."""
    with temporary_directory() as tmpdir:
      filename = os.path.join(tmpdir, 'testcases.txt')
      # The suite is keyed by the file name without the '.txt' suffix.
      suite_name = os.path.splitext(filename)[0]
      with open(filename, 'wb') as f:
        # TODO(sergeyberezin): dump this file through proto:
        #   from google.protobuf import text_format
        #   text_format.MessageToString(proto)
        f.write(textwrap.dedent("""\
            test {
              description: "Test Case 1"
              step: {
                description: "Step 1 of Test Case 1"
                action: { tap: Tap }
                path: "Search"
              }
              step: {
                description: "Step 2 of Test Case 1"
                action: { input: Type }
                by: ID
                path: "Search"
                value: "text"
                device_type: TABLET
                device_major_versions: 10
              }
              step: {
                description: "Step 3 of Test Case 1"
                action: { tap: Tap }
                by: ID
                path: "Go"
                device_major_versions: 8
                device_major_versions: 9
              }
            }
            test {
              description: "Test Case Empty"
            }
            test {
              description: "Test Case 3"
              device_type: PHONE
              step: {
                description: "Step 1 of Test Case 3"
                device_type: PHONE
                action: { input: Type }
                by: ID
                path: "Search"
                value: "text"
              }
              step: {
                description: "Step 2 of Test Case 3"
                action: { scroll: SwipeWithCoordinates }
                value: "Left"
                start_coordinate: { x: 0.5; y: 0.5 }
                end_coordinate: { x: 1.7; y: 2.8 }
              }
              step: {
                description: "Step 3 of Test Case 3"
                action: { tap: Tap }
                by: ID
                path: "Close"
                device_type: TABLET
              }
            }
            """))

      expected = TESTCASES[:3]

      testsuites = test_data_reader.ReadTestSuites([filename])
      # Compare lengths first so a count mismatch gives a short, clear failure.
      self.assertEqual(len(expected), len(testsuites[suite_name]))
      self.assertEqual(expected, testsuites[suite_name])
| |
| |
| if __name__ == '__main__': |
| unittest.main() |