| # Copyright 2019 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| """Module for interacting with Buildbucket.""" |
| |
| import collections |
| import datetime |
| import logging |
| import os |
| import uuid |
| |
| import analytics |
| import build_lib |
| import constants |
| import file_getter |
| from multi_duts_lib import restruct_secondary_targets_from_string |
| |
| from chromite.api.gen.test_platform import request_pb2 as ctp_request |
| from components import auth |
| from components.prpc import client as prpc_client |
| from chromite.third_party.infra_libs.buildbucket.proto import common_pb2 as bb_common_pb2 |
| from chromite.third_party.infra_libs.buildbucket.proto import builds_service_pb2, builder_common_pb2 |
| from chromite.third_party.infra_libs.buildbucket.proto.builds_service_prpc_pb2 import BuildsServiceDescription |
| from chromite.api.gen.chromiumos.test.api import test_suite_pb2 |
| |
| from oauth2client import service_account |
| |
| from google.protobuf import json_format, struct_pb2 |
| |
| GS_PREFIX = 'gs://chromeos-image-archive/' |
| CONTAINER_METADATA_LOC = 'metadata/containers.jsonpb' |
| |
| # The default prpc timeout(10 sec) is too short for the request_id |
| # to propgate in buildbucket, and could not fully dedup the |
| # ScheduleBuild request. Increase it while not hitting the default |
| # GAE deadline(60 sec) |
| PRPC_TIMEOUT_SEC = 55 |
| |
| |
| def _get_client(address): |
| """Create a prpc client instance for given address.""" |
| return prpc_client.Client(address, BuildsServiceDescription) |
| |
| |
| class BuildbucketRunError(Exception): |
| """Raised when interactions with buildbucket server fail.""" |
| |
| |
| class TestPlatformClient(object): |
| """prpc client for cros_test_platform, aka frontdoor.""" |
| def __init__(self, address, project, bucket, builder): |
| self.client = _get_client(address) |
| self.builder = builder_common_pb2.BuilderID(project=project, |
| bucket=bucket, |
| builder=builder) |
| self.scope = 'https://www.googleapis.com/auth/userinfo.email' |
| self.running_env = constants.environment() |
| |
| def multirequest_run(self, tasks, suite): |
| """Call cros_test_platform Builder to schedule a batch of suite tests. |
| |
| Args: |
| tasks: The suite tasks to run. |
| suite: suite name for the batch request. |
| |
| Returns: |
| List of executed tasks. |
| |
| Raises: |
| BuildbucketRunError: if failed to get build info from task parameters. |
| """ |
| requests = [] |
| executed_tasks = [] |
| counter = collections.defaultdict(int) |
| task_executions = [] |
| req_tags = _bb_tags(suite) |
| for task in tasks: |
| try: |
| params = task.extract_params() |
| if _should_skip(params): |
| continue |
| req = _form_test_platform_request(params) |
| req_json = json_format.MessageToJson(req) |
| counter_key = params['board'] |
| counter_key += '' if params['model'] in [None, 'None' |
| ] else ('_' + params['model']) |
| counter[counter_key] += 1 |
| req_name = counter_key |
| if counter[counter_key] > 1: |
| req_name += '_' + str(counter[counter_key] - 1) |
| requests.append('"%s": %s' % (req_name, req_json)) |
| req_tags.append( |
| bb_common_pb2.StringPair( |
| key='label-image', |
| value=_infer_build_from_task_params(params))) |
| executed_tasks.append(task) |
| if params.get('task_id'): |
| task_executions.append( |
| analytics.ExecutionTask(params['task_id'], req_name)) |
| except (ValueError, BuildbucketRunError): |
| logging.error('Failed to process task: %r', params) |
| if not requests: |
| return [] |
| try: |
| requests_json = '{ "requests": { %s } }' % ', '.join(requests) |
| req_build = self._build_request(requests_json, req_tags) |
| logging.debug('Raw request to buildbucket: %r', req_build) |
| |
| if (self.running_env == constants.RunningEnv.ENV_STANDALONE |
| or self.running_env == constants.RunningEnv.ENV_DEVELOPMENT_SERVER): |
| # If running locally, use the staging service account. |
| sa_key = self._gen_service_account_key( |
| file_getter.STAGING_CLIENT_SECRETS_FILE) |
| cred = prpc_client.service_account_credentials( |
| service_account_key=sa_key) |
| else: |
| cred = prpc_client.service_account_credentials() |
| resp = self.client.ScheduleBuild(req_build, |
| credentials=cred, |
| timeout=PRPC_TIMEOUT_SEC) |
| logging.debug('Response from buildbucket: %r', resp) |
| for t in task_executions: |
| t.update_result(resp) |
| try: |
| if not t.upload(): |
| logging.warning('Failed to insert row: %r', t) |
| # For any exceptions from BQ, only log it. |
| except Exception as e: #pylint: disable=broad-except |
| logging.exception('Failed to insert row: %r, got error: %s', t, |
| str(e)) |
| return executed_tasks |
| except Exception as e: |
| logging.debug('Failed to process tasks: %r', tasks) |
| logging.exception(str(e)) |
| return [] |
| |
| def dummy_run(self): |
| """Perform a dummy run of prpc call to cros_test_platform-dev.""" |
| |
| requests_json = '{ "requests": { "dummy": {} } }' |
| req_build = self._build_request(requests_json, tags=None) |
| # Use the staging service account to authorize the request. |
| sa_key = self._gen_service_account_key( |
| file_getter.STAGING_CLIENT_SECRETS_FILE) |
| cred = prpc_client.service_account_credentials(service_account_key=sa_key) |
| return self.client.ScheduleBuild(req_build, |
| credentials=cred, |
| timeout=PRPC_TIMEOUT_SEC) |
| |
| def _build_request(self, reqs_json, tags): |
| """Generate ScheduleBuildRequest for calling buildbucket. |
| |
| Args: |
| reqs_json: A json string of requests. |
| tags: A list of tags for the buildbucket build. |
| |
| Returns: |
| A ScheduleBuildRequest instance. |
| """ |
| requests_struct = struct_pb2.Struct() |
| recipe_struct = json_format.Parse(reqs_json, requests_struct) |
| return builds_service_pb2.ScheduleBuildRequest(builder=self.builder, |
| properties=recipe_struct, |
| request_id=str( |
| uuid.uuid1()), |
| tags=tags) |
| |
| def _gen_service_account_key(self, sa): |
| """Generate credentials to authorize the call. |
| |
| Args: |
| sa: A string of the path to the service account json file. |
| |
| Returns: |
| A service account key. |
| """ |
| service_credentials = service_account.ServiceAccountCredentials |
| key = service_credentials.from_json_keyfile_name(sa, self.scope) |
| return auth.ServiceAccountKey(client_email=key.service_account_email, |
| private_key=key._private_key_pkcs8_pem, |
| private_key_id=key._private_key_id) |
| |
| |
| def _form_test_platform_request(task_params): |
| """Generates test_platform.Request proto to send to buildbucket. |
| |
| Args: |
| task_params: dict containing the parameters of a task from the suite queue. |
| |
| Returns: |
| A ctp_request.Request instance. |
| """ |
| build = _infer_build_from_task_params(task_params) |
| if build == 'None': |
| raise BuildbucketRunError('No proper build in task params: %r' % |
| task_params) |
| pool = _infer_pool_from_task_params(task_params) |
| timeout = _infer_timeout_from_task_params(task_params) |
| |
| request = ctp_request.Request() |
| params = request.params |
| |
| params.scheduling.CopyFrom(_scheduling_for_pool(pool)) |
| if task_params.get('qs_account') not in ['None', None]: |
| params.scheduling.qs_account = task_params.get('qs_account') |
| # Quota Scheduler has no concept of priority. |
| if (task_params.get('priority') not in ['None', None] |
| and not params.scheduling.qs_account): |
| params.scheduling.priority = int(task_params['priority']) |
| |
| params.software_dependencies.add().chromeos_build = build |
| params.software_attributes.build_target.name = task_params['board'] |
| |
| gs_url = GS_PREFIX + task_params['test_source_build'] |
| params.metadata.test_metadata_url = gs_url |
| params.metadata.debug_symbols_archive_url = gs_url |
| params.metadata.container_metadata_url = os.path.join(gs_url, CONTAINER_METADATA_LOC) |
| |
| params.time.maximum_duration.FromTimedelta(timeout) |
| |
| for key, value in _request_tags(task_params, build, pool).iteritems(): |
| params.decorations.tags.append('%s:%s' % (key, value)) |
| |
| for d in _infer_user_defined_dimensions(task_params): |
| params.freeform_attributes.swarming_dimensions.append(d) |
| |
| if task_params['model'] != 'None': |
| params.hardware_attributes.model = task_params['model'] |
| |
| if task_params.has_key('job_retry') and task_params['job_retry'] == 'True': |
| params.retry.allow = True |
| params.retry.max = constants.Buildbucket.MAX_RETRY |
| |
| params.run_via_cft = (task_params.has_key('run_via_cft') and task_params['run_via_cft'] == 'True') |
| |
| fw_rw_build = task_params.get(build_lib.BuildVersionKey.FW_RW_VERSION) |
| fw_ro_build = task_params.get(build_lib.BuildVersionKey.FW_RO_VERSION) |
| |
| if task_params.has_key('firmware_ro_version'): |
| build = params.software_dependencies.add() |
| build.ro_firmware_build = task_params['firmware_ro_version'] |
| |
| # Skip firmware field if None(unspecified) or 'None'(no such build). |
| if fw_ro_build not in (None, 'None'): |
| build = params.software_dependencies.add() |
| build.ro_firmware_build = fw_ro_build |
| |
| if task_params.has_key('firmware_rw_version'): |
| build = params.software_dependencies.add() |
| build.rw_firmware_build = task_params['firmware_rw_version'] |
| |
| if fw_rw_build not in (None, 'None'): |
| build = params.software_dependencies.add() |
| build.rw_firmware_build = fw_rw_build |
| |
| secondary_targets = task_params.get('secondary_targets') |
| secondary_targets = restruct_secondary_targets_from_string(secondary_targets) |
| if secondary_targets: |
| for s_target in secondary_targets: |
| s_device = params.secondary_devices.add() |
| s_device.software_attributes.build_target.name = s_target.board |
| if s_target.model: |
| s_device.hardware_attributes.model = s_target.model |
| if s_target.cros_build: |
| s_build = s_device.software_dependencies.add() |
| s_build.chromeos_build = s_target.cros_build |
| |
| request.test_plan.suite.add().name = task_params['suite'] |
| if params.run_via_cft: |
| include_tags = task_params.get('include_tags', None) |
| exclude_tags = task_params.get('exclude_tags', None) |
| if include_tags or exclude_tags: |
| request.test_plan.tag_criteria.CopyFrom( |
| test_suite_pb2.TestSuite.TestCaseTagCriteria( |
| tags=_get_test_tags(include_tags), |
| tag_excludes=_get_test_tags(exclude_tags))) |
| return request |
| |
| |
| def _scheduling_for_pool(pool): |
| """Assign appropriate pool name to scheduling instance. |
| |
| Args: |
| pool: string pool name (e.g. 'MANAGED_POOL_QUOTA', 'wificell'). |
| |
| Returns: |
| scheduling: A ctp_request.Request.Params.Scheduling instance. |
| """ |
| mp = ctp_request.Request.Params.Scheduling.ManagedPool |
| if mp.DESCRIPTOR.values_by_name.get(pool) is not None: |
| return ctp_request.Request.Params.Scheduling(managed_pool=mp.Value(pool)) |
| return ctp_request.Request.Params.Scheduling(unmanaged_pool=pool) |
| |
| |
| # Also see: A copy of this function in swarming_lib.py. |
| def _infer_build_from_task_params(task_params): |
| """Infer the build to install on the DUT for the scheduled task. |
| |
| Args: |
| task_params: The parameters of a task loaded from suite queue. |
| |
| Returns: |
| A string representing the build to run. |
| """ |
| cros_build = task_params[build_lib.BuildVersionKey.CROS_VERSION] |
| android_build = task_params[build_lib.BuildVersionKey.ANDROID_BUILD_VERSION] |
| testbed_build = task_params[build_lib.BuildVersionKey.TESTBED_BUILD_VERSION] |
| return cros_build or android_build or testbed_build |
| |
| |
| def _infer_pool_from_task_params(task_params): |
| """Infer the pool to use for the scheduled task. |
| |
| Args: |
| task_params: The parameters of a task loaded from suite queue. |
| |
| Returns: |
| A string pool to schedule task in. |
| """ |
| if task_params.get('override_qs_account'): |
| return 'DUT_POOL_QUOTA' |
| return task_params.get('override_pool') or task_params['pool'] |
| |
| |
| def _infer_timeout_from_task_params(task_params): |
| """Infer the timeout for the scheduled task. |
| |
| Args: |
| task_params: The parameters of a task loaded from suite queue. |
| |
| Returns: |
| A datetime.timedelta instance for the timeout. |
| """ |
| timeout_mins = int(task_params['timeout_mins']) |
| # timeout's unit is hour. |
| if task_params.get('timeout'): |
| timeout_mins = max(int(task_params['timeout']) * 60, timeout_mins) |
| if timeout_mins > constants.Buildbucket.MAX_BUILDBUCKET_TIMEOUT_MINS: |
| timeout_mins = constants.Buildbucket.MAX_BUILDBUCKET_TIMEOUT_MINS |
| return datetime.timedelta(minutes=timeout_mins) |
| |
| |
| def _infer_user_defined_dimensions(task_params): |
| """Infer the dimensions defined by users. |
| |
| Args: |
| task_params: The parameters of a task loaded from suite queue. |
| |
| Returns: |
| A list of strings; an empty list if no dimensions set. |
| |
| Raises: |
| ValueError: if dimension is not valid. |
| """ |
| result = [] |
| if task_params.get('dimensions') in (None, 'None'): |
| return result |
| dims = [d.lstrip() for d in task_params.get('dimensions').split(',')] |
| for d in dims: |
| if len(d.split(':')) != 2: |
| raise ValueError( |
| 'Job %s has invalid dimensions: %s' % |
| (task_params.get('name'), task_params.get('dimensions'))) |
| result.append(d) |
| return result |
| |
| |
| def _request_tags(task_params, build, pool): |
| """Infer tags to include in cros_test_platform request. |
| |
| Args: |
| task_params: suite task parameters. |
| build: The build included in the request. Must not be None. |
| pool: The DUT pool used for the request. Must not be None. |
| |
| Returns: |
| A dict of tags. |
| """ |
| tags = { |
| 'build': build, |
| 'label-pool': pool, |
| 'ctp-fwd-task-name': task_params.get('name') |
| } |
| if task_params.get('board') not in (None, 'None'): |
| tags['label-board'] = task_params['board'] |
| if task_params.get('model') not in (None, 'None'): |
| tags['label-model'] = task_params['model'] |
| if task_params.get('suite') not in (None, 'None'): |
| tags['suite'] = task_params['suite'] |
| if task_params.get('analytics_name') not in (None, 'None'): |
| tags['analytics_name'] = task_params['analytics_name'] |
| return tags |
| |
| |
| def _get_test_tags(tags_list_str): |
| """parse test tags properly and return them. |
| |
| Args: |
| tags_list_str: comma separated list of tags. |
| |
| Returns: |
| A list of strings; an empty list if no tags set. |
| """ |
| if tags_list_str not in (None, '', 'None'): |
| return [d.lstrip() for d in tags_list_str.split(',')] |
| return [] |
| |
| |
| def _get_key_val_from_label(label): |
| """A helper to get key and value from the label. |
| |
| Args: |
| label: A string of label, should be in the form of |
| key:value, e.g. 'pool:ChromeOSSkylab'. |
| """ |
| res = label.split(':') |
| if len(res) == 2: |
| return res[0], res[1] |
| logging.warning('Failed to parse the label, %s', label) |
| |
| |
| def _bb_tags(suite): |
| """Get all the tags required for Buildbucket. |
| |
| Args: |
| suite: A string of suite name being scheduled. |
| |
| Returns: |
| [bb_common_pb2.StringPair] tags to include the buildbucket request. |
| """ |
| return [ |
| bb_common_pb2.StringPair(key='label-suite', value=suite), |
| bb_common_pb2.StringPair(key='suite', value=suite), |
| bb_common_pb2.StringPair(key='user_agent', value='suite_scheduler') |
| ] |
| |
| |
| def _should_skip(params): |
| """Decide whether to skip a task based on env and pool. |
| |
| Suite request from staging may still have a small chance to run |
| in production. However, for unmanaged pools(e.g. wificell), which |
| usually are small, dev traffic is unacceptable. |
| |
| Args: |
| params: dict containing the parameters of a task got from suite |
| queue. |
| |
| Returns: |
| A boolean; true for suite targetting non-default pools from staging |
| env. |
| """ |
| if constants.application_id() == constants.AppID.PROD_APP: |
| return False |
| return params['pool'] != 'MANAGED_POOL_QUOTA' |