| # -*- coding: utf-8 -*- |
| # Copyright 2019 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import asyncio |
| import docker |
| import google.api_core |
| import os |
| import re |
| import requests |
| import subprocess |
| import time |
| import logging |
| import json |
| |
| from collections import namedtuple, defaultdict |
| from datetime import datetime |
| from google.cloud import storage |
| from google.protobuf.field_mask_pb2 import FieldMask |
| |
| from packaging.version import Version |
| |
| from moblab_common import build_connector |
| from moblab_common import dut_connector |
| from moblab_common import host_connector |
| from moblab_common import moblab_build_connector |
| |
| from moblab_common import afe_connector |
| from moblab_common import config_connector |
| from moblab_common import result_upload_status_connector |
| from dut_manager_connector import ( |
| DutManagerRpcConnector, |
| DutManagerRpcConnectorAsync, |
| ) |
| |
| from host_scheduler_pause_service import HostSchedulerPauseService |
| |
| from lab.managed_dut_pb2 import ManagedDut, Pool |
| from chromiumos.test.api.dut_attribute_pb2 import DutAttribute |
| from cache import Cache |
| |
| MilestonePair = namedtuple("MilestonePair", ["milestone", "is_staged"]) |
| BuildVersionPair = namedtuple( |
| "BuildVersionPair", ["build_version", "status", "is_staged", "labels"] |
| ) |
| BuildResults = namedtuple("BuildResults", ["results", "is_incomplete"]) |
| |
| _DEFAULT_SUITE_TIMEOUT_MINS = 1440 |
| _SUITE_TIMEOUT_MAP = { |
| "hardware_storagequal": 40320, |
| "hardware_storagequal_quick": 40320, |
| "storage_qual_v2_xs": 2160, # 1.5 days |
| "storage_qual_v2_s": 4320, # 3 days |
| "storage_qual_v2_m": 7200, # 5 days |
| "storage_qual_v2_l": 15840, # 11 days |
| "storage_qual_v2_xl": 28800, # 20 days |
| "cts": 2880, # 2 days |
| "gts": 2880, # 2 days |
| "performance_cuj": 10080, # 7 days |
| "performance_cuj_experimental": 10080, # 7 days |
| "mtbf": 14400, # 10 days |
| } |
| |
| _STORAGE_QUAL_VARIATIONS = { |
| 0: "hardware_storagequal", |
| 1: "hardware_storagequal_quick", |
| 2: "hardware_storagequal_external", |
| 3: "storage_qual_v2_quick", |
| 4: "storage_qual_v2_xs", |
| 5: "storage_qual_v2_s", |
| 6: "storage_qual_v2_m", |
| 7: "storage_qual_v2_l", |
| 8: "storage_qual_v2_xl", |
| } |
| |
| HARDWARE_AVL_BUG_ID = "bug_id" |
| HARDWARE_AVL_PART_NO = "part_id" |
| |
| STORAGE_QUAL_DISK_SIZE_GB = "tast_disk_size_gb" |
| STORAGE_QUAL_DUAL_NAMESPACE = "tast_storage_slc_qual" |
| STORAGE_QUAL_BLOCK_TIMEOUT = "tast_stress_block_timeout" |
| STORAGE_QUAL_FOLLOW_UP = "tast_followup_qual" |
| |
| MOBLAB_DOCKER_REGISTRY = "gcr.io/chromeos-partner-moblab/" |
| |
| MAX_BUILD_STAGING_SECONDS = 300 # 5 minutes |
| |
| DUT_MANAGER_TO_AFE_DUT_STATUS_MAPPINGS = { |
| ManagedDut.ManagedState.VERIFY: "Verifying", |
| ManagedDut.ManagedState.LEASED: "Running", |
| ManagedDut.ManagedState.READY: "Ready", |
| ManagedDut.ManagedState.FAILED: "Repair Failed", |
| ManagedDut.ManagedState.RESET: "Resetting", |
| ManagedDut.ManagedState.PROVISION: "Provisioning", |
| ManagedDut.ManagedState.UNKNOWN: "Unknown", |
| } |
| |
| _USE_DUT_MANAGER_FEATURE_FLAG = True # True |
| |
| |
| class MoblabRpcError(Exception): |
| pass |
| |
| |
| class LabelsParser(object): |
| _model_re = re.compile(r"^model:(.*)") |
| _build_target_re = re.compile(r"^board:(.*)") |
| _pool_re = re.compile(r"^pool:(.*)") |
| _build_target_model_re = re.compile(r"^buildTargets/(\w+)/models/(\w+)$") |
| |
| @classmethod |
| def _extract_from_labels(cls, regexp, labels): |
| result = [ |
| m.group(1) for m in [regexp.match(label) for label in labels] if m |
| ] |
| return list(set(result)) |
| |
| @classmethod |
| def extract_model_from_labels(cls, labels): |
| try: |
| return cls._extract_from_labels(cls._model_re, labels)[0] |
| except IndexError: |
| logging.exception("Failed to parse model from label tags of DUT.") |
| return "" |
| |
| @classmethod |
| def extract_build_target_from_labels(cls, labels): |
| try: |
| return cls._extract_from_labels(cls._build_target_re, labels)[0] |
| except IndexError: |
| logging.exception( |
| "Failed to parse build target from label tags of DUT." |
| ) |
| return "" |
| |
| @classmethod |
| def extract_pool_from_labels(cls, labels): |
| return cls._extract_from_labels(cls._pool_re, labels) |
| |
| @classmethod |
| def extrat_model_build_target_from_api_path(cls, models): |
| """Extracts model & build target from moblab API listModels path""" |
| build_target, model = None, None |
| match = cls._build_target_model_re.match(models) |
| if match: |
| build_target, model = match.group(1), match.group(2) |
| return (build_target, model) |
| |
| @classmethod |
| def extract_model_and_build_target_from_labels(cls, labels): |
| """Extract model & build target from labels""" |
| model = None |
| board = None |
| |
| for label in labels: |
| model = ( |
| cls._model_re.match(label).group(1) |
| if cls._model_re.match(label) |
| else model |
| ) |
| board = ( |
| cls._build_target_re.match(label).group(1) |
| if cls._build_target_re.match(label) |
| else board |
| ) |
| |
| return (model, board) |
| |
| |
| class MoblabService(object): |
| |
| _last_config_update = None |
| _last_build_connector_update = None |
| _last_moblab_build_connector_update = None |
| |
| def __init__(self): |
| self.afe_connector = afe_connector.AFEConnector() |
| self.config_connector = config_connector.MoblabConfigConnector( |
| self.afe_connector |
| ) |
| self.setup_devserver_connector() |
| self.dut_connector = dut_connector.MoblabDUTConnector() |
| self.pause_service = HostSchedulerPauseService() |
| self.upload_status_connector = ( |
| result_upload_status_connector.ResultUploadStatusConnector() |
| ) |
| self.milestone_cache = Cache(maxsize=50) |
| self.build_version_cache = Cache(maxsize=30) |
| |
| def _find_dut_id_given_ip(self, dut_hostname): |
| """Given an ip address, will return the id given to the DUT by AFE. |
| Raises: |
| MoblabRpcError: if unable to find the DUT. |
| """ |
| try: |
| return self.afe_connector.get_connected_devices( |
| hostname=dut_hostname |
| )[0]["id"] |
| except (IndexError, KeyError): |
| msg = "Unable to find AFE id for dut: {}".format(dut_hostname) |
| logging.error(msg) |
| raise MoblabRpcError(msg) |
| |
| def setup_devserver_connector(self): |
| self.moblab_bucket_name = self.config_connector.get_cloud_bucket() |
| logging.info("Using bucket: %s", self.moblab_bucket_name) |
| self._init_build_connector(raise_error=False) |
| |
| def _init_build_connector(self, raise_error=True): |
| try: |
| self._build_connector = build_connector.MoblabBuildConnector( |
| self.moblab_bucket_name |
| ) |
| except build_connector.MoblabBuildConnectorException: |
| logging.exception("Failed to initialize MoblabBuildConnector.") |
| if raise_error: |
| raise |
| |
| def _init_moblab_build_connector(self, raise_error=True): |
| """ |
| Creates a MoblabBuildConnector for Moblab API calls to get build items. |
| Args: |
| raise_error: if true, will raise any exception encountered during |
| init attempt. |
| Returns: |
| MoblabBuildConnector. |
| """ |
| try: |
| self._moblab_build_connector = ( |
| moblab_build_connector.MoblabBuildConnector() |
| ) |
| except build_connector.MoblabBuildConnectorException: |
| logging.exception("Failed to initialize MoblabBuildConnector.") |
| if raise_error: |
| raise |
| |
| @property |
| def build_connector(self): |
| """ |
| Returns a build_connector.MoblabBuildConnector instance ( distinct from |
| moblab_build_connector.MoblabBuildConnector ), |
| |
| TODO(wangmichael) change naming so that difference between two |
| connectors is more clear. |
| """ |
| if ( |
| not hasattr(self, "_build_connector") |
| or not self._build_connector |
| or self._last_config_update != self._last_build_connector_update |
| ): |
| self.moblab_bucket_name = self.config_connector.get_cloud_bucket( |
| force_reload=True |
| ) |
| self._init_build_connector() |
| self._last_build_connector_update = self._last_config_update |
| return self._build_connector |
| |
| @property |
| def moblab_build_connector(self): |
| """ |
| Returns a moblab.build_connector.MoblabBuildConnector instance |
| ( distinct from build_connector.MoblabBuildConnector ), |
| """ |
| if ( |
| not hasattr(self, "_moblab_build_connector") |
| or not self._moblab_build_connector |
| or self._last_config_update |
| != self._last_moblab_build_connector_update |
| ): |
| self.moblab_bucket_name = self.config_connector.get_cloud_bucket( |
| force_reload=True |
| ) |
| self._init_moblab_build_connector() |
| self._last_moblab_build_connector_update = self._last_config_update |
| |
| return self._moblab_build_connector |
| |
| def _is_moblab_bucket_in_asia(self): |
| if not self.moblab_bucket_name: |
| raise MoblabRpcError( |
| ( |
| "Attempted to check bucket location while bucket has not" |
| " been configured." |
| ) |
| ) |
| return self._build_connector.is_moblab_bucket_in_asia( |
| self.moblab_bucket_name |
| ) |
| |
| def get_connected_devices(self): |
| connected_duts = self.dut_connector.get_connected_duts() |
| afe_formatted_duts = [] |
| for dut in connected_duts: |
| afe_formatted_duts.append( |
| {"ip": dut[0], "isConnected": dut[1], "error": dut[2]} |
| ) |
| return afe_formatted_duts |
| |
| def _convert_managed_dut_to_dut_dict(self, managed_dut): |
| dut = {} |
| dut["hostname"] = managed_dut.name.ip_address |
| dut["status"] = DUT_MANAGER_TO_AFE_DUT_STATUS_MAPPINGS[ |
| managed_dut.state |
| ] |
| |
| labels = [ |
| a.id.value |
| for a in managed_dut.tag.dut_attributes |
| if not a.field_path |
| ] |
| attributes = { |
| a.id.value: a.field_path |
| for a in managed_dut.tag.dut_attributes |
| if a.field_path |
| } |
| dut["labels"] = labels |
| dut["attributes"] = attributes |
| return dut |
| |
| async def _list_duts_from_dut_manager(self): |
| managed_duts = await DutManagerRpcConnectorAsync.list_managed_duts() |
| duts = [] |
| for managed_dut in managed_duts: |
| dut = self._convert_managed_dut_to_dut_dict(managed_dut) |
| duts.append(dut) |
| return duts |
| |
| async def list_duts(self): |
| """Returns the merge of enrolled and connected DUTs""" |
| managed_duts = ( |
| await self._list_duts_from_dut_manager() |
| if _USE_DUT_MANAGER_FEATURE_FLAG |
| else self.afe_connector.get_connected_devices() |
| ) |
| potential_hosts = self.get_connected_devices() |
| connected_ips = [] |
| connected_duts = [] |
| |
| for dut in managed_duts: |
| dut["enrolled"] = True |
| connected_ips.append(dut["hostname"]) |
| connected_duts.append(dut) |
| |
| for dut in potential_hosts: |
| if dut["ip"] not in connected_ips: |
| dut["hostname"] = dut["ip"] |
| dut["enrolled"] = False |
| dut["status"] = "Not Enrolled" |
| connected_duts.append(dut) |
| |
| self._hydrate_dut_details_with_mac(connected_duts) |
| |
| return connected_duts |
| |
| def _default_suite_timeout_mins(self, suite): |
| if suite in _SUITE_TIMEOUT_MAP: |
| return _SUITE_TIMEOUT_MAP[suite] |
| else: |
| return _DEFAULT_SUITE_TIMEOUT_MINS |
| |
| def run_storage_qual_suite( |
| self, |
| build_target, |
| model, |
| milestone, |
| build_version, |
| bug_id, |
| part_id, |
| variation, |
| pool=None, |
| disk_size_gb=0, |
| is_dual_namespace=False, |
| is_pre_qualified=False, |
| ): |
| """Starts storage qual suite run. Formats arguments s.t. they can be |
| ingested by AFE. |
| |
| Args: |
| build_target: Ex. octopus |
| model: String identifier for the specific model to be run. |
| milestone: String, Ex. R80 |
| build_version: A numeric version string of the form: '00000.00.0' |
| bug_id: Numeric string identifying the associated |
| buganizer issue. |
| part_id: Numeric string identifying relevant part number. |
| variation: RunStorageQualificationSuiteRequest.Variation |
| pool: String identifier for which pool of DUT's to run tests with. |
| disk_size_gb: Size of main storage disk (in whole GB). |
| is_dual_namespace: Dual-namespace test requested. |
| is_pre_qualified: Has the component been previously qualified. |
| Returns: |
| String response message. |
| """ |
| avl_args = { |
| HARDWARE_AVL_BUG_ID: bug_id, |
| HARDWARE_AVL_PART_NO: part_id, |
| STORAGE_QUAL_DISK_SIZE_GB: disk_size_gb, |
| STORAGE_QUAL_DUAL_NAMESPACE: is_dual_namespace, |
| } |
| # reduce the time block and specify to tast that it is follow up |
| # if the component is pre-qualified |
| if is_pre_qualified: |
| avl_args[STORAGE_QUAL_BLOCK_TIMEOUT] = "30m" |
| avl_args[STORAGE_QUAL_FOLLOW_UP] = True |
| |
| return self.run_suite( |
| build_target, |
| model, |
| milestone, |
| build_version, |
| _STORAGE_QUAL_VARIATIONS[variation], |
| pool, |
| test_args=avl_args, |
| ) |
| |
| def run_memory_qual_suite( |
| self, |
| build_target, |
| model, |
| milestone, |
| build_version, |
| bug_id, |
| part_id, |
| pool=None, |
| ): |
| """Starts memory qual suite run. Formats arguments s.t. they can be |
| ingested by AFE. |
| |
| Args: |
| build_target: Ex. octopus |
| model: String identifer for the specific model to be run. |
| milestone: String, Ex. R80 |
| build_version: A numeric version string of the form: '00000.00.0' |
| bug_id: Numeric string identifying the associated buganizer issue. |
| part_id: Numeric string identifying relevant part number. |
| pool: String identifier for which pool of DUT's to run tests with. |
| Returns: |
| String response message. |
| """ |
| avl_args = {HARDWARE_AVL_BUG_ID: bug_id, HARDWARE_AVL_PART_NO: part_id} |
| return self.run_suite( |
| build_target, |
| model, |
| milestone, |
| build_version, |
| "hardware_memoryqual", # suite |
| pool, |
| test_args=avl_args, |
| ) |
| |
| def run_cts_suite( |
| self, |
| build_target, |
| model, |
| milestone, |
| build_version, |
| suite, |
| pool=None, |
| autotest_tests=None, |
| ): |
| """Starts cts suite run. Formats arguments s.t. they can be ingested by |
| AFE. |
| |
| Args: |
| build_target: Ex. octopus |
| milestone: Ex. R80 |
| build_version: A numeric version string of the form: '00000.00.0' |
| model: String identfier for the specific model to be run. |
| pool: String identifier for which pool of DUT's to run tests with. |
| specific_modules: List of strings of specific module names to |
| run. |
| Returns: |
| String response message. |
| """ |
| |
| suite_args = {} |
| if autotest_tests and len(autotest_tests): |
| suite_args["tests"] = [test.strip() for test in autotest_tests] |
| |
| return self.run_suite( |
| build_target, |
| model, |
| milestone, |
| build_version, |
| suite, |
| pool, |
| suite_args=suite_args, |
| ) |
| |
| def run_gts_suite( |
| self, |
| build_target, |
| model, |
| milestone, |
| build_version, |
| pool=None, |
| autotest_tests=None, |
| ): |
| """Starts gts suite run. Formats arguments s.t. they can be ingested by |
| AFE. |
| |
| Args: |
| build_target: Ex. octopus |
| milestone: Ex. R80 |
| build_version: A numeric version string of the form: '00000.00.0' |
| model: String identifer for the specific model to be run. |
| pool: String identifier for which pool of DUT's to run tests with. |
| specific_modules: Comma-separated string of specific module names |
| to run. |
| Returns: |
| String response message. |
| """ |
| suite_args = {} |
| if autotest_tests: |
| suite_args["tests"] = [test.strip() for test in autotest_tests] |
| |
| return self.run_suite( |
| build_target, |
| model, |
| milestone, |
| build_version, |
| "gts", # suite |
| pool, |
| suite_args=suite_args, |
| ) |
| |
| def run_faft_suite( |
| self, |
| build_target, |
| model, |
| milestone, |
| build_version, |
| suite, |
| pool=None, |
| tests=None, |
| ): |
| """Starts faft suite run. Formats arguments s.t. they can be ingested by |
| AFE. |
| |
| Args: |
| build_target: Ex. octopus |
| milestone: Ex. R80 |
| build_version: A numeric version string of the form: '00000.00.0' |
| model: String identfier for the specific model to be run. |
| pool: String identifier for which pool of DUT's to run tests with. |
| tests: List of strings of specific test names to |
| run. |
| Returns: |
| String response message. |
| """ |
| |
| suite_args = {} |
| if tests and len(tests): |
| suite_args["tests"] = [test.strip() for test in tests] |
| |
| return self.run_suite( |
| build_target, |
| model, |
| milestone, |
| build_version, |
| suite, |
| pool, |
| suite_args=suite_args, |
| ) |
| |
| def run_fwupd_suite( |
| self, |
| build_target, |
| model, |
| milestone, |
| build_version, |
| suite, |
| pool=None, |
| test_args=None, |
| ): |
| """Starts fwupd suite run. Formats arguments s.t. they can be ingested by |
| AFE. |
| |
| Args: |
| build_target: Ex. octopus |
| milestone: Ex. R80 |
| build_version: A numeric version string of the form: '00000.00.0' |
| model: String identfier for the specific model to be run. |
| pool: String identifier for which pool of DUT's to run tests with. |
| test_args: List of strings of specific test names to |
| run. |
| Returns: |
| String response message. |
| """ |
| |
| autotest_args = {} |
| # this is expected to work, but is tested for now |
| if test_args and len(test_args): |
| for test_arg in test_args: |
| [key, val] = test_arg.split("=", 1) |
| autotest_args[key] = val |
| |
| return self.run_suite( |
| build_target, |
| model, |
| milestone, |
| build_version, |
| suite, |
| pool, |
| suite_args=autotest_args, |
| test_args=autotest_args, |
| ) |
| |
| def run_suite( |
| self, |
| build_target, |
| model, |
| milestone, |
| build_version, |
| suite, |
| pool=None, |
| suite_args={}, |
| test_args={}, |
| suite_timeout_mins=None, |
| extra_builds={}, |
| ): |
| """Starts suite run. Formats input arguments s.t. that they can be used |
| by an AFE suite run command. |
| |
| Args: |
| build_target: Ex. octopus |
| milestone: Numeric string Ex. 80 |
| build_version: A numeric version string of the form: '00000.00.0' |
| suite: String identifier for the suite to be run. |
| model: String identifer for the specific model to be run. |
| pool: String identifier for which pool of DUT's to run tests with. |
| suite_args: Dict containing key=val pairs to be used by suite |
| control file. |
| test_args: Dict containing key=val pairs to be used by test control |
| file. |
| Returns: |
| String response message. |
| """ |
| |
| # additional tags -d/-a will appear in the build_version for the local |
| # builds. Quick way to check if it is local build |
| # Example for local build: R102-14590.0.0-d022_03_22_2322-a1 |
| # try jobs does not have these -d/-a tags. So moving to a less |
| # stricter check |
| if "-" in build_version or "_" in build_version: |
| build = "{}-local/R{}-{}".format( |
| build_target, milestone, build_version |
| ) |
| logging.info( |
| "maybe this is a local build version, going to check in folder:" |
| + build |
| ) |
| else: |
| build = "{}-release/R{}-{}".format( |
| build_target, milestone, build_version |
| ) |
| |
| if not self._is_build_staged( |
| build_version, build_target, milestone, model |
| ): |
| self._stage_build(model, build_target, build_version) |
| |
| builds = {"cros-version": build} |
| |
| builds.update(extra_builds) |
| |
| if not suite_timeout_mins: |
| suite_timeout_mins = self._default_suite_timeout_mins(suite) |
| |
| dut_wifi_info = self.get_dut_wifi_info() |
| ap_name = dut_wifi_info.get("wifi_dut_ap_name", None) |
| ap_pass = dut_wifi_info.get("wifi_dut_ap_pass", "") |
| |
| test_args["ssid"] = ap_name |
| test_args["wifipass"] = ap_pass |
| |
| suite_job_id, error = self.afe_connector.run_suite( |
| build_target, |
| build, |
| builds, |
| suite, |
| suite_timeout_mins, |
| pool, |
| model, |
| suite_args=suite_args, |
| test_args=test_args, |
| ) |
| |
| if not suite_job_id: |
| raise MoblabRpcError("Encountered error starting suite") |
| |
| return "Suite started (suite job ID: {})".format(suite_job_id) |
| |
| async def provision_duts(self, pool, milestone, build_version): |
| ModelBoard = namedtuple("ModelBoard", ["model", "board"]) |
| |
| boards = {} |
| duts = await self.list_duts() |
| # filter only enrolled duts. Red dut shouldn't be enrolled. |
| duts = filter(lambda x: x.get("enrolled"), duts) |
| available_boards = self.list_build_targets() |
| |
| for dut in duts: |
| labels = dut.get("labels", []) |
| pools = LabelsParser.extract_pool_from_labels(labels) |
| model = LabelsParser.extract_model_from_labels(labels) |
| board = LabelsParser.extract_build_target_from_labels(labels) |
| |
| # filter yellow duts. |
| if not board or (board and board not in available_boards): |
| continue |
| |
| if pool and pool not in pools: |
| continue |
| |
| modelBoardPair = ModelBoard(model=model, board=board) |
| if modelBoardPair in boards: |
| boards[modelBoardPair] += 1 |
| else: |
| boards[modelBoardPair] = 1 |
| |
| messages = [] |
| for modelBoardPair, number in boards.items(): |
| try: |
| self.run_suite( |
| modelBoardPair.board, |
| modelBoardPair.model, |
| milestone, |
| build_version, |
| "provision", |
| pool=pool, |
| suite_args={"num_required": number}, |
| ) |
| except MoblabRpcError as ex: |
| messages.append(str(ex)) |
| return ", ".join(messages) |
| |
| """ |
| Check duts doesn't flash a test image. |
| If there has one dut without test image, it will raise exception. Otherwise, this function does nothings. |
| """ |
| |
| def check_duts_without_test_image(self, ips): |
| # test duts by given ips. Saving call `find_duts` time. |
| disconnected_dut_ips = list( |
| map( |
| lambda dut: dut[0], |
| filter( |
| lambda dut: not dut[1], |
| self.dut_connector.get_duts(ip_addresses=ips), |
| ), |
| ) |
| ) |
| for ip in disconnected_dut_ips: |
| raise MoblabRpcError("dut (ip: %s) without test image" % ip) |
| |
| async def enroll_duts(self, ip_addresses): |
| """ |
| There have two ways to handle dut without test image. |
| 1. rasie exception. |
| 2. filter dut without test image from request ips. |
| """ |
| self.check_duts_without_test_image(ip_addresses) |
| if _USE_DUT_MANAGER_FEATURE_FLAG: |
| tasks = [ |
| DutManagerRpcConnectorAsync.create_managed_dut(ip) |
| for ip in ip_addresses |
| ] |
| await asyncio.gather(*tasks, return_exceptions=True) |
| else: |
| self.afe_connector.enroll_duts(ip_addresses) |
| |
| self.pause_service.ensure_duts_pause_state(list(ip_addresses)) |
| |
| def unenroll_duts(self, ip_addresses): |
| if _USE_DUT_MANAGER_FEATURE_FLAG: |
| for ip in ip_addresses: |
| DutManagerRpcConnector.delete_managed_dut(ip) |
| else: |
| self.afe_connector.unenroll_duts(ip_addresses) |
| |
| def list_connected_duts_firmware(self): |
| duts = self.dut_connector.get_connected_dut_firmware() |
| return duts |
| |
| def _get_managed_dut_from_dut_manager(self, dut_ip): |
| managed_dut = DutManagerRpcConnector.get_managed_dut(dut_ip) |
| dut = self._convert_managed_dut_to_dut_dict(managed_dut) |
| return dut |
| |
| def get_dut_details(self, dut: str): |
| dut_details = ( |
| self._get_managed_dut_from_dut_manager(dut) |
| if _USE_DUT_MANAGER_FEATURE_FLAG |
| else self.afe_connector.get_connected_devices( |
| dut, get_current_jobs=True |
| )[0] |
| ) |
| self._hydrate_dut_details_with_mac([dut_details]) |
| return dut_details |
| |
| def _hydrate_dut_details_with_mac(self, duts_details): |
| dut_ip_to_mac_map = { |
| ip: mac |
| for ( |
| ip, |
| mac, |
| ) in self.dut_connector.get_all_known_mac_addresses().items() |
| } |
| for dut in duts_details: |
| if dut["hostname"] in dut_ip_to_mac_map: |
| dut["mac"] = dut_ip_to_mac_map[dut["hostname"]] |
| |
| def update_duts_firmware(self, ip_addresses): |
| return self.dut_connector.update_firmware(ip_addresses) |
| |
| def _is_build_staged(self, build_version, build_target, milestone, model): |
| """ |
| Checks if targeted build is staged. Defaults to checking staging |
| status via Moblab API, falls back to checking the configured GCS |
| bucket directly. |
| Args: |
| build_target (string): AKA board of the target DUT. |
| build_version (string): Build version to be applied to DUT for |
| testing. |
| milestone (string): Milestone the build version belongs to. |
| model (string): Model of the DUT. |
| Returns: |
| Boolean which is true iff build is already staged on Moblab. |
| """ |
| try: |
| return self.moblab_build_connector.check_build_stage_status( |
| build_target, model, build_version, self.moblab_bucket_name |
| ) |
| except google.api_core.exceptions.GoogleAPIError: |
| logging.exception( |
| ( |
| "Encountered API error while checking build stage status " |
| "for {}/{} on model {}" |
| ).format(build_target, build_version, model) |
| ) |
| return ( |
| build_version |
| in self.build_connector.get_builds_for_milestone( |
| build_target, milestone |
| ) |
| ) |
| |
| def _stage_build(self, model, build_target, build_version): |
| """ |
| Applies the specified build artifacts to the bucket configured |
| for this Moblab. |
| |
| Args: |
| model (string): Model of the target DUT. |
| build_target (string): AKA board of the target DUT. |
| build_version (string): Build version to be applied to DUT for |
| testing. |
| Returns: |
| gcs bucket name configured on this moblab. |
| """ |
| self.moblab_build_connector.stage_build( |
| build_target, model, build_version, self.moblab_bucket_name |
| ) |
| |
| sleep_interval = 1 |
| end_time = time.time() + MAX_BUILD_STAGING_SECONDS |
| while time.time() < end_time: |
| if self.moblab_build_connector.check_build_stage_status( |
| build_target, model, build_version, self.moblab_bucket_name |
| ): |
| return self.moblab_bucket_name |
| time.sleep(sleep_interval) |
| # every failure, increase wait interval by 1.5 times |
| # ( exponential backoff, of sorts ) |
| sleep_interval *= 1.5 |
| raise MoblabRpcError( |
| "Build stage did not finish within {} seconds.".format( |
| MAX_BUILD_STAGING_SECONDS |
| ) |
| ) |
| |
| def list_usable_build_targets(self, model): |
| """ |
| Returns build targets associated with the given model. |
| Args: |
| mode(str): DUT model to return associated build_target(s) for. |
| Returns: |
| List of build target strings. |
| """ |
| afe_hosts = self.afe_connector.get_connected_devices() |
| build_targets = set() |
| for host in afe_hosts: |
| if ( |
| LabelsParser.extract_model_from_labels(host.get("labels", [])) |
| == model |
| ): |
| build_targets.add( |
| LabelsParser.extract_build_target_from_labels( |
| host.get("labels", []) |
| ) |
| ) |
| return list(build_targets) |
| |
| def list_build_targets(self): |
| """List all build targets that this account has access to. |
| |
| Returns: |
| List of build target strings. |
| """ |
| try: |
| return self.moblab_build_connector.list_build_targets() |
| except moblab_build_connector.MoblabBuildConnectorException: |
| logging.exception( |
| "Encountered error trying to list build targets." |
| ) |
| |
| def list_connected_build_targets(self): |
| """List all connected build targets on this moblab. |
| |
| Returns: |
| List of build target strings. |
| """ |
| afe_hosts = self.afe_connector.get_connected_devices() |
| build_targets = set() |
| for host in afe_hosts: |
| build_targets.add( |
| LabelsParser.extract_build_target_from_labels( |
| host.get("labels", []) |
| ) |
| ) |
| return list(build_targets) |
| |
| def list_models(self, build_target): |
| """Lists all models for the given build target. |
| If build target is set to '*' it will list models for all build targets. |
| |
| Returns: |
| Dictionary of models as keys and build targets set as value. |
| """ |
| try: |
| models_response = self.moblab_build_connector.list_models( |
| build_target |
| ) |
| results = defaultdict(set) |
| for m in models_response: |
| ( |
| build_target, |
| model, |
| ) = LabelsParser.extrat_model_build_target_from_api_path(m) |
| if model and build_target: |
| results[model].add(build_target) |
| return results |
| except moblab_build_connector.MoblabBuildConnectorException: |
| logging.exception( |
| "Encountered error trying to list models for build target: {}.".format( |
| build_target |
| ) |
| ) |
| |
| def _key_getter(self, data): |
| s = "" |
| if "model" in data: |
| s += data["model"] |
| if "build_target" in data: |
| s += data["build_target"] |
| if "milestone" in data: |
| s += data["milestone"] |
| if "label" in data: |
| s += data["label"] |
| return hash(s) |
| |
| def list_usable_milestones(self, build_target, model, force_update=False): |
| """ |
| Returns milestones available for a given model/build_target, both |
| staged and unstaged (if target bucket is in US, otherwise will only |
| return staged milestones). |
| |
| In this context, 'remote' means unstaged/from API call (i.e., remote |
| milestones are those that are fetched from the Moblab API rather than |
| from existing build items staged in the partner bucket ). |
| |
| Args: |
| build_target(string): build_target (AKA board) to fetch milestones |
| for. |
| model(string): model to fetch milestones for. |
| force_update: force update the cache. |
| Returns: |
| A list of tuples where the first item is the milestone and the |
| second is a boolean which is true iff at least one build of the |
| milestone is staged. |
| """ |
| |
| def _list_usable_milestones(): |
| bucket_milestones = self.build_connector.get_milestones_available( |
| build_target |
| ) |
| |
| remote_milestones = [] |
| if not self._is_moblab_bucket_in_asia(): |
| try: |
| remote_milestones = ( |
| self.moblab_build_connector.list_available_milestones( |
| build_target, |
| model, |
| ) |
| ) |
| except moblab_build_connector.MoblabBuildConnectorException: |
| logging.exception( |
| "Encountered error trying to fetch remote milestones." |
| ) |
| |
| milestone_results = [ |
| MilestonePair(milestone=milestone, is_staged=True) |
| for milestone in bucket_milestones |
| ] |
| milestone_results.extend( |
| [ |
| MilestonePair(milestone=milestone, is_staged=False) |
| for milestone in list( |
| set(remote_milestones).difference( |
| set(bucket_milestones) |
| ) |
| ) |
| ] |
| ) |
| |
| milestone_results.sort( |
| key=lambda x: int(x[0].strip("R")), reverse=True |
| ) |
| return BuildResults( |
| results=milestone_results, |
| is_incomplete=( |
| len(remote_milestones) |
| == moblab_build_connector.DEFAULT_PAGE_SIZE |
| ), |
| ) |
| |
| # filter boards don't have a permission |
| if build_target not in self.list_build_targets(): |
| return BuildResults(results=[], is_incomplete=False) |
| |
| return self.milestone_cache.get( |
| key=self._key_getter( |
| {"model": model, "build_target": build_target} |
| ), |
| or_default=_list_usable_milestones, |
| force_update=force_update, |
| ) |
| |
| def list_usable_build_versions( |
| self, build_target, milestone, model, label, force_update=False |
| ): |
| """ |
| Returns build versions available for a given |
| model/build_target/milestone, both staged and unstaged (if target |
| bucket is in US, otherwise will only return staged build versions). |
| |
| In this context, 'remote' means unstaged/from API call (i.e., remote |
| build versions are those that are fetched from the Moblab API rather |
| than from existing build items staged in the partner bucket ). |
| |
| Args: |
| build_target(string): build_target (AKA board) to return build |
| versions for. |
| model(string): model to return build versions for. |
| milestone(string): milestone to return build versions for. |
| Returns: |
| A list of tuples where the first item is the build_version and the |
| second is a boolean which is true iff the build is staged. |
| """ |
| |
| def _list_usable_build_versions(): |
| bucket_build_versions = ( |
| self.build_connector.get_builds_for_milestone( |
| build_target, milestone |
| ) |
| ) |
| |
| build_version_results = [ |
| BuildVersionPair( |
| build_version=build_version, |
| status=moblab_build_connector.BuildStatus.AVAILABLE, |
| is_staged=True, |
| labels=[], |
| ) |
| for build_version in bucket_build_versions |
| ] |
| |
| remote_build_versions = [] |
| if not self._is_moblab_bucket_in_asia(): |
| try: |
| remote_build_versions = ( |
| self.moblab_build_connector.list_builds_for_milestone( |
| build_target, |
| model, |
| milestone, |
| label, |
| ) |
| ) |
| except moblab_build_connector.MoblabBuildConnectorException: |
| logging.exception( |
| "Encountered error trying to fetch remote build versions." |
| ) |
| |
| if label: |
| build_version_results = [] |
| staged_builds_set = set(bucket_build_versions) |
| for build_version in remote_build_versions: |
| if build_version.version not in staged_builds_set: |
| build_version_results.append( |
| BuildVersionPair( |
| build_version=build_version.version, |
| status=build_version.status, |
| is_staged=False, |
| labels=build_version.labels, |
| ) |
| ) |
| |
| build_version_results.sort( |
| key=lambda x: Version(x[0]), |
| reverse=True, |
| ) |
| return BuildResults( |
| results=build_version_results, |
| is_incomplete=( |
| len(remote_build_versions) |
| == moblab_build_connector.DEFAULT_PAGE_SIZE |
| ), |
| ) |
| |
| data = { |
| "model": model, |
| "build_target": build_target, |
| "milestone": milestone, |
| "label": label, |
| } |
| return self.build_version_cache.get( |
| key=self._key_getter(data), |
| or_default=_list_usable_build_versions, |
| force_update=force_update, |
| ) |
| |
| def get_num_jobs( |
| self, |
| id_filter=None, |
| name_filter=None, |
| created_time_lt=None, |
| created_time_gt=None, |
| sub=False, |
| suite=False, |
| not_yet_run=False, |
| running=False, |
| finished=False, |
| completed_status=None, |
| parent_id_filter=None, |
| dut_hostname_filter=None, |
| ): |
| # TO_DO(wangmichael): bifurcating on dut_filter is necessary because |
| # the Autotest backend that we currently invoke only accepts dut_filter |
| # on calls to get_num_host_queue_entries, not get_num_jobs. Even |
| # though, in both cases, the results are packaged and returned as jobs. |
| # This may change when Autotest RPC itself is refactored. |
| if dut_hostname_filter: |
| try: |
| dut_id = self.afe_connector.get_connected_devices( |
| hostname=dut_hostname_filter |
| )[0]["id"] |
| except (IndexError, KeyError): |
| logging.error( |
| "Unable to find AFE id for dut: {}".format( |
| dut_hostname_filter |
| ) |
| ) |
| return |
| return self.afe_connector.get_num_host_queue_entries( |
| host_id=dut_id |
| ) |
| else: |
| return self.afe_connector.get_num_jobs( |
| id_filter, |
| name_filter, |
| created_time_lt, |
| created_time_gt, |
| sub, |
| suite, |
| not_yet_run, |
| running, |
| finished, |
| completed_status, |
| parent_id_filter, |
| ) |
| |
| def get_jobs( |
| self, |
| query_start, |
| query_limit, |
| id_filter=None, |
| name_filter=None, |
| created_time_lt=None, |
| created_time_gt=None, |
| sub=False, |
| suite=False, |
| not_yet_run=False, |
| running=False, |
| finished=False, |
| completed_status=None, |
| parent_id_filter=None, |
| dut_hostname_filter=None, |
| sort_by=None, |
| ): |
| # TO_DO(wangmichael): bifurcating on dut_filter is necessary because |
| # the Autotest backend that we currently invoke only accepts dut_filter |
| # on calls to get_host_queue_entries, not get_jobs. Even though, |
| # in both cases, the results are packaged and returned as jobs. This |
| # may change when Autotest RPC itself is refactored. |
| jobs = [] |
| |
| if dut_hostname_filter: |
| try: |
| dut_id = self.afe_connector.get_connected_devices( |
| hostname=dut_hostname_filter |
| )[0]["id"] |
| except (IndexError, KeyError): |
| logging.error( |
| "Unable to find AFE id for dut: {}".format( |
| dut_hostname_filter |
| ) |
| ) |
| return |
| hqes, _ = self.afe_connector.get_host_queue_entries( |
| host_id=dut_id, |
| query_start=query_start, |
| query_limit=query_limit, |
| sort_by=sort_by, |
| ) |
| |
| # Extracting jobs information from HQE entries. |
| for hqe in hqes: |
| job = hqe["job"] |
| host = hqe.get("host") |
| if host: |
| job["hostname"] = host.get("hostname", "--") |
| job["status"] = hqe.get("status") |
| jobs.append(job) |
| |
| else: |
| jobs = self.afe_connector.get_jobs( |
| query_start, |
| query_limit, |
| id_filter, |
| name_filter, |
| created_time_lt, |
| created_time_gt, |
| sub, |
| suite, |
| not_yet_run, |
| running, |
| finished, |
| completed_status, |
| parent_id_filter, |
| sort_by, |
| ) |
| job_ids = [job["id"] for job in jobs] |
| |
| jobs_upload_status = ( |
| self.upload_status_connector.get_jobs_upload_status(job_ids) |
| ) |
| for job in jobs: |
| if job["id"] in jobs_upload_status: |
| state = jobs_upload_status[job["id"]] |
| job["upload_status"] = state.status |
| job["attempt_number"] = state.attempt_number |
| job["last_falure_reason"] = state.last_falure_reason |
| |
| return jobs |
| |
| def get_job_ids( |
| self, |
| query_start, |
| query_limit, |
| id_filter=None, |
| name_filter=None, |
| created_time_lt=None, |
| created_time_gt=None, |
| sub=False, |
| suite=False, |
| not_yet_run=False, |
| running=False, |
| finished=False, |
| completed_status=None, |
| parent_id_filter=None, |
| dut_hostname_filter=None, |
| ): |
| """Returns the job ID's that match the request filters. |
| |
| Args: |
| query_start: Int index for start of returned values. |
| query_limit: Int for max number of values to return. |
| id_filter: String filter for job id. |
| name_filter: String filter for job name. |
| created_time_lt: A unix timestamp in seconds granularity the sets |
| upper bound for jobs returned. |
| created_time_gt: A unix timestamp in seconds granularity the sets |
| lower bound for jobs returned. |
| sub: Boolean, true iff child jobs to be returned. |
| suite: Boolean, true iff parent jobs to be returned. |
| not_yet_run: Boolean, true iff queued jobs to be returned. |
| running: Boolean, true iff running jobs to be returned. |
| finished: Boolean, true iff only finished jobs to be returned. |
| parent_id_filter: String filter for parent job id. |
| dut_hostname_filter: String filter for DUT associated with job. |
| |
| Returns: |
| List of job ID numbers. |
| """ |
| return [ |
| job["id"] |
| for job in self.get_jobs( |
| query_start, |
| query_limit, |
| id_filter, |
| name_filter, |
| created_time_lt, |
| created_time_gt, |
| sub, |
| suite, |
| not_yet_run, |
| running, |
| finished, |
| completed_status, |
| parent_id_filter, |
| dut_hostname_filter, |
| ) |
| ] |
| |
| def abort_jobs(self, job_ids): |
| result = self.afe_connector.abort_host_queue_entries(job_ids) |
| if result: |
| aborted_jobs = [str(item["Job"]) for item in result] |
| if len(aborted_jobs) > 15: |
| return ( |
| "Aborted {} jobs, please refresh table" " to see results." |
| ).format(len(aborted_jobs)) |
| else: |
| return ( |
| "Aborted the following jobs: {}\nPlease refresh " |
| "table to see results." |
| ).format(", ".join(aborted_jobs)) |
| else: |
| return ( |
| "No jobs aborted (were selected jobs already " |
| "aborted/finished)?" |
| ) |
| |
| def get_job_details(self, job_id): |
| return self.afe_connector.get_jobs_by_ids([job_id]) |
| |
| def get_job_history(self, job_id): |
| return self.afe_connector.get_job_history(job_id) |
| |
| def get_associated_duts(self, job_id): |
| hqes, _ = self.afe_connector.get_host_queue_entries(job_id=job_id) |
| return hqes |
| |
| def get_num_dut_tasks(self, ip, start_time_gt, end_time_lt): |
| """ |
| Given a DUT's IP, returns a number of tasks (e.g. repairs, |
| reverifies) associated with that DUT. |
| Args: |
| ip (string): DUT IP. |
| start_time_gt(int): Seconds timestamp of a filter to DUT tasks by |
| start time. |
| end_time_lt(int): Seconds timestamp of a filter to DUT tasks by |
| end time. |
| Returns: |
| Returns number of task associated with DUT which match query |
| parameters. If DUT itself is not found, returns -1. |
| """ |
| try: |
| if _USE_DUT_MANAGER_FEATURE_FLAG: |
| return len( |
| self._get_dut_tasks_from_dut_manager( |
| ip, start_time_gt, end_time_lt |
| ) |
| ) |
| |
| dut_id = self.afe_connector.get_connected_devices(hostname=ip)[0][ |
| "id" |
| ] |
| return self.afe_connector.get_num_special_tasks( |
| host_id_list=[dut_id], |
| start_time_gt=start_time_gt, |
| end_time_lt=end_time_lt, |
| ) |
| except (IndexError, KeyError): |
| logging.error("Unable to find AFE id for dut: {}".format(dut_id)) |
| return -1 |
| |
| def _get_dut_tasks_from_dut_manager( |
| self, dut_ip: str, query_start: int, query_limit: int |
| ): |
| managed_dut = DutManagerRpcConnector.get_managed_dut(dut_ip) |
| tasks = managed_dut.history |
| if query_limit and query_start + query_limit <= len(tasks): |
| query_end = query_start + query_limit |
| tasks = tasks[query_start:query_end] |
| else: |
| tasks = tasks[query_start:] |
| |
| result = [] |
| for t in tasks: |
| task = json.loads(t.note) |
| task_dict = {} |
| task_dict["task"] = DUT_MANAGER_TO_AFE_DUT_STATUS_MAPPINGS[t.state] |
| task_dict["time_started"] = str(t.start_time.ToDatetime()) |
| task_dict["time_finished"] = str(t.end_time.ToDatetime()) |
| task_dict["lease_owner"] = str(t.lease_owner) |
| task_dict["note"] = t.note |
| task_dict["task_log_id"] = "%s-%s" % (task["id"], task["task"]) |
| |
| result.append(task_dict) |
| return result |
| |
| def get_dut_tasks( |
| self, ip, query_start, query_limit, start_time_gt, end_time_lt |
| ): |
| """ |
| Given a DUT's IP, returns a list of tasks (e.g. repairs, |
| reverifies) requested for that DUT. If DUT is not found, returns |
| None. |
| Args: |
| query_start(int): Parameter specifying index on where to start when |
| returning paged results. |
| query_limit(int): Upper limit on number of results to return. |
| start_time_gt(int): Seconds timestamp of a filter to DUT tasks by |
| start time. |
| end_time_lt(int): Seconds timestamp of a filter to DUT tasks by |
| end time. |
| Returns: |
| A list of dicts, each dict specifying a task associated with the |
| queried DUT. If DUT is not found returns None. |
| """ |
| try: |
| if _USE_DUT_MANAGER_FEATURE_FLAG: |
| return self._get_dut_tasks_from_dut_manager( |
| ip, query_start, query_limit |
| ) |
| |
| dut_id = self.afe_connector.get_connected_devices(hostname=ip)[0][ |
| "id" |
| ] |
| return self.afe_connector.get_special_tasks( |
| host_id_list=[dut_id], |
| query_start=query_start, |
| query_limit=query_limit, |
| start_time_gt=start_time_gt, |
| end_time_lt=end_time_lt, |
| ) |
| except (IndexError, KeyError): |
| logging.error("Unable to find AFE id for dut: {}".format(dut_id)) |
| |
| def add_label_to_managed_dut(self, dut_hostname, label): |
| # Get the current instance of managed dut and append the label as tag |
| managed_dut = DutManagerRpcConnector.get_managed_dut(dut_hostname) |
| add_label = DutAttribute(id=DutAttribute.Id(value=label)) |
| managed_dut.tag.dut_attributes.append(add_label) |
| field_mask = FieldMask(paths=["tag"]) |
| DutManagerRpcConnector.update_managed_dut(managed_dut, field_mask) |
| return True, "" # Status, Message |
| |
| def remove_label_from_managed_dut(self, dut_hostname, label): |
| # Get the current instance of managed dut and remove the label as tag |
| managed_dut = DutManagerRpcConnector.get_managed_dut(dut_hostname) |
| remove_label = DutAttribute(id=DutAttribute.Id(value=label)) |
| try: |
| managed_dut.tag.dut_attributes.remove(remove_label) |
| except ValueError: |
| return False, "The label {} does not exist to remove".format(label) |
| |
| field_mask = FieldMask(paths=["tag"]) |
| DutManagerRpcConnector.update_managed_dut(managed_dut, field_mask) |
| return True, "" |
| |
| def add_attribute_to_managed_dut(self, dut_hostname, key, value): |
| managed_dut = DutManagerRpcConnector.get_managed_dut(dut_hostname) |
| add_attribute = DutAttribute( |
| id=DutAttribute.Id(value=key), field_path=value |
| ) |
| managed_dut.tag.dut_attributes.append(add_attribute) |
| field_mask = FieldMask(paths=["tag"]) |
| DutManagerRpcConnector.update_managed_dut(managed_dut, field_mask) |
| return True, "" |
| |
| def remove_attribute_from_managed_dut(self, dut_hostname, key): |
| managed_dut = DutManagerRpcConnector.get_managed_dut(dut_hostname) |
| |
| # Find the value given the key, this is needed to distinguish |
| # attributes from labels |
| values = [ |
| tag.field_path |
| for tag in managed_dut.tag.dut_attributes |
| if tag and tag.id.value == key |
| ] |
| if not values: |
| return ( |
| False, |
| "The attribute key {} does not exist to remove".format(key), |
| ) |
| |
| remove_attribute = DutAttribute( |
| id=DutAttribute.Id(value=key), field_path=values[0] |
| ) |
| managed_dut.tag.dut_attributes.remove(remove_attribute) |
| field_mask = FieldMask(paths=["tag"]) |
| DutManagerRpcConnector.update_managed_dut(managed_dut, field_mask) |
| return True, "" |
| |
| def add_pool_to_managed_dut(self, dut_identifier, pool_name): |
| managed_dut = DutManagerRpcConnector.get_managed_dut(dut_identifier) |
| add_pool = Pool(name=pool_name) |
| managed_dut.pool.append(add_pool) |
| field_mask = FieldMask(paths=["pool"]) |
| DutManagerRpcConnector.update_managed_dut(managed_dut, field_mask) |
| return True, "" |
| |
| def remove_pool_from_managed_dut(self, dut_identifier, pool_name): |
| managed_dut = DutManagerRpcConnector.get_managed_dut(dut_identifier) |
| remove_pool = Pool(name=pool_name) |
| try: |
| managed_dut.pool.remove(remove_pool) |
| except ValueError: |
| return False, "The pool {} does not exist to remove".format( |
| pool_name |
| ) |
| |
| field_mask = FieldMask(paths=["pool"]) |
| DutManagerRpcConnector.update_managed_dut(managed_dut, field_mask) |
| return True, "" |
| |
| def _apply_edit_to_duts( |
| self, dut_hostnames, afe_edit_func, success_msg, *args |
| ): |
| """ |
| A helper function which contains the repeated logic involved in |
| applying an edit to multiple DUTs. This logic is repeated for |
| adding and removing attributes and labels. |
| Args: |
| dut_hostnames(string[]): A list of strings representing the |
| hostnames of the DUT's to be targeted. |
| afe_edit_func((*args) => (tuple(bool,string))): A reference to a |
| function of AFEConnector which takes in *args and returns a |
| tuple containing a bool indicating success and a results |
| message string. This function does the 'edit' to the DUT's. |
| success_msg(int): A format-able string to be returned in success |
| cases; It should have exactly as many format indexes ({}) as |
| there are arguments in *args. |
| *args: Arguments which are meant to be passed into afe_edit_func. |
| Returns: |
| String message summarizing results. |
| """ |
| failures = {} |
| for dut_hostname in dut_hostnames: |
| is_success, message = afe_edit_func(dut_hostname, *args) |
| if not is_success: |
| failures[dut_hostname] = message |
| |
| response_message = success_msg.format( |
| *args, len(dut_hostnames) - len(failures.keys()) |
| ) |
| |
| if failures: |
| response_message += "Failures on the following DUTs:\n" |
| for dut_hostname, message in failures.items(): |
| response_message += "{}: {}\n".format(dut_hostname, message) |
| |
| return response_message |
| |
| def add_attribute_to_duts(self, dut_hostnames, key, value): |
| """ |
| Adds attribute key:value to the selected dut_hostnames. Returns a |
| message summarizing how many DUT's the attribute was successfully |
| added to as well for which DUT's the addition failed. |
| Args: |
| dut_hostnames(string[]): A list of strings representing the |
| hostnames |
| of the DUT's to be targeted. |
| key (string): Key of attribute to be added. |
| value (string): Value of attribute to be added. |
| Returns: |
| String message summarizing results. |
| """ |
| return self._apply_edit_to_duts( |
| dut_hostnames, |
| self.add_attribute_to_managed_dut |
| if _USE_DUT_MANAGER_FEATURE_FLAG |
| else self.afe_connector.add_attribute_to_host, |
| "Successfully added attribute {}:{} to {} DUT(s)\n", |
| key, |
| value, |
| ) |
| |
| def remove_attribute_from_duts(self, dut_hostnames, key): |
| """ |
| Removes attribute (referenced by key) from the selected |
| dut_hostnames. Returns a message summarizing how many DUT's the |
| attribute was successfully removed from as well for which DUT's the |
| remove failed. |
| Args: |
| dut_hostnames(string[]): A list of strings representing the |
| hostnames of the DUT's to be targeted. |
| key (string): Key of attribute to be remove. |
| value (string): Value of attribute to be remove. |
| Returns: |
| String message summarizing results. |
| """ |
| return self._apply_edit_to_duts( |
| dut_hostnames, |
| self.remove_attribute_from_managed_dut |
| if _USE_DUT_MANAGER_FEATURE_FLAG |
| else self.afe_connector.remove_attribute_from_host, |
| "Successfully removed attribute {} from {} DUT(s)\n", |
| key, |
| ) |
| |
| def add_label_to_duts(self, dut_hostnames, label): |
| """ |
| Adds label to the selected dut_hostnames. Returns a message |
| summarizing how many DUT's the label was successfully added to as |
| well for which DUT's the addition failed. |
| Args: |
| dut_hostnames(string[]): A list of strings representing the |
| hostnames of the DUT's to be targeted. |
| label (string): Label to be added. |
| Returns: |
| String message summarizing results. |
| """ |
| return self._apply_edit_to_duts( |
| dut_hostnames, |
| self.add_label_to_managed_dut |
| if _USE_DUT_MANAGER_FEATURE_FLAG |
| else self.afe_connector.add_label_to_host, |
| "Successfully added label {} to {} DUT(s)\n", |
| label, |
| ) |
| |
| def remove_label_from_duts(self, dut_hostnames, label): |
| """ |
| Remove label from the selected dut_hostnames. Returns a message |
| summarizing how many DUT's the label was successfully removed from |
| as well as for which DUT's the remove failed. |
| Args: |
| dut_hostnames(string[]): A list of strings representing the |
| hostnames of the DUT's to be targeted. |
| label (string): Label to be removed. |
| Returns: |
| String message summarizing results. |
| """ |
| return self._apply_edit_to_duts( |
| dut_hostnames, |
| self.remove_label_from_managed_dut |
| if _USE_DUT_MANAGER_FEATURE_FLAG |
| else self.afe_connector.remove_label_from_host, |
| "Successfully removed label {} from {} DUT(s)\n", |
| label, |
| ) |
| |
| def add_pool_to_duts(self, dut_hostnames, pool): |
| """ |
| Adds pool to the selected dut_hostnames. Returns a message |
| summarizing how many DUT's the pool was successfully added to as |
| well for which DUT's the addition failed. |
| Args: |
| dut_hostnames(string[]): A list of strings representing the |
| hostnames of the DUT's to be targeted. |
| pool (string): Pool to be added. |
| Returns: |
| String message summarizing results. |
| """ |
| if not _USE_DUT_MANAGER_FEATURE_FLAG: |
| return self.add_label_to_duts(dut_hostnames, "pool:" + pool) |
| |
| return self._apply_edit_to_duts( |
| dut_hostnames, |
| self.add_pool_to_managed_dut, |
| "Successfully added pool {} to {} DUT(s)\n", |
| pool, |
| ) |
| |
| def remove_pool_from_duts(self, dut_hostnames, pool): |
| """ |
| Remove pool from the selected dut_hostnames. Returns a message |
| summarizing how many DUT's the pool was successfully removed from |
| as well as for which DUT's the remove failed. |
| Args: |
| dut_hostnames(string[]): A list of strings representing the |
| hostnames of the DUT's to be targeted. |
| pool (string): Pool to be removed. |
| Returns: |
| String message summarizing results. |
| """ |
| if not _USE_DUT_MANAGER_FEATURE_FLAG: |
| return self.remove_label_from_duts(dut_hostnames, "pool:" + pool) |
| |
| return self._apply_edit_to_duts( |
| dut_hostnames, |
| self.remove_pool_from_managed_dut, |
| "Successfully removed pool {} from {} DUT(s)\n", |
| pool, |
| ) |
| |
| def reverify_host(self, dut_hostnames): |
| """Starts repair run on given DUTs hostnames. |
| |
| Args: |
| dut_hostnames(string[]): A list of strings representing the |
| hostnames of the DUT's to be targeted. |
| """ |
| self.afe_connector.reverify_host(dut_hostnames) |
| return "Successfully set off reverify on {}".format(dut_hostnames) |
| |
| def repair_host(self, dut_hostname): |
| """Starts repair run on given DUT hostname. |
| |
| Args: |
| dut_hostname(string): String hostname (ex. IP address) |
| """ |
| dut_id = self._find_dut_id_given_ip(dut_hostname) |
| if dut_id: |
| self.afe_connector.repair_host(dut_id) |
| |
| return "Successfully set off repair on {}".format(dut_hostname) |
| |
| def get_cloud_configuration(self): |
| """ |
| Gets cloud configuration info. |
| Returns: |
| A dict of cloud configuration info specifically, |
| 'gs_access_key_id', 'gs_secret_access_key', 'image_storage_server' |
| """ |
| return self.afe_connector.get_cloud_storage_info() |
| |
| def get_dut_wifi_info(self): |
| """Gets DUT wifi info. |
| Returns: |
| A dict with dut_wifi_name, dut_wifi_password |
| """ |
| return self.afe_connector.get_dut_wifi_info() |
| |
| def set_dut_wifi_info(self, dut_wifi_name, dut_wifi_password): |
| """ |
| Attempts to set DUT wifi on Moblab. |
| Args: |
| dut_wifi_name (string): Wifi name |
| dut_wifi_password (string): Wifi password |
| Returns: |
| A result message string. |
| """ |
| if self.afe_connector.set_configuration_info( |
| dut_wifi_name=dut_wifi_name, |
| dut_wifi_password=dut_wifi_password, |
| ): |
| return "Successfully applied DUT Wifi info changes." |
| else: |
| return "Failed to apply DUT Wifi info changes." |
| |
| def _can_change_cloud_config(self): |
| return self.get_num_jobs() == 0 |
| |
| def _cloud_config_changing( |
| self, |
| new_is_cloud_enabled, |
| new_boto_key_id, |
| new_boto_key_secret, |
| new_gcs_bucket_url, |
| ): |
| old_config = self.get_cloud_configuration() |
| old_is_cloud_enabled = old_config.get("is_cloud_enabled", False) |
| if old_is_cloud_enabled and not new_is_cloud_enabled: |
| return False |
| |
| old_boto_key_id = old_config.get("gs_access_key_id", "") |
| old_boto_key_secret = old_config.get("gs_secret_access_key", "") |
| old_gcs_bucket_url = old_config.get("image_storage_server", "") |
| if ( |
| old_boto_key_id != new_boto_key_id |
| or old_boto_key_secret != new_boto_key_secret |
| or old_gcs_bucket_url != new_gcs_bucket_url |
| ): |
| return True |
| return False |
| |
| def set_cloud_configuration( |
| self, |
| is_cloud_enabled, |
| boto_key_id, |
| boto_key_secret, |
| gcs_bucket_url, |
| is_remote_agent_enabled, |
| is_remote_command_enabled, |
| ): |
| """ |
| Attempts to set configuration values to Moblab. |
| Args: |
| boto_key_id (string): boto ID string |
| boto_key_secret (string): boto secret string |
| gcs_bucket_url (string): GCS bucket URL |
| Returns: |
| A result message string. |
| """ |
| if is_cloud_enabled: |
| if ( |
| not self._can_change_cloud_config() |
| and self._cloud_config_changing( |
| is_cloud_enabled, |
| boto_key_id, |
| boto_key_secret, |
| gcs_bucket_url, |
| ) |
| ): |
| logging.warn( |
| "Attempt to modify Cloud Config after it was used." |
| ) |
| |
| return ( |
| "Cloud configuration changes are not allowed. " |
| "Please powerwash the Moblab to set new configuration." |
| ) |
| |
| is_valid, details = self.afe_connector.validate_cloud_storage_info( |
| boto_key_id, boto_key_secret, gcs_bucket_url |
| ) |
| |
| if is_valid: |
| if self.afe_connector.set_configuration_info( |
| is_cloud_enabled, |
| boto_key_id, |
| boto_key_secret, |
| gcs_bucket_url, |
| ): |
| message = ( |
| "Successfully applied cloud configuration values. " |
| ) |
| self._last_config_update = datetime.utcnow() |
| self.moblab_bucket_name = ( |
| self.config_connector.get_cloud_bucket( |
| force_reload=True |
| ) |
| ) |
| |
| if self.set_is_remote_agent_enabled( |
| is_remote_agent_enabled |
| ): |
| message += "Remote agent has been {}. ".format( |
| "enabled" |
| if is_remote_agent_enabled |
| else "disabled" |
| ) |
| else: |
| message += ( |
| "But failed to set remote agent as {}. ".format( |
| "enabled" |
| if is_remote_agent_enabled |
| else "disabled" |
| ) |
| ) |
| |
| if self.set_is_remote_command_enabled( |
| is_remote_command_enabled |
| ): |
| message += "Remote command has been {}.".format( |
| "enabled" |
| if is_remote_command_enabled |
| else "disabled" |
| ) |
| else: |
| message += ( |
| "But failed to set remote command as {}.".format( |
| "enabled" |
| if is_remote_command_enabled |
| else "disabled" |
| ) |
| ) |
| return message |
| else: |
| return "Failed to apply cloud configuration changes." |
| else: |
| return details |
| else: |
| if self.afe_connector.set_configuration_info(is_cloud_enabled): |
| return "Successfully disabled cloud configuration." |
| else: |
| return "Failed to commit disable cloud change." |
| |
| def get_is_update_available(self): |
| """ |
| Check to see if there is a system update or any containers that |
| have updated images. |
| |
| Scan the running containers, if the watchtower program has pulled |
| down a newer image then the container to image reference tag |
| is broken. |
| |
| Detect these image tag link breaks and assume that means we have |
| an update. Looking through update tools and forums, it seems this |
| is the way all update tools work. |
| |
| Returns: |
| True iff there is an update available. |
| """ |
| |
| try: |
| response = ( |
| host_connector.HostServicesConnector.get_system_update_status() |
| ) |
| if response.current_op.find("IDLE") == -1: |
| return True |
| except host_connector.HostServicesException: |
| logging.exception("Get system update status call failed.") |
| |
| client = docker.from_env(timeout=300) |
| for container in client.containers.list(): |
| if len(container.image.tags) == 0: |
| return True |
| return False |
| |
| def _update_docker(self): |
| """ |
| Sets off an update of all docker containers, using watchtower |
| |
| Returns: String result message. |
| """ |
| client = docker.from_env(timeout=600) |
| |
| try: |
| container = client.containers.get("update") |
| raise MoblabRpcError("Already updating please wait.") |
| except docker.errors.NotFound: |
| pass |
| |
| label = os.environ.get("WATCHTOWER_TAG", "release") |
| container = client.containers.run( |
| MOBLAB_DOCKER_REGISTRY + "watchtower:%s" % label, |
| volumes=["/var/run/docker.sock:/var/run/docker.sock"], |
| name="update", |
| detach=True, |
| command="--run-once --include-restarting", |
| ) |
| |
| try: |
| container.wait(timeout=600) |
| except requests.exceptions.ReadTimeout: |
| logging.exception("Watchtower update raised an exception.") |
| container.stop() |
| finally: |
| container.remove() |
| return "Update finished." |
| |
| def _update_system(self): |
| """ |
| Initiates system update if it is available. |
| Returns: |
| An error message. |
| """ |
| try: |
| response = ( |
| host_connector.HostServicesConnector.install_system_update() |
| ) |
| return response.error |
| except host_connector.HostServicesException: |
| logging.exception("Install system update raised an exception.") |
| return "Failed to install update." |
| |
| def update_moblab(self): |
| """ |
| Initiates a system update if available, |
| otherwise kicks docker images update. |
| |
| Returns: |
| Update status string. |
| """ |
| return_message = self._update_system() |
| # if a message was returned then reboot was not requested |
| if return_message: |
| return self._update_docker() |
| return "Updating system." |
| |
| def get_is_remote_agent_enabled(self): |
| """ |
| Check to see if the remote agent is enabled. |
| |
| Returns: |
| True iff remote agent is enabled. |
| """ |
| return self.config_connector.get_is_remote_agent_enabled() |
| |
| def set_is_remote_agent_enabled(self, is_enabled): |
| """ |
| Attempts to set if the remote agent is enabled. |
| |
| Args: |
| is_enabled (bool): enable the remote agent or not |
| Returns: |
| True iff the set operation successfully. |
| """ |
| return self.afe_connector.set_is_remote_agent_enabled(is_enabled) |
| |
| def get_is_remote_command_enabled(self): |
| """ |
| Check to see if the remote agent is enabled. |
| |
| Returns: |
| True iff remote agent is enabled. |
| """ |
| return self.config_connector.is_remote_task_scheduler_enabled() |
| |
| def set_is_remote_command_enabled(self, is_enabled): |
| """ |
| Attempts to set if the remote command is enabled. |
| |
| Args: |
| is_enabled (bool): enable the remote command or not |
| Returns: |
| True iff the set operation successfully. |
| """ |
| return self.afe_connector.set_is_remote_command_enabled(is_enabled) |
| |
| def download_service_account(self): |
| """Download the service account from the account bucket. |
| |
| To avoid users having to enter two different credentials, they enter |
| the boto key and secret, then we download the API key from a known |
| file in their bucket. |
| |
| This function downloads that file, as it is getting the API key we |
| have to use gsutil vs cloud storage API to download. |
| |
| """ |
| self.moblab_bucket_name = self.config_connector.get_cloud_bucket() |
| logging.info("Using bucket: %s", self.moblab_bucket_name) |
| if not self.moblab_bucket_name: |
| raise MoblabRpcError( |
| "Need bucket name to copy service account from." |
| ) |
| try: |
| subprocess.check_call( |
| [ |
| "gsutil", |
| "cp", |
| "gs://%s/%s" |
| % ( |
| self.moblab_bucket_name, |
| "pubsub-key-do-not-delete.json", |
| ), |
| "/home/moblab/.service_account.json", |
| ], |
| ) |
| |
| except subprocess.CalledProcessError: |
| logging.exception( |
| "Failed to get the service account from %s", |
| self.moblab_bucket_name, |
| ) |
| raise MoblabRpcError( |
| "Failed to get the service account from %s" |
| % self.moblab_bucket_name, |
| ) |
| |
| def add_servo(self, dut_hostname, servo_serial_number): |
| """Configure a DUT to have a servo connected to it. |
| |
| DUT's can be connected to the moblab via a servo, for the testing |
| infrastructure to work there needs to be attributes added to the DUT |
| to specify the servo serial number, servo hostname and servo host port. |
| |
| Serial number is provided by the user hostname and port are generated. |
| |
| Then verify is called which collects informationtion about the DUT |
| similar to how add dut works, labels like servo_state:WORKING will be |
| added to the dut by that process. |
| """ |
| self.add_attribute_to_managed_dut( |
| dut_hostname, "servo_serial", servo_serial_number |
| ) |
| self.add_attribute_to_managed_dut(dut_hostname, "servo_port", "9999") |
| self.add_attribute_to_managed_dut( |
| dut_hostname, "servo_host", "%s_docker_servod" % dut_hostname |
| ) |
| return self.reverify_host([dut_hostname]) |