| # Copyright 2015 The LUCI Authors. All rights reserved. |
| # Use of this source code is governed under the Apache License, Version 2.0 |
| # that can be found in the LICENSE file. |
| |
| """This module defines Swarming Server endpoints handlers.""" |
| |
| import datetime |
| import json |
| import logging |
| import os |
| import re |
| |
| from google.appengine.api import app_identity |
| from google.appengine.api import datastore_errors |
| from google.appengine.api import memcache |
| from google.appengine.ext import ndb |
| |
| import endpoints |
| import gae_ts_mon |
| from protorpc import messages |
| from protorpc import message_types |
| from protorpc import protojson |
| from protorpc import remote |
| |
| from components import auth |
| from components import datastore_utils |
| from components import endpoints_webapp2 |
| from components import utils |
| |
| import message_conversion |
| import swarming_rpcs |
| from server import acl |
| from server import bot_code |
| from server import bot_management |
| from server import config |
| from server import pools_config |
| from server import realms |
| from server import service_accounts |
| from server import service_accounts_utils |
| from server import task_pack |
| from server import task_queues |
| from server import task_request |
| from server import task_result |
| from server import task_scheduler |
| |
| |
| ### Helper Methods |
| |
| |
# Used by _get_task_request_async(), clearer than using True/False and important
# as this is part of the security boundary.
# These are unique module-private sentinels: _VIEW selects the read ACL check,
# _CANCEL selects the (stricter) cancellation ACL check. Using object() makes
# it impossible for a caller to accidentally pass a valid value.
_CANCEL = object()
_VIEW = object()
| |
| |
# Add support for BooleanField in protorpc in endpoints GET requests.
# endpoints feeds GET query parameters through protojson as raw strings, so
# BooleanField values arrive as 'true'/'false' text; this monkeypatch converts
# them to real booleans before delegating everything else to the original
# decoder. Must run at import time, before any request is decoded.
_old_decode_field = protojson.ProtoJson.decode_field
def _decode_field(self, field, value):
  # Only intercept string-typed values aimed at a BooleanField; all other
  # (field, value) combinations keep the stock protojson behavior.
  if (isinstance(field, messages.BooleanField) and
      isinstance(value, basestring)):
    return value.lower() == 'true'
  return _old_decode_field(self, field, value)
protojson.ProtoJson.decode_field = _decode_field
| |
| |
def _to_keys(task_id):
  """Parses a task ID into its (request key, result key) pair.

  Raises:
    endpoints.BadRequestException: the task ID cannot be parsed.
  """
  try:
    keys = task_pack.get_request_and_result_keys(task_id)
  except ValueError:
    raise endpoints.BadRequestException('%s is an invalid key.' % task_id)
  return keys
| |
| |
@ndb.tasklet
def _get_task_request_async(task_id, request_key, viewing):
  """Returns the TaskRequest corresponding to a task ID.

  Enforces the ACL for users. Allows bots all access for the moment.

  Arguments:
    task_id: original task ID string; used only for error messages.
    request_key: ndb.Key of the TaskRequest entity to fetch.
    viewing: one of the module-level sentinels _VIEW or _CANCEL, selecting
        which realm ACL check is applied.

  Returns:
    TaskRequest instance.

  Raises:
    endpoints.NotFoundException: no TaskRequest exists for request_key.
    endpoints.InternalServerErrorException: viewing is not a known sentinel.
  """
  request = yield request_key.get_async()
  if not request:
    raise endpoints.NotFoundException('%s not found.' % task_id)
  if viewing == _VIEW:
    realms.check_task_get_acl(request)
  elif viewing == _CANCEL:
    realms.check_task_cancel_acl(request)
  else:
    # Defensive: only the two sentinels above are valid; anything else is a
    # programming error inside this module.
    raise endpoints.InternalServerErrorException('_get_task_request_async()')
  # ndb tasklets deliver their result by raising ndb.Return.
  raise ndb.Return(request)
| |
| |
def _get_request_and_result(task_id, viewing, trust_memcache):
  """Returns the TaskRequest and task result corresponding to a task ID.

  For the task result, first do an explict lookup of the caches, and then decide
  if it is necessary to fetch from the DB.

  Arguments:
    task_id: task ID as provided by the user.
    viewing: one of _CANCEL or _VIEW
    trust_memcache: bool to state if memcache should be trusted for running
        task. If False, when a task is still pending/running, do a DB fetch.

  Returns:
    tuple(TaskRequest, result): result can be either for a TaskRunResult or a
        TaskResultSummay.

  Raises:
    endpoints.BadRequestException: task_id cannot be parsed (via _to_keys).
    endpoints.NotFoundException: the request or the result does not exist.
  """
  request_key, result_key = _to_keys(task_id)
  # The task result has a very high odd of taking much more time to fetch than
  # the TaskRequest, albeit it is the TaskRequest that enforces ACL. Do the task
  # result fetch first, the worst that will happen is unnecessarily fetching the
  # task result.
  # Cache/memcache only on this first attempt; no datastore round trip yet.
  result_future = result_key.get_async(
      use_cache=True, use_memcache=True, use_datastore=False)

  # The TaskRequest has P(99.9%) chance of being fetched from memcache since it
  # is immutable.
  request_future = _get_task_request_async(task_id, request_key, viewing)

  result = result_future.get_result()
  if (not result or
      (result.state in task_result.State.STATES_RUNNING and not
        trust_memcache)):
    # Either the entity is not in cache, or we don't trust memcache for a
    # running task result. Do the DB fetch, which is slow.
    result = result_key.get(
        use_cache=False, use_memcache=False, use_datastore=True)

  # Blocks on the ACL check done inside _get_task_request_async(); any ACL
  # failure propagates from here before the result is returned.
  request = request_future.get_result()

  if not result:
    raise endpoints.NotFoundException('%s not found.' % task_id)
  return request, result
| |
| |
def get_or_raise(key):
  """Returns the entity for *key*, raising NotFoundException if absent."""
  entity = key.get()
  if entity:
    return entity
  raise endpoints.NotFoundException('%s not found.' % key.id())
| |
| |
def apply_server_property_defaults(properties):
  """Fills ndb task properties with default values read from server settings.

  Precedence, from highest to lowest:
    1. a value set explicitly by the task,
    2. a value set by the task's template,
    3. a value from the server's settings.cfg.
  """
  settings = config.settings()
  # TODO(iannucci): This was an artifact of the existing test harnesses;
  # get_pool_config raises on None, but the way it's mocked in
  # ./test_env_handlers.py allows `get_pool_config` to return None in this case.
  # This try/except will be cleaned up in a subsequent CL, once I remove these
  # default services from `config`.
  try:
    pool_cfg = pools_config.get_pool_config(properties.pool)
  except ValueError:
    pool_cfg = None

  # With neither server settings nor a pool config there is nothing to apply.
  if settings or pool_cfg:
    _apply_isolate_server_defaults(properties, settings, pool_cfg)
    _apply_cipd_defaults(properties, settings, pool_cfg)
