| #!/usr/bin/env python |
| # |
| # Copyright 2017 Google Inc. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """Runs all Franky test cases for a given channel. |
| """ |
| |
| |
| import sys |
| |
| import argparse |
| import collections |
| import datetime |
| import glob |
| import itertools |
| import os |
| import shutil |
| import subprocess |
| import unittest |
| |
| import actions |
| from actions import action_error |
| from actions import factory |
| import app_info |
| from appium_util import appium_driver_util |
| from appium_util import input_formatter |
| from appium_util import timeout_util |
| import device_info |
| import driver_provider |
| import generate_report |
| import step as step_module |
| import test_data_reader |
| |
| |
# Extension of the application file, keyed by platform. It is appended to the
# channel name to build the default app path when none is given.
_APP_EXTENTION = {
    appium_driver_util.IOS: '.app',
    appium_driver_util.ANDROID: '.apk',
}

# Accepted values for the 'channel' command line argument.
_CANARY = 'Canary'
_BETA = 'Beta'

# Accepted values for the '--report-format' command line argument.
_HTML = 'html'
_JSON = 'json'

# Directory removed by _ClearInstrumentsLogs().
_INSTRUMENTS_LOGS_PATH = '/Library/Caches/com.apple.dt.instruments'

# Number of tests after which the appium server is killed, so that its memory
# is released. See Crbug.com/685300
_SERVER_RESET_COUNT = 30
| |
# Outcome of one test case.
#   result_code: integer, 0 when the test passed and 1 when it failed.
#   test_case_description: description of the test case that was run.
#   test_output: string with the |step.description| of each step in the test
#       case.
#   stacktrace: string with the stacktrace of the test failure.
TestResult = collections.namedtuple(
    'TestResult',
    'result_code test_case_description test_output stacktrace')

# Outcome of one test suite.
#   success_counter: number of test cases that passed.
#   failure_counter: number of test cases that failed.
#   test_results: list of TestResult tuples.
TestSuiteResult = collections.namedtuple(
    'TestSuiteResult',
    'success_counter failure_counter test_results')
| |
# Locator path of the Accept and Continue button shown on the first screen of
# the onboarding flow; used to detect that screen after a test failure.
_ACCEPT_CONTINUE_PATH = 'Accept & Continue'

# Google Cloud Storage destination (bucket and folder) of the Franky reports,
# keyed by platform.
_FRANKY_BUCKET_NAMES = {
    appium_driver_util.ANDROID: 'franky-reports/android',
    appium_driver_util.IOS: 'franky-reports/ios',
}
| |
| |
class FrankyTestCase(unittest.TestCase):
  """A unittest test case that executes one list of Franky steps.

  The class has a single test method, testFrankySteps; the steps it executes
  are handed to the initializer.
  """

  def __init__(self, test_description, steps, test_output,
               driver_provider_instance, platform, verbose):
    """Inits FrankyTestCase.

    Args:
      test_description: A string representing the test method name.
      steps: A list of Step objects, executed in order.
      test_output: A list to which the description of every executed step is
          appended while the test runs.
      driver_provider_instance: An instance of DriverProvider class.
      platform: A string representing the platform name. It is the prefix of
          the action class name looked up for every step.
      verbose: A boolean; when true, step descriptions are printed as well.
    """
    super(FrankyTestCase, self).__init__(test_description)
    self._verbose = verbose
    self._platform = platform
    self._driver_provider_instance = driver_provider_instance
    self._test_output = test_output
    self._steps = steps

  def setUp(self):
    """Resets the app through the platform's system action before the test.
    """
    provider = self._driver_provider_instance
    reset_action = factory.CreateAction(actions.system,
                                        provider.platform_name)
    reset_action().ResetApp(provider, None)

  def testFrankySteps(self):
    """Executes every step with the action class matching its action type.

    When a step raises one of the known action errors, a recovery is
    attempted before the error is raised again, so the test still fails.
    """
    provider = self._driver_provider_instance
    for step in self._steps:
      step_banner = '*** %s' % step.description
      if self._verbose:
        print(step_banner)
      self._test_output.append(step_banner)
      # The action class lives in the module registered for the step's action
      # type and is named <platform><ActionType>.
      # TODO(crbug.com/710213): make this less magical.
      handler_class = getattr(actions.ACTIONS[step.action_type],
                              self._platform + step.action_type.title())
      # Looked up on the class, hence the explicit instance passed below.
      handler = getattr(handler_class, step.action)
      try:
        handler(handler_class(), provider, step)
      except (action_error.VerificationError, action_error.TapError,
              action_error.NoElementFoundError, action_error.InstallAppError,
              action_error.ProcessError):
        self._RecoverFromFailure()
        raise

  def _RecoverFromFailure(self):
    """Brings the app back to a usable state after a failed step.

    A modal First Run panel and the absence of wifi are common reasons for a
    failed test: the panel is bypassed when it is showing, and wifi is
    enabled in all cases.
    """
    provider = self._driver_provider_instance
    element_action = factory.CreateAction(actions.element,
                                          provider.platform_name)
    first_run_showing = element_action().IsElementBySeleniumLocatorPresent(
        provider, by=step_module.ID, path=_ACCEPT_CONTINUE_PATH,
        timeout=timeout_util.WAIT_FOR_ABSENCE_TIMEOUT)
    if first_run_showing:
      bypass_action = factory.CreateAction(actions.onboarding,
                                           provider.platform_name)
      bypass_action().ByPassFirstRun(provider, None)
    wifi_action = factory.CreateAction(actions.system, provider.platform_name)
    wifi_action().EnableWiFi(provider, None)
| |
| |
def _FormatTestOutput(output, width=80):
  """Formats test output to at most |width| characters per line.

  Leading and trailing whitespace of |output| is stripped first. Every
  remaining line is cut into consecutive pieces of at most |width| characters
  and each piece is terminated by a newline. Empty lines are dropped.

  Args:
    output: A string representing the test output.
    width: An integer, the maximum number of characters per line. Defaults to
        80.

  Returns:
    A string representing the formatted test output. It is empty when
    |output| contains only whitespace.

  Raises:
    ValueError: If |width| is smaller than 1.
  """
  if width < 1:
    raise ValueError('width must be a positive integer, got %r' % (width,))
  pieces = []
  for line in output.strip().split('\n'):
    # An empty line yields an empty range and thus contributes nothing.
    pieces.extend(line[start:start + width]
                  for start in range(0, len(line), width))
  # Joined once at the end instead of growing a string piece by piece.
  return ''.join(piece + '\n' for piece in pieces)
| |
| |
def _ClearInstrumentsLogs():
  """Deletes the instruments logs directory tree when it exists."""
  if not os.path.exists(_INSTRUMENTS_LOGS_PATH):
    return
  shutil.rmtree(_INSTRUMENTS_LOGS_PATH)
| |
| |
def _CreateTestSuite(steps, test_output, driver_provider_instance, platform,
                     verbose):
  """Builds a unittest suite holding a single FrankyTestCase.

  The test method is the first test name the unittest loader finds on
  FrankyTestCase. That same method is reused for every test run, only the
  arguments given to the test case change.

  Args:
    steps: A list of Step objects.
    test_output: A list, which will contain test output when the test has
        finished running.
    driver_provider_instance: An instance of DriverProvider class.
    platform: A string representing the platform name.
    verbose: A boolean to increase test output verbosity or not.

  Returns:
    A unittest.suite.TestSuite.
  """
  test_method_name = unittest.TestLoader().getTestCaseNames(FrankyTestCase)[0]
  franky_test = FrankyTestCase(test_method_name, steps, test_output,
                               driver_provider_instance, platform, verbose)
  return unittest.TestSuite([franky_test])
| |
| |
def _RunTestSuite(test_suite, driver_provider_instance, no_retry, platform,
                  verbose):
  """Runs test suite and analyzes test results.

  Every test case is wrapped in its own single-test unittest suite and run
  with a TextTestRunner. Unless |no_retry| is set, a test case that does not
  succeed is run a second time and only the second outcome is recorded. The
  appium server is killed once all test cases have been run.

  Args:
    test_suite: An iterable of test case objects; each of them provides a
        |description| and a |steps| attribute.
    driver_provider_instance: A instance of DriverProvider.
    no_retry: A boolean; when true a test is not rerun after a failure.
    platform: A string representing the platform name.
    verbose: A boolean to increase test output verbosity or not.

  Returns:
    A tuple of TestSuiteResult.
  """
  success_counter = 0
  failure_counter = 0
  test_counter = 0
  test_results = []
  for test_case in test_suite:
    if verbose:
      print('\n >>> Starting test with description %s' %
            test_case.description)
    # The same list receives the output of the first run and of the rerun.
    test_output = ['\n']
    suite = _CreateTestSuite(test_case.steps, test_output,
                             driver_provider_instance, platform, verbose)
    result = unittest.TextTestRunner(verbosity=1).run(suite)
    # Rerun a test if it fails the first time.
    if not result.wasSuccessful() and not no_retry:
      _ClearInstrumentsLogs()
      restart_message = '\n @@@ Rerunning test %s' % test_case.description
      if verbose:
        print(restart_message)
      test_output.append('\n' + restart_message)
      # Build a fresh suite for the rerun: newer unittest versions drop the
      # references to the tests of a suite once it has been run, so running
      # the same suite object again would execute nothing and look successful.
      suite = _CreateTestSuite(test_case.steps, test_output,
                               driver_provider_instance, platform, verbose)
      result = unittest.TextTestRunner(verbosity=1).run(suite)
    # Analyze test result.
    if result.wasSuccessful():
      result_code = 0
      success_counter += 1
      formatted_stacktrace = ''
    else:
      result_code = 1
      failure_counter += 1
      # An unsuccessful run is not necessarily recorded in |errors|: assertion
      # failures are recorded in |failures| and an unexpected success in
      # neither. Errors stay last so they are still preferred when present.
      failure_records = result.failures + result.errors
      stacktrace = failure_records[-1][1] if failure_records else ''
      formatted_stacktrace = _FormatTestOutput(stacktrace)
    formatted_test_output = '\n\n' + _FormatTestOutput('\n'.join(test_output))
    test_results.append(TestResult(result_code, test_case.description,
                                   formatted_test_output, formatted_stacktrace))
    test_counter = _ResetAppiumServer(test_counter, driver_provider_instance)

  # Kill appium server when done with tests.
  appium_driver_util.KillAppiumServer(driver_provider_instance._appium_process)
  return TestSuiteResult(success_counter, failure_counter, test_results)
| |
| |
def _ResetAppiumServer(test_count, driver_provider_instance):
  """Counts one more test and kills the Appium server when it is due.

  The server is killed each time the incremented count is a multiple of
  _SERVER_RESET_COUNT; the count then starts over from zero.

  Args:
    test_count: The number of tests since the last reset.
    driver_provider_instance: An instance of DriverProvider.
  Returns:
    The number of tests since the last reset.
  """
  tests_since_reset = test_count + 1
  if tests_since_reset % _SERVER_RESET_COUNT:
    # Not a multiple of the reset count yet: keep counting.
    return tests_since_reset
  # No need to start a new server, next action will start one automatically.
  appium_driver_util.KillAppiumServer(
      driver_provider_instance._appium_process)
  return 0
| |
| |
def _CopyHTMLReportToCloudStorage(report_file_name, platform):
  """Copies HTML report to Google Cloud Storage.

  Args:
    report_file_name: A string for the HTML report file name.
    platform: A string representing the platform name; it must be a key of
        _FRANKY_BUCKET_NAMES.

  Raises:
    KeyError: If no bucket is known for |platform|.
    subprocess.CalledProcessError: If gsutil exits with a non-zero status.
  """
  destination = 'gs://%s' % _FRANKY_BUCKET_NAMES[platform]
  # Network bandwidth and space in Google Cloud Storage are saved while copying
  # html file with the -z option. The file is compressed before it is uploaded,
  # but the actual file is left uncompressed on the local disk.
  # The command is built as an argument list rather than by splitting a
  # formatted string, so a report path containing whitespace stays one
  # argument.
  gsutil_command = ['gsutil', 'cp', '-z', 'html', '-a', 'public-read',
                    report_file_name, destination]
  subprocess.check_output(gsutil_command)
| |
| |
def main():
  """Runs the Franky test suites selected on the command line.

  Parses the command line arguments, gathers app and device information for
  the chosen platform, reads and filters the test suite files, then runs every
  remaining test suite and generates one report per suite. An HTML report is
  also copied to Google Cloud Storage when this was requested. Instruments
  logs are cleared after each suite.
  """
  arg_parser = argparse.ArgumentParser(
      description='Runs Franky test cases on a Chrome Channel.')
  arg_parser.add_argument('channel', choices=[_CANARY, _BETA],
                          help='The Chrome channel to run test cases against.')
  arg_parser.add_argument('platform', choices=[appium_driver_util.IOS,
                                               appium_driver_util.ANDROID],
                          help='The platform to run tests against.')
  arg_parser.add_argument(
      '-t', '--test-suite', action='append',
      help='Test suite file(s) to run; can be a glob. Multiple arguments '
      'add up. Default are all files matching "*<channel>*.csv" in CWD. '
      'File type is determined by extension: .csv is CSV, .txt is text proto.')
  arg_parser.add_argument(
      '-s', '--appium-dir-path',
      default=os.path.join(os.path.expanduser('~'), 'appium'),
      help='The full path to appium server directory. A special value NONE '
      'will keep it unset. The default location is %(default)s.')
  arg_parser.add_argument(
      '--appium-command',
      default='node .',
      help='The command to invoke appium server. A special value of NONE '
      'disables Appium invocation and relies on an already running server. '
      'Default: "%(default)s".')
  arg_parser.add_argument('-a', '--app-path',
                          help='The full path to the .app bundle to be tested. '
                          'The default location is the current directory.')
  arg_parser.add_argument('--use-prebuilt-wda', action='store_true',
                          default=False,
                          help='Do not rebuild the WebDriverAgent app.')
  arg_parser.add_argument('--xcconfig', metavar='FILE',
                          help='XCode config for xcodebuild launching WDA.')
  arg_parser.add_argument('-r', '--report-format', choices=[_HTML, _JSON],
                          default=_HTML, help='Generate a json or html report '
                          'out of the test results.')
  arg_parser.add_argument('-n', '--no-retry', action='store_true',
                          default=False, help='Do not rerun a test. '
                          'If not set, a test will rerun after it fails.')
  arg_parser.add_argument('-v', '--verbose', action='store_true',
                          default=False, help='Increase test output verbosity.')
  arg_parser.add_argument('-c', '--copy-to-cloud-storage', action='store_true',
                          default=False, help='Copy generated HTML reports to '
                          'Google Cloud Storage.')
  # This option takes a value. It must not be a 'store_true' flag, which would
  # store True instead of an address whenever the option is given.
  arg_parser.add_argument('-d', '--host-address', default='127.0.0.1',
                          help='Default Host for Appium server.')
  args = arg_parser.parse_args()

  platform = args.platform
  # The special value NONE unsets the appium directory / appium command.
  appium_dir_path = args.appium_dir_path
  if appium_dir_path == 'NONE':
    appium_dir_path = None
  if args.appium_command == 'NONE':
    args.appium_command = None
  else:
    args.appium_command = args.appium_command.split(' ')

  # Without an explicit path, the app is expected in the current directory and
  # to be named after the channel.
  app_path = args.app_path
  if not app_path:
    app_path = os.path.abspath(args.channel + _APP_EXTENTION[platform])
  # argparse restricts |platform| to these two values.
  if platform == appium_driver_util.IOS:
    current_app_info = app_info.GetIOSAppInformation(app_path)
    current_device_info = device_info.GetIOSDeviceInformation()
  elif platform == appium_driver_util.ANDROID:
    current_app_info = app_info.GetAndroidAppInformation(app_path)
    current_device_info = device_info.GetAndroidDeviceInformation()

  driver_provider_instance = driver_provider.DriverProvider(
      platform, args.channel, app_path, appium_dir_path, current_device_info,
      current_app_info, args.host_address, args.use_prebuilt_wda,
      args.xcconfig, args.appium_command)
  device_major_version = input_formatter.GetDeviceMajorVersion(
      current_device_info.version)

  # Every entry may be a glob; expand them all into one flat list of files.
  test_files = args.test_suite
  if not test_files:
    test_files = ['*%s*.csv' % args.channel]
  test_files = list(
      itertools.chain.from_iterable(glob.glob(f) for f in test_files))
  test_suites = test_data_reader.ReadTestSuites(test_files)
  test_suites = test_data_reader.FilterTestSuites(
      test_suites, current_device_info.device_class, device_major_version)
  test_suites = test_data_reader.EnumerateTestSuites(test_suites)

  verbose = args.verbose
  report_format = args.report_format
  for test_suite_name, test_suite in sorted(test_suites.iteritems()):
    if verbose:
      print('\n*** Running Test Suite with name %s ***\n' % test_suite_name)
    start_time = datetime.datetime.now().isoformat()
    test_suite_result = _RunTestSuite(test_suite, driver_provider_instance,
                                      args.no_retry, platform, verbose)
    report_data = {'start_time': start_time,
                   'stop_time': datetime.datetime.now().isoformat(),
                   'description': test_suite_name,
                   'app_info': current_app_info,
                   'device_info': current_device_info,
                   'test_suite_result': test_suite_result
                  }
    report_file_name = generate_report.GenerateReport(report_format,
                                                      report_data)
    if report_format == _HTML and args.copy_to_cloud_storage:
      _CopyHTMLReportToCloudStorage(report_file_name, platform)
    _ClearInstrumentsLogs()
| |
| |
# Run the tests only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == '__main__':
  main()