blob: d217bb1284ee7f787b54885f40f2b0a23d4ee481 [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright 2019 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import asyncio
import docker
import google.api_core
import os
import re
import requests
import subprocess
import time
import logging
import json
from collections import namedtuple, defaultdict
from datetime import datetime
from google.cloud import storage
from google.protobuf.field_mask_pb2 import FieldMask
from packaging.version import Version
from moblab_common import build_connector
from moblab_common import dut_connector
from moblab_common import host_connector
from moblab_common import moblab_build_connector
from moblab_common import afe_connector
from moblab_common import config_connector
from moblab_common import result_upload_status_connector
from dut_manager_connector import (
DutManagerRpcConnector,
DutManagerRpcConnectorAsync,
)
from host_scheduler_pause_service import HostSchedulerPauseService
from lab.managed_dut_pb2 import ManagedDut, Pool
from chromiumos.test.api.dut_attribute_pb2 import DutAttribute
from cache import Cache
# (milestone, staged?) pair returned by list_usable_milestones.
MilestonePair = namedtuple("MilestonePair", ["milestone", "is_staged"])
# (version, status, staged?, labels) entry returned by
# list_usable_build_versions.
BuildVersionPair = namedtuple(
    "BuildVersionPair", ["build_version", "status", "is_staged", "labels"]
)
# Listing result plus a flag that is true when the remote listing returned a
# full page (i.e. more results may exist).
BuildResults = namedtuple("BuildResults", ["results", "is_incomplete"])
# Fallback suite timeout (1 day, in minutes) for suites not listed below.
_DEFAULT_SUITE_TIMEOUT_MINS = 1440
# Per-suite timeout overrides, in minutes.
_SUITE_TIMEOUT_MAP = {
    "hardware_storagequal": 40320,
    "hardware_storagequal_quick": 40320,
    "storage_qual_v2_xs": 2160,  # 1.5 days
    "storage_qual_v2_s": 4320,  # 3 days
    "storage_qual_v2_m": 7200,  # 5 days
    "storage_qual_v2_l": 15840,  # 11 days
    "storage_qual_v2_xl": 28800,  # 20 days
    "cts": 2880,  # 2 days
    "gts": 2880,  # 2 days
    "performance_cuj": 10080,  # 7 days
    "performance_cuj_experimental": 10080,  # 7 days
    "mtbf": 14400,  # 10 days
}
# Maps RunStorageQualificationSuiteRequest.Variation enum values (ints) to
# the corresponding suite name.
_STORAGE_QUAL_VARIATIONS = {
    0: "hardware_storagequal",
    1: "hardware_storagequal_quick",
    2: "hardware_storagequal_external",
    3: "storage_qual_v2_quick",
    4: "storage_qual_v2_xs",
    5: "storage_qual_v2_s",
    6: "storage_qual_v2_m",
    7: "storage_qual_v2_l",
    8: "storage_qual_v2_xl",
}
# test_args keys used by the AVL (hardware qualification) suites.
HARDWARE_AVL_BUG_ID = "bug_id"
HARDWARE_AVL_PART_NO = "part_id"
STORAGE_QUAL_DISK_SIZE_GB = "tast_disk_size_gb"
STORAGE_QUAL_DUAL_NAMESPACE = "tast_storage_slc_qual"
STORAGE_QUAL_BLOCK_TIMEOUT = "tast_stress_block_timeout"
STORAGE_QUAL_FOLLOW_UP = "tast_followup_qual"
MOBLAB_DOCKER_REGISTRY = "gcr.io/chromeos-partner-moblab/"
MAX_BUILD_STAGING_SECONDS = 300  # 5 minutes
# Translates DUT-manager ManagedState values to the AFE status strings the
# rest of the stack expects.
DUT_MANAGER_TO_AFE_DUT_STATUS_MAPPINGS = {
    ManagedDut.ManagedState.VERIFY: "Verifying",
    ManagedDut.ManagedState.LEASED: "Running",
    ManagedDut.ManagedState.READY: "Ready",
    ManagedDut.ManagedState.FAILED: "Repair Failed",
    ManagedDut.ManagedState.RESET: "Resetting",
    ManagedDut.ManagedState.PROVISION: "Provisioning",
    ManagedDut.ManagedState.UNKNOWN: "Unknown",
}
# When true, DUT enroll/unenroll/list/detail operations go through the DUT
# manager service instead of AFE directly.
_USE_DUT_MANAGER_FEATURE_FLAG = True
class MoblabRpcError(Exception):
    """Raised when a Moblab RPC-level operation fails."""
class LabelsParser(object):
    """Static helpers for extracting model/board/pool information from AFE
    label strings (e.g. "model:eve", "board:octopus", "pool:suites") and
    from Moblab API model resource paths."""

    _model_re = re.compile(r"^model:(.*)")
    _build_target_re = re.compile(r"^board:(.*)")
    _pool_re = re.compile(r"^pool:(.*)")
    _build_target_model_re = re.compile(r"^buildTargets/(\w+)/models/(\w+)$")

    @classmethod
    def _extract_from_labels(cls, regexp, labels):
        """Return the deduplicated group(1) captures of regexp over labels."""
        result = [
            m.group(1) for m in [regexp.match(label) for label in labels] if m
        ]
        return list(set(result))

    @classmethod
    def extract_model_from_labels(cls, labels):
        """Return the model from a "model:<name>" label, or "" if absent."""
        try:
            return cls._extract_from_labels(cls._model_re, labels)[0]
        except IndexError:
            logging.exception("Failed to parse model from label tags of DUT.")
            return ""

    @classmethod
    def extract_build_target_from_labels(cls, labels):
        """Return the board from a "board:<name>" label, or "" if absent."""
        try:
            return cls._extract_from_labels(cls._build_target_re, labels)[0]
        except IndexError:
            logging.exception(
                "Failed to parse build target from label tags of DUT."
            )
            return ""

    @classmethod
    def extract_pool_from_labels(cls, labels):
        """Return every pool name found in "pool:<name>" labels."""
        return cls._extract_from_labels(cls._pool_re, labels)

    @classmethod
    def extrat_model_build_target_from_api_path(cls, models):
        """Extracts model & build target from moblab API listModels path.

        NOTE: the method name is misspelled ("extrat"); it is kept for
        backward compatibility. New callers should use the correctly
        spelled alias extract_model_build_target_from_api_path below.
        """
        build_target, model = None, None
        match = cls._build_target_model_re.match(models)
        if match:
            build_target, model = match.group(1), match.group(2)
        return (build_target, model)

    # Correctly-spelled, backward-compatible alias for the method above.
    extract_model_build_target_from_api_path = (
        extrat_model_build_target_from_api_path
    )

    @classmethod
    def extract_model_and_build_target_from_labels(cls, labels):
        """Extract (model, board) from labels; each is None when absent.

        The last matching label wins when duplicates exist (same as before).
        """
        model = None
        board = None
        for label in labels:
            # Fix: match each regex once per label instead of twice.
            model_match = cls._model_re.match(label)
            if model_match:
                model = model_match.group(1)
            board_match = cls._build_target_re.match(label)
            if board_match:
                board = board_match.group(1)
        return (model, board)
class MoblabService(object):
    """RPC-facing service aggregating Moblab operations: DUT enrollment and
    listing, build staging/listing, and test-suite scheduling.

    The three class attributes below implement config-change invalidation
    for the lazily rebuilt connectors: each connector property compares its
    own "last update" stamp against _last_config_update and rebuilds the
    connector when they differ.
    """

    # NOTE(review): _last_config_update is never written in this file —
    # presumably updated elsewhere when the Moblab config changes; confirm.
    _last_config_update = None
    _last_build_connector_update = None
    _last_moblab_build_connector_update = None
    def __init__(self):
        """Wire up the connectors and caches this service depends on."""
        self.afe_connector = afe_connector.AFEConnector()
        self.config_connector = config_connector.MoblabConfigConnector(
            self.afe_connector
        )
        # Must run after config_connector is assigned: it reads the
        # configured cloud bucket from it.
        self.setup_devserver_connector()
        self.dut_connector = dut_connector.MoblabDUTConnector()
        self.pause_service = HostSchedulerPauseService()
        self.upload_status_connector = (
            result_upload_status_connector.ResultUploadStatusConnector()
        )
        # In-memory caches for milestone and build-version listings.
        self.milestone_cache = Cache(maxsize=50)
        self.build_version_cache = Cache(maxsize=30)
def _find_dut_id_given_ip(self, dut_hostname):
"""Given an ip address, will return the id given to the DUT by AFE.
Raises:
MoblabRpcError: if unable to find the DUT.
"""
try:
return self.afe_connector.get_connected_devices(
hostname=dut_hostname
)[0]["id"]
except (IndexError, KeyError):
msg = "Unable to find AFE id for dut: {}".format(dut_hostname)
logging.error(msg)
raise MoblabRpcError(msg)
def setup_devserver_connector(self):
self.moblab_bucket_name = self.config_connector.get_cloud_bucket()
logging.info("Using bucket: %s", self.moblab_bucket_name)
self._init_build_connector(raise_error=False)
    def _init_build_connector(self, raise_error=True):
        """Create the GCS-bucket build connector for self.moblab_bucket_name.

        Args:
            raise_error: if true, re-raise any connector-init failure after
                logging it; otherwise the failure is only logged.
        """
        try:
            self._build_connector = build_connector.MoblabBuildConnector(
                self.moblab_bucket_name
            )
        except build_connector.MoblabBuildConnectorException:
            logging.exception("Failed to initialize MoblabBuildConnector.")
            if raise_error:
                raise
    def _init_moblab_build_connector(self, raise_error=True):
        """
        Creates a MoblabBuildConnector for Moblab API calls to get build items.
        Args:
            raise_error: if true, will raise any exception encountered during
                init attempt.
        Returns:
            MoblabBuildConnector.
        """
        try:
            self._moblab_build_connector = (
                moblab_build_connector.MoblabBuildConnector()
            )
        # NOTE(review): this catches build_connector's exception type, not a
        # moblab_build_connector-specific one — presumably the API connector
        # raises the same class; confirm.
        except build_connector.MoblabBuildConnectorException:
            logging.exception("Failed to initialize MoblabBuildConnector.")
            if raise_error:
                raise
    @property
    def build_connector(self):
        """
        Returns a build_connector.MoblabBuildConnector instance ( distinct from
        moblab_build_connector.MoblabBuildConnector ),
        TODO(wangmichael) change naming so that difference between two
        connectors is more clear.
        """
        # Rebuild the connector when it is missing/falsy, or when the config
        # has changed since the connector was last built.
        if (
            not hasattr(self, "_build_connector")
            or not self._build_connector
            or self._last_config_update != self._last_build_connector_update
        ):
            # Re-read the bucket name in case the config changed.
            self.moblab_bucket_name = self.config_connector.get_cloud_bucket(
                force_reload=True
            )
            self._init_build_connector()
            # Stamp the rebuild so the check above passes until the next
            # config update.
            self._last_build_connector_update = self._last_config_update
        return self._build_connector
    @property
    def moblab_build_connector(self):
        """
        Returns a moblab.build_connector.MoblabBuildConnector instance
        ( distinct from build_connector.MoblabBuildConnector ),
        """
        # Rebuild when missing/falsy or when the config changed since the
        # connector was last built (same invalidation scheme as
        # build_connector above).
        if (
            not hasattr(self, "_moblab_build_connector")
            or not self._moblab_build_connector
            or self._last_config_update
            != self._last_moblab_build_connector_update
        ):
            self.moblab_bucket_name = self.config_connector.get_cloud_bucket(
                force_reload=True
            )
            self._init_moblab_build_connector()
            self._last_moblab_build_connector_update = self._last_config_update
        return self._moblab_build_connector
def _is_moblab_bucket_in_asia(self):
if not self.moblab_bucket_name:
raise MoblabRpcError(
(
"Attempted to check bucket location while bucket has not"
" been configured."
)
)
return self._build_connector.is_moblab_bucket_in_asia(
self.moblab_bucket_name
)
def get_connected_devices(self):
connected_duts = self.dut_connector.get_connected_duts()
afe_formatted_duts = []
for dut in connected_duts:
afe_formatted_duts.append(
{"ip": dut[0], "isConnected": dut[1], "error": dut[2]}
)
return afe_formatted_duts
def _convert_managed_dut_to_dut_dict(self, managed_dut):
dut = {}
dut["hostname"] = managed_dut.name.ip_address
dut["status"] = DUT_MANAGER_TO_AFE_DUT_STATUS_MAPPINGS[
managed_dut.state
]
labels = [
a.id.value
for a in managed_dut.tag.dut_attributes
if not a.field_path
]
attributes = {
a.id.value: a.field_path
for a in managed_dut.tag.dut_attributes
if a.field_path
}
dut["labels"] = labels
dut["attributes"] = attributes
return dut
async def _list_duts_from_dut_manager(self):
managed_duts = await DutManagerRpcConnectorAsync.list_managed_duts()
duts = []
for managed_dut in managed_duts:
dut = self._convert_managed_dut_to_dut_dict(managed_dut)
duts.append(dut)
return duts
async def list_duts(self):
"""Returns the merge of enrolled and connected DUTs"""
managed_duts = (
await self._list_duts_from_dut_manager()
if _USE_DUT_MANAGER_FEATURE_FLAG
else self.afe_connector.get_connected_devices()
)
potential_hosts = self.get_connected_devices()
connected_ips = []
connected_duts = []
for dut in managed_duts:
dut["enrolled"] = True
connected_ips.append(dut["hostname"])
connected_duts.append(dut)
for dut in potential_hosts:
if dut["ip"] not in connected_ips:
dut["hostname"] = dut["ip"]
dut["enrolled"] = False
dut["status"] = "Not Enrolled"
connected_duts.append(dut)
self._hydrate_dut_details_with_mac(connected_duts)
return connected_duts
def _default_suite_timeout_mins(self, suite):
if suite in _SUITE_TIMEOUT_MAP:
return _SUITE_TIMEOUT_MAP[suite]
else:
return _DEFAULT_SUITE_TIMEOUT_MINS
def run_storage_qual_suite(
self,
build_target,
model,
milestone,
build_version,
bug_id,
part_id,
variation,
pool=None,
disk_size_gb=0,
is_dual_namespace=False,
is_pre_qualified=False,
):
"""Starts storage qual suite run. Formats arguments s.t. they can be
ingested by AFE.
Args:
build_target: Ex. octopus
model: String identifier for the specific model to be run.
milestone: String, Ex. R80
build_version: A numeric version string of the form: '00000.00.0'
bug_id: Numeric string identifying the associated
buganizer issue.
part_id: Numeric string identifying relevant part number.
variation: RunStorageQualificationSuiteRequest.Variation
pool: String identifier for which pool of DUT's to run tests with.
disk_size_gb: Size of main storage disk (in whole GB).
is_dual_namespace: Dual-namespace test requested.
is_pre_qualified: Has the component been previously qualified.
Returns:
String response message.
"""
avl_args = {
HARDWARE_AVL_BUG_ID: bug_id,
HARDWARE_AVL_PART_NO: part_id,
STORAGE_QUAL_DISK_SIZE_GB: disk_size_gb,
STORAGE_QUAL_DUAL_NAMESPACE: is_dual_namespace,
}
# reduce the time block and specify to tast that it is follow up
# if the component is pre-qualified
if is_pre_qualified:
avl_args[STORAGE_QUAL_BLOCK_TIMEOUT] = "30m"
avl_args[STORAGE_QUAL_FOLLOW_UP] = True
return self.run_suite(
build_target,
model,
milestone,
build_version,
_STORAGE_QUAL_VARIATIONS[variation],
pool,
test_args=avl_args,
)
def run_memory_qual_suite(
self,
build_target,
model,
milestone,
build_version,
bug_id,
part_id,
pool=None,
):
"""Starts memory qual suite run. Formats arguments s.t. they can be
ingested by AFE.
Args:
build_target: Ex. octopus
model: String identifer for the specific model to be run.
milestone: String, Ex. R80
build_version: A numeric version string of the form: '00000.00.0'
bug_id: Numeric string identifying the associated buganizer issue.
part_id: Numeric string identifying relevant part number.
pool: String identifier for which pool of DUT's to run tests with.
Returns:
String response message.
"""
avl_args = {HARDWARE_AVL_BUG_ID: bug_id, HARDWARE_AVL_PART_NO: part_id}
return self.run_suite(
build_target,
model,
milestone,
build_version,
"hardware_memoryqual", # suite
pool,
test_args=avl_args,
)
def run_cts_suite(
self,
build_target,
model,
milestone,
build_version,
suite,
pool=None,
autotest_tests=None,
):
"""Starts cts suite run. Formats arguments s.t. they can be ingested by
AFE.
Args:
build_target: Ex. octopus
milestone: Ex. R80
build_version: A numeric version string of the form: '00000.00.0'
model: String identfier for the specific model to be run.
pool: String identifier for which pool of DUT's to run tests with.
specific_modules: List of strings of specific module names to
run.
Returns:
String response message.
"""
suite_args = {}
if autotest_tests and len(autotest_tests):
suite_args["tests"] = [test.strip() for test in autotest_tests]
return self.run_suite(
build_target,
model,
milestone,
build_version,
suite,
pool,
suite_args=suite_args,
)
def run_gts_suite(
self,
build_target,
model,
milestone,
build_version,
pool=None,
autotest_tests=None,
):
"""Starts gts suite run. Formats arguments s.t. they can be ingested by
AFE.
Args:
build_target: Ex. octopus
milestone: Ex. R80
build_version: A numeric version string of the form: '00000.00.0'
model: String identifer for the specific model to be run.
pool: String identifier for which pool of DUT's to run tests with.
specific_modules: Comma-separated string of specific module names
to run.
Returns:
String response message.
"""
suite_args = {}
if autotest_tests:
suite_args["tests"] = [test.strip() for test in autotest_tests]
return self.run_suite(
build_target,
model,
milestone,
build_version,
"gts", # suite
pool,
suite_args=suite_args,
)
def run_faft_suite(
self,
build_target,
model,
milestone,
build_version,
suite,
pool=None,
tests=None,
):
"""Starts faft suite run. Formats arguments s.t. they can be ingested by
AFE.
Args:
build_target: Ex. octopus
milestone: Ex. R80
build_version: A numeric version string of the form: '00000.00.0'
model: String identfier for the specific model to be run.
pool: String identifier for which pool of DUT's to run tests with.
tests: List of strings of specific test names to
run.
Returns:
String response message.
"""
suite_args = {}
if tests and len(tests):
suite_args["tests"] = [test.strip() for test in tests]
return self.run_suite(
build_target,
model,
milestone,
build_version,
suite,
pool,
suite_args=suite_args,
)
def run_fwupd_suite(
self,
build_target,
model,
milestone,
build_version,
suite,
pool=None,
test_args=None,
):
"""Starts fwupd suite run. Formats arguments s.t. they can be ingested by
AFE.
Args:
build_target: Ex. octopus
milestone: Ex. R80
build_version: A numeric version string of the form: '00000.00.0'
model: String identfier for the specific model to be run.
pool: String identifier for which pool of DUT's to run tests with.
test_args: List of strings of specific test names to
run.
Returns:
String response message.
"""
autotest_args = {}
# this is expected to work, but is tested for now
if test_args and len(test_args):
for test_arg in test_args:
[key, val] = test_arg.split("=", 1)
autotest_args[key] = val
return self.run_suite(
build_target,
model,
milestone,
build_version,
suite,
pool,
suite_args=autotest_args,
test_args=autotest_args,
)
def run_suite(
self,
build_target,
model,
milestone,
build_version,
suite,
pool=None,
suite_args={},
test_args={},
suite_timeout_mins=None,
extra_builds={},
):
"""Starts suite run. Formats input arguments s.t. that they can be used
by an AFE suite run command.
Args:
build_target: Ex. octopus
milestone: Numeric string Ex. 80
build_version: A numeric version string of the form: '00000.00.0'
suite: String identifier for the suite to be run.
model: String identifer for the specific model to be run.
pool: String identifier for which pool of DUT's to run tests with.
suite_args: Dict containing key=val pairs to be used by suite
control file.
test_args: Dict containing key=val pairs to be used by test control
file.
Returns:
String response message.
"""
# additional tags -d/-a will appear in the build_version for the local
# builds. Quick way to check if it is local build
# Example for local build: R102-14590.0.0-d022_03_22_2322-a1
# try jobs does not have these -d/-a tags. So moving to a less
# stricter check
if "-" in build_version or "_" in build_version:
build = "{}-local/R{}-{}".format(
build_target, milestone, build_version
)
logging.info(
"maybe this is a local build version, going to check in folder:"
+ build
)
else:
build = "{}-release/R{}-{}".format(
build_target, milestone, build_version
)
if not self._is_build_staged(
build_version, build_target, milestone, model
):
self._stage_build(model, build_target, build_version)
builds = {"cros-version": build}
builds.update(extra_builds)
if not suite_timeout_mins:
suite_timeout_mins = self._default_suite_timeout_mins(suite)
dut_wifi_info = self.get_dut_wifi_info()
ap_name = dut_wifi_info.get("wifi_dut_ap_name", None)
ap_pass = dut_wifi_info.get("wifi_dut_ap_pass", "")
test_args["ssid"] = ap_name
test_args["wifipass"] = ap_pass
suite_job_id, error = self.afe_connector.run_suite(
build_target,
build,
builds,
suite,
suite_timeout_mins,
pool,
model,
suite_args=suite_args,
test_args=test_args,
)
if not suite_job_id:
raise MoblabRpcError("Encountered error starting suite")
return "Suite started (suite job ID: {})".format(suite_job_id)
async def provision_duts(self, pool, milestone, build_version):
ModelBoard = namedtuple("ModelBoard", ["model", "board"])
boards = {}
duts = await self.list_duts()
# filter only enrolled duts. Red dut shouldn't be enrolled.
duts = filter(lambda x: x.get("enrolled"), duts)
available_boards = self.list_build_targets()
for dut in duts:
labels = dut.get("labels", [])
pools = LabelsParser.extract_pool_from_labels(labels)
model = LabelsParser.extract_model_from_labels(labels)
board = LabelsParser.extract_build_target_from_labels(labels)
# filter yellow duts.
if not board or (board and board not in available_boards):
continue
if pool and pool not in pools:
continue
modelBoardPair = ModelBoard(model=model, board=board)
if modelBoardPair in boards:
boards[modelBoardPair] += 1
else:
boards[modelBoardPair] = 1
messages = []
for modelBoardPair, number in boards.items():
try:
self.run_suite(
modelBoardPair.board,
modelBoardPair.model,
milestone,
build_version,
"provision",
pool=pool,
suite_args={"num_required": number},
)
except MoblabRpcError as ex:
messages.append(str(ex))
return ", ".join(messages)
"""
Check duts doesn't flash a test image.
If there has one dut without test image, it will raise exception. Otherwise, this function does nothings.
"""
def check_duts_without_test_image(self, ips):
# test duts by given ips. Saving call `find_duts` time.
disconnected_dut_ips = list(
map(
lambda dut: dut[0],
filter(
lambda dut: not dut[1],
self.dut_connector.get_duts(ip_addresses=ips),
),
)
)
for ip in disconnected_dut_ips:
raise MoblabRpcError("dut (ip: %s) without test image" % ip)
    async def enroll_duts(self, ip_addresses):
        """
        Enroll the given DUTs after verifying each runs a test image.

        There are two ways to handle a dut without a test image:
        1. raise an exception.
        2. filter duts without a test image out of the requested ips.
        This implementation takes option 1: check_duts_without_test_image
        raises for the first DUT without a test image.
        """
        self.check_duts_without_test_image(ip_addresses)
        if _USE_DUT_MANAGER_FEATURE_FLAG:
            tasks = [
                DutManagerRpcConnectorAsync.create_managed_dut(ip)
                for ip in ip_addresses
            ]
            # NOTE(review): return_exceptions=True swallows individual
            # enrollment failures silently — confirm that is intended.
            await asyncio.gather(*tasks, return_exceptions=True)
        else:
            self.afe_connector.enroll_duts(ip_addresses)
        self.pause_service.ensure_duts_pause_state(list(ip_addresses))
def unenroll_duts(self, ip_addresses):
if _USE_DUT_MANAGER_FEATURE_FLAG:
for ip in ip_addresses:
DutManagerRpcConnector.delete_managed_dut(ip)
else:
self.afe_connector.unenroll_duts(ip_addresses)
def list_connected_duts_firmware(self):
duts = self.dut_connector.get_connected_dut_firmware()
return duts
def _get_managed_dut_from_dut_manager(self, dut_ip):
managed_dut = DutManagerRpcConnector.get_managed_dut(dut_ip)
dut = self._convert_managed_dut_to_dut_dict(managed_dut)
return dut
def get_dut_details(self, dut: str):
dut_details = (
self._get_managed_dut_from_dut_manager(dut)
if _USE_DUT_MANAGER_FEATURE_FLAG
else self.afe_connector.get_connected_devices(
dut, get_current_jobs=True
)[0]
)
self._hydrate_dut_details_with_mac([dut_details])
return dut_details
def _hydrate_dut_details_with_mac(self, duts_details):
dut_ip_to_mac_map = {
ip: mac
for (
ip,
mac,
) in self.dut_connector.get_all_known_mac_addresses().items()
}
for dut in duts_details:
if dut["hostname"] in dut_ip_to_mac_map:
dut["mac"] = dut_ip_to_mac_map[dut["hostname"]]
    def update_duts_firmware(self, ip_addresses):
        """Trigger a firmware update on the given DUTs; returns the
        connector's result."""
        return self.dut_connector.update_firmware(ip_addresses)
def _is_build_staged(self, build_version, build_target, milestone, model):
"""
Checks if targeted build is staged. Defaults to checking staging
status via Moblab API, falls back to checking the configured GCS
bucket directly.
Args:
build_target (string): AKA board of the target DUT.
build_version (string): Build version to be applied to DUT for
testing.
milestone (string): Milestone the build version belongs to.
model (string): Model of the DUT.
Returns:
Boolean which is true iff build is already staged on Moblab.
"""
try:
return self.moblab_build_connector.check_build_stage_status(
build_target, model, build_version, self.moblab_bucket_name
)
except google.api_core.exceptions.GoogleAPIError:
logging.exception(
(
"Encountered API error while checking build stage status "
"for {}/{} on model {}"
).format(build_target, build_version, model)
)
return (
build_version
in self.build_connector.get_builds_for_milestone(
build_target, milestone
)
)
    def _stage_build(self, model, build_target, build_version):
        """
        Applies the specified build artifacts to the bucket configured
        for this Moblab, then polls until staging completes.
        Args:
            model (string): Model of the target DUT.
            build_target (string): AKA board of the target DUT.
            build_version (string): Build version to be applied to DUT for
                testing.
        Returns:
            gcs bucket name configured on this moblab.
        Raises:
            MoblabRpcError: if staging does not finish within
                MAX_BUILD_STAGING_SECONDS.
        """
        self.moblab_build_connector.stage_build(
            build_target, model, build_version, self.moblab_bucket_name
        )
        sleep_interval = 1
        end_time = time.time() + MAX_BUILD_STAGING_SECONDS
        # Poll staging status until done or the deadline passes.
        while time.time() < end_time:
            if self.moblab_build_connector.check_build_stage_status(
                build_target, model, build_version, self.moblab_bucket_name
            ):
                return self.moblab_bucket_name
            time.sleep(sleep_interval)
            # every failure, increase wait interval by 1.5 times
            # ( exponential backoff, of sorts )
            sleep_interval *= 1.5
        raise MoblabRpcError(
            "Build stage did not finish within {} seconds.".format(
                MAX_BUILD_STAGING_SECONDS
            )
        )
def list_usable_build_targets(self, model):
"""
Returns build targets associated with the given model.
Args:
mode(str): DUT model to return associated build_target(s) for.
Returns:
List of build target strings.
"""
afe_hosts = self.afe_connector.get_connected_devices()
build_targets = set()
for host in afe_hosts:
if (
LabelsParser.extract_model_from_labels(host.get("labels", []))
== model
):
build_targets.add(
LabelsParser.extract_build_target_from_labels(
host.get("labels", [])
)
)
return list(build_targets)
def list_build_targets(self):
"""List all build targets that this account has access to.
Returns:
List of build target strings.
"""
try:
return self.moblab_build_connector.list_build_targets()
except moblab_build_connector.MoblabBuildConnectorException:
logging.exception(
"Encountered error trying to list build targets."
)
def list_connected_build_targets(self):
"""List all connected build targets on this moblab.
Returns:
List of build target strings.
"""
afe_hosts = self.afe_connector.get_connected_devices()
build_targets = set()
for host in afe_hosts:
build_targets.add(
LabelsParser.extract_build_target_from_labels(
host.get("labels", [])
)
)
return list(build_targets)
def list_models(self, build_target):
"""Lists all models for the given build target.
If build target is set to '*' it will list models for all build targets.
Returns:
Dictionary of models as keys and build targets set as value.
"""
try:
models_response = self.moblab_build_connector.list_models(
build_target
)
results = defaultdict(set)
for m in models_response:
(
build_target,
model,
) = LabelsParser.extrat_model_build_target_from_api_path(m)
if model and build_target:
results[model].add(build_target)
return results
except moblab_build_connector.MoblabBuildConnectorException:
logging.exception(
"Encountered error trying to list models for build target: {}.".format(
build_target
)
)
def _key_getter(self, data):
s = ""
if "model" in data:
s += data["model"]
if "build_target" in data:
s += data["build_target"]
if "milestone" in data:
s += data["milestone"]
if "label" in data:
s += data["label"]
return hash(s)
    def list_usable_milestones(self, build_target, model, force_update=False):
        """
        Returns milestones available for a given model/build_target, both
        staged and unstaged (if target bucket is in US, otherwise will only
        return staged milestones).
        In this context, 'remote' means unstaged/from API call (i.e., remote
        milestones are those that are fetched from the Moblab API rather than
        from existing build items staged in the partner bucket ).
        Args:
            build_target(string): build_target (AKA board) to fetch milestones
                for.
            model(string): model to fetch milestones for.
            force_update: force update the cache.
        Returns:
            A list of tuples where the first item is the milestone and the
            second is a boolean which is true iff at least one build of the
            milestone is staged.
        """
        def _list_usable_milestones():
            # Milestones already staged in the partner GCS bucket.
            bucket_milestones = self.build_connector.get_milestones_available(
                build_target
            )
            remote_milestones = []
            # Remote (unstaged) milestones are only fetched for US buckets.
            if not self._is_moblab_bucket_in_asia():
                try:
                    remote_milestones = (
                        self.moblab_build_connector.list_available_milestones(
                            build_target,
                            model,
                        )
                    )
                except moblab_build_connector.MoblabBuildConnectorException:
                    logging.exception(
                        "Encountered error trying to fetch remote milestones."
                    )
            milestone_results = [
                MilestonePair(milestone=milestone, is_staged=True)
                for milestone in bucket_milestones
            ]
            # Remote-only milestones (not already in the bucket) are
            # reported as unstaged.
            milestone_results.extend(
                [
                    MilestonePair(milestone=milestone, is_staged=False)
                    for milestone in list(
                        set(remote_milestones).difference(
                            set(bucket_milestones)
                        )
                    )
                ]
            )
            # Newest first: numeric sort on the milestone "R##" string.
            milestone_results.sort(
                key=lambda x: int(x[0].strip("R")), reverse=True
            )
            return BuildResults(
                results=milestone_results,
                is_incomplete=(
                    # A full API page implies more results may exist.
                    len(remote_milestones)
                    == moblab_build_connector.DEFAULT_PAGE_SIZE
                ),
            )
        # filter boards don't have a permission
        if build_target not in self.list_build_targets():
            return BuildResults(results=[], is_incomplete=False)
        # Serve from (or populate) the milestone cache.
        return self.milestone_cache.get(
            key=self._key_getter(
                {"model": model, "build_target": build_target}
            ),
            or_default=_list_usable_milestones,
            force_update=force_update,
        )
    def list_usable_build_versions(
        self, build_target, milestone, model, label, force_update=False
    ):
        """
        Returns build versions available for a given
        model/build_target/milestone, both staged and unstaged (if target
        bucket is in US, otherwise will only return staged build versions).
        In this context, 'remote' means unstaged/from API call (i.e., remote
        build versions are those that are fetched from the Moblab API rather
        than from existing build items staged in the partner bucket ).
        Args:
            build_target(string): build_target (AKA board) to return build
                versions for.
            model(string): model to return build versions for.
            milestone(string): milestone to return build versions for.
            label(string): optional label filter forwarded to the Moblab
                API; when set, staged-only bucket results are discarded.
            force_update: force update the cache.
        Returns:
            A list of tuples where the first item is the build_version and the
            second is a boolean which is true iff the build is staged.
        """
        def _list_usable_build_versions():
            # Builds already staged in the partner GCS bucket.
            bucket_build_versions = (
                self.build_connector.get_builds_for_milestone(
                    build_target, milestone
                )
            )
            build_version_results = [
                BuildVersionPair(
                    build_version=build_version,
                    status=moblab_build_connector.BuildStatus.AVAILABLE,
                    is_staged=True,
                    labels=[],
                )
                for build_version in bucket_build_versions
            ]
            remote_build_versions = []
            # Remote (unstaged) builds are only fetched for US buckets.
            if not self._is_moblab_bucket_in_asia():
                try:
                    remote_build_versions = (
                        self.moblab_build_connector.list_builds_for_milestone(
                            build_target,
                            model,
                            milestone,
                            label,
                        )
                    )
                except moblab_build_connector.MoblabBuildConnectorException:
                    logging.exception(
                        "Encountered error trying to fetch remote build versions."
                    )
            # When filtering by label, bucket builds (which carry no label
            # info) are dropped; only labeled remote builds remain.
            if label:
                build_version_results = []
            staged_builds_set = set(bucket_build_versions)
            # Append remote builds that are not already staged.
            for build_version in remote_build_versions:
                if build_version.version not in staged_builds_set:
                    build_version_results.append(
                        BuildVersionPair(
                            build_version=build_version.version,
                            status=build_version.status,
                            is_staged=False,
                            labels=build_version.labels,
                        )
                    )
            # Newest first, ordered by semantic version.
            build_version_results.sort(
                key=lambda x: Version(x[0]),
                reverse=True,
            )
            return BuildResults(
                results=build_version_results,
                is_incomplete=(
                    # A full API page implies more results may exist.
                    len(remote_build_versions)
                    == moblab_build_connector.DEFAULT_PAGE_SIZE
                ),
            )
        data = {
            "model": model,
            "build_target": build_target,
            "milestone": milestone,
            "label": label,
        }
        # Serve from (or populate) the build-version cache.
        return self.build_version_cache.get(
            key=self._key_getter(data),
            or_default=_list_usable_build_versions,
            force_update=force_update,
        )
def get_num_jobs(
self,
id_filter=None,
name_filter=None,
created_time_lt=None,
created_time_gt=None,
sub=False,
suite=False,
not_yet_run=False,
running=False,
finished=False,
completed_status=None,
parent_id_filter=None,
dut_hostname_filter=None,
):
# TO_DO(wangmichael): bifurcating on dut_filter is necessary because
# the Autotest backend that we currently invoke only accepts dut_filter
# on calls to get_num_host_queue_entries, not get_num_jobs. Even
# though, in both cases, the results are packaged and returned as jobs.
# This may change when Autotest RPC itself is refactored.
if dut_hostname_filter:
try:
dut_id = self.afe_connector.get_connected_devices(
hostname=dut_hostname_filter
)[0]["id"]
except (IndexError, KeyError):
logging.error(
"Unable to find AFE id for dut: {}".format(
dut_hostname_filter
)
)
return
return self.afe_connector.get_num_host_queue_entries(
host_id=dut_id
)
else:
return self.afe_connector.get_num_jobs(
id_filter,
name_filter,
created_time_lt,
created_time_gt,
sub,
suite,
not_yet_run,
running,
finished,
completed_status,
parent_id_filter,
)
def get_jobs(
self,
query_start,
query_limit,
id_filter=None,
name_filter=None,
created_time_lt=None,
created_time_gt=None,
sub=False,
suite=False,
not_yet_run=False,
running=False,
finished=False,
completed_status=None,
parent_id_filter=None,
dut_hostname_filter=None,
sort_by=None,
):
# TO_DO(wangmichael): bifurcating on dut_filter is necessary because
# the Autotest backend that we currently invoke only accepts dut_filter
# on calls to get_host_queue_entries, not get_jobs. Even though,
# in both cases, the results are packaged and returned as jobs. This
# may change when Autotest RPC itself is refactored.
jobs = []
if dut_hostname_filter:
try:
dut_id = self.afe_connector.get_connected_devices(
hostname=dut_hostname_filter
)[0]["id"]
except (IndexError, KeyError):
logging.error(
"Unable to find AFE id for dut: {}".format(
dut_hostname_filter
)
)
return
hqes, _ = self.afe_connector.get_host_queue_entries(
host_id=dut_id,
query_start=query_start,
query_limit=query_limit,
sort_by=sort_by,
)
# Extracting jobs information from HQE entries.
for hqe in hqes:
job = hqe["job"]
host = hqe.get("host")
if host:
job["hostname"] = host.get("hostname", "--")
job["status"] = hqe.get("status")
jobs.append(job)
else:
jobs = self.afe_connector.get_jobs(
query_start,
query_limit,
id_filter,
name_filter,
created_time_lt,
created_time_gt,
sub,
suite,
not_yet_run,
running,
finished,
completed_status,
parent_id_filter,
sort_by,
)
job_ids = [job["id"] for job in jobs]
jobs_upload_status = (
self.upload_status_connector.get_jobs_upload_status(job_ids)
)
for job in jobs:
if job["id"] in jobs_upload_status:
state = jobs_upload_status[job["id"]]
job["upload_status"] = state.status
job["attempt_number"] = state.attempt_number
job["last_falure_reason"] = state.last_falure_reason
return jobs
def get_job_ids(
self,
query_start,
query_limit,
id_filter=None,
name_filter=None,
created_time_lt=None,
created_time_gt=None,
sub=False,
suite=False,
not_yet_run=False,
running=False,
finished=False,
completed_status=None,
parent_id_filter=None,
dut_hostname_filter=None,
):
"""Returns the job ID's that match the request filters.
Args:
query_start: Int index for start of returned values.
query_limit: Int for max number of values to return.
id_filter: String filter for job id.
name_filter: String filter for job name.
created_time_lt: A unix timestamp in seconds granularity the sets
upper bound for jobs returned.
created_time_gt: A unix timestamp in seconds granularity the sets
lower bound for jobs returned.
sub: Boolean, true iff child jobs to be returned.
suite: Boolean, true iff parent jobs to be returned.
not_yet_run: Boolean, true iff queued jobs to be returned.
running: Boolean, true iff running jobs to be returned.
finished: Boolean, true iff only finished jobs to be returned.
parent_id_filter: String filter for parent job id.
dut_hostname_filter: String filter for DUT associated with job.
Returns:
List of job ID numbers.
"""
return [
job["id"]
for job in self.get_jobs(
query_start,
query_limit,
id_filter,
name_filter,
created_time_lt,
created_time_gt,
sub,
suite,
not_yet_run,
running,
finished,
completed_status,
parent_id_filter,
dut_hostname_filter,
)
]
def abort_jobs(self, job_ids):
result = self.afe_connector.abort_host_queue_entries(job_ids)
if result:
aborted_jobs = [str(item["Job"]) for item in result]
if len(aborted_jobs) > 15:
return (
"Aborted {} jobs, please refresh table" " to see results."
).format(len(aborted_jobs))
else:
return (
"Aborted the following jobs: {}\nPlease refresh "
"table to see results."
).format(", ".join(aborted_jobs))
else:
return (
"No jobs aborted (were selected jobs already "
"aborted/finished)?"
)
def get_job_details(self, job_id):
return self.afe_connector.get_jobs_by_ids([job_id])
def get_job_history(self, job_id):
return self.afe_connector.get_job_history(job_id)
def get_associated_duts(self, job_id):
hqes, _ = self.afe_connector.get_host_queue_entries(job_id=job_id)
return hqes
def get_num_dut_tasks(self, ip, start_time_gt, end_time_lt):
"""
Given a DUT's IP, returns a number of tasks (e.g. repairs,
reverifies) associated with that DUT.
Args:
ip (string): DUT IP.
start_time_gt(int): Seconds timestamp of a filter to DUT tasks by
start time.
end_time_lt(int): Seconds timestamp of a filter to DUT tasks by
end time.
Returns:
Returns number of task associated with DUT which match query
parameters. If DUT itself is not found, returns -1.
"""
try:
if _USE_DUT_MANAGER_FEATURE_FLAG:
return len(
self._get_dut_tasks_from_dut_manager(
ip, start_time_gt, end_time_lt
)
)
dut_id = self.afe_connector.get_connected_devices(hostname=ip)[0][
"id"
]
return self.afe_connector.get_num_special_tasks(
host_id_list=[dut_id],
start_time_gt=start_time_gt,
end_time_lt=end_time_lt,
)
except (IndexError, KeyError):
logging.error("Unable to find AFE id for dut: {}".format(dut_id))
return -1
    def _get_dut_tasks_from_dut_manager(
        self, dut_ip: str, query_start: int, query_limit: int
    ):
        """Returns a page of task dicts built from a managed DUT's history.

        Args:
            dut_ip: Hostname/IP identifying the managed DUT.
            query_start: Index of the first history entry to return.
            query_limit: Max number of entries to return; falsy means
                "everything from query_start onward".

        Returns:
            A list of dicts with keys: task, time_started, time_finished,
            lease_owner, note, task_log_id.
        """
        managed_dut = DutManagerRpcConnector.get_managed_dut(dut_ip)
        tasks = managed_dut.history
        # Page the history; if the requested window runs past the end of
        # the list (or no limit was given) take everything from the start
        # index onward.
        if query_limit and query_start + query_limit <= len(tasks):
            query_end = query_start + query_limit
            tasks = tasks[query_start:query_end]
        else:
            tasks = tasks[query_start:]
        result = []
        for t in tasks:
            # The free-form note field is assumed to carry a JSON payload
            # containing at least "id" and "task" keys — TODO confirm
            # against the writer of ManagedDut.history.
            task = json.loads(t.note)
            task_dict = {}
            # Translate the DUT-manager state enum into the AFE-style task
            # label the UI expects.
            task_dict["task"] = DUT_MANAGER_TO_AFE_DUT_STATUS_MAPPINGS[t.state]
            task_dict["time_started"] = str(t.start_time.ToDatetime())
            task_dict["time_finished"] = str(t.end_time.ToDatetime())
            task_dict["lease_owner"] = str(t.lease_owner)
            task_dict["note"] = t.note
            task_dict["task_log_id"] = "%s-%s" % (task["id"], task["task"])
            result.append(task_dict)
        return result
def get_dut_tasks(
self, ip, query_start, query_limit, start_time_gt, end_time_lt
):
"""
Given a DUT's IP, returns a list of tasks (e.g. repairs,
reverifies) requested for that DUT. If DUT is not found, returns
None.
Args:
query_start(int): Parameter specifying index on where to start when
returning paged results.
query_limit(int): Upper limit on number of results to return.
start_time_gt(int): Seconds timestamp of a filter to DUT tasks by
start time.
end_time_lt(int): Seconds timestamp of a filter to DUT tasks by
end time.
Returns:
A list of dicts, each dict specifying a task associated with the
queried DUT. If DUT is not found returns None.
"""
try:
if _USE_DUT_MANAGER_FEATURE_FLAG:
return self._get_dut_tasks_from_dut_manager(
ip, query_start, query_limit
)
dut_id = self.afe_connector.get_connected_devices(hostname=ip)[0][
"id"
]
return self.afe_connector.get_special_tasks(
host_id_list=[dut_id],
query_start=query_start,
query_limit=query_limit,
start_time_gt=start_time_gt,
end_time_lt=end_time_lt,
)
except (IndexError, KeyError):
logging.error("Unable to find AFE id for dut: {}".format(dut_id))
def add_label_to_managed_dut(self, dut_hostname, label):
# Get the current instance of managed dut and append the label as tag
managed_dut = DutManagerRpcConnector.get_managed_dut(dut_hostname)
add_label = DutAttribute(id=DutAttribute.Id(value=label))
managed_dut.tag.dut_attributes.append(add_label)
field_mask = FieldMask(paths=["tag"])
DutManagerRpcConnector.update_managed_dut(managed_dut, field_mask)
return True, "" # Status, Message
def remove_label_from_managed_dut(self, dut_hostname, label):
# Get the current instance of managed dut and remove the label as tag
managed_dut = DutManagerRpcConnector.get_managed_dut(dut_hostname)
remove_label = DutAttribute(id=DutAttribute.Id(value=label))
try:
managed_dut.tag.dut_attributes.remove(remove_label)
except ValueError:
return False, "The label {} does not exist to remove".format(label)
field_mask = FieldMask(paths=["tag"])
DutManagerRpcConnector.update_managed_dut(managed_dut, field_mask)
return True, ""
def add_attribute_to_managed_dut(self, dut_hostname, key, value):
managed_dut = DutManagerRpcConnector.get_managed_dut(dut_hostname)
add_attribute = DutAttribute(
id=DutAttribute.Id(value=key), field_path=value
)
managed_dut.tag.dut_attributes.append(add_attribute)
field_mask = FieldMask(paths=["tag"])
DutManagerRpcConnector.update_managed_dut(managed_dut, field_mask)
return True, ""
def remove_attribute_from_managed_dut(self, dut_hostname, key):
managed_dut = DutManagerRpcConnector.get_managed_dut(dut_hostname)
# Find the value given the key, this is needed to distinguish
# attributes from labels
values = [
tag.field_path
for tag in managed_dut.tag.dut_attributes
if tag and tag.id.value == key
]
if not values:
return (
False,
"The attribute key {} does not exist to remove".format(key),
)
remove_attribute = DutAttribute(
id=DutAttribute.Id(value=key), field_path=values[0]
)
managed_dut.tag.dut_attributes.remove(remove_attribute)
field_mask = FieldMask(paths=["tag"])
DutManagerRpcConnector.update_managed_dut(managed_dut, field_mask)
return True, ""
def add_pool_to_managed_dut(self, dut_identifier, pool_name):
managed_dut = DutManagerRpcConnector.get_managed_dut(dut_identifier)
add_pool = Pool(name=pool_name)
managed_dut.pool.append(add_pool)
field_mask = FieldMask(paths=["pool"])
DutManagerRpcConnector.update_managed_dut(managed_dut, field_mask)
return True, ""
def remove_pool_from_managed_dut(self, dut_identifier, pool_name):
managed_dut = DutManagerRpcConnector.get_managed_dut(dut_identifier)
remove_pool = Pool(name=pool_name)
try:
managed_dut.pool.remove(remove_pool)
except ValueError:
return False, "The pool {} does not exist to remove".format(
pool_name
)
field_mask = FieldMask(paths=["pool"])
DutManagerRpcConnector.update_managed_dut(managed_dut, field_mask)
return True, ""
def _apply_edit_to_duts(
self, dut_hostnames, afe_edit_func, success_msg, *args
):
"""
A helper function which contains the repeated logic involved in
applying an edit to multiple DUTs. This logic is repeated for
adding and removing attributes and labels.
Args:
dut_hostnames(string[]): A list of strings representing the
hostnames of the DUT's to be targeted.
afe_edit_func((*args) => (tuple(bool,string))): A reference to a
function of AFEConnector which takes in *args and returns a
tuple containing a bool indicating success and a results
message string. This function does the 'edit' to the DUT's.
success_msg(int): A format-able string to be returned in success
cases; It should have exactly as many format indexes ({}) as
there are arguments in *args.
*args: Arguments which are meant to be passed into afe_edit_func.
Returns:
String message summarizing results.
"""
failures = {}
for dut_hostname in dut_hostnames:
is_success, message = afe_edit_func(dut_hostname, *args)
if not is_success:
failures[dut_hostname] = message
response_message = success_msg.format(
*args, len(dut_hostnames) - len(failures.keys())
)
if failures:
response_message += "Failures on the following DUTs:\n"
for dut_hostname, message in failures.items():
response_message += "{}: {}\n".format(dut_hostname, message)
return response_message
def add_attribute_to_duts(self, dut_hostnames, key, value):
"""
Adds attribute key:value to the selected dut_hostnames. Returns a
message summarizing how many DUT's the attribute was successfully
added to as well for which DUT's the addition failed.
Args:
dut_hostnames(string[]): A list of strings representing the
hostnames
of the DUT's to be targeted.
key (string): Key of attribute to be added.
value (string): Value of attribute to be added.
Returns:
String message summarizing results.
"""
return self._apply_edit_to_duts(
dut_hostnames,
self.add_attribute_to_managed_dut
if _USE_DUT_MANAGER_FEATURE_FLAG
else self.afe_connector.add_attribute_to_host,
"Successfully added attribute {}:{} to {} DUT(s)\n",
key,
value,
)
def remove_attribute_from_duts(self, dut_hostnames, key):
"""
Removes attribute (referenced by key) from the selected
dut_hostnames. Returns a message summarizing how many DUT's the
attribute was successfully removed from as well for which DUT's the
remove failed.
Args:
dut_hostnames(string[]): A list of strings representing the
hostnames of the DUT's to be targeted.
key (string): Key of attribute to be remove.
value (string): Value of attribute to be remove.
Returns:
String message summarizing results.
"""
return self._apply_edit_to_duts(
dut_hostnames,
self.remove_attribute_from_managed_dut
if _USE_DUT_MANAGER_FEATURE_FLAG
else self.afe_connector.remove_attribute_from_host,
"Successfully removed attribute {} from {} DUT(s)\n",
key,
)
def add_label_to_duts(self, dut_hostnames, label):
"""
Adds label to the selected dut_hostnames. Returns a message
summarizing how many DUT's the label was successfully added to as
well for which DUT's the addition failed.
Args:
dut_hostnames(string[]): A list of strings representing the
hostnames of the DUT's to be targeted.
label (string): Label to be added.
Returns:
String message summarizing results.
"""
return self._apply_edit_to_duts(
dut_hostnames,
self.add_label_to_managed_dut
if _USE_DUT_MANAGER_FEATURE_FLAG
else self.afe_connector.add_label_to_host,
"Successfully added label {} to {} DUT(s)\n",
label,
)
def remove_label_from_duts(self, dut_hostnames, label):
"""
Remove label from the selected dut_hostnames. Returns a message
summarizing how many DUT's the label was successfully removed from
as well as for which DUT's the remove failed.
Args:
dut_hostnames(string[]): A list of strings representing the
hostnames of the DUT's to be targeted.
label (string): Label to be removed.
Returns:
String message summarizing results.
"""
return self._apply_edit_to_duts(
dut_hostnames,
self.remove_label_from_managed_dut
if _USE_DUT_MANAGER_FEATURE_FLAG
else self.afe_connector.remove_label_from_host,
"Successfully removed label {} from {} DUT(s)\n",
label,
)
def add_pool_to_duts(self, dut_hostnames, pool):
"""
Adds pool to the selected dut_hostnames. Returns a message
summarizing how many DUT's the pool was successfully added to as
well for which DUT's the addition failed.
Args:
dut_hostnames(string[]): A list of strings representing the
hostnames of the DUT's to be targeted.
pool (string): Pool to be added.
Returns:
String message summarizing results.
"""
if not _USE_DUT_MANAGER_FEATURE_FLAG:
return self.add_label_to_duts(dut_hostnames, "pool:" + pool)
return self._apply_edit_to_duts(
dut_hostnames,
self.add_pool_to_managed_dut,
"Successfully added pool {} to {} DUT(s)\n",
pool,
)
def remove_pool_from_duts(self, dut_hostnames, pool):
"""
Remove pool from the selected dut_hostnames. Returns a message
summarizing how many DUT's the pool was successfully removed from
as well as for which DUT's the remove failed.
Args:
dut_hostnames(string[]): A list of strings representing the
hostnames of the DUT's to be targeted.
pool (string): Pool to be removed.
Returns:
String message summarizing results.
"""
if not _USE_DUT_MANAGER_FEATURE_FLAG:
return self.remove_label_from_duts(dut_hostnames, "pool:" + pool)
return self._apply_edit_to_duts(
dut_hostnames,
self.remove_pool_from_managed_dut,
"Successfully removed pool {} from {} DUT(s)\n",
pool,
)
def reverify_host(self, dut_hostnames):
"""Starts repair run on given DUTs hostnames.
Args:
dut_hostnames(string[]): A list of strings representing the
hostnames of the DUT's to be targeted.
"""
self.afe_connector.reverify_host(dut_hostnames)
return "Successfully set off reverify on {}".format(dut_hostnames)
def repair_host(self, dut_hostname):
"""Starts repair run on given DUT hostname.
Args:
dut_hostname(string): String hostname (ex. IP address)
"""
dut_id = self._find_dut_id_given_ip(dut_hostname)
if dut_id:
self.afe_connector.repair_host(dut_id)
return "Successfully set off repair on {}".format(dut_hostname)
def get_cloud_configuration(self):
"""
Gets cloud configuration info.
Returns:
A dict of cloud configuration info specifically,
'gs_access_key_id', 'gs_secret_access_key', 'image_storage_server'
"""
return self.afe_connector.get_cloud_storage_info()
def get_dut_wifi_info(self):
"""Gets DUT wifi info.
Returns:
A dict with dut_wifi_name, dut_wifi_password
"""
return self.afe_connector.get_dut_wifi_info()
def set_dut_wifi_info(self, dut_wifi_name, dut_wifi_password):
"""
Attempts to set DUT wifi on Moblab.
Args:
dut_wifi_name (string): Wifi name
dut_wifi_password (string): Wifi password
Returns:
A result message string.
"""
if self.afe_connector.set_configuration_info(
dut_wifi_name=dut_wifi_name,
dut_wifi_password=dut_wifi_password,
):
return "Successfully applied DUT Wifi info changes."
else:
return "Failed to apply DUT Wifi info changes."
def _can_change_cloud_config(self):
return self.get_num_jobs() == 0
def _cloud_config_changing(
self,
new_is_cloud_enabled,
new_boto_key_id,
new_boto_key_secret,
new_gcs_bucket_url,
):
old_config = self.get_cloud_configuration()
old_is_cloud_enabled = old_config.get("is_cloud_enabled", False)
if old_is_cloud_enabled and not new_is_cloud_enabled:
return False
old_boto_key_id = old_config.get("gs_access_key_id", "")
old_boto_key_secret = old_config.get("gs_secret_access_key", "")
old_gcs_bucket_url = old_config.get("image_storage_server", "")
if (
old_boto_key_id != new_boto_key_id
or old_boto_key_secret != new_boto_key_secret
or old_gcs_bucket_url != new_gcs_bucket_url
):
return True
return False
def set_cloud_configuration(
self,
is_cloud_enabled,
boto_key_id,
boto_key_secret,
gcs_bucket_url,
is_remote_agent_enabled,
is_remote_command_enabled,
):
"""
Attempts to set configuration values to Moblab.
Args:
boto_key_id (string): boto ID string
boto_key_secret (string): boto secret string
gcs_bucket_url (string): GCS bucket URL
Returns:
A result message string.
"""
if is_cloud_enabled:
if (
not self._can_change_cloud_config()
and self._cloud_config_changing(
is_cloud_enabled,
boto_key_id,
boto_key_secret,
gcs_bucket_url,
)
):
logging.warn(
"Attempt to modify Cloud Config after it was used."
)
return (
"Cloud configuration changes are not allowed. "
"Please powerwash the Moblab to set new configuration."
)
is_valid, details = self.afe_connector.validate_cloud_storage_info(
boto_key_id, boto_key_secret, gcs_bucket_url
)
if is_valid:
if self.afe_connector.set_configuration_info(
is_cloud_enabled,
boto_key_id,
boto_key_secret,
gcs_bucket_url,
):
message = (
"Successfully applied cloud configuration values. "
)
self._last_config_update = datetime.utcnow()
self.moblab_bucket_name = (
self.config_connector.get_cloud_bucket(
force_reload=True
)
)
if self.set_is_remote_agent_enabled(
is_remote_agent_enabled
):
message += "Remote agent has been {}. ".format(
"enabled"
if is_remote_agent_enabled
else "disabled"
)
else:
message += (
"But failed to set remote agent as {}. ".format(
"enabled"
if is_remote_agent_enabled
else "disabled"
)
)
if self.set_is_remote_command_enabled(
is_remote_command_enabled
):
message += "Remote command has been {}.".format(
"enabled"
if is_remote_command_enabled
else "disabled"
)
else:
message += (
"But failed to set remote command as {}.".format(
"enabled"
if is_remote_command_enabled
else "disabled"
)
)
return message
else:
return "Failed to apply cloud configuration changes."
else:
return details
else:
if self.afe_connector.set_configuration_info(is_cloud_enabled):
return "Successfully disabled cloud configuration."
else:
return "Failed to commit disable cloud change."
def get_is_update_available(self):
"""
Check to see if there is a system update or any containers that
have updated images.
Scan the running containers, if the watchtower program has pulled
down a newer image then the container to image reference tag
is broken.
Detect these image tag link breaks and assume that means we have
an update. Looking through update tools and forums, it seems this
is the way all update tools work.
Returns:
True iff there is an update available.
"""
try:
response = (
host_connector.HostServicesConnector.get_system_update_status()
)
if response.current_op.find("IDLE") == -1:
return True
except host_connector.HostServicesException:
logging.exception("Get system update status call failed.")
client = docker.from_env(timeout=300)
for container in client.containers.list():
if len(container.image.tags) == 0:
return True
return False
def _update_docker(self):
"""
Sets off an update of all docker containers, using watchtower
Returns: String result message.
"""
client = docker.from_env(timeout=600)
try:
container = client.containers.get("update")
raise MoblabRpcError("Already updating please wait.")
except docker.errors.NotFound:
pass
label = os.environ.get("WATCHTOWER_TAG", "release")
container = client.containers.run(
MOBLAB_DOCKER_REGISTRY + "watchtower:%s" % label,
volumes=["/var/run/docker.sock:/var/run/docker.sock"],
name="update",
detach=True,
command="--run-once --include-restarting",
)
try:
container.wait(timeout=600)
except requests.exceptions.ReadTimeout:
logging.exception("Watchtower update raised an exception.")
container.stop()
finally:
container.remove()
return "Update finished."
def _update_system(self):
"""
Initiates system update if it is available.
Returns:
An error message.
"""
try:
response = (
host_connector.HostServicesConnector.install_system_update()
)
return response.error
except host_connector.HostServicesException:
logging.exception("Install system update raised an exception.")
return "Failed to install update."
def update_moblab(self):
"""
Initiates a system update if available,
otherwise kicks docker images update.
Returns:
Update status string.
"""
return_message = self._update_system()
# if a message was returned then reboot was not requested
if return_message:
return self._update_docker()
return "Updating system."
def get_is_remote_agent_enabled(self):
"""
Check to see if the remote agent is enabled.
Returns:
True iff remote agent is enabled.
"""
return self.config_connector.get_is_remote_agent_enabled()
def set_is_remote_agent_enabled(self, is_enabled):
"""
Attempts to set if the remote agent is enabled.
Args:
is_enabled (bool): enable the remote agent or not
Returns:
True iff the set operation successfully.
"""
return self.afe_connector.set_is_remote_agent_enabled(is_enabled)
def get_is_remote_command_enabled(self):
"""
Check to see if the remote agent is enabled.
Returns:
True iff remote agent is enabled.
"""
return self.config_connector.is_remote_task_scheduler_enabled()
def set_is_remote_command_enabled(self, is_enabled):
"""
Attempts to set if the remote command is enabled.
Args:
is_enabled (bool): enable the remote command or not
Returns:
True iff the set operation successfully.
"""
return self.afe_connector.set_is_remote_command_enabled(is_enabled)
def download_service_account(self):
"""Download the service account from the account bucket.
To avoid users having to enter two different credentials, they enter
the boto key and secret, then we download the API key from a known
file in their bucket.
This function downloads that file, as it is getting the API key we
have to use gsutil vs cloud storage API to download.
"""
self.moblab_bucket_name = self.config_connector.get_cloud_bucket()
logging.info("Using bucket: %s", self.moblab_bucket_name)
if not self.moblab_bucket_name:
raise MoblabRpcError(
"Need bucket name to copy service account from."
)
try:
subprocess.check_call(
[
"gsutil",
"cp",
"gs://%s/%s"
% (
self.moblab_bucket_name,
"pubsub-key-do-not-delete.json",
),
"/home/moblab/.service_account.json",
],
)
except subprocess.CalledProcessError:
logging.exception(
"Failed to get the service account from %s",
self.moblab_bucket_name,
)
raise MoblabRpcError(
"Failed to get the service account from %s"
% self.moblab_bucket_name,
)
def add_servo(self, dut_hostname, servo_serial_number):
"""Configure a DUT to have a servo connected to it.
DUT's can be connected to the moblab via a servo, for the testing
infrastructure to work there needs to be attributes added to the DUT
to specify the servo serial number, servo hostname and servo host port.
Serial number is provided by the user hostname and port are generated.
Then verify is called which collects informationtion about the DUT
similar to how add dut works, labels like servo_state:WORKING will be
added to the dut by that process.
"""
self.add_attribute_to_managed_dut(
dut_hostname, "servo_serial", servo_serial_number
)
self.add_attribute_to_managed_dut(dut_hostname, "servo_port", "9999")
self.add_attribute_to_managed_dut(
dut_hostname, "servo_host", "%s_docker_servod" % dut_hostname
)
return self.reverify_host([dut_hostname])