| |
| |
| def _apply_isolate_server_defaults(properties, settings, pool_cfg): |
| # Do not set Isolate server defaults when CAS input is set. |
| if properties.cas_input_root: |
| return |
| |
| iso_server = settings.isolate.default_server |
| iso_ns = settings.isolate.default_namespace |
| if pool_cfg and pool_cfg.default_isolate: |
| iso_server = pool_cfg.default_isolate.server |
| iso_ns = pool_cfg.default_isolate.namespace |
| |
| if iso_server and iso_ns: |
| properties.inputs_ref = properties.inputs_ref or task_request.FilesRef() |
| properties.inputs_ref.isolatedserver = ( |
| properties.inputs_ref.isolatedserver or iso_server) |
| properties.inputs_ref.namespace = ( |
| properties.inputs_ref.namespace or iso_ns) |
| |
| |
| def _apply_cipd_defaults(properties, settings, pool_cfg): |
| cipd_server = settings.cipd.default_server |
| cipd_client = settings.cipd.default_client_package.package_name |
| cipd_vers = settings.cipd.default_client_package.version |
| if pool_cfg and pool_cfg.default_cipd: |
| cipd_server = pool_cfg.default_cipd.server |
| cipd_client = pool_cfg.default_cipd.package_name |
| cipd_vers = pool_cfg.default_cipd.client_version |
| |
| if cipd_server and properties.cipd_input: |
| properties.cipd_input.server = ( |
| properties.cipd_input.server or cipd_server) |
| properties.cipd_input.client_package = ( |
| properties.cipd_input.client_package or task_request.CipdPackage()) |
| # TODO(iannucci) - finish removing 'client_package' as a task-configurable |
| # setting. |
| properties.cipd_input.client_package.package_name = ( |
| properties.cipd_input.client_package.package_name or cipd_client) |
| properties.cipd_input.client_package.version = ( |
| properties.cipd_input.client_package.version or cipd_vers) |
| |
| |
| ### API |
| |
| |
# Root Cloud Endpoints API object; services below attach to it via
# @swarming_api.api_class(...).
swarming_api = auth.endpoints_api(
    name='swarming',
    version='v1',
    description=
    'API to interact with the Swarming service. Permits to create, '
    'view and cancel tasks, query tasks and bots')


# GET request schema for server.permissions; every filter is optional.
PermissionsRequest = endpoints.ResourceContainer(
    message_types.VoidMessage,
    bot_id=messages.StringField(1),
    task_id=messages.StringField(2),
    tags=messages.StringField(3, repeated=True),
)
| |
| |
@swarming_api.api_class(resource_name='server', path='server')
class SwarmingServerService(remote.Service):
  """Server-level endpoints: details, bootstrap artifacts and permissions."""

  @gae_ts_mon.instrument_endpoint()
  @auth.endpoints_method(
      message_types.VoidMessage, swarming_rpcs.ServerDetails,
      http_method='GET')
  @auth.require(acl.can_access, log_identity=True)
  def details(self, _request):
    """Returns information about the server."""
    host = 'https://' + os.environ['HTTP_HOST']

    cfg = config.settings()
    isolate, _cipd = pools_config.get_default_external_services()

    # settings.cfg provides the server-wide isolate defaults; pools.cfg, when
    # it defines a default external isolate service, takes precedence.
    default_isolate_server = cfg.isolate.default_server
    default_isolate_namespace = cfg.isolate.default_namespace
    if isolate:
      default_isolate_server = isolate.server
      default_isolate_namespace = isolate.namespace

    server_version = utils.get_app_version()
    chops_git_version = utils.get_chops_git_version()

    # chops_git_version contains the chops git version
    # eg. '5629-2cfcb6'.
    # In cases where the Appengine version is set to the
    # chops git version chops_git_version will be identical
    # to server_version.
    if not chops_git_version:
      chops_git_version = server_version

    return swarming_rpcs.ServerDetails(
        bot_version=bot_code.get_bot_version(host)[0],
        project_id=app_identity.get_application_id(),
        server_version=server_version,
        chops_git_version=chops_git_version,
        display_server_url_template=cfg.display_server_url_template,
        luci_config=config.config.config_service_hostname(),
        default_isolate_server=default_isolate_server,
        default_isolate_namespace=default_isolate_namespace,
        cas_viewer_server=cfg.cas.viewer_server)

  @gae_ts_mon.instrument_endpoint()
  @auth.endpoints_method(
      message_types.VoidMessage, swarming_rpcs.BootstrapToken)
  @auth.require(acl.can_create_bot, log_identity=True)
  def token(self, _request):
    """Returns a token to bootstrap a new bot.

    This may seem strange to be a POST and not a GET, but it's very
    important to make sure GET requests are idempotent and safe
    to be pre-fetched; generating a token is neither of those things.
    """
    return swarming_rpcs.BootstrapToken(
        bootstrap_token = bot_code.generate_bootstrap_token(),
    )

  @gae_ts_mon.instrument_endpoint()
  @auth.endpoints_method(
      PermissionsRequest, swarming_rpcs.ClientPermissions, http_method='GET')
  @auth.public
  def permissions(self, request):
    """Returns the caller's permissions.

    Public endpoint: each field in the response is the outcome of a separate
    ACL/realm check for the current identity.
    """
    # Per-pool checks: only the pools the caller may list are returned.
    pools_list_bots = [
        p for p in pools_config.known() if realms.can_list_bots(p)
    ]
    pools_list_tasks = [
        p for p in pools_config.known() if realms.can_list_tasks(p)
    ]
    return swarming_rpcs.ClientPermissions(
        delete_bot=acl.can_delete_bot(),
        terminate_bot=realms.can_terminate_bot(request.bot_id),
        get_configs=acl.can_view_config(),
        put_configs=acl.can_edit_config(),
        cancel_task=self._can_cancel_task(request.task_id),
        cancel_tasks=realms.can_cancel_tasks(
            bot_management.get_pools_from_dimensions_flat(request.tags)),
        get_bootstrap_token=acl.can_create_bot(),
        list_bots=pools_list_bots,
        list_tasks=pools_list_tasks)

  def _can_cancel_task(self, task_id):
    # Helper for permissions(): realm-aware cancel check for a single task.
    if not task_id:
      # TODO(crbug.com/1066839):
      # This is for the compatibility until Web clients send task_id.
      # return False if task_id is not given.
      return acl._is_privileged_user()
    task_key, _ = _to_keys(task_id)
    task = task_key.get()
    if not task:
      raise endpoints.NotFoundException('%s not found.' % task_id)
    return realms.can_cancel_task(task)

  @gae_ts_mon.instrument_endpoint()
  @auth.endpoints_method(
      message_types.VoidMessage, swarming_rpcs.FileContent,
      http_method='GET')
  @auth.require(acl.can_view_config, log_identity=True)
  def get_bootstrap(self, _request):
    """Retrieves the current version of bootstrap.py."""
    obj = bot_code.get_bootstrap('', '')
    return swarming_rpcs.FileContent(
        content=obj.content.decode('utf-8'),
        who=obj.who,
        when=obj.when,
        version=obj.version)

  @gae_ts_mon.instrument_endpoint()
  @auth.endpoints_method(
      message_types.VoidMessage, swarming_rpcs.FileContent,
      http_method='GET')
  @auth.require(acl.can_view_config, log_identity=True)
  def get_bot_config(self, _request):
    """Retrieves the current version of bot_config.py."""
    obj, _ = bot_code.get_bot_config()
    return swarming_rpcs.FileContent(
        content=obj.content.decode('utf-8'),
        who=obj.who,
        when=obj.when,
        version=obj.version)
