| # Copyright 2017 The LUCI Authors. All rights reserved. |
| # Use of this source code is governed under the Apache License, Version 2.0 |
| # that can be found in the LICENSE file. |
| |
| """Functions to fetch and parse pools.cfg file with list of pools. |
| |
| See comments in proto/pools.proto for more context. Structures defined here are |
| used primarily by task_scheduler.check_schedule_request_acl_caller. |
| """ |
| |
| import collections |
| import logging |
| import random |
| import re |
| |
| from components import auth |
| from components import config |
| from components import cipd |
| from components import utils |
| from components.config import validation |
| |
| from proto.config import pools_pb2 |
| from server import config as local_config |
| from server import directory_occlusion |
| |
| |
| POOLS_CFG_FILENAME = 'pools.cfg' |
| |
| NAMESPACE_RE = re.compile(r'^[a-z0-9A-Z\-._]+$') |
| |
| |
| # Validated read-only representation of one pool. |
| PoolConfig = collections.namedtuple( |
| 'PoolConfig', |
| [ |
| # Name of the pool. |
| 'name', |
| # Revision of pools.cfg file this config came from. |
| 'rev', |
| # Set of auth.Identity that can schedule jobs in the pool. |
| 'scheduling_users', |
| # Set of group names with users that can schedule jobs in the pool. |
| 'scheduling_groups', |
| # Map {auth.Identity of a delegatee => TrustedDelegatee tuple}. |
| 'trusted_delegatees', |
| # Pool realm. |
| 'realm', |
| # Set of enforced realm permission enums. |
| 'enforced_realm_permissions', |
| # Realm to assign to tasks if they don't have any. |
| 'default_task_realm', |
| # resolved TaskTemplateDeployment (optional). |
| 'task_template_deployment', |
| # resolved BotMonitoring. |
| 'bot_monitoring', |
| # Tuple of ExternalSchedulerConfigs for this pool, if defined (or None). |
| 'external_schedulers', |
| # resolved default CipdServer |
| 'default_cipd', |
| # Controls RBE migration parameters, pools_pb2.Pool.RBEMigration. |
| 'rbe_migration', |
| # Controls scheduling algorithm, pools_pb2.Pool.SchedulingAlgorithm. |
| 'scheduling_algorithm', |
| ]) |
| |
| |
| def init_pool_config(**kwargs): |
| """Initializees PoolConfig with given arguments.""" |
| args = { |
| 'name': None, |
| 'rev': None, |
| 'scheduling_users': frozenset(), |
| 'scheduling_groups': frozenset(), |
| 'trusted_delegatees': {}, |
| 'realm': None, |
| 'enforced_realm_permissions': frozenset(), |
| 'default_task_realm': None, |
| 'task_template_deployment': None, |
| 'bot_monitoring': None, |
| 'external_schedulers': None, |
| 'default_cipd': None, |
| 'rbe_migration': None, |
| 'scheduling_algorithm': None, |
| } |
| args.update(kwargs) |
| return PoolConfig(**args) |
| |
| |
| CipdServer = collections.namedtuple('CipdServer', [ |
| 'server', |
| 'package_name', |
| 'client_version', |
| ]) |
| |
| # Validated read-only fields of one trusted delegation scenario. |
| TrustedDelegatee = collections.namedtuple( |
| 'TrustedDelegatee', |
| [ |
| # auth.Identity of the delegatee |
| # (the one who's minting the delegation token). |
| 'peer_id', |
| # A set of tags to look for in the delegation token to allow |
| # the delegation. |
| 'required_delegation_tags', |
| ]) |
| |
| # Read-only hashable representation of a single ExternalSchedulerConfig |
| ExternalSchedulerConfig = collections.namedtuple( |
| 'ExternalScheduler', |
| [ |
| # Service address. |
| 'address', |
| # Scheduler ID (opaque to swarming). |
| 'id', |
| # Dimension set in ['key1:value1', 'key2:value2'] format. |
| # |
| # To-be-deprecated. |
| # |
| # Only 1 of this and 'all_dimensions' should be specified. |
| 'dimensions', |
| # Bot or task should have all of these dimensions in order to be |
| # eligible for scheduler. |
| # |
| # Dimensions should be in 'key1:value1' format. |
| # |
| # Only 1 of this and 'dimensions' should be specified. |
| 'all_dimensions', |
| # If non-empty, bot or task should have any of these dimensions |
| # in order to be eligible for scheduler. |
| 'any_dimensions', |
| # Whether this config is enabled. |
| 'enabled', |
| # Whether to allow fall back to other es-owned tasks if external |
| # scheduler has no tasks for a bot. |
| 'allow_es_fallback', |
| ]) |
| |
| # Describes how task templates apply to a pool. |
| _TaskTemplateDeployment = collections.namedtuple( |
| '_TaskTemplateDeployment', |
| [ |
| # The TaskTemplate for prod builds (optional). |
| 'prod', |
| # The TaskTemplate for canary builds (optional). |
| 'canary', |
| # The chance (int [0, 9999]) of the time that the canary template should |
| # be selected. Must be 0 if no canary is specified. |
| # |
| # NOTE: some tests set this to >9999 in order to force canary selection |
| # without mocking randomint; in the live server |
| # TaskTemplateDeployment.from_pb prevents this from occurring. |
| 'canary_chance', |
| ]) |
| |
| |
| class TaskTemplateDeployment(_TaskTemplateDeployment): |
| |
| @classmethod |
| def from_pb(cls, ctx, d, template_map): |
| """This returns a TaskTemplateDeployment from `d` and `template_map`. |
| |
| Args: |
| ctx (validation.Context) - The validation context. |
| d (pools_pb2.TaskTemplateDeployment) - The proto TaskTemplateDeployment |
| message to convert. |
| template_map (dict[str,TaskTemplate]) - This map should map from any |
| potential include statements to a resolved TaskTemplate instance. |
| |
| Returns a TaskTemplateDeployment object (this class), None if there were |
| errors. |
| """ |
| assert isinstance(ctx, validation.Context) |
| assert isinstance(d, pools_pb2.TaskTemplateDeployment) |
| assert isinstance(template_map, dict) |
| |
| prod = None |
| if d.HasField('prod'): |
| prod = TaskTemplate.from_pb(ctx, d.prod, template_map) |
| |
| canary = None |
| if d.HasField('canary'): |
| canary = TaskTemplate.from_pb(ctx, d.canary, template_map) |
| |
| if not (0 <= d.canary_chance <= 9999): |
| ctx.error('canary_chance out of range `[0,9999]`: %d -> %%%.2f', |
| d.canary_chance, d.canary_chance / 100.) |
| elif d.canary_chance and not canary: |
| ctx.error('canary_chance specified without a canary') |
| |
| return None if ctx.result().has_errors else cls( |
| prod=prod, |
| canary=canary, |
| canary_chance=d.canary_chance, |
| ) |
| |
| |
| # A set of default task parameters to apply to tasks issued within a pool. |
| _TaskTemplate = collections.namedtuple( |
| '_TaskTemplate', |
| [ |
| # sequence of CacheEntry. |
| 'cache', |
| # sequence of CipdPackage. |
| 'cipd_package', |
| # sequence of Env. |
| 'env', |
| |
| # An internal frozenset<str> of the transitive inclusions that went into |
| # the creation of this _TaskTemplate. Users outside of this file should |
| # ignore this field. |
| 'inclusions', |
| ]) |
| |
| |
| def _singleton(name): |
| """Returns a singleton object which represents itself as `name` when printed, |
| but is only comparable (via identity) to itself.""" |
| return type(name, (), {'__repr__': lambda self: name})() |
| |
| |
| class TaskTemplate(_TaskTemplate): |
| CYCLE = _singleton('CYCLE') |
| |
| class _Intermediate(object): |
| """_Intermediate represents an in-flux TaskTemplate instance. |
| |
| This is used internally by the .from_pb method to build up a finalized |
| TaskTemplate instance. |
| """ |
| def __init__(self, ctx, t): |
| assert isinstance(t, pools_pb2.TaskTemplate) |
| |
| self.cache = {} |
| for i, ce in enumerate(t.cache): |
| name, path = ce.name, ce.path |
| with ctx.prefix('cache[%r]: ', name if name else i): |
| if not name: |
| ctx.error('empty name') |
| if not path: |
| ctx.error('empty path') |
| self.cache[name] = path |
| |
| self.cipd_package = {} |
| for i, cp in enumerate(t.cipd_package): |
| path, pkg, version = cp.path, cp.pkg, cp.version |
| # We do this little dance to avoid having u'str' show up in the error |
| # messages. |
| fmt, args = 'cipd_package[%r]: ', (i,) |
| if path and pkg: |
| fmt, args = 'cipd_package[(%r, %r)]: ', (path, pkg) |
| with ctx.prefix(fmt, *args): |
| if not pkg: |
| ctx.error('empty pkg') |
| if not version: |
| ctx.error('empty version') |
| self.cipd_package[(path, pkg)] = version |
| |
| self.env = {} |
| for i, env in enumerate(t.env): |
| var, value, prefix, soft = env.var, env.value, env.prefix, env.soft |
| with ctx.prefix('env[%r]: ', var if var else i): |
| if not var: |
| ctx.error('empty var') |
| if not value and not prefix: |
| ctx.error('empty value AND prefix') |
| self.env[var] = (value, tuple(prefix), soft) |
| |
| # We don't need to initialize this here, update() will adjust this as it |
| # processes includes. |
| self.inclusions = set() |
| |
| def update(self, ctx, other, include_name): |
| """Updates this _Intermediate with the other (TaskTemplate) |
| |
| If this update is due to an inclusion in other, set include_name (str) to |
| that inclusion name; the name will be added to the `inclusions` field, and |
| an error check will prevent the same item from being included more than |
| once. Otherwise set this to None. |
| """ |
| assert isinstance(other, TaskTemplate) |
| |
| if include_name: |
| if include_name in self.inclusions: |
| ctx.error( |
| 'template %r included multiple times', |
| include_name) |
| self.inclusions.add(include_name) |
| |
| for transitive_include in other.inclusions: |
| if transitive_include in self.inclusions: |
| ctx.error('template %r included (transitively) multiple times', |
| transitive_include) |
| self.inclusions.add(transitive_include) |
| |
| for entry in other.cache: |
| self.cache[entry.name] = entry.path |
| |
| for entry in other.cipd_package: |
| self.cipd_package[(entry.path, entry.pkg)] = entry.version |
| |
| for entry in other.env: |
| val, pfx = '', () |
| if entry.var in self.env: |
| val, pfx, _ = self.env[entry.var] |
| self.env[entry.var] = (entry.value or val, (pfx + entry.prefix), |
| entry.soft) |
| |
| def finalize(self, ctx): |
| doc = directory_occlusion.Checker() |
| for (path, pkg), version in self.cipd_package.items(): |
| # all cipd packages are considered compatible in terms of paths: it's |
| # totally legit to install many packages in the same directory. Thus we |
| # set the owner for all cipd packages to 'cipd'. |
| doc.add(path, 'cipd', '%s:%s' % (pkg, version)) |
| |
| for name, path in self.cache.items(): |
| # caches are all unique; they can't overlap. Thus, we give each of them |
| # a unique (via the cache name) owner. |
| doc.add(path, 'cache %r' % name, '') |
| |
| if doc.conflicts(ctx): |
| return |
| |
| return TaskTemplate( |
| cache=tuple( |
| CacheEntry(name, path) |
| for name, path in sorted(self.cache.items())), |
| cipd_package=tuple( |
| CipdPackage(path, pkg, version) |
| for (path, pkg), version in sorted(self.cipd_package.items())), |
| env=tuple( |
| Env(var, value, prefix, soft) |
| for var, (value, prefix, soft) in sorted(self.env.items())), |
| inclusions=frozenset(self.inclusions), |
| ) |
| |
| @classmethod |
| def from_pb(cls, ctx, t, resolve_func=lambda _: None): |
| """This returns a TaskTemplate from `t` and `resolve_func`. |
| |
| Args: |
| ctx (validation.Context) - The validation context. |
| t (pools_pb2.TaskTemplate) - The proto TaskTemplate message to convert. |
| resolve_func (func(include_name) -> TaskTemplate) - A function which, |
| given `include_name` returns the resolved TaskTemplate object. |
| If the resolved TaskTemplate has errors, it should return None. |
| If an include cycle was detected, it should return TaskTemplate.CYCLE. |
| If the include_name is not resolvable, it should raise KeyError. |
| As a convenience, resolve_func may also be a dict, and an appropriate |
| resolution function will be generated for it (dict.__getitem__). |
| |
| Returns a TaskTemplate object (this class), None if there were errors. If |
| any of the included object results in a CYCLE, this returns CYCLE. |
| """ |
| assert isinstance(t, pools_pb2.TaskTemplate) |
| if isinstance(resolve_func, dict): |
| resolve_func = resolve_func.__getitem__ |
| |
| tmp = cls._Intermediate(ctx, pools_pb2.TaskTemplate()) |
| |
| found_cycle = False |
| for include in t.include: |
| try: |
| resolved = resolve_func(include) |
| if isinstance(resolved, TaskTemplate): |
| tmp.update(ctx, resolved, include) |
| elif resolved is None: |
| ctx.error('depends on %r, which has errors', include) |
| elif resolved is cls.CYCLE: |
| found_cycle = True |
| ctx.error('depends on %r, which causes an import cycle', include) |
| else: |
| assert False, ( |
| 'resolve_func returned a bad type: %s: %r' % ( |
| type(resolved), resolved)) |
| except KeyError: |
| ctx.error('unknown include: %r', include) |
| |
| tmp.update(ctx, cls._Intermediate(ctx, t).finalize(ctx), None) |
| |
| # Evaluate this here so that ctx can contain all errors before returning. |
| ret = tmp.finalize(ctx) |
| |
| if found_cycle: |
| return cls.CYCLE |
| return None if ctx.result().has_errors else ret |
| |
| |
| CacheEntry = collections.namedtuple('CacheEntry', ['name', 'path']) |
| CipdPackage = collections.namedtuple('CipdPackage', ['path', 'pkg', 'version']) |
| Env = collections.namedtuple('Env', ['var', 'value', 'prefix', 'soft']) |
| |
| |
| def get_pool_config(pool_name): |
| """Returns PoolConfig for the given pool or None if not defined.""" |
| if pool_name is None: |
| raise ValueError('get_pool_config called with None') |
| return _fetch_pools_config().pools.get(pool_name) |
| |
| |
| def known(): |
| """Returns the list of all pool names.""" |
| return sorted(_fetch_pools_config().pools) |
| |
| |
| def all_pools_migrated_to_rbe(): |
| """Returns True if all pools have been migrated to RBE.""" |
| pools = _fetch_pools_config().pools |
| if not pools: |
| return False # just to simplify tests that don't setup any pool configs |
| for pool in pools.values(): |
| cfg = pool.rbe_migration |
| if not cfg or cfg.rbe_mode_percent != 100: |
| return False |
| return True |
| |
| |
| ### Private stuff. |
| |
| |
| # Used only on dev server as an ultimate fallback to enable local_smoke_testing |
| # to work. |
| _LOCAL_FAKE_CONFIG = None |
| |
| # Parsed representation of pools.cfg ready for queries. |
| _PoolsCfg = collections.namedtuple('_PoolsCfg', [ |
| 'pools', # dict {pool name => PoolConfig tuple} |
| |
| 'default_external_services', # (CipdServer) |
| ]) |
| |
| |
| def _resolve_task_template_inclusions(ctx, task_templates): |
| """Resolves all task template inclusions in the provided |
| pools_pb2.TaskTemplate list. |
| |
| Returns a new dictionary with {name -> TaskTemplate} namedtuples. |
| """ |
| template_map = {t.name: t for t in task_templates} |
| if '' in template_map: |
| ctx.error('one or more templates has a blank name') |
| return |
| |
| if len(template_map) != len(task_templates): |
| ctx.error('one or more templates has a duplicate name') |
| return |
| |
| resolved = {} # name -> TaskTemplate | None | TaskTemplate.CYCLE |
| def resolve(name): |
| # Let this raise KeyError if not found |
| template = template_map[name] |
| |
| if name not in resolved: |
| # Set this entry to CYCLE in case we're asked to resolve ourselves. |
| resolved[name] = TaskTemplate.CYCLE |
| with ctx.prefix('template[%r]: ', name): |
| resolved[name] = TaskTemplate.from_pb(ctx, template, resolve) |
| return resolved[name] |
| |
| map(resolve, sorted(template_map)) |
| |
| return resolved |
| |
| |
| def _resolve_task_template_deployments( |
| ctx, template_map, task_template_deployments): |
| ret = {} |
| |
| for i, deployment in enumerate(task_template_deployments): |
| if not deployment.name: |
| ctx.error('deployment[%d]: has no name', i) |
| return |
| with ctx.prefix('deployment[%r]: ', deployment.name): |
| ret[deployment.name] = TaskTemplateDeployment.from_pb( |
| ctx, deployment, template_map) |
| |
| return ret |
| |
| |
| def _resolve_deployment( |
| ctx, pool_msg, template_map, deployment_map): |
| deployment_scheme = pool_msg.WhichOneof('task_deployment_scheme') |
| if deployment_scheme == 'task_template_deployment': |
| if pool_msg.task_template_deployment not in deployment_map: |
| ctx.error('unknown deployment: %r', pool_msg.task_template_deployment) |
| return |
| return deployment_map[pool_msg.task_template_deployment] |
| |
| if deployment_scheme == 'task_template_deployment_inline': |
| return TaskTemplateDeployment.from_pb( |
| ctx, pool_msg.task_template_deployment_inline, template_map) |
| |
| |
| def _resolve_bot_monitoring(ctx, bot_monitorings): |
| """Validates and simplifies bot_monitoring entries in pools.cfg.""" |
| out = {} |
| for m in bot_monitorings: |
| with ctx.prefix('bot_monitoring %r: ', m.name): |
| # Use the same rules for the name as for the dimensions for simplicity |
| # here. |
| if not local_config.validate_dimension_key(m.name): |
| ctx.error('invalid name') |
| if m.name in out: |
| ctx.error('duplicate name') |
| keys = set(m.dimension_key) |
| if len(keys) != len(m.dimension_key): |
| ctx.error('duplicate dimension_key') |
| for k in keys: |
| if not local_config.validate_dimension_key(k): |
| ctx.error('invalid dimension_key %r', k) |
| # pool is always implicit. |
| keys.add('pool') |
| out[m.name] = sorted(keys) |
| return out |
| |
| |
| def _resolve_external_schedulers(external_schedulers): |
| """Turns external_schedulers into a hashable representation.""" |
| return tuple( |
| ExternalSchedulerConfig( |
| e.address, e.id, frozenset(e.dimensions), frozenset(e.all_dimensions), |
| frozenset(e.any_dimensions), e.enabled, e.allow_es_fallback) |
| for e in external_schedulers) |
| |
| |
| def _to_ident(s): |
| if ':' not in s: |
| s = 'user:' + s |
| return auth.Identity.from_bytes(s) |
| |
| |
| def _validate_ident(ctx, title, s): |
| try: |
| return _to_ident(s) |
| except ValueError as exc: |
| ctx.error('bad %s value "%s" - %s', title, s, exc) |
| |
| |
| def _validate_realm(ctx, title, s): |
| try: |
| auth.validate_realm_name(str(s)) |
| except ValueError as exc: |
| ctx.error('bad %s value: %s', title, exc) |
| |
| |
| def _validate_url(ctx, value): |
| if not value: |
| ctx.error('is not set') |
| elif not validation.is_valid_secure_url(value): |
| ctx.error('must start with "https://" or "http://localhost"') |
| |
| |
| def _validate_external_services_cipd(ctx, cfg): |
| """Validates ExternalServices.CIPD message.""" |
| with ctx.prefix('cipd '): |
| with ctx.prefix('server '): |
| _validate_url(ctx, cfg.server) |
| |
| with ctx.prefix('client_package '): |
| if not cipd.is_valid_package_name_template( |
| cfg.client_package.package_name): |
| ctx.error('is invalid "%s"', cfg.client_package.package_name) |
| |
| with ctx.prefix('client_version '): |
| if not cipd.is_valid_version(cfg.client_package.version): |
| ctx.error('is invalid "%s"', cfg.client_package.version) |
| |
| |
| def _validate_external_services(ctx, cfg): |
| """Validates an ExternalServices message""" |
| _validate_external_services_cipd(ctx, cfg.cipd) |
| |
| |
| def _vaildate_rbe_migration(ctx, msg): |
| """Validates pools_pb2.Pool.RBEMigration.""" |
| BotMode = pools_pb2.Pool.RBEMigration.BotModeAllocation.BotMode |
| |
| if not msg.rbe_instance: |
| ctx.error('rbe_instance is required') |
| if not (0 <= msg.rbe_mode_percent <= 100): |
| ctx.error('rbe_mode_percent should be in [0; 100]') |
| |
| allocs = {} |
| for i, alloc in enumerate(msg.bot_mode_allocation): |
| with ctx.prefix('bot_mode_allocation #%d:', i): |
| if alloc.mode == BotMode.UNKNOWN: |
| ctx.error('mode is require') |
| continue |
| if alloc.mode in allocs: |
| ctx.error('allocation for mode %s was already defined', alloc.mode) |
| continue |
| if not (0 <= alloc.percent <= 100): |
| ctx.error('percent should be in [0; 100]') |
| continue |
| allocs[alloc.mode] = alloc.percent |
| |
| total = (allocs.get(BotMode.SWARMING, 0) + allocs.get(BotMode.HYBRID, 0) + |
| allocs.get(BotMode.RBE, 0)) |
| if total != 100: |
| ctx.error('bot_mode_allocation percents should sum up to 100') |
| |
| |
| @utils.cache_with_expiration(60) |
| def _fetch_pools_config(): |
| """Loads pools.cfg and parses it into a _PoolsCfg instance.""" |
| # store_last_good=True tells config components to update the config file |
| # in a cron job. Here we just read from the datastore. In case it's the first |
| # call ever, or config doesn't exist, it returns (None, None). |
| rev, cfg = config.get_self_config( |
| POOLS_CFG_FILENAME, pools_pb2.PoolsCfg, store_last_good=True) |
| if not cfg: |
| if _LOCAL_FAKE_CONFIG: |
| assert utils.is_local_dev_server() |
| return _LOCAL_FAKE_CONFIG |
| logging.error('There is no pools.cfg, no task is accepted') |
| return _PoolsCfg({}, (None, None)) |
| |
| # The config is already validated at this point. |
| |
| ctx = validation.Context.logging() |
| template_map = _resolve_task_template_inclusions(ctx, cfg.task_template) |
| deployment_map = _resolve_task_template_deployments( |
| ctx, template_map, cfg.task_template_deployment) |
| bot_monitorings = _resolve_bot_monitoring(ctx, cfg.bot_monitoring) |
| |
| default_cipd = None |
| if cfg.HasField('default_external_services'): |
| ext = cfg.default_external_services |
| default_cipd = CipdServer( |
| ext.cipd.server, |
| ext.cipd.client_package.package_name, |
| ext.cipd.client_package.version) |
| |
| pools = {} |
| for msg in cfg.pool: |
| for name in msg.name: |
| pools[name] = init_pool_config( |
| name=name, |
| rev=rev, |
| scheduling_users=frozenset(_to_ident(u) for u in msg.schedulers.user), |
| scheduling_groups=frozenset(msg.schedulers.group), |
| trusted_delegatees={ |
| _to_ident(d.peer_id): TrustedDelegatee( |
| peer_id=_to_ident(d.peer_id), |
| required_delegation_tags=frozenset(d.require_any_of.tag)) |
| for d in msg.schedulers.trusted_delegation |
| }, |
| realm=msg.realm if msg.realm else None, |
| default_task_realm=(msg.default_task_realm |
| if msg.default_task_realm else None), |
| enforced_realm_permissions=frozenset(msg.enforced_realm_permissions), |
| task_template_deployment=_resolve_deployment(ctx, msg, template_map, |
| deployment_map), |
| bot_monitoring=bot_monitorings.get(name), |
| external_schedulers=_resolve_external_schedulers( |
| msg.external_schedulers), |
| default_cipd=default_cipd, |
| rbe_migration=(msg.rbe_migration |
| if msg.HasField('rbe_migration') else None), |
| scheduling_algorithm=(msg.scheduling_algorithm |
| or pools_pb2.Pool.SCHEDULING_ALGORITHM_UNKNOWN), |
| ) |
| return _PoolsCfg(pools, (default_cipd)) |
| |
| |
| @validation.self_rule(POOLS_CFG_FILENAME, pools_pb2.PoolsCfg) |
| def _validate_pools_cfg(cfg, ctx): |
| """Validates pools.cfg file.""" |
| |
| template_map = _resolve_task_template_inclusions( |
| ctx, cfg.task_template) |
| deployment_map = _resolve_task_template_deployments( |
| ctx, template_map, cfg.task_template_deployment) |
| bot_monitorings = _resolve_bot_monitoring(ctx, cfg.bot_monitoring) |
| bot_monitoring_unreferred = set(bot_monitorings) |
| |
| # Currently optional |
| if cfg.HasField("default_external_services"): |
| _validate_external_services(ctx, cfg.default_external_services) |
| |
| pools = set() |
| for i, msg in enumerate(cfg.pool): |
| with ctx.prefix('pool #%d (%s): ', i, '|'.join(msg.name) or 'unnamed'): |
| # Validate names. |
| if not msg.name: |
| ctx.error('at least one pool name must be given') |
| for name in msg.name: |
| if not local_config.validate_dimension_value(name): |
| ctx.error('bad pool name "%s", not a valid dimension value', name) |
| elif name in pools: |
| ctx.error('pool "%s" was already declared', name) |
| else: |
| pools.add(name) |
| |
| # Validate realm names. They are optional for now. |
| if msg.realm: |
| _validate_realm(ctx, 'realm', msg.realm) |
| if msg.default_task_realm: |
| _validate_realm(ctx, 'default_task_realm', msg.default_task_realm) |
| |
| # TODO(crbug.com/1066839): Validate enforced_realm_permissions. |
| # The following permissions must be enforced at the same time. |
| # REALM_PERMISSION_TASKS_CREATE_IN_REALM |
| # REALM_PERMISSION_POOLS_CREATE_TASK |
| # REALM_PERMISSION_TASKS_ACT_AS |
| |
| # Validate schedulers.user. |
| for u in msg.schedulers.user: |
| _validate_ident(ctx, 'user', u) |
| |
| # Validate schedulers.group. |
| for g in msg.schedulers.group: |
| if not auth.is_valid_group_name(g): |
| ctx.error('bad group name "%s"', g) |
| |
| # Validate schedulers.trusted_delegation. |
| seen_peers = set() |
| for d in msg.schedulers.trusted_delegation: |
| with ctx.prefix('trusted_delegation #%d (%s): ', i, d.peer_id): |
| if not d.peer_id: |
| ctx.error('"peer_id" is required') |
| else: |
| peer_id = _validate_ident(ctx, 'peer_id', d.peer_id) |
| if peer_id in seen_peers: |
| ctx.error('peer "%s" was specified twice', d.peer_id) |
| elif peer_id: |
| seen_peers.add(peer_id) |
| for j, tag in enumerate(d.require_any_of.tag): |
| if ':' not in tag: |
| ctx.error('bad tag #%d "%s" - must be <key>:<value>', j, tag) |
| |
| # Validate external schedulers. |
| for j, es in enumerate(msg.external_schedulers): |
| if not es.address: |
| ctx.error('%sth external scheduler config had no address', j) |
| |
| _resolve_deployment(ctx, msg, template_map, deployment_map) |
| |
| if msg.HasField('rbe_migration'): |
| with ctx.prefix('rbe_migration: '): |
| _vaildate_rbe_migration(ctx, msg.rbe_migration) |
| |
| if msg.bot_monitoring: |
| if msg.bot_monitoring not in bot_monitorings: |
| ctx.error('refer to missing bot_monitoring %r', msg.bot_monitoring) |
| else: |
| bot_monitoring_unreferred.discard(msg.bot_monitoring) |
| if bot_monitoring_unreferred: |
| ctx.error( |
| 'bot_monitoring not referred to: %s', |
| ', '.join(sorted(bot_monitoring_unreferred))) |
| |
| |
| def bootstrap_dev_server_acls(): |
| """Adds default pools.cfg.""" |
| assert utils.is_local_dev_server() |
| global _LOCAL_FAKE_CONFIG |
| _LOCAL_FAKE_CONFIG = _PoolsCfg( |
| { |
| 'default': |
| init_pool_config( |
| name='default', |
| rev='pools_cfg_rev', |
| scheduling_users=frozenset([ |
| auth.Identity(auth.IDENTITY_USER, 'smoke-test@example.com'), |
| auth.Identity(auth.IDENTITY_BOT, 'whitelisted-ip'), |
| ]), |
| scheduling_algorithm=pools_pb2.Pool.SCHEDULING_ALGORITHM_LIFO, |
| ), |
| }, |
| (None, None), |
| ) |