blob: 48f147032221b1984f2934de0f5254dd1eb8287b [file] [log] [blame]
# Copyright 2018 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
"""Helper functions for interacting with an external scheduler."""
import collections
import json
import logging
from components import utils
from components import datastore_utils
from components.prpc import client
from google.appengine.api import taskqueue
from google.appengine.ext import ndb
from google.protobuf import json_format
from proto.api import swarming_pb2
from proto.plugin import plugin_pb2
from proto.plugin import plugin_prpc_pb2
from server import config
from server import pools_config
from server import task_queues
class ExternalSchedulerException(Exception):
  """Raised when an external-scheduler related error occurs.

  NOTE(review): not raised anywhere in this module; presumably used by
  callers that interact with the external scheduler — confirm before removal.
  """
def _get_client(address):
  """Builds a pRPC client bound to the external scheduler at `address`.

  The client is insecure only on the local dev server, uses a 20 second
  timeout, and retries a failed call once (2 attempts total).
  """
  options = {
      'insecure': utils.is_local_dev_server(),
      'timeout': 20,
      'max_attempts': 2,
  }
  return client.Client(
      address, plugin_prpc_pb2.ExternalSchedulerServiceDescription, **options)
def _creds():
  """Returns the credentials argument appropriate for this environment.

  The local dev server has no service account, so no credentials are used
  there.
  """
  if utils.is_local_dev_server():
    return None
  return client.service_account_credentials()
def _bot_pool_cfg(bot_dimensions):
"""Retrieves the PoolConfig for a bot.
Arguments:
- bot_dimensions: The dimensions of the bot as a dictionary in
{string key: list of string values} format.
Returns:
PoolConfig for the bot if it exists, or None otherwise.
"""
pools = bot_dimensions.get(u'pool')
if not pools:
return None
if len(pools) == 1:
return pools_config.get_pool_config(pools[0])
logging.warning(
'Bot with dimensions %s was found to be in multiple '
'pools. Unable to determine pool config.', bot_dimensions)
def _config_for_dimensions(pool_cfg, dimensions_flat):
"""Determines the external scheduler for pool config and dimension set.
Pool's dimensions are matched with each config from top to down. The
last config in the file should be the default one.
"""
if not pool_cfg or not pool_cfg.external_schedulers:
return None
for e in pool_cfg.external_schedulers:
if e.dimensions.issubset(dimensions_flat):
return e if e.enabled else None
return None
### Public API.
def config_for_bot(bot_dimensions):
  """Retrieves the ExternalSchedulerConfig for this bot, if any.

  Arguments:
  - bot_dimensions: The dimensions of the bot as a dictionary in
        {string key: list of string values} format.

  Returns:
    pools_config.ExternalSchedulerConfig for external scheduler to use for
    this bot, if it exists, or None otherwise.
  """
  flat_dimensions = set(task_queues.bot_dimensions_to_flat(bot_dimensions))
  return _config_for_dimensions(_bot_pool_cfg(bot_dimensions), flat_dimensions)
def config_for_task(request):
  """Retrieves the ExternalSchedulerConfig for this task request, if any.

  Arguments:
    request: a task_request.TaskRequest instance.

  Returns:
    pools_config.ExternalSchedulerConfig for external scheduler to use for
    this task, if it exists, or None otherwise.
  """
  first_slice = request.task_slice(0)
  pool_name = first_slice.properties.pool
  if not pool_name:
    return None
  pool_cfg = pools_config.get_pool_config(pool_name)
  if not (pool_cfg and pool_cfg.external_schedulers):
    return None
  # The task only matches a scheduler config on dimensions common to every
  # slice, so intersect the flattened dimension sets across all slices.
  common_dimensions = set(
      task_queues.bot_dimensions_to_flat(first_slice.properties.dimensions))
  for idx in range(1, request.num_task_slices):
    slice_dims = task_queues.bot_dimensions_to_flat(
        request.task_slice(idx).properties.dimensions)
    common_dimensions &= set(slice_dims)
  return _config_for_dimensions(pool_cfg, common_dimensions)