| |
| |
# GET container: a single required task ID.
TaskId = endpoints.ResourceContainer(
    message_types.VoidMessage,
    task_id=messages.StringField(1, required=True))


# Like TaskId, with an opt-in flag to include performance statistics.
TaskIdWithPerf = endpoints.ResourceContainer(
    message_types.VoidMessage,
    task_id=messages.StringField(1, required=True),
    include_performance_stats=messages.BooleanField(2, default=False))


# POST container: cancellation options in the body, task ID in the path.
TaskCancel = endpoints.ResourceContainer(
    swarming_rpcs.TaskCancelRequest,
    task_id=messages.StringField(1, required=True))


# GET container for stdout paging; length == 0 means "use the default cap"
# (see SwarmingTaskService.stdout).
TaskIdWithOffset = endpoints.ResourceContainer(
    message_types.VoidMessage,
    task_id=messages.StringField(1, required=True),
    offset=messages.IntegerField(2, default=0),
    length=messages.IntegerField(3, default=0))
| |
| |
@swarming_api.api_class(resource_name='task', path='task')
class SwarmingTaskService(remote.Service):
  """Swarming's task-related API."""

  @gae_ts_mon.instrument_endpoint()
  @auth.endpoints_method(
      TaskIdWithPerf, swarming_rpcs.TaskResult,
      name='result',
      path='{task_id}/result',
      http_method='GET')
  @auth.require(acl.can_access, log_identity=True)
  def result(self, request):
    """Reports the result of the task corresponding to a task ID.

    It can be a 'run' ID specifying a specific retry or a 'summary' ID hidding
    the fact that a task may have been retried transparently, when a bot reports
    BOT_DIED.

    A summary ID ends with '0', a run ID ends with '1' or '2'.
    """
    logging.debug('%s', request)
    # Workaround a bug in ndb where if a memcache set fails, a stale copy can be
    # kept in memcache indefinitely. In the case of task result, if this happens
    # on the very last store where the task is saved to NDB to be marked as
    # completed, and that the DB store succeeds *but* the memcache update fails,
    # this API will *always* return the stale version.
    # Hence trust_memcache=False below: a pending/running result is re-read
    # from the datastore.
    try:
      _, result = _get_request_and_result(request.task_id, _VIEW, False)
    except ValueError:
      raise endpoints.BadRequestException('Invalid task ID')
    return message_conversion.task_result_to_rpc(
        result, request.include_performance_stats)

  @gae_ts_mon.instrument_endpoint()
  @auth.endpoints_method(
      TaskId, swarming_rpcs.TaskRequest,
      name='request',
      path='{task_id}/request',
      http_method='GET')
  @auth.require(acl.can_access, log_identity=True)
  def request(self, request):
    """Returns the task request corresponding to a task ID."""
    logging.debug('%s', request)
    request_key, _ = _to_keys(request.task_id)
    # _VIEW: read-only realm ACL check on the underlying TaskRequest.
    request_obj = _get_task_request_async(
        request.task_id, request_key, _VIEW).get_result()
    return message_conversion.task_request_to_rpc(request_obj)

  @gae_ts_mon.instrument_endpoint()
  @auth.endpoints_method(
      TaskCancel, swarming_rpcs.CancelResponse,
      name='cancel',
      path='{task_id}/cancel')
  @auth.require(acl.can_access, log_identity=True)
  def cancel(self, request):
    """Cancels a task.

    If a bot was running the task, the bot will forcibly cancel the task.
    """
    logging.debug('request %s', request)
    request_key, result_key = _to_keys(request.task_id)
    # _CANCEL: the cancellation ACL is checked, not just the read ACL.
    request_obj = _get_task_request_async(request.task_id, request_key,
                                          _CANCEL).get_result()
    ok, was_running = task_scheduler.cancel_task(
        request_obj, result_key, request.kill_running or False, None)
    return swarming_rpcs.CancelResponse(ok=ok, was_running=was_running)

  @gae_ts_mon.instrument_endpoint()
  @auth.endpoints_method(
      TaskIdWithOffset, swarming_rpcs.TaskOutput,
      name='stdout',
      path='{task_id}/stdout',
      http_method='GET')
  @auth.require(acl.can_access, log_identity=True)
  def stdout(self, request):
    """Returns the output of the task corresponding to a task ID."""
    logging.debug('%s', request)
    if not request.length:
      # Maximum content fetched at once, mostly for compatibility with previous
      # behavior. pRPC implementation should limit to a multiple of CHUNK_SIZE
      # (one or two?) for efficiency.
      request.length = 16*1000*1024
    # trust_memcache=True: a slightly stale output for a running task is
    # acceptable here.
    _, result = _get_request_and_result(request.task_id, _VIEW, True)
    output = result.get_output(request.offset or 0, request.length)
    if output:
      # That was an error, don't do that in pRPC:
      output = output.decode('utf-8', 'replace')
    return swarming_rpcs.TaskOutput(
        output=output,
        state=swarming_rpcs.TaskState(result.state))
| |
| |
# Query parameters shared by tasks/list and tasks/requests.
TasksRequest = endpoints.ResourceContainer(
    message_types.VoidMessage,
    limit=messages.IntegerField(1, default=200),
    cursor=messages.StringField(2),
    # These should be DateTimeField but endpoints + protorpc have trouble
    # encoding this message in a GET request, this is due to DateTimeField's
    # special encoding in protorpc-1.0/protorpc/message_types.py that is
    # bypassed when using endpoints-1.0/endpoints/protojson.py to add GET query
    # parameter support.
    end=messages.FloatField(3),
    start=messages.FloatField(4),
    state=messages.EnumField(swarming_rpcs.TaskStateQuery, 5, default='ALL'),
    tags=messages.StringField(6, repeated=True),
    sort=messages.EnumField(swarming_rpcs.TaskSort, 7, default='CREATED_TS'),
    include_performance_stats=messages.BooleanField(8, default=False))


# Batch state lookup: repeated task IDs.
TaskStatesRequest = endpoints.ResourceContainer(
    message_types.VoidMessage,
    task_id=messages.StringField(1, repeated=True))


# Subset of TasksRequest used by tasks/count; field numbers (3-6) match the
# corresponding TasksRequest fields.
TasksCountRequest = endpoints.ResourceContainer(
    message_types.VoidMessage,
    end=messages.FloatField(3),
    start=messages.FloatField(4),
    state=messages.EnumField(swarming_rpcs.TaskStateQuery, 5, default='ALL'),
    tags=messages.StringField(6, repeated=True))
