blob: 67c5e650aacaa592596672f54d726763ee45af0b [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright 2019 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Buildbucket api utility.
"""
from distutils.spawn import find_executable
import http.client
import json
import logging
import os
import shutil
import socket
import ssl
import tempfile
# from third_party
from google.protobuf import json_format
from google.protobuf.field_mask_pb2 import FieldMask
from google.protobuf.timestamp_pb2 import Timestamp
from google.protobuf.message import DecodeError
from infra_libs.buildbucket.proto.builder_pb2 import BuilderID
from infra_libs.buildbucket.proto.build_pb2 import Build
from infra_libs.buildbucket.proto.builds_service_pb2 import BuildPredicate
from infra_libs.buildbucket.proto.builds_service_pb2 import GetBuildRequest
from infra_libs.buildbucket.proto.builds_service_pb2 import ScheduleBuildRequest
from infra_libs.buildbucket.proto.builds_service_pb2 import SearchBuildsRequest
from infra_libs.buildbucket.proto.builds_service_pb2 import SearchBuildsResponse
from infra_libs.buildbucket.proto.common_pb2 import Status
from infra_libs.buildbucket.proto.common_pb2 import StringPair
from infra_libs.buildbucket.proto.common_pb2 import TimeRange
from bisect_kit import errors
from bisect_kit import gclient_util
from bisect_kit import util
logger = logging.getLogger(__name__)
luci_link = ('https://ci.chromium.org/p/chromeos/builders/'
'{bucket}/{board}-{bucket}/b{id}')
def get_luci_link(board, build_id, bucket='bisector'):
"""Get a link directs to luci build page.
Args:
board: ChromeOS board name.
build_id: Buildbucket build id.
bucket: A bucket name of buildbucket.
Returns:
A link to luci build details.
"""
return luci_link.format(board=board, bucket=bucket, id=build_id)
class BuildbucketApi:
"""A wrapper of buildbucket apis.
This is a singleton class, it returns a same instance every time.
"""
_instance = None
_build_fields = [
'status', 'update_time', 'created_by', 'builder', 'create_time',
'critical', 'start_time', 'input', 'output', 'end_time', 'id', 'tags',
'summary_markdown'
]
buildbucket_domain = 'cr-buildbucket.appspot.com'
buildbucket_domain_url = 'https://cr-buildbucket.appspot.com/'
buildbucket_api_path = '/prpc/buildbucket.v2.Builds/'
cas_instance = 'chromeos-swarming'
token_cache = os.path.expanduser('~/.isolated_oauth')
# 2020-05-12T00:00:00Z: remove cros_debug flag
old_artifact_cutoff = 1589241600
def __new__(cls):
if cls._instance is None:
cls._instance = object.__new__(cls)
return cls._instance
def __init__(self):
"""Find swarming folder and its auth script."""
self._token = None
swarming_py = find_executable('swarming.py')
if swarming_py is None:
raise errors.InternalError('swarming.py is not found in path, ' +
'please check if swarming.py is installed, ' +
' go/lab-tools for more information.')
folder = os.path.dirname(os.path.realpath(swarming_py))
self.default_service_account_json = os.environ.get(
'SKYLAB_CLOUD_SERVICE_ACCOUNT_JSON')
self.default_service_account_client_id = self._get_client_id(
self.default_service_account_json)
self.auth_py = os.path.join(folder, 'auth.py')
if not os.path.exists(self.auth_py):
raise errors.InternalError('auth.py not found in swarming folder: %s' %
folder)
def _get_client_id(self, json_file):
"""Get client id from account json.
Returns:
Account id or None.
"""
if not json_file:
return None
with open(json_file) as f:
data = json.load(f)
return data.get('client_id')
def _cas_upload(self, file_name, content):
"""Upload a file to cas server.
Args:
file_name: file name that will upload to cas
content: content of `file_name`
Returns:
A hash string generated by cas.
"""
work_space = tempfile.mkdtemp()
hash_file = os.path.join(work_space, 'hash.txt')
file_path = os.path.join(work_space, file_name)
try:
with open(file_path, 'w') as f:
f.write(content)
# Normalize file mode, so the isolate hash depends only on file content.
# The actual mode value doesn't matter.
os.chmod(file_path, 0o640)
cmd = [
'cas',
'archive',
'-cas-instance',
'chromeos-swarming',
'-paths',
'%s:.' % work_space,
'-dump-digest',
hash_file,
]
if self.default_service_account_json:
cmd += [
'-service-account-json',
self.default_service_account_json,
]
output = util.check_output(*cmd)
with open(hash_file, 'r') as f:
cas_digest = f.read()
finally:
shutil.rmtree(work_space, True)
if not cas_digest:
raise errors.ExternalError('cas archive fail: ' + output)
return cas_digest
def _get_token(self):
"""Returns access_token saved in cache file.
Currently this function loads cache from swarming login,
which is in `~/.isolated_oauth`.
Returns:
access_token: token loaded from token cache.
Raises:
errors.InternalError: If token cache file is not found
"""
if self._token is None:
if not os.path.exists(self.token_cache):
raise errors.InternalError(
'buildbucket api cannot find ' +
'token cache file, please run %s manually to login.' % self.auth_py)
with open(self.token_cache) as f:
data = json.load(f)['data']
# if service account is set, use service account first
if self.default_service_account_client_id:
for x in data:
client_id = x.get('credential', {}).get('client_id')
if client_id == self.default_service_account_client_id:
self._token = x['credential']['access_token']
# use user account if service account is not found
if not self._token:
logger.warning(
'No token for the given service account is found in the token file,'
' use the first token instead.')
self._token = data[0]['credential']['access_token']
return self._token
def _get_header(self):
"""Returns a header includes token for api request."""
return {
'Content-Type': 'application/prpc; encoding=binary',
'Authorization': 'Bearer %s' % self._get_token()
}
def _refresh_token(self, domain):
"""Trigger swarming's auth library to renew access_token"""
if self.default_service_account_json:
util.check_call(
self.auth_py,
'check',
'--service',
domain,
'--auth-service-account-json',
self.default_service_account_json,
)
else:
# refresh user account token
util.check_call(
self.auth_py,
'check',
'--service',
domain,
)
logger.debug('buildbucket token refreshed')
self._token = None
def _post(self, api, request, return_class, timeout=10):
"""Post a rpc request to buildbucket server.
Args:
api: A string indicates api name in `buildbucket.v2.Builds`.
request: A protobuf object.
return_class: A protobuf class that catches api's return.
timeout: connection timeout in seconds, default to 10.
Returns:
A `return_class` object.
Raises:
errors.ExternalError: api request fail
"""
max_retries = 5
logged_in = False
response_content = None
while max_retries > 0:
try:
conn = http.client.HTTPSConnection(
self.buildbucket_domain, timeout=timeout)
conn.request('POST', self.buildbucket_api_path + api,
request.SerializeToString(), self._get_header())
response = conn.getresponse()
response_content = response.read()
result = return_class()
result.ParseFromString(response_content)
return result
except DecodeError as e:
if not logged_in:
self._refresh_token(self.buildbucket_domain_url)
logged_in = True
else:
if response_content:
response_content = response_content.decode('utf-8')
logger.exception('Error in buildbucket post: %s', response_content)
raise errors.ExternalError('buildbucket api response: ' +
response_content) from e
except (socket.timeout, ssl.SSLError):
if max_retries <= 0:
raise
max_retries -= 1
raise errors.ExternalError('buildbucket api %s failed' % api)
def get_build(self, buildbucket_id):
"""Get a build object by its buildbucket id.
Args:
buildbucket_id: An integer indicates buildbucket id.
Returns:
A Build object if success.
Raises:
errors.InternalError: If argument format incorrect.
errors.ExternalError: If api request fail.
"""
assert isinstance(buildbucket_id, int)
field_mask = FieldMask(paths=self._build_fields)
request = GetBuildRequest(id=buildbucket_id, fields=field_mask)
return self._post('GetBuild', request, Build)
def schedule_build(self,
board,
manifest_xml,
recipe='build_postsubmit',
deps=None,
git_auth=True,
bucket='bisector',
force_rebuild=False,
build_tags=None):
"""Schedule a build job in buildbucket.
Args:
board: ChromeOS board name
manifest_xml: A full manifest xml for building chromeOS
recipe: Recipe name to execute
deps: A full DEPS file for building Chrome
git_auth: A workaround for crbug.com/1068743
bucket: A bucket name of buildbucket that will handle this build
force_rebuild: Set True to ignore existing builds on buildbucket
build_tags: Tags for buildbucket build object.
Returns:
A Build object if success.
Raises:
errors.ExternalError: If api request fail.
"""
if build_tags is None:
build_tags = {}
tags = []
for key, value in build_tags.items():
tags.append(StringPair(key=key, value=value))
chromeos_digest = self._cas_upload('snapshot.xml', manifest_xml)
# TODO(zjchang): remove git_auth after crbug.com/1068743 fixed.
properties = {
'$recipe_engine/cas': {
'instance': self.cas_instance,
},
'$chromeos/failures': {
'disable_silences': True,
},
'$chromeos/cros_relevance': {
'force_postsubmit_relevance': True,
},
'$chromeos/cros_source': {
'snapshot_cas': {
'digest': chromeos_digest,
},
},
'$kitchen': {
'devshell': True,
'git_auth': git_auth,
},
'build_target': {
'name': board,
},
'builder_name': '%s-%s' % (board, bucket),
'recipe': recipe,
}
chrome_digest = None
deps_src_ver = None
if deps:
depsParser = gclient_util.DepsParser(None, None)
parsed = depsParser.parse_single_deps(deps)
deps_src_ver = parsed.remove_src()
deps = parsed.to_string()
chrome_digest = self._cas_upload('DEPS', deps)
properties['$chromeos/chrome'] = {
'deps_cas': {
'digest': chrome_digest,
},
'version': deps_src_ver,
}
builder = {
'project': 'chromeos',
'builder': '%s-%s' % (board, bucket),
'bucket': bucket,
}
# search old builds
if not force_rebuild:
build = self.search_usable_build(board, bucket, chromeos_digest,
chrome_digest, deps_src_ver)
if build:
return build
request = ScheduleBuildRequest(builder=builder, tags=tags)
request.properties.update(properties)
return self._post('ScheduleBuild', request, Build)
def is_running(self, build):
"""Check if a build is in running or scheduled status.
Args:
build: A Build object returned from get_build
Returns:
True if the build is running or scheduled.
Raises:
errors.ExternalError: If api request fail.
"""
waiting_status = [
Status.Value('SCHEDULED'),
Status.Value('STARTED'),
Status.Value('STATUS_UNSPECIFIED'),
]
return build.status in waiting_status
def is_success(self, build):
"""Check if a build is succeeded.
Args:
build: A Build object returned from get_build
Returns:
True if the build is succeeded.
Raises:
errors.ExternalError: If api request fail.
"""
return build.status == Status.Value('SUCCESS')
def search_builds(self,
project='chromeos',
bucket=None,
builder=None,
status=None,
created_by=None,
snapshot_commit=None,
limit=100,
start_time=None,
end_time=None):
"""Search builds.
Args:
project: Project name in buildbucket. Defaults to 'chromeos'.
bucket: Bucket name in buildbucket. Defaults to None.
builder: Builder name in buildbucket. Defaults to None.
status: Search builds by build status. Defaults to None.
created_by: Search builds by creator. Defaults to None.
snapshot_commit: Search by annealing snapshot commit hash for Chrome
OS. Defaults to None.
limit: Maximum numbers of builds to return. Defaults to 100.
start_time: An integer timestamp indicates lower bound of start time.
end_time: An integer timestamp indicates upper bound of start time.
Returns:
A list of builds.
"""
search_response_fields = ['builds.*.%s' % x for x in self._build_fields
] + ['next_page_token']
field_mask = FieldMask(paths=search_response_fields)
create_time = None
tags = []
if status:
status = Status.Value(status)
if snapshot_commit:
tags.append(StringPair(key='snapshot', value=snapshot_commit))
if start_time or end_time:
create_time = TimeRange()
if start_time:
create_time.start_time.CopyFrom(Timestamp(seconds=start_time, nanos=0))
if end_time:
create_time.end_time.CopyFrom(Timestamp(seconds=end_time, nanos=0))
# Limitation of page_size is 1000 in theory.
# But in practice, setting to 1000 can cause api instance exceed memory
# limit, so here size is set to 100 to avoid api crash.
page_size = min(100, limit)
builder = BuilderID(
project=project,
bucket=bucket,
builder=builder,
)
predicate = BuildPredicate(
builder=builder,
created_by=created_by,
status=status,
tags=tags,
create_time=create_time,
)
request = SearchBuildsRequest(
fields=field_mask,
predicate=predicate,
page_size=page_size,
)
result = []
while len(result) < limit:
response = self._post(
'SearchBuilds', request, SearchBuildsResponse, timeout=120)
result += response.builds
if not response.next_page_token:
break
request = SearchBuildsRequest(
fields=field_mask,
predicate=predicate,
page_token=response.next_page_token,
page_size=page_size,
)
return result[0:limit]
def search_usable_build(self,
board,
bucket,
manifest_digest,
deps_digest=None,
deps_src_ver=None):
"""Search for recent success build if exists.
Args:
board: ChromeOS board name.
bucket: Bucket name in buildbucket.
manifest_digest: Cas hash for Chrome OS manifest.
deps_digest: Cas hash for Chrome DEPS file, leave None if no
special buildspec for Chrome.
deps_src_ver: The src revision inside deps.
Returns:
An build object if found, or None if no existing build.
"""
builds = self.search_builds(
bucket=bucket,
builder='%s-%s' % (board, bucket),
start_time=self.old_artifact_cutoff,
)
valid_status = [
Status.Value('SCHEDULED'),
Status.Value('STARTED'),
Status.Value('SUCCESS')
]
for build in builds:
if build.status not in valid_status:
continue
input_properties = json_format.MessageToDict(build.input.properties)
output_properties = json_format.MessageToDict(build.output.properties)
gs_bucket = util.dict_get(output_properties, 'artifacts', 'gs_bucket')
gs_path = util.dict_get(output_properties, 'artifacts', 'gs_path')
build_manifest_hash = util.dict_get(input_properties,
'$chromeos/cros_source',
'snapshot_cas', 'digest')
build_deps_hash = util.dict_get(input_properties, '$chromeos/chrome',
'deps_cas', 'digest')
build_src_ver = util.dict_get(input_properties, '$chromeos/chrome',
'version')
if (build_manifest_hash == manifest_digest and
build_deps_hash == deps_digest and build_src_ver == deps_src_ver and
gs_bucket and gs_path):
return build
return None