| # Copyright 2015 The LUCI Authors. All rights reserved. |
| # Use of this source code is governed under the Apache License, Version 2.0 |
| # that can be found in the LICENSE file. |
| """Internal bot API handlers.""" |
| |
| import base64 |
| import datetime |
| import json |
| import logging |
| import re |
| import urlparse |
| |
| import six |
| |
| import webob |
| import webapp2 |
| |
| from google.appengine.api import app_identity |
| from google.appengine.api import datastore_errors |
| from google.appengine.ext import ndb |
| from google.appengine import runtime |
| from google.appengine.runtime import apiproxy_errors |
| |
| from components import auth |
| from components import datastore_utils |
| from components import decorators |
| from components import ereporter2 |
| from components import utils |
| from server import acl |
| from server import bot_auth |
| from server import bot_code |
| from server import bot_groups_config |
| from server import bot_management |
| from server import bot_session |
| from server import config |
| from server import named_caches |
| from server import rbe |
| from server import resultdb |
| from server import service_accounts |
| from server import task_pack |
| from server import task_queues |
| from server import task_request |
| from server import task_result |
| from server import task_scheduler |
| from server import task_to_run |
| import api_helpers |
| import ts_mon_metrics |
| |
| # Methods used to authenticate requests from bots, see get_auth_methods(). |
| # |
# Order matters if a single request has multiple credentials of different
# kinds: the first method that finds its corresponding header wins.
| _BOT_AUTH_METHODS = ( |
| auth.optional_gce_vm_authentication, # ignore present, but broken header |
| auth.machine_authentication, |
| auth.oauth_authentication, |
| ) |
| |
| # Allowed bot session IDs. |
| _SESSION_ID_RE = re.compile(r'^[a-z0-9\-_/]{1,50}$') |
| |
| |
| def has_unexpected_subset_keys(expected_keys, minimum_keys, actual_keys, name): |
| """Returns an error if unexpected keys are present or expected keys are |
| missing. |
| |
| Accepts optional keys. |
| |
| This is important to catch typos. |
| """ |
| actual_keys = frozenset(actual_keys) |
| superfluous = actual_keys - expected_keys |
| missing = minimum_keys - actual_keys |
| if superfluous or missing: |
| msg_missing = (' missing: %s' % sorted(missing)) if missing else '' |
| msg_superfluous = ((' superfluous: %s' % |
| sorted(superfluous)) if superfluous else '') |
| return 'Unexpected %s%s%s; did you make a typo?' % (name, msg_missing, |
| msg_superfluous) |
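
# For illustration of has_unexpected_subset_keys (hypothetical values): with
# expected_keys={'a', 'b'}, minimum_keys={'a'} and actual_keys={'a', 'c'}, the
# returned message reports 'c' as superfluous; 'b' is optional, so it is not
# reported as missing.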
| |
| |
| def log_unexpected_subset_keys(expected_keys, minimum_keys, actual_keys, |
| request, source, name): |
| """Logs an error if unexpected keys are present or expected keys are missing. |
| |
| Accepts optional keys. |
| |
| This is important to catch typos. |
| """ |
| message = has_unexpected_subset_keys(expected_keys, minimum_keys, actual_keys, |
| name) |
| if message: |
| ereporter2.log_request(request, source=source, message=message) |
| return message |
| |
| |
| def has_missing_keys(minimum_keys, actual_keys, name): |
| """Returns an error if expected keys are not present. |
| |
| Do not warn about unexpected keys. |
| """ |
| actual_keys = frozenset(actual_keys) |
| missing = minimum_keys - actual_keys |
| if missing: |
| msg_missing = (' missing: %s' % sorted(missing)) if missing else '' |
| return 'Unexpected %s%s; did you make a typo?' % (name, msg_missing) |
| |
| |
| def is_valid_session_id(session_id): |
| if not isinstance(session_id, basestring): |
| return False |
| return bool(_SESSION_ID_RE.match(session_id)) |
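
# For illustration (hypothetical values): a session ID like 'pool-1/bot_42'
# is accepted by is_valid_session_id, while IDs with uppercase letters, other
# punctuation or more than 50 characters are rejected.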
| |
| |
| ## Generic handlers (no auth) |
| |
| |
| class ServerPingHandler(webapp2.RequestHandler): |
| """Handler to ping when checking if the server is up. |
| |
  This handler should be extremely lightweight. It shouldn't do any
  computation; it should just state that the server is up. It's open to
  everyone for simplicity and performance.
| """ |
| |
| def get(self): |
| self.response.headers['Content-Type'] = 'text/plain; charset=utf-8' |
| self.response.out.write('Server up') |
| |
| |
| ## Bot code (bootstrap and swarming_bot.zip) handlers |
| |
| |
| class _BotAuthenticatingHandler(auth.AuthenticatingHandler): |
| """Like AuthenticatingHandler, but also implements machine authentication. |
| |
| Handlers inheriting this class are used during bot bootstrap and self-update. |
| |
| Unlike _BotApiHandler handlers, _BotAuthenticatingHandler handlers don't check |
| dimensions or bot_id, since they are not yet known when these handlers are |
| called. They merely check that the bot credentials are known to the server, or |
| the endpoint is being used by an account authorized to do bot bootstrap. |
| """ |
| |
  # Bots pass credentials through special headers (not cookies), so there is
  # no need for XSRF tokens.
| xsrf_token_enforce_on = () |
| |
  # This is the header storing the bot ID.
| _X_LUCI_SWARMING_BOT_ID = 'X-Luci-Swarming-Bot-ID' |
| |
| @classmethod |
| def get_auth_methods(cls, conf): # pylint: disable=unused-argument |
| return _BOT_AUTH_METHODS |
| |
| def check_bot_code_access(self, bot_id, generate_token): |
| """Raises AuthorizationError if caller is not authorized to access bot code. |
| |
    Four variants here:
      1. A valid bootstrap token is passed as a '?tok=...' parameter.
      2. A user allowed to do a bootstrap is using their credentials.
      3. A machine whose IP is in the allowlist is making this call.
      4. A bot (with the given bot_id) is using its own machine credentials.

    In the latter three cases we optionally generate and return a new bootstrap
    token that can be used to authorize /bot_code calls.
| """ |
| existing_token = self.request.get('tok') |
| if existing_token: |
| payload = bot_code.validate_bootstrap_token(existing_token) |
| if payload is None: |
| raise auth.AuthorizationError('Invalid bootstrap token') |
| logging.debug('Using bootstrap token %r', payload) |
| return existing_token |
| |
| # TODO(crbug/1010555): Remove is_ip_whitelisted_machine check once all bots |
| # are using auth for bootstrap and updating. |
| if (not acl.can_create_bot() and |
| not (bot_id and bot_auth.is_authenticated_bot(bot_id)) and |
| not acl.is_ip_whitelisted_machine()): |
| raise auth.AuthorizationError('Not allowed to access the bot code') |
| |
| return bot_code.generate_bootstrap_token() if generate_token else None |
| |
| |
| class BootstrapHandler(_BotAuthenticatingHandler): |
| """Returns python code to run to bootstrap a swarming bot.""" |
| |
| @auth.public # auth inside check_bot_code_access() |
| def get(self): |
| # We must pass a bootstrap token (generating it, if necessary) to |
| # get_bootstrap(...), since bootstrap.py uses tokens exclusively (it can't |
| # transparently pass OAuth headers to /bot_code). |
| bootstrap_token = self.check_bot_code_access( |
| bot_id=None, generate_token=True) |
| self.response.headers['Content-Type'] = 'text/x-python' |
| self.response.headers['Content-Disposition'] = ( |
| 'attachment; filename="swarming_bot_bootstrap.py"') |
| self.response.out.write( |
| bot_code.get_bootstrap(self.request.host_url, bootstrap_token).content) |
| |
| |
| class BotCodeHandler(_BotAuthenticatingHandler): |
| """Returns a zip file with all the files required by a bot. |
| |
| Optionally specify the hash version to download. If so, the returned data is |
| cacheable. |
| """ |
| |
| @auth.public # auth inside check_bot_code_access() |
| def get(self, version=None): |
| info = bot_code.config_bundle_rev_key().get() |
| |
| # Bootstrap the bot archive for the local smoke test. |
| if not info and utils.is_local_dev_server(): |
| info = bot_code.bootstrap_for_dev_server(self.request.host_url) |
| |
| # If didn't ask for a concrete version, or asked for a version we don't |
| # know, redirect to the latest stable version. Use a redirect, instead of |
| # serving it directly, to utilize the GAE response cache. |
| known = version and version in (info.stable_bot.digest, |
| info.canary_bot.digest) |
| if not version or not known: |
| if version: |
| logging.warning('Requesting unknown version %s', version) |
| # Historically, the bot_id query argument was used for the bot to pass its |
| # ID to the server. More recent bot code uses the X-Luci-Swarming-Bot-ID |
| # HTTP header instead so it doesn't bust the GAE transparent public cache. |
      # Support both mechanisms until 2020-01-01.
| bot_id = self.request.get('bot_id') or self.request.headers.get( |
| self._X_LUCI_SWARMING_BOT_ID) or _bot_id_from_auth_token() |
| self.check_bot_code_access(bot_id=bot_id, generate_token=False) |
| self.redirect_to_version(info.stable_bot.digest) |
| return |
| |
    # If the request has a query string, redirect to a URI without it, since
| # it is the one being cached by GAE. |
| if self.request.query_string: |
| logging.info('Ignoring query string: %s', self.request.query_string) |
| self.redirect_to_version(version) |
| return |
| |
| # Serve the corresponding bot code, asking GAE to cache it for a while. |
| # TODO(b/362324087): Improve. |
| if version == info.stable_bot.digest: |
| self.serve_cached_blob(info.stable_bot.fetch_archive()) |
| elif version == info.canary_bot.digest: |
| self.serve_cached_blob(info.canary_bot.fetch_archive()) |
| else: |
| raise AssertionError('Impossible, already checked version is known') |
| |
| def redirect_to_version(self, version): |
| self.redirect('%s/swarming/api/v1/bot/bot_code/%s' % |
| (str(self.request.host_url), str(version))) |
| |
| def serve_cached_blob(self, blob): |
| self.response.headers['Cache-Control'] = 'public, max-age=3600' |
| self.response.headers['Content-Type'] = 'application/octet-stream' |
| self.response.headers['Content-Disposition'] = ( |
| 'attachment; filename="swarming_bot.zip"') |
| self.response.out.write(blob) |
| |
| |
| def _bot_id_from_auth_token(): |
| """Extracts bot ID from the authenticated credentials.""" |
| ident = auth.get_peer_identity() |
| if ident.kind != 'bot': |
| return None |
| # This is either 'botid.domain' or 'botid@gce.project'. |
| val = ident.name |
| if '@' in val: |
| return val.split('@')[0] |
| return val.split('.')[0] |
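
# For illustration (hypothetical values): a peer identity named
# 'vm42-m4.chromium' maps to bot ID 'vm42-m4', while 'vm42@gce.my-project'
# maps to 'vm42'.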
| |
| |
| ## Bot API RPCs |
| |
| |
| class _BotApiHandler(auth.ApiHandler): |
| """Like ApiHandler, but also implements machine authentication.""" |
| |
  # Bots pass credentials through special headers (not cookies), so there is
  # no need for XSRF tokens.
| xsrf_token_enforce_on = () |
| |
| @classmethod |
| def get_auth_methods(cls, conf): # pylint: disable=unused-argument |
| return _BOT_AUTH_METHODS |
| |
| |
| ### Bot Session API RPC handlers |
| |
| |
| class _ProcessResult(object): |
| """Returned by _BotBaseHandler.process.""" |
| |
| # A dict with parsed JSON request body, as it was received. |
| request = None |
| # Bot identifier, extracted from 'id' dimension. |
| bot_id = None |
| # Version of the bot code, as reported by the bot itself. |
| version = None |
| # Dict with bot state (as reported by the bot). |
| state = None |
| # Dict with bot dimensions (union of bot-reported and server-side ones). |
| dimensions = None |
| # An RBE instance the bot should be using (if any). |
| rbe_instance = None |
| # If True, consume tasks from both Swarming and RBE schedulers. |
| rbe_hybrid_mode = None |
| # Instance of BotGroupConfig with server-side bot config (from bots.cfg). |
| bot_group_cfg = None |
| # Instance of BotAuth with auth method details used to authenticate the bot. |
| bot_auth_cfg = None |
| # BotDetails to pass to the scheduler. |
| bot_details = None |
| # Bot quarantine message (or None if the bot is not in a quarantine). |
| quarantined_msg = None |
| # Bot maintenance message (or None if the bot is not under maintenance). |
| maintenance_msg = None |
| |
| def __init__(self, **kwargs): |
| for k, v in kwargs.items(): |
      # Typo-catching assert: ensure _ProcessResult has the attribute.
| assert hasattr(self, k), k |
| setattr(self, k, v) |
| |
| @property |
| def os(self): |
| return (self.dimensions or {}).get('os') or [] |
| |
| def rbe_params(self, sleep_streak): |
| """Generates a dict with RBE-related parameters for bots in RBE mode.""" |
| assert self.rbe_instance |
| return { |
| 'instance': |
| self.rbe_instance, |
| 'hybrid_mode': |
| self.rbe_hybrid_mode, |
| 'sleep': |
| task_scheduler.exponential_backoff(sleep_streak), |
| 'poll_token': |
| rbe.generate_poll_token( |
| bot_id=self.bot_id, |
| rbe_instance=self.rbe_instance, |
| enforced_dimensions=self.bot_group_cfg.dimensions, |
| bot_auth_cfg=self.bot_auth_cfg, |
| ), |
| } |
| |
| |
| def _validate_dimensions(dimensions): |
| """Validates bot dimensions.""" |
| error_msgs = [] |
| for key, values in sorted(dimensions.items()): |
| if not config.validate_dimension_key(key): |
| error_msgs.append("Invalid dimension key: %s" % key) |
| # keep validating values. |
| if not isinstance(values, list): |
| error_msgs.append("Dimension values must be list. key: %s, values: %s" % |
| (key, values)) |
      # the following validations assume values is a list.
| continue |
| |
| if len(values) == 0: |
| error_msgs.append("Dimension values should not be empty. key: %s" % key) |
| continue |
| |
| has_invalid_value_type = False |
| for value in sorted(values): |
| if config.validate_dimension_value(value): |
| continue |
| error_msgs.append("Invalid dimension value. key: %s, value: %s" % |
| (key, value)) |
| if not isinstance(value, six.text_type): |
| has_invalid_value_type = True |
| if not has_invalid_value_type and len(values) != len(set(values)): |
| error_msgs.append( |
| "Dimension values include duplication. key: %s, values: %s" % |
| (key, values)) |
| return error_msgs |
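
# For illustration (hypothetical values), _validate_dimensions rejects inputs
# such as {'os': 'Linux'} (values must be a list), {'cpu': []} (values must
# not be empty) and {'gpu': ['8086', '8086']} (duplicate values).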
| |
| |
| class _BotBaseHandler(_BotApiHandler): |
| """ |
| Request body is a JSON dict: |
| { |
| "dimensions": <dict of properties>, |
| "state": <dict of properties>, |
| "version": <sha-1 of swarming_bot.zip uncompressed content>, |
| } |
| |
| Well-known state keys: |
  * running_time: how long the bot process has been running, in seconds.
| * sleep_streak: number of consecutive idle cycles. Native scheduler only. |
| * maintenance: a bot maintenance message. If set, the bot won't get tasks. |
| * quarantined: a boolean, if True the bot won't get tasks. |
| * original_bot_id: the ID the bot started with. |
| * bot_group_cfg_version: the version of BotGroupConfig the bot is using now. |
| * rbe_instance: the RBE instance the bot is connected to right now. |
  * rbe_session: the RBE session ID, if there is an active RBE session.
| * rbe_idle: True if the last RBE poll returned no tasks, None if not on RBE. |
| """ |
| |
| EXPECTED_KEYS = {u'dimensions', u'state', u'version'} |
| OPTIONAL_KEYS = {u'session'} |
| REQUIRED_STATE_KEYS = {u'running_time', u'sleep_streak'} |
| |
| # Endpoint name to use in timeout tsmon metrics. |
| TSMON_ENDPOINT_ID = 'bot/unknown' |
| |
| # All exception classes that can indicate a datastore or overall timeout. |
| TIMEOUT_EXCEPTIONS = ( |
| apiproxy_errors.CancelledError, |
| apiproxy_errors.DeadlineExceededError, |
| datastore_errors.InternalError, |
| datastore_errors.Timeout, |
| datastore_utils.CommitError, # one reason is timeout |
| runtime.DeadlineExceededError, |
| task_to_run.ScanDeadlineError, |
| ) |
| |
| def abort_by_timeout(self, stage, exc): |
| if isinstance(exc, task_to_run.ScanDeadlineError): |
| stage += ':%s' % exc.code |
| clazz = exc.__class__.__name__ |
| ts_mon_metrics.on_handler_timeout(self.TSMON_ENDPOINT_ID, stage, clazz) |
| logging.exception('abort_by_timeout: %s %s', stage, clazz) |
| self.abort(429, 'Timeout in %s (%s)' % (stage, clazz)) |
| |
| def process(self): |
| """Fetches bot info and settings, does authorization and quarantine checks. |
| |
| Returns: |
| _ProcessResult instance, see its fields for more info. |
| |
| Raises: |
| auth.AuthorizationError if bot's credentials are invalid. |
| """ |
| request = self.parse_body() |
| version = request.get('version', None) |
| |
| dimensions = request.get('dimensions') or {} |
| state = request.get('state') or {} |
    bot_id = None
    bot_settings = None  # set below once bot_id is known
| if dimensions.get('id'): |
| dimension_id = dimensions['id'] |
| if (isinstance(dimension_id, list) and len(dimension_id) == 1 and |
| isinstance(dimension_id[0], unicode)): |
| bot_id = dimensions['id'][0] |
| |
| if bot_id: |
| logging.debug('Fetching bot settings for bot id: %s', bot_id) |
| bot_settings = bot_management.get_settings_key(bot_id).get() |
| |
| # Make sure bot self-reported ID matches the authentication token. Raises |
| # auth.AuthorizationError if not. |
| bot_group_cfg, bot_auth_cfg = bot_auth.authenticate_bot(bot_id) |
| logging.debug('Fetched bot_group_cfg for %s: %s', bot_id, bot_group_cfg) |
| |
    # The server-side dimensions from bot_group_cfg override bot-provided ones.
    # If both the server-side config and the bot report some dimension, the
    # server-side config wins. We still emit a warning if the bot tries to
    # supply the dimension and it disagrees with the server-defined one. Note
    # that this may happen on the first poll after the server-side config for
    # a bot has changed; the bot doesn't know about the new server-assigned
    # dimensions yet in this case. Also don't warn about ['default']: the bot
    # sends it in the handshake before it knows anything at all.
| for dim_key, from_cfg in bot_group_cfg.dimensions.items(): |
| from_bot = sorted(dimensions.get(dim_key) or []) |
| from_cfg = sorted(from_cfg) |
| if from_bot and from_bot != ['default'] and from_bot != from_cfg: |
| logging.warning( |
| 'Dimensions in bots.cfg don\'t match ones provided by the bot\n' |
| 'bot_id: "%s", key: "%s", from_bot: %s, from_cfg: %s', bot_id, |
| dim_key, from_bot, from_cfg) |
| dimensions[dim_key] = from_cfg |
| |
| # Check the config to see if this bot is in the RBE mode. |
| rbe_cfg = rbe.get_rbe_config_for_bot(bot_id, dimensions.get('pool')) |
| |
| # These are passed to the scheduler as is to be stored with the task. |
| bot_details = task_scheduler.BotDetails(version, |
| bot_group_cfg.logs_cloud_project) |
| |
| # Fill in all result fields except 'quarantined_msg'. |
| result = _ProcessResult( |
| request=request, |
| bot_id=bot_id, |
| version=version, |
| state=state, |
| dimensions=dimensions, |
| rbe_instance=rbe_cfg.instance if rbe_cfg else None, |
| rbe_hybrid_mode=rbe_cfg.hybrid_mode if rbe_cfg else False, |
| bot_group_cfg=bot_group_cfg, |
| bot_auth_cfg=bot_auth_cfg, |
| bot_details=bot_details, |
| maintenance_msg=state.get('maintenance')) |
| |
| # The bot may decide to "self-quarantine" itself. Accept both via |
| # dimensions or via state. See bot_management._BotCommon.quarantined for |
| # more details. |
| if (bool(dimensions.get('quarantined')) or bool(state.get('quarantined'))): |
| result.quarantined_msg = 'Bot self-quarantined' |
| return result |
| |
| # Quarantine if the request is invalid. |
| validation_errors = [] |
| |
| err_msg = has_unexpected_subset_keys( |
| self.EXPECTED_KEYS | self.OPTIONAL_KEYS, self.EXPECTED_KEYS, request, |
| 'keys') |
| if err_msg: |
| validation_errors.append(err_msg) |
| |
| if state: |
| err_msg = has_missing_keys(self.REQUIRED_STATE_KEYS, state, 'state') |
| if err_msg: |
| validation_errors.append(err_msg) |
| |
| dimension_errors = _validate_dimensions(dimensions) |
| if dimension_errors: |
| validation_errors += dimension_errors |
| # reset dimensions to avoid NDB model errors. |
| result.dimensions = {} |
| |
| if validation_errors: |
| quarantined_msg = '\n'.join(validation_errors) |
| line = 'Quarantined Bot\nhttps://%s/restricted/bot/%s\n%s' % ( |
| app_identity.get_default_version_hostname(), bot_id, quarantined_msg) |
| ereporter2.log_request(self.request, source='bot', message=line) |
| result.quarantined_msg = quarantined_msg |
| return result |
| |
| # Look for admin enforced quarantine. |
| if bool(bot_settings and bot_settings.quarantined): |
| result.quarantined_msg = 'Quarantined by admin' |
| return result |
| |
| return result |
| |
| def prepare_manifest(self, request, secret_bytes, run_result, |
| bot_request_info): |
| """Returns a manifest with all information about a task needed to run it. |
| |
| Arguments: |
| request: TaskRequest representing the task. |
| secret_bytes: SecretBytes with task secrets. |
| run_result: TaskRunResult identifying the slice to run. |
| bot_request_info: _ProcessResult as returned by process(). |
| |
| Returns: |
      A task manifest dict ready to be placed in a JSON response.
| """ |
| props = request.task_slice(run_result.current_task_slice).properties |
| caches = [c.to_dict() for c in props.caches] |
| pool = props.dimensions['pool'][0] |
| names = [c.name for c in props.caches] |
| |
    # Warning: this is doing a DB GET on the cold path, which will increase
    # the reap failure rate.
| # |
| # TODO(vadimsh): Do this before recording bot_event. |
| hints = named_caches.get_hints(pool, bot_request_info.os, names) |
| for i, hint in enumerate(hints): |
| caches[i]['hint'] = str(hint) |
| |
| logging.debug('named cache: %s', caches) |
| |
| resultdb_context = None |
| if request.resultdb_update_token: |
| resultdb_context = { |
| 'hostname': |
| urlparse.urlparse(config.settings().resultdb.server).hostname, |
| 'current_invocation': { |
| 'name': |
| resultdb.get_invocation_name( |
| task_pack.pack_run_result_key(run_result.run_result_key)), |
| 'update_token': |
| request.resultdb_update_token, |
| } |
| } |
| realm_context = {} |
| if request.realm: |
| realm_context['name'] = request.realm |
| |
| out = { |
| 'bot_id': |
| bot_request_info.bot_id, |
| 'bot_authenticated_as': |
| auth.get_peer_identity().to_bytes(), |
| 'caches': |
| caches, |
| 'cipd_input': { |
| 'client_package': props.cipd_input.client_package.to_dict(), |
| 'packages': [p.to_dict() for p in props.cipd_input.packages], |
| 'server': props.cipd_input.server, |
| } if props.cipd_input else None, |
| 'command': |
| props.command, |
| 'containment': { |
| 'containment_type': props.containment.containment_type, |
| } if props.containment else {}, |
| 'dimensions': |
| props.dimensions, |
| 'env': |
| props.env, |
| 'env_prefixes': |
| props.env_prefixes, |
| 'grace_period': |
| props.grace_period_secs, |
| 'hard_timeout': |
| props.execution_timeout_secs, |
| # TODO(b/355013257): Stop sending this in Go version. |
| 'host': |
| utils.get_versioned_hosturl(), |
| 'io_timeout': |
| props.io_timeout_secs, |
| 'secret_bytes': |
| (secret_bytes.secret_bytes.encode('base64') if secret_bytes else None), |
| 'cas_input_root': { |
| 'cas_instance': props.cas_input_root.cas_instance, |
| 'digest': { |
| 'hash': props.cas_input_root.digest.hash, |
| 'size_bytes': props.cas_input_root.digest.size_bytes, |
| }, |
| } if props.cas_input_root else None, |
| 'outputs': |
| props.outputs, |
| 'realm': |
| realm_context, |
| 'relative_cwd': |
| props.relative_cwd, |
| 'resultdb': |
| resultdb_context, |
| 'service_accounts': { |
| 'system': { |
| # 'none', 'bot' or email. Bot interprets 'none' and 'bot' |
| # locally. When it sees something else, it uses /oauth_token |
| # API endpoint to grab tokens through server. |
| 'service_account': |
| bot_request_info.bot_group_cfg.system_service_account or 'none', |
| }, |
| 'task': { |
| # Same here. |
| 'service_account': request.service_account, |
| }, |
| }, |
| 'task_id': |
| task_pack.pack_run_result_key(run_result.key), |
| 'bot_dimensions': |
| bot_request_info.dimensions, |
| } |
| return utils.to_json_encodable(out) |
| |
| |
| class BotHandshakeHandler(_BotBaseHandler): |
| """First request to be called to get initial data like bot code version. |
| |
  The bot is server-controlled so the server doesn't have to support multiple
  API versions. When running a task, the bot syncs the version-specific URL.
  Once a bot finishes its currently running task, it'll be immediately upgraded
  on its next poll.
| |
| This endpoint does not return commands to the bot, for example to upgrade |
| itself. It'll be told so when it does its first poll. |
| |
| Response body is a JSON dict: |
| { |
| "bot_version": <sha-1 of swarming_bot.zip uncompressed content>, |
| "server_version": "138-193f1f3", |
| "bot_group_cfg_version": "0123abcdef", |
| "bot_group_cfg": { |
| "dimensions": { <server-defined dimensions> }, |
| }, |
| "rbe": { |
| ... |
| } |
| } |
| """ |
| |
| OPTIONAL_KEYS = {u'session_id'} |
| |
| @auth.public # auth happens in self.process() |
| def post(self): |
| res = self.process() |
| |
| # Bot session is optional for now until it is fully rolled out. |
| session_id = res.request.get('session_id') |
| if session_id is not None: |
| if not is_valid_session_id(session_id): |
| self.abort_with_error(400, error='Bad session ID') |
| return |
| |
| # This boolean marks the state as "not fully initialized yet", since it |
| # lacks entries reported by custom bot hooks (the bot doesn't have them |
| # yet, they are returned below). |
| res.state['handshaking'] = True |
| |
    # The dimensions provided by the bot won't be applied to BotInfo since they
    # were computed without the injected bot_config. The bot will report valid
    # dimensions at poll.
| bot_management.bot_event( |
| event_type='bot_connected', |
| bot_id=res.bot_id, |
| session_id=session_id, |
| external_ip=self.request.remote_addr, |
| authenticated_as=auth.get_peer_identity().to_bytes(), |
| dimensions=res.dimensions, |
| state=res.state, |
| version=res.version, |
| quarantined=bool(res.quarantined_msg), |
| maintenance_msg=res.maintenance_msg, |
| event_msg=res.quarantined_msg) |
| |
| channel = bot_code.get_bot_channel(res.bot_id, config.settings()) |
| logging.debug('Bot channel: %s', channel) |
| bot_ver, bot_config_rev = bot_code.get_bot_version(channel) |
| |
| data = { |
| 'bot_version': bot_ver, |
| 'bot_config_rev': bot_config_rev, |
| 'bot_config_name': 'bot_config.py', |
| 'server_version': utils.get_app_version(), |
| 'bot_group_cfg_version': res.bot_group_cfg.version, |
| 'bot_group_cfg': { |
| # Let the bot know its server-side dimensions (from bots.cfg file). |
| 'dimensions': res.bot_group_cfg.dimensions, |
| }, |
| } |
| if res.bot_group_cfg.bot_config_script_content: |
| logging.info('Injecting %s: rev %s, %d bytes', |
| res.bot_group_cfg.bot_config_script, |
| res.bot_group_cfg.bot_config_script_rev, |
| len(res.bot_group_cfg.bot_config_script_content)) |
| data['bot_config'] = res.bot_group_cfg.bot_config_script_content |
| data['bot_config_rev'] = res.bot_group_cfg.bot_config_script_rev |
| data['bot_config_name'] = res.bot_group_cfg.bot_config_script |
| if res.rbe_instance: |
| data['rbe'] = res.rbe_params(0) |
| |
| if session_id: |
| logging.info('Session ID: %s', session_id) |
| data['session'] = bot_session.marshal( |
| bot_session.create( |
| bot_id=res.bot_id, |
| session_id=session_id, |
| bot_group_cfg=res.bot_group_cfg, |
| )) |
| |
| self.send_response(data) |
| |
| |
| class BotPollHandler(_BotBaseHandler): |
| """The bot polls for a task; returns either a task, update command or sleep. |
| |
  In case of an exception on the bot, this is enough to get it just far enough
  to eventually self-update to a working version. This is to ensure that coding
  errors in bot code don't kill the whole fleet at once; bots should still be up
  just enough to be able to self-update again even if they don't get tasks
  assigned anymore.
| """ |
| TSMON_ENDPOINT_ID = 'bot/poll' |
| OPTIONAL_KEYS = {u'session', u'request_uuid', u'force'} |
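
  # For illustration, the response is a JSON dict whose 'cmd' field selects
  # one of the commands produced by the _cmd_* methods below, e.g.
  # (hypothetical values):
  #   {"cmd": "sleep", "duration": 7.5, "quarantined": false}
  #   {"cmd": "update", "version": "<bot code digest>"}
  #   {"cmd": "run", "manifest": {...}}
  # See _cmd_run, _cmd_sleep, _cmd_rbe, _cmd_terminate, _cmd_update and
  # _cmd_bot_restart for the exact payloads.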
| |
| @auth.public # auth happens in self.process() |
| def post(self): |
| """Handles a polling request. |
| |
    Be very permissive on missing values. This can happen because of errors
    on the bot; *we don't want to deny them the capacity to update*, so that the
    bot code is eventually fixed and the bots self-update to the working code.
| |
| It makes recovery of the fleet in case of catastrophic failure much easier. |
| """ |
| logging.debug('Request started') |
| settings = config.settings() |
| if settings.force_bots_to_sleep_and_not_run_task: |
| # Ignore everything, just sleep. Tell the bot it is quarantined to inform |
| # it that it won't be running anything anyway. Use a large streak so it |
| # will sleep for 60s. |
| self._cmd_sleep(1000, True) |
| return |
| |
| deadline = utils.utcnow() + datetime.timedelta(seconds=60) |
| res = self.process() |
| |
| sleep_streak = res.state.get('sleep_streak', 0) |
| quarantined = bool(res.quarantined_msg) |
| |
| # TODO(vadimsh): This is temporary to detect bots that dynamically change |
| # ID. See bot_main._get_state. |
| original_bot_id = res.state.get('original_bot_id') |
| if original_bot_id and original_bot_id != res.bot_id: |
| logging.error('bot_id_mismatch: original=%s, reported=%s' % |
| (original_bot_id, res.bot_id)) |
| |
| # Note bot existence at two places, one for stats at 1 minute resolution, |
| # the other for the list of known bots. |
| |
| def bot_event(event_type, task_id=None, task_name=None): |
| try: |
| bot_management.bot_event( |
| event_type=event_type, |
| bot_id=res.bot_id, |
| external_ip=self.request.remote_addr, |
| authenticated_as=auth.get_peer_identity().to_bytes(), |
| dimensions=res.dimensions, |
| state=res.state, |
| version=res.version, |
| quarantined=quarantined, |
| maintenance_msg=res.maintenance_msg, |
| event_msg=res.quarantined_msg, |
| task_id=task_id, |
| task_name=task_name, |
| register_dimensions=True) |
| except self.TIMEOUT_EXCEPTIONS as e: |
| self.abort_by_timeout('bot_event:%s' % event_type, e) |
| except datastore_errors.BadValueError as e: |
| logging.warning('Invalid BotInfo or BotEvent values', exc_info=True) |
| self.abort_with_error(400, error=str(e)) |
| |
| # Ask the bot to update itself if it runs unexpected version. |
| channel = bot_code.get_bot_channel(res.bot_id, settings) |
| logging.debug('Bot channel: %s', channel) |
| expected_version, _ = bot_code.get_bot_version(channel) |
| if res.version != expected_version: |
| bot_event('request_update') |
| self._cmd_update(expected_version) |
| return |
| |
| # If the server-side per-bot config for the bot has changed, we need |
| # to restart this particular bot, so it picks up new config in /handshake. |
| # Do this check only for bots that know about server-side per-bot configs |
| # already (such bots send 'bot_group_cfg_version' state attribute). |
| cur_bot_cfg_ver = res.state.get('bot_group_cfg_version') |
| logging.debug('bot_config version: %s, latest: %s', cur_bot_cfg_ver, |
| res.bot_group_cfg.version) |
| if cur_bot_cfg_ver and cur_bot_cfg_ver != res.bot_group_cfg.version: |
| bot_event('request_restart') |
| self._cmd_bot_restart('Restarting to pick up new bots.cfg config') |
| return |
| |
| if quarantined: |
| bot_event('request_sleep') |
| self._cmd_sleep(sleep_streak, quarantined) |
| return |
| |
| # |
| # At that point, the bot should be in relatively good shape since it's |
| # running the right version. It is still possible that invalid code was |
| # pushed to the server, so be diligent about it. |
| # |
| |
    # If a bot advertises itself with a state key 'maintenance', do not give
    # a task to it until this key is removed.
    #
    # It's a 'hack' because this is not listed in the DB as a separate state,
    # which hinders system monitoring. See bot_management.BotInfo. In practice,
    # ts_mon_metrics.py can look at BotInfo.get('maintenance') to determine if
    # a bot is in maintenance or idle.
| if res.state.get('maintenance'): |
| bot_event('request_sleep') |
| # Tell the bot it's considered quarantined. |
| self._cmd_sleep(sleep_streak, True) |
| return |
| |
| bot_info = bot_management.get_info_key(res.bot_id).get(use_cache=False, |
| use_memcache=False) |
| # TODO(crbug.com/1077188): |
| # avoid assigning to bots with another task assigned. |
| if bot_info and bot_info.task_id: |
| logging.error('Task %s is already assigned to the bot %s', |
| bot_info.task_id, res.bot_id) |
| |
| # The bot is in good shape. |
| |
| # True if a hybrid RBE bot wants to poll from Swarming scheduler. |
| force_poll = res.request.get('force') |
| if res.rbe_instance and res.rbe_hybrid_mode and force_poll: |
| logging.info('Force-polling Swarming from an RBE bot') |
| |
| # We need to make two separate decisions based on RBE migration status |
| # of the bot: whether the bot should be registered in the Swarming |
| # scheduler, and whether it should *use* the Swarming scheduler. |
| # |
| # The answers are different for hybrid bots: we want to always register them |
| # in the scheduler, but we *use* the scheduler only when `force` is True. |
| if not res.rbe_instance: |
| # Native Swarming bot. Need to use the Swarming scheduler. |
| scheduler_reg = True |
| scheduler_use = True |
| elif not res.rbe_hybrid_mode: |
| # Pure RBE-mode bot. Don't use the Swarming scheduler. |
| scheduler_reg = False |
| scheduler_use = False |
| else: |
| # Hybrid RBE-mode bot. Decide based on `force` request field. |
| scheduler_reg = True |
| scheduler_use = force_poll |
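    # Summary of the decision above, for illustration:
    #   native Swarming bot: scheduler_reg=True,  scheduler_use=True
    #   pure RBE bot:        scheduler_reg=False, scheduler_use=False
    #   hybrid RBE bot:      scheduler_reg=True,  scheduler_use=force_poll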
| |
| # Register in the Swarming scheduler, get queues to poll. |
| queues = None |
| if scheduler_reg: |
| try: |
| queues = task_queues.assert_bot(res.dimensions) |
| except self.TIMEOUT_EXCEPTIONS as e: |
| self.abort_by_timeout('assert_bot', e) |
| |
| # Grab the task from the Swarming scheduler if actually using it. |
| request = None |
| if scheduler_use: |
| # Try to grab a task. Leave ~10s for bot_event(...) transaction below. |
| reap_deadline = deadline - datetime.timedelta(seconds=10) |
| request_uuid = res.request.get('request_uuid') |
| try: |
| (request, secret_bytes, |
| run_result), is_deduped = api_helpers.cache_request( |
| 'bot_poll', request_uuid, lambda: task_scheduler.bot_reap_task( |
| res.dimensions, queues, res.bot_details, reap_deadline)) |
| if is_deduped: |
| logging.info('Reusing request cache with uuid %s', request_uuid) |
| except self.TIMEOUT_EXCEPTIONS as e: |
| self.abort_by_timeout('bot_reap_task', e) |
| |
| if not request: |
| # No tasks found in the Swarming scheduler or not using it at all. |
| if not res.rbe_instance: |
| # If this is a native Swarming bot, tell it to sleep a bit. |
| bot_event('request_sleep') |
| self._cmd_sleep(sleep_streak, False) |
| else: |
| # If this is an RBE Swarming bot, tell it to switch to RBE or keep using |
| # RBE if it already does. It is important we record an appropriate bot |
| # event here. They are necessary to keep the bot alive in Swarming |
| # datastore and to keep track of bot idleness status. |
| if res.state.get('rbe_idle') is False: |
| bot_event('bot_polling') # the bot got a task from RBE recently |
| else: |
| bot_event('bot_idle') # the bot is not seeing any RBE tasks |
| self._cmd_rbe(sleep_streak, res) |
| return |
| |
| # This part is tricky since it intentionally runs a transaction after |
| # another one. |
| if request.task_slice( |
| run_result.current_task_slice).properties.is_terminate: |
| if res.rbe_instance: |
| logging.warning('RBE: termination through Swarming scheduler') |
| bot_event('bot_terminate', task_id=run_result.task_id) |
| self._cmd_terminate(run_result.task_id) |
| else: |
| if res.rbe_instance: |
| logging.warning('RBE: task through Swarming scheduler') |
| bot_event('request_task', |
| task_id=run_result.task_id, |
| task_name=request.name) |
| self._cmd_run(request, secret_bytes, run_result, res) |
| |
| def _cmd_run(self, request, secret_bytes, run_result, bot_request_info): |
| logging.info('Run: %s', request.task_id) |
| manifest = self.prepare_manifest(request, secret_bytes, run_result, |
| bot_request_info) |
| out = { |
| 'cmd': 'run', |
| 'manifest': manifest, |
| } |
| if bot_request_info.rbe_instance: |
| out['rbe'] = bot_request_info.rbe_params(0) |
| self.send_response(out) |
| |
| def _cmd_sleep(self, sleep_streak, quarantined): |
| duration = task_scheduler.exponential_backoff(sleep_streak) |
| logging.debug('Sleep: streak: %d; duration: %ds; quarantined: %s', |
| sleep_streak, duration, quarantined) |
| out = { |
| 'cmd': 'sleep', |
| 'duration': duration, |
| 'quarantined': quarantined, # TODO(vadimsh): Appears to be ignored. |
| } |
| self.send_response(out) |
| |
| def _cmd_rbe(self, sleep_streak, bot_request_info): |
| logging.info('RBE mode: %s%s', bot_request_info.rbe_instance, |
| ' (hybrid)' if bot_request_info.rbe_hybrid_mode else '') |
| out = { |
| 'cmd': 'rbe', |
| 'rbe': bot_request_info.rbe_params(sleep_streak), |
| } |
| self.send_response(out) |
| |
| def _cmd_terminate(self, task_id): |
| logging.info('Terminate: %s', task_id) |
| out = { |
| 'cmd': 'terminate', |
| 'task_id': task_id, |
| } |
| self.send_response(out) |
| |
| def _cmd_update(self, expected_version): |
| logging.info('Update: %s', expected_version) |
| out = { |
| 'cmd': 'update', |
| 'version': expected_version, |
| } |
| self.send_response(out) |
| |
| def _cmd_bot_restart(self, message): |
| logging.info('Restarting bot: %s', message) |
| out = { |
| 'cmd': 'bot_restart', |
| 'message': message, |
| } |
| self.send_response(out) |
| |
| |
| class BotClaimHandler(_BotBaseHandler): |
| """Called by a bot that wants to claim a pending TaskToRun for itself. |
| |
| This transactionally assigns the task to this bot or returns a rejection error |
| if the TaskToRun is no longer pending. |
| |
| Used by bots in RBE mode after they get the lease from RBE. |
| """ |
| TSMON_ENDPOINT_ID = 'bot/claim' |
| EXPECTED_KEYS = _BotBaseHandler.EXPECTED_KEYS | { |
| u'claim_id', # an opaque string used to make the request idempotent |
| u'task_id', # TaskResultSummary packed ID |
| u'task_to_run_shard', # shard index identifying TaskToRunShardXXX class |
| u'task_to_run_id', # TaskToRunShardXXX integer entity ID |
| } |
| |
| @auth.public # auth happens in self.process() |
| def post(self): |
| res = self.process() |
| |
| logging.info('Claiming task %s (shard %s, id %s)', res.request['task_id'], |
| res.request['task_to_run_shard'], |
| res.request['task_to_run_id']) |
| |
| # Get TaskToRunShardXXX entity key that identifies the slice to claim. |
| try: |
| to_run_key = task_to_run.task_to_run_key_from_parts( |
| task_pack.get_request_and_result_keys(res.request['task_id'])[0], |
| res.request['task_to_run_shard'], res.request['task_to_run_id']) |
| except ValueError as e: |
| self.abort_with_error(400, error=str(e)) |
| |
| # Try to transactionally claim the slice. |
| try: |
| request, secret_bytes, run_result = task_scheduler.bot_claim_slice( |
| bot_dimensions=res.dimensions, |
| bot_details=res.bot_details, |
| to_run_key=to_run_key, |
| claim_id='%s:%s' % (res.bot_id, res.request['claim_id'])) |
| except self.TIMEOUT_EXCEPTIONS as e: |
| self.abort_by_timeout('bot_claim_slice', e) |
| except task_scheduler.ClaimError as e: |
| # The slice was already claimed by someone else (or it has expired). |
| self._cmd_skip(str(e)) |
| return |
| |
    # Update the state of the bot in the DB. It now presumably works on
    # the claimed slice.
| # |
| # TODO(vadimsh): Record bot_event in the same transaction that claims the |
| # slice. |
| is_terminate = request.task_slice( |
| run_result.current_task_slice).properties.is_terminate |
| event_type = 'bot_terminate' if is_terminate else 'request_task' |
| try: |
| bot_management.bot_event( |
| event_type=event_type, |
| bot_id=res.bot_id, |
| external_ip=self.request.remote_addr, |
| authenticated_as=auth.get_peer_identity().to_bytes(), |
| dimensions=res.dimensions, |
| state=res.state, |
| version=res.version, |
| task_id=run_result.task_id, |
| task_name=None if is_terminate else request.name) |
| except self.TIMEOUT_EXCEPTIONS as e: |
| self.abort_by_timeout('bot_event:%s' % event_type, e) |
| except datastore_errors.BadValueError as e: |
| logging.warning('Invalid BotInfo or BotEvent values', exc_info=True) |
| self.abort_with_error(400, error=str(e)) |
| |
| # Return the payload to the bot. |
| if is_terminate: |
| self._cmd_terminate(run_result.task_id) |
| else: |
| self._cmd_run(request, secret_bytes, run_result, res) |
| |
| def _cmd_skip(self, reason): |
| logging.info('Skip: %s', reason) |
| self.send_response({ |
| 'cmd': 'skip', |
| 'reason': reason, |
| }) |
| |
| def _cmd_terminate(self, task_id): |
| logging.info('Terminate: %s', task_id) |
| self.send_response({ |
| 'cmd': 'terminate', |
| 'task_id': task_id, |
| }) |
| |
| def _cmd_run(self, request, secret_bytes, run_result, bot_request_info): |
| logging.info('Run: %s', request.task_id) |
| manifest = self.prepare_manifest(request, secret_bytes, run_result, |
| bot_request_info) |
| self.send_response({ |
| 'cmd': 'run', |
| 'manifest': manifest, |
| }) |
| |
| |
| class BotEventHandler(_BotBaseHandler): |
| """On signal that a bot had an event worth logging.""" |
| |
| EXPECTED_KEYS = _BotBaseHandler.EXPECTED_KEYS | {u'event', u'message'} |
| |
| ALLOWED_EVENTS = ('bot_error', 'bot_log', 'bot_rebooting', 'bot_shutdown') |
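
  # For illustration, an event request body may look like (hypothetical
  # values):
  #   {"dimensions": {...}, "state": {...}, "version": "<sha>",
  #    "event": "bot_rebooting", "message": "rebooting after task"}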
| |
| @auth.public # auth happens in self.process() |
| def post(self): |
| res = self.process() |
| event = res.request.get('event') |
| if event not in self.ALLOWED_EVENTS: |
| logging.error('Unexpected event type: %s', event) |
| self.abort_with_error(400, error='Unsupported event type') |
| message = res.request.get('message') |
| # Record the event in a BotEvent entity so it can be listed on the bot's |
| # page. The dimensions won't be applied to BotInfo since they may not be |
    # valid, but will be applied to BotEvent for analysis purposes.
| try: |
| bot_management.bot_event( |
| event_type=event, |
| bot_id=res.bot_id, |
| external_ip=self.request.remote_addr, |
| authenticated_as=auth.get_peer_identity().to_bytes(), |
| dimensions=res.dimensions, |
| state=res.state, |
| version=res.version, |
| quarantined=bool(res.quarantined_msg), |
| maintenance_msg=res.maintenance_msg, |
| event_msg=message) |
| except datastore_errors.BadValueError as e: |
| logging.warning('Invalid BotInfo or BotEvent values', exc_info=True) |
| return self.abort_with_error(400, error=str(e)) |
| |
| if event == 'bot_error': |
      # Also logs this to ereporter2, so it will be listed in the server's
      # hourly ereporter2 report. THIS IS NOISY so it should only be done for
      # issues requiring action. In this case, include the bot's URL again since
      # there's no context in the report. Redundantly include the bot id so
      # messages are bucketed by bot.
| line = ('%s\n' |
| '\nhttps://%s/restricted/bot/%s') % ( |
| message, app_identity.get_default_version_hostname(), |
| res.bot_id) |
| ereporter2.log_request(self.request, source='bot', message=line) |
| self.send_response({}) |
| |
| |
| ### Bot Security API RPC handlers |
| |
| |
| class _BotTokenHandler(_BotApiHandler): |
| """Base class for BotOAuthTokenHandler and BotIDTokenHandler.""" |
| |
| TOKEN_KIND = None # to be set in subclasses |
| TOKEN_RESPONSE_KEY = None # to be set in subclasses |
| |
| ACCEPTED_KEYS = None # to be set in subclasses |
| REQUIRED_KEYS = None # to be set in subclasses |
| |
| def extract_token_params(self, request): |
| """Validates and extract fields specific to the requested token type. |
| |
| Args: |
| request: a dict with the token request body. |
| |
| Returns: |
| Tuple (scopes, audience). |
| """ |
| raise NotImplementedError() |
| |
| @auth.public # auth happens in bot_auth.authenticate_bot() |
| def post(self): |
| request = self.parse_body() |
| logging.debug('Request body: %s', request) |
| msg = log_unexpected_subset_keys(self.ACCEPTED_KEYS, self.REQUIRED_KEYS, |
| request, self.request, 'bot', 'keys') |
| if msg: |
| self.abort_with_error(400, error=msg) |
| |
| account_id = request['account_id'] |
| # TODO(crbug.com/1015701): take from X-Luci-Swarming-Bot-ID header. |
| bot_id = request['id'] |
| task_id = request.get('task_id') |
| |
| # Only two flavors of accounts are supported. |
| if account_id not in ('system', 'task'): |
| self.abort_with_error( |
| 400, error='Unknown "account_id", expecting "task" or "system"') |
| |
    # If using the 'task' account, 'task_id' is required. We'll double-check
    # the bot is still executing this task (based on data in the datastore),
    # and if so, will use a service account associated with this particular
    # task.
| if account_id == 'task' and not task_id: |
| self.abort_with_error( |
| 400, error='"task_id" is required when using "account_id" == "task"') |
| |
| # Validate fields specific to the requested token kind in the subclass. |
| scopes, audience = self.extract_token_params(request) |
| |
| # Make sure bot self-reported ID matches the authentication token. Raises |
| # auth.AuthorizationError if not. Also fetches corresponding BotGroupConfig |
| # that contains system service account email for this bot. |
| bot_group_cfg, _ = bot_auth.authenticate_bot(bot_id) |
| |
| # At this point, the request is valid structurally, and the bot used proper |
| # authentication when making it. |
| if self.TOKEN_KIND == service_accounts.TOKEN_KIND_ACCESS_TOKEN: |
| logging.info( |
| 'Requesting a "%s" access token with scopes %s', account_id, scopes) |
| elif self.TOKEN_KIND == service_accounts.TOKEN_KIND_ID_TOKEN: |
| logging.info( |
| 'Requesting a "%s" ID token with audience "%s"', account_id, audience) |
| else: |
| raise AssertionError('Unrecognized token kind %s' % self.TOKEN_KIND) |
| |
    # Check 'task_id' matches the task currently assigned to the bot. This is
    # mostly a precaution against confused bot processes. We can always just
    # use 'current_task_id' to look up the per-task service account. The
    # datastore is the source of truth here, not whatever the bot reports.
| if account_id == 'task': |
| current_task_id = None |
| bot_info = bot_management.get_info_key(bot_id).get(use_cache=False, |
| use_memcache=False) |
| if bot_info: |
| current_task_id = bot_info.task_id |
| if task_id != current_task_id: |
| logging.error( |
| 'Bot %s requested "task" token for task %s, but runs %s', |
| bot_id, task_id, current_task_id) |
| self.abort_with_error( |
| 400, error='Wrong task_id: the bot is not executing this task') |
| |
| account = None # an email or 'bot' or 'none' |
| token = None # service_accounts.AccessToken |
| try: |
| if account_id == 'task': |
| account, token = service_accounts.get_task_account_token( |
| task_id, bot_id, |
| self.TOKEN_KIND, |
| scopes=scopes, audience=audience) |
| elif account_id == 'system': |
| account, token = service_accounts.get_system_account_token( |
| bot_group_cfg.system_service_account, |
| self.TOKEN_KIND, |
| scopes=scopes, audience=audience) |
| else: |
| raise AssertionError('Impossible, there is a check above') |
| except service_accounts.PermissionError as exc: |
| self.abort_with_error(403, error=str(exc)) |
| except service_accounts.MisconfigurationError as exc: |
| self.abort_with_error(400, error=str(exc)) |
| except service_accounts.InternalError as exc: |
| self.abort_with_error(500, error=str(exc)) |
| |
| # Note: the token info is already logged by service_accounts.get_*_token. |
| if token: |
| self.send_response({ |
| 'service_account': account, |
| self.TOKEN_RESPONSE_KEY: token.access_token, |
| 'expiry': token.expiry, |
| }) |
| else: |
| assert account in ('bot', 'none'), account |
| self.send_response({'service_account': account}) |
| |
| |
| class BotOAuthTokenHandler(_BotTokenHandler): |
| """Called when bot wants to get a service account OAuth access token. |
| |
| There are two flavors of service accounts the bot may use: |
| * 'system': this account is associated directly with the bot (in bots.cfg), |
| and can be used at any time (when running a task or not). |
| * 'task': this account is associated with the task currently executing on |
| the bot, and may be used only when bot is actually running this task. |
| |
| The flavor of account is specified via 'account_id' request field. See |
| ACCEPTED_KEYS for format of other keys. |
| |
  The returned token is expected to be alive for at least ~5 min, but can live
  longer (though no longer than ~1h). In general, assume the token is
  short-lived.

  Multiple bots may share the exact same access token if their configurations
  match (the token is cached by Swarming for performance reasons).
| |
| Besides the token, the response also contains the actual service account |
| email (if it is really configured), or two special strings in place of the |
| email: |
| * "none" if the bot is not configured to use service accounts at all. |
| * "bot" if the bot should use tokens produced by bot_config.py hook. |
| |
| The response body on success is a JSON dict: |
| { |
| "service_account": <str email> or "none" or "bot", |
| "access_token": <str with actual token (if account is configured)>, |
| "expiry": <int with unix timestamp in seconds (if account is configured)> |
| } |
| |
| May also return: |
| HTTP 400 - on a bad request or if the service account is misconfigured. |
| HTTP 403 - if the caller is not allowed to use the service account. |
| HTTP 500 - on retriable transient errors. |
| """ |
| TOKEN_KIND = service_accounts.TOKEN_KIND_ACCESS_TOKEN |
| TOKEN_RESPONSE_KEY = 'access_token' |
| |
| ACCEPTED_KEYS = { |
| u'account_id', # 'system' or 'task' |
| u'id', # bot ID |
| u'scopes', # list of requested OAuth scopes |
| u'task_id', # optional task ID, required if using 'task' account |
| u'session', # the session token |
| } |
| REQUIRED_KEYS = {u'account_id', u'id', u'scopes'} |
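
  # For illustration, a token request body may look like (hypothetical values):
  #   {"account_id": "system", "id": "<bot id>",
  #    "scopes": ["https://www.googleapis.com/auth/userinfo.email"]}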
| |
| def extract_token_params(self, request): |
| scopes = request['scopes'] |
| if (not scopes or not isinstance(scopes, list) or |
| not all(isinstance(s, basestring) for s in scopes)): |
| self.abort_with_error(400, error='"scopes" must be a list of strings') |
| return scopes, None |
| |
| |
| class BotIDTokenHandler(_BotTokenHandler): |
| """Called when bot wants to get a service account ID token. |
| |
| Similar to BotOAuthTokenHandler, except returns ID tokens instead of OAuth |
| tokens. See BotOAuthTokenHandler doc for details. |
| |
| The response body on success is a JSON dict: |
| { |
| "service_account": <str email> or "none" or "bot", |
| "id_token": <str with actual token (if account is configured)>, |
| "expiry": <int with unix timestamp in seconds (if account is configured)> |
| } |
| |
| May also return: |
| HTTP 400 - on a bad request or if the service account is misconfigured. |
| HTTP 403 - if the caller is not allowed to use the service account. |
| HTTP 500 - on retriable transient errors. |
| """ |
| TOKEN_KIND = service_accounts.TOKEN_KIND_ID_TOKEN |
| TOKEN_RESPONSE_KEY = 'id_token' |
| |
| ACCEPTED_KEYS = { |
| u'account_id', # 'system' or 'task' |
| u'id', # bot ID |
| u'audience', # the string audience to put into the token |
| u'task_id', # optional task ID, required if using 'task' account |
| u'session', # the session token |
| } |
| REQUIRED_KEYS = {u'account_id', u'id', u'audience'} |
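
  # For illustration, a token request body may look like (hypothetical values):
  #   {"account_id": "task", "id": "<bot id>", "task_id": "<packed task id>",
  #    "audience": "https://example.com"}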
| |
| def extract_token_params(self, request): |
| audience = request['audience'] |
| if not audience or not isinstance(audience, basestring): |
| self.abort_with_error(400, error='"audience" must be a string') |
| return None, audience |
| |
| |
| ### Bot Task API RPC handlers |
| |
| |
| class BotTaskUpdateHandler(_BotApiHandler): |
| """Receives updates from a Bot for a task. |
| |
| The handler verifies packets are processed in order and will refuse |
| out-of-order packets. |
| """ |
| ACCEPTED_KEYS = { |
| u'bot_overhead', |
| u'cache_trim_stats', |
| u'cas_output_root', |
| u'cipd_pins', |
| u'cipd_stats', |
| u'cleanup_stats', |
| u'cost_usd', |
| u'duration', |
| u'exit_code', |
| u'hard_timeout', |
| u'id', |
| u'io_timeout', |
| u'isolated_stats', |
| u'named_caches_stats', |
| u'output', |
| u'output_chunk_start', |
| u'session', |
| u'task_id', |
| } |
| REQUIRED_KEYS = {u'id', u'task_id'} |
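
  # For illustration, a minimal in-progress update may look like (hypothetical
  # values):
  #   {"id": "<bot id>", "task_id": "<run result id>", "cost_usd": 0.1,
  #    "output": "<base64 encoded output chunk>", "output_chunk_start": 0}
  # A final update additionally carries fields such as "exit_code", "duration"
  # and the various *_stats entries listed in ACCEPTED_KEYS.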
| |
| @decorators.silence(apiproxy_errors.RPCFailedError) |
| @auth.public # auth happens in bot_auth.authenticate_bot() |
| def post(self, task_id=None): |
| # Unlike handshake and poll, we do not accept invalid keys here. This code |
| # path is much more strict. |
| |
| request = self.parse_body() |
| msg = log_unexpected_subset_keys(self.ACCEPTED_KEYS, self.REQUIRED_KEYS, |
| request, self.request, 'bot', 'keys') |
| if msg: |
| self.abort_with_error(400, error=msg) |
| |
| # TODO(crbug.com/1015701): take from X-Luci-Swarming-Bot-ID header. |
| bot_id = request['id'] |
| task_id = request['task_id'] |
| |
| # Make sure bot self-reported ID matches the authentication token. Raises |
| # auth.AuthorizationError if not. |
| bot_auth.authenticate_bot(bot_id) |
| |
| bot_overhead = request.get('bot_overhead') |
| cipd_pins = request.get('cipd_pins') |
| cipd_stats = request.get('cipd_stats') |
| cost_usd = request.get('cost_usd', 0) |
| duration = request.get('duration') |
| exit_code = request.get('exit_code') |
| hard_timeout = request.get('hard_timeout') |
| io_timeout = request.get('io_timeout') |
| isolated_stats = request.get('isolated_stats') |
| cache_trim_stats = request.get('cache_trim_stats') |
| named_caches_stats = request.get('named_caches_stats') |
| cleanup_stats = request.get('cleanup_stats') |
| output = request.get('output') |
| output_chunk_start = request.get('output_chunk_start') |
| cas_output_root = request.get('cas_output_root') |
| canceled = request.get('canceled') |
| |
| if (isolated_stats or cipd_stats) and bot_overhead is None: |
| ereporter2.log_request( |
| request=self.request, |
| source='server', |
| category='task_failure', |
| message='Failed to update task: %s' % task_id) |
| self.abort_with_error( |
| 400, |
| error='isolated_stats and cipd_stats require bot_overhead to be set' |
          '\nbot_overhead: %s\nisolated_stats: %s' %
| (bot_overhead, isolated_stats)) |
| |
| run_result_key = task_pack.unpack_run_result_key(task_id) |
| performance_stats = None |
| if bot_overhead is not None: |
| performance_stats = task_result.PerformanceStats( |
| bot_overhead=bot_overhead) |
| if isolated_stats: |
| download = isolated_stats.get('download') or {} |
| upload = isolated_stats.get('upload') or {} |
| |
| def unpack_base64(d, k): |
| x = d.get(k) |
| if x: |
| return base64.b64decode(x) |
| |
| performance_stats.isolated_download = task_result.CASOperationStats( |
| duration=download.get('duration'), |
| initial_number_items=download.get('initial_number_items'), |
| initial_size=download.get('initial_size'), |
| items_cold=unpack_base64(download, 'items_cold'), |
| items_hot=unpack_base64(download, 'items_hot')) |
| performance_stats.isolated_upload = task_result.CASOperationStats( |
| duration=upload.get('duration'), |
| items_cold=unpack_base64(upload, 'items_cold'), |
| items_hot=unpack_base64(upload, 'items_hot')) |
| if cipd_stats: |
| performance_stats.package_installation = task_result.OperationStats( |
| duration=cipd_stats.get('duration')) |
| if cache_trim_stats: |
| performance_stats.cache_trim = task_result.OperationStats( |
| duration=cache_trim_stats.get('duration')) |
| if named_caches_stats: |
| install = named_caches_stats.get('install', {}) |
| uninstall = named_caches_stats.get('uninstall', {}) |
| performance_stats.named_caches_install = task_result.OperationStats( |
| duration=install.get('duration')) |
| performance_stats.named_caches_uninstall = task_result.OperationStats( |
| duration=uninstall.get('duration')) |
| if cleanup_stats: |
| performance_stats.cleanup = task_result.OperationStats( |
| duration=cleanup_stats.get('duration')) |
| |
| if output is not None: |
| try: |
| output = base64.b64decode(output) |
| except UnicodeEncodeError as e: |
| logging.error('Failed to decode output\n%s\n%r', e, output) |
| output = output.encode('ascii', 'replace') |
| except TypeError as e: |
| # Save the output as-is instead. The error will be logged in ereporter2 |
        # and returning an HTTP 500 would only force the bot to stay in a retry
| # loop. |
| logging.error('Failed to decode output\n%s\n%r', e, output) |
| if cas_output_root: |
| cas_output_root = task_request.CASReference( |
| cas_instance=cas_output_root['cas_instance'], |
| digest=task_request.Digest(**cas_output_root['digest'])) |
| |
| if cipd_pins: |
| cipd_pins = task_result.CipdPins( |
| client_package=task_request.CipdPackage( |
| **cipd_pins['client_package']), |
| packages=[ |
| task_request.CipdPackage(**args) for args in cipd_pins['packages'] |
| ]) |
| |
| # Notify the Swarming scheduler this bot is still alive. Don't do this for |
| # bots in pure RBE mode: they aren't using Swarming scheduler. Note that |
| # the bot doesn't send its pools with this RPC, so we either need to |
| # fetch them from the datastore or from configs. We use configs. |
| bot_group_cfg = bot_groups_config.get_bot_group_config(bot_id) |
| if bot_group_cfg: |
| pools = bot_group_cfg.dimensions.get('pool') |
| rbe_cfg = rbe.get_rbe_config_for_bot(bot_id, pools) |
| pure_rbe = rbe_cfg and not rbe_cfg.hybrid_mode |
| if not pure_rbe: |
| logging.debug('Keeping swarming queues alive') |
| task_queues.freshen_up_queues(bot_id) |
| |
| try: |
| state = task_scheduler.bot_update_task( |
| run_result_key=run_result_key, |
| bot_id=bot_id, |
| output=output, |
| output_chunk_start=output_chunk_start, |
| exit_code=exit_code, |
| duration=duration, |
| hard_timeout=hard_timeout, |
| io_timeout=io_timeout, |
| cost_usd=cost_usd, |
| cas_output_root=cas_output_root, |
| cipd_pins=cipd_pins, |
| performance_stats=performance_stats, |
| canceled=canceled) |
| if not state: |
| logging.info('Failed to update, please retry') |
| self.abort_with_error(500, error='Failed to update, please retry') |
| |
| if state in (task_result.State.COMPLETED, task_result.State.TIMED_OUT): |
| action = 'task_completed' |
| elif state == task_result.State.KILLED: |
| action = 'task_killed' |
| else: |
| assert state in (task_result.State.BOT_DIED, |
| task_result.State.RUNNING), state |
| action = 'task_update' |
| bot_management.bot_event( |
| event_type=action, |
| bot_id=bot_id, |
| external_ip=self.request.remote_addr, |
| authenticated_as=auth.get_peer_identity().to_bytes(), |
| task_id=task_id) |
| except ValueError as e: |
| ereporter2.log_request( |
| request=self.request, |
| source='server', |
| category='task_failure', |
| message='Failed to update task: %s' % e) |
| self.abort_with_error(400, error=str(e)) |
| except webob.exc.HTTPException: |
| raise |
| except Exception as e: |
| logging.exception('Internal error: %s', e) |
| self.abort_with_error(500, error=str(e)) |
| # - BOT_DIED will occur when the following conditions are true: |
| # - The bot polled correctly, but then stopped updating for at least |
| # task_result.BOT_PING_TOLERANCE. (It can occur if the host went to |
| # sleep, or the OS was overwhelmed). |
| # - /internal/cron/abort_bot_died runs, detects the bot is MIA, kills the |
| # task. |
| # - Bot wakes up, starts sending updates again. |
| # - KILLED is when the client uses the kill API to forcibly stop a running |
| # task. |
| must_stop = state in (task_result.State.BOT_DIED, task_result.State.KILLED) |
| if must_stop: |
| logging.info('asking bot to kill the task') |
| self.send_response({'must_stop': must_stop, 'ok': True}) |
| |
| |
| class BotTaskErrorHandler(_BotApiHandler): |
| """It is a specialized version of ereporter2's /ereporter2/api/v1/on_error |
| that also attaches a task id to it. |
| |
| This formally terminates the task, marking it as an internal failure. |
| This can be used by bot_main.py to kill the task when task_runner misbehaved. |
| """ |
| |
| ACCEPTED_KEYS = { |
| u'id', |
| u'message', |
| u'task_id', |
| u'client_error', |
| u'session', |
| } |
| REQUIRED_KEYS = {u'id', u'task_id'} |
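
  # For illustration, an error report body may look like (hypothetical values):
  #   {"id": "<bot id>", "task_id": "<run result id>",
  #    "message": "task_runner crashed"}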
| |
| @auth.public # auth happens in bot_auth.authenticate_bot |
| def post(self, task_id=None): |
| start_time = utils.utcnow() |
| request = self.parse_body() |
| # TODO(crbug.com/1015701): take from X-Luci-Swarming-Bot-ID header. |
| bot_id = request.get('id') |
| task_id = request.get('task_id', '') |
| message = request.get('message', 'unknown') |
| client_errors = request.get('client_error') |
| |
| # Make sure bot self-reported ID matches the authentication token. Raises |
| # auth.AuthorizationError if not. |
| bot_auth.authenticate_bot(bot_id) |
| |
| bot_management.bot_event( |
| event_type='task_error', |
| bot_id=bot_id, |
| external_ip=self.request.remote_addr, |
| authenticated_as=auth.get_peer_identity().to_bytes(), |
| event_msg=message, |
| task_id=task_id) |
| line = ('Bot: https://%s/restricted/bot/%s\n' |
| 'Task failed: https://%s/user/task/%s\n' |
| '%s') % (app_identity.get_default_version_hostname(), bot_id, |
| app_identity.get_default_version_hostname(), task_id, |
| message) |
| ereporter2.log_request(self.request, source='bot', message=line) |
| |
| msg = log_unexpected_subset_keys(self.ACCEPTED_KEYS, self.REQUIRED_KEYS, |
| request, self.request, 'bot', 'keys') |
| if msg: |
| self.abort_with_error(400, error=msg) |
| msg = task_scheduler.bot_terminate_task( |
| task_pack.unpack_run_result_key(task_id), bot_id, start_time, |
| client_errors) |
| if msg: |
| logging.error(msg) |
| self.abort_with_error(400, error=msg) |
| self.send_response({}) |
| |
| |
| def get_routes(): |
| routes = [] |
| |
  # Add a copy of the route under "/python/...". This is needed to allow
  # setting up traffic routing between Python and Go services. Requests that
  # must be executed by Python will be prefixed by "/python/...".
| def add(route, handler): |
| routes.extend([ |
| (route, handler), |
| ('/python' + route, handler), |
| ]) |
| |
| # Generic handlers (no auth) |
| add('/swarming/api/v1/bot/server_ping', ServerPingHandler) |
| |
| # Bot code (bootstrap and swarming_bot.zip) handlers |
| add('/bootstrap', BootstrapHandler) |
| add('/bot_code', BotCodeHandler) |
| # 40 for old sha1 digest so old bot can still update, 64 for current |
| # sha256 digest. |
| add('/swarming/api/v1/bot/bot_code/<version:[0-9a-f]{40,64}>', BotCodeHandler) |
| |
| # Bot API RPCs |
| |
| # Bot Session API RPC handlers |
| add('/swarming/api/v1/bot/handshake', BotHandshakeHandler) |
| add('/swarming/api/v1/bot/poll', BotPollHandler) |
| add('/swarming/api/v1/bot/claim', BotClaimHandler) |
| add('/swarming/api/v1/bot/event', BotEventHandler) |
| |
| # Bot Security API RPC handlers |
| add('/swarming/api/v1/bot/oauth_token', BotOAuthTokenHandler) |
| add('/swarming/api/v1/bot/id_token', BotIDTokenHandler) |
| |
| # Bot Task API RPC handlers |
| add('/swarming/api/v1/bot/task_update', BotTaskUpdateHandler) |
| add('/swarming/api/v1/bot/task_update/<task_id:[a-f0-9]+>', |
| BotTaskUpdateHandler) |
| add('/swarming/api/v1/bot/task_error', BotTaskErrorHandler) |
| add('/swarming/api/v1/bot/task_error/<task_id:[a-f0-9]+>', |
| BotTaskErrorHandler) |
| |
| return [webapp2.Route(*i) for i in routes] |