| |
| |
| @swarming_api.api_class(resource_name='tasks', path='tasks') |
| class SwarmingTasksService(remote.Service): |
| """Swarming's tasks-related API.""" |
  @gae_ts_mon.instrument_endpoint()
  @auth.endpoints_method(
      swarming_rpcs.NewTaskRequest, swarming_rpcs.TaskRequestMetadata)
  @auth.require(
      acl.can_create_task, 'User cannot create tasks.', log_identity=True)
  def new(self, request):
    """Creates a new task.

    The task will be enqueued in the tasks list and will be executed at the
    earliest opportunity by a bot that has at least the dimensions as described
    in the task request.
    """
    # Temporarily mask secret bytes so they never reach the logs; restore them
    # immediately after logging.
    sb = (request.properties.secret_bytes
          if request.properties is not None else None)
    if sb is not None:
      request.properties.secret_bytes = "HIDDEN"
    logging.debug('%s', request)
    if sb is not None:
      request.properties.secret_bytes = sb

    try:
      request_obj, secret_bytes, template_apply = (
          message_conversion.new_task_request_from_rpc(
              request, utils.utcnow()))

      # Retrieve pool_cfg, and check the existence.
      pool = request_obj.pool
      pool_cfg = pools_config.get_pool_config(pool)
      if not pool_cfg:
        logging.warning('Pool "%s" is not in pools.cfg', pool)
        # TODO(crbug.com/1086058): It currently returns 403 Forbidden, but
        # should return 400 BadRequest or 422 Unprocessable Entity, instead.
        raise auth.AuthorizationError(
            'Can\'t submit tasks to pool "%s", not defined in pools.cfg' % pool)

      task_request.init_new_request(
          request_obj, acl.can_schedule_high_priority_tasks(),
          template_apply)
      # Fill in server/pool level defaults for every task slice.
      for index in range(request_obj.num_task_slices):
        apply_server_property_defaults(request_obj.task_slice(index).properties)
      # We need to call the ndb.Model pre-put check earlier because the
      # following checks assume that the request itself is valid and could crash
      # otherwise.
      request_obj._pre_put_hook()
    except (datastore_errors.BadValueError, TypeError, ValueError) as e:
      logging.warning('Incorrect new task request', exc_info=True)
      raise endpoints.BadRequestException(e.message)

    # TODO(crbug.com/1109378): Check ACLs before calling init_new_request to
    # avoid leaking information about pool templates to unauthorized callers.

    # If the task request supplied a realm it means the task is in a realm-aware
    # mode and it wants *all* realm ACLs to be enforced. Otherwise assume
    # the task runs in pool_cfg.default_task_realm and enforce only permissions
    # specified in enforced_realm_permissions pool config (using legacy ACLs
    # for the rest). This should simplify the transition to realm ACLs.
    enforce_realms_acl = False
    if request_obj.realm:
      logging.info('Using task realm %r', request_obj.realm)
      enforce_realms_acl = True
    elif pool_cfg.default_task_realm:
      logging.info('Using default_task_realm %r', pool_cfg.default_task_realm)
      request_obj.realm = pool_cfg.default_task_realm
    else:
      logging.info('Not using realms')

    # Warn if the pool has realms configured, but the task is using old ACLs.
    if pool_cfg.realm and not request_obj.realm:
      logging.warning(
          'crbug.com/1066839: %s: %r is not using realms',
          pool, request_obj.name)

    # Realm permission 'swarming.pools.createInRealm' checks if the
    # caller is allowed to create a task in the task realm.
    request_obj.realms_enabled = realms.check_tasks_create_in_realm(
        request_obj.realm, pool_cfg, enforce_realms_acl)

    # Realm permission 'swarming.pools.create' checks if the caller is allowed
    # to create a task in the pool.
    realms.check_pools_create_task(pool_cfg, enforce_realms_acl)

    # If the request has a service account email, check if the service account
    # is allowed to run.
    if service_accounts_utils.is_service_account(request_obj.service_account):
      if not service_accounts.has_token_server():
        raise endpoints.BadRequestException(
            'This Swarming server doesn\'t support task service accounts '
            'because Token Server URL is not configured')

      # Realm permission 'swarming.tasks.actAs' checks if the service account is
      # allowed to run in the task realm.
      realms.check_tasks_act_as(request_obj, pool_cfg, enforce_realms_acl)

      # If using legacy ACLs for service accounts, use the legacy mechanism to
      # mint oauth token as well. Note that this path will be deprecated after
      # migration to MintServiceAccountToken rpc which accepts realm.
      # It contacts the token server to generate "OAuth token grant" (or grab a
      # cached one). By doing this we check that the given service account usage
      # is allowed by the token server rules at the time the task is posted.
      # This check is also performed later (when running the task), when we get
      # the actual OAuth access token.
      if not request_obj.realms_enabled:
        max_lifetime_secs = request_obj.max_lifetime_secs
        try:
          duration = datetime.timedelta(seconds=max_lifetime_secs)
          request_obj.service_account_token = (
              service_accounts.get_oauth_token_grant(
                  service_account=request_obj.service_account,
                  validity_duration=duration))
        except service_accounts.PermissionError as exc:
          raise auth.AuthorizationError(exc.message)
        except service_accounts.MisconfigurationError as exc:
          raise endpoints.BadRequestException(exc.message)
        except service_accounts.InternalError as exc:
          raise endpoints.InternalServerErrorException(exc.message)

    # If the user only wanted to evaluate scheduling the task, but not actually
    # schedule it, return early without a task_id.
    if request.evaluate_only:
      request_obj._pre_put_hook()
      return swarming_rpcs.TaskRequestMetadata(
          request=message_conversion.task_request_to_rpc(request_obj))

    # request_uuid, when present, must look like a UUID (8-4-4-4-12 hex).
    if request.request_uuid and not re.match(
        r'^[\da-fA-F]{8}-[\da-fA-F]{4}-[\da-fA-F]{4}-[\da-fA-F]{4}-'
        r'[\da-fA-F]{12}$', request.request_uuid):
      raise endpoints.BadRequestException(
          'invalid uuid is given as request_uuid')

    # TODO(crbug.com/997221): move this to task_scheduler.py after
    # crbug.com/1018982.
    # The idempotency key is scoped to the caller's identity so one client
    # cannot replay another client's uuid.
    request_idempotency_key = None
    if request.request_uuid:
      request_idempotency_key = 'request_id/%s/%s' % (
          request.request_uuid, auth.get_current_identity().to_bytes())

    if request_idempotency_key:
      # This check is for idempotency when creating new tasks.
      # TODO(crbug.com/997221): Make idempotency robust.
      # There is still possibility of duplicate task creation if requests with
      # the same uuid are sent in a short period of time.
      request_metadata = memcache.get(
          request_idempotency_key, namespace='task_new')
      if request_metadata is not None:
        # request_obj does not have task_id, so need to delete before
        # validation.
        task_id_orig = request_metadata.request.task_id
        request_metadata.request.task_id = None
        if request_metadata.request != message_conversion.task_request_to_rpc(
            request_obj):
          logging.warning(
              'the same request_uuid value was reused for different task '
              'requests')

        request_metadata.request.task_id = task_id_orig
        logging.info('Reusing task %s with uuid %s', task_id_orig,
                     request.request_uuid)
        return request_metadata

    try:
      enable_resultdb = request.resultdb and request.resultdb.enable
      if enable_resultdb and not request_obj.realms_enabled:
        raise endpoints.BadRequestException(
            'ResultDB is enabled, but realm is not set')

      result_summary = task_scheduler.schedule_request(request_obj,
                                                       secret_bytes,
                                                       enable_resultdb)
    except (datastore_errors.BadValueError, TypeError, ValueError) as e:
      logging.exception("got exception around task_scheduler.schedule_request")
      raise endpoints.BadRequestException(e.message)

    returned_result = message_conversion.task_result_to_rpc(
        result_summary, False)

    request_metadata = swarming_rpcs.TaskRequestMetadata(
        request=message_conversion.task_request_to_rpc(request_obj),
        task_id=task_pack.pack_result_summary_key(result_summary.key),
        task_result=returned_result)

    # TODO(crbug.com/997221): move this to task_scheduler.py after
    # crbug.com/1018982.
    # Cache the full response for an hour so retries with the same uuid return
    # the already-created task.
    if request_idempotency_key:
      memcache.add(
          request_idempotency_key,
          request_metadata,
          time=60 * 60,
          namespace='task_new')

    return request_metadata
