# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Access to bucket configurations.
Stores bucket list in datastore, synchronizes it with bucket configs in
project repositories: `projects/<project_id>:<buildbucket-app-id>.cfg`.
"""
import collections
import copy
import hashlib
import logging
import re
from components import utils
from google import protobuf
from google.appengine.api import app_identity
from google.appengine.ext import ndb
from components import auth
from components import config
from components import datastore_utils
from components import gitiles
from components.config import validation
from proto.config import project_config_pb2
from proto.config import service_config_pb2
import errors
# Pattern an ACL set name is matched against: one or more lowercase ASCII
# letters, digits or underscores.
ACL_SET_NAME_RE = re.compile('^[a-z0-9_]+$')
def cfg_path():
    """Returns relative buildbucket config file path.

    The path is "<app id>.cfg", where the app id comes from
    app_identity.get_application_id(). If that call raises AttributeError,
    the fixed id 'testbed-test' is used instead.
    """
    try:
        appid = app_identity.get_application_id()
    except AttributeError:  # pragma: no cover | does not get run on some bots
        # Raised in testbed environment because cfg_path is called
        # during decoration.
        appid = 'testbed-test'
    return '%s.cfg' % appid
def self_config_set():
    """Returns buildbucket's service config set.

    Delegates to config.self_config_set(). If that call raises
    AttributeError, returns the fixed value 'services/testbed-test'.
    """
    try:
        return config.self_config_set()
    except AttributeError:  # pragma: no cover | does not get run on some bots
        # Raised in testbed environment because self_config_set is called
        # during decoration.
        return 'services/testbed-test'
def validate_identity(identity, ctx):
    """Validates an identity string and reports problems to ctx.

    Args:
      identity: either "<kind>:<name>" or a bare name; a bare name is treated
        as kind 'user'.
      ctx: validation context. ctx.error() is called with the exception text
        if auth.Identity(kind, name) raises ValueError.
    """
    if ':' in identity:
        # Split on the first colon only. With a larger maxsplit an identity
        # holding two or more colons yields three parts and the two-value
        # unpacking raises ValueError outside of the try block below.
        kind, name = identity.split(':', 1)
    else:
        kind = 'user'
        name = identity
    try:
        auth.Identity(kind, name)
    except ValueError as ex:
        ctx.error('%s', ex)
def validate_access_list(acl_list, ctx):
    """Validates a list of Acl messages.

    Each entry must set exactly one of `group` and `identity`. Problems are
    reported through ctx.error() under an 'acl #<n>: ' prefix, where n is the
    1-based position of the entry in acl_list.

    Args:
      acl_list: iterable of Acl messages.
      ctx: validation context providing prefix() and error().
    """
    for i, acl in enumerate(acl_list):
        with ctx.prefix('acl #%d: ', i + 1):
            if acl.group and acl.identity:
                ctx.error('either group or identity must be set, not both')
            elif acl.group:
                if not auth.is_valid_group_name(acl.group):
                    ctx.error('invalid group: %s', acl.group)
            elif acl.identity:
                validate_identity(acl.identity, ctx)
            else:
                ctx.error('group or identity must be set')
# Validation rule for a project's buildbucket config, registered for the file
# path returned by cfg_path(). From what is still visible it checks ACL set
# names (unspecified, regex mismatch, duplicates), runs
# swarmingcfg.validate_builder_mixins on cfg.builder_mixins, and for every
# bucket checks its name, duplicates, ordering, ACLs and referenced ACL sets.
# NOTE(review): this block looks damaged by a text filter: several
# expressions are truncated (e.g. "if not" with no operand, "elif in ...")
# and single-token lines such as "try:" or the opening of multi-line calls
# appear to be missing. It is not valid Python as written; restore it from
# version control before changing its logic.
@validation.project_config_rule(cfg_path(), project_config_pb2.BuildbucketCfg)
def validate_buildbucket_cfg(cfg, ctx):
from swarming import swarmingcfg
acl_set_names = set()
for i, acl_set in enumerate(cfg.acl_sets):
with ctx.prefix('ACL set #%d (%s): ', i + 1,
if not
ctx.error('name is unspecified')
elif not ACL_SET_NAME_RE.match(
'invalid name "%s" does not match regex %r',,
elif in acl_set_names:
ctx.error('duplicate name "%s"',
validate_access_list(acl_set.acls, ctx)
mixin_ctx = validation.Context( # pragma: no cover
on_message=lambda msg: ctx.msg(msg.severity, '%s', msg.text))
swarmingcfg.validate_builder_mixins(cfg.builder_mixins, mixin_ctx)
mixins_are_valid = not mixin_ctx.result().has_errors
mixin_by_name = { m for m in cfg.builder_mixins}
bucket_names = set()
for i, bucket in enumerate(cfg.buckets):
with ctx.prefix('Bucket %s: ', or ('#%d' % (i + 1))):
errors.validate_bucket_name(, project_id=ctx.project_id)
except errors.InvalidInputError as ex:
ctx.error('invalid name: %s', ex.message)
if in bucket_names:
ctx.error('duplicate bucket name')
if i > 0 and < cfg.buckets[i - 1].name:
ctx.warning('out of order')
validate_access_list(bucket.acls, ctx)
for name in bucket.acl_sets:
if name not in acl_set_names:
'undefined ACL set "%s". '
'It must be defined in the same file', name
if bucket.HasField('swarming'): # pragma: no cover
with ctx.prefix('swarming: '):
bucket.swarming, mixin_by_name, mixins_are_valid, ctx
# Validates the service-wide 'settings.cfg' (SettingsCfg message): when the
# `swarming` field is present, it is checked by
# swarmingcfg.validate_service_cfg under a 'swarming: ' message prefix.
# NOTE(review): the first line below looks like the argument list of a
# decorator call whose opening line and closing parenthesis are missing;
# presumably a validation rule registration - confirm against version control.
self_config_set(), 'settings.cfg', service_config_pb2.SettingsCfg
def validate_settings_cfg(cfg, ctx): # pragma: no cover
from swarming import swarmingcfg
if cfg.HasField('swarming'):
with ctx.prefix('swarming: '):
swarmingcfg.validate_service_cfg(cfg.swarming, ctx)
# TODO: delete LegacyBucket in favor of Bucket.
class LegacyBucket(ndb.Model):
    """DEPRECATED. Stores project a bucket belongs to, and its ACLs.

    For historical reasons, some bucket names must match Chromium Buildbot
    master names, therefore they may not contain project id. Consequently, it
    is impossible to retrieve a project id from bucket name without an
    additional {bucket_name -> project_id} map. This entity kind is used to
    store the mapping and a copy of a bucket config retrieved from
    luci-config.

    By storing this mapping, we reserve bucket names for projects. If project
    X is trying to use a bucket name already being used by project Y, the
    config of project X is considered invalid.

    Bucket entities are updated in cron_update_buckets() from project configs.

    Entity key:
      Root entity. Id is bucket name.
    """

    @classmethod
    def _get_kind(cls):
        # The datastore kind is 'Bucket', not the Python class name.
        return 'Bucket'

    # Version of entity schema. If not current, cron_update_buckets will
    # update the entity forcefully.
    entity_schema_version = ndb.IntegerProperty()
    # Project id in luci-config.
    project_id = ndb.StringProperty(required=True)
    # Bucket revision matches its config revision.
    revision = ndb.StringProperty(required=True)
    # Bucket configuration (Bucket message in project_config.proto),
    # copied verbatim from luci-config for get_bucket API.
    # Must not be used by serving code paths, use config_content_binary
    # instead.
    config_content = ndb.TextProperty(required=True)
    # Binary equivalent of config_content.
    config_content_binary = ndb.BlobProperty(required=True)
class Project(ndb.Model):
    """Parent entity for Bucket.

    Does not exist in the datastore.

    Entity key:
      Root entity. ID is project id.
    """
def is_legacy_bucket_id(bucket_id):
    """Returns True if bucket_id contains no '/' character."""
    return '/' not in bucket_id
def format_bucket_id(project_id, bucket_name):  # pragma: no cover
    """Returns a bucket id string of the form "<project_id>/<bucket_name>"."""
    return '%s/%s' % (project_id, bucket_name)
def parse_bucket_id(bucket_id):
    """Returns a (project_id, bucket_name) tuple.

    The id is split on its first '/' only, so any further '/' stays in the
    bucket name. Asserts that bucket_id contains at least one '/'.
    """
    parts = bucket_id.split('/', 1)
    assert len(parts) == 2, 'bucket id %r has no "/"' % (bucket_id,)
    return tuple(parts)
def validate_project_id(project_id):
    """Raises errors.InvalidInputError if project_id is invalid.

    Validity is decided by validation.is_valid_project_id().
    """
    if not validation.is_valid_project_id(project_id):
        raise errors.InvalidInputError('invalid project_id %r' % project_id)
# Alias of errors.validate_bucket_name, exposed under this module's namespace.
validate_bucket_name = errors.validate_bucket_name
def validate_bucket_id(bucket_id):
    """Raises errors.InvalidInputError if bucket_id is invalid.

    bucket_id must not be a legacy id (this is asserted, not reported as an
    input error). An id whose bucket name part is spelled
    "luci.<project_id>.<name>" is rejected with a hint to use
    "<project_id>/<name>" instead.
    """
    assert not is_legacy_bucket_id(bucket_id)

    try:
        project_id, bucket_name = parse_bucket_id(bucket_id)
        # NOTE(review): the statements between the parse call and the except
        # clause were missing from the reviewed text. These two calls are a
        # reconstruction based on the validators defined just above and on
        # the InvalidInputError handler - confirm against version control.
        validate_project_id(project_id)
        validate_bucket_name(bucket_name)
    except errors.InvalidInputError as ex:
        raise errors.InvalidInputError(
            'invalid bucket_id %r: %s' % (bucket_id, ex)
        )

    parts = bucket_name.split('.', 2)
    if len(parts) == 3 and parts[0] == 'luci' and parts[1] == project_id:
        expected_bucket_id = '%s/%s' % (project_id, parts[2])
        raise errors.InvalidInputError(
            'invalid bucket_id string %r. Did you mean %r?' %
            (bucket_id, expected_bucket_id)
        )
class Bucket(ndb.Model):
    """Stores bucket configurations.

    Bucket entities are updated in cron_update_buckets() from project configs.

    Entity key:
      Parent is Project. Id is a "short" bucket name.
      See also bucket_name attribute and short_bucket_name().
    """

    @classmethod
    def _get_kind(cls):
        # The datastore kind is 'BucketV2', not the Python class name.
        return 'BucketV2'

    # Bucket name not prefixed by project id.
    # For example "try" or "master.x".
    # If a bucket in a config file has "luci.<project_id>." prefix, the
    # prefix is stripped, e.g. "try", not "luci.chromium.try".
    # NOTE(review): the lambda body was missing from the reviewed text;
    # the key id is used because the entity id is the short bucket name.
    bucket_name = ndb.ComputedProperty(lambda self: self.key.id())
    # Version of entity schema. If not current, cron_update_buckets will
    # update the entity forcefully.
    entity_schema_version = ndb.IntegerProperty()
    # Bucket revision matches its config revision.
    revision = ndb.StringProperty(required=True)
    # Bucket configuration (Bucket message in project_config.proto).
    config = datastore_utils.ProtobufProperty(project_config_pb2.Bucket)

    def _pre_put_hook(self):
        # NOTE(review): both operands of this assertion were missing from the
        # reviewed text. Reconstructed from the rule that the short name is
        # used in both the entity key and the config - confirm.
        assert self.config.name == self.key.id()

    @property
    def project_id(self):
        """Id of the parent Project key."""
        return self.key.parent().id()

    @staticmethod
    def make_key(project_id, bucket_name):
        """Returns the ndb.Key of a Bucket under the given Project."""
        return ndb.Key(Project, project_id, Bucket, bucket_name)

    @property
    def bucket_id(self):
        """Bucket id string, see format_bucket_id()."""
        return format_bucket_id(self.project_id, self.bucket_name)
def short_bucket_name(bucket_name):
    """Returns bucket name without "luci.<project_id>." prefix.

    A name is treated as prefixed when it has at least three dot-separated
    parts and the first one is 'luci'; the second part is not compared with
    any particular project id. Other names are returned unchanged.
    """
    parts = bucket_name.split('.', 2)
    if len(parts) == 3 and parts[0] == 'luci':
        return parts[2]
    return bucket_name
# TODO: remove.
def parse_binary_bucket_config(cfg_bytes):  # pragma: no cover
    """Parses a serialized Bucket message.

    Args:
      cfg_bytes: binary-serialized project_config_pb2.Bucket.

    Returns:
      A project_config_pb2.Bucket populated from cfg_bytes.
    """
    cfg = project_config_pb2.Bucket()
    # Without this call the function would ignore its argument and always
    # return an empty message.
    cfg.ParseFromString(cfg_bytes)
    return cfg
def is_swarming_config(cfg):
    """Returns True if this is a Swarming bucket config.

    Always returns a bool: False when cfg is falsy (e.g. None) or when it has
    no 'swarming' field set.
    """
    # bool() keeps a falsy cfg (such as None) from leaking out as the result.
    return bool(cfg and cfg.HasField('swarming'))
@ndb.tasklet
def get_buckets_async(bucket_ids=None):
    """Returns configured buckets.

    If bucket_ids is None, returns all buckets.
    Otherwise returns only specified buckets.
    If a bucket does not exist, returns a None map value.

    Returns:
      {bucket_id: project_config_pb2.Bucket} dict.
    """
    if bucket_ids is None:
        buckets = yield Bucket.query().fetch_async()
        raise ndb.Return({b.bucket_id: b.config for b in buckets})

    # Materialize the ids: they are iterated twice (keys, then result map).
    bucket_ids = list(bucket_ids)
    keys = [Bucket.make_key(*parse_bucket_id(bid)) for bid in bucket_ids]
    buckets = yield ndb.get_multi_async(keys)
    raise ndb.Return({
        bid: b.config if b else None for bid, b in zip(bucket_ids, buckets)
    })
# Resolves a short bucket name to a bucket id by querying Bucket entities
# whose bucket_name equals the given name: more than one match raises
# errors.InvalidInputError, exactly one yields its bucket_id, none yields
# None.
# NOTE(review): the first two lines below look like the arguments and the
# closing parenthesis of a caching decorator (5 minute memcache, per the
# trailing comment) whose opening line is missing; a tasklet decorator and
# the docstring terminator also appear to be missing. Confirm against
# version control.
'resolve_bucket_name_async', key_args=['bucket_name'], time=300
) # memcache for 5m
def resolve_bucket_name_async(bucket_name):
"""Returns bucket id for the bucket name.
Does not check access.
errors.InvalidInputError if the bucket name is not unique.
bucket id string or None if such bucket does not exist.
buckets = yield Bucket.query(Bucket.bucket_name == bucket_name).fetch_async()
if len(buckets) > 1:
raise errors.InvalidInputError(
'bucket name %r is ambiguous, '
'it has to be prefixed with a project id: "<project_id>/%s"' %
(bucket_name, bucket_name)
raise ndb.Return(buckets[0].bucket_id if buckets else None)
@ndb.tasklet
def get_bucket_async(bucket_id):
    """Returns a (project_id, project_config_pb2.Bucket) tuple.

    Returns (None, None) if the bucket does not exist. Ids without a '/'
    are looked up as LegacyBucket entities, all others as Bucket entities.
    """
    if is_legacy_bucket_id(bucket_id):  # pragma: no cover
        # Legacy mode. TODO: remove.
        bucket = yield LegacyBucket.get_by_id_async(bucket_id)
        if bucket is None:
            raise ndb.Return(None, None)
        # NOTE(review): the arguments of this return were missing from the
        # reviewed text. Reconstructed from the documented return value and
        # the LegacyBucket properties - confirm against version control.
        raise ndb.Return(
            bucket.project_id,
            parse_binary_bucket_config(bucket.config_content_binary)
        )

    key = Bucket.make_key(*parse_bucket_id(bucket_id))
    bucket = yield key.get_async()
    if bucket is None:
        raise ndb.Return(None, None)
    # TODO: replace returned project_id with rev,
    # when legacy mode is dropped.
    # The caller knows project id.
    raise ndb.Return(bucket.project_id, bucket.config)
def get_bucket(bucket_id):
    """Returns a (project_id, project_config_pb2.Bucket) tuple.

    Blocking counterpart of get_bucket_async().
    """
    return get_bucket_async(bucket_id).get_result()
def _normalize_acls(acls):
    """Normalizes a RepeatedCompositeContainer of Acl messages.

    In place: prefixes identities that have no kind with 'user:', sorts the
    entries by (role, group, identity) and removes duplicates.
    """
    for a in acls:
        if a.identity and ':' not in a.identity:
            a.identity = 'user:%s' % a.identity

    sort_key = lambda a: (a.role, a.group, a.identity)
    # Sorting first makes equal entries adjacent, which the pairwise
    # comparison below relies on.
    acls.sort(key=sort_key)
    # Walk backwards so that deleting an entry does not shift the indexes
    # that are still to be visited.
    for i in reversed(range(1, len(acls))):
        if sort_key(acls[i]) == sort_key(acls[i - 1]):
            del acls[i]
# Stores one bucket config as two entities, a LegacyBucket and a Bucket,
# written together with a single ndb.put_multi() call.
# NOTE(review): the argument lists of the LegacyBucket(...) and Bucket(...)
# constructors are missing, and two statements appear to be fused on the
# "short_bucket_cfg = ..." line. The block is not valid Python as written;
# restore it from version control before editing.
def put_bucket(project_id, revision, bucket_cfg):
legacy_bucket = LegacyBucket(,
# New Bucket format uses short bucket names, e.g. "try" instead of
# "luci.chromium.try".
# Use short name in both entity key and config contents.
short_bucket_cfg = copy.deepcopy(bucket_cfg) = short_bucket_name(
bucket = Bucket(
ndb.put_multi([bucket, legacy_bucket])
# Synchronizes stored bucket entities with the buildbucket configs of all
# projects, fetched with config.get_project_configs(). Visible steps: collect
# existing LegacyBucket and Bucket entities as deletion candidates, skip
# projects whose config is broken (their buckets are kept), derive a
# 'sha1:...' revision when none is given, inline ACL sets and swarming
# builder defaults into each bucket config, store changed buckets through
# put_bucket(), and finally log the buckets selected for deletion.
# NOTE(review): this block looks damaged by a text filter: many statements
# are truncated or missing (loop bodies, "continue"/"return", closing
# brackets, the docstring terminator, the opening lines of logging and
# flattening calls, the transactional wrapper around update_bucket, and the
# actual delete call). CURRENT_BUCKET_SCHEMA_VERSION is referenced here but
# is not defined in the reviewed text. Restore from version control before
# editing.
def cron_update_buckets():
"""Synchronizes bucket entities with configs fetched from luci-config.
When storing in the datastore, inlines the referenced ACL sets and clears
the acl_sets message field. Also inlines swarmbucket builder defaults and
mixins and clears Builder.mixins field.
from swarming import flatten_swarmingcfg
from swarming import swarmingcfg
config_map = config.get_project_configs(
cfg_path(), project_config_pb2.BuildbucketCfg
to_delete = collections.defaultdict(set) # project_id -> ndb keys
for bucket in LegacyBucket.query().fetch():
for key in Bucket.query().fetch(keys_only=True):
for project_id, (revision, project_cfg, _) in config_map.iteritems():
if project_cfg is None:
logging.error('config of project %s is broken', project_id)
# Do not delete all buckets of a broken project.
to_delete.pop(project_id, None)
# revision is None in file-system mode. Use SHA1 of the config as revision.
revision = revision or 'sha1:%s' % hashlib.sha1(
acl_sets_by_name = { a for a in project_cfg.acl_sets}
builder_mixins_by_name = { m for m in project_cfg.builder_mixins}
for bucket_cfg in project_cfg.buckets:
bucket_key = Bucket.make_key(
project_id, short_bucket_name(
bucket = bucket_key.get()
if (bucket and
bucket.entity_schema_version == CURRENT_BUCKET_SCHEMA_VERSION and
bucket.revision == revision):
# Inline ACL sets.
for name in bucket_cfg.acl_sets:
acl_set = acl_sets_by_name.get(name)
if not acl_set:
'referenced acl_set not found.\n'
'Bucket: %s\n'
'ACL set name: %r\n'
'Config revision: %r', bucket_key, name, revision
del bucket_cfg.acl_sets[:]
if bucket_cfg.HasField('swarming'):
# Pull builder defaults out and apply default pool.
defaults = bucket_cfg.swarming.builder_defaults
if not any(d.startswith('pool:') for d in defaults.dimensions):
# TODO( make it "luci.<project>.<bucket name>".
defaults.dimensions.append('pool:' +
for b in
b, defaults, builder_mixins_by_name
# pylint: disable=no-value-for-parameter
def update_bucket():
bucket = bucket_key.get()
if (bucket and
bucket.entity_schema_version == CURRENT_BUCKET_SCHEMA_VERSION and
bucket.revision == revision): # pragma: no coverage
put_bucket(project_id, revision, bucket_cfg)'Updated bucket %s to revision %s', bucket_key, revision)
# Delete non-existing buckets.
to_delete_flat = sum([list(n) for n in to_delete.itervalues()], [])
if to_delete_flat:
logging.warning('Deleting buckets: %s', ', '.join(map(str, to_delete_flat)))
def get_buildbucket_cfg_url(project_id):
    """Returns URL of a buildbucket config file in a project, or None.

    None is returned when the project's config set has no known location, or
    when that location cannot be parsed as a Gitiles URL (which is logged).
    """
    config_url = config.get_config_set_location('projects/%s' % project_id)
    if config_url is None:  # pragma: no cover
        return None
    try:
        loc = gitiles.Location.parse(config_url)
    except ValueError:  # pragma: no cover
        # logging.exception records the traceback of the parse failure
        # together with the message.
        logging.exception(
            'Not a valid Gitiles URL %r of project %s', config_url, project_id
        )
        return None
    return str(loc.join(cfg_path()))
@ndb.tasklet
def get_settings_async():  # pragma: no cover
    """Returns the service settings (service_config_pb2.SettingsCfg).

    Loads 'settings.cfg' through config.get_self_config_async() with
    store_last_good=True. If nothing truthy is loaded, an empty SettingsCfg
    message is returned instead.
    """
    _, global_settings = yield config.get_self_config_async(
        'settings.cfg', service_config_pb2.SettingsCfg, store_last_good=True
    )
    raise ndb.Return(global_settings or service_config_pb2.SettingsCfg())