| # Copyright 2017 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| |
| """The finalize test is the last step before DUT switching to release image. |
| |
| Description |
| ----------- |
| The test invokes ``gooftool finalize`` with specified arguments to switch the |
| machine to shipping state, in following steps: |
| |
| 1. Run preflight tasks, including: |
| |
| a. Download HWID file from server if available. |
| b. Log test states. |
| c. Log image versions. |
| |
| 2. Call ``gooftool finalize``, which executes following sub commands in order: |
| |
| a. Verify firmware, keys, disk image, hardware components... etc. (equivalent |
| to ``gooftool verify``) |
| b. Clear manufacturing flags in firmware (equivalent to ``gooftool |
| clear_gbb_flags``) |
| c. Enable software write protect (equivalent to ``gooftool write_protect``) |
| d. Start wiping process (equivalent to ``gooftool wipe_in_place``), which will |
| do the following tasks: |
| |
| 1. Wipe stateful partiton |
| 2. Enable release partition |
| 3. Notify factory server |
| 4. Battery cutoff |
| |
| You can use ``gooftool_waive_list`` and ``gooftool_skip_list`` to waive or skip |
| some gooftool steps. |
| |
| Test Procedure |
| -------------- |
| When started, the pytest runs a few preflight tasks, to check configuration or |
| prepare logs. |
| |
| After that, ``gooftool finalize`` will be called, and it will check device's |
| state, from hardware to software configuration. |
| |
| If everything looks good (or waived, skipped by test arguments), ``gooftool`` |
| will enable shipping mode by clearing firmware manufacturing flags, enabling |
| write protection, enabling release image, wiping out manufacturing disk data, |
| cutting off battery. |
| |
| During battery cutoff, operator might be prompted to plug / unplug charger if |
| battery charge percentage is too low or too high. |
| |
| Dependency |
| ---------- |
| Almost everything essential to Chrome OS, especially: |
| |
| * crossystem (developer switch status, hardware WP status) |
| * battery driver (read battery percentage from sysfs) |
| * flashrom (to turn on software WP) |
| * TPM (read from sysfs) |
| * frecon (to show wipe progress and instructions) |
| * network connection (to notify factory server) |
| * clobber-state (/sbin/clobber-state, which wipes stateful partition) |
| |
| Examples |
| -------- |
| A minimum example should be:: |
| |
| { |
| "pytest_name": "finalize" |
| } |
| |
| Where, |
| |
| * ``write_protection`` will be ``True`` for PVT phase, otherwise ``False``. |
| * ``enable_factory_server`` is ``True``, will try to connect to factory server |
| and update HWID data, flush event logs. |
| * All gooftool verification rules are not skipped or waived. |
| |
| For early builds (PROTO, EVT), you can skip things that are not ready:: |
| |
| { |
| "pytest_name": "finalize", |
| "args": { |
| "gooftool_skip_list": ["clear_gbb_flags"], |
| "write_protection": false, |
| "gooftool_waive_list": ["verify_tpm", "verify_hwid"] |
| } |
| } |
| """ |
| |
| |
| import json |
| import logging |
| import os |
| import random |
| import subprocess |
| import threading |
| |
| import yaml |
| |
| import factory_common # pylint: disable=unused-import |
| from cros.factory.device import device_utils |
| from cros.factory.device.links import ssh |
| from cros.factory.test import device_data |
| from cros.factory.test.env import paths |
| from cros.factory.test import event_log # TODO(chuntsen): Deprecate event log. |
| from cros.factory.test import gooftools |
| from cros.factory.test.i18n import _ |
| from cros.factory.test.rules import phase |
| from cros.factory.test import server_proxy |
| from cros.factory.test import session |
| from cros.factory.test import state |
| from cros.factory.test import test_case |
| from cros.factory.test.utils import deploy_utils |
| from cros.factory.test.utils import update_utils |
| from cros.factory.testlog import testlog |
| from cros.factory.utils.arg_utils import Arg |
| from cros.factory.utils import file_utils |
| from cros.factory.utils import net_utils |
| from cros.factory.utils import sync_utils |
| from cros.factory.utils import type_utils |
| |
| |
| MSG_BUILD_PHASE = _('Build Phase') |
| MSG_WRITE_PROTECTION = _('Write Protection') |
| MSG_FACTORY_SERVER = _('Factory Server') |
| MSG_ENABLED = _('Enabled') |
| MSG_DISABLED = _('Disabled') |
| MSG_PREFLIGHT = _( |
| 'Running preflight tasks to prepare for finalization, please wait...') |
| MSG_FINALIZING = _('Finalizing, please wait.<br>' |
| 'Do not restart the device or terminate this test,<br>' |
| 'or the device may become unusable.') |
| |
| |
| class Finalize(test_case.TestCase): |
| """The main class for finalize pytest.""" |
| ARGS = [ |
| Arg('write_protection', bool, |
| 'Check and enable write protection.', default=None), |
| Arg('has_ectool', bool, 'Has ectool utility or not.', default=True), |
| Arg('secure_wipe', bool, |
| 'Wipe the stateful partition securely (False for a fast wipe).', |
| default=True), |
| Arg('upload_method', str, |
| 'Upload method for "gooftool finalize"', |
| default=None), |
| Arg('upload_max_retry_times', int, |
| 'Number of tries to upload. 0 to retry infinitely.', |
| default=0), |
| Arg('upload_retry_interval', int, |
| 'Retry interval in seconds between retries.', |
| default=None), |
| Arg('upload_allow_fail', bool, |
| 'Continue finalize if report upload fails, instead of raising error.', |
| default=False), |
| Arg('enable_factory_server', bool, |
| 'Perform factory server operations: update HWID data and flush event ' |
| 'logs.', default=True), |
| Arg('hwid_need_vpd', bool, |
| 'Whether the HWID validation process needs the vpd data.', |
| default=False), |
| Arg('rma_mode', bool, |
| 'Enable rma_mode, do not check for deprecated components.', |
| default=False), |
| Arg('is_cros_core', bool, |
| 'For ChromeOS Core device, skip setting firmware bitmap locale.', |
| default=False), |
| Arg('is_chromebox', bool, |
| 'Perform ChromeBox specific checks.', |
| default=None), |
| Arg('enforced_release_channels', list, |
| 'A list of string indicating the enforced release image channels. ' |
| 'Each item should be one of "dev", "beta" or "stable".', |
| default=None), |
| Arg('ec_pubkey_path', str, |
| ('Path to public key in vb2 format. Verify EC key with pubkey file.' |
| 'Verify by pubkey file should have higher priority.'), |
| default=None), |
| Arg('ec_pubkey_hash', str, |
| 'A string for public key hash. Verify EC key with the given hash.', |
| default=None), |
| Arg('use_local_gooftool', bool, |
| 'If DUT is local, use factory.par or local gooftool? If DUT is not ' |
| 'local, factory.par is always used.', default=True), |
| Arg('station_ip', str, |
| 'IP address of this station.', default=None), |
| Arg('gooftool_waive_list', list, |
| 'A list of waived checks for "gooftool finalize", ' |
| 'see "gooftool finalize --help" for available items.', |
| default=[]), |
| Arg('gooftool_skip_list', list, |
| 'A list of skipped checks for "gooftool finalize", ' |
| 'see "gooftool finalize --help" for available items.', |
| default=[]) |
| ] |
| |
| FINALIZE_TIMEOUT = 180 |
| |
| def setUp(self): |
| self.dut = device_utils.CreateDUTInterface() |
| self.force = False |
| self.go_cond = threading.Condition() |
| self.test_states_path = os.path.join(paths.DATA_LOG_DIR, 'test_states') |
| self.factory_par = deploy_utils.CreateFactoryTools(self.dut) |
| |
| # variables for remote SSH DUT |
| self.dut_response = None |
| self.response_listener = None |
| |
| def tearDown(self): |
| if self.response_listener: |
| self.response_listener.shutdown() |
| self.response_listener.server_close() |
| self.response_listener = None |
| |
| def runTest(self): |
| testlog.LogParam(name='phase', value=str(phase.GetPhase())) |
| # TODO(hungte) Should we set a percentage of units to run WP on DVT? |
| if self.args.write_protection is None: |
| self.args.write_protection = phase.GetPhase() >= phase.PVT |
| phase.AssertStartingAtPhase(phase.PVT, self.args.write_protection, |
| 'Write protection must be enabled') |
| |
| def GetState(v): |
| return (['<b style="color: green;">', MSG_ENABLED, '</b>'] |
| if v else ['<b style="color: red;">', MSG_DISABLED, '</b>']) |
| |
| self.ui.SetInstruction([ |
| MSG_WRITE_PROTECTION, ': ', |
| GetState(self.args.write_protection), '<br>', MSG_BUILD_PHASE, |
| ': %s, ' % phase.GetPhase(), MSG_FACTORY_SERVER, ': ', |
| GetState(self.args.enable_factory_server) |
| ]) |
| self.ui.SetState(MSG_PREFLIGHT) |
| self.Preflight() |
| self.ui.SetState(MSG_FINALIZING) |
| self.DoFinalize() |
| |
| def Preflight(self): |
| # Check for HWID bundle update from factory server. |
| if self.args.enable_factory_server: |
| update_utils.UpdateHWIDDatabase(self.dut) |
| self.LogTestStates() |
| self.LogImageVersion() |
| |
| def LogTestStates(self): |
| test_list = self.test_info.ReadTestList() |
| test_states = test_list.AsDict( |
| state.GetInstance().GetTestStates()) |
| file_utils.TryMakeDirs(os.path.dirname(self.test_states_path)) |
| with open(self.test_states_path, 'w') as f: |
| yaml.dump(test_states, f) |
| event_log.Log('test_states', test_states=test_states) |
| testlog.LogParam('test_states', test_states) |
| |
| def LogImageVersion(self): |
| release_image_version = self.dut.info.release_image_version |
| factory_image_version = self.dut.info.factory_image_version |
| if release_image_version: |
| logging.info('release image version: %s', release_image_version) |
| else: |
| self.FailTask('Can not determine release image version') |
| if factory_image_version: |
| logging.info('factory image version: %s', factory_image_version) |
| else: |
| self.FailTask('Can not determine factory image version') |
| event_log.Log('finalize_image_version', |
| factory_image_version=factory_image_version, |
| release_image_version=release_image_version) |
| testlog.LogParam('factory_image_version', factory_image_version) |
| testlog.LogParam('release_image_version', release_image_version) |
| |
| def _CallGoofTool(self, command): |
| """Execute a gooftool command, `command`. |
| |
| Args: |
| command: a string object which starts with 'gooftool '. |
| """ |
| assert command.startswith('gooftool ') |
| |
| if self.dut.link.IsLocal() and self.args.use_local_gooftool: |
| (out, unused_err, returncode) = gooftools.run(command) |
| # since STDERR is logged, we only need to log STDOUT |
| session.console.info('========= STDOUT ========') |
| session.console.info(out) |
| else: |
| session.console.info('call factory.par: %s', command) |
| session.console.info('=== STDOUT and STDERR ===') |
| # append STDOUT and STDERR to console log. |
| console_log_path = paths.CONSOLE_LOG_PATH |
| file_utils.TryMakeDirs(os.path.dirname(console_log_path)) |
| with open(console_log_path, 'a') as output: |
| returncode = self.factory_par.Call(command, stdout=output, |
| stderr=subprocess.STDOUT) |
| session.console.info('=========================') |
| session.console.info('return code: %d', returncode) |
| return returncode == 0 |
| |
| def Warn(self, message, times=3): |
| """Alerts user that a required test is bypassed.""" |
| for i in range(times, 0, -1): |
| session.console.warn( |
| '%s. ' |
| 'THIS DEVICE CANNOT BE QUALIFIED. ' |
| '(will continue in %d seconds)', message, i) |
| self.Sleep(1) |
| |
| def NormalizeUploadMethod(self, method): |
| """Builds the report file name and resolves variables.""" |
| if method in [None, 'none']: |
| # gooftool accepts only 'none', not empty string. |
| return 'none' |
| |
| if method == 'shopfloor': |
| method = 'shopfloor:%s#%s' % (server_proxy.GetServerURL(), |
| device_data.GetSerialNumber()) |
| logging.info('Using upload method %s', method) |
| |
| return method |
| |
| def DoFinalize(self): |
| upload_method = self.NormalizeUploadMethod(self.args.upload_method) |
| |
| command = 'gooftool -v 4 finalize' |
| |
| if self.args.enable_factory_server: |
| state.GetInstance().FlushEventLogs() |
| |
| if not self.args.write_protection: |
| self.Warn('WRITE PROTECTION IS DISABLED.') |
| command += ' --no_write_protect' |
| if not self.args.has_ectool: |
| command += ' --no_ectool' |
| if not self.args.secure_wipe: |
| command += ' --fast' |
| |
| if self.args.enable_factory_server: |
| server_url = server_proxy.GetServerURL() |
| if server_url: |
| command += ' --shopfloor_url "%s"' % server_url |
| |
| command += ' --upload_method "%s"' % upload_method |
| if self.args.upload_max_retry_times: |
| command += ' --upload_max_retry_times %s' % ( |
| self.args.upload_max_retry_times) |
| if self.args.upload_retry_interval is not None: |
| command += ' --upload_retry_interval %s' % self.args.upload_retry_interval |
| if self.args.upload_allow_fail: |
| command += ' --upload_allow_fail' |
| command += ' --add_file "%s"' % self.test_states_path |
| if self.args.hwid_need_vpd: |
| command += ' --hwid-run-vpd' |
| if self.args.rma_mode: |
| command += ' --rma_mode' |
| logging.info('Using RMA mode. Accept deprecated components') |
| if self.args.is_cros_core: |
| command += ' --cros_core' |
| logging.info('ChromeOS Core device. Skip some check.') |
| if self.args.is_chromebox: |
| command += ' --chromebox' |
| logging.info('ChromeBox device. Perform additional checks.') |
| if self.args.enforced_release_channels: |
| command += ' --enforced_release_channels %s' % ( |
| ' '.join(self.args.enforced_release_channels)) |
| logging.info( |
| 'Enforced release channels: %s.', self.args.enforced_release_channels) |
| if self.args.ec_pubkey_path: |
| command += ' --ec_pubkey_path %s' % self.args.ec_pubkey_path |
| elif self.args.ec_pubkey_hash: |
| command += ' --ec_pubkey_hash %s' % self.args.ec_pubkey_hash |
| if self.args.gooftool_waive_list: |
| command += ' --waive_list ' + ' '.join(self.args.gooftool_waive_list) |
| if self.args.gooftool_skip_list: |
| command += ' --skip_list ' + ' '.join(self.args.gooftool_skip_list) |
| command += ' --phase "%s"' % phase.GetPhase() |
| |
| self._FinalizeWipeInPlace(command) |
| |
| def _FinalizeWipeInPlace(self, command): |
| if self.dut.link.IsLocal(): |
| self._CallGoofTool(command) |
| # Wipe-in-place will terminate all processes that are using stateful |
| # partition, this test should be killed at here. |
| self.Sleep(self.FINALIZE_TIMEOUT) |
| raise type_utils.TestFailure('DUT Failed to finalize in %d seconds' % |
| self.FINALIZE_TIMEOUT) |
| elif isinstance(self.dut.link, ssh.SSHLink): |
| # For remote SSH DUT, we ask DUT to send wipe log back. |
| self._FinalizeRemoteSSHDUT(command) |
| else: |
| # For other remote links, we only checks if it has lost connection in |
| # @self.FINALIZE_TIMEOUT seconds |
| self._CallGoofTool(command) |
| try: |
| sync_utils.WaitFor(lambda: not self.dut.IsReady(), |
| self.FINALIZE_TIMEOUT, |
| poll_interval=1) |
| except type_utils.TimeoutError: |
| raise type_utils.TestFailure( |
| 'Remote DUT failed to finalize in %d seconds' % |
| self.FINALIZE_TIMEOUT) |
| |
| def _FinalizeRemoteSSHDUT(self, command): |
| # generate a random token, so the response is different for every DUT. |
| token = "{:016x}".format(random.getrandbits(64)) |
| |
| dut_finished = threading.Event() |
| self.dut_response = None |
| |
| def _Callback(handler): |
| """Receive and verify DUT message. |
| |
| Args: |
| :type handler: SocketServer.StreamRequestHandler |
| """ |
| try: |
| dut_response = json.loads(handler.rfile.readline()) |
| if dut_response['token'] == token: |
| self.dut_response = dut_response |
| dut_finished.set() |
| # otherwise, the reponse is invalid, just ignore it |
| except Exception: |
| pass |
| |
| # Start response listener |
| self.response_listener = net_utils.CallbackSocketServer(_Callback) |
| server_thread = threading.Thread( |
| target=self.response_listener.serve_forever) |
| server_thread.daemon = True |
| server_thread.start() |
| |
| # If station IP is not given, we assume that this station is the first host |
| # in the subnet, and number of prefix bits in this subnet is 24. |
| station_ip = (self.args.station_ip or |
| net_utils.CIDR(str(self.dut.link.host), 24).SelectIP(1)) |
| command += ' --station_ip "%s"' % station_ip |
| command += ' --station_port %d' % self.response_listener.server_address[1] |
| command += ' --wipe_finish_token "%s"' % token |
| |
| if not self._CallGoofTool(command): |
| raise type_utils.TestFailure('finalize command failed') |
| |
| session.console.info('wait DUT to finish wiping') |
| |
| if not dut_finished.wait(self.FINALIZE_TIMEOUT): |
| raise type_utils.TestFailure( |
| 'Remote DUT not response in %d seconds' % self.FINALIZE_TIMEOUT) |
| |
| # save log files in test data directory |
| output_dir = os.path.join( |
| paths.DATA_TESTS_DIR, session.GetCurrentTestPath()) |
| with open(os.path.join(output_dir, 'wipe_in_tmpfs.log'), 'w') as f: |
| f.write(self.dut_response.get('wipe_in_tmpfs_log', '')) |
| with open(os.path.join(output_dir, 'wipe_init.log'), 'w') as f: |
| f.write(self.dut_response.get('wipe_init_log', '')) |
| |
| self.assertTrue(self.dut_response['success']) |