| |
| @gae_ts_mon.instrument_endpoint() |
| @auth.endpoints_method( |
| TasksRequest, swarming_rpcs.TaskList, |
| http_method='GET') |
| @auth.require(acl.can_access, log_identity=True) |
| def list(self, request): |
| """Returns full task results based on the filters. |
| |
| This endpoint is significantly slower than 'count'. Use 'count' when |
| possible. If you just want the state of tasks, use 'get_states'. |
| """ |
| # TODO(maruel): Rename 'list' to 'results'. |
| # TODO(maruel): Rename 'TaskList' to 'TaskResults'. |
| logging.debug('%s', request) |
| |
| # Check permission. |
| # If the caller has global permission, it can access all tasks. |
| # Otherwise, it requires pool dimension to check ACL. |
| pools = bot_management.get_pools_from_dimensions_flat(request.tags) |
| realms.check_tasks_list_acl(pools) |
| |
| now = utils.utcnow() |
| try: |
| items, cursor = datastore_utils.fetch_page( |
| self._query_from_request(request), request.limit, request.cursor) |
| except ValueError as e: |
| raise endpoints.BadRequestException( |
| 'Inappropriate filter for tasks/list: %s' % e) |
| except datastore_errors.NeedIndexError as e: |
| logging.error('%s', e) |
| raise endpoints.BadRequestException( |
| 'Requires new index, ask admin to create one.') |
| except datastore_errors.BadArgumentError as e: |
| logging.error('%s', e) |
| raise endpoints.BadRequestException( |
| 'This combination is unsupported, sorry.') |
| return swarming_rpcs.TaskList( |
| cursor=cursor, |
| items=[ |
| message_conversion.task_result_to_rpc( |
| i, request.include_performance_stats) |
| for i in items |
| ], |
| now=now) |
| |
  @gae_ts_mon.instrument_endpoint()
  @auth.endpoints_method(
      TaskStatesRequest, swarming_rpcs.TaskStates,
      http_method='GET')
  # TODO(martiniss): users should be able to view their state. This requires
  # looking up each TaskRequest.
  @auth.require(acl.can_view_all_tasks, log_identity=True)
  def get_states(self, request):
    """Returns task state for a specific set of tasks.

    States are returned in the same order as the requested task IDs.
    """
    logging.debug('%s', request)
    result_keys = [_to_keys(task_id)[1] for task_id in request.task_id]

    # Hot path. Fetch everything we can from memcache.
    entities = ndb.get_multi(
        result_keys, use_cache=True, use_memcache=True, use_datastore=False)
    # Cache misses are provisionally marked PENDING; presumably PENDING is in
    # STATES_RUNNING so they get re-fetched from the datastore below — verify
    # against task_result.State.
    states = [t.state if t else task_result.State.PENDING for t in entities]
    # Now fetch both the ones in non-stable state or not in memcache.
    missing_keys = [
        result_keys[i] for i, state in enumerate(states)
        if state in task_result.State.STATES_RUNNING
    ]
    if missing_keys:
      more = ndb.get_multi(
          missing_keys, use_cache=False, use_memcache=False, use_datastore=True)
      # This relies on missing_keys being in the same order as states (for
      # common elements). Each pop(0) realigns a fetched entity with its slot
      # in states.
      for i, s in enumerate(states):
        if s in task_result.State.STATES_RUNNING:
          states[i] = more.pop(0).state

    return swarming_rpcs.TaskStates(
        states=[swarming_rpcs.TaskState(state) for state in states])
| |
| @gae_ts_mon.instrument_endpoint() |
| @auth.endpoints_method( |
| TasksRequest, swarming_rpcs.TaskRequests, |
| http_method='GET') |
| @auth.require(acl.can_view_all_tasks, log_identity=True) |
| def requests(self, request): |
| """Returns tasks requests based on the filters. |
| |
| This endpoint is slightly slower than 'list'. Use 'list' or 'count' when |
| possible. |
| """ |
| logging.debug('%s', request) |
| if request.include_performance_stats: |
| raise endpoints.BadRequestException( |
| 'Can\'t set include_performance_stats for tasks/list') |
| now = utils.utcnow() |
| try: |
| # Get the TaskResultSummary keys, then fetch the corresponding |
| # TaskRequest entities. |
| keys, cursor = datastore_utils.fetch_page( |
| self._query_from_request(request), |
| request.limit, request.cursor, keys_only=True) |
| items = ndb.get_multi( |
| task_pack.result_summary_key_to_request_key(k) for k in keys) |
| except ValueError as e: |
| raise endpoints.BadRequestException( |
| 'Inappropriate filter for tasks/requests: %s' % e) |
| except datastore_errors.NeedIndexError as e: |
| logging.error('%s', e) |
| raise endpoints.BadRequestException( |
| 'Requires new index, ask admin to create one.') |
| except datastore_errors.BadArgumentError as e: |
| logging.error('%s', e) |
| raise endpoints.BadRequestException( |
| 'This combination is unsupported, sorry.') |
| return swarming_rpcs.TaskRequests( |
| cursor=cursor, |
| items=[message_conversion.task_request_to_rpc(i) for i in items], |
| now=now) |
| |
  @gae_ts_mon.instrument_endpoint()
  @auth.endpoints_method(
      swarming_rpcs.TasksCancelRequest, swarming_rpcs.TasksCancelResponse,
      http_method='POST')
  @auth.require(acl.can_access, log_identity=True)
  def cancel(self, request):
    """Cancel a subset of pending tasks based on the tags.

    Cancellation happens asynchronously, so when this call returns,
    cancellations will not have completed yet.
    """
    logging.debug('request %s', request)
    if not request.tags:
      # Prevent accidental cancellation of everything.
      raise endpoints.BadRequestException(
          'You must specify tags when cancelling multiple tasks.')

    # Check permission.
    # If the caller has global permission, it can access all tasks.
    # Otherwise, it requires a pool tag to check ACL.
    pools = bot_management.get_pools_from_dimensions_flat(request.tags)
    realms.check_tasks_cancel_acl(pools)

    now = utils.utcnow()
    # Match PENDING tasks; also RUNNING ones when kill_running is set.
    cond = task_result.TaskResultSummary.state == task_result.State.PENDING
    if request.kill_running:
      cond = ndb.OR(
          cond,
          task_result.TaskResultSummary.state == task_result.State.RUNNING)
    q = task_result.TaskResultSummary.query(cond).order(
        task_result.TaskResultSummary.key)
    # Every requested tag must match (filters are ANDed together).
    for tag in request.tags:
      q = q.filter(task_result.TaskResultSummary.tags == tag)

    tasks, cursor = datastore_utils.fetch_page(q, request.limit, request.cursor)

    if tasks:
      payload = json.dumps(
          {
            'tasks': [t.task_id for t in tasks],
            'kill_running': request.kill_running or False,
          })
      # The actual cancellation is performed asynchronously by a task queue
      # handler.
      ok = utils.enqueue_task(
          '/internal/taskqueue/important/tasks/cancel', 'cancel-tasks',
          payload=payload)
      if not ok:
        raise endpoints.InternalServerErrorException(
            'Could not enqueue cancel request, try again later')
    else:
      logging.info('No tasks to cancel.')

    return swarming_rpcs.TasksCancelResponse(
        cursor=cursor,
        matched=len(tasks),
        now=now)
