blob: 63b9f96b89cfeae3bcac70d9290c3183cddce83d [file] [log] [blame]
# Copyright 2015 The Swarming Authors. All rights reserved.
# Use of this source code is governed by the Apache v2.0 license that can be
# found in the LICENSE file.
"""This module defines Swarming Server endpoints handlers."""
import datetime
import json
from google.appengine.api import datastore_errors
from google.appengine.datastore import datastore_query
import endpoints
from protorpc import message_types
from protorpc import remote
from components import auth
from components import utils
import message_conversion
import swarming_rpcs
from server import acl
from server import bot_management
from server import config
from server import task_pack
from server import task_request
from server import task_result
from server import task_scheduler
### Helper Methods
def get_result_key(task_id):
"""Provides the key corresponding to a task ID."""
key = None
summary_key = None
try:
key = task_pack.unpack_result_summary_key(task_id)
summary_key = key
except ValueError:
try:
key = task_pack.unpack_run_result_key(task_id)
summary_key = task_pack.run_result_key_to_result_summary_key(key)
except ValueError:
raise endpoints.BadRequestException(
'Task ID %s produces an invalid key.' % task_id)
return key, summary_key
def get_or_raise(key):
"""Returns an entity or raises an endpoints exception if it does not exist."""
result = key.get()
if not result:
raise endpoints.NotFoundException('Key %s not found.' % str(key))
return result
def get_result_entity(task_id):
"""Returns the entity (TaskResultSummary or TaskRunResult) for a given ID."""
key, _ = get_result_key(task_id)
return get_or_raise(key)
def _transform_request(request_dict):
"""Makes a task request compatible with the "old" API."""
exp_secs = request_dict.pop('expiration_secs', None)
if exp_secs is not None:
request_dict['scheduling_expiration_secs'] = exp_secs
request_dict.setdefault('properties', {})
command = request_dict['properties'].pop('command', None)
if command is not None:
request_dict['properties']['commands'] = [command]
data = request_dict['properties'].pop('data', None)
if data is not None:
request_dict['properties']['data'] = [
[pair['key'], pair['value']] for pair in data]
for key in ['dimensions', 'env']:
old_list = request_dict['properties'].get(key, [])
request_dict['properties'][key] = {
pair['key']: pair['value'] for pair in old_list}
request_dict['properties'].setdefault('data', [])
def _get_range(request):
"""Get (start, end) from request types that specify date ranges."""
end = request.end or utils.utcnow()
return (request.start or end - datetime.timedelta(days=1), end)
### API
swarming_api = auth.endpoints_api(name='swarming', version='v1')
@swarming_api.api_class(resource_name='tasks', path='tasks')
class SwarmingTaskService(remote.Service):
"""Swarming's task-related API."""
### API Methods
@auth.endpoints_method(message_types.VoidMessage, swarming_rpcs.ServerDetails)
@auth.require(acl.is_bot_or_user)
def server_details(self, _request):
"""Returns information about the server."""
return swarming_rpcs.ServerDetails(server_version=utils.get_app_version())
@auth.endpoints_method(swarming_rpcs.TaskId, swarming_rpcs.TaskResultSummary)
@auth.require(acl.is_bot_or_user)
def result(self, request):
"""Reports the result of the task corresponding to a task ID."""
entity = get_result_entity(request.task_id)
return message_conversion.task_result_summary_from_dict(
utils.to_json_encodable(entity))
@auth.endpoints_method(swarming_rpcs.TaskId, swarming_rpcs.TaskRequest)
@auth.require(acl.is_bot_or_user)
def request(self, request):
"""Returns the task result corresponding to a task ID."""
_, summary_key = get_result_key(request.task_id)
request_key = task_pack.result_summary_key_to_request_key(summary_key)
entity = get_or_raise(request_key)
return message_conversion.task_request_from_dict(entity.to_dict())
@auth.endpoints_method(swarming_rpcs.TaskId, swarming_rpcs.CancelResponse)
@auth.require(acl.is_admin)
def cancel(self, request):
"""Cancels a task and indicate success."""
summary_key = task_pack.unpack_result_summary_key(request.task_id)
ok, was_running = task_scheduler.cancel_task(summary_key)
return swarming_rpcs.CancelResponse(ok=ok, was_running=was_running)
@auth.endpoints_method(swarming_rpcs.TaskId, swarming_rpcs.TaskOutput)
@auth.require(acl.is_bot_or_user)
def result_output(self, request):
"""Reports the output of the task corresponding to a task ID."""
result = get_result_entity(request.task_id)
output = result.get_command_output_async(0).get_result()
if output:
output = output.decode('utf-8', 'replace')
return swarming_rpcs.TaskOutput(output=output)
@auth.endpoints_method(
swarming_rpcs.TaskRequest, swarming_rpcs.TaskRequestMetadata)
@auth.require(acl.is_bot_or_user)
def new(self, request):
"""Provides a TaskRequest and receive its metadata."""
request_dict = json.loads(remote.protojson.encode_message(request))
_transform_request(request_dict)
# If the priority is below 100, make the the user has right to do so.
if request_dict.get('priority', 255) < 100 and not acl.is_bot_or_admin():
# Silently drop the priority of normal users.
request_dict['priority'] = 100
try:
posted_request = task_request.make_request(request_dict)
except (datastore_errors.BadValueError, TypeError, ValueError) as e:
raise endpoints.BadRequestException(e.message)
result_summary = task_scheduler.schedule_request(posted_request)
posted_dict = utils.to_json_encodable(posted_request)
return swarming_rpcs.TaskRequestMetadata(
request=message_conversion.task_request_from_dict(posted_dict),
task_id=task_pack.pack_result_summary_key(result_summary.key))
@auth.endpoints_method(swarming_rpcs.TasksRequest, swarming_rpcs.TaskList)
@auth.require(acl.is_privileged_user)
def list(self, request):
"""Provides a list of available tasks."""
state = request.state.name.lower()
uses = sum([bool(request.tag), state != 'all'])
if state != 'all':
raise endpoints.BadRequestException(
'Querying by state is not yet supported. '
'Received argument state=%s.' % state)
if uses > 1:
raise endpoints.BadRequestException(
'Only one of tag (1 or many) or state can be used.')
# get the tasks
try:
start, end = _get_range(request)
items, cursor_str, state = task_result.get_tasks_with_filter(
request.tag, request.cursor, start, end, state, request.batch_size)
return swarming_rpcs.TaskList(
cursor=cursor_str,
items=[message_conversion.task_result_summary_from_dict(
utils.to_json_encodable(item)) for item in items])
except ValueError as e:
raise endpoints.BadRequestException(
'Inappropriate batch_size for tasks/list: %s' % e)
@swarming_api.api_class(resource_name='bots', path='bots')
class SwarmingBotService(remote.Service):
"""Swarming's bot-related API."""
@auth.endpoints_method(swarming_rpcs.BotId, swarming_rpcs.BotInfo)
@auth.require(acl.is_privileged_user)
def get(self, request):
"""Provides BotInfo corresponding to a provided bot_id."""
bot = get_or_raise(bot_management.get_info_key(request.bot_id))
entity_dict = bot.to_dict_with_now(utils.utcnow())
return message_conversion.bot_info_from_dict(entity_dict)
@auth.endpoints_method(
swarming_rpcs.BotId, swarming_rpcs.DeletedResponse, http_method='DELETE')
@auth.require(acl.is_admin)
def delete(self, request):
"""Deletes the bot corresponding to a provided bot_id."""
bot_key = bot_management.get_info_key(request.bot_id)
get_or_raise(bot_key) # raises 404 if there is no such bot
bot_key.delete()
return swarming_rpcs.DeletedResponse(deleted=True)
@auth.endpoints_method(swarming_rpcs.BotTasksRequest, swarming_rpcs.BotTasks)
@auth.require(acl.is_privileged_user)
def tasks(self, request):
"""Lists all of a given bot's tasks."""
try:
start, end = _get_range(request)
run_results, cursor, more = task_result.get_run_results(
request.cursor, request.bot_id, start, end, request.batch_size)
except ValueError as e:
raise endpoints.BadRequestException(
'Inappropriate batch_size for bots/list: %s' % e)
result_list = [message_conversion.task_run_result_from_dict(
utils.to_json_encodable(run_result)) for run_result in run_results]
response = swarming_rpcs.BotTasks(
cursor=cursor.urlsafe() if cursor and more else None,
items=result_list,
now=utils.utcnow())
return response
@auth.endpoints_method(swarming_rpcs.BotsRequest, swarming_rpcs.BotList)
@auth.require(acl.is_privileged_user)
def list(self, request):
"""Provides list of bots."""
now = utils.utcnow()
cursor = datastore_query.Cursor(urlsafe=request.cursor)
q = bot_management.BotInfo.query().order(bot_management.BotInfo.key)
bots, cursor, more = q.fetch_page(request.batch_size, start_cursor=cursor)
return swarming_rpcs.BotList(
cursor=cursor.urlsafe() if cursor and more else None,
death_timeout=config.settings().bot_death_timeout_secs,
items=[message_conversion.bot_info_from_dict(bot.to_dict_with_now(
now)) for bot in bots],
now=now)