# Copyright 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""This module integrates buildbucket with swarming.
A bucket config may have "swarming" field that specifies how a builder
is mapped to a recipe. If build is scheduled for a bucket/builder
with swarming configuration, the integration overrides the default behavior.
Prior to adding a Build to the datastore, a swarming task is created. The task
definition is rendered from a global template. The parameters of the
template are defined by the bucket config and build parameters.
A build may have "swarming" parameter which is a JSON object with keys:
recipe: JSON object
revision: revision of the recipe. Will be available in the task template
as $revision parameter.
When creating a task, a PubSub topic is specified. Swarming will notify on
task status updates to the topic and buildbucket will sync its state.
Eventually both swarming task and buildbucket build will complete.
Swarming does not guarantee notification delivery, so there is also a cron job
that checks task results of all incomplete builds every 10 min.
import base64
import copy
import datetime
import hashlib
import json
import logging
import posixpath
import random
import string
from components import config as component_config
from components import decorators
from components import net
from components import protoutil
from components import utils
from components.config import validation
from google.appengine.api import app_identity
from google.appengine.api import memcache
from google.appengine.api import taskqueue
from google.appengine.ext import ndb
from google.protobuf import json_format
import webapp2
from third_party import annotations_pb2
from . import flatten_swarmingcfg
from . import isolate
from . import swarmingcfg as swarmingcfg_module
from proto import build_pb2
from proto import launcher_pb2
from proto.config import project_config_pb2
from v2 import tokens
import annotations
import api_common
import buildtags
import config
import errors
import events
import gae_ts_mon
import logdog
import model
import user
# Short name of the PubSub topic; expanded to
# 'projects/<app id>/topics/<name>' when a task request is prepared.
_PUBSUB_TOPIC = 'swarming'
# Keys recognized at the top level of build parameters.
_PARAM_PROPERTIES = 'properties'
_PARAM_SWARMING = 'swarming'
_PARAM_CHANGES = 'changes'
# Name of the file looked up in the isolated outputs of a swarming task.
_BUILD_RUN_RESULT_FILENAME = 'build-run-result.json'
# Note: we do not store build-run-result.json as is.
# Instead we convert it to a list of buildbucket.v2.Step and store
# in binary form in model.BuildSteps, which has its own limit.
# A limit is still needed to avoid OOMs.
_BUILD_RUN_RESULT_SIZE_METRIC = gae_ts_mon.CumulativeDistributionMetric(
'Size of the build result JSON file fetched from isolate',
_BUILD_STEPS_SIZE_METRIC = gae_ts_mon.CumulativeDistributionMetric(
'Size of the build steps',
# The default percentage of builds that use canary swarming task template.
# This number is relatively high so we treat canary seriously and that we have
# a strong signal if the canary is broken.
# If it is, the template must be reverted to a stable version ASAP.
# This is the path, relative to the swarming run dir, to the directory that
# contains the mounted swarming named caches. It will be prepended to paths of
# caches defined in swarmbucket configs.
_CACHE_DIR = 'cache'
# This is the path, relative to the swarming run dir, which is where the recipes
# are either checked out, or installed via CIPD package.
_KITCHEN_CHECKOUT = 'kitchen-checkout'
# Creation/cancellation of tasks.
class Error(Exception):
  """Base class for swarmbucket-specific errors.

  Catch this type to handle any error defined by this module.
  """
class TemplateNotFound(Error):
  """Raised when a swarming task template is not found."""
class CanaryTemplateNotFound(TemplateNotFound):
  """Raised when the canary template is explicitly requested, but not found.

  Subclasses TemplateNotFound, so handlers for the generic "template missing"
  case also catch this one.
  """
def _get_settings_async():  # pragma: no cover
  """Yields the global settings and returns their `swarming` section.

  Written in ndb coroutine style: the result is delivered through ndb.Return
  rather than a plain `return`.
  NOTE(review): presumably wrapped with @ndb.tasklet -- confirm the decorator
  is present above this definition.
  """
  global_settings = yield config.get_settings_async()
  raise ndb.Return(global_settings.swarming)
# Loads the swarming task template from this service's own config set.
# When |canary| is truthy, 'swarming_task_template_canary.json' is tried
# first; 'swarming_task_template.json' is fetched whenever no text has been
# obtained yet. The text is parsed as JSON, its '__comment__' key is dropped,
# and (revision, template, canary) is delivered through ndb.Return.
def _get_task_template_async(canary, canary_required=True):
"""Gets a tuple (template_revision, template_dict, canary_bool).
canary (bool): specifies a whether canary template should be returned.
canary_required (bool): controls the behavior if |canary| is True and
the canary template is not found. If False use the non-canary template,
otherwise raise CanaryTemplateNotFound.
Ignored if canary is False.
Tuple (template_revision, template_dict, canary):
template_revision (str): revision of the template, e.g. commit hash.
template_dict (dict): parsed template, or None if not found.
May contain $parameters that must be expanded using format_obj().
canary (bool): True if the returned template is a canary template.
text = None
revision = None
if canary:
logging.warning('using canary swarming task template')
revision, text = yield component_config.get_self_config_async(
'swarming_task_template_canary.json', store_last_good=True
# From here on |canary| reports whether a canary template was actually found.
canary = bool(text)
if not text:
if canary_required:
raise CanaryTemplateNotFound(
'canary swarming task template is requested, '
'but the canary template is not found'
'canary swarming task template is not found. using the default one'
if not text:
revision, text = yield component_config.get_self_config_async(
'swarming_task_template.json', store_last_good=True
template = None
if text:
template = json.loads(text)
template.pop('__comment__', None)
raise ndb.Return(revision, template, canary)
# Validates the "changes", "properties" and "swarming" build parameters.
# Works on a deep copy of |params|, so the caller's dict is not mutated.
# Each recognized key is popped from the copy as it is checked; failures are
# reported by raising errors.InvalidInputError through the local bad() helper.
def validate_build_parameters(builder_name, params):
"""Raises errors.InvalidInputError if build parameters are invalid."""
params = copy.deepcopy(params)
def bad(fmt, *args):
raise errors.InvalidInputError(fmt % args)
# NOTE(review): pop() has no default here, so a missing builder parameter
# raises KeyError rather than InvalidInputError -- confirm callers always
# supply it.
params.pop(model.BUILDER_PARAMETER) # already validated
def assert_object(name, value):
if not isinstance(value, dict):
bad('%s parameter must be an object' % name)
changes = params.pop(_PARAM_CHANGES, None)
if changes is not None:
if not isinstance(changes, list):
bad('changes param must be an array')
for c in changes: # pragma: no branch
if not isinstance(c, dict):
bad('changes param must contain only objects')
repo_url = c.get('repo_url')
if repo_url is not None and not isinstance(repo_url, basestring):
bad('change repo_url must be a string')
author = c.get('author')
if not isinstance(author, dict):
bad('change author must be an object')
email = author.get('email')
if not isinstance(email, basestring):
bad('change author email must be a string')
if not email:
bad('change author email not specified')
properties = params.pop(_PARAM_PROPERTIES, None)
if properties is not None:
assert_object('properties', properties)
# 'buildername' and 'blamelist' are popped so that the generic property
# validation below does not see them.
if properties.pop('buildername', builder_name) != builder_name:
bad('inconsistent builder name')
expected_emails = [c['author']['email'] for c in (changes or [])]
if properties.pop('blamelist', None) not in (None, expected_emails):
'inconsistent blamelist property; blamelist must not be set or '
'it must match the emails in the "changes" build parameter'
# Validate the rest of the properties using common logic.
ctx = validation.Context.raise_on_error(exc_type=errors.InvalidInputError)
for k, v in properties.iteritems():
with ctx.prefix('property %r:', k):
swarmingcfg_module.validate_recipe_property(k, v, ctx)
swarming = params.pop(_PARAM_SWARMING, None)
if swarming is not None:
assert_object('swarming', swarming)
swarming = copy.deepcopy(swarming)
override_builder_cfg_data = swarming.pop('override_builder_cfg', None)
if override_builder_cfg_data is not None:
assert_object('swarming.override_builder_cfg', override_builder_cfg_data)
if 'build_numbers' in override_builder_cfg_data:
'swarming.override_builder_cfg parameter '
'cannot override build_numbers'
override_builder_cfg = project_config_pb2.Builder()
protoutil.merge_dict(override_builder_cfg_data, override_builder_cfg)
except TypeError as ex:
bad('swarming.override_builder_cfg parameter: %s', ex)
bad('swarming.override_builder_cfg cannot override builder name')
if override_builder_cfg.mixins:
bad('swarming.override_builder_cfg cannot use mixins')
if 'pool:' in override_builder_cfg.dimensions:
bad('swarming.override_builder_cfg cannot remove pool dimension')
ctx = validation.Context.raise_on_error(
prefix='swarming.override_builder_cfg parameter: '
override_builder_cfg, [], False, ctx
# Anything still left in the "swarming" parameter is an unknown key.
if swarming:
bad('unrecognized keys in swarming param: %r', swarming.keys())
# Mocked in tests.
def _should_use_canary_template(percentage): # pragma: no cover
"""Returns True if a canary template should be used.
This function is non-determinstic.
return random.randint(0, 99) < percentage
# Applies the optional 'override_builder_cfg' dict found in |swarming_param|
# to |builder_cfg| and re-validates the result.
# |result| is the same object as |builder_cfg|, not a copy, so the overrides
# are applied to the message passed in by the caller.
def _prepare_builder_config(builder_cfg, swarming_param):
"""Returns final version of builder config to use for |build|.
Expects arguments to be valid.
# Builders are already flattened in the datastore.
result = builder_cfg
# Apply overrides in the swarming parameter.
override_builder_cfg_data = swarming_param.get('override_builder_cfg', {})
if override_builder_cfg_data:
override_builder_cfg = project_config_pb2.Builder()
# NOTE(review): the override dict is merged into |result| here, while
# |override_builder_cfg| is still an empty message when it is passed to
# merge_builder() below -- confirm this is intended.
protoutil.merge_dict(override_builder_cfg_data, result)
ctx = validation.Context.raise_on_error(
prefix='swarming.override_builder_cfg parameter: '
flatten_swarmingcfg.merge_builder(result, override_builder_cfg)
swarmingcfg_module.validate_builder_cfg(result, [], True, ctx)
return result
# Builds a plain dict describing |build|: the serving hostname plus a nested
# 'build' dict with project, bucket, creator, creation timestamp, id and the
# build's initial tags.
def _buildbucket_property(build):
"""Returns value for 'buildbucket' build property.
The format of the returned value corresponds the one used in
buildbot-buildbucket integration [1], with two exceptions:
- it is not encoded in JSON
- the list of tags are initial tags only.
Does not include auto-generated tags.
return {
'hostname': app_identity.get_default_version_hostname(),
'build': {
'project': build.project,
'bucket': api_common.format_luci_bucket(build.bucket_id),
'created_by': build.created_by.to_bytes(),
'created_ts': utils.datetime_to_timestamp(build.create_time),
'id': str(,
# Note: this includes only user-specified tags.
# It does not include auto-generated tags, such as "swarming_tag".
# This is a bit different from Buildbot-Buildbucket integration.
# In practice, however, only "buildset" tag is read from this list.
'tags': build.initial_tags,
def _apply_if_tags(task):
"""Filters a task based on '#if-tag's on JSON objects.
JSON objects containing a property '#if-tag' will be checked to see if the
given value is one of the task's swarming tags. If the tag is present in the
swarming tags of the task, the object is included and the '#if-tag' property
is dropped.
If the JSON object does not contain '#if-tag', it will be unconditionally
It is not possible to filter the entire (top-level) task :).
This returns a copy of the task.
tags = set(task.get('tags', ()))
tag_string = '#if-tag'
def keep(obj):
if isinstance(obj, dict) and tag_string in obj:
return obj[tag_string] in tags
return True
def walk(obj):
if isinstance(obj, dict):
return {
k: walk(v) for k, v in obj.iteritems() if k != tag_string and keep(v)
if isinstance(obj, list):
return [walk(i) for i in obj if keep(i)]
return obj
return walk(task)
# Looks for a master name in the build properties, trying the
# 'luci_migration_master_name' key before 'mastername'. When both a master
# and builder_cfg.luci_migration_host are available, requests
# https://<host>/masters/<master>/builders/<...>/ with format=json and reads
# 'luci_is_prod' from the response. The result is delivered through
# ndb.Return and stays None when nothing could be determined.
def _is_migrating_builder_prod_async(builder_cfg, build):
"""Returns True if the builder is prod according to the migration app.
See also 'luci_migration_host' in the project config.
If unknown, returns None.
On failures, logs them and returns None.
ret = None
master = None
props_list = (
build.parameters.get(_PARAM_PROPERTIES) or {},
for prop_name in ('luci_migration_master_name', 'mastername'):
for props in props_list:
master = props.get(prop_name)
if master:
if master: # pragma: no branch
host = swarmingcfg_module.clear_dash(builder_cfg.luci_migration_host)
if master and host:
url = 'https://%s/masters/%s/builders/%s/' % (
host, master,
res = yield net.json_request_async(
url, params={'format': 'json'}, scopes=net.EMAIL_SCOPE
ret = res.get('luci_is_prod')
except net.NotFoundError:
'missing migration status for %r/%r', master,
except net.Error:
'failed to get migration status for %r/%r', master,
raise ndb.Return(ret)
# Renders the swarming task request dict for |build|.
# Visible steps: validate a deep copy of build.parameters (stored as
# build.parameters_actual), decide whether to use the canary template, apply
# builder config overrides, load the template, assemble template parameters,
# render with format_obj(), then fill in priority, service account, tags,
# '#if-tag' filtering, task slices and (unless fake_build) PubSub settings.
# The finished dict is delivered through ndb.Return.
def _create_task_def_async(
swarming_cfg, builder_cfg, build, build_number, fake_build
"""Creates a swarming task definition for the |build|.
Supports build properties that are supported by Buildbot-Buildbucket
integration. See
Sets build.swarming_hostname and build.canary attributes.
errors.InvalidInputError if build.parameters are invalid.
assert isinstance(swarming_cfg,
project_config_pb2.Swarming), type(swarming_cfg)
assert isinstance(builder_cfg, project_config_pb2.Builder), type(builder_cfg)
assert isinstance(build, model.Build), type(build)
assert build.key and, build.key
assert build.url, 'build.url should have been initialized'
assert isinstance(build_number,
int) or build_number is None, type(build_number)
assert isinstance(fake_build, bool), type(fake_build)
params = copy.deepcopy(build.parameters) or {}
build.parameters_actual = params
# params is an alias for build.parameters_actual.
validate_build_parameters(, params)
swarming_param = params.get(_PARAM_SWARMING) or {}
# Use canary template?
assert isinstance(build.canary_preference, model.CanaryPreference)
if build.canary_preference == model.CanaryPreference.AUTO:
if swarming_cfg.HasField( # pragma: no branch
canary_percentage = swarming_cfg.task_template_canary_percentage.value
build.canary = _should_use_canary_template(canary_percentage)
build.canary = build.canary_preference == model.CanaryPreference.CANARY
builder_cfg = _prepare_builder_config(builder_cfg, swarming_param)
# build.canary is overwritten with the canary flag reported by the template
# loader.
task_template_rev, task_template, build.canary = (
yield _get_task_template_async(
build.canary_preference == model.CanaryPreference.CANARY
except CanaryTemplateNotFound as ex:
raise errors.InvalidInputError(ex.message)
if not task_template:
raise TemplateNotFound('task template is not configured')
build.swarming_hostname = swarming_cfg.hostname
if not build.swarming_hostname: # pragma: no cover
raise Error('swarming hostname is not configured')
h = hashlib.sha256('%s/%s' % (build.bucket_id,
task_template_params = {
'builder_hash': h,
'build_result_filename': _BUILD_RUN_RESULT_FILENAME,
'build_url': build.url,
'cache_dir': _CACHE_DIR,
'hostname': app_identity.get_default_version_hostname(),
'project': build.project,
'swarming_hostname': build.swarming_hostname,
extra_swarming_tags = []
extra_cipd_packages = []
if builder_cfg.HasField('recipe'): # pragma: no branch
(extra_swarming_tags, extra_cipd_packages, extra_task_template_params
) = _setup_recipes(build, builder_cfg, build_number, params)
# Render task template.
# Format is
# Falsy parameter values (None, 0, '') are all rendered as an empty string.
task_template_params = {
k: v or '' for k, v in task_template_params.iteritems()
task = format_obj(task_template, task_template_params)
# Set 'pool_task_template' to match our build's canary status.
# This can be made unconditional after is closed. Right now
# we override this with "SKIP" in the templates to allow an atomic transition.
'pool_task_template', 'CANARY_PREFER' if build.canary else 'CANARY_NEVER'
task['priority'] = _calc_priority(build, builder_cfg, task.get('priority'))
if builder_cfg.service_account: # pragma: no branch
# Don't pass it if not defined, for backward compatibility.
task['service_account'] = builder_cfg.service_account
task['tags'] = _calc_tags(
build, builder_cfg, extra_swarming_tags, task_template_rev,
task = _apply_if_tags(task)
build, builder_cfg, extra_cipd_packages, task
if not fake_build: # pragma: no branch | covered by
_setup_swarming_request_pubsub(task, build)
raise ndb.Return(task)
# Computes the recipe-related pieces of a task request.
# Build properties start from the builder config's recipe properties and are
# updated with the "properties" build parameter; the merged dict is written
# back into |params|. 'buildername', 'buildnumber', 'repository' and
# 'blamelist' are then filled in from the build, |build_number| and the
# "changes" parameter.
# Returns the 3-tuple (extra_swarming_tags, extra_cipd_packages,
# extra_task_template_params).
def _setup_recipes(build, builder_cfg, build_number, params):
"""Initializes a build request using recipes.
Mutates params.
extra_swarming_tags, extra_cipd_packages, extra_task_template_params
# Properties specified in build parameters must override those in builder
# config.
build_properties = flatten_swarmingcfg.read_properties(builder_cfg.recipe)
build_properties.update(params.get(_PARAM_PROPERTIES) or {})
params[_PARAM_PROPERTIES] = build_properties
# build_properties is an alias for params[_PARAM_PROPERTIES]
# In order to allow some builders to behave like other builders, we allow
# builders to explicitly set buildername.
if 'buildername' not in build_properties:
build_properties['buildername'] =
assert isinstance(build.experimental, bool)
build_properties.setdefault('$recipe_engine/runtime', {}).update(
if build_number is not None: # pragma: no branch
build_properties['buildnumber'] = build_number
changes = params.get(_PARAM_CHANGES)
if changes: # pragma: no branch
# Buildbucket-Buildbot integration passes repo_url of the first change in
# build parameter "changes" as "repository" attribute of SourceStamp.
# /scripts/master/buildbucket/
# Buildbot passes repository of the build source stamp as "repository"
# build property. Recipes, in particular bot_update recipe module, rely on
# "repository" property and it is an almost sane property to support in
# swarmbucket.
repo_url = changes[0].get('repo_url')
if repo_url: # pragma: no branch
build_properties['repository'] = repo_url
# Buildbot-Buildbucket integration converts emails in changes to blamelist
# property.
emails = [c.get('author', {}).get('email') for c in changes]
# filter(None, ...) drops missing/empty emails.
build_properties['blamelist'] = filter(None, emails)
extra_task_template_params = {
'properties_json': json.dumps(build_properties, sort_keys=True),
'checkout_dir': _KITCHEN_CHECKOUT,
extra_swarming_tags = [
'recipe_name:%s' %,
extra_cipd_packages = []
if builder_cfg.recipe.cipd_package:
'repository': '',
'revision': '',
'recipe_package:' + builder_cfg.recipe.cipd_package
'package_name': builder_cfg.recipe.cipd_package,
'version': builder_cfg.recipe.cipd_version or 'refs/heads/master',
'repository': builder_cfg.recipe.repository,
'revision': 'HEAD',
'recipe_repository:' + builder_cfg.recipe.repository
return extra_swarming_tags, extra_cipd_packages, extra_task_template_params
def _calc_priority(build, builder_cfg, priority):
"""Calculates the Swarming task request priority to use."""
priority = int(priority or 0)
if builder_cfg.priority > 0: # pragma: no branch
priority = builder_cfg.priority
if build.experimental:
priority = min(255, priority * 2)
# Swarming accepts priority as a string
return str(priority)
# Starts from the |tags| passed in (None is treated as empty), adds
# buildbucket_* tags describing the bucket, build id, serving hostname,
# canary flag and template revision, and returns the de-duplicated tags as a
# sorted list.
def _calc_tags(
build, builder_cfg, extra_swarming_tags, task_template_rev, tags
"""Calculates the Swarming task request tags to use."""
tags = set(tags or [])
tags.add('buildbucket_bucket:%s' % build.bucket_id)
tags.add('buildbucket_build_id:%s' %
'buildbucket_hostname:%s' % app_identity.get_default_version_hostname()
tags.add('buildbucket_template_canary:%s' % ('1' if build.canary else '0'))
tags.add('buildbucket_template_revision:%s' % task_template_rev)
return sorted(tags)
# Finalizes task[u'task_slices'] in place.
# The rendered template must contain exactly one task slice, otherwise
# errors.InvalidInputError is raised. That slice receives secret_bytes and,
# if builder_cfg.expiration_secs is positive, an expiration. When
# _setup_swarming_props() reports fallback dimensions (at most 6 distinct
# expirations), one extra slice is created per expiration by deep-copying the
# base slice's properties; each slice's expiration is the delta from the
# previous one, and the base slice, which ends up last, keeps the remainder
# with a floor of 60 seconds.
def _setup_swarming_request_task_slices(
build, builder_cfg, extra_cipd_packages, task
"""Mutate the task request with named cache, CIPD packages and (soon) expiring
# For now, refuse a task template with more than one TaskSlice. Otherwise
# it would be much harder to rationalize what's happening while reading the
# Swarming task template.
if len(task[u'task_slices']) != 1:
raise errors.InvalidInputError(
'base swarming task template can only have one task_slices'
if build.key: # pragma: no branch
secrets = launcher_pb2.BuildSecrets(
task[u'task_slices'][0][u'properties'][u'secret_bytes'] = base64.b64encode(
if builder_cfg.expiration_secs > 0:
task[u'task_slices'][0][u'expiration_secs'] = str(
# Now take a look to generate a fallback! This is done by inspecting the
# Builder named caches for the flag "wait_for_warm_cache_secs".
dims = _setup_swarming_props(
if dims:
if len(dims) > 6:
raise errors.InvalidInputError(
'Too many (%d > 6) TaskSlice fallbacks' % len(dims)
# Create a fallback by copying the original task slice, each time adding the
# corresponding expiration.
base_task_slice = task[u'task_slices'].pop()
base_task_slice.setdefault(u'wait_for_capacity', False)
last_exp = 0
for expiration_secs in sorted(dims):
t = {
u'expiration_secs': str(expiration_secs - last_exp),
u'properties': copy.deepcopy(base_task_slice[u'properties']),
u'wait_for_capacity': base_task_slice[u'wait_for_capacity'],
last_exp = expiration_secs
# Tweak expiration on the base_task_slice, which is the last slice.
exp = max(int(base_task_slice[u'expiration_secs']) - last_exp, 60)
base_task_slice[u'expiration_secs'] = str(exp)
assert len(task[u'task_slices']) == len(dims) + 1
# Now add the actual fallback dimensions. They could be either from optional
# named caches or from buildercfg dimensions in the form
# "<expiration_secs>:<key>:<value>".
extra_dims = []
for i, (_expiration_secs, kv) in enumerate(sorted(dims.iteritems(),
# Now mutate each TaskProperties to have the desired dimensions.
# Index -1 is the base slice, so -2 - i addresses the fallback slices
# starting from the one right before it.
props = task[u'task_slices'][-2 - i][u'properties']
props[u'dimensions'].sort(key=lambda x: (x[u'key'], x[u'value']))
# Fills |props| in place: appends an 'env' entry whose value is
# str(build.experimental).upper(), ensures props['cipd_input']['packages']
# exists, sets 'execution_timeout_secs' when the builder config has a positive
# timeout, and rebuilds named caches and dimensions.
# Dimensions and cache fallbacks are grouped by expiration; the group with
# expiration 0 becomes props['dimensions'] (sorted by key, then value) and the
# remaining groups are returned to the caller.
def _setup_swarming_props(build, builder_cfg, extra_cipd_packages, props):
"""Fills a TaskProperties.
Updates props; a python format of TaskProperties.
dict {expiration_sec: {key: list(values)}} to support caches. This is
different than the format in flatten_swarmingcfg.parse_dimensions().
props.setdefault('env', []).append({
'value': str(build.experimental).upper(),
props.setdefault('cipd_input', {}).setdefault('packages',
if builder_cfg.execution_timeout_secs > 0:
props['execution_timeout_secs'] = str(builder_cfg.execution_timeout_secs)
cache_fallbacks = _setup_named_caches(builder_cfg, props)
# Add in all of the non-fallback swarming dimensions to the task properties.
dims = swarmingcfg_module.read_dimensions(builder_cfg)
# Reconstruct dims as the actual list of dimensions needed. The challenge here
# is that repeated values are valid!
out = {}
for expirations_secs, items in cache_fallbacks.iteritems():
out.setdefault(expirations_secs, []).extend(
{u'key': u'caches', u'value': item} for item in items
for key, (value, expirations_secs) in dims.iteritems():
out.setdefault(expirations_secs, []).append({u'key': key, u'value': value})
# Expiration 0 means "not a fallback": those dimensions apply unconditionally.
props['dimensions'] = out.pop(0, [])
props[u'dimensions'].sort(key=lambda x: (x[u'key'], x[u'value']))
return out
# Rebuilds props[u'caches'] from scratch: caches declared in the builder
# config come first, followed by caches from the task template whose path and
# name are both absent from the |paths| and |names| sets. Caches that declare
# wait_for_warm_cache_secs are recorded in the returned dict, keyed by that
# number of seconds. The final cache list is sorted by path.
def _setup_named_caches(builder_cfg, props):
"""Adds/replaces named caches to/in the Swarming TaskProperties.
Mutates props.
dict {expiration_secs: list(caches)}
template_caches = props.get(u'caches', [])
props[u'caches'] = []
names = set()
paths = set()
cache_fallbacks = {}
# Look for builder specific named caches.
for c in builder_cfg.caches:
if c.path.startswith(u'cache/'): # pragma: no cover
# TODO(nodir): remove this code path once clients remove "cache/" from
# their configs.
cache_path = c.path
cache_path = posixpath.join(_CACHE_DIR, c.path)
props[u'caches'].append({u'path': cache_path, u'name':})
if c.wait_for_warm_cache_secs:
cache_fallbacks.setdefault(c.wait_for_warm_cache_secs, []).append(
# Look for named cache fallback from the swarming task template itself.
for c in template_caches:
# Only process the caches that were not overridden.
if c.get(u'path') not in paths and c.get(u'name') not in names:
props[u'caches'].append({u'name': c[u'name'], u'path': c[u'path']})
v = c.get(u'wait_for_warm_cache_secs')
if v:
cache_fallbacks.setdefault(v, []).append(c[u'name'])
props[u'caches'].sort(key=lambda p: p.get(u'path'))
return cache_fallbacks
# Sets task['pubsub_topic'] to 'projects/<app id>/topics/<_PUBSUB_TOPIC>' and
# task['pubsub_userdata'] to a JSON string that includes the current
# timestamp and the build's swarming hostname.
def _setup_swarming_request_pubsub(task, build):
"""Mutates Swarming task request to add pubsub topic."""
task['pubsub_topic'] = 'projects/%s/topics/%s' % (
app_identity.get_application_id(), _PUBSUB_TOPIC
task['pubsub_userdata'] = json.dumps(
'created_ts': utils.datetime_to_timestamp(utils.utcnow()),
'swarming_hostname': build.swarming_hostname,
# Resolves the bucket and builder configs for |build|.
# The builder name is read from build.parameters[model.BUILDER_PARAMETER] and
# must be a string. The bucket config is fetched by build.bucket_id and must
# have a 'swarming' field. The matching (bucket_cfg, builder_cfg) pair is
# delivered through ndb.Return.
def _get_builder_async(build):
"""Returns builder info of the build.
errors.InvalidInputError if build has no builder name.
errors.BuilderNotFoundError if builder is not found.
(project_config_pb2.Bucket, project_config_pb2.Builder) tuple future.
if not build.parameters:
raise errors.InvalidInputError(
'A build for bucket %r must have parameters' % build.bucket_id
builder_name = build.parameters.get(model.BUILDER_PARAMETER)
if not isinstance(builder_name, basestring):
raise errors.InvalidInputError('Invalid builder name %r' % builder_name)
project_id, bucket_cfg = yield config.get_bucket_async(build.bucket_id)
assert bucket_cfg, 'if there is no bucket, this code should not have run'
assert project_id == build.project, '%r != %r' % (project_id, build.project)
if not bucket_cfg.HasField('swarming'):
raise errors.InvalidInputError(
'bucket %r is not a swarming bucket' %
for builder_cfg in # pragma: no branch
if == builder_name: # pragma: no branch
raise ndb.Return(bucket_cfg, builder_cfg)
raise errors.BuilderNotFoundError(
'Builder %r is not found in bucket %r' % (builder_name, build.bucket_id)
# Convenience entry point: loads the swarming settings and the bucket/builder
# configs for |build|, then delegates to _prepare_task_def_async() and
# delivers its result through ndb.Return.
def prepare_task_def_async(build, build_number=None, fake_build=False):
settings = yield _get_settings_async()
bucket_cfg, builder_cfg = yield _get_builder_async(build)
ret = yield _prepare_task_def_async(
build, build_number, bucket_cfg, builder_cfg, settings, fake_build
raise ndb.Return(ret)
# Rejects leased builds, sets build.url and build.experimental, then creates
# the task definition with _create_task_def_async().
# build.experimental defaults to the builder config's `experimental` flag
# when it is None; a non-None answer from the migration app then overrides it
# with `not is_prod`.
def _prepare_task_def_async(
build, build_number, bucket_cfg, builder_cfg, settings, fake_build
"""Prepares a swarming task definition.
Validates the new build.
If configured, generates a build number and updates the build.
Creates a swarming task definition.
Sets build attributes: swarming_hostname, canary and url.
May add "build_address" tag.
Returns a task_def dict.
if build.lease_key:
raise errors.InvalidInputError(
'Swarming buckets do not support creation of leased builds'
if build_number is not None:
# TODO( migrate build address to use short
# bucket names.
build.url = _generate_build_url(settings.milo_hostname, build)
if build.experimental is None:
build.experimental = (builder_cfg.experimental == project_config_pb2.YES)
is_prod = yield _is_migrating_builder_prod_async(builder_cfg, build)
if is_prod is not None:
build.experimental = not is_prod
task_def = yield _create_task_def_async(
bucket_cfg.swarming, builder_cfg, build, build_number, fake_build
raise ndb.Return(task_def)
# Prepares the task definition, submits it to swarming through
# _call_api_async(), and records the outcome on |build|: the swarming task
# id, tags derived from the task request's tags and first-slice dimensions,
# logdog location (taken from a 'log_location' tag), service account, and
# lease information (leasee, expiration date, never_leased).
def create_task_async(build, build_number=None):
"""Creates a swarming task for the build and mutates the build.
May be called only if build's bucket is configured for swarming.
errors.InvalidInputError if build attribute values are invalid.
settings = yield _get_settings_async()
bucket_cfg, builder_cfg = yield _get_builder_async(build)
task_def = yield _prepare_task_def_async(
build, build_number, bucket_cfg, builder_cfg, settings, False
assert build.swarming_hostname
res = yield _call_api_async(
# Make Swarming know what bucket the task belongs to. Swarming uses
# this to authorize access to pools assigned to specific buckets only.
delegation_tag='buildbucket:bucket:%s' % build.bucket_id,
# Higher timeout than normal because if the task creation request
# fails, but the task is actually created, later we will receive a
# notification that the task is completed, but we won't have a build
# for that task, which results in errors in the log.
# This code path is executed by put and put_batch request handlers.
# Clients should retry these requests on transient errors, so
# do not retry requests to swarming.
task_id = res['task_id']'Created a swarming task %s', task_id)
build.swarming_task_id = task_id
'swarming_hostname:%s' % build.swarming_hostname,
'swarming_task_id:%s' % task_id,
task_req = res.get('request', {})
for t in task_req.get('tags', []):
key, value = buildtags.parse(t)
if key == 'log_location':
host, project, prefix, _ = logdog.parse_url(value)
build.logdog_hostname = host
build.logdog_project = project
build.logdog_prefix = prefix
build.tags.append(buildtags.unparse(buildtags.SWARMING_TAG_KEY, t))
# Only the dimensions of the first task slice are turned into build tags.
task_slices = task_req.get('task_slices') or [{}]
for d in task_slices[0].get('properties', {}).get('dimensions', []):
dt = buildtags.unparse(d['key'], d['value'])
build.tags.append(buildtags.unparse(buildtags.SWARMING_DIMENSION_KEY, dt))
build.service_account = task_req.get('service_account')
# Mark the build as leased.
exp = sum(int(t['expiration_secs']) for t in task_def['task_slices'])
# This is not exactly true but #closeenough.
exp += int(
# Adding extra slack definitely helps with the #closeenough.
# NOTE(review): the margin added below is 24 hours (24 * 60 * 60 seconds),
# not one hour as this comment used to say -- confirm which was intended.
exp += 24 * 60 * 60
build.lease_expiration_date = utils.utcnow() + datetime.timedelta(seconds=exp)
build.leasee = user.self_identity()
build.never_leased = False
# Returns the URL to show for |build|: a 'https://<milo_hostname>/b/<number>'
# link when |milo_hostname| is set, otherwise a link to the swarming task page
# built from build.swarming_hostname and build.swarming_task_id.
def _generate_build_url(milo_hostname, build):
if not milo_hostname:
return (
'https://%s/task?id=%s' %
(build.swarming_hostname, build.swarming_task_id)
return 'https://%s/b/%d' % (milo_hostname,
# Calls the swarming 'task/<task_id>/cancel' endpoint through
# _call_api_async() and logs the response.
# NOTE(review): the docstring says cancellation is a noop for running tasks,
# but the request sets 'kill_running' to True -- confirm which is accurate.
def cancel_task_async(hostname, task_id):
"""Cancels a swarming task.
Noop if the task started running.
res = yield _call_api_async(
path='task/%s/cancel' % task_id,
'kill_running': True,
if res.get('ok'):'response: %r', res)
logging.warning('response: %r', res)
def cancel_task(hostname, task_id):
  """Sync version of cancel_task_async.

  Blocks until the asynchronous cancellation has finished; any exception
  raised by it propagates to the caller. Nothing is returned.

  NOTE(review): this used to be documented as "Noop if the task started
  running", but cancel_task_async sends 'kill_running': True -- confirm the
  actual behavior for running tasks.

  Args:
    hostname: swarming hostname the task lives on.
    task_id: id of the swarming task to cancel.
  """
  cancel_task_async(hostname, task_id).get_result()
# Enqueues a push task on the 'backend-default' queue with transactional=True.
# The task URL, which embeds |hostname| and |task_id|, points at the internal
# 'cancel_swarming_task' handler. Returns what Task.add_async() returns.
def cancel_task_transactionally_async(hostname, task_id): # pragma: no cover
"""Transactionally schedules a push task to cancel a swarming task.
Swarming task cancelation is noop if the task started running.
url = (
'/internal/task/buildbucket/cancel_swarming_task/%s/%s' %
(hostname, task_id)
task = taskqueue.Task(url=url)
return task.add_async(queue_name='backend-default', transactional=True)
# Update builds.
# Requests 'task/<task_id>/result' through _call_api_async() and returns
# whatever that call returns.
def _load_task_result_async(hostname, task_id): # pragma: no cover
return _call_api_async(
path='task/%s/result' % task_id,
# Fetches and parses the build-run-result file referenced by a swarming task
# result, delivering (result_dict_or_None, error_or_None) via ndb.Return.
# Outcomes visible here:
# - no 'outputs_ref', or no entry for the file in the isolated: (None, None)
# - isolate server URL not starting with 'https://', or unparsable/empty
#   JSON: (None, _BUILD_RUN_RESULT_CORRUPTED)
# - declared size >= _BUILD_RUN_RESULT_MAX_SIZE:
#   (None, _BUILD_RUN_RESULT_TOO_LARGE)
def _load_build_run_result_async(task_result, bucket_id, builder):
"""Fetches _BUILD_RUN_RESULT_FILENAME from swarming task output.
Logs errors.
Returns (build_run_result dict, error_string), where error_string is None
if there is no error.
outputs_ref = task_result.get('outputs_ref')
if not outputs_ref:
raise ndb.Return(None, None)
server_prefix = 'https://'
if not outputs_ref['isolatedserver'].startswith(server_prefix):
'Bad isolatedserver %r read from task %s',
outputs_ref['isolatedserver'], task_result['id']
raise ndb.Return(None, _BUILD_RUN_RESULT_CORRUPTED)
hostname = outputs_ref['isolatedserver'][len(server_prefix):]
def fetch_json_async(loc):
raw = yield isolate.fetch_async(loc)
raise ndb.Return(json.loads(raw))
except ValueError:
logging.exception('could not load %s', isolated_loc.human_url)
raise ndb.Return(None)
isolated_loc = isolate.Location(
hostname, outputs_ref['namespace'], outputs_ref['isolated']
isolated = yield fetch_json_async(isolated_loc)
if isolated is None:
raise ndb.Return(None, _BUILD_RUN_RESULT_CORRUPTED)
# Assume the isolated file format
result_entry = isolated['files'].get(_BUILD_RUN_RESULT_FILENAME)
if not result_entry:
raise ndb.Return(None, None)
# 's' holds the size and 'h' the digest of the entry.
result_size = int(result_entry['s'])
if result_size >= _BUILD_RUN_RESULT_MAX_SIZE:
raise ndb.Return(None, _BUILD_RUN_RESULT_TOO_LARGE)
result_size / 1000,
'bucket': bucket_id,
'builder': builder,
result_loc = isolated_loc._replace(digest=result_entry['h'])
build_result = yield fetch_json_async(result_loc)
# A falsy result (None or empty) is reported as corrupted.
raise ndb.Return(
build_result, None if build_result else _BUILD_RUN_RESULT_CORRUPTED
def _sync_build_in_memory(
build, task_result, build_run_result, build_run_result_error
"""Syncs buildbucket |build| state with swarming task |result|.

Recomputes status, result, failure/cancelation reason, result_details and
timestamps of |build| from |task_result| (may be None), |build_run_result|
(may be None) and |build_run_result_error|.

Returns False when the build is already completed, when a PENDING update is
ignored, or when the status did not change; True otherwise. Note that |build|
may already have been modified in memory when False is returned, because the
derived fields are reset before the new state is computed.
"""
# NOTE(review): this copy of the function is missing lines (the closing "):"
# of the signature, the members of terminal_states, at least two `else:`
# lines, closing parentheses/braces and the logging calls). The notes below
# mark each gap; restore the code from version control before editing.
# Task result docs:
# build_run_result is dict parsed from BuildRunResult JSONPB. May be None.
if build.status == model.BuildStatus.COMPLETED: # pragma: no cover
# Completed builds are immutable.
return False
now = utils.utcnow()
# Remember the current status, then reset every derived field; they are
# recomputed from the task state below.
old_status = build.status
build.status = None
build.result = None
build.failure_reason = None
build.cancelation_reason = None
build.result_details = {'swarming': {'bot_dimensions': {}}}
bot_dimensions = build.result_details['swarming']['bot_dimensions']
for d in (task_result or {}).get('bot_dimensions', []):
bot_dimensions[d['key']] = d['value'] # this is a list of values
# error message to include in result_details. Used only if build is complete.
errmsg = ''
# NOTE(review): the members and closing brace of this set are absent from this
# copy. The branches below handle CANCELED, KILLED, EXPIRED, NO_RESOURCE,
# TIMED_OUT, BOT_DIED and COMPLETED; confirm the set against version control.
terminal_states = {
state = (task_result or {}).get('state')
if build_run_result_error:
# A bad build run result file always completes the build as an infra failure,
# regardless of the swarming task state.
build.status = model.BuildStatus.COMPLETED
build.result = model.BuildResult.FAILURE
build.failure_reason = model.FailureReason.INFRA_FAILURE
# NOTE(review): the closing parenthesis of this expression is absent.
errmsg = '%s returned by the swarming task is bad: %s.' % (
_BUILD_RUN_RESULT_FILENAME, build_run_result_error
elif state is None:
# No task result (or no state in it): the task is treated as lost.
build.status = model.BuildStatus.COMPLETED
build.result = model.BuildResult.FAILURE
build.failure_reason = model.FailureReason.INFRA_FAILURE
# NOTE(review): the closing parenthesis of this expression is absent.
errmsg = (
'Swarming task %s on %s unexpectedly disappeared' %
(build.swarming_task_id, build.swarming_hostname)
elif state == 'PENDING':
# NOTE(review): build.status was reset to None above, so this comparison can
# never be true as written; old_status looks like the intended operand.
# Confirm before changing.
if build.status == model.BuildStatus.STARTED: # pragma: no cover
# Most probably, race between PubSub push handler and Cron job.
# With swarming, a build cannot go from STARTED back to PENDING,
# so ignore this.
return False
build.status = model.BuildStatus.SCHEDULED
elif state == 'RUNNING':
build.status = model.BuildStatus.STARTED
elif state in terminal_states:
build.status = model.BuildStatus.COMPLETED
if state in ('CANCELED', 'KILLED'):
build.result = model.BuildResult.CANCELED
build.cancelation_reason = model.CancelationReason.CANCELED_EXPLICITLY
elif state in ('EXPIRED', 'NO_RESOURCE'):
# Task did not start.
build.result = model.BuildResult.CANCELED
build.cancelation_reason = model.CancelationReason.TIMEOUT
elif state == 'TIMED_OUT':
# Task started, but timed out.
build.result = model.BuildResult.FAILURE
build.failure_reason = model.FailureReason.INFRA_FAILURE
elif state == 'BOT_DIED' or task_result.get('internal_failure'):
build.result = model.BuildResult.FAILURE
build.failure_reason = model.FailureReason.INFRA_FAILURE
elif build_run_result is None:
# There must be a build_run_result, otherwise it is an infra failure.
build.result = model.BuildResult.FAILURE
build.failure_reason = model.FailureReason.INFRA_FAILURE
elif task_result.get('failure'):
build.result = model.BuildResult.FAILURE
# NOTE(review): the `else:` lines separating the next three assignments are
# absent from this copy. Presumably INFRA_FAILURE applies only when
# 'infraFailure' is set, BUILD_FAILURE otherwise, and the assert/SUCCESS pair
# is the final else of the enclosing chain. Confirm against version control.
if build_run_result.get('infraFailure'):
build.failure_reason = model.FailureReason.INFRA_FAILURE
build.failure_reason = model.FailureReason.BUILD_FAILURE
assert state == 'COMPLETED'
build.result = model.BuildResult.SUCCESS
else: # pragma: no cover
assert False, 'Unexpected task state: %s' % state
if build.status == old_status: # pragma: no cover
return False
build.status_changed_time = now
# NOTE(review): the call owning the next line (presumably a logging call) and
# its second argument are absent from this copy.
'Build %s status: %s -> %s',, old_status, build.status
# Helper: reads |key| from task_result and parses it with _parse_ts; returns
# None when the key is missing or empty.
def ts(key):
v = (task_result or {}).get(key)
return _parse_ts(v) if v else None
if build.status == model.BuildStatus.STARTED:
build.start_time = ts('started_ts') or now
# NOTE(review): the tail of the next line is the remains of a separate
# statement fused into the trailing comment in this copy.
elif build.status == model.BuildStatus.COMPLETED: # pragma: no branch'Build %s result: %s',, build.result)
# Keep the previously recorded start time if the task result has none.
build.start_time = ts('started_ts') or build.start_time
build.complete_time = ts('completed_ts') or ts('abandoned_ts') or now
if build_run_result:
ann = build_run_result.get('annotations') or {}
# NOTE(review): the closing brace of this dict is absent from this copy.
build.result_details['ui'] = {
'info': '\n'.join(ann.get('text', [])),
build.result_details['properties'] = _extract_properties(ann)
if errmsg:
build.result_details['error'] = {'message': errmsg}
return True
def _extract_properties(annotation_step):
"""Extracts properties from an annotation step"""
ret = {}
def extract(step):
for p in step.get('property') or ():
ret[p['name']] = json.loads(p['value'])
for s in step.get('substep') or ():
extract(s.get('step') or {})
return ret
def _extract_build_steps(build_run_result):
"""Extracts a list of buildbucket.v2.Step from build_run_result.

|build_run_result| may be None. Returns an empty list when it has no
'annotations' or no 'annotationUrl'.
"""
# TODO: remove, accept build steps from kitchen directly.
build_run_result = build_run_result or {}
ann_dict = build_run_result.get('annotations')
ann_url = build_run_result.get('annotationUrl')
if not ann_dict or not ann_url: # pragma: no cover
return []
# Round-trip the annotations dict through JSON to populate the Step proto;
# fields unknown to the proto are ignored.
ann_step = annotations_pb2.Step()
json_format.Parse(json.dumps(ann_dict), ann_step, ignore_unknown_fields=True)
host, project, prefix, _ = logdog.parse_url(ann_url)
# NOTE(review): this call is truncated in this copy: its closing parenthesis
# is absent and |host| is not used anywhere in the visible code, so at least
# one argument line was probably lost. Restore from version control.
parser = annotations.StepParser(
default_logdog_prefix='%s/%s' % (project, prefix),
return parser.parse_substeps(ann_step.substep)
def _sync_build_async(build_id, task_result, bucket_id, builder):
"""Syncs Build entity in the datastore with the swarming task.

Loads the build run result for |task_result| (when it is truthy), derives
build steps from it, then in txn_async loads the Build with id |build_id|,
applies _sync_build_in_memory and, if that reports a change, writes the build.
"""
# NOTE(review): this copy of the function is missing lines (closing
# parentheses, the calls that own some argument lists, the bodies of several
# branches, and probably decorators, since the code uses `yield` /
# `raise ndb.Return`). The notes below mark each gap; restore the code from
# version control before editing.
build_run_result = None
build_run_result_error = False
if task_result:
# NOTE(review): the closing parenthesis of this expression is absent.
build_run_result, build_run_result_error = yield (
_load_build_run_result_async(task_result, bucket_id, builder)
build_key = ndb.Key(model.Build, build_id)
# TODO(nodir): accept build steps via a separate RPC.
# The steps are wrapped in a Build proto only to measure their serialized
# size.
step_container = build_pb2.Build(steps=_extract_build_steps(build_run_result))
step_byte_size = step_container.ByteSize()
# NOTE(review): the statement that owns the next three lines is absent from
# this copy; as written they are not valid on their own.
step_byte_size / 1000, # convert to Kb
'bucket': bucket_id,
'builder': builder,
too_large = step_byte_size > model.BuildSteps.MAX_STEPS_LEN
build_steps = None
if too_large: # pragma: no cover
# piggy back on the existing error handling mechanism
build_run_result = None
# NOTE(review): in this copy the message built here is overwritten by the
# assignment three lines below without being used in between; a statement
# that consumed it (and the closing parenthesis) may have been lost.
build_run_result_error = (
'build steps are %d bytes which is more than %d' %
(step_byte_size, model.BuildSteps.MAX_STEPS_LEN)
build_run_result_error = _BUILD_RUN_RESULT_TOO_LARGE
# Do not set build_steps.step_container unless we are sure it is under the
# size limit.
# NOTE(review): an `else:` before this assignment and the arguments of the
# BuildSteps constructor are absent from this copy.
build_steps = model.BuildSteps(
# Helper: loads the build, syncs it in memory and writes it back only if
# _sync_build_in_memory reported a change. Returns the build, or None when the
# build does not exist or nothing changed.
def txn_async():
build = yield build_key.get_async()
if not build: # pragma: no cover
raise ndb.Return(None)
# NOTE(review): the closing parenthesis of this call is absent.
made_change = _sync_build_in_memory(
build, task_result, build_run_result, build_run_result_error
if not made_change:
raise ndb.Return(None)
futures = [build.put_async()]
# NOTE(review): the bodies of the next three branches are absent from this
# copy.
if build.status == model.BuildStatus.STARTED:
elif build.status == model.BuildStatus.COMPLETED: # pragma: no cover
# This code is covered by tests, but pycover reports coverage incorrectly!
if build_steps:
# Wait for all pending writes before returning the build.
yield futures
raise ndb.Return(build)
build = yield txn_async()
if build:
# NOTE(review): the bodies of both branches below are absent from this copy.
if build.status == model.BuildStatus.STARTED:
elif build.status == model.BuildStatus.COMPLETED: # pragma: no branch
class SubNotify(webapp2.RequestHandler):
"""Handles PubSub messages from swarming.

Assumes unprivileged users cannot send requests to this handler.
"""
# NOTE(review): this copy of the class is missing lines (`try:` statements,
# logging calls, closing parentheses and the bodies of several branches). The
# notes below mark each gap; restore the code from version control before
# editing.
# Set to True by stop() when a message is rejected.
bad_message = False
def unpack_msg(self, msg):
"""Extracts swarming hostname, creation time, task id and build id from msg.

Aborts if |msg| is malformed.

Returns a (hostname, created_time, task_id, build_id) tuple.
"""
data_b64 = msg.get('data')
if not data_b64:
self.stop('no message data')
# NOTE(review): the `try:` matching the `except` below is absent from this
# copy. Also, under Python 2 (this file uses basestring/long)
# base64.b64decode reports malformed input with TypeError, which
# `except ValueError` does not catch. Confirm the intended exception type.
data_json = base64.b64decode(data_b64)
except ValueError as ex: # pragma: no cover
self.stop('cannot decode message data as base64: %s', ex)
data = self.parse_json_obj(data_json, 'message data')
# 'userdata' is itself a JSON-encoded object inside the message data.
userdata = self.parse_json_obj(data.get('userdata'), 'userdata')
hostname = userdata.get('swarming_hostname')
if not hostname:
self.stop('swarming hostname not found in userdata')
if not isinstance(hostname, basestring):
self.stop('swarming hostname is not a string')
created_ts = userdata.get('created_ts')
if not created_ts:
self.stop('created_ts not found in userdata')
# NOTE(review): the `try:` matching the `except` below is absent from this
# copy.
created_time = utils.timestamp_to_datetime(created_ts)
except ValueError as ex:
self.stop('created_ts in userdata is invalid: %s', ex)
build_id = userdata.get('build_id')
if not isinstance(build_id, (int, long)):
self.stop('invalid build_id %r', build_id)
task_id = data.get('task_id')
if not task_id:
self.stop('task_id not found in message data')
return hostname, created_time, task_id, build_id
def post(self):
# Entry point for a PubSub push request. Uses memcache, keyed by the PubSub
# message id, to avoid processing the same message twice within 10 minutes.
# NOTE(review): the next line and the memcache check below are each fused
# with the remains of a separate (presumably logging) statement in this copy,
# and the code between the check and memcache.set is absent.
msg = self.request.json['message']'Received message: %r', msg)
# Try not to process same message more than once.
nc = 'swarming-pubsub-msg-id'
if memcache.get(msg['messageId'], namespace=nc):'seen this message before, ignoring')
memcache.set(msg['messageId'], 1, namespace=nc, time=10 * 60)
def _process_msg(self, msg):
# Loads the build named in |msg|, checks that it belongs to the swarming task
# that sent the notification, then loads the task result.
# NOTE(review): most of the actions taken in the branches below, and whatever
# consumes |result| at the end, are absent from this copy.
hostname, created_time, task_id, build_id = self.unpack_msg(msg)
task_url = 'https://%s/task?id=%s' % (hostname, task_id)
# Load build.'Build id: %s', build_id)
build = model.Build.get_by_id(build_id)
if not build:
# A build missing within one minute of task creation is handled differently
# from one missing later, as the two messages below show.
if utils.utcnow() < created_time + datetime.timedelta(minutes=1):
'Build for a swarming task not found yet\nBuild: %s\nTask: %s',
'Build for a swarming task not found\nBuild: %s\nTask: %s', build_id,
# Ensure the loaded build is associated with the task.
if build.swarming_hostname != hostname:
'swarming_hostname %s of build %s does not match %s',
build.swarming_hostname, build_id, hostname
if build.swarming_task_id != task_id:
'swarming_task_id %s of build %s does not match %s',
build.swarming_task_id, build_id, task_id
assert build.parameters
# Update build.
result = _load_task_result_async(hostname, task_id).get_result()
def stop(self, msg, *args, **kwargs):
"""Logs error and stops request processing.

Marks the message as bad, logs it at WARNING level if |redeliver| is truthy
and at ERROR level otherwise, then aborts the request with HTTP 400 if
|redeliver| is truthy and HTTP 200 otherwise.

msg: error message
args: format args for msg.
redeliver: True to process this message later.
"""
self.bad_message = True
if args:
msg = msg % args
redeliver = kwargs.get('redeliver')
logging.log(logging.WARNING if redeliver else logging.ERROR, msg)
self.abort(400 if redeliver else 200)
def parse_json_obj(self, text, name):
"""Parses a JSON object from |text| if possible. Otherwise stops."""
# NOTE(review): the `try:` matching the `except` below is absent from this
# copy.
# None/empty |text| becomes '' and is rejected by json.loads like any other
# invalid JSON.
result = json.loads(text or '')
if not isinstance(result, dict):
# Valid JSON that is not an object is reported the same way as invalid JSON.
raise ValueError()
return result
except ValueError:
self.stop('%s is not a valid JSON object: %r', name, text)
class CronUpdateBuilds(webapp2.RequestHandler):
"""Updates builds that are associated with swarming tasks."""
# NOTE(review): this copy of the class is missing lines (closing parentheses,
# call arguments, the logging call and the rest of the query and of get()).
# The notes below mark each gap; restore the code from version control before
# editing.
def update_build_async(self, build):
# Loads the swarming task result for |build| and passes it to
# _sync_build_async.
# NOTE(review): the closing parenthesis of this call is absent.
result = yield _load_task_result_async(
build.swarming_hostname, build.swarming_task_id
if not result:
# NOTE(review): the call that owns the next two lines (presumably a logging
# call) and its last argument are absent from this copy.
'Task %s/%s referenced by build %s is not found',
build.swarming_hostname, build.swarming_task_id,
# NOTE(review): the first and last arguments of this call are absent from
# this copy.
yield _sync_build_async(, result, build.bucket_id,
def get(self): # pragma: no cover
# Queries builds that have a swarming task id and whose status is SCHEDULED or
# STARTED.
# NOTE(review): the IN filter wrapping the two statuses, the end of the query
# and the code that processes its results are absent from this copy.
q = model.Build.query(
model.Build.swarming_task_id != None,
# We cannot have a second negation filter, so use IN.
# This will result in two datastore queries, which is fine.
model.BuildStatus.SCHEDULED, model.BuildStatus.STARTED
def get_backend_routes(): # pragma: no cover
# Returns the webapp2 routes served by this module: the cron handler that
# updates builds and the PubSub push handler for swarming notifications.
# NOTE(review): the closing bracket of the list is absent from this copy, and
# further routes may have been lost with it; restore from version control.
return [
webapp2.Route(r'/internal/cron/swarming/update_builds', CronUpdateBuilds),
webapp2.Route(r'/_ah/push-handlers/swarming/notify', SubNotify),
# Utility functions
def _call_api_async(
"""Calls Swarming API.

Builds the URL https://<hostname>/_ah/api/swarming/v1/<path>, sends the
request with net.json_request_async and returns its result. When
|impersonate| is truthy a delegation token is first obtained with
user.delegate_async(hostname, delegation_tag).
"""
# NOTE(review): the parameter list of this function and the arguments of
# net.json_request_async are absent from this copy. The body reads
# |impersonate|, |hostname|, |path| and |delegation_tag|, so those are
# presumably parameters; restore the signature from version control.
delegation_token = None
if impersonate:
delegation_token = yield user.delegate_async(hostname, delegation_tag)
url = 'https://%s/_ah/api/swarming/v1/%s' % (hostname, path)
res = yield net.json_request_async(
raise ndb.Return(res)
def format_obj(obj, params):
  """Renders every string inside a JSON-like object as a string.Template.

  Lists and dicts are rebuilt recursively (dict keys are kept as is, only
  values are rendered). Strings are expanded with safe_substitute, so
  placeholders that have no entry in |params| are left untouched. Any other
  value is returned unchanged.
  """

  def render(node):
    if isinstance(node, dict):
      return {key: render(value) for key, value in node.items()}
    if isinstance(node, list):
      return [render(item) for item in node]
    if isinstance(node, basestring):
      return string.Template(node).safe_substitute(params)
    return node

  return render(obj)
def _parse_ts(ts):
"""Parses Swarming API's timestamp, which is RFC3339 without time zone."""
# time-secfrac part of RFC3339 format is optional
# strptime cannot handle optional parts.
# HACK: add the time-secfrac part if it is missing.
# P(time-secfrac is missing) = 1e-6.
if '.' not in ts:
ts += '.0'
return datetime.datetime.strptime(ts, '%Y-%m-%dT%H:%M:%S.%f')