def assign_task(es_cfg, bot_dimensions):
  """Calls external scheduler for a single idle bot with given dimensions.

  Arguments:
    es_cfg: pools_config.ExternalSchedulerConfig instance.
    bot_dimensions: dimensions {string key: list of string values}

  Returns:
    (Task id string, slice number) tuple or (None, None) if no task
    to assign.
  """
  bot_id = bot_dimensions[u'id'][0]
  logging.debug('Using external scheduler address: %s id: %s for bot %s',
                es_cfg.address, es_cfg.id, bot_id)
  req = plugin_pb2.AssignTasksRequest()
  req.scheduler_id = es_cfg.id
  req.time.GetCurrentTime()
  bot_entry = req.idle_bots.add()
  bot_entry.bot_id = bot_id
  bot_entry.dimensions.extend(
      task_queues.bot_dimensions_to_flat(bot_dimensions))
  # TODO(akeshet): Catch or handle errors appropriately.
  resp = _get_client(es_cfg.address).AssignTasks(req, credentials=_creds())
  if not (resp and resp.assignments):
    return None, None
  # A single idle bot was offered, so at most one assignment is expected,
  # and it must be for that bot.
  assert len(resp.assignments) == 1
  assignment = resp.assignments[0]
  assert assignment.bot_id == bot_id
  return assignment.task_id, assignment.slice_number
# TODO(linxinan): Remove the default argument of batch_mode.
def notify_requests(es_cfg, requests, use_tq, is_callback, batch_mode=False):
  """Calls external scheduler to notify it of a task state.

  Arguments:
    - es_cfg: pools_config.ExternalSchedulerConfig for external scheduler to
        notify.
    - requests:
        A list of (task_request.TaskRequest,
        task_result.TaskResultSummary or task_result.TaskRunResult)
        tuples.
    - use_tq: If true, make this call on a task queue (within the current
        datastore transaction).
    - is_callback: If true, indicates that this notification was in response
        to a external-scheduler-requested callback. This is forwarded to the
        scheduler via NotifyTasksRequest.is_callback.
    - batch_mode: If true, the notifications will be sent in a batched mode
        along with others, to reduce traffic to external scheduler.
        Only valid when use_tq and global config's
        enable_batch_es_notifications are true.

  Returns: Nothing.

  Raises:
    datastore_utils.CommitError: if use_tq is true and the notification task
        could not be enqueued.
  """
  logging.debug(
      'notify_requests(es_cfg=(%s,%s), requests=%s, use_tq=%s, '
      'is_callback=%s, batch_mode=%s)', es_cfg.address, es_cfg.id,
      [r.task_id for r, _ in requests], use_tq, is_callback, batch_mode)
  # Build one NotifyTasksRequest covering all (request, result) pairs.
  req = plugin_pb2.NotifyTasksRequest()
  req.is_callback = is_callback
  for request, result_summary in requests:
    item = req.notifications.add()
    # TODO(akeshet): This time should possibly come from the read time from
    # datastore, rather than the local server clock.
    item.time.FromDatetime(utils.utcnow())
    item.task.id = request.task_id
    item.task.tags.extend(request.tags)
    item.task.enqueued_time.FromDatetime(request.created_ts)
    # Copy every slice's flattened dimensions so the scheduler can match the
    # task against bots on its own.
    for i in range(request.num_task_slices):
      s = request.task_slice(i)
      flat_dimensions = task_queues.bot_dimensions_to_flat(
          s.properties.dimensions)
      s_pb = item.task.slices.add()
      s_pb.dimensions.extend(flat_dimensions)
    # Round-trip through the swarming proto to reuse its task-state mapping.
    res = swarming_pb2.TaskResult()
    result_summary.to_proto(res)
    item.task.state = res.state
    if result_summary.bot_id:
      # TODO(akeshet): We should only actually set this if state is running.
      item.task.bot_id = result_summary.bot_id
  req.scheduler_id = es_cfg.id
  if not use_tq:
    # Synchronous path: call the scheduler directly.
    # Ignore return value, the response proto is empty.
    notify_request_now(es_cfg.address, req)
    return
  request_json = json_format.MessageToJson(req)
  # If enable_batch_es_notifications is true, the notifications will be sent in
  # a batched mode along with others, to reduce traffic to external scheduler.
  if batch_mode and config.settings().enable_batch_es_notifications:
    # Park the payload on a pull queue; a "kicker" push task later drains and
    # batches it (see task_batch_handle_notifications).
    payload = {'es_host': es_cfg.address, 'request_json': request_json}
    req = taskqueue.Task(payload=json.dumps(payload), method='PULL')
    # The pull task must commit atomically with the enclosing transaction.
    if not req.add(
        queue_name='es-notify-tasks-batch', transactional=ndb.in_transaction()):
      raise datastore_utils.CommitError('Failed to enqueue task')
    stats = taskqueue.QueueStatistics.fetch('es-notify-kick')
    # Add a kicker task if there are fewer than 10 minutes worth.
    if stats.tasks < 600:
      job_enqueued = utils.enqueue_task(
          '/internal/taskqueue/important/external_scheduler/notify-kick',
          'es-notify-kick',
          transactional=ndb.in_transaction())
      if not job_enqueued:
        # Best effort only: an already-queued kicker will drain the pull
        # queue regardless.
        logging.info('Failed to add a notify-kick for request.')
    return
  # Non-batched task-queue path: one push task per notification request.
  enqueued = utils.enqueue_task(
      '/internal/taskqueue/important/external_scheduler/notify-tasks',
      'es-notify-tasks',
      params={
          'es_host': es_cfg.address,
          'request_json': request_json
      },
      transactional=ndb.in_transaction())
  if not enqueued:
    raise datastore_utils.CommitError('Failed to enqueue task')