| |
| @gae_ts_mon.instrument_endpoint() |
| @auth.endpoints_method( |
| TasksCountRequest, swarming_rpcs.TasksCount, |
| http_method='GET') |
| @auth.require(acl.can_access, log_identity=True) |
| def count(self, request): |
| """Counts number of tasks in a given state.""" |
| logging.debug('%s', request) |
| |
| # Check permission. |
| # If the caller has global permission, it can access all tasks. |
| # Otherwise, it requires pool dimension to check ACL. |
| pools = bot_management.get_pools_from_dimensions_flat(request.tags) |
| realms.check_tasks_list_acl(pools) |
| |
| if not request.start: |
| raise endpoints.BadRequestException('start (as epoch) is required') |
| now = utils.utcnow() |
| mem_key = self._memcache_key(request, now) |
| count = memcache.get(mem_key, namespace='tasks_count') |
| if count is not None: |
| return swarming_rpcs.TasksCount(count=count, now=now) |
| |
| try: |
| count = self._query_from_request(request, 'created_ts').count() |
| memcache.add(mem_key, count, 24*60*60, namespace='tasks_count') |
| except ValueError as e: |
| raise endpoints.BadRequestException( |
| 'Inappropriate filter for tasks/count: %s' % e) |
| return swarming_rpcs.TasksCount(count=count, now=now) |
| |
| def _memcache_key(self, request, now): |
| # Floor now to minute to account for empty "end" |
| end = request.end or now.replace(second=0, microsecond=0) |
| request.tags.sort() |
| return '%s|%s|%s|%s' % (request.tags, request.state, request.start, end) |
| |
| def _query_from_request(self, request, sort=None): |
| """Returns a TaskResultSummary query.""" |
| start = message_conversion.epoch_to_datetime(request.start) |
| end = message_conversion.epoch_to_datetime(request.end) |
| return task_result.get_result_summaries_query( |
| start, end, |
| sort or request.sort.name.lower(), |
| request.state.name.lower(), |
| request.tags) |
| |
| @gae_ts_mon.instrument_endpoint() |
| @auth.endpoints_method( |
| message_types.VoidMessage, swarming_rpcs.TasksTags, |
| http_method='GET') |
| @auth.require(acl.can_view_all_tasks, log_identity=True) |
| def tags(self, _request): |
| """Returns the cached set of tags currently seen in the fleet.""" |
| tags = task_result.TagAggregation.KEY.get() |
| ft = [ |
| swarming_rpcs.StringListPair(key=t.tag, value=t.values) |
| for t in tags.tags |
| ] |
| return swarming_rpcs.TasksTags(tasks_tags=ft, ts=tags.ts) |
| |
| |
# Request schema for queues.list: plain paging parameters.
TaskQueuesRequest = endpoints.ResourceContainer(
    message_types.VoidMessage,
    # Note that it's possible that the RPC returns a tad more or less items than
    # requested limit.
    limit=messages.IntegerField(1, default=200),
    cursor=messages.StringField(2))
| |
| |
@swarming_api.api_class(resource_name='queues', path='queues')
class SwarmingQueuesService(remote.Service):
  """Task queues API. Exposes the known sets of task dimensions."""

  @gae_ts_mon.instrument_endpoint()
  @auth.endpoints_method(
      TaskQueuesRequest, swarming_rpcs.TaskQueueList,
      http_method='GET')
  @auth.require(acl.can_view_all_tasks, log_identity=True)
  def list(self, request):
    """Returns a paged list of known task queues (dimension sets).

    Dimension sets made of a single 'id:' dimension are skipped, as they
    correspond to per-bot termination tasks.
    """
    logging.debug('%s', request)
    now = utils.utcnow()
    # Disable the in-process local cache. This is important, as there can be up
    # to a thousand entities loaded in memory, and this is a pure memory leak,
    # as there's no chance this specific instance will need these again,
    # therefore this leads to 'Exceeded soft memory limit' AppEngine errors.
    q = task_queues.TaskDimensions.query(
        default_options=ndb.QueryOptions(use_cache=False))
    cursor = request.cursor
    out = []
    count = 0
    # As there can be a lot of terminate tasks, try to loop a few times (max 5)
    # to get more items.
    while len(out) < request.limit and (cursor or not count) and count < 5:
      items, cursor = datastore_utils.fetch_page(
          q, request.limit - len(out), cursor)
      for i in items:
        for s in i.sets:
          # Ignore the tasks that are only id specific, since they are
          # termination tasks. There may be one per bot, and it is not really
          # useful for the user, the user may just query the list of bots.
          if (len(s.dimensions_flat) == 1 and
              s.dimensions_flat[0].startswith('id:')):
            # A terminate task.
            continue
          out.append(swarming_rpcs.TaskQueue(
              dimensions=s.dimensions_flat, valid_until_ts=s.valid_until_ts))
      count += 1  # counts pages fetched, not items returned.
    return swarming_rpcs.TaskQueueList(cursor=cursor, items=out, now=now)
| |
| |
# Path parameter identifying a single bot.
BotId = endpoints.ResourceContainer(
    message_types.VoidMessage,
    bot_id=messages.StringField(1, required=True))


# Request schema for bot.events: paged list of a bot's events, optionally
# restricted to a time range.
BotEventsRequest = endpoints.ResourceContainer(
    message_types.VoidMessage,
    bot_id=messages.StringField(1, required=True),
    limit=messages.IntegerField(2, default=200),
    cursor=messages.StringField(3),
    # end, start are seconds since epoch.
    end=messages.FloatField(4),
    start=messages.FloatField(5))


# Request schema for bot.tasks: paged list of task tries run on a bot,
# filterable by time range and state.
BotTasksRequest = endpoints.ResourceContainer(
    message_types.VoidMessage,
    bot_id=messages.StringField(1, required=True),
    limit=messages.IntegerField(2, default=200),
    cursor=messages.StringField(3),
    # end, start are seconds since epoch.
    end=messages.FloatField(4),
    start=messages.FloatField(5),
    state=messages.EnumField(swarming_rpcs.TaskStateQuery, 6, default='ALL'),
    sort=messages.EnumField(swarming_rpcs.TaskSort, 7, default='CREATED_TS'),
    include_performance_stats=messages.BooleanField(8, default=False))
