| # Copyright 2014 The LUCI Authors. All rights reserved. |
| # Use of this source code is governed under the Apache License, Version 2.0 |
| # that can be found in the LICENSE file. |
| |
| """High level tasks execution scheduling API. |
| |
| This is the interface closest to the HTTP handlers. |
| """ |
| |
| import collections |
| import datetime |
| import json |
| import logging |
| import math |
| import random |
| import time |
| import urlparse |
| import uuid |
| |
| from google.appengine.api import app_identity |
| from google.appengine.ext import ndb |
| from google.protobuf import timestamp_pb2 |
| |
| from components import auth |
| from components import datastore_utils |
| from components import pubsub |
| from components import utils |
| |
| import backend_conversions |
| import handlers_exceptions |
| import ts_mon_metrics |
| |
| from proto.api_v2.swarming_pb2 import TaskState |
| from server import bot_management |
| from server import config |
| from server import external_scheduler |
| from server import pools_config |
| from server import rbe |
| from server import resultdb |
| from server import service_accounts_utils |
| from server import task_pack |
| from server import task_queues |
| from server import task_request |
| from server import task_result |
| from server import task_to_run |
| |
| from bb.go.chromium.org.luci.buildbucket.proto import task_pb2 |
| |
| |
| ### Private stuff. |
| |
| |
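| # Probability that exponential_backoff() asks the bot to come back quickly |
| # (in 1 second) instead of sleeping for the full computed backoff. |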
| _PROBABILITY_OF_QUICK_COMEBACK = 0.05 |
| |
| # When falling back from external scheduler, requests that belong to any |
| # external scheduler are ignored for this duration at the beginning of their |
| # life. This number should be larger than the bot polling period. |
| _ES_FALLBACK_SLACK = datetime.timedelta(minutes=6) |
| |
| |
| # Non-essential bot information for reaping a task |
| BotDetails = collections.namedtuple('BotDetails', |
| ['bot_version', 'logs_cloud_project']) |
| |
| |
| class Error(Exception): |
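| """Generic error raised by this module.""" |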
| pass |
| |
| |
| class ClaimError(Error): |
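| """Raised when a bot fails to claim or reap a task.""" |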
| pass |
| |
| |
| class TaskExistsException(Error): |
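| """Raised when a request_id already corresponds to an existing task.""" |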
| pass |
| |
| |
| def _expire_slice_tx(request, to_run_key, terminal_state, capacity, es_cfg): |
| """Expires a TaskToRunShardXXX and enqueues the next one, if necessary. |
| |
| Called as a ndb transaction by _expire_slice(). |
| |
| Arguments: |
| request: the TaskRequest instance with all slices. |
| to_run_key: the TaskToRunShard to expire. |
| terminal_state: the task state to set if this slice is the last one. |
| capacity: dict {slice index => True if can run it}, None to skip the check. |
| es_cfg: ExternalSchedulerConfig for this task. |
| |
| Returns: |
| ( |
| TaskResultSummary if it was updated, |
| the TaskToRunShardXXX matching to_run_key if it was expired, |
| the TaskToRunShardXXX with the newly enqueued slice if one was created, |
| True if TaskResultSummary.state was changed by these actions, |
| ) |
| """ |
| assert ndb.in_transaction() |
| assert to_run_key.parent() == request.key |
| |
| now = utils.utcnow() |
| slice_index = task_to_run.task_to_run_key_slice_index(to_run_key) |
| result_summary_key = task_pack.request_key_to_result_summary_key(request.key) |
| |
| # Check the TaskToRunShardXXX is pending. Fetch TaskResultSummary while at it. |
| to_run, result_summary = ndb.get_multi([to_run_key, result_summary_key], |
| use_cache=False, |
| use_memcache=False) |
| if not to_run or not result_summary: |
| logging.warning('%s/%s: already gone', request.task_id, slice_index) |
| return None, None, None, False |
| if not to_run.is_reapable: |
| logging.info('%s/%s: already processed', request.task_id, slice_index) |
| return None, None, None, False |
| |
| # TaskResultSummary always "tracks" the current slice and there can be only |
| # one current slice (i.e. with is_reapable==True), and it is `to_run`. |
| # |
| # TODO(vadimsh): Unfortunately these invariants are violated by |
| # _ensure_active_slice used with External Scheduler flow. Not sure if this is |
| # intentional or it is a bug. Either way, these asserts will fire for tasks |
| # scheduled through the external scheduler, so skip them. The rest should be |
| # fine. |
| if not es_cfg: |
| assert to_run.task_slice_index == slice_index, to_run |
| assert result_summary.current_task_slice == slice_index, result_summary |
| assert not result_summary.try_number, result_summary |
| |
| # Will accumulate all entities that need to be stored at the end. |
| to_put = [] |
| |
| # Record the expiration delay if the slice expired by reaching its deadline. |
| # It may end up negative if there's a clock drift between the process that ran |
| # yield_expired_task_to_run() and the process that runs this transaction. This |
| # should be rare. |
| if terminal_state == task_result.State.EXPIRED: |
| delay = (now - to_run.expiration_ts).total_seconds() |
| if delay < 0: |
| logging.warning( |
| '_expire_slice_tx: the task is not expired. task_id=%s slice=%d ' |
| 'expiration_ts=%s, delay=%f', to_run.task_id, to_run.task_slice_index, |
| to_run.expiration_ts, delay) |
| delay = 0.0 |
| to_run.expiration_delay = delay |
| |
| # Mark the current TaskToRunShardXXX as consumed. |
| to_run.consume(None) |
| to_put.append(to_run) |
| |
| # Check if there's a fallback slice we have capacity to run. |
| new_to_run = None |
| for idx in range(slice_index + 1, request.num_task_slices): |
| if capacity is None or capacity[idx]: |
| new_to_run = task_to_run.new_task_to_run(request, idx) |
| to_put.append(new_to_run) |
| break |
| |
| orig_summary_state = result_summary.state |
| to_put.append(result_summary) |
| |
| if new_to_run: |
| # Start "tracking" the new slice. |
| result_summary.current_task_slice = new_to_run.task_slice_index |
| result_summary.modified_ts = now |
| else: |
| # There's no fallback, giving up. |
| result_summary.state = terminal_state |
| result_summary.internal_failure = ( |
| terminal_state == task_result.State.BOT_DIED) |
| result_summary.modified_ts = now |
| result_summary.abandoned_ts = now |
| result_summary.completed_ts = now |
| if terminal_state == task_result.State.EXPIRED: |
| delay = (now - request.expiration_ts).total_seconds() |
| result_summary.expiration_delay = max(0.0, delay) |
| |
| futures = ndb.put_multi_async(to_put) |
| _maybe_taskupdate_notify_via_tq(result_summary, |
| request, |
| es_cfg, |
| transactional=True) |
| if new_to_run and request.rbe_instance: |
| rbe.enqueue_rbe_task(request, new_to_run) |
| for f in futures: |
| f.check_success() |
| |
| state_changed = result_summary.state != orig_summary_state |
| return result_summary, to_run, new_to_run, state_changed |
| |
| |
| def _expire_slice(request, to_run_key, terminal_state, claim, txn_retries, |
| txn_catch_errors, reason): |
| """Expires a single TaskToRunShard if it is still pending. |
| |
| If the task has more slices, enqueues the next slice. Otherwise marks the |
| whole task as expired by updating TaskResultSummary. |
| |
| Arguments: |
| request: the TaskRequest instance with all slices. |
| to_run_key: the TaskToRunShard to expire. |
| terminal_state: the task state to set if this slice is the last one. |
| claim: if True, obtain task_to_run.Claim before touching the task. |
| txn_retries: how many times to retry the transaction on collisions. |
| txn_catch_errors: if True, ignore datastore_utils.CommitError. |
| reason: a string tsmon label used to identify how expiration happened. |
| |
| Returns: |
| (TaskResultSummary if updated it, new TaskToRunShard if enqueued a slice). |
| """ |
| assert to_run_key.parent() == request.key |
| slice_index = task_to_run.task_to_run_key_slice_index(to_run_key) |
| |
| # Obtain a claim if asked. This prevents other concurrent calls from touching |
| # this task. This is a best effort mechanism to reduce datastore contention. |
| if claim and not task_to_run.Claim.obtain(to_run_key): |
| logging.warning('%s/%s is marked as claimed, proceeding anyway', |
| request.task_id, slice_index) |
| |
| # Check once whether the TaskToRunShard is reapable before doing the check |
| # inside the transaction. This allows skipping the transaction completely if |
| # the entity was already reaped. |
| to_run = to_run_key.get() |
| if not to_run: |
| logging.warning('%s/%s: already gone', request.task_id, slice_index) |
| return None, None |
| if not to_run.is_reapable: |
| logging.info('%s/%s: already processed', request.task_id, slice_index) |
| return None, None |
| |
| # None means "skip the check and always try to schedule the task". |
| capacity = None |
| |
| # For tasks scheduled through the native Swarming scheduler, check capacity |
| # for the remaining slices (if any) before the transaction, since |
| # has_capacity() cannot be called within a transaction. |
| if not request.rbe_instance: |
| capacity = {} |
| for idx in range(slice_index + 1, request.num_task_slices): |
| ts = request.task_slice(idx) |
| capacity[idx] = ts.wait_for_capacity or bot_management.has_capacity( |
| ts.properties.dimensions) |
| |
| # RBE tasks don't use the external scheduler; no need to check the config. |
| es_cfg = None |
| if not request.rbe_instance: |
| es_cfg = external_scheduler.config_for_task(request) |
| |
| try: |
| summary, old_ttr, new_ttr, state_changed = datastore_utils.transaction( |
| lambda: _expire_slice_tx(request, to_run_key, terminal_state, capacity, |
| es_cfg), |
| retries=txn_retries) |
| except datastore_utils.CommitError as exc: |
| if not txn_catch_errors: |
| raise |
| logging.warning('_expire_slice_tx failed: %s', exc) |
| return None, None |
| |
| if summary: |
| logging.info('Expired %s/%s', request.task_id, slice_index) |
| ts_mon_metrics.on_task_expired(summary, old_ttr, reason) |
| if state_changed: |
| ts_mon_metrics.on_task_status_change_scheduler_latency(summary) |
| |
| return summary, new_ttr |
| |
| |
| def _reap_task(bot_dimensions, |
| bot_details, |
| to_run_key, |
| request, |
| claim_id=None, |
| txn_retries=0, |
| txn_catch_errors=True): |
| """Reaps a task and insert the results entity. |
| |
| Returns: |
| (TaskRunResult, SecretBytes) if successful, (None, None) otherwise. |
| """ |
| assert request.key == task_to_run.task_to_run_key_to_request_key(to_run_key) |
| result_summary_key = task_pack.request_key_to_result_summary_key(request.key) |
| bot_id = bot_dimensions[u'id'][0] |
| bot_info = bot_management.get_info_key(bot_id).get(use_cache=False, |
| use_memcache=False) |
| if not bot_info: |
| raise ClaimError('Bot %s doesn\'t exist.' % bot_id) |
| |
| now = utils.utcnow() |
| # Log the task id before the transaction in case the function fails in a bad |
| # state where the DB TX ran but the reply never reached the bot. This is the |
| # worst case, as it leads to a task that results in BOT_DIED without ever |
| # starting. This case is specifically handled in cron_handle_bot_died(). |
| logging.info('_reap_task(%s, %s)', |
| task_pack.pack_result_summary_key(result_summary_key), claim_id) |
| |
| es_cfg = external_scheduler.config_for_task(request) |
| |
| keys_to_fetch = [to_run_key, result_summary_key] |
| |
| # Fetch SecretBytes as well if the slice uses secrets. |
| slice_index = task_to_run.task_to_run_key_slice_index(to_run_key) |
| if request.task_slice(slice_index).properties.has_secret_bytes: |
| keys_to_fetch.append(request.secret_bytes_key) |
| |
| def run(): |
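| """Transaction body: consumes the TaskToRunShard and creates the results. |
| |
| Returns (run_result, secret_bytes, result_summary, to_run, state_changed). |
| """ |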
| entities = ndb.get_multi(keys_to_fetch, use_cache=False, use_memcache=False) |
| to_run, result_summary = entities[0], entities[1] |
| secret_bytes = entities[2] if len(entities) == 3 else None |
| orig_summary_state = result_summary.state |
| |
| if not to_run: |
| logging.error('Missing TaskToRunShard?\n%s', result_summary.task_id) |
| if claim_id: |
| raise ClaimError('No task slice') |
| return None, None, None, None, False |
| |
| if not to_run.is_reapable: |
| if claim_id: |
| if to_run.claim_id != claim_id: |
| raise ClaimError('No longer available') |
| # The caller already holds the claim and this is a retry. Just fetch all |
| # entities that should already exist. |
| run_result = task_pack.result_summary_key_to_run_result_key( |
| result_summary_key).get(use_cache=False, use_memcache=False) |
| if not run_result: |
| raise Error('TaskRunResult unexpectedly missing on retry') |
| if run_result.current_task_slice != to_run.task_slice_index: |
| raise ClaimError('Obsolete') |
| return run_result, secret_bytes, result_summary, None, False |
| logging.info('%s is not reapable', result_summary.task_id) |
| return None, None, None, None, False |
| |
| if result_summary.bot_id == bot_id: |
| # This means two things, first it's a retry, second it's that the first |
| # try failed and the retry is being reaped by the same bot. Deny that, as |
| # the bot may be deeply broken and could be in a killing spree. |
| # TODO(maruel): Allow retry for bot locked task using 'id' dimension. |
| # TODO(vadimsh): This should not be possible, retries were removed. |
| logging.warning('%s can\'t retry its own internal failure task', |
| result_summary.task_id) |
| return None, None, None, None, False |
| |
| to_run.consume(claim_id) |
| run_result = task_result.new_run_result(request, to_run, bot_id, |
| bot_details, bot_dimensions, |
| result_summary.resultdb_info) |
| # Upon bot reap, .started_ts and .modified_ts match. They diverge on the |
| # first ping. |
| run_result.started_ts = now |
| run_result.modified_ts = now |
| # The bot may have become available at this request. Use the current time in |
| # that case. |
| run_result.bot_idle_since_ts = bot_info.idle_since_ts or now |
| # Upon bot reap, set .dead_after_ts taking into consideration the |
| # user-provided keep-alive value. This is updated after each ping |
| # from the bot." |
| run_result.dead_after_ts = now + datetime.timedelta( |
| seconds=request.bot_ping_tolerance_secs) |
| result_summary.set_from_run_result(run_result, request) |
| ndb.put_multi([to_run, run_result, result_summary]) |
| state_changed = result_summary.state != orig_summary_state |
| if state_changed: |
| _maybe_taskupdate_notify_via_tq(result_summary, |
| request, |
| es_cfg, |
| transactional=True) |
| return run_result, secret_bytes, result_summary, to_run, state_changed |
| |
| run_result = None |
| secret_bytes = None |
| summary = None |
| to_run = None |
| state_changed = False |
| try: |
| run_result, secret_bytes, summary, to_run, state_changed = \ |
| datastore_utils.transaction(run, retries=txn_retries, deadline=30) |
| except datastore_utils.CommitError: |
| if not txn_catch_errors: |
| raise |
| # The challenge here is that the transaction may have failed because: |
| # - The DB had a hiccup and the TaskToRunShard, TaskRunResult and |
| # TaskResultSummary haven't been updated. |
| # - The entities had been updated by a concurrent transaction on another |
| # handler, so the task was not reapable anyway. This does cause exceptions, |
| # as both GETs return a TaskToRunShard with queue_number != None but only |
| # one succeeds at the PUT. |
| # |
| # In the first case, we may want to release the claim, while we don't |
| # want to in the latter case. The trade-offs are: |
| # - the claim is not released, so the task is not reapable for 15s |
| # - releasing the claim would cause even more contention |
| # |
| # We chose the first one here for now: when the DB starts misbehaving |
| # and the index becomes stale, it means the DB is *already* not in good |
| # shape, so it is preferable to not put more stress on it, and skipping a |
| # few tasks for 15s may even actively help the DB to stabilize. |
| # |
| # The bot will reap the next available task in case of failure, no big deal. |
| logging.info('CommitError; reaping failed') |
| if state_changed: |
| ts_mon_metrics.on_task_status_change_scheduler_latency(summary) |
| if to_run: |
| ts_mon_metrics.on_task_to_run_consumed(summary, to_run) |
| return run_result, secret_bytes |
| |
| |
| def _detect_dead_task_async(run_result_key): |
| """Checks if the bot has stopped working on the task. |
| |
| Transactionally updates the entities depending on the state of this task. The |
| task may be retried automatically, canceled or left alone. |
| |
| Returns: |
| ndb.Future that returns tuple(ndb.Key, bool, datetime.timedelta, List[str]): |
| TaskRunResult key if the task was killed, None otherwise |
| True if the task state has changed |
| latency of the dead-task detection (state transition) |
| tags of the task result summary |
| Or None (not a future) if no action was done. |
| """ |
| result_summary_key = task_pack.run_result_key_to_result_summary_key( |
| run_result_key) |
| request = task_pack.result_summary_key_to_request_key( |
| result_summary_key).get() |
| if not request: |
| # That's a particularly broken task, there's no TaskRequest in the DB! |
| # |
| # The cleanest thing to do would be to delete the whole entity, but that's |
| # risky. Let's say there's a bug or a runtime issue that makes the DB GET |
| # fail spuriously, we don't want to delete a whole task due to a transient |
| # RPC failure. |
| # |
| # Another option is to create a temporary in-memory TaskRequest() entity, |
| # but it's more trouble than it looks: we'd need to populate one that is |
| # valid, and the code in task_result really assumes it is in the DB. |
| # |
| # So for now, just skip it to unblock the cron job. |
| return None |
| |
| now = utils.utcnow() |
| es_cfg = external_scheduler.config_for_task(request) |
| |
| @ndb.tasklet |
| def run(): |
| """Obtain the result and update task state to either KILLED or BOT_DIED. |
| 2x GET, 2x PUT. |
| """ |
| run_result = run_result_key.get(use_cache=False, use_memcache=False) |
| run_result._request_cache = request |
| |
| if run_result.state != task_result.State.RUNNING: |
| # It was already updated or is no longer running. Likely the DB index was |
| # stale. |
| raise ndb.Return(None, False, None, None) |
| |
| if not run_result.dead_after_ts or run_result.dead_after_ts > now: |
| raise ndb.Return(None, False, None, None) |
| |
| old_abandoned_ts = run_result.abandoned_ts |
| old_dead_after_ts = run_result.dead_after_ts |
| |
| run_result.signal_server_version() |
| run_result.modified_ts = now |
| run_result.completed_ts = now |
| if not run_result.abandoned_ts: |
| run_result.abandoned_ts = now |
| # set .dead_after_ts to None since the task is terminated. |
| run_result.dead_after_ts = None |
| # mark as internal failure as the task doesn't get completed normally. |
| run_result.internal_failure = True |
| |
| result_summary = result_summary_key.get(use_cache=False, use_memcache=False) |
| result_summary._request_cache = request |
| result_summary.modified_ts = now |
| result_summary.completed_ts = now |
| if not result_summary.abandoned_ts: |
| result_summary.abandoned_ts = now |
| orig_summary_state = result_summary.state |
| |
| # Mark it as KILLED if run_result is in the killing state. |
| # Otherwise, mark it BOT_DIED: the bot hasn't been sending updates for the |
| # task. |
| to_put = (run_result, result_summary) |
| if run_result.killing: |
| run_result.killing = False |
| run_result.state = task_result.State.KILLED |
| run_result = _set_fallbacks_to_exit_code_and_duration(run_result, now) |
| # run_result.abandoned_ts is set when run_result.killing == True |
| actual_state_change_time = old_abandoned_ts |
| else: |
| run_result.state = task_result.State.BOT_DIED |
| actual_state_change_time = old_dead_after_ts |
| result_summary.set_from_run_result(run_result, request) |
| |
| logging.warning( |
| 'Updating task state for bot missing. task:%s, state:%s, bot:%s', |
| run_result.task_id, task_result.State.to_string(run_result.state), |
| run_result.bot_id) |
| futures = ndb.put_multi_async(to_put) |
| |
| state_changed = orig_summary_state != result_summary.state |
| if state_changed: |
| _maybe_taskupdate_notify_via_tq(result_summary, |
| request, |
| es_cfg, |
| transactional=True) |
| yield futures |
| latency = utils.utcnow() - actual_state_change_time |
| logging.warning('Task state was successfully updated. task: %s', |
| run_result.task_id) |
| raise ndb.Return(run_result_key, state_changed, latency, |
| result_summary.tags) |
| |
| return datastore_utils.transaction_async(run) |
| |
| |
| def _maybe_pubsub_notify_now(result_summary, request): |
| """Examines result_summary and sends task completion PubSub message. |
| |
| Does so only if result_summary indicates a task in some finished state and |
| the request specifies a PubSub topic. |
| |
| Returns False to trigger a retry (on transient errors), or True if a retry |
| is not needed (e.g. the message was sent successfully or a fatal error |
| happened). |
| """ |
| assert not ndb.in_transaction() |
| assert isinstance(result_summary, |
| task_result.TaskResultSummary), result_summary |
| assert isinstance(request, task_request.TaskRequest), request |
| if (result_summary.state not in task_result.State.STATES_RUNNING and |
| request.pubsub_topic): |
| task_id = task_pack.pack_result_summary_key(result_summary.key) |
| start_time = utils.milliseconds_since_epoch() |
| try: |
| _pubsub_notify(task_id, request.pubsub_topic, request.pubsub_auth_token, |
| request.pubsub_userdata, result_summary.tags, |
| result_summary.state, start_time) |
| except pubsub.TransientError: |
| return False |
| except pubsub.Error: |
| return True # do not retry it |
| return True |
| |
| |
| def _maybe_pubsub_send_build_task_update(bb_task, build_id, pubsub_topic): |
| """Sends an update message to buildbucket about the task's current status. |
| |
| Arguments: |
| bb_task task_pb2.Task: created by the caller of this function to send to |
| Buildbucket. |
| build_id string: buildbucket build id provided by buildbucket. |
| pubsub_topic string: pubsub topic to publish to. Provided by buildbucket. |
| |
| Returns: |
| bool: False if there was a pubsub.TransientError, True otherwise. If False, |
| then the function may be retried. |
| """ |
| assert not ndb.in_transaction() |
| assert isinstance(bb_task, task_pb2.Task), bb_task |
| msg = task_pb2.BuildTaskUpdate(build_id=build_id, task=bb_task) |
| try: |
| pubsub.publish(topic=pubsub_topic, |
| message=msg.SerializeToString(), |
| attributes=None) |
| except pubsub.TransientError as e: |
| http_status_code = e.inner.status_code |
| logging.exception( |
| 'Transient error (status_code=%s) when sending PubSub notification', |
| http_status_code) |
| return False |
| except pubsub.Error as e: |
| http_status_code = e.inner.status_code |
| logging.exception( |
| 'Fatal error (status_code=%s) when sending PubSub notification', |
| http_status_code) |
| return True # do not retry it |
| except Exception as e: |
| logging.exception("Unknown exception (%s) not handled by " \ |
| "_maybe_pubsub_send_build_task_update", e) |
| raise e # raise the error if it is unknown |
| return True |
| |
| |
| def _route_to_go(prod_pct=0, dev_pct=0): |
| """Returns True if it should route to Go. |
| |
| Arguments: |
| prod_pct: percentage of traffic in Prod that should route to Go. |
| dev_pct: percentage of traffic on Dev that should route to Go. |
| """ |
| |
| pct = prod_pct |
| if utils.is_dev(): |
| pct = dev_pct |
| return random.randint(0, 99) < pct |
| |
| |
| def _maybe_taskupdate_notify_via_tq(result_summary, request, es_cfg, |
| transactional): |
| """Enqueues tasks to send PubSub, es, and bb notifications for given request. |
| |
| Arguments: |
| result_summary: a task_result.TaskResultSummary instance. |
| request: a task_request.TaskRequest instance. |
| es_cfg: a pool_config.ExternalSchedulerConfig instance if one exists |
| for this task, or None otherwise. |
| transactional: True if this runs as part of a DB transaction. |
| |
| Raises CommitError on errors (to abort the transaction). |
| """ |
| assert transactional == ndb.in_transaction() |
| assert isinstance(result_summary, |
| task_result.TaskResultSummary), result_summary |
| assert isinstance(request, task_request.TaskRequest), request |
| task_id = task_pack.pack_result_summary_key(result_summary.key) |
| if request.pubsub_topic: |
| if ('projects/cr-buildbucket' in request.pubsub_topic |
| or _route_to_go(prod_pct=100, dev_pct=100)): |
| now = timestamp_pb2.Timestamp() |
| now.FromDatetime(utils.utcnow()) |
| payload = { |
| 'class': 'pubsub-go', |
| 'body': { |
| 'taskId': task_id, |
| 'topic': request.pubsub_topic, |
| 'authToken': request.pubsub_auth_token, |
| 'userdata': request.pubsub_userdata, |
| 'tags': result_summary.tags, |
| 'state': |
| TaskState.DESCRIPTOR.values_by_number[result_summary.state].name, |
| 'startTime': now.ToJsonString(), |
| } |
| } |
| ok = utils.enqueue_task('/internal/tasks/t/pubsub-go/%s' % task_id, |
| 'pubsub-v2', |
| transactional=transactional, |
| payload=utils.encode_to_json(payload)) |
| else: |
| payload = { |
| 'task_id': task_id, |
| 'topic': request.pubsub_topic, |
| 'auth_token': request.pubsub_auth_token, |
| 'userdata': request.pubsub_userdata, |
| 'tags': result_summary.tags, |
| 'state': result_summary.state, |
| 'start_time': utils.milliseconds_since_epoch() |
| } |
| ok = utils.enqueue_task( |
| '/internal/taskqueue/important/pubsub/notify-task/%s' % task_id, |
| 'pubsub', |
| transactional=transactional, |
| payload=utils.encode_to_json(payload)) |
| logging.debug("Payload of PubSub msg that was tried to enqueued: %s", |
| str(payload)) |
| if not ok: |
| raise datastore_utils.CommitError('Failed to enqueue pubsub notify task') |
| |
| if es_cfg: |
| external_scheduler.notify_requests( |
| es_cfg, [(request, result_summary)], True, False) |
| |
| if request.has_build_task: |
| if _route_to_go(prod_pct=100, dev_pct=100): |
| go_payload = { |
| 'class': 'buildbucket-notify-go', |
| 'body': { |
| "taskId": task_id, |
| "state": |
| TaskState.DESCRIPTOR.values_by_number[result_summary.state].name, |
| "updateId": utils.time_time_ns(), |
| } |
| } |
| ok = utils.enqueue_task('/internal/tasks/t/buildbucket-notify-go/%s' % |
| task_id, |
| 'buildbucket-notify-go', |
| transactional=transactional, |
| payload=utils.encode_to_json(go_payload)) |
| else: |
| payload = { |
| 'task_id': task_id, |
| 'state': result_summary.state, |
| 'update_id': utils.time_time_ns(), |
| } |
| ok = utils.enqueue_task( |
| '/internal/taskqueue/important/buildbucket/notify-task/%s' % task_id, |
| 'buildbucket-notify', |
| transactional=transactional, |
| payload=utils.encode_to_json(payload)) |
| if not ok: |
| raise datastore_utils.CommitError( |
| 'Failed to enqueue buildbucket notify task') |
| |
| |
| def _buildbucket_update(request_key, run_result_state, update_id): |
| """Handles sending a pubsub update to buildbucket or not. |
| Arguments: |
| request_key: ndb.Key for a task_request.TaskRequest object. |
| run_result_state: task_result.State enum. |
| update_id: int timestamp for when the update was made. Used by buildbucket |
| to only accept the most up to date updates. |
| Returns: |
| bool. False if there was a transient error, which allows the caller to |
| retry. True otherwise. |
| """ |
| build_task = task_pack.request_key_to_build_task_key(request_key).get() |
| # Returning early if we shouldn't make the update. |
| if build_task.update_id > update_id: |
| return True |
| if run_result_state == build_task.latest_task_status: |
| return True |
| |
| result_summary = task_pack.request_key_to_result_summary_key(request_key).get( |
| use_cache=False, use_memcache=False) |
| # Need to try to get bot_dimensions from build_task first, if not get it |
| # from result_summary. |
| if build_task.bot_dimensions: |
| bot_dimensions = build_task.bot_dimensions |
| else: |
| bot_dimensions = result_summary.bot_dimensions |
| |
| task_id = task_pack.pack_result_summary_key( |
| task_pack.request_key_to_result_summary_key(request_key)) |
| bb_task = task_pb2.Task( |
| id=task_pb2.TaskID(id=task_id, |
| target="swarming://%s" % |
| app_identity.get_application_id()), |
| update_id=update_id, |
| details=backend_conversions.convert_backend_task_details(bot_dimensions), |
| ) |
| backend_conversions.convert_task_state_to_status(run_result_state, |
| result_summary.failure, |
| bb_task) |
| update_buildbucket_pubsub_success = _maybe_pubsub_send_build_task_update( |
| bb_task, build_task.build_id, build_task.pubsub_topic) |
| # Caller must retry if PubSub enqueue fails with a transient error. |
| if not update_buildbucket_pubsub_success: |
| return False |
| build_task.latest_task_status = run_result_state |
| build_task.update_id = update_id |
| # If build_task doesn't have bot_dimensions, add it. |
| if not build_task.bot_dimensions: |
| build_task.bot_dimensions = bot_dimensions |
| build_task.put() |
| return True |
| |
| |
| def _pubsub_notify(task_id, topic, auth_token, userdata, tags, state, |
| start_time): |
| """Sends PubSub notification about task completion. |
| |
| Raises pubsub.TransientError on transient errors; raises pubsub.Error on |
| fatal errors. |
| """ |
| logging.debug( |
| 'Sending PubSub notify to "%s" (with userdata="%s", tags="%s",' |
| 'state="%s") about completion of "%s"', topic, userdata, tags, state, |
| task_id) |
| msg = {'task_id': task_id} |
| if userdata: |
| msg['userdata'] = userdata |
| http_status_code = 0 |
| try: |
| pubsub.publish( |
| topic=topic, |
| message=utils.encode_to_json(msg), |
| attributes={'auth_token': auth_token} if auth_token else None) |
| http_status_code = 200 |
| except pubsub.TransientError as e: |
| http_status_code = e.inner.status_code |
| logging.exception( |
| 'Transient error (status_code=%s) when sending PubSub notification', |
| http_status_code) |
| raise e |
| except pubsub.Error as e: |
| http_status_code = e.inner.status_code |
| logging.exception( |
| 'Fatal error (status_code=%s) when sending PubSub notification', |
| http_status_code) |
| raise e |
| except Exception as e: |
| logging.exception("Unknown exception (%s) not handled by _pubsub_notify", e) |
| raise e |
| finally: |
| if start_time is not None: |
| now = utils.milliseconds_since_epoch() |
| latency = now - start_time |
| if latency < 0: |
| logging.warning( |
| 'ts_mon_metric pubsub latency %dms (%d - %d) is negative. ' |
| 'Setting latency to 0', latency, now, start_time) |
| latency = 0 |
| logging.debug( |
| 'Updating ts_mon_metric pubsub with latency: %dms (%d - %d)', latency, |
| now, start_time) |
| ts_mon_metrics.on_task_status_change_pubsub_latency( |
| tags, state, http_status_code, latency) |
| |
| |
| def _find_dupe_task(now, h): |
| """Finds a previously run task that is also idempotent and completed. |
| |
| Fetch items that can be used to dedupe the task. See the comment for this |
| property for more details. |
| |
| Do not use "task_result.TaskResultSummary.created_ts > oldest" here because |
| this would require an index. It's unnecessary because TaskRequest.key is |
| equivalent to decreasing TaskRequest.created_ts; ordering by key works as |
| well and doesn't require an index. |
| """ |
| logging.info("_find_dupe_task for properties_hash: %s", h.encode('hex')) |
| # TODO(maruel): Make a reverse map on successful task completion so this |
| # becomes a simple ndb.get(). |
| cls = task_result.TaskResultSummary |
| q = cls.query(cls.properties_hash==h).order(cls.key) |
| for i, dupe_summary in enumerate(q.iter(batch_size=1)): |
| # It is possible for the query to return stale items. |
| if (dupe_summary.state != task_result.State.COMPLETED or |
| dupe_summary.failure): |
| if i == 2: |
| logging.info("indexes are very inconsistent, give up.") |
| return None |
| continue |
| |
| # Refuse tasks older than X days. This is due to the isolate server |
| # dropping files. |
| # TODO(maruel): The value should be calculated from the isolate server |
| # setting and be unbounded when no isolated input was used. |
| oldest = now - datetime.timedelta( |
| seconds=config.settings().reusable_task_age_secs) |
| if dupe_summary.created_ts <= oldest: |
| logging.info("found result (%s) is older than threshold (%s)", |
| dupe_summary.created_ts, oldest) |
| return None |
| logging.info("_find_dupe_task: dupped with %s", dupe_summary.task_id) |
| return dupe_summary |
| return None |
| |
| |
| def _copy_summary(src, dst, skip_list): |
| """Copies the attributes of entity src into dst. |
| |
| It copies neither the key nor any member in skip_list. |
| """ |
| # pylint: disable=unidiomatic-typecheck |
| assert type(src) == type(dst), '%s!=%s' % (src.__class__, dst.__class__) |
| # Access to a protected member _XX of a client class - pylint: disable=W0212 |
| kwargs = { |
| k: getattr(src, k) for k in src._properties_fixed() if k not in skip_list |
| } |
| dst.populate(**kwargs) |
| |
| |
| def _dedupe_result_summary(dupe_summary, result_summary, task_slice_index): |
| """Copies the results from dupe_summary into result_summary.""" |
| # PerformanceStats is not copied over, since it's not relevant, nothing |
| # ran. |
| _copy_summary( |
| dupe_summary, result_summary, |
| ('created_ts', 'modified_ts', 'name', 'user', 'tags')) |
| # Copy properties not covered by _properties_fixed(). |
| result_summary.bot_id = dupe_summary.bot_id |
| result_summary.missing_cas = dupe_summary.missing_cas |
| result_summary.missing_cipd = dupe_summary.missing_cipd |
| # Zap irrelevant properties. |
| result_summary.cost_saved_usd = dupe_summary.cost_usd |
| result_summary.costs_usd = [] |
| result_summary.current_task_slice = task_slice_index |
| result_summary.deduped_from = task_pack.pack_run_result_key( |
| dupe_summary.run_result_key) |
| # It is not possible to dedupe against a deduped task, so zap properties_hash. |
| result_summary.properties_hash = None |
| result_summary.try_number = 0 |
| |
| |
| def _is_allowed_to_schedule(pool_cfg): |
| """True if the current caller is allowed to schedule tasks in the pool.""" |
| caller_id = auth.get_current_identity() |
| |
| # Listed directly? |
| if caller_id in pool_cfg.scheduling_users: |
| logging.info( |
| 'Caller "%s" is allowed to schedule tasks in the pool "%s" by being ' |
| 'specified directly in the pool config', caller_id.to_bytes(), |
| pool_cfg.name) |
| return True |
| |
| # Listed through a group? |
| for group in pool_cfg.scheduling_groups: |
| if auth.is_group_member(group, caller_id): |
| logging.info( |
| 'Caller "%s" is allowed to schedule tasks in the pool "%s" by being ' |
| 'referenced via the group "%s" in the pool config', |
| caller_id.to_bytes(), pool_cfg.name, group) |
| return True |
| |
| # Using delegation? |
| delegation_token = auth.get_delegation_token() |
| if not delegation_token: |
| logging.info('No delegation token') |
| return False |
| |
| # Log relevant info about the delegation to simplify debugging. |
| peer_id = auth.get_peer_identity() |
| token_tags = set(delegation_token.tags or []) |
| logging.info( |
| 'Using delegation, delegatee is "%s", delegation tags are %s', |
| peer_id.to_bytes(), sorted(map(str, token_tags))) |
| |
| # Is the delegatee listed in the config? |
| trusted_delegatee = pool_cfg.trusted_delegatees.get(peer_id) |
| if not trusted_delegatee: |
| logging.warning('The delegatee "%s" is unknown', peer_id.to_bytes()) |
| return False |
| |
| # Are any of the required delegation tags present in the token? |
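| # A required tag ending with "/*" acts as a wildcard: e.g. "a/*" matches any |
| # token tag that starts with "a/". |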
| cross = token_tags & trusted_delegatee.required_delegation_tags |
| if not cross: |
| cross = set() |
| wildcard_delegation_tags = filter( |
| lambda t: t.endswith('/*'), trusted_delegatee.required_delegation_tags) |
| for t in wildcard_delegation_tags: |
| t = t[:-1] |
| cross |= set(filter(lambda tt: tt.startswith(t), token_tags)) |
| if cross: |
| logging.info( |
| 'Caller "%s" is allowed to schedule tasks in the pool "%s" by acting ' |
| 'through a trusted delegatee "%s" that set the delegation tags %s', |
| caller_id.to_bytes(), pool_cfg.name, peer_id.to_bytes(), |
| sorted(map(str, cross))) |
| return True |
| |
| logging.warning( |
| 'Expecting any of %s tags, got %s, forbidding the call', |
| sorted(map(str, trusted_delegatee.required_delegation_tags)), |
| sorted(map(str, token_tags))) |
| return False |
| |
| |
| def _bot_update_tx(run_result_key, bot_id, output, output_chunk_start, |
| exit_code, duration, hard_timeout, io_timeout, cost_usd, |
| cas_output_root, cipd_pins, need_cancel, performance_stats, |
| now, result_summary_key, request, es_cfg, canceled): |
| """Runs the transaction for bot_update_task(). |
| |
| es_cfg is only required when need_cancel is True. |
| |
| Returns tuple(TaskRunResult, TaskResultSummary, str(error)). |
| |
| Any error is returned as a string to be passed to logging.error() instead of |
| logging inside the transaction for performance. |
| """ |
| # 2 or 3 consecutive GETs, one PUT. |
| # |
| # Assumptions: |
| # - duration and exit_code are both set or not set. That's not always true for |
| # TIMED_OUT/KILLED. |
| # - same for run_result. |
| # - if one of hard_timeout or io_timeout is set, duration is also set. |
| # - hard_timeout or io_timeout can still happen in the case of killing. This |
| # still needs to result in KILLED, not TIMED_OUT. |
| logging.info('Starting transaction attempt') |
| |
| run_result, result_summary = ndb.get_multi( |
| [run_result_key, result_summary_key], use_cache=False, use_memcache=False) |
| if not run_result: |
| return None, None, 'is missing' |
| |
| if run_result.bot_id != bot_id: |
| return None, None, ( |
| 'expected bot (%s) but had update from bot %s' % ( |
| run_result.bot_id, bot_id)) |
| |
| if not run_result.started_ts: |
| return None, None, 'TaskRunResult is broken; %s' % ( |
| run_result.to_dict()) |
| |
| if exit_code is not None: |
| if run_result.exit_code is not None: |
| # This happens as an HTTP request is retried when the DB write succeeded |
| # but it still returned HTTP 500. |
| if run_result.exit_code != exit_code: |
| return None, None, 'got 2 different exit_code; %s then %s' % ( |
| run_result.exit_code, exit_code) |
| if run_result.duration != duration: |
| return None, None, 'got 2 different durations; %s then %s' % ( |
| run_result.duration, duration) |
| else: |
| run_result.duration = duration |
| run_result.exit_code = exit_code |
| |
| if cas_output_root: |
| run_result.cas_output_root = cas_output_root |
| |
| if cipd_pins: |
| run_result.cipd_pins = cipd_pins |
| |
| if run_result.state in task_result.State.STATES_RUNNING: |
| # Task was still registered as running. Look if it should be terminated now. |
| if run_result.killing: |
| if canceled: |
| # A user requested to cancel the task while setting up the task. |
| # Since the task hasn't started running yet, we can mark the state as |
| # CANCELED. |
| run_result.killing = False |
| run_result.state = task_result.State.CANCELED |
| # Reset duration and exit_code since they are not allowed for a canceled |
| # task. |
| run_result.duration = None |
| run_result.exit_code = None |
| run_result.completed_ts = now |
| elif duration is not None: |
| # A user requested to cancel the task while it was running. Since the |
| # task is now stopped, we can tag the task result as KILLED. |
| run_result.killing = False |
| run_result.state = task_result.State.KILLED |
| run_result.completed_ts = now |
| else: |
| # The bot is still executing the task in this path. The server should |
| # return a killing signal to the bot. After the bot has stopped the task, |
| # the task update will include `duration` and take the path above to |
| # finally mark the TaskRunResult completed. |
| pass |
| else: |
| if hard_timeout or io_timeout: |
| # This needs to be changed with new state TERMINATING; |
| # https://crbug.com/916560 |
| run_result.state = task_result.State.TIMED_OUT |
| run_result.completed_ts = now |
| run_result = _set_fallbacks_to_exit_code_and_duration(run_result, now) |
| elif run_result.exit_code is not None: |
| run_result.state = task_result.State.COMPLETED |
| run_result.completed_ts = now |
| |
| run_result.signal_server_version() |
| to_put = [run_result] |
| if output: |
| # This does 1 multi GET. It also modifies run_result in place. |
| to_put.extend(run_result.append_output(output, output_chunk_start or 0)) |
| if performance_stats: |
| performance_stats.key = task_pack.run_result_key_to_performance_stats_key( |
| run_result.key) |
| to_put.append(performance_stats) |
| |
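| # Keep the largest reported cost: bot updates may be retried or arrive more |
| # than once. |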
| run_result.cost_usd = max(cost_usd, run_result.cost_usd or 0.) |
| run_result.modified_ts = now |
| if run_result.state in task_result.State.STATES_RUNNING: |
| run_result.dead_after_ts = now + datetime.timedelta( |
| seconds=request.bot_ping_tolerance_secs) |
| else: |
| run_result.dead_after_ts = None |
| |
| result_summary.set_from_run_result(run_result, request) |
| to_put.append(result_summary) |
| |
| if need_cancel and run_result.state in task_result.State.STATES_RUNNING: |
| logging.info('Calling _cancel_task_tx') |
| _cancel_task_tx(request, result_summary, True, bot_id, now, es_cfg, |
| run_result) |
| |
| logging.info('Storing %d entities: %s', len(to_put), [e.key for e in to_put]) |
| ndb.put_multi(to_put) |
| |
| logging.info('Committing transaction') |
| return result_summary, run_result, None |
| |
| |
| def _set_fallbacks_to_exit_code_and_duration(run_result, now): |
| """Sets fallback values to exit_code and duration""" |
| if run_result.exit_code is None: |
| run_result.exit_code = -1 |
| if not run_result.duration: |
| # Calculate an approximate time. |
| run_result.duration = (now - run_result.started_ts).total_seconds() |
| return run_result |
| |
| |
| def _cancel_task_tx(request, |
| result_summary, |
| kill_running, |
| bot_id, |
| now, |
| es_cfg, |
| run_result=None): |
| """Runs the transaction for cancel_task(). |
| |
| Arguments: |
| request: TaskRequest instance to cancel. |
| result_summary: result summary for request to cancel. Will be mutated in |
| place and then stored into Datastore. |
| kill_running: if true, allow cancelling a task in RUNNING state. |
| bot_id: if specified, only cancel task if it is RUNNING on this bot. Cannot |
| be specified if kill_running is False. |
| now: timestamp used to update result_summary and run_result. |
| es_cfg: pools_config.ExternalSchedulerConfig for external scheduler. |
| run_result: used when given, otherwise fetched from result_summary. |
| |
| Returns: |
| tuple(bool, bool) |
| - True if the cancellation succeeded. Either the task atomically changed |
| from PENDING to CANCELED or it was RUNNING and the killing bit has been |
| set. |
| - True if the task was running while it was canceled. |
| """ |
| was_running = result_summary.state == task_result.State.RUNNING |
| |
| # Finished tasks can't be canceled. |
| if not result_summary.can_be_canceled: |
| return False, was_running |
| # Deny cancelling a non-running task if bot_id was specified. |
| if not was_running and bot_id: |
| return False, was_running |
| # Deny canceling a task that started. |
| if was_running and not kill_running: |
| return False, was_running |
| # Deny cancelling a task if it runs on an unexpected bot. |
| if was_running and bot_id and bot_id != result_summary.bot_id: |
| return False, was_running |
| |
| to_put = [result_summary] |
| result_summary.abandoned_ts = now |
| result_summary.modified_ts = now |
| |
| to_runs = [] |
| if not was_running: |
| # The task is in PENDING state now and can be canceled right away. |
| result_summary.state = task_result.State.CANCELED |
| result_summary.completed_ts = now |
| |
| # This should return 1 entity (the pending TaskToRunShard). |
| to_runs = task_to_run.get_task_to_runs( |
| request, result_summary.current_task_slice or 0) |
| for to_run in to_runs: |
| to_run.consume(None) |
| to_put.append(to_run) |
| else: |
| # The task is in RUNNING state now. Do not change state to KILLED yet. |
| # Instead, use a two-step protocol: |
| # - set killing to True |
| # - on next bot report, tell the bot to kill the task |
| # - once the bot reports the task as terminated, set state to KILLED |
| run_result = run_result or result_summary.run_result_key.get( |
| use_cache=False, use_memcache=False) |
| run_result.killing = True |
| run_result.abandoned_ts = now |
| run_result.modified_ts = now |
| to_put.append(run_result) |
| |
| futures = ndb.put_multi_async(to_put) |
| _maybe_taskupdate_notify_via_tq(result_summary, |
| request, |
| es_cfg, |
| transactional=True) |
| |
| # Enqueue TQ tasks to cancel RBE reservations if in the RBE mode. |
| if request.rbe_instance: |
| for ttr in to_runs: |
| if ttr.rbe_reservation: |
| rbe.enqueue_rbe_cancel(request, ttr) |
| |
| for f in futures: |
| f.check_success() |
| |
| return True, was_running |
| |
| |
| def _get_task_from_external_scheduler(es_cfg, bot_dimensions): |
| """Gets a task to run from external scheduler. |
| |
| Arguments: |
| es_cfg: pool_config.ExternalSchedulerConfig instance. |
| bot_dimensions: dimensions {string key: list of string values} |
| |
| Returns: (TaskRequest, TaskToRunShard) if a task was available, |
| or (None, None) otherwise. |
| """ |
| task_id, slice_number = external_scheduler.assign_task(es_cfg, bot_dimensions) |
| if not task_id: |
| return None, None |
| |
| logging.info('Got task id %s', task_id) |
| request_key, result_key = task_pack.get_request_and_result_keys(task_id) |
| logging.info('Determined request_key, result_key %s, %s', request_key, |
| result_key) |
| request = request_key.get() |
| result_summary = result_key.get(use_cache=False, use_memcache=False) |
| |
| logging.info('Determined slice_number %s', slice_number) |
| |
| to_run, raise_exception = _ensure_active_slice(request, slice_number) |
| if not to_run: |
| # We were unable to ensure the given request was at the desired slice. This |
| # means the external scheduler must have stale state about this request, so |
| # notify it of the newest state. |
| external_scheduler.notify_requests( |
| es_cfg, [(request, result_summary)], True, False) |
| if raise_exception: |
| raise external_scheduler.ExternalSchedulerException( |
| 'unable to ensure active slice for task %s' % task_id) |
| |
| return request, to_run |
| |
| |
| def _ensure_active_slice(request, task_slice_index): |
| """Ensures the existence of a TaskToRunShard for the given request, slice. |
| |
| Ensure that the given request is currently active at a given task_slice_index |
| (modifying the current try or slice if necessary), and that |
| no other TaskToRunShard is pending. |
| |
| This is intended for use as part of the external scheduler flow. |
| |
| Internally, this runs up to 2 GETs and 1 PUT in a transaction. |
| |
| Arguments: |
| request: TaskRequest instance |
| task_slice_index: slice index to ensure is active. |
| |
| Returns: |
| TaskToRunShard: a saved TaskToRunShard instance corresponding to the given |
| request and slice if it exists, or None otherwise. |
| Boolean: whether the caller should raise an exception. |
| """ |
| # External scheduler and RBE scheduler do not mix. |
| assert not request.rbe_instance |
| |
| def run(): |
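| """Transaction body; returns (TaskToRunShard or None, should_raise).""" |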
| logging.debug('_ensure_active_slice(%s, %d)', request.task_id, |
| task_slice_index) |
| to_runs = task_to_run.get_task_to_runs(request, request.num_task_slices - 1) |
| to_runs = [r for r in to_runs if r.queue_number] |
| if to_runs: |
| if len(to_runs) != 1: |
| logging.warning('_ensure_active_slice: %s != 1 TaskToRunShards', |
| len(to_runs)) |
| return None, True |
| assert len(to_runs) == 1, 'Too many pending TaskToRunShards.' |
| |
| to_run = to_runs[0] if to_runs else None |
| |
| if to_run: |
| if to_run.task_slice_index == task_slice_index: |
| logging.debug('_ensure_active_slice: already active') |
| return to_run, False |
| |
| # Deactivate old TaskToRunShard, create new one. |
| to_run.consume(None) |
| new_to_run = task_to_run.new_task_to_run(request, task_slice_index) |
| ndb.put_multi([to_run, new_to_run]) |
| logging.debug('_ensure_active_slice: added new TaskToRunShard') |
| return new_to_run, False |
| |
| result_summary = task_pack.request_key_to_result_summary_key( |
| request.key).get(use_cache=False, use_memcache=False) |
| if not result_summary: |
| logging.warning( |
| '_ensure_active_slice: no TaskToRunShard or TaskResultSummary') |
| return None, True |
| |
| if not result_summary.is_pending: |
| logging.debug( |
| '_ensure_active_slice: request is not PENDING.' |
| ' state: "%s", task_id: "%s"', |
| result_summary.to_string(), result_summary.task_id) |
| # Just notify the external scheduler without raising an exception. |
| return None, False |
| |
| new_to_run = task_to_run.new_task_to_run(request, task_slice_index) |
| new_to_run.put() |
| logging.debug( |
| 'ensure_active_slice: added new TaskToRunShard (no previous one)') |
| return new_to_run, False |
| |
| return datastore_utils.transaction(run) |
| |
| |
| def _bot_reap_task_external_scheduler(bot_dimensions, bot_details, es_cfg): |
| """Reaps a TaskToRunShard (chosen by external scheduler) if available. |
| |
| This is a simpler version of bot_reap_task that skips a lot of the steps |
| normally taken by the native scheduler. |
| |
| Arguments: |
| - bot_dimensions: The dimensions of the bot as a dictionary in |
| {string key: list of string values} format. |
| - bot_details: a BotDetails tuple, non-essential but would be propagated to |
| the task. |
| - es_cfg: ExternalSchedulerConfig for this bot. |
| """ |
| request, to_run = _get_task_from_external_scheduler(es_cfg, bot_dimensions) |
| if not request or not to_run: |
| return None, None, None |
| |
| run_result, secret_bytes = _reap_task(bot_dimensions, bot_details, to_run.key, |
| request) |
| if not run_result: |
| raise external_scheduler.ExternalSchedulerException( |
| 'failed to reap %s' % task_pack.pack_request_key(to_run.request_key)) |
| |
| logging.info('Reaped (external scheduler): %s', run_result.task_id) |
| return request, secret_bytes, run_result |
| |
| |
| def _should_allow_es_fallback(es_cfg, request): |
| """Determines whether to allow external scheduler fallback to the given task. |
| |
| Arguments: |
| - es_cfg: ExternalSchedulerConfig to potentially fallback from. |
| - request: TaskRequest in question to fallback to. |
| """ |
| task_es_cfg = external_scheduler.config_for_task(request) |
| if not task_es_cfg: |
| # The task is not es-owned. Allow fallback. |
| return True |
| |
| if not es_cfg.allow_es_fallback: |
| # Fallback to es-owned task is disabled. |
| return False |
| |
| # Allow fallback if task uses precisely the same external scheduler as |
| # the one we are falling back from. |
| return task_es_cfg == es_cfg |
| |
| |
| ### Public API. |
| |
| |
| def exponential_backoff(attempt_num): |
| """Returns an exponential backoff value in seconds.""" |
| assert attempt_num >= 0 |
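| # The backoff grows as 1.5^(attempt_num + 1): ~1.5s for attempt 0, ~7.6s for |
| # attempt 4, reaching the cap (60s by default) around attempt 10. |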
| if random.random() < _PROBABILITY_OF_QUICK_COMEBACK: |
| # Randomly ask the bot to return quickly. |
| return 1.0 |
| |
| # If the user provided a max then use it, otherwise use default 60s. |
| max_wait = config.settings().max_bot_sleep_time or 60. |
| return min(max_wait, math.pow(1.5, min(attempt_num, 10) + 1)) |
| |
| |
| def check_schedule_request_acl_caller(pool_cfg): |
| if not _is_allowed_to_schedule(pool_cfg): |
| raise auth.AuthorizationError( |
| 'User "%s" is not allowed to schedule tasks in the pool "%s", ' |
| 'see pools.cfg' % |
| (auth.get_current_identity().to_bytes(), pool_cfg.name)) |
| |
| |
| def check_schedule_request_acl_service_account(request): |
| # request.service_account can be 'bot' or 'none'. We don't care about these, |
| # they are always allowed. We care when the service account is a real email. |
| has_service_account = service_accounts_utils.is_service_account( |
| request.service_account) |
| if has_service_account: |
| raise auth.AuthorizationError( |
| 'Task service account "%s" as specified in the task request is not ' |
| 'allowed to be used in the pool "%s".' % |
| (request.service_account, request.pool)) |
| |
| |
| def schedule_request(request, |
| request_id=None, |
| enable_resultdb=False, |
| secret_bytes=None, |
| build_task=None): |
| """Creates and stores all the entities to schedule a new task request. |
| |
| Assumes the request was already processed by api_helpers.process_task_request |
| and the ACL check already happened there. |
| |
| Uses the scheduling algorithm specified by request.scheduling_algorithm field. |
| |
| The number of entities created is ~4: TaskRequest, TaskToRunShard, |
| TaskResultSummary and (optionally) SecretBytes. They are in a single entity |
| group and saved in a single transaction. |
| |
| Arguments: |
| - request: TaskRequest entity to be saved in the DB. Its key must not be set |
| and the entity must not be saved in the DB yet. It will be mutated |
| to have the correct key and have `properties_hash` populated. |
| - request_id: Optional string. Used to pull TaskRequestID from datastore. |
| If request_id is not provided, there is no |
| guarantee of idempotency. |
| - enable_resultdb: Optional Boolean (default False) of whether we use resultdb |
| or not for this task. |
| - secret_bytes: Optional SecretBytes entity to be saved in the DB. Its key |
| will be set and the entity will be stored by this function. |
| - build_task: Optional BuildTask entity to be saved in the DB. Its key will |
| be set and the entity will be stored by this function. |
| Returns: |
| TaskResultSummary. TaskToRunShard is not returned. |
| """ |
| assert isinstance(request, task_request.TaskRequest), request |
| assert not request.key, request.key |
| |
| def _get_task_result_summary_from_task_id(task_id, request): |
| """Given a task_id, the corresponding TaskResultSummary is pulled. |
| The request object is also modified in place to associate the key |
| with the already existing key. |
| |
| Arguments: |
| - task_id: string id to pull the TaskResultSummary |
| - request: TaskRequest to be modified |
| Returns: |
| TaskResultSummary. |
| """ |
| req_key, result_summary_key = task_pack.get_request_and_result_keys(task_id) |
| request.key = req_key |
| return result_summary_key.get(use_cache=False, use_memcache=False) |
| |
| task_req_to_id_key = None |
| if request_id: |
| task_req_to_id_key = task_request.TaskRequestID.create_key(request_id) |
| task_req_to_id = task_req_to_id_key.get() |
| if task_req_to_id: |
| return _get_task_result_summary_from_task_id(task_req_to_id.task_id, |
| request) |
| |
| # Register the dimension set in the native scheduler only when not on RBE. |
| task_asserted_future = None |
| if not request.rbe_instance: |
| task_asserted_future = task_queues.assert_task_async(request) |
| |
| now = utils.utcnow() |
| |
| # Note: this key is not final. We may need to change it in the transaction |
| # below if it is already occupied. |
| request.key = task_request.new_request_key() |
| result_summary = task_result.new_result_summary(request) |
| result_summary.modified_ts = now |
| |
| # Precalculate all property hashes in advance. That way even if we end up |
| # using e.g. first task slice, all hashes will still be populated (for BQ |
| # export). |
| for i in range(request.num_task_slices): |
| request.task_slice(i).precalculate_properties_hash(secret_bytes) |
| |
| # If we have results for any idempotent slice, reuse them. No need to run |
| # anything. |
| dupe_summary = None |
| for i in range(request.num_task_slices): |
| t = request.task_slice(i) |
| if t.properties.idempotent: |
| dupe_summary = _find_dupe_task(now, t.properties_hash) |
| if dupe_summary: |
| _dedupe_result_summary(dupe_summary, result_summary, i) |
| # In this code path, there's not much to do as the task will not be run, |
| # previous results are returned. We still need to store the TaskRequest |
| # and TaskResultSummary. |
| # Since the task is never scheduled, TaskToRunShard is not stored. |
| # Since the has_secret_bytes/has_build_task property is already set |
| # for UI purposes, and the task itself will never be run, we skip |
| # storing the SecretBytes/BuildTask, as they would never be read and |
| # will just consume space in the datastore (and the task we deduplicated |
| # with will have them stored anyway, if we really want to get them |
| # again). |
| secret_bytes = None |
| build_task = None |
| break |
| |
| to_run = None |
| resultdb_update_token_future = None |
| |
| if not dupe_summary: |
| # The task has to run. Find a slice index that should be launched based |
| # on available capacity. For RBE tasks always start with zeroth slice, we |
| # have no visibility into RBE capacity here. If there are slices that can't |
| # execute due to missing bots, there will be a ping pong game between |
| # Swarming and RBE skipping them. |
| for index in range(request.num_task_slices): |
| t = request.task_slice(index) |
| if (request.rbe_instance or t.wait_for_capacity |
| or bot_management.has_capacity(t.properties.dimensions)): |
| # Pick this slice as the current. |
| result_summary.current_task_slice = index |
| to_run = task_to_run.new_task_to_run(request, index) |
| if enable_resultdb: |
| # TODO(vadimsh): The invocation may end up associated with wrong |
| # task ID if we end up changing `request.key` in the transaction |
| # below due to a collision. |
| resultdb_update_token_future = resultdb.create_invocation_async( |
| task_pack.pack_run_result_key(to_run.run_result_key), |
| request.realm, request.execution_deadline) |
| break |
| else: |
| # No available capacity for any slice. Fail the task right away. |
| to_run = None |
| secret_bytes = None |
| result_summary.abandoned_ts = result_summary.created_ts |
| result_summary.completed_ts = result_summary.created_ts |
| result_summary.state = task_result.State.NO_RESOURCE |
| |
  # Determine the external scheduler (if relevant) prior to making the task
  # live, so the HTTP handler can return as fast as possible afterwards.
| es_cfg = None |
| if not request.rbe_instance: |
| es_cfg = external_scheduler.config_for_task(request) |
| |
  # This occasionally triggers a task queue task. It may throw, which is
  # surfaced to the user, but this is safe since the task request hasn't been
  # stored yet.
| if task_asserted_future: |
| task_asserted_future.get_result() |
| |
| # Wait until ResultDB invocation is created to get its update token. |
| if resultdb_update_token_future: |
| request.resultdb_update_token = resultdb_update_token_future.get_result() |
| result_summary.resultdb_info = task_result.ResultDBInfo( |
| hostname=urlparse.urlparse(config.settings().resultdb.server).hostname, |
| invocation=resultdb.get_invocation_name( |
| task_pack.pack_run_result_key(to_run.run_result_key)), |
| ) |
| |
| def reparent(key): |
| """Changes entity group key for all entities being stored.""" |
| request.key = key |
| result_summary.key = task_pack.request_key_to_result_summary_key(key) |
| if to_run: |
| to_run.key = ndb.Key(to_run.key.kind(), to_run.key.id(), parent=key) |
| if request.rbe_instance: |
| to_run.populate_rbe_reservation() |
| if secret_bytes: |
| assert request.secret_bytes_key |
| secret_bytes.key = request.secret_bytes_key |
| if build_task: |
| build_task.key = request.build_task_key |
| |
| # Populate all initial keys. |
| reparent(request.key) |
| |
| # This is used to detect if we actually already stored entities in `txn` |
| # on a retry after a transient error. |
| request.txn_uuid = str(uuid.uuid4()) |
| |
| def txn(): |
| """Returns True if stored everything, False on an ID collision.""" |
    # Check if a request_uuid => task_id relationship exists.
| task_req_to_id = None |
| if task_req_to_id_key: |
| task_req_to_id = task_req_to_id_key.get() |
| if task_req_to_id: |
| raise TaskExistsException(task_req_to_id.task_id) |
| expire_at = utils.utcnow() + datetime.timedelta(days=7) |
| task_req_to_id = task_request.TaskRequestID( |
| key=task_req_to_id_key, |
| task_id=task_pack.pack_result_summary_key(result_summary.key), |
| expire_at=expire_at) |
| |
| existing = request.key.get() |
| if existing: |
| return existing.txn_uuid == request.txn_uuid |
| if to_run and request.rbe_instance: |
| rbe.enqueue_rbe_task(request, to_run) |
| ndb.put_multi( |
| filter(bool, [ |
| request, result_summary, to_run, secret_bytes, build_task, |
| task_req_to_id |
| ])) |
| return True |
| |
  # Try to transactionally insert the request, retrying on task ID collisions
  # (which should be rare).
| attempt = 0 |
| while True: |
| attempt += 1 |
| try: |
| if datastore_utils.transaction(txn, |
| retries=3, |
| xg=bool(task_req_to_id_key)): |
| break |
| # There was an existing *different* entity. We need a new root key. |
| prev = result_summary.task_id |
| reparent(task_request.new_request_key()) |
| logging.warning('Task ID collision: %s already exists, using %s instead', |
| prev, result_summary.task_id) |
| except datastore_utils.CommitError: |
| # The transaction likely failed. Retry with the same key to confirm. |
| pass |
| except TaskExistsException as e: |
| logging.warning( |
| 'Could not schedule new task because task already exists: %s' % |
| e.message) |
| return _get_task_result_summary_from_task_id(e.message, request) |
| logging.debug('Committed txn on attempt %d', attempt) |
| |
| # Note: This external_scheduler call is blocking, and adds risk |
| # of the HTTP handler being slow or dying after the task was already made |
  # live. On the other hand, this call is only made for tasks in a pool that
  # uses an external scheduler, and such tasks are not effectively live unless
| # the external scheduler is aware of them. |
| # |
| # TODO(vadimsh): This should happen via a transactionally enqueued TQ task. |
| if es_cfg: |
| external_scheduler.notify_requests(es_cfg, [(request, result_summary)], |
| False, False) |
| |
| if dupe_summary: |
| logging.debug( |
| 'New request %s reusing %s', result_summary.task_id, |
| dupe_summary.task_id) |
  elif result_summary.state == task_result.State.NO_RESOURCE:
    logging.warning(
        'New request %s denied with NO_RESOURCE', result_summary.task_id)
  else:
    logging.debug('New request %s', result_summary.task_id)
| |
| ts_mon_metrics.on_task_requested(result_summary, bool(dupe_summary)) |
| |
| # Either the task was deduped, or forcibly refused. Notify through PubSub. |
| if result_summary.state != task_result.State.PENDING: |
| # TODO(vadimsh): This should be moved into txn() above. |
| _maybe_taskupdate_notify_via_tq(result_summary, |
| request, |
| es_cfg, |
| transactional=False) |
| ts_mon_metrics.on_task_status_change_scheduler_latency(result_summary) |
| return result_summary |
| |
| |
| def bot_reap_task(bot_dimensions, queues, bot_details, deadline): |
| """Reaps a TaskToRunShard if one is available. |
| |
  The process is to find a TaskToRunShard whose .queue_number is set, then
  create a TaskRunResult for it.
| |
| Arguments: |
| - bot_dimensions: The dimensions of the bot as a dictionary in |
| {string key: list of string values} format. |
| - queues: a list of integers with dimensions hashes of queues to poll. |
| - bot_details: a BotDetails tuple, non-essential but would be propagated to |
| the task. |
| - deadline: datetime.datetime of when to give up. |
| |
| Returns: |
| tuple of (TaskRequest, SecretBytes, TaskRunResult) for the task that was |
| reaped. The TaskToRunShard involved is not returned. |
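
  Example (an illustrative sketch only; the dimension values, queue hash and
  deadline below are hypothetical):

    deadline = utils.utcnow() + datetime.timedelta(seconds=40)
    request, secret_bytes, run_result = bot_reap_task(
        bot_dimensions={u'id': [u'bot1'], u'pool': [u'default']},
        queues=[123456789],
        bot_details=BotDetails(u'bot-version-hash', u'my-cloud-project'),
        deadline=deadline)
    if request:
      logging.info('Got task %s to execute', run_result.task_id)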
| """ |
| start = time.time() |
| bot_id = bot_dimensions[u'id'][0] |
| es_cfg = external_scheduler.config_for_bot(bot_dimensions) |
| if es_cfg: |
| request, secret_bytes, to_run_result = _bot_reap_task_external_scheduler( |
| bot_dimensions, bot_details, es_cfg) |
| if request: |
| return request, secret_bytes, to_run_result |
| logging.info('External scheduler did not reap any tasks, trying native ' |
| 'scheduler.') |
| |
  # Used to filter out task requests that don't match the bot's dimensions.
  # This can happen if there's a collision on the dimensions hash (which is a
  # 32-bit number).
| match_bot_dimensions = task_to_run.dimensions_matcher(bot_dimensions) |
| |
  # Pool is used exclusively for monitoring metrics, to have some breakdown of
  # performance by pool.
| pool = (bot_dimensions.get(u'pool') or ['-'])[0] |
| |
| # Allocate ~10s for _reap_task. |
| scan_deadline = deadline - datetime.timedelta(seconds=10) |
| |
| iterated = 0 |
| reenqueued = 0 |
| expired = 0 |
| failures = 0 |
| stale_index = 0 |
| try: |
| q = task_to_run.yield_next_available_task_to_dispatch( |
| bot_id, pool, queues, match_bot_dimensions, scan_deadline) |
| for to_run in q: |
| iterated += 1 |
| request = task_to_run.task_to_run_key_to_request_key(to_run.key).get() |
| |
| # When falling back from external scheduler, ignore other es-owned tasks. |
| if es_cfg and not _should_allow_es_fallback(es_cfg, request): |
| logging.debug('Skipped es-owned request %s during es fallback', |
| request.task_id) |
| continue |
| |
| now = utils.utcnow() |
| |
| # Hard limit to schedule this task. |
| expiration_ts = request.expiration_ts |
| |
      # If we found a bot that can run the task slice, we prefer to run it,
      # even if it is slightly expired, rather than falling back.
| slice_expiration = request.created_ts |
| slice_index = task_to_run.task_to_run_key_slice_index(to_run.key) |
| for i in range(slice_index + 1): |
| t = request.task_slice(i) |
| slice_expiration += datetime.timedelta(seconds=t.expiration_secs) |
| if expiration_ts < now <= slice_expiration: |
| logging.info('Task slice is expired, but task is not expired; ' |
| 'slice index: %d, slice expiration: %s, ' |
| 'task expiration: %s', |
| slice_index, slice_expiration, expiration_ts) |
| |
| if expiration_ts < now: |
| # The whole task request has expired. |
| if expired >= 5: |
          # Do not try to expire too many tasks in one poll request, as this
          # kills the polling performance in the case of a degenerate queue:
          # this happens when a large backlog (>10000) of tasks is expiring
          # simultaneously.
          logging.info('Too many inline expirations; skipping')
| failures += 1 |
| continue |
| |
| # Expiring a TaskToRunShard for TaskSlice may reenqueue a new |
| # TaskToRunShard. Don't try to grab a claim: we already hold it. |
| summary, new_to_run = _expire_slice(request, |
| to_run.key, |
| task_result.State.EXPIRED, |
| claim=False, |
| txn_retries=1, |
| txn_catch_errors=True, |
| reason='bot_reap_task') |
| if not new_to_run: |
| if summary: |
| expired += 1 |
| else: |
| stale_index += 1 |
| continue |
| |
      # Under normal circumstances this code path should not be executed,
      # since we already checked the whole task request's expiration, not just
      # the current task slice's expiration. But handle it, just in case.
| reenqueued += 1 |
      # We need to do an ad hoc validation of the new TaskToRunShard to see if
      # we can harvest it too. This slightly duplicates the work done in
      # yield_next_available_task_to_dispatch().
| slice_index = task_to_run.task_to_run_key_slice_index(new_to_run.key) |
| t = request.task_slice(slice_index) |
| if not match_bot_dimensions(t.properties.dimensions): |
| continue |
| |
| # Try to claim it for ourselves right away. On failure, give it up for |
| # some other bot to claim. |
| if not task_to_run.Claim.obtain(new_to_run.key): |
| continue |
| to_run = new_to_run |
| |
| run_result, secret_bytes = _reap_task(bot_dimensions, bot_details, |
| to_run.key, request) |
| if not run_result: |
| failures += 1 |
        # Sadly, there is no way here to know the try number.
| logging.info( |
| 'failed to reap: %s0', |
| task_pack.pack_request_key(to_run.request_key)) |
| continue |
| |
| # We successfully reaped a task. |
| logging.info('Reaped: %s', run_result.task_id) |
| if es_cfg: |
| # We reaped a task after falling back from external scheduler. Keep |
| # it informed about the reaped task. |
| external_scheduler.notify_requests(es_cfg, [(request, run_result)], |
| True, False) |
| logging.debug('TODO(crbug.com/1186759): expiration_ts %s', |
| to_run.expiration_ts) |
| return request, secret_bytes, run_result |
| return None, None, None |
| finally: |
| logging.debug( |
| 'bot_reap_task(%s) in %.3fs: %d iterated, %d reenqueued, %d expired, ' |
| '%d stale_index, %d failed', bot_id, |
| time.time() - start, iterated, reenqueued, expired, stale_index, |
| failures) |
| |
| |
| def bot_claim_slice(bot_dimensions, bot_details, to_run_key, claim_id): |
| """Transactionally assigns the given task slice to the given bot. |
| |
| Unlike bot_reap_task, this function doesn't try to find a matching task |
| in Swarming scheduler queues and instead accepts a candidate TaskToRun as |
| input. |
| |
| TODO(vadimsh): Stop returning TaskRunResult, it doesn't really carry any |
| new information (`task_id` and task slice index are already available in |
| `to_run_key`). This will require more refactorings in handlers_bot.py. |
| |
| Arguments: |
| bot_dimensions: The dimensions of the bot as a dictionary in |
| {string key: list of string values} format. |
| bot_details: a BotDetails tuple, non-essential but would be propagated to |
| the task. |
| to_run_key: a key of TaskToRunShardXXX entity identifying a slice to claim. |
| claim_id: a string identifying this claim operation, for idempotency. |
| |
| Returns: |
| Tuple of (TaskRequest, SecretBytes, TaskRunResult) on success. |
| |
| Raises: |
| ClaimError on fatal errors (e.g. the slice was already claimed or expired). |
| CommitError etc. on transient datastore errors. |
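
  Example (an illustrative sketch only; to_run_key and claim_id are
  hypothetical, e.g. derived from an RBE reservation being processed):

    request, secret_bytes, run_result = bot_claim_slice(
        bot_dimensions={u'id': [u'bot1'], u'pool': [u'default']},
        bot_details=BotDetails(u'bot-version-hash', u'my-cloud-project'),
        to_run_key=to_run_key,
        claim_id=u'claim-0a1b2c3d')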
| """ |
| assert claim_id |
| |
| to_run = to_run_key.get() |
| if not to_run: |
| raise ClaimError('No task slice') |
| |
| # Check dimensions match before trusting any other inputs. This is a security |
| # check. A bot must not be able to claim slices it doesn't match even if the |
| # bot knows their IDs. |
| match_bot_dimensions = task_to_run.dimensions_matcher(bot_dimensions) |
| if not match_bot_dimensions(to_run.dimensions): |
| raise ClaimError('Dimensions mismatch') |
| |
| # Check the slice is still available before running the transaction. |
| if not to_run.is_reapable: |
| if to_run.claim_id != claim_id: |
| raise ClaimError('No longer available') |
| # The caller already holds the claim and this is a retry. Just fetch all |
| # entities that should already exist. |
| run_result_key = task_pack.request_key_to_run_result_key(to_run.request_key) |
| request, run_result = ndb.get_multi([to_run.request_key, run_result_key], |
| use_cache=False, |
| use_memcache=False) |
| if not run_result: |
| raise Error('TaskRunResult is unexpectedly missing') |
| if run_result.current_task_slice != to_run.task_slice_index: |
| raise ClaimError('Obsolete') |
| secret_bytes = None |
| if request.task_slice(to_run.task_slice_index).properties.has_secret_bytes: |
| secret_bytes = request.secret_bytes_key.get() |
| return request, secret_bytes, run_result |
| |
| # Transactionally claim the slice. This can raise ClaimError or CommitError. |
| request = to_run.request_key.get() |
| run_result, secret_bytes = _reap_task(bot_dimensions, bot_details, to_run_key, |
| request, claim_id, 3, False) |
| return request, secret_bytes, run_result |
| |
| |
| def bot_update_task(run_result_key, bot_id, output, output_chunk_start, |
| exit_code, duration, hard_timeout, io_timeout, cost_usd, |
| cas_output_root, cipd_pins, performance_stats, canceled): |
| """Updates a TaskRunResult and TaskResultSummary, along TaskOutputChunk. |
| |
| Arguments: |
| - run_result_key: ndb.Key to TaskRunResult. |
| - bot_id: Self advertised bot id to ensure it's the one expected. |
| - output: Data to append to this command output. |
| - output_chunk_start: Index of output in the stdout stream. |
| - exit_code: Mark that this task completed. |
| - duration: Time spent in seconds for this task, excluding overheads. |
  - hard_timeout: Bool set if a hard timeout occurred.
  - io_timeout: Bool set if an I/O timeout occurred.
| - cost_usd: Cost in $USD of this task up to now. |
| - cas_output_root: task_request.CASReference or None. |
| - cipd_pins: None or task_result.CipdPins |
| - performance_stats: task_result.PerformanceStats instance or None. Can only |
| be set when the task is completing. |
| - canceled: Bool set if the task was canceled before running. |
  Invalid states are flat out refused:
  - A command is updated after it has had an exit code assigned.
| |
| Returns: |
| TaskRunResult.state or None in case of failure. |
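
  Example (an illustrative sketch only; all values are hypothetical):

    state = bot_update_task(
        run_result_key=run_result_key,
        bot_id=u'bot1',
        output='hello\n',
        output_chunk_start=0,
        exit_code=0,
        duration=12.5,
        hard_timeout=False,
        io_timeout=False,
        cost_usd=0.001,
        cas_output_root=None,
        cipd_pins=None,
        performance_stats=None,
        canceled=False)
    if state is None:
      pass  # surface HTTP 500 so the bot retries the update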
| """ |
| assert output_chunk_start is None or isinstance(output_chunk_start, int) |
| assert output is None or isinstance(output, str) |
  if cost_usd is not None and cost_usd < 0.:
    raise ValueError('cost_usd must be None or greater than or equal to 0')
  if duration is not None and duration < 0.:
    raise ValueError('duration must be None or greater than or equal to 0')
| if (duration is None) != (exit_code is None): |
| raise ValueError( |
| 'had unexpected duration; expected iff a command completes\n' |
| 'duration: %r; exit: %r' % (duration, exit_code)) |
| if performance_stats and duration is None: |
| raise ValueError( |
| 'duration must be set when performance_stats is set\n' |
| 'duration: %s; performance_stats: %s' % (duration, performance_stats)) |
| |
| packed = task_pack.pack_run_result_key(run_result_key) |
| logging.debug( |
| 'bot_update_task(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)', packed, |
| bot_id, |
| len(output) if output else output, output_chunk_start, exit_code, |
| duration, hard_timeout, io_timeout, cost_usd, cas_output_root, cipd_pins, |
| performance_stats) |
| |
| result_summary_key = task_pack.run_result_key_to_result_summary_key( |
| run_result_key) |
| request = task_pack.result_summary_key_to_request_key( |
| result_summary_key).get() |
| |
| need_cancel = False |
| es_cfg = None |
  # Kill this task if the parent task is neither running nor pending.
| if request.parent_task_id: |
| parent_run_key = task_pack.unpack_run_result_key(request.parent_task_id) |
| parent = parent_run_key.get(use_cache=False, use_memcache=False) |
| need_cancel = parent.state not in task_result.State.STATES_RUNNING |
| if need_cancel: |
| es_cfg = external_scheduler.config_for_task(request) |
| |
| now = utils.utcnow() |
| run = lambda: _bot_update_tx( |
| run_result_key, bot_id, output, output_chunk_start, exit_code, duration, |
| hard_timeout, io_timeout, cost_usd, cas_output_root, cipd_pins, |
| need_cancel, performance_stats, now, result_summary_key, request, es_cfg, |
| canceled) |
| try: |
| logging.info('Starting transaction') |
| smry, run_result, error = datastore_utils.transaction(run, retries=3) |
| logging.info('Transaction committed') |
| except datastore_utils.CommitError as e: |
| logging.info('Got commit error: %s', e) |
| # It is important that the caller correctly surface this error as the bot |
| # will retry on HTTP 500. |
| return None |
| if smry and smry.state != task_result.State.RUNNING: |
    # Take no chances and explicitly clear the ndb memcache entries. A very
    # rare race condition has been observed where a stale version of the
    # entities is kept in memcache.
| ndb.get_context()._clear_memcache([result_summary_key, |
| run_result_key]).check_success() |
| assert bool(error) != bool(run_result), (error, run_result) |
| if error: |
| logging.error('Task %s %s', packed, error) |
| return None |
| |
| update_pubsub_success = _maybe_pubsub_notify_now(smry, request) |
| |
| # Caller must retry if PubSub enqueue fails. |
| if not update_pubsub_success: |
| return None |
| if smry.state not in task_result.State.STATES_RUNNING: |
| ok = utils.enqueue_task( |
| '/internal/taskqueue/important/tasks/cancel-children-tasks', |
| 'cancel-children-tasks', |
| payload=utils.encode_to_json({'task': smry.task_id}) |
| ) |
| if not ok: |
| raise Error( |
| 'Failed to push cancel children task for %s' % smry.task_id) |
| |
  # Hack a bit to tell the bot what it needs to hear (see handlers_bot.py).
  # It's kind of an ugly hack, but the other option is to return the whole
  # run_result.
| run_result_state = run_result.state |
| if run_result.killing: |
| run_result_state = task_result.State.KILLED |
| if request.has_build_task: |
| update_id = utils.time_time_ns() |
| if not _buildbucket_update(request.key, run_result_state, update_id): |
| return None |
| return run_result_state |
| |
| |
| def bot_terminate_task(run_result_key, bot_id, start_time, client_error): |
| """Terminates a task that is currently running as an internal failure. |
| |
| Sets the TaskRunResult's state to |
| - CLIENT_ERROR if missing_cipd or missing_cas |
| - KILLED if it's canceled |
| - BOT_DIED otherwise |
| |
  Returns:
    An error message string on failure, or None on success.
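
  Example (an illustrative sketch only):

    msg = bot_terminate_task(
        run_result_key, u'bot1', utils.utcnow(), client_error=None)
    if msg:
      logging.error('%s', msg)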
| """ |
| result_summary_key = task_pack.run_result_key_to_result_summary_key( |
| run_result_key) |
| request = task_pack.result_summary_key_to_request_key( |
| result_summary_key).get() |
| now = utils.utcnow() |
| packed = task_pack.pack_run_result_key(run_result_key) |
| es_cfg = external_scheduler.config_for_task(request) |
| |
| def run(): |
| run_result, result_summary = ndb.get_multi( |
| (run_result_key, result_summary_key), |
| use_cache=False, |
| use_memcache=False) |
| if bot_id and run_result.bot_id != bot_id: |
| return 'Bot %s sent task kill for task %s owned by bot %s' % ( |
| bot_id, packed, run_result.bot_id) |
| |
| if run_result.state == task_result.State.BOT_DIED: |
| # Ignore this failure. |
| return None |
| |
| run_result.signal_server_version() |
| missing_cas = None |
| missing_cipd = None |
| if client_error: |
| missing_cas = client_error.get('missing_cas') |
| missing_cipd = client_error.get('missing_cipd') |
| if run_result.killing: |
| run_result.killing = False |
| run_result.state = task_result.State.KILLED |
| run_result = _set_fallbacks_to_exit_code_and_duration(run_result, now) |
| run_result.internal_failure = True |
| # run_result.abandoned_ts is set when run_result.killing == True |
| actual_state_change_time = run_result.abandoned_ts |
| elif missing_cipd or missing_cas: |
| run_result.state = task_result.State.CLIENT_ERROR |
| actual_state_change_time = start_time |
| run_result = _set_fallbacks_to_exit_code_and_duration(run_result, now) |
| if missing_cipd: |
| run_result.missing_cipd = [ |
| task_request.CipdPackage.create_from_json(cipd) |
| for cipd in missing_cipd |
| ] |
| if missing_cas: |
| run_result.missing_cas = [ |
| task_request.CASReference.create_from_json(cas) |
| for cas in missing_cas |
| ] |
| run_result.internal_failure = False |
| else: |
| run_result.state = task_result.State.BOT_DIED |
      # This should technically be the exact time when the bot running this
      # task died, but this is a close approximation.
| actual_state_change_time = start_time |
| run_result.internal_failure = True |
| run_result.abandoned_ts = now |
| run_result.completed_ts = now |
| run_result.modified_ts = now |
| run_result.dead_after_ts = None |
| result_summary.set_from_run_result(run_result, request) |
| |
| futures = ndb.put_multi_async((run_result, result_summary)) |
| _maybe_taskupdate_notify_via_tq(result_summary, |
| request, |
| es_cfg, |
| transactional=True) |
| for f in futures: |
| f.check_success() |
| latency = utils.utcnow() - actual_state_change_time |
| ts_mon_metrics.on_dead_task_detection_latency(result_summary.tags, latency, |
| False) |
| return None |
| |
| try: |
| msg = datastore_utils.transaction(run) |
| except datastore_utils.CommitError as e: |
    # At worst, the task will be tagged as BOT_DIED once BOT_PING_TOLERANCE
    # seconds have passed, on the next cron_handle_bot_died cron job.
| return 'Failed killing task %s: %s' % (packed, e) |
| return msg |
| |
| |
| def cancel_task_with_id(task_id, kill_running, bot_id): |
| """Cancels a task if possible, setting it to either CANCELED or KILLED. |
| |
| Warning: ACL check must have been done before. |
| |
| See cancel_task for argument and return value details.""" |
| if not task_id: |
| logging.error('Cannot cancel a blank task') |
| return False, False |
| request_key, result_key = task_pack.get_request_and_result_keys(task_id) |
| if not request_key or not result_key: |
| logging.error('Cannot search for a falsey key. Request: %s Result: %s', |
| request_key, result_key) |
| return False, False |
| request_obj = request_key.get() |
| if not request_obj: |
| logging.error('Request for %s was not found.', request_key.id()) |
| return False, False |
| |
| return cancel_task(request_obj, result_key, kill_running, bot_id) |
| |
| |
| def cancel_task(request, result_key, kill_running, bot_id): |
| """Cancels a task if possible, setting it to either CANCELED or KILLED. |
| |
| Ensures that the associated TaskToRunShard is canceled (when pending) and |
| updates the TaskResultSummary/TaskRunResult accordingly. |
| The TaskRunResult.state is immediately set to KILLED for running tasks. |
| |
| Warning: ACL check must have been done before. |
| |
| Arguments: |
| request: TaskRequest instance to cancel. |
| result_key: result key for request to cancel. |
| kill_running: if true, allow cancelling a task in RUNNING state. |
| bot_id: if specified, only cancel task if it is RUNNING on this bot. Cannot |
| be specified if kill_running is False. |
| Returns: |
| tuple(bool, bool) |
    - True if the cancellation succeeded. Either the task atomically changed
      from PENDING to CANCELED, or it was RUNNING and the killing bit has
      been set.
| - True if the task was running while it was canceled. |
| |
| Raises: |
| datastore_utils.CommitError if the transaction failed. |
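
  Example (an illustrative sketch only):

    request = request_key.get()
    succeeded, was_running = cancel_task(
        request, result_key, kill_running=True, bot_id=None)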
| """ |
| if bot_id: |
| assert kill_running, "Can't use bot_id if kill_running is False." |
| if result_key.kind() == 'TaskRunResult': |
    # Ignore the try number. A user may ask to cancel run result 1, but if it
    # ended in BOT_DIED, it is acceptable to cancel try #2 since the task is
    # still "pending".
| result_key = task_pack.run_result_key_to_result_summary_key(result_key) |
| now = utils.utcnow() |
| es_cfg = external_scheduler.config_for_task(request) |
| |
| if kill_running: |
| task_id = task_pack.pack_result_summary_key(result_key) |
| ok = utils.enqueue_task( |
| '/internal/taskqueue/important/tasks/cancel-children-tasks', |
| 'cancel-children-tasks', |
| payload=utils.encode_to_json({ |
| 'task': task_id, |
| })) |
| if not ok: |
| raise Error('Failed to enqueue task to cancel-children-tasks queue;' |
| ' task_id: %s' % task_id) |
| |
| def run(): |
| result_summary = result_key.get(use_cache=False, use_memcache=False) |
| orig_state = result_summary.state |
| succeeded, was_running = _cancel_task_tx(request, result_summary, |
| kill_running, bot_id, now, es_cfg) |
| state_changed = result_summary.state != orig_state |
| return succeeded, was_running, state_changed, result_summary |
| |
| succeeded, was_running, state_changed, result_summary = \ |
| datastore_utils.transaction(run) |
| if state_changed: |
| ts_mon_metrics.on_task_status_change_scheduler_latency(result_summary) |
| return succeeded, was_running |
| |
| |
| def cancel_tasks(limit, query, cursor=None): |
| # type: (int, ndb.Query, Optional[str]) |
| # -> Tuple[str, Sequence[task_result.TaskResultSummary]] |
| """ |
| Raises: |
| ValueError if limit is not within [1, 1000] or cursor is not valid. |
| handlers_exceptions.InternalException if cancel request could not be |
| enqueued. |
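
  Example (an illustrative sketch only; the query and its tag filter are
  hypothetical):

    q = task_result.TaskResultSummary.query(
        task_result.TaskResultSummary.tags == 'pool:default')
    cursor, results = cancel_tasks(100, q)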
| """ |
| results, cursor = datastore_utils.fetch_page(query, limit, cursor) |
| |
| if results: |
| payload = json.dumps({ |
| 'tasks': [r.task_id for r in results], |
| 'kill_running': True, |
| }) |
| ok = utils.enqueue_task( |
| '/internal/taskqueue/important/tasks/cancel', |
| 'cancel-tasks', |
| payload=payload) |
| if not ok: |
| raise handlers_exceptions.InternalException( |
| 'Could not enqueue cancel request, try again later') |
| else: |
| logging.info('No tasks to cancel.') |
| |
| return cursor, results |
| |
| |
| def expire_slice(to_run_key, terminal_state, reason): |
| """Expires a slice represented by the given TaskToRunShard entity. |
| |
| Schedules the next slice, if possible, or terminates the task with the given |
| terminal_state if there are no more slices that can run. Intended to be used |
| for RBE tasks. |
| |
| Does nothing if the slice is not in pending state anymore or the task was |
| deleted already. |
| |
| Arguments: |
| to_run_key: an entity key of TaskToRunShard entity to expire. |
| terminal_state: the task state to set if this slice is the last one. |
| reason: a string tsmon label used to identify how expiration happened. |
| |
| Raises: |
| datastore_utils.CommitError on transaction errors. |
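
  Example (an illustrative sketch only; the key and reason label are
  hypothetical):

    expire_slice(to_run_key, task_result.State.EXPIRED, reason='rbe_expired')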
| """ |
| request_key = task_to_run.task_to_run_key_to_request_key(to_run_key) |
| request = request_key.get() |
| if not request: |
| logging.warning('TaskRequest doesn\'t exist') |
| return |
| _expire_slice(request, |
| to_run_key, |
| terminal_state, |
| claim=False, |
| txn_retries=4, |
| txn_catch_errors=False, |
| reason=reason) |
| |
| |
| ### Cron job. |
| |
| |
| def cron_abort_expired_task_to_run(): |
| """Aborts expired TaskToRunShard requests to execute a TaskRequest on a bot. |
| |
| Three reasons can cause this situation: |
  - Higher throughput of incoming task requests than the rate at which task
  requests are completed, e.g. there are not enough bots to run all the tasks
  that come in at the current rate. That's normal overflow and must be handled
  accordingly.
  - No connected bot satisfies the requested dimensions. This is trickier:
  either there is a typo in the dimensions, or all the bots died and the
  admins must reconnect them.
| - Server has internal failures causing it to fail to either distribute the |
| tasks or properly receive results from the bots. |
| |
| This cron job just emits Task Queue tasks handled by task_expire_tasks(...). |
| """ |
| enqueued = [] |
| |
| def _enqueue_task(to_runs): |
| payload = { |
| 'entities': [(ttr.task_id, ttr.shard_index, ttr.key.integer_id()) |
| for ttr in to_runs], |
| } |
| logging.debug('Expire tasks: %s', payload['entities']) |
| ok = utils.enqueue_task( |
| '/internal/taskqueue/important/tasks/expire', |
| 'task-expire', |
| payload=utils.encode_to_json(payload)) |
| if not ok: |
| logging.warning('Failed to enqueue task for %d tasks', len(to_runs)) |
| else: |
| enqueued.append(len(to_runs)) |
| |
| delay_sec = 0.0 |
| if pools_config.all_pools_migrated_to_rbe(): |
| logging.info('Delaying the expiration check to expire through RBE instead') |
| delay_sec = 60.0 |
| |
| task_to_runs = [] |
| try: |
| for to_run in task_to_run.yield_expired_task_to_run(delay_sec): |
| task_to_runs.append(to_run) |
| # Enqueue every 50 TaskToRunShards. |
| if len(task_to_runs) == 50: |
| _enqueue_task(task_to_runs) |
| task_to_runs = [] |
| # Enqueue remaining TaskToRunShards. |
| if task_to_runs: |
| _enqueue_task(task_to_runs) |
| finally: |
    logging.debug('Enqueued %d task queue tasks for %d TaskToRunShards',
                  len(enqueued), sum(enqueued))
| |
| |
| def cron_handle_bot_died(): |
| """Aborts TaskRunResult where the bot stopped sending updates. |
| |
| The task will be canceled. |
| |
| Returns: |
| - task IDs killed |
  - number of tasks ignored
| """ |
| count = {'total': 0, 'ignored': 0, 'killed': 0} |
| killed = [] |
| futures = [] |
| |
| def _handle_future(f): |
| key, state_changed, latency, tags = None, False, None, None |
| try: |
| key, state_changed, latency, tags = f.get_result() |
| except datastore_utils.CommitError as e: |
      logging.error('Failed to update dead task. error=%s', e)
| |
| if key: |
| killed.append(task_pack.pack_run_result_key(key)) |
| count['killed'] += 1 |
| else: |
| count['ignored'] += 1 |
| checked = count['killed'] + count['ignored'] |
| if checked % 500 == 0: |
| logging.info('Checked %d tasks', checked) |
| if state_changed: |
| ts_mon_metrics.on_dead_task_detection_latency(tags, latency, True) |
| |
| def _wait_futures(futs, cap): |
| while len(futs) > cap: |
| ndb.Future.wait_any(futs) |
| _futs = [] |
| for f in futs: |
| if f.done(): |
| _handle_future(f) |
| else: |
| _futs.append(f) |
| futs = _futs |
| return futs |
| |
| start = utils.utcnow() |
  # Time out at 9.5 mins; we want to gracefully terminate prior to the App
  # Engine handler expiry. This reduces the number of deadline-exceeded 500
  # errors arising from the endpoint.
| time_to_stop = start + datetime.timedelta(seconds=int(9.5 * 60)) |
| try: |
| cursor = None |
| q = task_result.TaskRunResult.query( |
| task_result.TaskRunResult.completed_ts == None) |
| while utils.utcnow() <= time_to_stop: |
| keys, cursor, more = q.fetch_page(500, |
| keys_only=True, |
| start_cursor=cursor) |
| if not keys: |
| break |
| count['total'] += len(keys) |
| logging.info('Fetched %d keys', count['total']) |
| for run_result_key in keys: |
| f = _detect_dead_task_async(run_result_key) |
| if f: |
| futures.append(f) |
| else: |
| count['ignored'] += 1 |
| # Limit the number of futures. |
| futures = _wait_futures(futures, 5) |
      # Wait for the remaining ones.
| _wait_futures(futures, 0) |
| if not more: |
| break |
| finally: |
| now = utils.utcnow() |
| logging.info('cron_handle_bot_died time elapsed: %ss', |
| (now - start).total_seconds()) |
| if now > time_to_stop: |
| logging.warning('Terminating cron_handle_bot_died early') |
| if killed: |
| logging.warning('BOT_DIED!\n%d tasks:\n%s', count['killed'], |
| '\n'.join(' %s' % i for i in killed)) |
| logging.info('total %d, killed %d, ignored: %d', count['total'], |
| count['killed'], count['ignored']) |
| # These are returned primarily for unit testing verification. |
| return killed, count['ignored'] |
| |
| |
| def cron_handle_external_cancellations(): |
| """Fetch and handle external scheduler cancellations for all pools.""" |
| known_pools = pools_config.known() |
| for pool in known_pools: |
| pool_cfg = pools_config.get_pool_config(pool) |
| if not pool_cfg.external_schedulers: |
| continue |
| for es_cfg in pool_cfg.external_schedulers: |
| if not es_cfg.enabled: |
| continue |
| cancellations = external_scheduler.get_cancellations(es_cfg) |
| if not cancellations: |
| continue |
| |
| for c in cancellations: |
| data = { |
| u'bot_id': c.bot_id, |
| u'task_id': c.task_id, |
| } |
| payload = utils.encode_to_json(data) |
| if not utils.enqueue_task( |
| '/internal/taskqueue/important/tasks/cancel-task-on-bot', |
| queue_name='cancel-task-on-bot', payload=payload): |
| logging.error('Failed to enqueue task-cancellation.') |
| |
| |
| def cron_handle_get_callbacks(): |
| """Fetch and handle external desired callbacks for all pools.""" |
| known_pools = pools_config.known() |
| for pool in known_pools: |
| pool_cfg = pools_config.get_pool_config(pool) |
| if not pool_cfg.external_schedulers: |
| continue |
| for es_cfg in pool_cfg.external_schedulers: |
| if not es_cfg.enabled: |
| continue |
| request_ids = external_scheduler.get_callbacks(es_cfg) |
| if not request_ids: |
| continue |
| |
| items = [] |
| for task_id in request_ids: |
| request_key, result_key = task_pack.get_request_and_result_keys(task_id) |
| request = request_key.get() |
| result = result_key.get(use_cache=False, use_memcache=False) |
| items.append((request, result)) |
        # Send in mini batches to avoid TaskTooLargeError. crbug.com/1175618
| if len(items) >= 20: |
| external_scheduler.notify_requests(es_cfg, items, True, True) |
| items = [] |
| if items: |
| external_scheduler.notify_requests(es_cfg, items, True, True) |
| |
| |
| ## Task queue tasks. |
| |
| |
| def task_handle_pubsub_task(payload): |
| """Handles task enqueued by _maybe_pubsub_notify_via_tq.""" |
  # Do not catch errors, to trigger a task queue task retry. Errors should not
  # happen in the normal case.
| logging.debug("Received PubSub notify payload: (%s)", str(payload)) |
| try: |
| _pubsub_notify(payload['task_id'], payload['topic'], payload['auth_token'], |
| payload['userdata'], payload['tags'], payload['state'], |
| payload.get('start_time')) |
| except pubsub.Error: |
| logging.exception('Fatal error when sending PubSub notification') |
| |
| |
| def task_expire_tasks(task_to_runs): |
| """Expires TaskToRunShardXXX enqueued by cron_abort_expired_task_to_run. |
| |
| Arguments: |
| task_to_runs: a list of (<task ID>, <TaskToRunShard index>, <entity ID>). |
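
  Example (an illustrative sketch only; the IDs below are hypothetical):

    task_expire_tasks([
        # (task ID, TaskToRunShard index, entity ID)
        (u'5e1a0b5b0fake110', 2, 42),
    ])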
| """ |
| expired = [] |
| reenqueued = 0 |
| skipped = 0 |
| |
| try: |
| for task_id, shard_index, entity_id in task_to_runs: |
| # Convert arguments to TaskToRunShardXXX key. |
| request_key, _ = task_pack.get_request_and_result_keys(task_id) |
| to_run_key = task_to_run.task_to_run_key_from_parts( |
| request_key, shard_index, entity_id) |
| |
| # Retrieve the request to know what slice to enqueue next (if any). |
| request = request_key.get() |
| if not request: |
| logging.error('Task %s was not found.', task_id) |
| continue |
| |
| # Expire the slice and schedule the next one (if any). Obtain a claim |
| # to make sure bot_reap_task doesn't try to pick it up. |
| summary, new_to_run = _expire_slice( |
| request, |
| to_run_key, |
| task_result.State.EXPIRED, |
| claim=True, |
| txn_retries=4, |
| txn_catch_errors=True, |
| reason='task_expire_tasks', |
| ) |
| if new_to_run: |
| # The next slice was enqueued. |
| reenqueued += 1 |
| elif summary: |
| # The task was updated, but there's no new slice => the task expired. |
| slice_index = task_to_run.task_to_run_key_slice_index(to_run_key) |
| expired.append( |
| (task_id, request.task_slice(slice_index).properties.dimensions)) |
| else: |
| # The task was not updated => the slice already expired or the |
| # transaction failed. |
| skipped += 1 |
| finally: |
| if expired: |
| logging.info( |
| 'EXPIRED!\n%d tasks:\n%s', len(expired), |
| '\n'.join(' %s %s' % (task_id, dims) for task_id, dims in expired)) |
| logging.info('Reenqueued %d tasks, expired %d, skipped %d', reenqueued, |
| len(expired), skipped) |
| |
| |
| def task_cancel_running_children_tasks(parent_result_summary_id): |
| """Enqueues task queue to cancel non-completed children tasks.""" |
| q = task_result.yield_result_summary_by_parent_task_id( |
| parent_result_summary_id) |
| children_tasks_per_version = {} |
| for task in q: |
| if task.state not in task_result.State.STATES_RUNNING: |
| continue |
| version = task.server_versions[0] |
| children_tasks_per_version.setdefault(version, []).append(task.task_id) |
| |
| for version, tasks in sorted(children_tasks_per_version.items()): |
| logging.info('Sending %d tasks to version %s', len(tasks), version) |
| payload = utils.encode_to_json({ |
| 'tasks': tasks, |
| 'kill_running': True, |
| }) |
| ok = utils.enqueue_task( |
| '/internal/taskqueue/important/tasks/cancel', |
| 'cancel-tasks', payload=payload, |
| # cancel task on specific version of backend module. |
| use_dedicated_module=False, |
| version=version) |
| if not ok: |
| raise Error( |
| 'Failed to enqueue task to cancel queue; version: %s, payload: %s' % ( |
| version, payload)) |
| |
| |
| def task_buildbucket_update(payload): |
| """Handles sending a pubsub update to buildbucket or not. |
| """ |
| request_key = task_pack.result_summary_key_to_request_key( |
| task_pack.unpack_result_summary_key(payload["task_id"])) |
| state = payload["state"] |
| update_id = payload['update_id'] |
| if not _buildbucket_update(request_key, state, update_id): |
| logging.exception( |
| 'Fatal error when sending buildbucket update notification') |
| raise Error( |
| 'Transient pubsub error. Failed to update buildbucket with payload %s' % |
| payload) |