def notify_request_now(es_host, proto):
  """Calls external scheduler's NotifyTask endpoint immediately.

  Arguments:
    es_host: Address of external scheduler to use.
    proto: plugin_pb2.NotifyTasksRequest instance to call with.
  """
  return _get_client(es_host).NotifyTasks(proto, credentials=_creds())
def task_batch_handle_notifications():
  """Batches notifications from pull queue, and forwards to push queue.

  Leases pending notification payloads from the 'es-notify-tasks-batch' pull
  queue, merges them per (scheduler id, es_host) pair, and re-enqueues each
  merged request on the 'es-notify-tasks' push queue. Successfully forwarded
  pull tasks are deleted; on enqueue failure the leased tasks are left to be
  retried after the lease expires.
  """
  # Number of seconds to lease the tasks. Once it expires, the
  # tasks will be available again for the next worker.
  LEASE_SEC = 60
  # The maximum number of tasks to lease from the pull queue.
  MAX_TASKS = 1000
  queue = taskqueue.Queue('es-notify-tasks-batch')
  tasks = queue.lease_tasks(LEASE_SEC, MAX_TASKS)
  if not tasks:
    return
  # Merge notifications per (scheduler id, host) pair, remembering which
  # pull-queue tasks contributed to each merged request so they can be
  # deleted once forwarded.
  requests = {}
  tasks_per_scheduler = collections.defaultdict(list)
  for task in tasks:
    proto = plugin_pb2.NotifyTasksRequest()
    payload = json.loads(task.payload)
    json_format.Parse(payload['request_json'], proto)
    s_tuple = (proto.scheduler_id, payload['es_host'])
    tasks_per_scheduler[s_tuple].append(task)
    if s_tuple not in requests:
      requests[s_tuple] = proto
    else:
      requests[s_tuple].notifications.extend(proto.notifications)
  # Iterate items() directly instead of keys() + re-lookup; this also makes
  # the previous pylint consider-iterating-dictionary suppression unneeded.
  for (s_id, address), merged_proto in requests.items():
    request_json = json_format.MessageToJson(merged_proto)
    enqueued = utils.enqueue_task(
        '/internal/taskqueue/important/external_scheduler/notify-tasks',
        'es-notify-tasks',
        params={'es_host': address, 'request_json': request_json},
        transactional=ndb.in_transaction())
    if not enqueued:
      # Deliberately best-effort: the still-leased tasks become available
      # again after LEASE_SEC and will be retried.
      logging.warning('Failed to enqueue external scheduler task, skipping')
      continue
    queue.delete_tasks(tasks_per_scheduler[s_id, address])
def get_cancellations(es_cfg):
  """Calls external scheduler and returns task cancellations."""
  req = plugin_pb2.GetCancellationsRequest()
  req.scheduler_id = es_cfg.id
  resp = _get_client(es_cfg.address).GetCancellations(
      req, credentials=_creds())
  return resp.cancellations
def get_callbacks(es_cfg):
  """Calls external scheduler and returns callback task ids."""
  req = plugin_pb2.GetCallbacksRequest()
  req.scheduler_id = es_cfg.id
  resp = _get_client(es_cfg.address).GetCallbacks(
      req, credentials=_creds())
  return resp.task_ids