blob: 96638e79f8251b6c5a87c3edb97455594d8d1f35 [file] [log] [blame]
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2019 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import calendar
import datetime
import grpc
import logging
import re
import time
# pylint: disable=import-error
import moblab_service
from storage_qual_check_service import (
StorageQualCheckService,
StorageQualCheckError,
)
# pylint: disable=import-error
from moblab_common import afe_connector
from moblab_common import config_connector
from moblab_common import feedback_connector
from moblab_common import host_connector
from moblab_common import moblab_info
from moblab_common import moblab_build_connector
from moblab_common import dut_connector_for_fwupd
# pylint: disable=import-error
from moblab_common.proto import moblabrpc_pb2
# pylint: disable=import-error
from moblab_common.proto import moblabrpc_pb2_grpc
# pylint: disable=import-error
from moblab_common.utils.constants import Constants as MoblabConstants
from moblab_common.result_upload_status_connector import Status as UploadStatus
from google.protobuf import empty_pb2
AFE_TO_MOBLAB_STATUS_MAPPINGS = {
"Queued": moblabrpc_pb2.Job.QUEUED,
"Provisioning": moblabrpc_pb2.Job.PROVISIONING,
"Running": moblabrpc_pb2.Job.RUNNING,
"Completed": moblabrpc_pb2.Job.COMPLETE,
"Failed": moblabrpc_pb2.Job.FAILED,
"Aborted": moblabrpc_pb2.Job.ABORTED,
"Starting": moblabrpc_pb2.Job.STARTING,
"Pending": moblabrpc_pb2.Job.PENDING,
"Resetting": moblabrpc_pb2.Job.RESETTING,
"Verifying": moblabrpc_pb2.Job.VERIFYING,
"Gathering": moblabrpc_pb2.Job.GATHERING,
"Parsing": moblabrpc_pb2.Job.PARSING,
"Stopped": moblabrpc_pb2.Job.STOPPED,
"Cleaning": moblabrpc_pb2.Job.CLEANING,
"Template": moblabrpc_pb2.Job.TEMPLATE,
}
MOBLAB_TO_AFE_JOBS_STATUS_MAPPINGS = {
moblabrpc_pb2.Job.FAILED_JOBS: "Failed",
moblabrpc_pb2.Job.ABORTED_JOBS: "Aborted",
moblabrpc_pb2.Job.COMPLETED_JOBS: "Completed",
}
AFE_TO_DUT_STATUS_MAPPINGS = {
"Verifying": moblabrpc_pb2.ConnectedDutInfo.DUT_STATUS_VERIFYING,
"Running": moblabrpc_pb2.ConnectedDutInfo.DUT_STATUS_RUNNING,
"Ready": moblabrpc_pb2.ConnectedDutInfo.DUT_STATUS_READY,
"Repairing": moblabrpc_pb2.ConnectedDutInfo.DUT_STATUS_REPAIRING,
"Repair Failed": moblabrpc_pb2.ConnectedDutInfo.DUT_STATUS_REPAIR_FAILED,
"Cleaning": moblabrpc_pb2.ConnectedDutInfo.DUT_STATUS_CLEANING,
"Pending": moblabrpc_pb2.ConnectedDutInfo.DUT_STATUS_PENDING,
"Resetting": moblabrpc_pb2.ConnectedDutInfo.DUT_STATUS_RESETTING,
"Provisioning": moblabrpc_pb2.ConnectedDutInfo.DUT_STATUS_PROVISIONING,
"Unknown": moblabrpc_pb2.ConnectedDutInfo.DUT_STATUS_UNKNOWN,
"Disconnected": moblabrpc_pb2.ConnectedDutInfo.DUT_STATUS_DISCONNECTED,
"Not Enrolled": moblabrpc_pb2.ConnectedDutInfo.DUT_STATUS_NOT_ENROLLED,
}
API_TO_GRPC_BUILD_STATUS_MAP = {
moblab_build_connector.BuildStatus.AVAILABLE: moblabrpc_pb2.BuildItem.BuildStatus.BUILD_STATUS_PASS,
moblab_build_connector.BuildStatus.FAILED: moblabrpc_pb2.BuildItem.BuildStatus.BUILD_STATUS_FAIL,
moblab_build_connector.BuildStatus.RUNNING: moblabrpc_pb2.BuildItem.BuildStatus.BUILD_STATUS_RUNNING,
moblab_build_connector.BuildStatus.ABORTED: moblabrpc_pb2.BuildItem.BuildStatus.BUILD_STATUS_ABORTED,
}
class MoblabRpcService(moblabrpc_pb2_grpc.MoblabRpcServiceServicer):
def __init__(self):
super(MoblabRpcService, self).__init__()
self.afe_connector = afe_connector.AFEConnector()
self.config_connector = config_connector.MoblabConfigConnector(
self.afe_connector
)
self.service = moblab_service.MoblabService()
self.storage_qual_check_service = StorageQualCheckService()
self.dut_connector_for_fwupd = (
dut_connector_for_fwupd.MoblabDUTConnectorForFwupd()
)
def send_moblab_screenshot(self, request, context):
"""Uploads image and feedback text to GCS.
Args:
request: SendMoblabRequest object with a string email, string
description and a Base64 serialized png image screenshot.
context: grpc.server.Context
Returns:
SendMoblabResponse type with string message indicating location of
uploaded feedback.
"""
moblab_bucket_name = self.config_connector.get_cloud_bucket(
force_reload=True
)
if not moblab_bucket_name:
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(
"GCS Bucket is not configured. To send feedback"
" please go to Configuration page and set GCS Bucket."
)
return moblabrpc_pb2.SendMoblabScreenshotResponse()
fc = feedback_connector.MoblabFeedbackConnector(moblab_bucket_name)
path, url = fc.upload_feedback(
request.contact_email,
request.description,
request.screenshot,
)
response = moblabrpc_pb2.SendMoblabScreenshotResponse(
message=(
"Thank you! Feedback uploaded to bucket {} under {}. "
"URL to feedback: {}"
).format(moblab_bucket_name, path, url)
)
return response
def run_suite(self, request, context):
"""Generic run suite method.
Accepts suite as a custom argument, allows for run of any suite but
with no custom arguments.
Args:
request: RunSuiteRequest object with parameters describing suite to
be run and arguments for suite run.
_context: grpc.server.Context
Returns:
RunSuiteResponse object with a string result message.
"""
try:
return moblabrpc_pb2.RunSuiteResponse(
message=self.service.run_suite(
request.build_target,
request.model,
request.milestone,
request.build_version,
request.suite,
pool=request.pool,
)
)
except moblab_service.MoblabRpcError as ex:
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(str(ex))
return moblabrpc_pb2.RunSuiteResponse()
async def provision_duts(self, request, context):
"""Starts provision test suite.
Args:
request: ProvisionDutsRequest object with parameters describing the
build version to be provisioned on the DUTs within a pool.
context: grpc.server.Context
Returns:
Empty message if started successfully.
"""
message = await self.service.provision_duts(
request.pool,
request.milestone,
request.build_version,
)
if message:
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(message)
return empty_pb2.Empty()
def stage_build(self, request, context):
"""Stage the build to the bucket configured for this moblab.
Args:
request: StageBuildRequest object with parameters describing the
model, build target and build version to be staged.
context: grpc.server.Context
Returns:
StageBuildRequest object with a string bucket name.
"""
try:
return moblabrpc_pb2.StageBuildResponse(
build_bucket=self.service._stage_build(
request.model, request.build_target, request.build_version
)
)
except moblab_service.MoblabRpcError as ex:
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(str(ex))
raise
def run_cts_suite(self, request, _context):
"""Sets off CTS suite run.
Args:
request: RunCtsSuiteRequest object with parameters describing suite
to be
run and arguments for suite run:
build_target: Ex. octopus
milestone: Ex. R80
build_version: A numeric version string of the form: '00000.00.0'
android_version: A string specifying which android version suite to
run ( Ex. cts_X )
model: String identfier for the specific model to be run.
pool: String identifier for which pool of DUT's to run tests with.
specific_modules: Comma-separated string of specific module names
to run.
_context: grpc.server.Context
Returns:
RunSuiteResponse object with a string result message.
"""
response = moblabrpc_pb2.RunSuiteResponse(
message=self.service.run_cts_suite(
request.build_target,
request.model,
request.milestone,
request.build_version,
request.android_version,
request.pool,
request.specific_modules,
)
)
return response
def run_gts_suite(self, request, _context):
"""Sets off GTS suite run.
Args:
request: RunGtsSuiteRequest object with parameters describing
arguments for suite run:
build_target: Ex. octopus
milestone: Ex. R80
build_version: A numeric version string of the form: '00000.00.0'
model: String identfier for the specific model to be run.
pool: String identifier for which pool of DUT's to run tests with.
specific_modules: Comma-separated string of specific module names
to run.
_context: grpc.server.Context
Returns:
RunSuiteResponse object with a string result message.
"""
response = moblabrpc_pb2.RunSuiteResponse(
message=self.service.run_gts_suite(
request.build_target,
request.model,
request.milestone,
request.build_version,
request.pool,
request.specific_modules,
)
)
return response
async def run_storage_qualification_suite(self, request, _context):
"""Sets off hardware storage qualification suite run.
Args:
request: RunStorageQualificationSuiteRequest object with parameters
describing arguments for suite run:
build_target: Ex. octopus
milestone: Ex. R80
build_version: A numeric version string of the form: '00000.00.0'
model: String identifier for the specific model to be run.
avl_process_bug_id: Numeric string identifying the associated
buganizer issue.
avl_part_number: Numeric string identifying relevant part number.
variation: RunStorageQualificationSuiteRequest.Variation
disk_size_gb: Size of main storage disk (in whole GB).
is_dual_namespace: Dual-namespace test requested.
is_pre_qualified: Has the component been previously qualified.
_context: grpc.server.Context
Returns:
RunSuiteResponse object with a string result message.
"""
try:
pool_name = request.pool
model = request.model
board = request.build_target
if pool_name == "":
pool_name = None
await self.storage_qual_check_service.check_setup(
model, board, pool_name, request.variation
)
except StorageQualCheckError as ex:
_context.set_code(grpc.StatusCode.INTERNAL)
_context.set_details(str(ex))
return moblabrpc_pb2.RunSuiteResponse()
return moblabrpc_pb2.RunSuiteResponse(
message=self.service.run_storage_qual_suite(
request.build_target,
request.model,
request.milestone,
request.build_version,
request.avl_process_bug_id,
request.avl_part_number,
request.variation,
request.pool,
request.disk_size_gb,
request.is_dual_namespace,
request.is_pre_qualified,
)
)
def run_memory_qualification_suite(self, request, _context):
"""Sets off hardware memory qualification suite run.
Args:
request: RunMemoryQualificationSuiteRequest object with parameters
describing arguments for suite run:
build_target: Ex. octopus
milestone: Ex. R80
build_version: A numeric version string of the form: '00000.00.0'
model: String identifier for the specific model to be run.
avl_process_bug_id: Numeric string identifying the associated
buganizer issue.
avl_part_number: Numeric string identifying relevant part number.
_context: grpc.server.Context
Returns:
RunSuiteResponse object with a string result message.
"""
response = moblabrpc_pb2.RunSuiteResponse(
message=self.service.run_memory_qual_suite(
request.build_target,
request.model,
request.milestone,
request.build_version,
request.avl_process_bug_id,
request.avl_part_number,
request.pool,
)
)
return response
def run_faft_suite(self, request, _context):
"""Sets off FAFT suite run.
Args:
request: RunFaftSuiteRequest object with parameters describing suite
to be
run and arguments for suite run:
build_target: Ex. octopus
milestone: Ex. R80
build_version: A numeric version string of the form: '00000.00.0'
suite: A string specifying which suite to
run ( Ex. faft_setup )
model: String identfier for the specific model to be run.
pool: String identifier for which pool of DUT's to run tests with.
specific_modules: Comma-separated string of specific test names
to run, just part of the suite.
_context: grpc.server.Context
Returns:
RunSuiteResponse object with a string result message.
"""
response = moblabrpc_pb2.RunSuiteResponse(
message=self.service.run_faft_suite(
request.build_target,
request.model,
request.milestone,
request.build_version,
request.suite,
request.pool,
request.specific_tests,
)
)
return response
def run_fwupd_suite(self, request, _context):
"""Sets off FWUPD suite run.
Args:
request: RunFWUPDSuiteRequest object with parameters describing suite
to be
run and arguments for suite run:
build_target: Ex. octopus
milestone: Ex. R80
build_version: A numeric version string of the form: '00000.00.0'
suite: A string specifying which suite to
run ( Ex. faft_setup )
model: String identfier for the specific model to be run.
pool: String identifier for which pool of DUT's to run tests with.
test_args: args string of specific test names
to run, just part of the suite.
_context: grpc.server.Context
Returns:
RunFWUPDSuiteResponse object with a string result message.
"""
response = moblabrpc_pb2.RunSuiteResponse(
message=self.service.run_fwupd_suite(
request.build_target,
request.model,
request.milestone,
request.build_version,
request.suite,
request.pool,
request.test_args,
)
)
return response
def _get_connected_dut_from_dict(self, dut_dict):
return moblabrpc_pb2.ConnectedDutInfo(
name=dut_dict.get("platform", ""),
ip=dut_dict.get("hostname", ""),
mac_addr=dut_dict.get("mac", ""),
model=moblab_service.LabelsParser.extract_model_from_labels(
dut_dict.get("labels", [])
),
build_target=moblab_service.LabelsParser.extract_build_target_from_labels(
dut_dict.get("labels", [])
),
pools=sorted(
moblab_service.LabelsParser.extract_pool_from_labels(
dut_dict.get("labels", [])
)
),
status=AFE_TO_DUT_STATUS_MAPPINGS[dut_dict["status"]],
is_enrolled=dut_dict.get("enrolled", False),
is_connected=dut_dict.get("isConnected", True),
labels=sorted(dut_dict.get("labels", [])),
attributes=sorted(
[
"%s: %s" % (key, value)
for key, value in dut_dict.get("attributes", {}).items()
]
),
error=dut_dict.get("error", ""),
)
async def list_connected_duts(self, _request, _context):
response = moblabrpc_pb2.ListConnectedDutsResponse()
duts = await self.service.list_duts()
for dut in duts:
dut_info = self._get_connected_dut_from_dict(dut)
response.duts.extend([dut_info])
return response
def get_dut_details(self, request, _context):
try:
dut = self.service.get_dut_details(request.dut)
except IndexError:
error_msg = "Unable to find DUT: {}".format(request.dut)
logging.error(error_msg)
_context.set_code(grpc.StatusCode.NOT_FOUND)
_context.set_details(error_msg)
else:
response = moblabrpc_pb2.GetDutDetailsResponse(
dut_detail=self._get_connected_dut_from_dict(dut)
)
if "current_job" in dut:
response.dut_detail.current_job = (
dut["current_job"] if dut["current_job"] else 0
)
if "current_special_task" in dut:
response.dut_detail.current_dut_task = (
dut["current_special_task"]
if dut["current_special_task"]
else ""
)
return response
def list_test_jobs(self, request, context):
pass
def list_build_targets(self, request, _context):
"""
Returns build targets associated with an account.
Args:
request: ListBuildTargetsRequest object.
_context: grpc.server.Context
Returns:
ListBuildTargetsResponse
"""
build_targets = self.service.list_build_targets()
response = moblabrpc_pb2.ListBuildTargetsResponse(
build_targets=build_targets
)
return response
def list_build_targets_by_model(self, request, _context):
"""
Returns build targets associated with the given model.
Args:
request: ListBuildTargetsByModelRequest object with model parameter.
_context: grpc.server.Context
Returns:
ListBuildTargetsByModelResponse
"""
build_targets = self.service.list_usable_build_targets(request.model)
response = moblabrpc_pb2.ListBuildTargetsByModelResponse(
build_targets=build_targets
)
return response
def list_milestones(self, request, _context):
milestones = self.service.list_usable_milestones(
request.build_target, request.model
)
response = moblabrpc_pb2.ListMilestonesResponse(
milestones=[
moblabrpc_pb2.BuildItem(
value=i.milestone,
is_staged=i.is_staged,
)
for i in milestones.results
],
is_incomplete=milestones.is_incomplete,
)
return response
def _convert_build_version_to_grpc(self, status):
if status in API_TO_GRPC_BUILD_STATUS_MAP:
return API_TO_GRPC_BUILD_STATUS_MAP[status]
return moblabrpc_pb2.BuildItem.BuildStatus.BUILD_STATUS_PASS
def list_build_versions(self, request, _context):
build_versions = self.service.list_usable_build_versions(
request.build_target, request.milestone, request.model, request.label,
)
logging.debug(
"Build versions for %s target, %s milestone, %s model: %s",
request.build_target,
request.milestone,
request.model,
build_versions,
)
response = moblabrpc_pb2.ListBuildVersionsResponse(
build_versions=[
moblabrpc_pb2.BuildItem(
value=i.build_version,
labels=i.labels,
is_staged=i.is_staged,
status=self._convert_build_version_to_grpc(i.status),
)
for i in build_versions.results
],
is_incomplete=build_versions.is_incomplete,
)
return response
def list_pools(self, request, _context):
afe_hosts = self.afe_connector.get_connected_devices()
pools = set()
model = request.model
for host in afe_hosts:
labels = host.get("labels", [])
dut_model = moblab_service.LabelsParser.extract_model_from_labels(
labels
)
if model == "" or model.lower() == dut_model.lower():
pool_names = (
moblab_service.LabelsParser.extract_pool_from_labels(
labels
)
)
pools = pools.union(pool_names)
response = moblabrpc_pb2.ListPoolsResponse(pools=list(pools))
return response
def list_models(self, _request, _context):
afe_hosts = self.afe_connector.get_connected_devices()
models = set()
for host in afe_hosts:
models.add(
moblab_service.LabelsParser.extract_model_from_labels(
host.get("labels", [])
)
)
response = moblabrpc_pb2.ListModelsResponse(models=list(models))
return response
def list_accessible_models(self, request, context):
"""Lists all models for a given build target that this account has access to."""
models_dict = self.service.list_models(request.build_target)
response = moblabrpc_pb2.ListAccessibleModelsResponse()
for model, build_targets in models_dict.items():
model_resp = moblabrpc_pb2.Model(
name=model, build_targets=list(build_targets)
)
response.models.extend([model_resp])
return response
async def enroll_duts(self, request, _context):
await self.service.enroll_duts(request.ips)
response = moblabrpc_pb2.EnrollDutsResponse()
return response
def unenroll_duts(self, request, _context):
self.service.unenroll_duts(request.ips)
response = moblabrpc_pb2.EnrollDutsResponse()
return response
def list_connected_duts_firmware(self, _request, _context):
logging.info("Firmware list requested")
duts = self.service.list_connected_duts_firmware()
response = moblabrpc_pb2.ListConnectedDutsFirmwareResponse()
for dut in duts:
dut_firmware_info = moblabrpc_pb2.ConnectedDutFirmwareInfo(
ip=dut[0],
current_firmware=dut[1],
update_firmware=dut[2],
)
response.duts.extend([dut_firmware_info])
return response
def update_duts_firmware(self, request, _context):
results = self.service.update_duts_firmware(request.ips)
response = moblabrpc_pb2.UpdateDutsFirmwareResponse()
for result in results:
command_output = moblabrpc_pb2.FirmwareUpdateCommandOutput(
ip=result[0], command_output=result[1]
)
response.outputs.extend([command_output])
return response
def get_num_jobs(self, request, _context):
num_jobs = self.service.get_num_jobs(
request.id_filter,
request.name_filter,
request.created_time_lt,
request.created_time_gt,
request.rel_filter == moblabrpc_pb2.Job.CHILD,
request.rel_filter == moblabrpc_pb2.Job.SUITE,
request.status_filter == moblabrpc_pb2.Job.QUEUED_JOBS,
request.status_filter == moblabrpc_pb2.Job.JOBS_RUNNING,
False,
self._extract_afe_jobs_status(request.status_filter),
request.parent_id_filter,
request.dut_hostname_filter,
)
response = moblabrpc_pb2.GetNumJobsResponse(num_jobs=num_jobs)
return response
def _convert_timestamp_to_utc_secs(self, ts):
if not ts:
return None
try:
dt_ts = datetime.datetime.strptime(ts, "%Y-%m-%d %H:%M:%S")
to_utc_delta_seconds = calendar.timegm(
time.gmtime()
) - calendar.timegm(time.localtime())
return (
int((dt_ts - datetime.datetime(1970, 1, 1)).total_seconds())
+ to_utc_delta_seconds
)
except (ValueError, TypeError):
logging.exception(
"Encountered an invalid string for conversion to"
+ " timestamp: {}".format(ts)
)
return None
def _extract_status_enum(self, status_str):
if status_str in AFE_TO_MOBLAB_STATUS_MAPPINGS:
return AFE_TO_MOBLAB_STATUS_MAPPINGS[status_str]
elif re.match(r"Aborted \(\s*\w+\s*\)", status_str):
# Ex. Aborted (Running)
return moblabrpc_pb2.Job.ABORTING
else:
logging.error(
'Encountered unexpected status "{}"'.format(status_str)
)
return moblabrpc_pb2.Job.STATUS_NOT_SET
def _extract_status(self, job):
if job.get("status"):
return self._extract_status_enum(job.get("status"))
elif job.get("status_counts"):
# "if there is more than one status count, error out"
if (
len(job.get("status_counts").keys()) != 1
or job.get("status_counts")[
list(job.get("status_counts").keys())[0]
]
!= 1
):
logging.error(
"Encountered multiple status values for job {}".format(job)
)
return moblabrpc_pb2.Job.STATUS_NOT_SET
return self._extract_status_enum(
list(job.get("status_counts").keys())[0]
)
else:
logging.error("Unable to find status for job {}".format(job))
return moblabrpc_pb2.Job.STATUS_NOT_SET
def _extract_afe_jobs_status(self, jobs_status_enum):
if jobs_status_enum in MOBLAB_TO_AFE_JOBS_STATUS_MAPPINGS:
return MOBLAB_TO_AFE_JOBS_STATUS_MAPPINGS[jobs_status_enum]
else:
logging.error(
'Encountered unexpected jobs status enum "{}"'.format(
jobs_status_enum
)
)
return None
def get_jobs(self, request, _context):
jobs = self.service.get_jobs(
request.query_start,
request.query_limit,
request.id_filter,
request.name_filter,
request.created_time_lt,
request.created_time_gt,
request.rel_filter == moblabrpc_pb2.Job.CHILD,
request.rel_filter == moblabrpc_pb2.Job.SUITE,
request.status_filter == moblabrpc_pb2.Job.QUEUED_JOBS,
request.status_filter == moblabrpc_pb2.Job.JOBS_RUNNING,
False,
self._extract_afe_jobs_status(request.status_filter),
request.parent_id_filter,
request.dut_hostname_filter,
request.sort_by,
)
response = moblabrpc_pb2.GetJobsResponse()
if jobs:
for job in jobs:
job_proto = moblabrpc_pb2.Job(
job_id=job.get("id", -1),
name=job.get("name", "Name not found"),
priority=job.get("priority", 0),
created_time_sec_utc=self._convert_timestamp_to_utc_secs(
job["created_on"]
),
parent_job_id=job.get("parent_job", -1),
status=self._extract_status(job),
upload_state=moblabrpc_pb2.Job.UploadState(
status=self._map_upload_status(
job.get("upload_status", "")
),
attempts_number=job.get("attempt_number", 0),
last_error=job.get("last_falure_reason", None),
),
hostname=job.get("hostname", "--"),
)
response.jobs.extend([job_proto])
return response
def _map_upload_status(self, internal_status):
result = moblabrpc_pb2.Job.UploadStatus.UPLOAD_STATUS_UNKNOWN
if internal_status == UploadStatus.NOT_READY.name:
result = moblabrpc_pb2.Job.UploadStatus.UPLOAD_STATUS_NOT_READY
if internal_status == UploadStatus.QUEUED.name:
result = moblabrpc_pb2.Job.UploadStatus.UPLOAD_STATUS_QUEUED
if internal_status == UploadStatus.UPLOADING.name:
result = moblabrpc_pb2.Job.UploadStatus.UPLOAD_STATUS_UPLOADING
if internal_status == UploadStatus.UPLOADED.name:
result = moblabrpc_pb2.Job.UploadStatus.UPLOAD_STATUS_UPLOADED
if internal_status == UploadStatus.UPLOAD_FAILED.name:
result = moblabrpc_pb2.Job.UploadStatus.UPLOAD_STATUS_UPLOAD_FAILED
if internal_status == UploadStatus.DELETED.name:
result = moblabrpc_pb2.Job.UploadStatus.UPLOAD_STATUS_DELETED
return moblabrpc_pb2.Job.UploadStatus.Name(result)
def get_job_ids(self, request, _context):
"""Returns the job ID's that match the request filters.
Args:
request: GetJobIdsRequest
_context:
Returns:
GetJobIdsResponse object with a list of ID numerics.
"""
job_ids = self.service.get_job_ids(
request.query_start,
request.query_limit,
request.id_filter,
request.name_filter,
request.created_time_lt,
request.created_time_gt,
request.rel_filter == moblabrpc_pb2.Job.CHILD,
request.rel_filter == moblabrpc_pb2.Job.SUITE,
request.status_filter == moblabrpc_pb2.Job.QUEUED_JOBS,
request.status_filter == moblabrpc_pb2.Job.JOBS_RUNNING,
False,
self._extract_afe_jobs_status(request.status_filter),
request.parent_id_filter,
request.dut_hostname_filter,
)
response = moblabrpc_pb2.GetJobIdsResponse(job_ids=job_ids)
return response
def abort_jobs(self, request, _context):
message = self.service.abort_jobs(
# converting proto RepeatedScalarFieldContainer type to python list
[job_id for job_id in request.job_ids]
)
response = moblabrpc_pb2.AbortJobsResponse()
response.message = message
return response
def _convert_timedelta_to_miliseconds(self, minutes=None, seconds=None):
if not minutes and not seconds:
return None
try:
miliseconds = 0
if minutes:
miliseconds += int(minutes) * 60 * 1000
if seconds:
miliseconds += int(seconds) * 1000
return miliseconds
except ValueError:
logging.error(
"Got invalid timedelta values {} {}".format(minutes, seconds)
)
return None
def get_job_details(self, request, _context):
try:
job = self.service.get_job_details(request.id)[0]
except IndexError:
error_msg = "Unable to find job with ID: {}".format(request.id)
logging.error(error_msg)
_context.set_code(grpc.StatusCode.NOT_FOUND)
_context.set_details(error_msg)
return
else:
execution_history = []
job_history_dicts = self.service.get_job_history(request.id)
if job_history_dicts:
execution_history = [
moblabrpc_pb2.Execution(
name=item.get("name", ""),
job_id=item.get("id", ""),
status=self._extract_status(item),
start_time_utc=self._convert_timestamp_to_utc_secs(
item.get("start_time", "")
),
end_time_utc=self._convert_timestamp_to_utc_secs(
item.get("end_time", "")
),
used_time=self._convert_timedelta_to_miliseconds(
seconds=item.get("time_used", "")
),
dut=item.get("hostname", ""),
)
for item in job_history_dicts
]
associated_duts = []
for item in self.service.get_associated_duts(request.id):
try:
dut = item["host"]["hostname"]
except (KeyError, TypeError):
dut = "hostless"
try:
dut_status = item["host"]["status"]
except (KeyError, TypeError):
dut_status = ""
job_id = int(item.get("id", ""))
status = self._extract_status(item)
associated_duts.append(
moblabrpc_pb2.Execution(
dut=dut,
dut_status=dut_status,
job_id=job_id,
status=status,
)
)
response = moblabrpc_pb2.GetJobDetailsResponse(
job_detail=moblabrpc_pb2.Job(
job_id=job.get("id"),
name=job.get("name", ""),
priority=job.get("priority", 0),
created_time_sec_utc=self._convert_timestamp_to_utc_secs(
job.get("created_on", "")
),
parent_job_id=job.get("parent_job", None),
status=self._extract_status(job),
dependencies=job.get("dependencies", ""),
server_control_file=job.get("control_file", ""),
max_runtime=self._convert_timedelta_to_miliseconds(
minutes=job.get("max_runtime_mins", "")
),
timeout=self._convert_timedelta_to_miliseconds(
minutes=job.get("timeout_mins", "")
),
execution_history=execution_history,
associated_duts=associated_duts,
)
)
return response
def get_num_dut_tasks(self, request, _context):
"""
Given a GetNumDutTasksRequest, returns a GetNumDutTasksResponse obj
which wraps an int representing the number of tasks that match filter
parameters of the request.
"""
num_dut_tasks = self.service.get_num_dut_tasks(
request.ip,
request.start_time_gt,
request.end_time_lt,
)
if num_dut_tasks < 0:
_context.set_code(grpc.StatusCode.NOT_FOUND)
return
return moblabrpc_pb2.GetNumDutTasksResponse(
num_dut_tasks=num_dut_tasks
)
def _extract_dut_task_status(self, dut_task_dict):
"""
Given a dict representing a dut_task, will extract a status enum
based on the provided boolean flags.
"""
try:
if dut_task_dict["is_aborted"]:
return moblabrpc_pb2.DutTask.ABORTED
elif dut_task_dict["success"]:
return moblabrpc_pb2.DutTask.SUCCEEDED
elif dut_task_dict["is_complete"]:
return moblabrpc_pb2.DutTask.FAILED
elif dut_task_dict["is_active"]:
return moblabrpc_pb2.DutTask.STARTED
else:
return moblabrpc_pb2.DutTask.REQUESTED
except KeyError:
logging.error(
"Failed to determine dut task status for:\n{}".format(
dut_task_dict
)
)
return moblabrpc_pb2.DutTask.STATUS_NOT_SET
def get_dut_tasks(self, request, _context):
"""
Given a GetDutTasksRequest, returns a GetDutTasksResponse obj which
wraps a list of DutTask objects that match the given filter
parameters.
"""
dut_task_list = self.service.get_dut_tasks(
request.ip,
request.query_start,
request.query_limit,
request.start_time_gt,
request.end_time_lt,
)
if dut_task_list is None:
_context.set_code(grpc.StatusCode.NOT_FOUND)
return
response = moblabrpc_pb2.GetDutTasksResponse()
for dut_task_dict in dut_task_list:
dut_task = moblabrpc_pb2.DutTask()
dut_task.id = dut_task_dict.get("id", 0)
dut_task.task = dut_task_dict.get("task", "")
dut_task.status = self._extract_dut_task_status(dut_task_dict)
dut_task.started_time_utc = self._convert_timestamp_to_utc_secs(
dut_task_dict.get("time_started", None)
)
dut_task.finished_time_utc = self._convert_timestamp_to_utc_secs(
dut_task_dict.get("time_finished", None)
)
dut_task.task_log_id = dut_task_dict.get("task_log_id")
response.tasks.extend([dut_task])
return response
def add_attribute_to_duts(self, request, _context):
"""
Given a AddAttributeToDutsRequest, returns a
AddAttributeToDutsResponse obj which wraps a message indicating
which duts the attribute was successfully added on.
(A call to ListConnectedDuts will also reflect the change)
"""
response_msg = self.service.add_attribute_to_duts(
request.dut_hostnames, request.key, request.value
)
response = moblabrpc_pb2.AddAttributeToDutsResponse()
response.message = response_msg
return response
def remove_attribute_from_duts(self, request, _context):
"""
Given a RemoveAttributeFromDutsRequest, returns a
RemoveAttributeFromDutsResponse obj which wraps a message indicating
which duts the attribute was successfully removed from.
(A call to ListConnectedDuts will also reflect the change)
"""
response_msg = self.service.remove_attribute_from_duts(
request.dut_hostnames,
request.key,
)
response = moblabrpc_pb2.RemoveAttributeFromDutsResponse()
response.message = response_msg
return response
def add_label_to_duts(self, request, _context):
"""
Given a AddLabelToDutsRequest, returns a
AddLabelToDutsResponse obj which wraps a message indicating
which duts the label was successfully added on.
(A call to ListConnectedDuts will also reflect the change)
"""
response_msg = self.service.add_label_to_duts(
request.dut_hostnames,
request.label,
)
response = moblabrpc_pb2.AddLabelToDutsResponse()
response.message = response_msg
return response
def remove_label_from_duts(self, request, _context):
"""
Given a RemoveLabelFromDutsRequest, returns a
RemoveLabelFromDutsResponse obj which wraps a message indicating
which duts the label was successfully removed from.
(A call to ListConnectedDuts will also reflect the change)
"""
response_msg = self.service.remove_label_from_duts(
request.dut_hostnames,
request.label,
)
response = moblabrpc_pb2.RemoveLabelFromDutsResponse()
response.message = response_msg
return response
def add_pool_to_duts(self, request, _context):
"""
Given a AddPoolToDutsRequest, returns a
AddPoolDutsResponse obj which wraps a message indicating
which duts the pool was successfully added on.
(A call to ListConnectedDuts will also reflect the change)
"""
response_msg = self.service.add_pool_to_duts(
request.dut_hostnames,
request.pool,
)
response = moblabrpc_pb2.AddPoolToDutsResponse()
response.message = response_msg
return response
def remove_pool_from_duts(self, request, _context):
"""
Given a RemovePoolFromDutsRequest, returns a
RemovePoolFromDutsResponse obj which wraps a message indicating
which duts the pool was successfully removed from.
(A call to ListConnectedDuts will also reflect the change)
"""
response_msg = self.service.remove_pool_from_duts(
request.dut_hostnames,
request.pool,
)
response = moblabrpc_pb2.RemovePoolFromDutsResponse()
response.message = response_msg
return response
def reverify_host(self, request, _context):
"""
Sets of reverify job on specified DUT.
Args:
request: ReverifyHostRequest with parameter:
dut_hostnames(string[])
_context: grpc.server.Context
Returns:
ReverifyHostResponse
"""
message = self.service.reverify_host(
[hostname for hostname in request.dut_hostnames]
)
response = moblabrpc_pb2.ReverifyHostResponse(message=message)
return response
def repair_host(self, request, _context):
"""
Sets of repair job on specified DUT.
Args:
request: RepairHostRequest with parameter:
dut_hostname(string)
_context: grpc.server.Context
Returns:
RepairHostResponse
"""
message = self.service.repair_host(request.dut_hostname)
response = moblabrpc_pb2.RepairHostResponse(message=message)
return response
def get_network_info(self, request, _context):
"""
Gets network info of Moblab.
Args:
request: GetNetworkInfoRequest object ( with no parameters )
_context: grpc.server.Context
Returns:
GetNetworkInfoResponse
"""
network_info = moblab_info.get_network_info()
response = moblabrpc_pb2.GetNetworkInfoResponse(
moblab_hostname=network_info["server_ip"],
moblab_mac_address=network_info["server_mac_address"],
is_connected=network_info["is_connected"],
)
return response
def get_system_info(self, request, _context):
"""
Gets system info of Moblab.
Args:
request: GetSystemInfoRequest object ( with no parameters )
_context: grpc.server.Context
Returns:
GetSystemInfoResponse
"""
response = moblabrpc_pb2.GetSystemInfoResponse(
cpu_temperature=host_connector.HostServicesConnector.get_cpu_temperature(),
)
return response
def get_dut_wifi_info(self, request, _context):
"""
Gets DUT wifi info configured for Moblab.
Args:
request: GetDutWifiInfoRequest object ( with no parameters )
_context: grpc.server.Context
Returns:
GetDutWifiInfoResponse
"""
dut_wifi_info = self.service.get_dut_wifi_info()
response = moblabrpc_pb2.GetDutWifiInfoResponse(
dut_wifi_name=dut_wifi_info.get("wifi_dut_ap_name", ""),
dut_wifi_password=dut_wifi_info.get("wifi_dut_ap_pass", ""),
)
return response
def set_dut_wifi_info(self, request, _context):
"""
Set DUT wifi info configured for Moblab.
Args:
request: SetDutWifiInfoRequest object with parameters:
dut_wifi_name,
dut_wifi_password
_context: grpc.server.Context
Returns:
GetDutWifiInfoResponse
"""
msg = self.service.set_dut_wifi_info(
request.dut_wifi_name,
request.dut_wifi_password,
)
response = moblabrpc_pb2.SetDutWifiInfoResponse(message=msg)
return response
def get_cloud_configuration(self, request, _context):
"""
Get cloud configuration info of Moblab.
Args:
request: GetCloudConfigurationRequest object ( with no parameters )
_context: grpc.server.Context
Returns:
GetCloudConfigurationResponse
"""
cloud_configuration = self.service.get_cloud_configuration()
is_remote_agent_enabled = self.service.get_is_remote_agent_enabled()
is_remote_command_enabled = (
self.service.get_is_remote_command_enabled()
)
response = moblabrpc_pb2.GetCloudConfigurationResponse(
is_cloud_enabled=cloud_configuration.get(
"is_cloud_enabled", "False"
)
== "True",
boto_key_id=cloud_configuration.get("gs_access_key_id", ""),
boto_key_secret=cloud_configuration.get(
"gs_secret_access_key", ""
),
gcs_bucket_url=cloud_configuration.get("image_storage_server", ""),
is_remote_console_enabled=is_remote_agent_enabled,
is_remote_command_enabled=is_remote_command_enabled,
)
return response
def set_cloud_configuration(self, request, _context):
"""
Set cloud configuration info of Moblab.
Args:
request: SetCloudConfigurationRequest object with parameters:
is_cloud_enabled,
boto_key_id,
boto_key_secret,
gcs_bucket_url,
is_remote_console_enabled
_context: grpc.server.Context
Returns:
GetCloudConfigurationResponse
"""
msg = self.service.set_cloud_configuration(
request.is_cloud_enabled,
request.boto_key_id,
request.boto_key_secret,
request.gcs_bucket_url,
request.is_remote_console_enabled,
request.is_remote_command_enabled,
)
response = moblabrpc_pb2.SetCloudConfigurationResponse(message=msg)
return response
def get_version_info(self, request, _context):
"""
Get version info of Moblab.
Args:
request: GetVersionInfoRequest object ( with no parameters )
_context: grpc.server.Context
Returns:
GetVersionInfoResponse
"""
version_info = moblab_info.get_version_info()
response = moblabrpc_pb2.GetVersionInfoResponse(
chromeos_release_version=version_info.get(
MoblabConstants.BUILD_VERSION_NAME, "No Os Version"
),
chromeos_release_track=version_info.get(
MoblabConstants.CHANNEL_NAME, "No Track Name"
),
chromeos_release_description=version_info.get(
MoblabConstants.DESCRIPTION_NAME, "No Description"
),
moblab_install_id=version_info.get(
MoblabConstants.MOBLAB_INSTALL_ID_NAME, "No Install ID"
),
moblab_serial_number=version_info.get(
MoblabConstants.MOBLAB_HOST_ID_NAME, "No Host ID"
),
moblab_release_version=version_info.get(
MoblabConstants.MOBLAB_RELEASE_VERSION, "No version"
),
)
return response
def reboot_moblab(self, request, _context):
try:
host_connector.HostServicesConnector.reboot()
except host_connector.HostServicesException as e:
return moblabrpc_pb2.RebootMoblabResponse(error_message=str(e))
return moblabrpc_pb2.RebootMoblabResponse()
def get_is_update_available(self, request, _context):
"""
Get whether an update is available for user to pull.
Args:
request: GetIsUpdateAvailableRequest object ( with no parameters )
_context: grpc.server.Context
Returns:
GetIsUpdateAvailableResponse ( with a boolean true iff an update is
available ).
"""
is_update_available = self.service.get_is_update_available()
response = moblabrpc_pb2.GetIsUpdateAvailableResponse(
is_update_available=is_update_available
)
return response
def update_moblab(self, request, _context):
"""
Set off an update of moblab.
Args:
request: UpdateMoblabRequest object ( with no parameters )
_context: grpc.server.Context
Returns:
UpdateMoblabResponse ( with a string result message ).
"""
message = self.service.update_moblab()
response = moblabrpc_pb2.UpdateMoblabResponse(message=message)
return response
def get_remote_agent_configuration(self, request, _context):
"""
Get remote agent configuration info of Moblab.
Args:
request: GetRemoteAgentConfigurationRequest object
(with no parameters)
_context: grpc.server.Context
Returns:
GetRemoteAgentConfigurationResponse
"""
return moblabrpc_pb2.GetRemoteAgentConfigurationResponse(
is_enabled=self.service.get_is_remote_agent_enabled()
)
def set_remote_agent_configuration(self, request, _context):
"""
Set remote agent configuration info of Moblab.
Args:
request: SetRemoteAgentConfigurationRequest object with a
parameter: is_enabled
_context: grpc.server.Context
Returns:
SetRemoteAgentConfigurationResponse
"""
is_succeeded = self.service.set_is_remote_agent_enabled(
is_enabled=request.is_enabled
)
return moblabrpc_pb2.SetRemoteAgentConfigurationResponse(
is_succeeded=is_succeeded
)
def download_service_account(self, unused_request, unused_context):
"""Copy the service account to local machine.
The service account is stored in the cloud account bucket, copy it
to the local drive for use by services.
"""
self.service.download_service_account()
return empty_pb2.Empty()
async def validate_storage_qual_setup(self, request, _context):
"""Validates the DUTs setup for storage qual run.
Storage qual requires 3 DUTs with 3 different labels to be configured.
This method validates that the given pool is configured correctly.
"""
pool_name = request.pool_name
model = request.model
board = request.board
if pool_name == "":
pool_name = None
try:
await self.storage_qual_check_service.check_setup(
model, board, pool_name
)
except StorageQualCheckError as ex:
_context.set_code(grpc.StatusCode.INTERNAL)
_context.set_details(str(ex))
return empty_pb2.Empty()
def add_servo(self, request, _context):
"""Set the information about the servo connected to a DUT.
When a DUT is connected via servo hardware you need to configure that
DUT to know about the servo.
"""
self.service.add_servo(
request.dut_hostname, request.servo_serial_number
)
return moblabrpc_pb2.AddServoResponse()
def get_peripheral_information(self, request, _context):
"""Get information regarding the peripherals connected to the DUT
Figure out the set of peripherals that are connected to the particular
DUT and send this information back to the UI
"""
dut_hostname = request.dut_hostname
logging.debug(
f"obtained {dut_hostname} as dut_hostname for \
get_peripheral_information rpc call"
)
result = self.dut_connector_for_fwupd.get_peripheral_information(
dut_hostname
)
# now ssh to the dut and run the commands
return moblabrpc_pb2.GetPeripheralInformationResponse(
json_info=str(result)
)