| |
| |
@swarming_api.api_class(resource_name='bot', path='bot')
class SwarmingBotService(remote.Service):
  """Bot-related API. Permits querying information about the bot's properties"""
  @gae_ts_mon.instrument_endpoint()
  @auth.endpoints_method(
      BotId, swarming_rpcs.BotInfo,
      name='get',
      path='{bot_id}/get',
      http_method='GET')
  @auth.require(acl.can_access, log_identity=True)
  def get(self, request):
    """Returns information about a known bot.

    This includes its state and dimensions, and if it is currently running a
    task.

    The returned message has `deleted` set when the BotInfo entity is gone but
    historical BotEvent entities still exist for the bot.
    """
    logging.debug('%s', request)
    bot_id = request.bot_id

    # Check permission.
    # The caller needs to have global permission, or any permissions of the
    # pools that the bot belongs to.
    realms.check_bot_get_acl(bot_id)

    bot = bot_management.get_info_key(bot_id).get()
    deleted = False
    if not bot:
      # If there is not BotInfo, look if there are BotEvent child of this
      # entity. If this is the case, it means the bot was deleted but it's
      # useful to show information about it to the user even if the bot was
      # deleted.
      events = bot_management.get_events_query(bot_id, True).fetch(1)
      if not events:
        raise endpoints.NotFoundException('%s not found.' % bot_id)
      # Reconstruct a transient BotInfo from the most recent event; it is
      # never stored back to the datastore.
      bot = bot_management.BotInfo(
          key=bot_management.get_info_key(bot_id),
          dimensions_flat=task_queues.bot_dimensions_to_flat(
              events[0].dimensions),
          state=events[0].state,
          external_ip=events[0].external_ip,
          authenticated_as=events[0].authenticated_as,
          version=events[0].version,
          quarantined=events[0].quarantined,
          maintenance_msg=events[0].maintenance_msg,
          task_id=events[0].task_id,
          last_seen_ts=events[0].ts)
      # message_conversion.bot_info_to_rpc calls `is_dead`, `is_alive`. And
      # Those properties require components to be calculated. The calculation
      # is done in _pre_put_hook usually. But the BotInfo shouldn't be stored
      # in this case, as it's already deleted.
      bot.composite = bot._calc_composite()
      deleted = True

    return message_conversion.bot_info_to_rpc(bot, deleted=deleted)

  @gae_ts_mon.instrument_endpoint()
  @auth.endpoints_method(
      BotId, swarming_rpcs.DeletedResponse,
      name='delete',
      path='{bot_id}/delete')
  @auth.require(acl.can_delete_bot, log_identity=True)
  def delete(self, request):
    """Deletes the bot corresponding to a provided bot_id.

    At that point, the bot will not appear in the list of bots, but it is
    still possible to get information about the bot if its bot id is known, as
    historical data is not deleted.

    It is meant to remove from the DB the presence of a bot that was retired,
    e.g. the VM was shut down already. Use 'terminate' instead if the bot is
    still alive.
    """
    logging.debug('%s', request)
    bot_info_key = bot_management.get_info_key(request.bot_id)
    get_or_raise(bot_info_key)  # raises 404 if there is no such bot
    # BotRoot is parent to BotInfo. It is important to note that the bot is
    # not there anymore, so it is not a member of any task queue.
    task_queues.cleanup_after_bot(bot_info_key.parent())
    bot_info_key.delete()
    return swarming_rpcs.DeletedResponse(deleted=True)

  @gae_ts_mon.instrument_endpoint()
  @auth.endpoints_method(
      BotEventsRequest, swarming_rpcs.BotEvents,
      name='events',
      path='{bot_id}/events',
      http_method='GET')
  @auth.require(acl.can_access, log_identity=True)
  def events(self, request):
    """Returns events that happened on a bot."""
    logging.debug('%s', request)
    bot_id = request.bot_id

    # Check permission.
    # The caller needs to have global permission, or any permissions of the
    # pools that the bot belongs to.
    realms.check_bot_get_acl(bot_id)

    try:
      now = utils.utcnow()
      start = message_conversion.epoch_to_datetime(request.start)
      end = message_conversion.epoch_to_datetime(request.end)
      # Use the query's built-in ordering only when there is no time filter;
      # otherwise an explicit newest-first ordering is applied below.
      order = not (start or end)
      q = bot_management.get_events_query(bot_id, order)
      if not order:
        q = q.order(-bot_management.BotEvent.ts, bot_management.BotEvent.key)
      if start:
        q = q.filter(bot_management.BotEvent.ts >= start)
      if end:
        q = q.filter(bot_management.BotEvent.ts < end)
      items, cursor = datastore_utils.fetch_page(
          q, request.limit, request.cursor)
    except ValueError as e:
      raise endpoints.BadRequestException(
          'Inappropriate filter for bot.events: %s' % e)
    return swarming_rpcs.BotEvents(
        cursor=cursor,
        items=[message_conversion.bot_event_to_rpc(r) for r in items],
        now=now)

  @gae_ts_mon.instrument_endpoint()
  @auth.endpoints_method(
      BotId, swarming_rpcs.TerminateResponse,
      name='terminate',
      path='{bot_id}/terminate')
  @auth.require(acl.can_access, log_identity=True)
  def terminate(self, request):
    """Asks a bot to terminate itself gracefully.

    The bot will stay in the DB, use 'delete' to remove it from the DB
    afterward. This request returns a pseudo-taskid that can be waited for to
    wait for the bot to turn down.

    This command is particularly useful when a privileged user needs to safely
    debug a machine specific issue. The user can trigger a terminate for one of
    the bot exhibiting the issue, wait for the pseudo-task to run then access
    the machine with the guarantee that the bot is not running anymore.
    """
    # TODO(maruel): Disallow a terminate task when there's one currently
    # pending or if the bot is considered 'dead', e.g. no contact since 10
    # minutes.
    logging.debug('%s', request)
    bot_id = unicode(request.bot_id)

    # Check permission.
    # The caller needs to have global permission, or a permission in any pools
    # that the bot belongs to.
    realms.check_bot_terminate_acl(bot_id)

    bot_key = bot_management.get_info_key(bot_id)
    get_or_raise(bot_key)  # raises 404 if there is no such bot
    try:
      # Craft a special priority 0 task to tell the bot to shutdown.
      # NOTE: this rebinds `request` from the RPC message to a TaskRequest.
      request = task_request.create_termination_task(
          bot_id, wait_for_capacity=True)
    except (datastore_errors.BadValueError, TypeError, ValueError) as e:
      raise endpoints.BadRequestException(e.message)

    result_summary = task_scheduler.schedule_request(
        request, secret_bytes=None, enable_resultdb=False)
    return swarming_rpcs.TerminateResponse(
        task_id=task_pack.pack_result_summary_key(result_summary.key))

  @gae_ts_mon.instrument_endpoint()
  @auth.endpoints_method(
      BotTasksRequest, swarming_rpcs.BotTasks,
      name='tasks',
      path='{bot_id}/tasks',
      http_method='GET')
  @auth.require(acl.can_access, log_identity=True)
  def tasks(self, request):
    """Lists a given bot's tasks within the specified date range.

    In this case, the tasks are effectively TaskRunResult since it's individual
    task tries sent to this specific bot.

    It is impossible to search by both tags and bot id. If there's a need,
    TaskRunResult.tags will be added (via a copy from TaskRequest.tags).
    """
    logging.debug('%s', request)

    # Check permission.
    # The caller needs to have global permission, or any permissions of the
    # pools that the bot belongs to.
    realms.check_bot_tasks_acl(request.bot_id)

    try:
      start = message_conversion.epoch_to_datetime(request.start)
      end = message_conversion.epoch_to_datetime(request.end)
      now = utils.utcnow()
      q = task_result.get_run_results_query(
          start, end,
          request.sort.name.lower(),
          request.state.name.lower(),
          request.bot_id)
      items, cursor = datastore_utils.fetch_page(
          q, request.limit, request.cursor)
    except ValueError as e:
      raise endpoints.BadRequestException(
          'Inappropriate filter for bot.tasks: %s' % e)
    return swarming_rpcs.BotTasks(
        cursor=cursor,
        items=[
            message_conversion.task_result_to_rpc(
                r, request.include_performance_stats)
            for r in items
        ],
        now=now)
