# coding: utf-8
# Copyright 2017 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
"""Task queues generated from the actual load.
This means that the task queues are deduced from the actual load; they are never
explicitly defined. They are eventually deleted by a cron job once no incoming
task with the exact set of dimensions is triggered anymore.
Used to optimize scheduling.
"""
import collections
import datetime
import hashlib
import json
import logging
import random
import struct
import time
import urllib
from google.appengine.api import datastore_errors
from google.appengine.api import memcache
from google.appengine.ext import ndb
from google.appengine.runtime import apiproxy_errors
from components import datastore_utils
from components import utils
from server import config
from server.constants import OR_DIM_SEP
### Models.
class TaskDimensionsSets(ndb.Model):
"""A task dimensions hash and a list of dimension sets that match it.
Root entity. Key ID is
* `bot:<url-encoded-id>:<dimensions_hash>` - for tasks targeting a bot.
* `pool:<url-encoded-id>:<dimensions_hash>` - for tasks targeting a pool.
Where `<url-encoded-id>` is a URL encoding of the bot or pool ID (primarily to
quote `:`, since it is allowed in pool IDs) and `<dimensions_hash>` is a
32 bit integer with the task dimensions hash as generated by
hash_dimensions(...).
Presence of this entity indicates that there's potentially a pending task
that matches `<dimensions_hash>` and matching bots should be polling the
corresponding TaskToRun queue.
For tasks that use OR-ed dimensions (`{"k": "a|b"}`), the `<dimensions_hash>`
is taken over the original dimensions (with "|" still inside), but the stored
dimension sets are flat, i.e. `sets` logically stores two separate dimension
sets: `{"k": "a"}` and `{"k": "b"}`.
If there's a collision on `<dimensions_hash>` (i.e. two different dimensions
sets hash to the same value), `sets` stores dimensions sets from all colliding
entries. Collisions are eventually resolved when bots poll for tasks: they
get all tasks that match `<dimensions_hash>`, and then filter out ones that
don't actually match the bot's dimensions.
A TaskDimensionsSets entity is always accompanied by a TaskDimensionsInfo
entity. The TaskDimensionsSets entity stores infrequently changing (but frequently
scanned) data, while TaskDimensionsInfo contains all data (including
frequently changing data, like expiration timestamps). This separation reduces
churn in TaskDimensionsSets datastore indexes (improving performance of full
rescans that happen in _tq_rescan_matching_task_sets_async), and improves the
caching efficiency of direct ndb.get_multi fetches of
TaskDimensionsSets in assert_bot(...).
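A rough illustrative example (the hash value is made up): a task submitted
with dimensions `{"pool": ["gpu"], "os": ["Linux|Mac"]}` that hashes to
123456 is represented by an entity with key ID `pool:gpu:123456` whose
`sets` holds two flat dimension sets:

  [
    {"dimensions": ["os:Linux", "pool:gpu"]},
    {"dimensions": ["os:Mac", "pool:gpu"]}
  ]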
"""
# Disable useless in-process per-request cache to save some RAM.
_use_cache = False
# All matching dimensions sets as a list of dicts.
#
# Each dict defines a single set of task dimensions:
#
# {
# "dimensions": ["k1:v1", "k1:v2", "k2:v2", ...]
# }
#
# Dimensions are represented by a sorted, deduped list of `key:value` strings.
# These are flattened dimensions as returned by expand_dimensions_to_flats,
# i.e. values here never have "|" in them. Such dimension sets can directly be
# matched with bot dimensions using set operations.
#
# Matches dimensions stored in `TaskDimensionsInfo.sets` (sans expiration
# time).
#
# The order of dicts is irrelevant. Never empty.
sets = datastore_utils.DeterministicJsonProperty(indexed=False)
def contains_task_dimensions_set(self, task_dimensions_flat):
"""True if any of stored `sets` is equal to `task_dimensions_flat`.
Arguments:
task_dimensions_flat: a sorted list of `k:v` pairs with task dimensions.
"""
return any(task_dimensions_flat == s['dimensions'] for s in self.sets)
def matches_bot_dimensions(self, bot_dimensions_set):
"""True if any of stored `sets` matches given bot dimensions.
Arguments:
bot_dimensions_set: a set of `k:v` pairs with bot dimensions.
"""
return any(
bot_dimensions_set.issuperset(s['dimensions']) for s in self.sets)
@staticmethod
def dimensions_to_id(dimensions):
"""Returns a string ID for this entity given the task dimensions dict.
Arguments:
dimensions: a dict with task dimensions prior to expansion of "|".
"""
# Both `id` and `pool` are guaranteed to have at most 1 item. If a task
# targets a specific bot via `id`, use `bot:` key prefix to allow this bot
# to find the task faster, otherwise use `pool:`.
if u'id' in dimensions:
kind = 'bot'
pfx = dimensions[u'id'][0]
else:
kind = 'pool'
pfx = dimensions[u'pool'][0]
dimensions_hash = hash_dimensions(dimensions)
return '%s:%d' % (TaskDimensionsSets.id_prefix(kind, pfx), dimensions_hash)
@staticmethod
def id_prefix(kind, pfx):
"""Returns either `bot:<url-encoded-id>` or `pool:<url-encoded-id>`."""
assert kind in ('bot', 'pool'), kind
if isinstance(pfx, unicode):
pfx = pfx.encode('utf-8')
return '%s:%s' % (kind, urllib.quote_plus(pfx))
@staticmethod
def split_id(sets_id):
"""Given a valid TaskDimensionsSets ID, returns its kind, ID and number.
Arguments:
sets_id: a string ID, e.g. `bot:some-bot:1234`.
Returns:
Tuple with kind, ID and number, e.g. `('bot', 'some-bot', 1234)`.
"""
chunks = sets_id.split(':')
if len(chunks) != 3:
raise ValueError('Invalid TaskDimensionsSets ID %r' % sets_id)
kind, pfx, num = chunks
if kind not in ('bot', 'pool'):
raise ValueError('Invalid TaskDimensionsSets ID %r' % sets_id)
try:
return (kind, urllib.unquote(pfx), int(num))
except ValueError:
raise ValueError('Invalid TaskDimensionsSets ID %r' % sets_id)
@staticmethod
def ids_to_queue_numbers(sets_ids):
"""Given an iterable of TaskDimensionsSets IDs returns a list of queues.
Arguments:
sets_ids: an iterable with TaskDimensionsSets string IDs.
Returns:
A sorted list with queue numbers aka dimensions hashes.
"""
sets_id_to_int = lambda x: int(x[x.rfind(':') + 1:])
return sorted(sets_id_to_int(sets_id) for sets_id in sets_ids)
class TaskDimensionsInfo(ndb.Model):
"""Accompanies TaskDimensionsSets and caries expiration timestamps.
The parent entity is the corresponding TaskDimensionsSets. The key ID is the
integer 1.
Three things can happen to `sets`:
* A new entry added: happens in assert_task_async.
* Expiration of an existing entry is updated: happens in assert_task_async.
* An expired entry is removed: happens in the cleanup cron.
Whenever the list of sets in `sets` changes, the matching TaskDimensionsSets
entity is updated as well (in a single transaction). This happens relatively
infrequently. Changes to the expiry time of existing entries (which happen
frequently) do not touch TaskDimensionsSets entity at all.
"""
# Disable useless in-process per-request cache to save some RAM.
_use_cache = False
# All matching dimensions sets and their expiry time as a list of dicts.
#
# Each dict defines a single set of task dimensions together with a timestamp
# when it should be removed from the datastore:
#
# {
# "dimensions": ["k1:v1", "k1:v2", "k2:v2", ...],
# "expiry": 1663020417
# }
#
# Dimensions are represented by a sorted, deduped list of `key:value` strings.
# These are flattened dimensions as returned by expand_dimensions_to_flats,
# i.e. values here never have "|" in them. Such dimension sets can directly be
# matched with bot dimensions using set operations.
#
# Expiry is specified as an integer Unix timestamp in seconds since epoch.
# Expiry is always larger (with some wide margin) than the scheduling deadline
# of the last task that used these dimensions and it is randomized to avoid
# expiration stampedes.
#
# Matches dimensions stored in `TaskDimensionsSets.sets`.
#
# The order is irrelevant. Never empty.
sets = datastore_utils.DeterministicJsonProperty(indexed=False)
# A timestamp when this entity needs to be visited by the cleanup cron.
#
# The cron visits it if now() > next_cleanup_ts. This timestamp is calculated
# based on expiry of individual dimensions sets.
next_cleanup_ts = ndb.DateTimeProperty(indexed=True)
def expiry_map(self):
"""Returns sets as a mapping of task dimension tuples to their expiry time.
Returns:
A dict {tuple(task dimensions list) => expiry datetime.datetime}
"""
return _sets_to_expiry_map(self.sets)
class BotDimensionsMatches(ndb.Model):
"""Stores what TaskDimensionsSets are matched to a bot.
Root entity. Key ID is the bot ID.
Created the first time the bot calls assert_bot(...). Deleted when the bot is
expected to stop consuming tasks (e.g. it is deleted, quarantined, detected as
dead, etc.) in cleanup_after_bot(...).
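Illustrative content (hypothetical IDs and hashes): a Linux bot `bot1` in
pool `gpu` may end up with `dimensions = ["id:bot1", "os:Linux", "pool:gpu"]`
and `matches = ["bot:bot1:111111", "pool:gpu:222222"]`.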
"""
# Disable useless in-process per-request cache to save some RAM.
_use_cache = False
# Bot dimensions as a sorted list of deduped 'key:value' strings.
#
# If a bot reports dimension `{k: ["v1", "v2"]}`, this set will contain both
# `k:v1` and `k:v2` strings.
#
# Indexed to allow finding bots that match particular task dimensions using
# a datastore query, see _tq_update_bot_matches_async.
dimensions = ndb.StringProperty(repeated=True, indexed=True)
# String IDs of all matching TaskDimensionsSets.
#
# New entries are added via two mechanisms:
# * When a new task dimensions set appears, a TQ task proactively "searches"
# for matching bots in _tq_update_bot_matches_async. This reduces latency
# of scheduling of new kinds of tasks.
# * Periodically, and on bot dimensions change, there's a full rescan of
# all TaskDimensionsSets, see _tq_rescan_matching_task_sets_async.
#
# Entries are deleted in assert_bot(...) when their corresponding
# TaskDimensionsSets entities disappear (which means there are no more tasks
# matching these dimensions anymore) or when the bot's dimensions no longer match
# dimensions in TaskDimensionsSets (can happen when bot changes dimensions).
matches = ndb.StringProperty(repeated=True, indexed=False)
# Incremented whenever a rescan is triggered, used to skip obsolete TQ tasks.
#
# When a task queue task starts, if the current value of `rescan_counter` in
# the entity is different from the value in the task payload, then this task
# is no longer relevant and should be skipped.
rescan_counter = ndb.IntegerProperty(default=0, indexed=False)
# When we need to enqueue the next full rescan.
#
# Note: it is updated when triggering the asynchronous rescan via a task queue
# task, not when it completes.
next_rescan_ts = ndb.DateTimeProperty(default=utils.EPOCH, indexed=False)
# When the last rescan was enqueued, for debugging.
last_rescan_enqueued_ts = ndb.DateTimeProperty(indexed=False)
# When the last rescan finished, for debugging.
last_rescan_finished_ts = ndb.DateTimeProperty(indexed=False)
# The last time when `matches` were cleaned up, for debugging.
last_cleanup_ts = ndb.DateTimeProperty(indexed=False)
# The last time a match was added via _maybe_add_match_async, for debugging.
last_addition_ts = ndb.DateTimeProperty(indexed=False)
@classmethod
@ndb.tasklet
def get_or_default_async(cls, bot_id):
"""Fetches an existing entity or constructs a default one.
The constructed entity is not saved here yet. It is the caller's responsibility
to eventually store it.
"""
ent = yield ndb.Key(cls, bot_id).get_async()
raise ndb.Return(ent or cls(id=bot_id))
### Internal APIs.
# Exceptions that can be raised by transaction_async(...).
_TXN_EXCEPTIONS = (
# Deadline starting or landing the transaction.
apiproxy_errors.DeadlineExceededError,
# Transaction handle has expired.
datastore_errors.BadRequestError,
# Internal error of unknown origin.
datastore_errors.InternalError,
# Datastore-specific timeout.
datastore_errors.Timeout,
# Transaction collisions or internal errors.
datastore_utils.CommitError,
)
def _is_bot_matching_any_task_dims(bot_dimensions_flat, task_dims_sets):
"""True if a bot can execute tasks with any of given dimensions.
Arguments:
bot_dimensions_flat: a list of bot dimensions as `k:v` pairs.
task_dims_sets: a list of lists with flat task dimensions represented as
`k:v` pairs. This is usually the expansion of `|` task dimensions as generated
by expand_dimensions_to_flats.
"""
bot_dims_set = set(bot_dimensions_flat)
return any(bot_dims_set.issuperset(dims) for dims in task_dims_sets)
def _sets_to_expiry_map(sets):
"""Constructs a mapping of task dimension tuples to their expiry time.
Arguments:
sets: a value of `TaskDimensionsInfo.sets` entity property. In particular,
`expiry` in dicts is populated.
Returns:
A dict {tuple(task dimensions list) => expiry datetime.datetime}
"""
return {
tuple(s['dimensions']): utils.timestamp_to_datetime(s['expiry'] * 1e6)
for s in sets
}
def _expiry_map_to_sets(expiry_map, with_expiry=True):
"""Converts an expiry map back into a list that can be stored in `sets`.
Reverse of _sets_to_expiry_map(...). Optionally drops the expiry time for
storing `sets` in TaskDimensionsSets.
Arguments:
expiry_map: a {tuple(task dimensions list) => datetime.datetime}.
with_expiry: if True, populate `expiry` field in the result.
Returns:
A list with dicts, can be stored in `sets` entity property.
"""
sets = []
for dims, exp in sorted(expiry_map.items()):
d = {'dimensions': dims}
if with_expiry:
d['expiry'] = int(utils.datetime_to_timestamp(exp) / 1e6)
sets.append(d)
return sets
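# A rough sketch of the round trip between the two helpers above (values are
# illustrative):
#
#   sets = [{'dimensions': ['os:Linux', 'pool:gpu'], 'expiry': 1663020417}]
#   expiry_map = _sets_to_expiry_map(sets)
#   # => {('os:Linux', 'pool:gpu'): <datetime.datetime for 1663020417>}
#   _expiry_map_to_sets(expiry_map)
#   # => [{'dimensions': ('os:Linux', 'pool:gpu'), 'expiry': 1663020417}]
#   _expiry_map_to_sets(expiry_map, with_expiry=False)
#   # => [{'dimensions': ('os:Linux', 'pool:gpu')}], the form stored in
#   #    TaskDimensionsSets.sets (sans expiry).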
def _put_task_dimensions_sets_async(sets_id, expiry_map):
"""Puts TaskDimensionsSets and TaskDimensionsInfo entities.
Must be called in a transaction to make sure both entities stay in sync.
Arguments:
sets_id: string ID of TaskDimensionsSets to store under.
expiry_map: an expiry map to store there, must not be empty.
"""
assert ndb.in_transaction()
assert expiry_map
sets_key = ndb.Key(TaskDimensionsSets, sets_id)
info_key = ndb.Key(TaskDimensionsInfo, 1, parent=sets_key)
# Schedule a cleanup a little bit past the expiry of the entry that expires
# first (to clean it up). Mostly to avoid hitting weird edge cases related
# to not perfectly synchronized clocks.
next_cleanup_ts = min(expiry_map.values()) + datetime.timedelta(minutes=5)
return ndb.put_multi_async([
TaskDimensionsSets(key=sets_key,
sets=_expiry_map_to_sets(expiry_map,
with_expiry=False)),
TaskDimensionsInfo(
key=info_key,
sets=_expiry_map_to_sets(expiry_map, with_expiry=True),
next_cleanup_ts=next_cleanup_ts,
),
])
def _delete_task_dimensions_sets_async(sets_id):
"""Deletes TaskDimensionsSets and TaskDimensionsInfo entities.
Must be called in a transaction to make sure both entities stay in sync.
Arguments:
sets_id: string ID of TaskDimensionsSets to delete.
"""
assert ndb.in_transaction()
sets_key = ndb.Key(TaskDimensionsSets, sets_id)
info_key = ndb.Key(TaskDimensionsInfo, 1, parent=sets_key)
return ndb.delete_multi_async([sets_key, info_key])
@ndb.tasklet
def _check_matches_async(bot_dimensions_set, sets_ids):
"""Loads TaskDimensionsSets and checks if they still exist and match the bot.
Arguments:
bot_dimensions_set: a set of `k:v` pairs with bot dimensions.
sets_ids: string IDs of TaskDimensionsSets to load and check.
Returns:
set(alive and still matching sets IDs), set(stale sets IDs).
"""
sets_ents = yield ndb.get_multi_async(
[ndb.Key(TaskDimensionsSets, sets_id) for sets_id in sets_ids])
alive = set()
stale = set()
for sets_id, sets_ent in zip(sets_ids, sets_ents):
if sets_ent and sets_ent.matches_bot_dimensions(bot_dimensions_set):
alive.add(sets_id)
else:
stale.add(sets_id)
raise ndb.Return((alive, stale))
@ndb.tasklet
def _assert_task_dimensions_async(task_dimensions, exp_ts):
"""Ensures there's corresponding TaskDimensionsSets stored in the datastore.
If it is a never seen before set of dimensions, will emit a task queue task
to find matching bots. Otherwise occasionally makes a transaction to bump
expiration time of the stored TaskDimensionsSets entity.
Arguments:
task_dimensions: a dict with task dimensions (prior to expansion of "|").
exp_ts: datetime.datetime with task expiration (aka scheduling deadline).
"""
# This is e.g. "pool:<url-encoded-pool-id>:<number>".
sets_id = TaskDimensionsSets.dimensions_to_id(task_dimensions)
sets_key = ndb.Key(TaskDimensionsSets, sets_id)
info_key = ndb.Key(TaskDimensionsInfo, 1, parent=sets_key)
log = _Logger('assert_task(%s)', sets_id)
# Load expiration of known dimensions sets to see if they need to be updated.
info = yield info_key.get_async()
expiry_map = _sets_to_expiry_map(info.sets if info else [])
# Randomize expiration time a bit. This is done for two reasons:
# * Reduce transaction collisions when checking the expiry from concurrent
# requests (only the most unlucky one will do the transaction below).
# * Spread out load for TaskDimensionsSets cleanup cron a bit in case a
# lot of tasks appeared at once.
# Some extra time will also be added in the transaction below, see comments
# there.
exp_ts += _random_timedelta_mins(0, 30)
# This expands e.g. `{"k": "a|b"}` into `[("k:a",), ("k:b",)]`.
expanded = [tuple(s) for s in expand_dimensions_to_flats(task_dimensions)]
# Check all sets are known and fresh.
fresh = True
for dims in expanded:
cur_expiry = expiry_map.get(dims)
if not cur_expiry or cur_expiry < exp_ts:
fresh = False
break
if fresh:
raise ndb.Return(None)
# Some dimensions sets are either missing or stale. We need to transactionally
# update the entity to add them, perhaps emitting a task queue task to find
# matching bots.
@ndb.tasklet
def txn():
# Set the stored expiration time far in advance. This reduces frequency of
# transactions that update the expiration time in case of a steady stream of
# requests (one transaction "covers" all requests for the next ~4h: they'll see
# the entity as fresh). The downside is that we keep stuff in datastore for
# longer than strictly necessary per `exp_ts`. But this is actually a good
# thing for infrequent tasks with short expiration time. We **want** to keep
# TaskDimensionsSets for them in the datastore longer (perhaps even if there
# are no such tasks in the queues) to avoid frequently creating and deleting
# TaskDimensionsSets entities for them. Creating TaskDimensionsSets is a
# costly operation since it requires a scan to find matching bots.
extended_ts = exp_ts + datetime.timedelta(hours=4)
# Load the fresh state, since we are in the transaction now.
info = yield info_key.get_async()
expiry_map = _sets_to_expiry_map(info.sets if info else [])
# Bump the expiry and add new sets.
created = []
updated = []
for dims in expanded:
cur_expiry = expiry_map.get(dims)
if not cur_expiry:
log.info('adding %s, expiry %s', dims, extended_ts)
expiry_map[dims] = extended_ts
created.append(dims)
elif cur_expiry < extended_ts:
log.info('expiry of %s: %s => %s', dims, cur_expiry, extended_ts)
expiry_map[dims] = extended_ts
updated.append(dims)
# It is possible the entity was already updated by another transaction.
if not created and not updated:
log.info('nothing to commit, already updated by another txn')
raise ndb.Return(None)
# Kick out old sets since we are going to commit the mutation anyway.
now = utils.utcnow()
for dims, expiry in expiry_map.items():
if expiry < now:
log.info('dropping %s, expired at %s', dims, expiry)
expiry_map.pop(dims)
# Some sets must survive, since we just added/updated fresh sets.
assert len(expiry_map) > 0
yield _put_task_dimensions_sets_async(sets_id, expiry_map)
# If we added new sets, need to launch a task to find matching bots ASAP.
# This reduces scheduling latency for new kinds of tasks. This eventually
# calls _tq_update_bot_matches_async. Note we don't need to do this if we
# are just updating the expiration time. The assignments of tasks to bots
# were already done at that point.
if created:
log.info('enqueuing a task to find matches for %s', created)
ok = yield utils.enqueue_task_async(
'/internal/taskqueue/important/task_queues/update-bot-matches',
'update-bot-matches',
payload=utils.encode_to_json({
'task_sets_id': sets_id,
'dimensions': created,
'enqueued_ts': now,
}),
transactional=True,
)
if not ok:
raise datastore_utils.CommitError('Failed to enqueue a TQ task')
# Do it!
log.info('launching txn')
yield datastore_utils.transaction_async(txn, retries=3)
log.info('txn completed')
@ndb.tasklet
def _tq_update_bot_matches_async(task_sets_id, task_sets_dims, enqueued_ts):
"""Assigns a new task dimension set to matching bots.
Discovers BotDimensionsMatches that can be potentially matched to any
task dimensions set in `task_sets_dims` and transactionally updates them
if they indeed match.
Arguments:
task_sets_id: string ID of TaskDimensionsSets that contains the set.
task_sets_dims: a list with lists of flat task dimensions to match on, e.g.
`[["k1:a", "k2:v1"], ["k1:a", "k2:v2"]]`. It may have multiple elements
when the task is using OR dimensions.
enqueued_ts: datetime.datetime when this task was enqueued, for debugging.
Returns:
True if succeeded, False if the TQ task needs to be retried.
"""
assert task_sets_id.startswith(('bot:', 'pool:')), task_sets_id
assert all(isinstance(dims, list) for dims in task_sets_dims), task_sets_dims
log = _Logger('tq_update_matches(%s)', task_sets_id)
# Verify this TQ task is still necessary by checking the entity exists.
task_sets = yield ndb.Key(TaskDimensionsSets, task_sets_id).get_async()
if not task_sets:
log.warning('the entity is already gone, retiring the task')
raise ndb.Return(True)
# Ignore sets no longer mentioned in the entity (i.e. if they were cleaned up
# by the cron). The TQ task may have been stuck for a while. Should be rare.
actual_task_dims = []
for dims in task_sets_dims:
if task_sets.contains_task_dimensions_set(dims):
actual_task_dims.append(dims)
else:
log.warning('the set is no longer actual: %s', dims)
if not actual_task_dims:
log.warning('all sets are no longer actual, retiring the task')
raise ndb.Return(True)
# Set of bot IDs that were visited by `_maybe_add_match_async` already.
bots_visited = set()
# Bot IDs that were transactionally updated.
bots_updated = []
# True if all necessary scans and updates succeeded.
ok = True
# Bind all unchanging arguments for _maybe_add_match_async.
maybe_add_match_async = lambda bot_matches_ent: _maybe_add_match_async(
bot_matches_ent, task_sets_id, actual_task_dims, enqueued_ts, log,
bots_visited, bots_updated)
if task_sets_id.startswith('bot:'):
# When the task targets a specific bot, we can find BotDimensionsMatches
# fast. There's at most one potentially matching entity, no need to run
# queries.
_, bot_id, _ = TaskDimensionsSets.split_id(task_sets_id)
bot_matches_ent = yield ndb.Key(BotDimensionsMatches, bot_id).get_async()
if bot_matches_ent:
ok = yield maybe_add_match_async(bot_matches_ent)
else:
# When targeting all bots in a pool, need to run queries based on indexed
# bot dimensions to find them.
queries = []
for idx, task_dims in enumerate(actual_task_dims):
q = BotDimensionsMatches.query()
for kv in task_dims:
q = q.filter(BotDimensionsMatches.dimensions == kv)
queries.append((q, log.derive('query #%d', idx)))
ok = yield _map_async(queries, maybe_add_match_async)
# Retry the whole thing if something failed or the query timed out. Hopefully
# there will be fewer things to process on a retry and eventually the task
# succeeds.
log.info('visited %d bots, updated %d', len(bots_visited), len(bots_updated))
if not ok:
log.error('need a retry')
raise ndb.Return(ok)
@ndb.tasklet
def _maybe_add_match_async(bot_matches_ent, task_sets_id, task_sets_dims,
enqueued_ts, log, bots_visited, bots_updated):
"""Adds ID of a matching set to BotDimensionsMatches if still necessary.
Implementation detail of _tq_update_bot_matches_async.
Arguments:
bot_matches_ent: BotDimensionsMatches to update if still necessary.
task_sets_id: string ID of TaskDimensionsSets to add as a match.
task_sets_dims: a list with lists of flat task dimensions to match on, e.g.
`[["k1:a", "k2:v1"], ["k1:a", "k2:v2"]]`. It may have multiple elements
when the original task dimensions have "|" inside.
enqueued_ts: datetime.datetime when the TQ was enqueued, for debugging.
log: _Logger to use for logs.
bots_visited: a set of already checked or updated bot IDs. Will be mutated.
bots_updated: a list of already updated bot IDs. Will be mutated.
Returns:
True if succeeded (or was skipped), False if something failed.
"""
# A bot may be discovered through different queries. Visit it only once.
bot_id = bot_matches_ent.key.string_id()
if bot_id in bots_visited:
raise ndb.Return(True)
bots_visited.add(bot_id)
# The bot may be already matched to the task set.
if task_sets_id in bot_matches_ent.matches:
raise ndb.Return(True)
# Check bot dimensions are still a match (this is especially important when
# dealing with `bot:...` TaskDimensionsSets, since we don't use a filtering
# query in that case). Bot dimensions will be also double checked inside
# the transaction.
bot_dims = bot_matches_ent.dimensions
if not _is_bot_matching_any_task_dims(bot_dims, task_sets_dims):
raise ndb.Return(True)
# The transaction will redo all the checks (in case dimensions changed) and
# add the match. The transaction is needed to avoid stomping over matches
# added concurrently via other TQ tasks.
@ndb.tasklet
def txn():
txn_matches = yield ndb.Key(BotDimensionsMatches, bot_id).get_async()
if not txn_matches:
raise ndb.Return(False)
if task_sets_id in txn_matches.matches:
raise ndb.Return(False)
bot_dims = txn_matches.dimensions
if not _is_bot_matching_any_task_dims(bot_dims, task_sets_dims):
raise ndb.Return(False)
txn_matches.matches.append(task_sets_id)
txn_matches.matches.sort()
txn_matches.last_addition_ts = utils.utcnow()
yield txn_matches.put_async()
raise ndb.Return(True)
try:
updated = yield datastore_utils.transaction_async(txn, retries=5)
if updated:
log.info('updated %s, delay %s', bot_id, utils.utcnow() - enqueued_ts)
bots_updated.append(bot_id)
raise ndb.Return(True)
except _TXN_EXCEPTIONS:
log.warning('error updating %s', bot_id)
raise ndb.Return(False)
@ndb.tasklet
def _cleanup_task_dimensions_async(dims_info, log):
"""Removes stale dimensions sets from a TaskDimensions[Sets|Info] entities.
Deletes entities entirely if all sets there are stale. Called as part of the
cleanup cron.
Arguments:
dims_info: a TaskDimensionsInfo entity to clean up.
log: _Logger to use for logs.
Returns:
True if cleaned up, False if already clean or gone.
"""
now = utils.utcnow()
sets_id = dims_info.key.parent().string_id()
# Confirm we have something to clean up before opening a transaction.
expiry_map = _sets_to_expiry_map(dims_info.sets)
if all(expiry >= now for expiry in expiry_map.values()):
raise ndb.Return(False)
@ndb.tasklet
def txn():
txn_ent = yield dims_info.key.get_async()
if not txn_ent:
raise ndb.Return(False)
expiry_map = _sets_to_expiry_map(txn_ent.sets)
changed = False
for dims, expiry in expiry_map.items():
if expiry < now:
log.info('%s: dropping %s, expired at %s', sets_id, dims, expiry)
expiry_map.pop(dims)
changed = True
if changed:
if not expiry_map:
yield _delete_task_dimensions_sets_async(sets_id)
else:
yield _put_task_dimensions_sets_async(sets_id, expiry_map)
raise ndb.Return(changed)
changed = yield datastore_utils.transaction_async(txn, retries=3)
raise ndb.Return(changed)
@ndb.tasklet
def _assert_bot_dimensions_async(bot_dimensions):
"""Updates assignment of task queues to the bot, returns them.
Runs as part of /bot/poll and must be fast.
Arguments:
bot_dimensions: a dict with bot dimensions (including "id" dimension), i.e.
`{key: [value]}`.
Returns:
A set of string IDs of TaskDimensionsSets that match the bot.
"""
bot_id = bot_dimensions[u'id'][0]
bot_dimensions_flat = bot_dimensions_to_flat(bot_dimensions)
bot_dimensions_set = set(bot_dimensions_flat)
log = _Logger('assert_bot(%s)', bot_id)
# Load queue numbers assigned to the bot.
log.info('loading bot dimensions matches')
matches = yield BotDimensionsMatches.get_or_default_async(bot_id)
# Load associated dimension sets to check the bot still matches them. This is
# the hottest spot that heavily relies on ndb memcache.
log.info('checking %d dimensions sets', len(matches.matches))
alive, stale = yield _check_matches_async(bot_dimensions_set, matches.matches)
if stale:
log.info('will unmatch: %s', ' '.join(stale))
# We need to trigger a rescan of all potentially matching TaskDimensionsSets
# when bot dimensions change and also periodically (to work around eventual
# consistency of datastore queries and various races in the code).
rescan_reason = None
if matches.dimensions != bot_dimensions_flat:
rescan_reason = _diff_bot_dims(matches.dimensions, bot_dimensions_flat)
elif matches.next_rescan_ts and utils.utcnow() > matches.next_rescan_ts:
rescan_reason = 'periodic'
if rescan_reason is not None:
@ndb.tasklet
def enqueue_rescan_txn():
now = utils.utcnow()
txn_ent = yield BotDimensionsMatches.get_or_default_async(bot_id)
if txn_ent.next_rescan_ts != matches.next_rescan_ts:
# This can theoretically happen if a bot is calling /bot/poll in
# parallel, which it should not be doing.
log.warning('aborting txn, the entity state changed')
raise ndb.Return(None)
txn_ent.dimensions = bot_dimensions_flat
txn_ent.matches = [x for x in txn_ent.matches if x not in stale]
txn_ent.rescan_counter += 1
txn_ent.next_rescan_ts = now + _random_timedelta_mins(20, 40)
txn_ent.last_rescan_enqueued_ts = now
txn_ent.last_cleanup_ts = now
# This eventually calls _tq_rescan_matching_task_sets_async.
ok = yield utils.enqueue_task_async(
'/internal/taskqueue/important/task_queues/rescan-matching-task-sets',
'rescan-matching-task-sets',
payload=utils.encode_to_json({
'bot_id': bot_id,
'rescan_counter': txn_ent.rescan_counter,
'rescan_reason': rescan_reason,
}),
transactional=True)
if not ok:
raise datastore_utils.CommitError('Failed to enqueue TQ task')
yield txn_ent.put_async()
log.info('triggering rescan: %s', rescan_reason)
yield datastore_utils.transaction_async(enqueue_rescan_txn, retries=5)
elif stale:
# If the rescan is not needed, but we have stale matches, clean them up now
# to reduce the amount of work for future checks. This could be a "fire and forget"
# operation, but the ndb async tasklet model makes this impossible, so we'll
# wait. Ignore errors though, they are not critical at this stage.
@ndb.tasklet
def unmatch_txn():
txn_ent = yield BotDimensionsMatches.get_or_default_async(bot_id)
if txn_ent.dimensions != bot_dimensions_flat:
log.warning('aborting txn, the entity state changed')
raise ndb.Return(None)
before = len(txn_ent.matches)
txn_ent.matches = [x for x in txn_ent.matches if x not in stale]
if len(txn_ent.matches) != before:
txn_ent.last_cleanup_ts = utils.utcnow()
yield txn_ent.put_async()
try:
yield datastore_utils.transaction_async(unmatch_txn, retries=5)
except _TXN_EXCEPTIONS:
log.warning('error when cleaning matches, ignoring')
# Log integer IDs of all matching queues.
queue_numbers = TaskDimensionsSets.ids_to_queue_numbers(alive)
log.info('queues (%d): %s', len(queue_numbers), queue_numbers)
raise ndb.Return(alive)
@ndb.tasklet
def _tq_rescan_matching_task_sets_async(bot_id, rescan_counter, rescan_reason):
"""A task queue task that finds all matching TaskDimensionsSets for a bot.
Arguments:
bot_id: ID of the bot.
rescan_counter: value of `BotDimensionsMatches.rescan_counter` when the
task was enqueued.
rescan_reason: a string with rescan reason, for debugging.
Returns:
True if succeeded, False if the TQ task needs to be retried.
"""
log = _Logger('tq_rescan_matches(%s)', bot_id)
log.info('rescan: %s', rescan_reason)
# Verify the task is still fresh and load dimensions to match on.
matches = yield BotDimensionsMatches.get_or_default_async(bot_id)
if matches.rescan_counter != rescan_counter:
log.info('skipping, rescan counter %d != %s', matches.rescan_counter,
rescan_counter)
raise ndb.Return(True)
log.info('enqueued %s ago', utils.utcnow() - matches.last_rescan_enqueued_ts)
bot_dimensions_flat = matches.dimensions
bot_dimensions_set = set(bot_dimensions_flat)
def scan_prefix_query(set_kind, pfx):
pfx = TaskDimensionsSets.id_prefix(set_kind, pfx)
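# Key IDs under this prefix look like '<pfx>:<decimal hash>'. Scanning the
# key range between '<pfx>:' followed by the character just before '0' and
# '<pfx>:' followed by the character just after '9' covers every possible
# decimal suffix without touching entities under other prefixes.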
start = '%s:%s' % (pfx, chr(ord('0') - 1))
end = '%s:%s' % (pfx, chr(ord('9') + 1))
query = ndb.Query(
kind='TaskDimensionsSets',
filters=ndb.ConjunctionNode(
ndb.FilterNode('__key__', '>', ndb.Key(TaskDimensionsSets, start)),
ndb.FilterNode('__key__', '<', ndb.Key(TaskDimensionsSets, end)),
),
)
return query, log.derive('%s', pfx)
# Construct queries that scan for potentially matching TaskDimensionsSets.
#
# TODO(vadimsh): Each query can be sharded to parallelize the scan even more
# if necessary.
queries = [scan_prefix_query('bot', bot_id)]
for kv in bot_dimensions_flat:
k, v = kv.split(':', 1)
if k == 'pool':
queries.append(scan_prefix_query('pool', v))
# A set of matching TaskDimensionsSets IDs discovered by the scans.
alive = set()
# A counter of visited items for debugging.
visited = [0]
def visit_task_dimensions_set(task_dims_sets):
visited[0] += 1
if task_dims_sets.matches_bot_dimensions(bot_dimensions_set):
alive.add(task_dims_sets.key.string_id())
# Find all TaskDimensionsSets matching the bot dimensions.
visited_all = yield _map_async(queries, visit_task_dimensions_set)
log.info('visited %d entities, found %d matches', visited[0], len(alive))
# Double check any currently matched sets that were not discovered by the scan
# are indeed dead and should be unmatched. This is particularly important if
# scans were incomplete due to timeouts (i.e. visited_all is False), but also
# matters if the query is "eventually consistent" and omits some very recent
# entities. We don't want to delete active matches.
_, stale = yield _check_matches_async(
bot_dimensions_set, [sid for sid in matches.matches if sid not in alive])
# Store new matches in the entity if the entity still has dimensions we
# scanned for. This returns `last_rescan_enqueued_ts` if the entity was
# updated or None if not.
@ndb.tasklet
def txn():
now = utils.utcnow()
txn_ent = yield BotDimensionsMatches.get_or_default_async(bot_id)
if txn_ent.dimensions != bot_dimensions_flat:
log.warning('dimensions changed while we were scanning, aborting')
raise ndb.Return(None)
# Carefully merge `alive` and `stale` into `matches` to avoid stomping over
# matches that may have been added concurrently by _maybe_add_match_async.
cur = set(txn_ent.matches)
new = (cur | alive) - stale
if cur == new:
log.info('no changes to the matched set')
for x in cur - new:
log.info('unmatched %s', x)
for x in new - cur:
log.info('matched %s', x)
txn_ent.matches = sorted(new)
txn_ent.last_rescan_finished_ts = now
txn_ent.last_cleanup_ts = now
yield txn_ent.put_async()
raise ndb.Return(txn_ent.last_rescan_enqueued_ts)
log.info('storing changes')
rescan_enqueued_ts = yield datastore_utils.transaction_async(txn, retries=5)
if rescan_enqueued_ts:
log.info('rescan delay: %s', utils.utcnow() - rescan_enqueued_ts)
# Retire the TQ task if the entity is already stale or we successfully
# scanned and updated everything.
if not rescan_enqueued_ts:
log.warning('this task queue task is stale, retiring it')
elif not visited_all:
log.error('some scans did not complete, need a retry')
raise ndb.Return(not rescan_enqueued_ts or visited_all)
def _freshen_up_queues_memcache(queues):
"""Updates memcache records with liveness of given queues.
This is needed by probably_has_capacity(...) check.
Arguments:
queues: a list of integers with queue numbers (aka dimension hashes).
"""
# TODO(vadimsh): Stop hammering memcache. These entries here easily end up
# getting 2k-3k QPS each.
memcache.set_multi({str(d): True
for d in queues},
time=61,
namespace='task_queues_tasks')
### Some generic helpers.
class _Logger(object):
"""Prefixes a string and request duration to logged messages."""
def __init__(self, pfx, *args):
self._pfx = pfx % args
self._ts = time.time()
def derive(self, sfx, *args):
"""Derives a new logger with prefix extended by the given message."""
logger = _Logger('%s: %s', self._pfx, sfx % args)
logger._ts = self._ts
return logger
def _log(self, lvl, msg, args):
lvl('[%.2fs] %s: %s', time.time() - self._ts, self._pfx, msg % args)
#print '[%.2fs] %s: %s' % (time.time() - self._ts, self._pfx, msg % args)
def info(self, msg, *args):
self._log(logging.info, msg, args)
def warning(self, msg, *args):
self._log(logging.warning, msg, args)
def error(self, msg, *args):
self._log(logging.error, msg, args)
def _diff_bot_dims(old, new):
"""Pretty-prints changes to bot dimensions into a string.
Arguments:
old: old bot dimensions as a list of `k:v` strings.
new: new bot dimensions as a list of `k:v` strings.
Returns:
A string for logging.
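Example (illustrative):
  _diff_bot_dims(['os:Linux', 'gpu:none'], ['os:Linux', 'gpu:nvidia'])
  => 'dims added [gpu:nvidia], dims removed [gpu:none]'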
"""
old_set = set(old)
new_set = set(new)
add = ' '.join(dim for dim in new if dim not in old_set)
rem = ' '.join(dim for dim in old if dim not in new_set)
parts = []
if add:
parts.append('dims added [%s]' % add)
if rem:
parts.append('dims removed [%s]' % rem)
return ', '.join(parts)
def _random_timedelta_mins(a, b):
"""Returns a random timedelta in [a min; b min) range."""
return datetime.timedelta(minutes=random.uniform(a, b))
class _AsyncWorkQueue(object):
"""A queue of work items with limits on concurrency.
Used internally by _map_async.
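A rough usage sketch (illustrative, from inside a tasklet; `cb`, `item1` and
`item2` are placeholders):

  queue = _AsyncWorkQueue(producers=1)
  consumer = queue.consume_async(cb, max_concurrency=10)
  queue.enqueue([item1, item2])
  queue.done()
  ok = yield consumer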
"""
def __init__(self, producers):
"""Creates a queue that accepts items from a number of producers.
Arguments:
producers: number of producers that would enqueue items. Each one is
expected to call done() when they are done. Once all producers are done,
the queue stops accepting new items. This eventually causes the
consume_async tasklet to exit.
"""
assert producers > 0
self._queue = collections.deque()
self._producers = producers
self._waiter = None
def _wakeup(self):
if self._waiter:
waiter, self._waiter = self._waiter, None
waiter.set_result(None)
def enqueue(self, items):
"""Enqueues a batch of items to be processed.
Arguments:
items: an iterable of items; each one will eventually be passed to the
callback given to consume_async, which may decide to launch a tasklet to
process this item.
"""
assert self._producers > 0
if items:
self._queue.extend(items)
self._wakeup()
def done(self):
"""Must be called by a producer when they are done with the queue."""
assert self._producers > 0
self._producers -= 1
if not self._producers:
self._wakeup()
@ndb.tasklet
def consume_async(self, cb, max_concurrency):
"""Runs a loop that dequeues items and calls the callback for each item.
See _map_async for expected behavior of the callback.
Returns:
True if all futures returned by the callback resolved into True.
"""
ok = True
running = []
while self._producers or self._queue:
# While there's stuff in the queue, keep launching callbacks to process it.
while self._queue:
fut = cb(self._queue.popleft())
if not isinstance(fut, ndb.Future):
continue
running.append(fut)
# If running too much stuff, wait for some of it to finish. What we'd
# really like is ndb.Future.wait_any_async(running), but there's no
# async variant of wait_any in Python 2 ndb library (there's one in
# Python 3 ndb). So instead just wait for the oldest future to finish
# and then collect all the ones that finished during that time.
if len(running) >= max_concurrency:
yield running[0]
pending = []
for f in running:
if f.done():
ok = ok and f.get_result()
else:
pending.append(f)
running = pending
# If the queue is drained and there are no more producers, we are done.
if not self._producers:
break
# If the queue is drained, but there are producers, wait for more items
# to appear or for the last producer to be done.
assert not self._waiter
self._waiter = ndb.Future()
yield self._waiter
# Wait for the completion of the rest of the items.
oks = yield running
raise ndb.Return(ok and all(oks))
@ndb.tasklet
def _map_async(queries,
cb,
max_concurrency=200,
page_size=1000,
timeout=300,
max_pages=None):
"""Applies a callback to results of a bunch of queries.
This is roughly similar to ndb.Query.map_async, except it tries to make
progress on errors and timeouts and it limits the number of concurrent tasklets
to avoid grinding to a halt when processing large sets.
Arguments:
queries: a list of (ndb.Query, _Logger) with queries to fetch results of.
cb: a callback that takes a fetched entity and optionally returns a future
to wait on. The future must resolve in a boolean, where True indicates
the item was processed successfully and False if the processing failed.
If the callback doesn't return a future, the item is assumed to be
processed successfully. The callback must not raise exceptions, they
will abort the whole thing. The callback may be called multiple times
with the same entity if this entity is returned by multiple queries.
max_concurrency: a limit on number of concurrently pending futures.
page_size: a page size for datastore queries.
timeout: how long (in seconds) to run the query loop before giving up.
max_pages: a limit on number of pages to fetch (mostly for tests).
Returns:
True if all queries finished to completion and all fetched items were
processed successfully.
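A minimal usage sketch (illustrative, from inside a tasklet; `SomeModel` is a
placeholder):

  @ndb.tasklet
  def cb(ent):
    yield ent.key.delete_async()
    raise ndb.Return(True)

  ok = yield _map_async([(SomeModel.query(), _Logger('scan'))], cb)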
"""
queue = _AsyncWorkQueue(len(queries))
@ndb.tasklet
def scan(q, logger):
# For some reason queries that use the has_next_async() API have a hard deadline
# of 60s regardless of query options. This is not enough for busy servers.
# Instead we'll use an explicitly paginated query.
try:
logger.info('scanning')
deadline = utils.time_time() + timeout
count = 0
pages = 0
cursor = None
more = True
while more:
rpc_deadline = deadline - utils.time_time()
if rpc_deadline < 0 or (max_pages and pages >= max_pages):
raise ndb.Return(False)
try:
page, cursor, more = yield q.fetch_page_async(page_size,
start_cursor=cursor,
deadline=rpc_deadline)
# Avoid favoring items that appear earlier in the page. It is
# important when _map_async is retried, e.g. as a part of TQ task
# retry. Gives some opportunity for tail items to be processed. Mostly
# useful for larger page sizes.
random.shuffle(page)
queue.enqueue(page)
count += len(page)
pages += 1
except (datastore_errors.BadRequestError, datastore_errors.Timeout):
logger.error('scan timed out, visited %d items', count)
raise ndb.Return(False)
logger.info('scan completed, visited %d items', count)
raise ndb.Return(True)
finally:
queue.done()
# Run scans that enqueue items into a queue, and in parallel consume items
# from this queue.
futs = [scan(q, logger) for q, logger in queries]
futs.append(queue.consume_async(cb, max_concurrency))
ok = yield futs
raise ndb.Return(all(ok))
### Public APIs.
def expand_dimensions_to_flats(dimensions, is_bot_dim=False):
"""Expands |dimensions| to a series of dimensions_flat.
If OR is not used, the result contains exactly one element. Otherwise, it
expands the OR dimension into a series of basic dimensions, then flattens each
one of them.
Returns: a list of dimensions_flat expanded from |dimensions|. A
dimensions_flat is a sorted list of '<key>:<value>' of the basic dimensions.
This function can be called with invalid dimensions that are reported by the
bot. Tolerate them, but trim dimensions longer than 321 characters (the limit
is 64+256+1=321). This is important, otherwise handling the returned values
that are too long can throw while trying to store this in the datastore.
The challenge here is that we're handling unicode strings, but we need to
count in terms of utf-8 bytes while being efficient.
According to https://en.wikipedia.org/wiki/UTF-8, the longest UTF-8 encoded
character is 4 bytes.
Keys are strictly a subset of ASCII, thus valid keys are at most 64 bytes.
Values can be any unicode string limited to 256 characters. So the worst
case is 1024 bytes for a valid value.
This means that the maximum valid string is 64+1+1024 = 1089 bytes, where 1 is
the ':' character which is one byte.
One problem is that because surrogate code points are used to encode
characters outside the base plane in UTF-16, a UCS2 build of Python may use
two Unicode code points for one character, so len('😬') returns 2, even
though it returns 1 on a UCS4 build.
So it means that the limit is effectively halved for non-BMP characters,
depending on the python build used.
Silently remove duplicate dimensions, for the same reason as for long ones.
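A rough example of the expansion (illustrative):

  expand_dimensions_to_flats({u'pool': [u'gpu'], u'os': [u'Linux|Mac']})
  => [[u'os:Linux', u'pool:gpu'], [u'os:Mac', u'pool:gpu']]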
"""
dimensions_kv = list(dimensions.items())
for i, (k, v) in enumerate(dimensions_kv):
if isinstance(v, (str, unicode)):
assert is_bot_dim, (k, v)
dimensions_kv[i] = (k, [
v,
])
cur_dimensions_flat = []
result = []
cutoff = config.DIMENSION_KEY_LENGTH + 1 + config.DIMENSION_VALUE_LENGTH
def gen(ki, vi):
if ki == len(dimensions_kv):
# Remove duplicate dimensions. While invalid, we want to make sure they
# can be stored without throwing an exception.
result.append(sorted(set(cur_dimensions_flat)))
return
key, values = dimensions_kv[ki]
if vi == len(values):
gen(ki + 1, 0)
return
for v in values[vi].split(OR_DIM_SEP):
flat = u'%s:%s' % (key, v)
if len(flat) > cutoff:
assert is_bot_dim, flat
# An ellipsis is codepoint U+2026 which is encoded with 3 bytes in
# UTF-8. We're still well below the 1500 bytes limit. Datastore uses
# UTF-8.
flat = flat[:cutoff] + u'…'
cur_dimensions_flat.append(flat)
gen(ki, vi + 1)
cur_dimensions_flat.pop()
gen(0, 0)
return result
def bot_dimensions_to_flat(dimensions):
"""Returns a flat '<key>:<value>' sorted list of dimensions."""
try:
expanded = expand_dimensions_to_flats(dimensions, is_bot_dim=True)
except AttributeError as e:
logging.exception(
"crbug.com/1133117: failed to call expand_dimensions_to_flats for %s",
dimensions)
raise e
assert len(expanded) == 1, dimensions
return expanded[0]
def hash_dimensions(dimensions):
"""Returns a 32 bits int that is a hash of the request dimensions specified.
Dimensions values still have "|" in them, i.e. this is calculated prior to
expansion of OR-ed dimensions into a disjunction of dimension sets.
Arguments:
dimensions: dict(str, [str])
The return value is guaranteed to be a non-zero int so it can be used as a key
id in a ndb.Key.
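A sketch of the computation (illustrative): for `{u'os': [u'Linux|Mac']}` the
MD5 input is the key and its values, each followed by a NUL byte, and the
result is roughly

  data = 'os\000Linux|Mac\000'
  struct.unpack('<L', hashlib.md5(data).digest()[:4])[0] or 1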
"""
# This horrible code is the product of micro benchmarks.
# TODO(maruel): This is incorrect, as it can confuse keys and values. But
# changing the algo is non-trivial.
data = ''
for k, values in sorted(dimensions.items()):
data += k.encode('utf8')
data += '\000'
assert isinstance(values, (list, tuple)), values
for v in values:
data += v.encode('utf8')
data += '\000'
digest = hashlib.md5(data).digest()
# Note that 'L' means C++ unsigned long which is (usually) 32 bits and
# python's int is 64 bits.
return int(struct.unpack('<L', digest[:4])[0]) or 1
def assert_bot(bot_dimensions):
"""Registers the bot in the task queues system, fetches matching queues.
Coupled with assert_task_async(), enables assignment of tasks to bots by
putting tasks into logical queues (represented by queue numbers), and
assigning each bot a list of queues it needs to poll tasks from.
Arguments:
bot_dimensions: dictionary of the bot dimensions.
Returns:
A list of integers with queues to poll.
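A rough example (illustrative, the hash values are made up):

  queues = assert_bot({u'id': [u'bot1'], u'os': [u'Linux'], u'pool': [u'gpu']})
  # => e.g. [111111, 222222], dimension hashes of the queues the bot polls.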
"""
# Get a set of matching TaskDimensionsSets IDs.
matches = _assert_bot_dimensions_async(bot_dimensions).get_result()
# Update queues' TTL in memcache. This acts as a signal there's still a bot
# that polls these particular queues.
queues = TaskDimensionsSets.ids_to_queue_numbers(matches)
_freshen_up_queues_memcache(queues)
return queues
def freshen_up_queues(bot_id):
"""Refreshes liveness of memcache entries for queues polled by the bot.
This is needed by probably_has_capacity(...) check.
Arguments:
bot_id: ID of the polling bot.
Returns:
A list of queues consumed by the bot.
"""
matches = ndb.Key(BotDimensionsMatches, bot_id).get()
queues = TaskDimensionsSets.ids_to_queue_numbers(
matches.matches) if matches else []
_freshen_up_queues_memcache(queues)
return queues
def cleanup_after_bot(bot_id):
"""Removes registration of this bot in the task queues system.
Arguments:
bot_id: ID of the bot to unregister.
"""
ndb.Key(BotDimensionsMatches, bot_id).delete()
@ndb.tasklet
def assert_task_async(request):
"""Makes sure the TaskRequest dimensions, for each TaskProperties, are listed
as a known queue.
This function must be called before storing the TaskRequest in the DB.
When a cache miss occurs, a task queue is triggered.
Warning: the task will not be run until the task queue task ran, which causes
a user visible delay. There is no SLA, but the expected range is normally
seconds at worst. This only occurs on new kinds of requests, which is not that
often in
practice.
"""
# Note that TaskRequest may not be stored in the datastore yet, but it is
# fully populated at this point.
exp_ts = request.created_ts
futures = []
for i in range(request.num_task_slices):
t = request.task_slice(i)
exp_ts += datetime.timedelta(seconds=t.expiration_secs)
futures.append(
_assert_task_dimensions_async(t.properties.dimensions, exp_ts))
yield futures
def probably_has_capacity(dimensions):
"""Returns True if there is likely a live bot to serve this request.
There's a risk of collision, that is, it could return True even if there is no
capacity. The risk is of 2^30 different dimensions sets.
It is only looking at the task queues, not at the actual bots.
Returns:
True or False if the capacity is cached, None otherwise.
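A rough usage sketch (illustrative; `slow_capacity_check` is a placeholder
for whatever fallback the caller uses):

  if probably_has_capacity(dims) is None:
    if slow_capacity_check(dims):
      set_has_capacity(dims, 60)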
"""
dimensions_hash = str(hash_dimensions(dimensions))
# Sadly, the fact that the key is not there doesn't mean that this task queue
# is dead. For example:
# - memcache could have been cleared manually or could be malfunctioning.
# - in the case where a single bot can service the dimensions, the bot may not
# have been polling for N+1 seconds.
return memcache.get(dimensions_hash, namespace='task_queues_tasks')
def set_has_capacity(dimensions, seconds):
"""Registers the fact that this task request dimensions set has capacity.
Arguments:
dimensions: a dict with task dimensions.
seconds: the amount of time this 'fact' should be kept.
"""
dimensions_hash = str(hash_dimensions(dimensions))
memcache.set(
dimensions_hash, True, time=seconds, namespace='task_queues_tasks')
def update_bot_matches_async(payload):
"""Assigns new task dimension set to matching bots.
Task queue task handler, part of assert_task_async(...) implementation.
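The JSON payload is produced in _assert_task_dimensions_async and looks
roughly like (illustrative values; the timestamp is whatever
utils.encode_to_json produced for the enqueue time):

  {
    "task_sets_id": "pool:gpu:123456",
    "dimensions": [["os:Linux", "pool:gpu"]],
    "enqueued_ts": "<serialized datetime>"
  }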
"""
logging.info('TQ task payload:\n%s', payload)
payload = json.loads(payload)
return _tq_update_bot_matches_async(
payload['task_sets_id'], payload['dimensions'],
utils.parse_datetime(payload['enqueued_ts']))
def rescan_matching_task_sets_async(payload):
"""A task queue task that finds all matching TaskDimensionsSets for a bot."""
logging.info('TQ task payload:\n%s', payload)
payload = json.loads(payload)
return _tq_rescan_matching_task_sets_async(payload['bot_id'],
payload['rescan_counter'],
payload['rescan_reason'])
@ndb.tasklet
def tidy_task_dimension_sets_async():
"""Removes expired task dimension sets from the datastore.
Returns:
True if cleaned up everything, False if something failed.
"""
log = _Logger('tidy_task_dims')
updated = []
@ndb.tasklet
def cleanup_async(dim_info):
sets_id = dim_info.key.parent().string_id()
try:
cleaned = yield _cleanup_task_dimensions_async(dim_info, log)
if cleaned:
updated.append(sets_id)
raise ndb.Return(True)
except _TXN_EXCEPTIONS:
log.warning('error cleaning %s', sets_id)
raise ndb.Return(False)
q = TaskDimensionsInfo.query(
TaskDimensionsInfo.next_cleanup_ts < utils.utcnow())
ok = yield _map_async([(q, log)], cleanup_async)
log.info('cleaned %d', len(updated))
# Retry the whole thing if something failed or the query timed out. This is
# faster than waiting for the next cron tick. Will also give some monitoring
# signal.
if not ok:
log.error('need a retry')
raise ndb.Return(ok)