| #!/usr/bin/env python3 |
| # Copyright 2020 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import logging |
| |
| |
| import common |
| from client.common_lib import error |
| from client.common_lib import utils as client_utils |
| from server.cros.storage import storage_validate as storage |
| from server.cros.servo.keyboard import servo_keyboard_flasher |
| from server.cros.repair import mac_address_helper |
| from site_utils.admin_audit import base |
| from site_utils.admin_audit import constants |
| from site_utils.admin_audit import servo_updater |
| |
| |
| metrics = client_utils.metrics_mock |
| |
| # Common status used for statistics. |
| STATUS_FAIL = 'fail' |
| STATUS_SUCCESS = 'success' |
| STATUS_SKIPPED = 'skipped' |
| |
| |
| class VerifyDutStorage(base._BaseDUTVerifier): |
| """Verify the state of the storage on the DUT |
| |
| The process to determine the type of storage and read metrics |
| of usage and EOL(end-of-life) information to determine the |
| state. |
| Supported storage types: MMS, NVME, SSD. |
| Possible states are: |
| UNKNOWN - not access to the DUT, not determine type of storage, |
| not information to determine metrics |
| NORMAL - the storage is in good shape and will work stable |
| device will work stable. (supported for all types) |
| ACCEPTABLE - the storage almost used all resources, device will |
| work stable but it is better be ready for replacement |
| device will work stable. (supported by MMS, NVME) |
| NEED_REPLACEMENT - the storage broken or worn off the life limit |
| device can work by not stable and can cause the |
| flakiness on the tests. (supported by all types) |
| """ |
| def __init__(self, dut_host): |
| super(VerifyDutStorage, self).__init__(dut_host) |
| self._state = None |
| |
| def _verify(self, set_label=True, run_badblocks=None): |
| if not self.host_is_up(): |
| logging.info('Host is down; Skipping the verification') |
| return |
| try: |
| validator = storage.StorageStateValidator(self.get_host()) |
| storage_type = validator.get_type() |
| logging.debug('Detected storage type: %s', storage_type) |
| storage_state = validator.get_state(run_badblocks=run_badblocks) |
| logging.debug('Detected storage state: %s', storage_state) |
| state = self.convert_state(storage_state) |
| if state and set_label: |
| self._set_host_info_state(constants.DUT_STORAGE_STATE_PREFIX, |
| state) |
| if state == constants.HW_STATE_NEED_REPLACEMENT: |
| self.get_host().set_device_needs_replacement( |
| resultdir=self.get_result_dir()) |
| self._state = state |
| except Exception as e: |
| raise base.AuditError('Exception during getting state of' |
| ' storage %s' % str(e)) |
| |
| def convert_state(self, state): |
| """Mapping state from validator to verifier""" |
| if state == storage.STORAGE_STATE_NORMAL: |
| return constants.HW_STATE_NORMAL |
| if state == storage.STORAGE_STATE_WARNING: |
| return constants.HW_STATE_ACCEPTABLE |
| if state == storage.STORAGE_STATE_CRITICAL: |
| return constants.HW_STATE_NEED_REPLACEMENT |
| return None |
| |
| def get_state(self): |
| return self._state |
| |
| |
| class VerifyServoUsb(base._BaseServoVerifier): |
| """Verify the state of the USB-drive on the Servo |
| |
| The process to determine by checking the USB-drive on having any |
| bad sectors on it. |
| Possible states are: |
| UNKNOWN - not access to the device or servo, not available |
| software on the servo. |
| NORMAL - the device available for testing and not bad sectors. |
| was found on it, device will work stable |
| NEED_REPLACEMENT - the device available for testing and |
| some bad sectors were found on it. The device can |
| work but cause flakiness in the tests or repair process. |
| |
| badblocks errors: |
| No such device or address while trying to determine device size |
| """ |
| def _verify(self): |
| if not self.servo_is_up(): |
| logging.info('Servo not initialized; Skipping the verification') |
| return |
| try: |
| usb = self.get_host()._probe_and_validate_usb_dev() |
| logging.debug('USB path: %s', usb) |
| except Exception as e: |
| usb = '' |
| logging.debug('(Not critical) %s', e) |
| if not usb: |
| self._set_state(constants.HW_STATE_NOT_DETECTED) |
| return |
| # basic readonly check |
| |
| # path to USB if DUT is sshable |
| logging.info('Starting verification of USB drive...') |
| dut_usb = None |
| if self.host_is_up(): |
| dut_usb = self._usb_path_on_dut() |
| state = None |
| try: |
| if dut_usb: |
| logging.info('Try run check on DUT side.') |
| state = self._run_check_on_host(self._dut_host, dut_usb) |
| else: |
| logging.info('Try run check on ServoHost side.') |
| servo = self.get_host().get_servo() |
| servo_usb = servo.probe_host_usb_dev() |
| state = self._run_check_on_host(self.get_host(), servo_usb) |
| except Exception as e: |
| if 'Timeout encountered:' in str(e): |
| logging.info('Timeout during running action') |
| metrics.Counter( |
| 'chromeos/autotest/audit/servo/usb/timeout' |
| ).increment(fields={'host': self._dut_host.hostname}) |
| else: |
| # badblocks generate errors when device not reachable or |
| # cannot read system information to execute process |
| state = constants.HW_STATE_NEED_REPLACEMENT |
| logging.debug(str(e)) |
| |
| self._set_state(state) |
| logging.info('Finished verification of USB drive.') |
| |
| self._install_stable_image() |
| |
| def _usb_path_on_dut(self): |
| """Return path to the USB detected on DUT side.""" |
| servo = self.get_host().get_servo() |
| servo.switch_usbkey('dut') |
| result = self._dut_host.run('ls /dev/sd[a-z]') |
| for path in result.stdout.splitlines(): |
| cmd = ('. /usr/share/misc/chromeos-common.sh; get_device_type %s' % |
| path) |
| check_run = self._dut_host.run(cmd, timeout=30, ignore_status=True) |
| if check_run.stdout.strip() != 'USB': |
| continue |
| if self._quick_check_if_device_responsive(self._dut_host, path): |
| logging.info('USB drive detected on DUT side as %s', path) |
| return path |
| return None |
| |
| def _quick_check_if_device_responsive(self, host, usb_path): |
| """Verify that device """ |
| validate_cmd = 'fdisk -l %s' % usb_path |
| try: |
| resp = host.run(validate_cmd, ignore_status=True, timeout=30) |
| if resp.exit_status == 0: |
| return True |
| logging.error('USB %s is not detected by fdisk!', usb_path) |
| except error.AutoservRunError as e: |
| if 'Timeout encountered' in str(e): |
| logging.warning('Timeout encountered during fdisk run.') |
| else: |
| logging.error('(Not critical) fdisk check fail for %s; %s', |
| usb_path, str(e)) |
| return False |
| |
| def _run_check_on_host(self, host, usb): |
| """Run badblocks on the provided host. |
| |
| @params host: Host where USB drive mounted |
| @params usb: Path to USB drive. (e.g. /dev/sda) |
| """ |
| command = 'badblocks -w -e 5 -b 4096 -t random %s' % usb |
| logging.info('Running command: %s', command) |
| # The response is the list of bad block on USB. |
| # Extended time for 2 hour to run USB verification. |
| # TODO (otabek@) (b:153661014#comment2) bring F3 to run |
| # check faster if badblocks cannot finish in 2 hours. |
| result = host.run(command, timeout=7200).stdout.strip() |
| logging.info("Check result: '%s'", result) |
| if result: |
| # So has result is Bad and empty is Good. |
| return constants.HW_STATE_NEED_REPLACEMENT |
| return constants.HW_STATE_NORMAL |
| |
| def _install_stable_image(self): |
| """Install stable image to the USB drive.""" |
| # install fresh image to the USB because badblocks formats it |
| # https://crbug.com/1091406 |
| try: |
| logging.debug('Started to install test image to USB-drive') |
| _, image_path = self._dut_host.stage_image_for_servo() |
| self.get_host().get_servo().image_to_servo_usb(image_path, |
| power_off_dut=False) |
| logging.debug('Finished installing test image to USB-drive') |
| except: |
| # ignore any error which happined during install image |
| # it not relative to the main goal |
| logging.info('Fail to install test image to USB-drive') |
| |
| def _set_state(self, state): |
| if state: |
| self._set_host_info_state(constants.SERVO_USB_STATE_PREFIX, state) |
| |
| |
| class VerifyServoFw(base._BaseServoVerifier): |
| """Force update Servo firmware if it not up-to-date. |
| |
| This is rarely case when servo firmware was not updated by labstation |
| when servod started. This should ensure that the servo_v4 and |
| servo_micro is up-to-date. |
| """ |
| def _verify(self): |
| if not self.servo_host_is_up(): |
| logging.info('Servo host is down; Skipping the verification') |
| return |
| servo_updater.update_servo_firmware( |
| self.get_host(), |
| force_update=True) |
| |
| |
| class FlashServoKeyboardMapVerifier(base._BaseDUTVerifier): |
| """Flash the keyboard map on servo.""" |
| |
| def _verify(self): |
| if not self.host_is_up(): |
| raise base.AuditError('Host is down') |
| if not self.servo_is_up(): |
| raise base.AuditError('Servo not initialized') |
| |
| host = self.get_host() |
| flasher = servo_keyboard_flasher.ServoKeyboardMapFlasher() |
| if flasher.is_image_supported(host): |
| flasher.update(host) |
| |
| |
| class VerifyDUTMacAddress(base._BaseDUTVerifier): |
| """Verify and update cached NIC mac address on servo. |
| |
| Servo_v4 plugged to the DUT and providing NIC for that. We caching mac |
| address on servod side to better debugging. |
| """ |
| |
| def _verify(self): |
| if not self.host_is_up(): |
| raise base.AuditError('Host is down.') |
| if not self.servo_is_up(): |
| raise base.AuditError('Servo host is down.') |
| |
| helper = mac_address_helper.MacAddressHelper() |
| helper.update_if_needed(self.get_host()) |