| |
| |
# Request schema for bots.list: paged list of bots with dimension and
# availability filters.
BotsRequest = endpoints.ResourceContainer(
    message_types.VoidMessage,
    limit=messages.IntegerField(1, default=200),
    cursor=messages.StringField(2),
    # Must be a list of 'key:value' strings to filter the returned list of bots
    # on.
    dimensions=messages.StringField(3, repeated=True),
    quarantined=messages.EnumField(
        swarming_rpcs.ThreeStateBool, 4, default='NONE'),
    in_maintenance=messages.EnumField(
        swarming_rpcs.ThreeStateBool, 8, default='NONE'),
    is_dead=messages.EnumField(swarming_rpcs.ThreeStateBool, 5, default='NONE'),
    is_busy=messages.EnumField(swarming_rpcs.ThreeStateBool, 6, default='NONE'))


# Request schema for bots.count: same 'key:value' dimension filter as
# bots.list, no paging.
BotsCountRequest = endpoints.ResourceContainer(
    message_types.VoidMessage,
    dimensions=messages.StringField(1, repeated=True))


# Request schema for bots.dimensions: optional pool to scope the aggregation.
BotsDimensionsRequest = endpoints.ResourceContainer(
    message_types.VoidMessage, pool=messages.StringField(1))
| |
| |
@swarming_api.api_class(resource_name='bots', path='bots')
class SwarmingBotsService(remote.Service):
  """Bots-related API."""

  @gae_ts_mon.instrument_endpoint()
  @auth.endpoints_method(
      BotsRequest, swarming_rpcs.BotList,
      http_method='GET')
  @auth.require(acl.can_access, log_identity=True)
  def list(self, request):
    """Provides list of known bots.

    Deleted bots will not be listed.
    """
    logging.debug('%s', request)

    # Check permission.
    # If the caller has global permission, it can access all bots.
    # Otherwise, it requires pool dimension to check ACL.
    pools = bot_management.get_pools_from_dimensions_flat(request.dimensions)
    realms.check_bots_list_acl(pools)

    now = utils.utcnow()
    # Disable the in-process local cache. This is important, as there can be up
    # to a thousand entities loaded in memory, and this is a pure memory leak,
    # as there's no chance this specific instance will need these again,
    # therefore this leads to 'Exceeded soft memory limit' AppEngine errors.
    q = bot_management.BotInfo.query(
        default_options=ndb.QueryOptions(use_cache=False))
    try:
      q = bot_management.filter_dimensions(q, request.dimensions)
      q = bot_management.filter_availability(
          q, swarming_rpcs.to_bool(request.quarantined),
          swarming_rpcs.to_bool(request.in_maintenance),
          swarming_rpcs.to_bool(request.is_dead),
          swarming_rpcs.to_bool(request.is_busy))
    except ValueError as e:
      raise endpoints.BadRequestException(str(e))

    bots, cursor = datastore_utils.fetch_page(q, request.limit, request.cursor)
    return swarming_rpcs.BotList(
        cursor=cursor,
        death_timeout=config.settings().bot_death_timeout_secs,
        items=[message_conversion.bot_info_to_rpc(bot) for bot in bots],
        now=now)

  @gae_ts_mon.instrument_endpoint()
  @auth.endpoints_method(
      BotsCountRequest, swarming_rpcs.BotsCount,
      http_method='GET')
  @auth.require(acl.can_access, log_identity=True)
  def count(self, request):
    """Counts number of bots with given dimensions."""
    logging.debug('%s', request)

    # Check permission.
    # If the caller has global permission, it can access all bots.
    # Otherwise, it requires pool dimension to check ACL.
    pools = bot_management.get_pools_from_dimensions_flat(request.dimensions)
    realms.check_bots_list_acl(pools)

    now = utils.utcnow()
    q = bot_management.BotInfo.query()
    try:
      q = bot_management.filter_dimensions(q, request.dimensions)
    except ValueError as e:
      raise endpoints.BadRequestException(str(e))

    # Issue the five counts concurrently, then block on the results.
    f_count = q.count_async()
    f_dead = bot_management.filter_availability(
        q, None, None, True, None).count_async()
    f_quarantined = bot_management.filter_availability(
        q, True, None, None, None).count_async()
    f_maintenance = bot_management.filter_availability(
        q, None, True, None, None).count_async()
    f_busy = bot_management.filter_availability(
        q, None, None, None, True).count_async()
    return swarming_rpcs.BotsCount(
        count=f_count.get_result(),
        quarantined=f_quarantined.get_result(),
        maintenance=f_maintenance.get_result(),
        dead=f_dead.get_result(),
        busy=f_busy.get_result(),
        now=now)

  @gae_ts_mon.instrument_endpoint()
  @auth.endpoints_method(
      BotsDimensionsRequest, swarming_rpcs.BotsDimensions, http_method='GET')
  @auth.require(acl.can_access, log_identity=True)
  def dimensions(self, request):
    """Returns the cached set of dimensions currently in use in the fleet."""
    # The caller should be allowed to list bots in the specified pool or all
    # pools.
    realms.check_bots_list_acl([request.pool] if request.pool else None)

    # TODO(jwata): change 'current' to 'all' once the entity is ready.
    agg = bot_management.get_aggregation_key(request.pool or 'current').get()
    if not agg:
      # Typo fix: error message previously said "does not exit".
      raise endpoints.NotFoundException(
          'Dimension aggregation for pool %s does not exist' % request.pool)
    return swarming_rpcs.BotsDimensions(
        bots_dimensions=[
            swarming_rpcs.StringListPair(key=d.dimension, value=d.values)
            for d in agg.dimensions
        ],
        ts=agg.ts)
| |
| |
def get_routes():
  """Returns the webapp2 routes for all the Cloud Endpoints services."""
  services = [
      SwarmingServerService,
      SwarmingTaskService,
      SwarmingTasksService,
      SwarmingQueuesService,
      SwarmingBotService,
      SwarmingBotsService,
      # components.config endpoints for validation and configuring of
      # luci-config service URL.
      config.ConfigApi,
  ]
  return endpoints_webapp2.api_routes(services, base_path='/api')