# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""This module integrates buildbucket with swarming.
A bucket config in cr-buildbucket.cfg may have "swarming" field that specifies
how a builder is mapped to a LUCI executable. If build is scheduled for a bucket
with swarming configuration, the integration overrides the default behavior,
e.g. there is no peeking/leasing of builds.
A push task is recorded transactionally with a build entity. The push task then
creates a swarming task, based on the build proto and global settings, and
re-enqueues itself with 1m delay. Future invocations of the push task
synchronize the build state with the swarming task state (e.g. if a task
starts, then the build is marked as started too) and keep re-enqueuing itself,
with 1m delay, and continuously synchronizes states, until the swarming task
is complete.
When creating a task, a PubSub topic is specified. Swarming will notify on
task status updates to the topic and buildbucket will sync its state.
Eventually both swarming task and buildbucket build will complete.
"""
import base64
import collections
import copy
import datetime
import json
import logging
import posixpath
import re
import uuid
from google.appengine.api import app_identity
from google.appengine.api import memcache
from google.appengine.api import taskqueue
from google.appengine.ext import ndb
from google.protobuf import json_format
import webapp2
from components import decorators
from components import net
from components import utils
from legacy import api_common
from go.chromium.org.luci.buildbucket.proto import common_pb2
from go.chromium.org.luci.buildbucket.proto import launcher_pb2
from go.chromium.org.luci.buildbucket.proto import service_config_pb2
import config
import errors
import events
import model
import resultdb
import tokens
import tq
import user
# Name of the push task queue that handles the first attempt at swarming task
# creation; one push task per build.
CREATE_QUEUE_NAME = 'swarming-build-create'

# Name of the push task queue that synchronizes a buildbucket build with its
# swarming task; one push task per build.
SYNC_QUEUE_NAME = 'swarming-build-sync'
# This is the path, relative to the swarming run dir, to the directory that
# contains the mounted swarming named caches. It will be prepended to paths of
# caches defined in swarmbucket configs.
_CACHE_DIR = 'cache'
# This is the path, relative to the swarming run dir, where the recipes are
# either checked out or installed via CIPD package.
#
# TODO(iannucci): rename this for bbagent (maybe to "user_exe").
_KITCHEN_CHECKOUT = 'kitchen-checkout'
# Directory where user-available packages are installed, such as git.
# Relative to swarming task cwd.
# USER_PACKAGE_DIR and USER_PACKAGE_DIR/bin are prepended to $PATH.
USER_PACKAGE_DIR = 'cipd_bin_packages'
################################################################################
# Creation/cancellation of tasks.
class Error(Exception):
"""Base class for swarmbucket-specific errors."""
def _buildbucket_property(build):
"""Returns value for '$recipe_engine/buildbucket' build property.
Code that reads it:
https://cs.chromium.org/chromium/infra/recipes-py/recipe_modules/buildbucket
"""
# Exclude some fields from the property.
export = copy.deepcopy(build.proto)
export.ClearField('status')
export.ClearField('update_time')
export.ClearField('output')
export.input.ClearField('properties')
export.infra.ClearField('recipe')
export.infra.buildbucket.ClearField('requested_properties')
if export.exe.cmd:
export.exe.ClearField('cmd') # useless for kitchen
build.tags_to_protos(export.tags)
return {
'build': json_format.MessageToDict(export),
'hostname': app_identity.get_default_version_hostname(),
}
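# A hedged sketch of the resulting property value (shape only; the field
# values below are illustrative, not taken from a real build):
#   {
#     'build': {
#       'id': '8912345678901234567',
#       'builder': {'project': 'chromium', 'bucket': 'try', 'builder': 'linux'},
#       'infra': {...},
#     },
#     'hostname': 'cr-buildbucket.appspot.com',
#   }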
def compute_task_def(build, settings, fake_build):
"""Returns a swarming task definition for the |build|.
Args:
build (model.Build): the build to generate the task definition for.
build.proto.infra and build.proto.input.properties must be initialized.
settings (service_config_pb2.SettingsCfg): global settings.
fake_build (bool): False if the build is not going to be actually
created in buildbucket. This is used by led that only needs the definition
of the task that *would be* used for a new build like this.
Returns a task_def dict.
Corresponds to JSON representation of
https://cs.chromium.org/chromium/infra/luci/appengine/swarming/swarming_rpcs.py?q=NewTaskRequest&sq=package:chromium&g=0&l=438
"""
assert isinstance(build, model.Build), type(build)
assert isinstance(fake_build, bool), type(fake_build)
assert build.proto.HasField('infra')
assert build.proto.input.HasField('properties')
assert isinstance(settings, service_config_pb2.SettingsCfg)
sw = build.proto.infra.swarming
task = {
'name': 'bb-%d-%s' % (build.proto.id, build.builder_id),
'tags': _compute_tags(build),
'priority': str(sw.priority),
'task_slices': _compute_task_slices(build, settings),
}
if build.proto.number: # pragma: no branch
task['name'] += '-%d' % build.proto.number
if sw.parent_run_id:
    # Semantically it's a run id, but the swarming API unfortunately calls it
    # parent_task_id.
task['parent_task_id'] = sw.parent_run_id
if sw.task_service_account: # pragma: no branch
# Don't pass it if not defined, for backward compatibility.
task['service_account'] = sw.task_service_account
if not fake_build: # pragma: no branch | covered by swarmbucketapi_test.py
task['pubsub_topic'] = 'projects/%s/topics/swarming' % (
app_identity.get_application_id()
)
task['pubsub_userdata'] = json.dumps(
{
'build_id': build.proto.id,
'created_ts': utils.datetime_to_timestamp(utils.utcnow()),
'swarming_hostname': sw.hostname,
},
sort_keys=True,
)
return task
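# A hedged sketch of a returned task_def (abbreviated; all values below are
# illustrative, not from a real build):
#   {
#     'name': 'bb-8912345678901234567-chromium/try/linux-42',
#     'tags': ['buildbucket_bucket:chromium/try', ...],
#     'priority': '30',
#     'task_slices': [...],
#     'pubsub_topic': 'projects/cr-buildbucket/topics/swarming',
#     'pubsub_userdata': '{"build_id": ..., "created_ts": ..., ...}',
#   }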
def _compute_tags(build):
"""Computes the Swarming task request tags to use."""
tags = {
'buildbucket_bucket:%s' % build.bucket_id,
'buildbucket_build_id:%s' % build.key.id(),
'buildbucket_hostname:%s' % app_identity.get_default_version_hostname(),
'buildbucket_template_canary:%s' % ('1' if build.canary else '0'),
# TODO(iannucci): 'luci_project' is only necessary for getting swarming to
# show the 'rich output' tab. Swarming should instead look for the
# buildbucket_* tags to compute a buildbucket link on milo, or
# raw_build_log to compute a raw link (for led tasks).
'luci_project:%s' % build.proto.builder.project,
}
if _using_kitchen(build.proto):
logdog = build.proto.infra.logdog
tags.add(
'log_location:logdog://%s/%s/%s/+/annotations' %
(logdog.hostname, logdog.project, logdog.prefix)
)
tags.update(build.tags)
return sorted(tags)
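# Example result for a hypothetical non-canary, non-kitchen build with no
# extra build.tags (sorted, as returned):
#   ['buildbucket_bucket:chromium/try',
#    'buildbucket_build_id:8912345678901234567',
#    'buildbucket_hostname:cr-buildbucket.appspot.com',
#    'buildbucket_template_canary:0',
#    'luci_project:chromium']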
def _compute_task_slices(build, settings):
"""Compute swarming task slices."""
# {expiration_secs: [{'key': key, 'value': value}]}
dims = collections.defaultdict(list)
for c in build.proto.infra.swarming.caches:
assert not c.wait_for_warm_cache.nanos
if c.wait_for_warm_cache.seconds:
dims[c.wait_for_warm_cache.seconds].append({
'key': 'caches', 'value': c.name
})
for d in build.proto.infra.swarming.task_dimensions:
assert not d.expiration.nanos
dims[d.expiration.seconds].append({'key': d.key, 'value': d.value})
dim_key = lambda x: (x['key'], x['value'])
base_dims = dims.pop(0, [])
base_dims.sort(key=dim_key)
grace_period_secs = build.proto.grace_period.seconds
if not _using_kitchen(build.proto):
    # bbagent reserves 3 minutes of grace_period in order to have time for a
    # couple of retry rounds of UpdateBuild RPCs.
#
# Once UpdateBuild is in Go, this can very likely be adjusted with
# Go's performance characteristics in mind.
#
# TODO(https://crbug.com/1110990)
grace_period_secs += 180
base_slice = {
'expiration_secs': str(build.proto.scheduling_timeout.seconds),
'wait_for_capacity': build.proto.wait_for_capacity,
'properties': {
'cipd_input': _compute_cipd_input(build, settings),
'execution_timeout_secs': str(build.proto.execution_timeout.seconds),
'grace_period_secs': str(grace_period_secs),
'caches': [{
'path': posixpath.join(_CACHE_DIR, c.path), 'name': c.name
} for c in build.proto.infra.swarming.caches],
'dimensions': base_dims,
'env_prefixes': _compute_env_prefixes(build, settings),
'env': [{
'key': 'BUILDBUCKET_EXPERIMENTAL',
'value': str(build.experimental).upper(),
}],
'command': _compute_command(build, settings),
},
}
if not dims:
return [base_slice]
assert len(dims) <= 6, dims # Swarming limitation
# Create a fallback by copying the original task slice, each time adding the
# corresponding expiration.
task_slices = []
last_exp = 0
for expiration_secs in sorted(dims):
t = {
'expiration_secs': str(expiration_secs - last_exp),
'properties': copy.deepcopy(base_slice['properties']),
}
last_exp = expiration_secs
task_slices.append(t)
# Tweak expiration on the base_slice, which is the last slice.
exp = max(int(base_slice['expiration_secs']) - last_exp, 60)
base_slice['expiration_secs'] = str(exp)
task_slices.append(base_slice)
assert len(task_slices) == len(dims) + 1
# Now add the actual fallback dimensions.
extra_dims = []
for i, (_expiration_secs, kv) in enumerate(sorted(dims.iteritems(),
reverse=True)):
# Now mutate each TaskProperties to have the desired dimensions.
extra_dims.extend(kv)
props = task_slices[-2 - i]['properties']
props['dimensions'].extend(extra_dims)
props['dimensions'].sort(key=dim_key)
return task_slices
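# Worked example (hypothetical numbers): scheduling_timeout of 6h (21600s),
# one cache with wait_for_warm_cache of 60s and one dimension expiring at
# 240s. The result is three slices whose expirations are deltas between
# consecutive deadlines (they sum to 21600):
#   slice 0: expiration_secs '60',    dimensions: base + both expiring entries
#   slice 1: expiration_secs '180',   dimensions: base + the 240s entry
#   slice 2: expiration_secs '21360', dimensions: base only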
def _compute_env_prefixes(build, settings):
"""Returns env_prefixes key in swarming properties."""
env_prefixes = {
'PATH': [
USER_PACKAGE_DIR,
posixpath.join(USER_PACKAGE_DIR, 'bin'),
],
}
extra_paths = set()
for up in settings.swarming.user_packages:
if up.subdir:
path = posixpath.join(USER_PACKAGE_DIR, up.subdir)
extra_paths.add(path)
extra_paths.add(posixpath.join(path, 'bin'))
env_prefixes['PATH'].extend(sorted(extra_paths))
for c in build.proto.infra.swarming.caches:
if c.env_var:
prefixes = env_prefixes.setdefault(c.env_var, [])
prefixes.append(posixpath.join(_CACHE_DIR, c.path))
return [{
'key': key,
'value': value,
} for key, value in sorted(env_prefixes.iteritems())]
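# A sketch of the result, assuming one user package with subdir 'git' and one
# cache with env_var 'GOCACHE' and path 'go' (all hypothetical values):
#   [{'key': 'GOCACHE', 'value': ['cache/go']},
#    {'key': 'PATH', 'value': ['cipd_bin_packages', 'cipd_bin_packages/bin',
#                              'cipd_bin_packages/git',
#                              'cipd_bin_packages/git/bin']}]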
def _compute_cipd_input(build, settings):
"""Returns swarming task CIPD input."""
def convert(path, pkg):
"""Converts a package from settings to swarming."""
version = pkg.version
if pkg.version_canary and build.proto.canary:
version = pkg.version_canary
return {
'package_name': pkg.package_name,
'path': path,
'version': version,
}
packages = [
convert('.', settings.swarming.bbagent_package),
convert('.', settings.swarming.kitchen_package),
{
'package_name': build.proto.exe.cipd_package,
'path': _KITCHEN_CHECKOUT,
'version': build.proto.exe.cipd_version,
},
]
for up in settings.swarming.user_packages:
if config.builder_matches(build.proto.builder, up.builders):
path = USER_PACKAGE_DIR
if up.subdir:
path = posixpath.join(path, up.subdir)
packages.append(convert(path, up))
return {
'packages': packages,
}
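# A hedged sketch of the returned CIPD input (package names and versions are
# illustrative, taken neither from real settings nor from a real build):
#   {'packages': [
#     {'package_name': 'infra/tools/bbagent', 'path': '.',
#      'version': 'latest'},
#     {'package_name': 'infra/tools/kitchen', 'path': '.',
#      'version': 'latest'},
#     {'package_name': 'chromium/recipe_bundle', 'path': 'kitchen-checkout',
#      'version': 'refs/heads/master'},
#   ]}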
def _compute_command(build, settings):
if not _using_kitchen(build.proto):
return _compute_bbagent(build, settings)
logdog = build.proto.infra.logdog
annotation_url = (
'logdog://%s/%s/%s/+/annotations' %
(logdog.hostname, logdog.project, logdog.prefix)
)
ret = [
'kitchen${EXECUTABLE_SUFFIX}',
'cook',
'-buildbucket-hostname',
app_identity.get_default_version_hostname(),
'-buildbucket-build-id',
build.proto.id,
'-call-update-build',
'-build-url',
_generate_build_url(settings.swarming.milo_hostname, build),
'-luci-system-account',
'system',
'-recipe',
build.proto.input.properties['recipe'],
'-cache-dir',
_CACHE_DIR,
'-checkout-dir',
_KITCHEN_CHECKOUT,
'-temp-dir',
'tmp',
'-properties',
api_common.properties_to_json(_compute_legacy_properties(build)),
'-logdog-annotation-url',
annotation_url,
]
for h in settings.known_public_gerrit_hosts:
ret += ['-known-gerrit-host', h]
ret = map(unicode, ret) # Ensure strings.
return ret
def _compute_legacy_properties(build):
"""Returns a Struct of properties to be sent to the swarming task.
Mostly provides backward compatibility.
"""
# This is a recipe-based builder. We need to mutate the properties
# to account for backward compatibility.
ret = copy.copy(build.proto.input.properties)
ret.update({
# TODO(crbug.com/877161): remove legacy "buildername" property.
'buildername': build.proto.builder.builder,
'$recipe_engine/buildbucket': _buildbucket_property(build),
})
ret.get_or_create_struct('$recipe_engine/runtime').update({
'is_luci': True,
'is_experimental': build.experimental,
})
if build.proto.number: # pragma: no branch
ret['buildnumber'] = build.proto.number
# Add repository property, for backward compatibility.
# TODO(crbug.com/877161): remove it.
if len(build.proto.input.gerrit_changes) == 1: # pragma: no branch
cl = build.proto.input.gerrit_changes[0]
suffix = '-review.googlesource.com'
if cl.host.endswith(suffix) and cl.project: # pragma: no branch
ret['repository'] = 'https://%s.googlesource.com/%s' % (
cl.host[:-len(suffix)], cl.project
)
return ret
# Strip newlines and end padding characters.
_CLI_ENCODED_STRIP_RE = re.compile('\n|=')
def _cli_encode_proto(message):
"""Encodes a proto message for use on the bbagent command line."""
raw = message.SerializeToString().encode('zlib').encode('base64')
return _CLI_ENCODED_STRIP_RE.sub('', raw)
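# A decoding sketch for the receiving side (Python 2 codecs assumed; the
# base64 padding stripped above must be restored before decoding):
#   padded = encoded + '=' * (-len(encoded) % 4)
#   args = launcher_pb2.BBAgentArgs()
#   args.ParseFromString(base64.b64decode(padded).decode('zlib'))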
def _compute_bbagent(build, settings):
"""Returns the command for bbagent."""
if build.bbagent_getbuild:
return [
u'bbagent${EXECUTABLE_SUFFIX}',
u'-host',
build.proto.infra.buildbucket.hostname,
u'-build-id',
str(build.proto.id),
]
proto = copy.deepcopy(build.proto)
build.tags_to_protos(proto.tags)
return [
u'bbagent${EXECUTABLE_SUFFIX}',
_cli_encode_proto(
launcher_pb2.BBAgentArgs(
payload_path=_KITCHEN_CHECKOUT,
cache_dir=_CACHE_DIR,
known_public_gerrit_hosts=settings.known_public_gerrit_hosts,
build=proto,
)
),
]
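# The returned command takes one of two forms (values illustrative):
#   ['bbagent${EXECUTABLE_SUFFIX}', '-host', 'cr-buildbucket.appspot.com',
#    '-build-id', '8912345678901234567']                 # getbuild mode
#   ['bbagent${EXECUTABLE_SUFFIX}',
#    '<base64(zlib(serialized BBAgentArgs))>']           # embedded-proto mode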
def validate_build(build):
"""Raises errors.InvalidInputError if swarming constraints are violated."""
if build.lease_key:
raise errors.InvalidInputError(
'Swarming buckets do not support creation of leased builds'
)
expirations = set()
for dim in build.proto.infra.swarming.task_dimensions:
assert not dim.expiration.nanos
expirations.add(dim.expiration.seconds)
if len(expirations) > 6:
raise errors.InvalidInputError(
'swarming supports up to 6 unique expirations'
)
def create_sync_task(build): # pragma: no cover
"""Returns def of a push task that maintains build state until it ends.
Handled by TaskSyncBuild.
Raises:
errors.InvalidInputError if the build is invalid.
"""
validate_build(build)
payload = {
'id': build.key.id(),
'generation': 0,
}
return {
'url': '/internal/task/swarming/sync-build/%s' % build.key.id(),
'payload': json.dumps(payload, sort_keys=True),
'retry_options': {'task_age_limit': model.BUILD_TIMEOUT.total_seconds()},
}
def _sync_build(build_id, generation):
"""Synchronizes build with Swarming and ResultDB.
If the swarming task does not exist yet, creates it.
Otherwise updates the build state to match swarming task state.
Enqueues a new sync push task if the build did not end.
"""
bundle = model.BuildBundle.get(build_id, infra=True, input_properties=True)
if not bundle: # pragma: no cover
logging.warning('build not found')
return
build = bundle.build
if build.is_ended:
logging.info('build ended')
return
build.proto.infra.ParseFromString(bundle.infra.infra)
build.proto.input.properties.ParseFromString(
bundle.input_properties.properties
)
build.proto.infra.buildbucket.hostname = (
app_identity.get_default_version_hostname()
)
if not build.proto.infra.swarming.task_id:
_create_swarming_task(build)
else:
    # Note: the return value of _load_task_result may be None here if there is
    # no such task. That will result in the build ending with INFRA_FAILURE.
_sync_build_with_task_result(build_id, _load_task_result(build))
# Enqueue a continuation task.
next_gen = generation + 1
payload = {
'id': build.key.id(),
'generation': next_gen,
}
deadline = build.create_time + model.BUILD_TIMEOUT
age_limit = deadline - utils.utcnow()
continuation = {
'name': 'sync-task-%d-%d' % (build_id, next_gen),
'url': '/internal/task/swarming/sync-build/%s' % build.key.id(),
'payload': json.dumps(payload, sort_keys=True),
'retry_options': {'task_age_limit': age_limit.total_seconds()},
'countdown': 300, # Run the continuation task in 5m.
}
try:
tq.enqueue_async(
SYNC_QUEUE_NAME, [continuation], transactional=False
).get_result()
except (taskqueue.TaskAlreadyExistsError,
taskqueue.TombstonedTaskError): # pragma: no cover
# Previous attempt for this generation of the task might have already
# created the next generation task, and in case of TombstonedTaskError this
# task may be already executing or even finished. This is OK.
pass
def _create_swarming_task(build):
"""Creates a swarming task for the build.
Requires build.proto.input.properties and build.proto.infra to be populated.
"""
assert build.proto.HasField('infra')
assert build.proto.input.HasField('properties')
sw = build.proto.infra.swarming
logging.info('creating a task on %s', sw.hostname)
build_id = build.proto.id
settings = config.get_settings_async().get_result()
# Prepare task definition.
task_def = compute_task_def(build, settings, fake_build=False)
# This is to prevent accidental multiple task creation.
task_def['request_uuid'] = str(uuid.UUID(int=build_id))
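  # E.g. build_id 123 yields the stable request_uuid
  # '00000000-0000-0000-0000-00000000007b'.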
# Insert secret bytes.
build_token = tokens.generate_build_token(build_id)
secrets = launcher_pb2.BuildSecrets(
build_token=build_token,
resultdb_invocation_update_token=build.resultdb_update_token,
)
secret_bytes_b64 = base64.b64encode(secrets.SerializeToString())
for ts in task_def['task_slices']:
ts['properties']['secret_bytes'] = secret_bytes_b64
auth_kwargs = {}
if build.uses_realms:
    # If running in realms mode, use the project identity when calling
    # Swarming. Swarming will check whether this project is allowed to use the
    # requested pool. This replaces the `delegation_tag` mechanism.
auth_kwargs.update(act_as_project=build.project)
# Associate the task with the build's realm. Swarming will use it to
# control who can access the Swarming task.
task_def['realm'] = build.realm
else:
    # In non-realms mode use delegation tokens; this mechanism is deprecated.
auth_kwargs.update(
# "Pretend" to be the user who submitted the build.
impersonate=True,
delegation_identity=build.created_by,
# Tell Swarming what bucket the task belongs to. Swarming uses this to
# authorize access to pools assigned to specific buckets only.
delegation_tag='buildbucket:bucket:%s' % build.bucket_id,
)
new_task_id = None
try:
res = _call_api_async(
hostname=sw.hostname,
path='tasks/new',
method='POST',
payload=task_def,
deadline=30,
        # Try only once so we don't end up with multiple swarming tasks with
        # the same build_id and a valid token; otherwise they would race.
max_attempts=1,
**auth_kwargs
).get_result()
new_task_id = res['task_id']
assert new_task_id
logging.info('Created a swarming task %r', new_task_id)
except net.Error as err:
if err.status_code >= 500 or err.status_code is None:
raise
# Dump the task definition to the log.
# Pop secret bytes.
for ts in task_def['task_slices']:
ts['properties'].pop('secret_bytes')
logging.error(
(
'Swarming responded with HTTP %d. '
'Ending the build with INFRA_FAILURE.\n'
'Task def: %s\n'
'Response: %s'
),
err.status_code,
task_def,
err.response,
)
_end_build(
build_id,
common_pb2.INFRA_FAILURE,
(
'Swarming task creation API responded with HTTP %d: `%s`' %
(err.status_code, err.response.replace('`', '"'))
),
end_time=utils.utcnow(),
)
return
# Task was created.
@ndb.transactional
def txn():
bundle = model.BuildBundle.get(build_id, infra=True)
if not bundle: # pragma: no cover
return False
with bundle.infra.mutate() as infra:
sw = infra.swarming
if sw.task_id:
logging.warning('build already has a task %r', sw.task_id)
return False
sw.task_id = new_task_id
bundle.build.update_token = build_token
bundle.put()
return True
updated = False
try:
updated = txn()
finally:
if not updated:
logging.error(
'created a task, but did not update datastore.\n'
'canceling task %s, best effort',
new_task_id,
)
cancel_task(
sw.hostname, new_task_id, build.realm if build.uses_realms else None
)
class TaskSyncBuild(webapp2.RequestHandler): # pragma: no cover
"""Sync a LUCI build with swarming."""
@decorators.require_taskqueue(CREATE_QUEUE_NAME, SYNC_QUEUE_NAME)
def post(self, build_id): # pylint: disable=unused-argument
body = json.loads(self.request.body)
_sync_build(body['id'], body['generation'])
def _generate_build_url(milo_hostname, build):
if not milo_hostname:
sw = build.proto.infra.swarming
return 'https://%s/task?id=%s' % (sw.hostname, sw.task_id)
return 'https://%s/b/%d' % (milo_hostname, build.key.id())
@ndb.tasklet
def cancel_task_async(hostname, task_id, realm):
"""Cancels a swarming task.
Args:
hostname: Swarming service hostname.
task_id: ID of the Swarming task to cancel.
realm: if set, use the corresponding project identity when calling Swarming.
"""
  # When running in realms mode, use the project identity instead of
  # Buildbucket's own account.
res = yield _call_api_async(
hostname=hostname,
path='task/%s/cancel' % task_id,
method='POST',
payload={
'kill_running': True,
},
act_as_project=realm.split(':')[0] if realm else None,
)
if res.get('ok'):
logging.info('response: %r', res)
else:
logging.warning('response: %r', res)
def cancel_task(hostname, task_id, realm):
"""Sync version of cancel_task_async."""
cancel_task_async(hostname, task_id, realm).get_result()
def cancel_task_transactionally_async(build, swarming):
"""Transactionally schedules a push task to cancel a swarming task.
Args:
build: a model.Build entity whose task we are canceling.
swarming: a build_pb2.BuildInfra.Swarming message with info about the task.
"""
assert ndb.in_transaction()
assert swarming.hostname and swarming.task_id
task_def = {
# Put hostname and task_id into the URL for more informative logging.
'url':
'/internal/task/buildbucket/cancel_swarming_task/%s/%s' % (
swarming.hostname,
swarming.task_id,
),
'payload': {
'hostname': swarming.hostname,
'task_id': swarming.task_id,
'realm': build.realm if build.uses_realms else None,
},
}
return tq.enqueue_async('backend-default', [task_def])
class TaskCancelSwarmingTask(webapp2.RequestHandler): # pragma: no cover
"""Cancels a swarming task."""
@decorators.require_taskqueue('backend-default')
def post(self, host, task_id): # pylint: disable=unused-argument
payload = json.loads(self.request.body)
cancel_task(payload['hostname'], payload['task_id'], payload['realm'])
################################################################################
# Update builds.
def _load_task_result(build):
"""Fetches the result of a Swarming task associated with |build|.
Uses build.proto.infra.swarming to figure out where to fetch the result from.
This proto section must be populated prior to calling this function.
Logs an error and returns None if there's no such task.
"""
assert build.proto.HasField('infra')
hostname = build.proto.infra.swarming.hostname
task_id = build.proto.infra.swarming.task_id
  # When running in realms mode, use the project identity instead of
  # Buildbucket's own account.
result = _call_api_async(
hostname=hostname,
path='task/%s/result' % task_id,
act_as_project=build.project if build.uses_realms else None,
).get_result()
if not result:
logging.error(
'Task %s/%s referenced by build %s is not found', hostname, task_id,
build.key.id()
)
return result
def _sync_build_with_task_result_in_memory(build, build_infra, task_result):
"""Syncs buildbucket |build| state with swarming task |result|.
Mutates build only if status has changed. Returns True in that case.
If task_result is None, marks the build as INFRA_FAILURE.
"""
# Task result docs:
# https://github.com/luci/luci-py/blob/985821e9f13da2c93cb149d9e1159c68c72d58da/appengine/swarming/server/task_result.py#L239
if build.is_ended: # pragma: no cover
# Completed builds are immutable.
return False
now = utils.utcnow()
old_status = build.proto.status
bp = build.proto
with build_infra.mutate() as infra:
sw = infra.swarming
sw.ClearField('bot_dimensions')
for d in (task_result or {}).get('bot_dimensions', []):
if 'value' in d:
assert isinstance(d['value'], list)
for v in d['value']:
sw.bot_dimensions.add(key=d['key'], value=v)
sw.bot_dimensions.sort(key=lambda d: (d.key, d.value))
terminal_states = {
'EXPIRED',
'TIMED_OUT',
'BOT_DIED',
'CANCELED',
'COMPLETED',
'KILLED',
'NO_RESOURCE',
}
state = (task_result or {}).get('state')
if state is None:
bp.status = common_pb2.INFRA_FAILURE
bp.summary_markdown = (
'Swarming task %s on %s unexpectedly disappeared' %
(sw.task_id, sw.hostname)
)
elif state == 'PENDING':
if bp.status == common_pb2.STARTED: # pragma: no cover
      # Most probably a race between the PubSub push handler and the cron job.
      # With swarming, a build cannot go from STARTED back to PENDING, so
      # ignore this.
return False
bp.status = common_pb2.SCHEDULED
elif state == 'RUNNING':
bp.status = common_pb2.STARTED
elif state in terminal_states:
if state in ('CANCELED', 'KILLED'):
bp.status = common_pb2.CANCELED
elif state == 'NO_RESOURCE':
# Task did not start.
bp.status = common_pb2.INFRA_FAILURE
bp.status_details.resource_exhaustion.SetInParent()
elif state == 'EXPIRED':
# Task did not start.
bp.status = common_pb2.INFRA_FAILURE
bp.status_details.resource_exhaustion.SetInParent()
bp.status_details.timeout.SetInParent()
elif state == 'TIMED_OUT':
# Task started, but timed out.
bp.status = common_pb2.INFRA_FAILURE
bp.status_details.timeout.SetInParent()
elif state == 'BOT_DIED' or task_result.get('failure'):
# If this truly was a non-infra failure, bbagent would catch that and
# mark the build as FAILURE.
# That did not happen, so this is an infra failure.
bp.status = common_pb2.INFRA_FAILURE
else:
assert state == 'COMPLETED'
bp.status = common_pb2.SUCCESS
else: # pragma: no cover
assert False, 'Unexpected task state: %s' % state
if bp.status == old_status: # pragma: no cover
return False
build.status_changed_time = now
logging.info(
'Build %s status: %s -> %s', build.key.id(), old_status, bp.status
)
def ts(key):
v = (task_result or {}).get(key)
return _parse_ts(v) if v else None
if bp.status == common_pb2.STARTED:
bp.start_time.FromDatetime(ts('started_ts') or now)
elif build.is_ended: # pragma: no branch
logging.info('Build %s result: %s', build.key.id(), build.result)
started_ts = ts('started_ts')
if started_ts:
bp.start_time.FromDatetime(started_ts)
bp.end_time.FromDatetime(ts('completed_ts') or ts('abandoned_ts') or now)
    # It is possible that the swarming task was marked as NO_RESOURCE the
    # moment it was created. Swarming VM time is not synchronized with
    # buildbucket VM time, so adjust end_time if needed.
if bp.end_time.ToDatetime() < bp.create_time.ToDatetime():
bp.end_time.CopyFrom(bp.create_time)
return True
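# Summary of the swarming -> buildbucket status mapping implemented above:
#   PENDING                      -> SCHEDULED
#   RUNNING                      -> STARTED
#   CANCELED, KILLED             -> CANCELED
#   NO_RESOURCE                  -> INFRA_FAILURE (resource_exhaustion)
#   EXPIRED                      -> INFRA_FAILURE (resource_exhaustion+timeout)
#   TIMED_OUT                    -> INFRA_FAILURE (timeout)
#   BOT_DIED, COMPLETED+failure  -> INFRA_FAILURE
#   COMPLETED                    -> SUCCESS
#   missing task (None result)   -> INFRA_FAILURE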
def _sync_build_with_task_result(build_id, task_result):
"""Syncs Build entity in the datastore with a result of the swarming task."""
@ndb.transactional
def txn():
bundle = model.BuildBundle.get(build_id, infra=True)
if not bundle: # pragma: no cover
return None
build = bundle.build
status_changed = _sync_build_with_task_result_in_memory(
build, bundle.infra, task_result
)
if not status_changed:
return None
futures = [bundle.put_async()]
if build.proto.status == common_pb2.STARTED:
futures.append(events.on_build_starting_async(build))
elif build.is_ended: # pragma: no branch
futures.append(
model.BuildSteps.cancel_incomplete_steps_async(
build_id, build.proto.end_time
)
)
futures.append(events.on_build_completing_async(build))
for f in futures:
f.check_success()
return build
build = txn()
if build:
if build.proto.status == common_pb2.STARTED:
events.on_build_started(build)
elif build.is_ended: # pragma: no branch
events.on_build_completed(build)
class SubNotify(webapp2.RequestHandler):
"""Handles PubSub messages from swarming.
Assumes unprivileged users cannot send requests to this handler.
"""
bad_message = False
def unpack_msg(self, msg):
"""Extracts swarming hostname, creation time, task id and build id from msg.
Aborts if |msg| is malformed.
"""
data_b64 = msg.get('data')
if not data_b64:
self.stop('no message data')
try:
data_json = base64.b64decode(data_b64)
except ValueError as ex: # pragma: no cover
self.stop('cannot decode message data as base64: %s', ex)
data = self.parse_json_obj(data_json, 'message data')
userdata = self.parse_json_obj(data.get('userdata'), 'userdata')
hostname = userdata.get('swarming_hostname')
if not hostname:
self.stop('swarming hostname not found in userdata')
if not isinstance(hostname, basestring):
self.stop('swarming hostname is not a string')
created_ts = userdata.get('created_ts')
if not created_ts:
self.stop('created_ts not found in userdata')
try:
created_time = utils.timestamp_to_datetime(created_ts)
except ValueError as ex:
self.stop('created_ts in userdata is invalid: %s', ex)
build_id = userdata.get('build_id')
if not isinstance(build_id, (int, long)):
self.stop('invalid build_id %r', build_id)
task_id = data.get('task_id')
if not task_id:
self.stop('task_id not found in message data')
return hostname, created_time, task_id, build_id
def post(self):
msg = self.request.json['message']
logging.info('Received message: %r', msg)
    # Try not to process the same message more than once.
nc = 'swarming-pubsub-msg-id'
if memcache.get(msg['messageId'], namespace=nc):
logging.info('seen this message before, ignoring')
else:
self._process_msg(msg)
memcache.set(msg['messageId'], 1, namespace=nc, time=10 * 60)
def _process_msg(self, msg):
hostname, created_time, task_id, build_id = self.unpack_msg(msg)
task_url = 'https://%s/task?id=%s' % (hostname, task_id)
# Load build.
logging.info('Build id: %s', build_id)
bundle = model.BuildBundle.get(build_id, infra=True)
if not bundle:
# TODO(nodir): remove this if statement.
fresh = utils.utcnow() < created_time + datetime.timedelta(minutes=1)
if fresh: # pragma: no cover
self.stop(
'Build for a swarming task not found yet\nBuild: %s\nTask: %s',
build_id,
task_url,
redeliver=True
)
self.stop(
'Build for a swarming task not found\nBuild: %s\nTask: %s', build_id,
task_url
)
# Populate build.proto.infra section, it is used by _load_task_result.
build = bundle.build
build.proto.infra.ParseFromString(bundle.infra.infra)
# Ensure the loaded build is associated with the task.
sw = build.proto.infra.swarming
if hostname != sw.hostname:
self.stop(
'swarming_hostname %s of build %s does not match %s', sw.hostname,
build_id, hostname
)
if not sw.task_id:
# Do not re-deliver if the build is completed.
self.stop(
'build is not associated with a task',
redeliver=not build.is_ended,
)
if task_id != sw.task_id:
self.stop(
'swarming_task_id %s of build %s does not match %s', sw.task_id,
build_id, task_id
)
# Update build.
_sync_build_with_task_result(build_id, _load_task_result(build))
def stop(self, msg, *args, **kwargs):
"""Logs error and stops request processing.
Args:
msg: error message
args: format args for msg.
kwargs:
redeliver: True to process this message later.
"""
self.bad_message = True
if args:
msg = msg % args
redeliver = kwargs.get('redeliver')
logging.log(logging.WARNING if redeliver else logging.ERROR, msg)
self.response.write(msg)
self.abort(400 if redeliver else 200)
def parse_json_obj(self, text, name):
"""Parses a JSON object from |text| if possible. Otherwise stops."""
try:
result = json.loads(text or '')
if not isinstance(result, dict):
raise ValueError()
return result
except ValueError:
self.stop('%s is not a valid JSON object: %r', name, text)
def get_backend_routes(): # pragma: no cover
return [
webapp2.Route(
r'/internal/task/swarming/sync-build/<build_id:\d+>', TaskSyncBuild
),
webapp2.Route(
r'/internal/task/buildbucket/cancel_swarming_task/<host>/<task_id>',
TaskCancelSwarmingTask
),
webapp2.Route(r'/_ah/push-handlers/swarming/notify', SubNotify),
]
################################################################################
# Utility functions
@ndb.tasklet
def _call_api_async(
hostname,
path,
method='GET',
payload=None,
act_as_project=None,
impersonate=False,
delegation_tag=None,
delegation_identity=None,
deadline=None,
max_attempts=None,
):
"""Calls Swarming API.
Has three modes of authentication:
1. `not impersonate and not act_as_project`: act as Buildbucket.
2. `impersonate and not act_as_project`: act as `delegation_identity`.
3. `not impersonate and act_as_project`: act as `project:<act_as_project>`.
All other combinations are forbidden.
"""
assert not (impersonate and act_as_project)
delegation_token = None
if impersonate:
delegation_token = yield user.delegate_async(
hostname, identity=delegation_identity, tag=delegation_tag
)
url = 'https://%s/_ah/api/swarming/v1/%s' % (hostname, path)
res = yield net.json_request_async(
url,
method=method,
payload=payload,
scopes=net.EMAIL_SCOPE,
deadline=deadline,
max_attempts=max_attempts,
delegation_token=delegation_token,
project_id=act_as_project,
)
raise ndb.Return(res)
def _parse_ts(ts):
"""Parses Swarming API's timestamp, which is RFC3339 without time zone."""
# time-secfrac part of RFC3339 format is optional
# https://tools.ietf.org/html/rfc3339#section-5.6
# strptime cannot handle optional parts.
# HACK: add the time-secfrac part if it is missing.
# P(time-secfrac is missing) = 1e-6.
if '.' not in ts: # pragma: no cover
ts += '.0'
return datetime.datetime.strptime(ts, '%Y-%m-%dT%H:%M:%S.%f')
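# E.g. both '2016-01-01T12:34:56.789012' and '2016-01-01T12:34:56' parse; the
# latter gets '.0' appended first.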
def _end_build(build_id, status, summary_markdown='', end_time=None):
assert model.is_terminal_status(status)
end_time = end_time or utils.utcnow()
@ndb.transactional
def txn():
build = model.Build.get_by_id(build_id)
if not build: # pragma: no cover
return None
build.proto.status = status
build.proto.summary_markdown = summary_markdown
build.proto.end_time.FromDatetime(end_time)
ndb.Future.wait_all([
build.put_async(),
events.on_build_completing_async(build)
])
return build
build = txn()
if build: # pragma: no branch
events.on_build_completed(build)
def _using_kitchen(build_proto):
  """True if the build uses kitchen, i.e. its exe.cmd starts with 'recipes'."""
  return build_proto.exe.cmd[0] == 'recipes'