| # coding: utf-8 |
| # Copyright 2017 The LUCI Authors. All rights reserved. |
| # Use of this source code is governed under the Apache License, Version 2.0 |
| # that can be found in the LICENSE file. |
| |
| """Task queues generated from the actual load. |
| |
| This means that the task queues are deduced by the actual load, they are never |
| explicitly defined. They are eventually deleted by a cron job once no incoming |
| task with the exact set of dimensions is triggered anymore. |
| |
| Used to optimize scheduling. |
| """ |
| |
| import collections |
| import datetime |
| import hashlib |
| import json |
| import logging |
| import random |
| import struct |
| import time |
| import urllib |
| |
| from google.appengine.api import datastore_errors |
| from google.appengine.api import memcache |
| from google.appengine.ext import ndb |
| from google.appengine.runtime import apiproxy_errors |
| |
| from components import datastore_utils |
| from components import utils |
| from server import config |
| from server.constants import OR_DIM_SEP |
| |
| |
| ### Models. |
| |
| |
class TaskDimensionsSets(ndb.Model):
  """A task dimensions hash and a list of dimension sets that match it.

  Root entity. Key ID is
    * `bot:<url-encoded-id>:<dimensions_hash>` - for tasks targeting a bot.
    * `pool:<url-encoded-id>:<dimensions_hash>` - for tasks targeting a pool.

  Where `<url-encoded-id>` is an URL encoding of bot or pool ID (primarily to
  quote `:`, since it is allowed in pool IDs) and `<dimensions_hash>` is a
  32 bit integer with the task dimensions hash as generated by
  hash_dimensions(...).

  Presence of this entity indicates that there's potentially a pending task
  that matches `<dimensions_hash>` and matching bots should be polling the
  corresponding TaskToRun queue.

  For tasks that use OR-ed dimensions (`{"k": "a|b"}`), the `<dimensions_hash>`
  is taken over the original dimensions (with "|" still inside), but the stored
  dimensions set are flat i.e. `sets` logically stores two separate dimension
  sets: `{"k": "a"}` and `{"k": "b"}`.

  If there's a collision on `<dimensions_hash>` (i.e. two different dimensions
  sets hash to the same value), `sets` stores dimensions sets from all colliding
  entries. Collisions are eventually resolved when bots poll for tasks: they
  get all tasks that match `<dimensions_hash>`, and then filter out ones that
  don't actually match bot's dimensions.

  A TaskDimensionsSets entity is always accompanied by TaskDimensionsInfo
  entity. TaskDimensionsSets entity stores infrequently changing (but frequently
  scanned) data, while TaskDimensionsInfo contains all data (including
  frequently changing data, like expiration timestamps). This separation reduces
  churn in TaskDimensionsSets datastore indexes (improving performance of full
  rescans that happen in _tq_rescan_matching_task_sets_async), as well as
  improves caching efficiency of direct ndb.get_multi fetches of
  TaskDimensionsSets in assert_bot(...).
  """
  # Disable useless in-process per-request cache to save some RAM.
  _use_cache = False

  # All matching dimensions sets as a list of dicts.
  #
  # Each dict defines a single set of task dimensions:
  #
  #   {
  #     "dimensions": ["k1:v1", "k1:v2", "k2:v2", ...]
  #   }
  #
  # Dimensions are represented by a sorted dedupped list of `key:value` strings.
  # These are flattened dimensions as returned by expand_dimensions_to_flats,
  # i.e. values here never have "|" in them. Such dimension sets can directly be
  # matched with bot dimensions using set operations.
  #
  # Matches dimensions stored in `TaskDimensionsInfo.sets` (sans expiration
  # time).
  #
  # The order of dicts is irrelevant. Never empty.
  sets = datastore_utils.DeterministicJsonProperty(indexed=False)

  def contains_task_dimensions_set(self, task_dimensions_flat):
    """True if any of stored `sets` is equal to `task_dimensions_flat`.

    Arguments:
      task_dimensions_flat: a sorted list of `k:v` pairs with task dimensions.
    """
    return any(task_dimensions_flat == s['dimensions'] for s in self.sets)

  def matches_bot_dimensions(self, bot_dimensions_set):
    """True if any of stored `sets` matches given bot dimensions.

    A stored set matches when every one of its `k:v` pairs is present in the
    bot's dimensions.

    Arguments:
      bot_dimensions_set: a set of `k:v` pairs with bot dimensions.
    """
    return any(
        bot_dimensions_set.issuperset(s['dimensions']) for s in self.sets)

  @staticmethod
  def dimensions_to_id(dimensions):
    """Returns a string ID for this entity given the task dimensions dict.

    Arguments:
      dimensions: a dict with task dimensions prior to expansion of "|".

    Returns:
      A string `<kind>:<url-encoded-id>:<dimensions_hash>`.
    """
    # Both `id` and `pool` are guaranteed to have at most 1 item. If a task
    # targets a specific bot via `id`, use `bot:` key prefix to allow this bot
    # to find the task faster, otherwise use `pool:`.
    if u'id' in dimensions:
      kind = 'bot'
      pfx = dimensions[u'id'][0]
    else:
      kind = 'pool'
      pfx = dimensions[u'pool'][0]
    dimensions_hash = hash_dimensions(dimensions)
    return '%s:%d' % (TaskDimensionsSets.id_prefix(kind, pfx), dimensions_hash)

  @staticmethod
  def id_prefix(kind, pfx):
    """Returns either `bot:<url-encoded-id>` or `pool:<url-encoded-id>`.

    Arguments:
      kind: either 'bot' or 'pool'.
      pfx: the bot or pool ID, as a byte or unicode string.
    """
    assert kind in ('bot', 'pool'), kind
    # quote_plus operates on bytes; encode unicode IDs as UTF-8 first.
    if isinstance(pfx, unicode):
      pfx = pfx.encode('utf-8')
    return '%s:%s' % (kind, urllib.quote_plus(pfx))

  @staticmethod
  def split_id(sets_id):
    """Given a valid TaskDimensionsSets ID, returns its kind, ID and number.

    Arguments:
      sets_id: a string ID, e.g. `bot:some-bot:1234`.

    Returns:
      Tuple with kind, ID and number, e.g. `('bot', 'some-bot', 1234)`.

    Raises:
      ValueError if the ID doesn't have three `:`-separated chunks, has an
      unknown kind or a non-integer number.
    """
    chunks = sets_id.split(':')
    if len(chunks) != 3:
      raise ValueError('Invalid TaskDimensionsSets ID %r' % sets_id)
    kind, pfx, num = chunks
    if kind not in ('bot', 'pool'):
      raise ValueError('Invalid TaskDimensionsSets ID %r' % sets_id)
    try:
      # id_prefix(...) encodes with quote_plus, so the exact inverse is
      # unquote_plus: plain unquote would leave `+` (an encoded space) as is.
      return (kind, urllib.unquote_plus(pfx), int(num))
    except ValueError:
      raise ValueError('Invalid TaskDimensionsSets ID %r' % sets_id)

  @staticmethod
  def ids_to_queue_numbers(sets_ids):
    """Given an iterable of TaskDimensionsSets IDs returns a list of queues.

    Arguments:
      sets_ids: an iterable with TaskDimensionsSets string IDs.

    Returns:
      A sorted list with queue numbers aka dimensions hashes.
    """
    # The queue number is whatever follows the last `:` in the ID.
    return sorted(int(sets_id.rsplit(':', 1)[-1]) for sets_id in sets_ids)
| |
| |
class TaskDimensionsInfo(ndb.Model):
  """Accompanies TaskDimensionsSets and carries expiration timestamps.

  The parent entity is the corresponding TaskDimensionsSets. ID is integer 1.

  Three things can happen to `sets`:
    * A new entry added: happens in assert_task_async.
    * Expiration of an existing entry is updated: happens in assert_task_async.
    * An expired entry is removed: happens in the cleanup cron.

  Whenever the list of sets in `sets` changes, the matching TaskDimensionsSets
  entity is updated as well (in a single transaction). This happens relatively
  infrequently. Changes to the expiry time of existing entries (which happen
  frequently) do not touch TaskDimensionsSets entity at all.
  """
  # Disable useless in-process per-request cache to save some RAM.
  _use_cache = False

  # All matching dimensions sets and their expiry time as a list of dicts.
  #
  # Each dict defines a single set of task dimensions together with a timestamp
  # when it should be removed from the datastore:
  #
  #   {
  #     "dimensions": ["k1:v1", "k1:v2", "k2:v2", ...],
  #     "expiry": 1663020417
  #   }
  #
  # Dimensions are represented by a sorted dedupped list of `key:value` strings.
  # These are flattened dimensions as returned by expand_dimensions_to_flats,
  # i.e. values here never have "|" in them. Such dimension sets can directly be
  # matched with bot dimensions using set operations.
  #
  # Expiry is specified as an integer Unix timestamp in seconds since epoch.
  # Expiry is always larger (with some wide margin) than the scheduling deadline
  # of the last task that used these dimensions and it is randomized to avoid
  # expiration stampedes.
  #
  # Matches dimensions stored in `TaskDimensionsSets.sets`.
  #
  # The order is irrelevant. Never empty.
  sets = datastore_utils.DeterministicJsonProperty(indexed=False)

  # A timestamp when this entity needs to be visited by the cleanup cron.
  #
  # The cron visits it if now() > next_cleanup_ts. This timestamp is calculated
  # based on expiry of individual dimensions sets.
  next_cleanup_ts = ndb.DateTimeProperty(indexed=True)

  def expiry_map(self):
    """Returns sets as a mapping of task dimension tuples to their expiry time.

    Thin wrapper over _sets_to_expiry_map(...) applied to `self.sets`.

    Returns:
      A dict {tuple(task dimensions list) => expiry datetime.datetime}
    """
    return _sets_to_expiry_map(self.sets)
| |
| |
class BotDimensionsMatches(ndb.Model):
  """Records which TaskDimensionsSets a particular bot is matched to.

  This is a root entity keyed by the bot ID.

  It first appears when the bot calls assert_bot(...) and is removed by
  cleanup_after_bot(...) once the bot is expected to stop consuming tasks
  (e.g. it got deleted, quarantined, detected as dead, etc.).
  """
  # The per-request in-process cache is of no use here, skip it to save RAM.
  _use_cache = False

  # Sorted, dedupped list of the bot's dimensions as 'key:value' strings.
  #
  # A bot reporting `{k: ["v1", "v2"]}` contributes both `k:v1` and `k:v2`.
  #
  # The property is indexed so that bots matching particular task dimensions
  # can be found with a datastore query, see _tq_update_bot_matches_async.
  dimensions = ndb.StringProperty(repeated=True, indexed=True)

  # String IDs of every TaskDimensionsSets entity matched to this bot.
  #
  # Entries get added in two ways:
  #   * A newly created TaskDimensionsSets proactively "searches" for matching
  #     bots in _tq_update_bot_matches_async, which lowers scheduling latency
  #     of new kinds of tasks.
  #   * A full rescan of all TaskDimensionsSets runs periodically and whenever
  #     bot dimensions change, see _tq_rescan_matching_task_sets_async.
  #
  # Entries get removed in assert_bot(...) when the TaskDimensionsSets entity
  # is gone (no more tasks with such dimensions) or when the bot's dimensions
  # stop matching it (e.g. the bot changed its dimensions).
  matches = ndb.StringProperty(repeated=True, indexed=False)

  # Bumped each time a rescan is triggered; lets obsolete TQ tasks be skipped.
  #
  # A task queue task carries the counter value it was enqueued with. If the
  # entity holds a different value when the task starts, the task is no longer
  # relevant and should be skipped.
  rescan_counter = ndb.IntegerProperty(default=0, indexed=False)

  # The moment the next full rescan should be enqueued.
  #
  # Note: this is updated when the asynchronous rescan is triggered via a task
  # queue task, not when that rescan completes.
  next_rescan_ts = ndb.DateTimeProperty(default=utils.EPOCH, indexed=False)

  # Debugging aid: when the last rescan was enqueued.
  last_rescan_enqueued_ts = ndb.DateTimeProperty(indexed=False)
  # Debugging aid: when the last rescan finished.
  last_rescan_finished_ts = ndb.DateTimeProperty(indexed=False)
  # Debugging aid: when `matches` were cleaned up the last time.
  last_cleanup_ts = ndb.DateTimeProperty(indexed=False)
  # Debugging aid: when _maybe_add_match_async added a match the last time.
  last_addition_ts = ndb.DateTimeProperty(indexed=False)

  @classmethod
  @ndb.tasklet
  def get_or_default_async(cls, bot_id):
    """Loads the entity for `bot_id`, falling back to a fresh default one.

    A freshly constructed entity is NOT stored here. The caller is responsible
    for eventually putting it.

    Arguments:
      bot_id: ID of the bot, used as the entity key ID.

    Returns:
      The stored entity if there's one, otherwise a new unsaved entity.
    """
    stored = yield ndb.Key(cls, bot_id).get_async()
    if not stored:
      raise ndb.Return(cls(id=bot_id))
    raise ndb.Return(stored)
| |
| |
| ### Internal APIs. |
| |
# Exceptions that can be raised by transaction_async(...).
#
# Code in this module that treats a failed transaction as non-fatal (e.g.
# _maybe_add_match_async) catches this tuple as a whole.
_TXN_EXCEPTIONS = (
    # Deadline starting or landing the transaction.
    apiproxy_errors.DeadlineExceededError,
    # Transaction handle has expired.
    datastore_errors.BadRequestError,
    # Internal error of unknown origin.
    datastore_errors.InternalError,
    # Datastore-specific timeout.
    datastore_errors.Timeout,
    # Transaction collisions or internal errors.
    datastore_utils.CommitError,
)
| |
| |
def _is_bot_matching_any_task_dims(bot_dimensions_flat, task_dims_sets):
  """Tells whether a bot satisfies at least one of the task dimension sets.

  A task dimension set is satisfied when each of its `k:v` pairs is among the
  bot's dimensions.

  Arguments:
    bot_dimensions_flat: a list of bot dimensions as `k:v` pairs.
    task_dims_sets: a list of lists with flat task dimensions represented as
      `k:v` pairs. This is usually expansion of `|` task dimensions as generated
      by expand_dimensions_to_flats.

  Returns:
    True if some set in `task_dims_sets` is fully covered by the bot.
  """
  available = frozenset(bot_dimensions_flat)
  for required in task_dims_sets:
    if available.issuperset(required):
      return True
  return False
| |
| |
def _sets_to_expiry_map(sets):
  """Builds a {task dimensions tuple => expiry time} dict out of `sets`.

  Arguments:
    sets: a value of `TaskDimensionsInfo.sets` entity property. In particular,
      `expiry` in dicts is populated.

  Returns:
    A dict {tuple(task dimensions list) => expiry datetime.datetime}
  """
  expiry_by_dims = {}
  for entry in sets:
    # `expiry` is stored in seconds; it is scaled by 1e6 before conversion.
    expiry = utils.timestamp_to_datetime(entry['expiry'] * 1e6)
    expiry_by_dims[tuple(entry['dimensions'])] = expiry
  return expiry_by_dims
| |
| |
def _expiry_map_to_sets(expiry_map, with_expiry=True):
  """Turns an expiry map into a list suitable for the `sets` property.

  This is the reverse of _sets_to_expiry_map(...). The expiry time can be
  omitted, which is the form stored in `TaskDimensionsSets.sets`.

  Arguments:
    expiry_map: a {tuple(task dimensions list) => datetime.datetime}.
    with_expiry: if True, populate `expiry` field in the result.

  Returns:
    A list with dicts, can be stored in `sets` entity property. Entries are
    ordered by their dimensions tuple.
  """

  def to_entry(dims):
    entry = {'dimensions': dims}
    if with_expiry:
      # Scale the converted timestamp by 1e6 down to whole seconds.
      usec = utils.datetime_to_timestamp(expiry_map[dims])
      entry['expiry'] = int(usec / 1e6)
    return entry

  # Keys are unique, so ordering by key alone equals ordering (key, value)
  # pairs.
  return [to_entry(dims) for dims in sorted(expiry_map)]
| |
| |
def _put_task_dimensions_sets_async(sets_id, expiry_map):
  """Stores a TaskDimensionsSets entity together with its TaskDimensionsInfo.

  Must be called in a transaction to make sure both entities stay in sync.

  Arguments:
    sets_id: string ID of TaskDimensionsSets to store under.
    expiry_map: an expiry map to store there, must not be empty.

  Returns:
    The result of ndb.put_multi_async for both entities; callers yield it.
  """
  assert ndb.in_transaction()
  assert expiry_map

  parent_key = ndb.Key(TaskDimensionsSets, sets_id)

  # The cleanup is scheduled slightly after the earliest expiry among all
  # entries. The extra margin mostly guards against weird edge cases caused by
  # not perfectly synchronized clocks.
  earliest_expiry = min(expiry_map.values())

  sets_ent = TaskDimensionsSets(
      key=parent_key,
      sets=_expiry_map_to_sets(expiry_map, with_expiry=False))
  info_ent = TaskDimensionsInfo(
      key=ndb.Key(TaskDimensionsInfo, 1, parent=parent_key),
      sets=_expiry_map_to_sets(expiry_map, with_expiry=True),
      next_cleanup_ts=earliest_expiry + datetime.timedelta(minutes=5))
  return ndb.put_multi_async([sets_ent, info_ent])
| |
| |
def _delete_task_dimensions_sets_async(sets_id):
  """Removes a TaskDimensionsSets entity together with its TaskDimensionsInfo.

  Must be called in a transaction to make sure both entities stay in sync.

  Arguments:
    sets_id: string ID of TaskDimensionsSets to delete.

  Returns:
    The result of ndb.delete_multi_async for both keys; callers yield it.
  """
  assert ndb.in_transaction()
  parent_key = ndb.Key(TaskDimensionsSets, sets_id)
  return ndb.delete_multi_async([
      parent_key,
      ndb.Key(TaskDimensionsInfo, 1, parent=parent_key),
  ])
| |
| |
@ndb.tasklet
def _check_matches_async(bot_dimensions_set, sets_ids):
  """Fetches TaskDimensionsSets and sorts them into alive and stale ones.

  A set is alive if its entity still exists and still matches the bot. It is
  stale otherwise.

  Arguments:
    bot_dimensions_set: a set of `k:v` pairs with bot dimensions.
    sets_ids: string IDs of TaskDimensionsSets to load and check.

  Returns:
    set(alive and still matching sets IDs), set(stale sets IDs).
  """
  keys = [ndb.Key(TaskDimensionsSets, sid) for sid in sets_ids]
  fetched = yield ndb.get_multi_async(keys)

  alive, stale = set(), set()
  for sid, ent in zip(sets_ids, fetched):
    # A missing entity is falsy here, so it short-circuits into `stale`.
    still_good = ent and ent.matches_bot_dimensions(bot_dimensions_set)
    bucket = alive if still_good else stale
    bucket.add(sid)

  raise ndb.Return((alive, stale))
| |
| |
@ndb.tasklet
def _assert_task_dimensions_async(task_dimensions, exp_ts):
  """Ensures there's corresponding TaskDimensionsSets stored in the datastore.

  If it is a never seen before set of dimensions, will emit a task queue task
  to find matching bots. Otherwise occasionally makes a transaction to bump
  expiration time of the stored TaskDimensionsSets entity.

  Errors raised by the transaction are not caught here and propagate to the
  caller.

  Arguments:
    task_dimensions: a dict with task dimensions (prior to expansion of "|").
    exp_ts: datetime.datetime with task expiration (aka scheduling deadline).
  """
  # This is e.g. "pool:<url-encoded-pool-id>:<number>".
  sets_id = TaskDimensionsSets.dimensions_to_id(task_dimensions)
  sets_key = ndb.Key(TaskDimensionsSets, sets_id)
  info_key = ndb.Key(TaskDimensionsInfo, 1, parent=sets_key)

  log = _Logger('assert_task(%s)', sets_id)

  # Load expiration of known dimensions sets to see if they need to be updated.
  # This read is non-transactional; the state is re-read inside the transaction
  # below before anything is mutated.
  info = yield info_key.get_async()
  expiry_map = _sets_to_expiry_map(info.sets if info else [])

  # Randomize expiration time a bit. This is done for two reasons:
  #   * Reduce transaction collisions when checking the expiry from concurrent
  #     requests (only the most unlucky one will do the transaction below).
  #   * Spread out load for TaskDimensionsSets cleanup cron a bit in case a
  #     lot of tasks appeared at once.
  # Some extra time will also be added in the transaction below, see comments
  # there.
  exp_ts += _random_timedelta_mins(0, 30)

  # This expands e.g. `{"k": "a|b"}` into `[("k:a",), ("k:b",)]`.
  expanded = [tuple(s) for s in expand_dimensions_to_flats(task_dimensions)]

  # Check all sets are known and fresh.
  fresh = True
  for dims in expanded:
    cur_expiry = expiry_map.get(dims)
    if not cur_expiry or cur_expiry < exp_ts:
      fresh = False
      break
  if fresh:
    raise ndb.Return(None)

  # Some dimensions sets are either missing or stale. We need to transactionally
  # update the entity to add them, perhaps emitting a task queue task to find
  # matching bots.
  @ndb.tasklet
  def txn():
    # Set the stored expiration time far in advance. This reduces frequency of
    # transactions that update the expiration time in case of a steady stream of
    # requests (one transaction "covers" all next ~4h of requests: they'll see
    # the entity as fresh). The downside is that we keep stuff in datastore for
    # longer than strictly necessary per `exp_ts`. But this is actually a good
    # thing for infrequent tasks with short expiration time. We **want** to keep
    # TaskDimensionsSets for them in the datastore longer (perhaps even if there
    # are no such tasks the queues) to avoid frequently creating and deleting
    # TaskDimensionsSets entities for them. Creating TaskDimensionsSets is a
    # costly operation since it requires a scan to find matching bots.
    extended_ts = exp_ts + datetime.timedelta(hours=4)

    # Load the fresh state, since we are in the transaction now.
    info = yield info_key.get_async()
    expiry_map = _sets_to_expiry_map(info.sets if info else [])

    # Bump the expiry and add new sets.
    created = []
    updated = []
    for dims in expanded:
      cur_expiry = expiry_map.get(dims)
      if not cur_expiry:
        log.info('adding %s, expiry %s', dims, extended_ts)
        expiry_map[dims] = extended_ts
        created.append(dims)
      elif cur_expiry < extended_ts:
        log.info('expiry of %s: %s => %s', dims, cur_expiry, extended_ts)
        expiry_map[dims] = extended_ts
        updated.append(dims)

    # It is possible the entity was already updated by another transaction.
    if not created and not updated:
      log.info('nothing to commit, already updated by another txn')
      raise ndb.Return(None)

    # Kick out old sets since we are going to commit the mutation anyway.
    # Iterate over an explicit snapshot: entries are popped from the dict in
    # the loop, which must not happen while iterating a live view of it.
    now = utils.utcnow()
    for dims, expiry in list(expiry_map.items()):
      if expiry < now:
        log.info('dropping %s, expired at %s', dims, expiry)
        expiry_map.pop(dims)

    # Some sets must survive, since we just added/updated fresh sets.
    assert len(expiry_map) > 0
    yield _put_task_dimensions_sets_async(sets_id, expiry_map)

    # If we added new sets, need to launch a task to find matching bots ASAP.
    # This reduces scheduling latency for new kinds of tasks. This eventually
    # calls _tq_update_bot_matches_async. Note we don't need to do this if we
    # are just updating the expiration time. The assignments of tasks to bots
    # were already done at that point.
    if created:
      log.info('enqueuing a task to find matches for %s', created)
      ok = yield utils.enqueue_task_async(
          '/internal/taskqueue/important/task_queues/update-bot-matches',
          'update-bot-matches',
          payload=utils.encode_to_json({
              'task_sets_id': sets_id,
              'dimensions': created,
              'enqueued_ts': now,
          }),
          transactional=True,
      )
      # Raising aborts the transaction, so the entities are not stored without
      # the accompanying task queue task.
      if not ok:
        raise datastore_utils.CommitError('Failed to enqueue a TQ task')

  # Do it!
  log.info('launching txn')
  yield datastore_utils.transaction_async(txn, retries=3)
  log.info('txn completed')
| |
| |
@ndb.tasklet
def _tq_update_bot_matches_async(task_sets_id, task_sets_dims, enqueued_ts):
  """Records a new task dimensions set on all bots that can serve it.

  Looks up BotDimensionsMatches entities that potentially match any of task
  dimensions sets in `task_sets_dims` and transactionally updates the ones that
  really do.

  Arguments:
    task_sets_id: string ID of TaskDimensionsSets that contains the set.
    task_sets_dims: a list with lists of flat task dimensions to match on, e.g.
      `[[`k1:a`, `k2:v1`], [`k1:a`, `k2:v2`]]`. It may have multiple elements
      when the task uses OR dimensions.
    enqueued_ts: datetime.datetime when this task was enqueued, for debugging.

  Returns:
    True if succeeded, False if the TQ task needs to be retried.
  """
  assert task_sets_id.startswith(('bot:', 'pool:')), task_sets_id
  assert all(isinstance(dims, list) for dims in task_sets_dims), task_sets_dims

  log = _Logger('tq_update_matches(%s)', task_sets_id)

  # If the entity is gone already, this TQ task has nothing left to do.
  sets_ent = yield ndb.Key(TaskDimensionsSets, task_sets_id).get_async()
  if not sets_ent:
    log.warning('the entity is already gone, retiring the task')
    raise ndb.Return(True)

  # Keep only the sets the entity still lists. Some may have been cleaned up by
  # the cron if the TQ task was stuck for a while. Should be rare.
  still_listed = []
  for dims in task_sets_dims:
    if not sets_ent.contains_task_dimensions_set(dims):
      log.warning('the set is no longer actual: %s', dims)
      continue
    still_listed.append(dims)
  if not still_listed:
    log.warning('all sets are no longer actual, retiring the task')
    raise ndb.Return(True)

  # IDs of bots `_maybe_add_match_async` has looked at already.
  visited_bots = set()
  # IDs of bots that were transactionally updated.
  updated_bots = []

  def visit_async(bot_matches_ent):
    # Binds all arguments of _maybe_add_match_async that never change.
    return _maybe_add_match_async(
        bot_matches_ent, task_sets_id, still_listed, enqueued_ts, log,
        visited_bots, updated_bots)

  def query_for(flat_dims):
    # A bot is a candidate if it has every `k:v` pair of the task set.
    query = BotDimensionsMatches.query()
    for kv in flat_dims:
      query = query.filter(BotDimensionsMatches.dimensions == kv)
    return query

  # Stays True unless some scan or update fails.
  succeeded = True

  if not task_sets_id.startswith('bot:'):
    # The task targets a whole pool: bots need to be discovered via queries on
    # their indexed dimensions, one query per task dimensions set.
    queries = [
        (query_for(flat_dims), log.derive('query #%d', idx))
        for idx, flat_dims in enumerate(still_listed)
    ]
    succeeded = yield _map_async(queries, visit_async)
  else:
    # The task targets a specific bot: there's at most one potentially matching
    # entity and it can be fetched directly by key, no queries needed.
    bot_id = TaskDimensionsSets.split_id(task_sets_id)[1]
    bot_ent = yield ndb.Key(BotDimensionsMatches, bot_id).get_async()
    if bot_ent:
      succeeded = yield visit_async(bot_ent)

  # On a failure (or a query timeout) the whole task is retried. Hopefully the
  # retry has less work to do and the task eventually succeeds.
  log.info('visited %d bots, updated %d', len(visited_bots), len(updated_bots))
  if not succeeded:
    log.error('need a retry')
  raise ndb.Return(succeeded)
| |
| |
@ndb.tasklet
def _maybe_add_match_async(bot_matches_ent, task_sets_id, task_sets_dims,
                           enqueued_ts, log, bots_visited, bots_updated):
  """Registers a matching set in BotDimensionsMatches unless already there.

  Implementation detail of _tq_update_bot_matches_async.

  Arguments:
    bot_matches_ent: BotDimensionsMatches to update if still necessary.
    task_sets_id: string ID of TaskDimensionsSets to add as a match.
    task_sets_dims: a list with lists of flat task dimensions to match on, e.g.
      `[[`k1:a`, `k2:v1`], [`k1:a`, `k2:v2`]]`. It may have multiple elements
      when original task dimensions have "|" inside.
    enqueued_ts: datetime.datetime when the TQ was enqueued, for debugging.
    log: _Logger to use for logs.
    bots_visited: a set of already checked or updated bot IDs. Will be mutated.
    bots_updated: a list of already updated bot IDs. Will be mutated.

  Returns:
    True if succeeded (or was skipped), False if something failed.
  """
  # Several queries may yield the same bot. Process each bot only once.
  bot_id = bot_matches_ent.key.string_id()
  if bot_id in bots_visited:
    raise ndb.Return(True)
  bots_visited.add(bot_id)

  # Nothing to do if the match is recorded already or the bot's dimensions are
  # not a match after all. The latter check matters most for `bot:...`
  # TaskDimensionsSets, since no filtering query is involved in that case. The
  # dimensions are verified once more inside the transaction.
  already_matched = task_sets_id in bot_matches_ent.matches
  if already_matched or not _is_bot_matching_any_task_dims(
      bot_matches_ent.dimensions, task_sets_dims):
    raise ndb.Return(True)

  # All checks are repeated transactionally (dimensions could have changed in
  # between) before the match is added. The transaction also prevents stomping
  # over matches added concurrently by other TQ tasks.
  @ndb.tasklet
  def txn():
    current = yield ndb.Key(BotDimensionsMatches, bot_id).get_async()
    nothing_to_do = (
        not current or
        task_sets_id in current.matches or
        not _is_bot_matching_any_task_dims(current.dimensions, task_sets_dims))
    if nothing_to_do:
      raise ndb.Return(False)
    current.matches = sorted(current.matches + [task_sets_id])
    current.last_addition_ts = utils.utcnow()
    yield current.put_async()
    raise ndb.Return(True)

  try:
    added = yield datastore_utils.transaction_async(txn, retries=5)
    if added:
      log.info('updated %s, delay %s', bot_id, utils.utcnow() - enqueued_ts)
      bots_updated.append(bot_id)
  except _TXN_EXCEPTIONS:
    log.warning('error updating %s', bot_id)
    raise ndb.Return(False)
  raise ndb.Return(True)
| |
| |
@ndb.tasklet
def _cleanup_task_dimensions_async(dims_info, log):
  """Removes stale dimensions sets from a TaskDimensions[Sets|Info] entities.

  Deletes entities entirely if all sets there are stale. Called as part of the
  cleanup cron.

  Errors raised by the transaction are not caught here and propagate to the
  caller.

  Arguments:
    dims_info: a TaskDimensionsInfo entity to cleanup.
    log: _Logger to use for logs.

  Returns:
    True if cleaned up, False if already clean or gone.
  """
  # The cutoff is taken once, so the pre-check below and the transaction agree
  # on what "expired" means.
  now = utils.utcnow()
  sets_id = dims_info.key.parent().string_id()

  # Confirm we have something to cleanup before opening a transaction.
  expiry_map = _sets_to_expiry_map(dims_info.sets)
  if all(expiry >= now for expiry in expiry_map.values()):
    raise ndb.Return(False)

  @ndb.tasklet
  def txn():
    # Re-read the entity: `dims_info` was fetched outside of the transaction
    # and may be outdated (or the entity may be gone) by now.
    txn_ent = yield dims_info.key.get_async()
    if not txn_ent:
      raise ndb.Return(False)
    expiry_map = _sets_to_expiry_map(txn_ent.sets)

    # Iterate over an explicit snapshot: entries are popped from the dict in
    # the loop, which must not happen while iterating a live view of it.
    changed = False
    for dims, expiry in list(expiry_map.items()):
      if expiry < now:
        log.info('%s: dropping %s, expired at %s', sets_id, dims, expiry)
        expiry_map.pop(dims)
        changed = True

    if changed:
      if not expiry_map:
        # Nothing survived: both entities must go, since `sets` is never empty.
        yield _delete_task_dimensions_sets_async(sets_id)
      else:
        yield _put_task_dimensions_sets_async(sets_id, expiry_map)

    raise ndb.Return(changed)

  changed = yield datastore_utils.transaction_async(txn, retries=3)
  raise ndb.Return(changed)
| |
| |
| @ndb.tasklet |
| def _assert_bot_dimensions_async(bot_dimensions): |
| """Updates assignment of task queues to the bot, returns them. |
| |
| Runs as part of /bot/poll and must be fast. |
| |
| Arguments: |
| bot_dimensions: a dict with bot dimensions (including "id" dimension), i.e. |
| `{key: [value]}`. |
| |
| Returns: |
| A set of string IDs of TaskDimensionsSets that match the bot. |
| """ |
| bot_id = bot_dimensions[u'id'][0] |
| bot_dimensions_flat = bot_dimensions_to_flat(bot_dimensions) |
| bot_dimensions_set = set(bot_dimensions_flat) |
| |
| log = _Logger('assert_bot(%s)', bot_id) |
| |
| # Load queue numbers assigned to the bot. |
| log.info('loading bot dimensions matches') |
| matches = yield BotDimensionsMatches.get_or_default_async(bot_id) |
| |
| # Load associated dimension sets to check the bot still matches them. This is |
| # the hottest spot that heavily relies on ndb memcache. |
| log.info('checking %d dimensions sets', len(matches.matches)) |
| alive, stale = yield _check_matches_async(bot_dimensions_set, matches.matches) |
| if stale: |
| log.info('will unmatch: %s', ' '.join(stale)) |
| |
| # We need to trigger a rescan of all potentially matching TaskDimensionsSets |
| # when bot dimensions change and also periodically (to workaround eventual |
| # consistency of datastore queries and various races in the code). |
| rescan_reason = None |
| if matches.dimensions != bot_dimensions_flat: |
| rescan_reason = _diff_bot_dims(matches.dimensions, bot_dimensions_flat) |
| elif matches.next_rescan_ts and utils.utcnow() > matches.next_rescan_ts: |
| rescan_reason = 'periodic' |
| |
| if rescan_reason is not None: |
| |
| @ndb.tasklet |
| def enqueue_rescan_txn(): |
| now = utils.utcnow() |
| txn_ent = yield BotDimensionsMatches.get_or_default_async(bot_id) |
| if txn_ent.next_rescan_ts != matches.next_rescan_ts: |
| # This can theoretically happen if a bot is calling /bot/poll in |
| # parallel, which it should not be doing. |
| log.warning('aborting txn, the entity state changed') |
| raise ndb.Return(None) |
| txn_ent.dimensions = bot_dimensions_flat |
| txn_ent.matches = [x for x in txn_ent.matches if x not in stale] |
| txn_ent.rescan_counter += 1 |
| txn_ent.next_rescan_ts = now + _random_timedelta_mins(20, 40) |
| txn_ent.last_rescan_enqueued_ts = now |
| txn_ent.last_cleanup_ts = now |
| # This eventually calls _tq_rescan_matching_task_sets_async. |
| ok = yield utils.enqueue_task_async( |
| '/internal/taskqueue/important/task_queues/rescan-matching-task-sets', |
| 'rescan-matching-task-sets', |
| payload=utils.encode_to_json({ |
| 'bot_id': bot_id, |
| 'rescan_counter': txn_ent.rescan_counter, |
| 'rescan_reason': rescan_reason, |
| }), |
| transactional=True) |
| if not ok: |
| raise datastore_utils.CommitError('Failed to enqueue TQ task') |
| yield txn_ent.put_async() |
| |
| log.info('triggering rescan: %s', rescan_reason) |
| yield datastore_utils.transaction_async(enqueue_rescan_txn, retries=5) |
| elif stale: |
| # If the rescan is not needed, but we have stale matches, clean them up now |
| # to reduce amount of work for future checks. This can be "fire and forget" |
| # operation, but ndb async tasklet model makes this impossible, so we'll |
| # wait. Ignore errors though, they are not critical at this stage. |
| @ndb.tasklet |
| def unmatch_txn(): |
| txn_ent = yield BotDimensionsMatches.get_or_default_async(bot_id) |
| if txn_ent.dimensions != bot_dimensions_flat: |
| log.warning('aborting txn, the entity state changed') |
| raise ndb.Return(None) |
| before = len(txn_ent.matches) |
| txn_ent.matches = [x for x in txn_ent.matches if x not in stale] |
| if len(txn_ent.matches) != before: |
| txn_ent.last_cleanup_ts = utils.utcnow() |
| yield txn_ent.put_async() |
| |
| try: |
| yield datastore_utils.transaction_async(unmatch_txn, retries=5) |
| except _TXN_EXCEPTIONS: |
| log.warning('error when cleaning matches, ignoring') |
| |
| # Log integer IDs of all matching queues. |
| queue_numbers = TaskDimensionsSets.ids_to_queue_numbers(alive) |
| log.info('queues (%d): %s', len(queue_numbers), queue_numbers) |
| |
| raise ndb.Return(alive) |
| |
| |
@ndb.tasklet
def _tq_rescan_matching_task_sets_async(bot_id, rescan_counter, rescan_reason):
  """A task queue task that finds all matching TaskDimensionsSets for a bot.

  Scans TaskDimensionsSets entities addressed to the bot directly and to every
  pool the bot belongs to, and merges the discovered matches into the bot's
  BotDimensionsMatches entity.

  Arguments:
    bot_id: ID of the bot.
    rescan_counter: value of `BotDimensionsMatches.rescan_counter` when the
        task was enqueued.
    rescan_reason: a string with rescan reason, for debugging.

  Returns:
    True if the task is done (it succeeded or it is stale and should be
    retired), False if the TQ task needs to be retried.
  """
  log = _Logger('tq_rescan_matches(%s)', bot_id)
  log.info('rescan: %s', rescan_reason)

  # Verify the task is still fresh and load dimensions to match on. A different
  # counter means a newer rescan was enqueued after this task.
  matches = yield BotDimensionsMatches.get_or_default_async(bot_id)
  if matches.rescan_counter != rescan_counter:
    log.info('skipping, rescan counter %d != %s', matches.rescan_counter,
             rescan_counter)
    raise ndb.Return(True)
  log.info('enqueued %s ago', utils.utcnow() - matches.last_rescan_enqueued_ts)

  bot_dimensions_flat = matches.dimensions
  bot_dimensions_set = set(bot_dimensions_flat)

  def scan_prefix_query(set_kind, pfx):
    # Builds a key range query over all `<prefix>:<digits...>` IDs. The range
    # bounds are the characters right before '0' and right after '9'.
    pfx = TaskDimensionsSets.id_prefix(set_kind, pfx)
    start = '%s:%s' % (pfx, chr(ord('0') - 1))
    end = '%s:%s' % (pfx, chr(ord('9') + 1))
    query = ndb.Query(
        kind='TaskDimensionsSets',
        filters=ndb.ConjunctionNode(
            ndb.FilterNode('__key__', '>', ndb.Key(TaskDimensionsSets, start)),
            ndb.FilterNode('__key__', '<', ndb.Key(TaskDimensionsSets, end)),
        ),
    )
    return query, log.derive('%s', pfx)

  # Construct queries that scan for potentially matching TaskDimensionsSets.
  #
  # TODO(vadimsh): Each query can be sharded to parallelize the scan even more
  # if necessary.
  queries = [scan_prefix_query('bot', bot_id)]
  for kv in bot_dimensions_flat:
    k, v = kv.split(':', 1)
    if k == 'pool':
      queries.append(scan_prefix_query('pool', v))

  # A set of matching TaskDimensionsSets IDs discovered by the scans.
  alive = set()
  # A counter of visited items for debugging. It is a list so the nested
  # callback can mutate it (there is no `nonlocal` in Python 2).
  visited = [0]

  def visit_task_dimensions_set(task_dims_sets):
    visited[0] += 1
    if task_dims_sets.matches_bot_dimensions(bot_dimensions_set):
      alive.add(task_dims_sets.key.string_id())

  # Find all TaskDimensionsSets matching the bot dimensions.
  visited_all = yield _map_async(queries, visit_task_dimensions_set)
  log.info('visited %d entities, found %d matches', visited[0], len(alive))

  # Double check any currently matched sets that were not discovered by the scan
  # are indeed dead and should be unmatched. This is particularly important if
  # scans were incomplete due to timeouts (i.e. visited_all is False), but also
  # matters if the query is "eventually consistent" and omits some very recent
  # entities. We don't want to delete active matches.
  _, stale = yield _check_matches_async(
      bot_dimensions_set, [sid for sid in matches.matches if sid not in alive])

  # Store new matches in the entity if the entity still has dimensions we
  # scanned for. This returns `last_rescan_enqueued_ts` if the entity was
  # updated or None if not.
  @ndb.tasklet
  def txn():
    now = utils.utcnow()
    txn_ent = yield BotDimensionsMatches.get_or_default_async(bot_id)
    if txn_ent.dimensions != bot_dimensions_flat:
      log.warning('dimensions changed while we were scanning, aborting')
      raise ndb.Return(None)
    # Carefully merge `alive` and `stale` into `matches` to avoid stomping over
    # matches that may have been added concurrently by _maybe_add_match_async.
    cur = set(txn_ent.matches)
    new = (cur | alive) - stale
    if cur == new:
      log.info('no changes to the matched set')
    for x in cur - new:
      log.info('unmatched %s', x)
    for x in new - cur:
      log.info('matched %s', x)
    txn_ent.matches = sorted(new)
    txn_ent.last_rescan_finished_ts = now
    txn_ent.last_cleanup_ts = now
    yield txn_ent.put_async()
    raise ndb.Return(txn_ent.last_rescan_enqueued_ts)

  log.info('storing changes')
  rescan_enqueued_ts = yield datastore_utils.transaction_async(txn, retries=5)
  if rescan_enqueued_ts:
    log.info('rescan delay: %s', utils.utcnow() - rescan_enqueued_ts)

  # Retire the TQ task if the entity is already stale or we successfully
  # scanned and updated everything. Note that `stale` above is the collection
  # of dead match IDs and must not be used to decide about retries: doing so
  # would suppress the retry of an incomplete scan whenever at least one match
  # got unmatched.
  task_is_stale = not rescan_enqueued_ts
  if task_is_stale:
    log.warning('this task queue task is stale, retiring it')
  elif not visited_all:
    log.error('some scans did not complete, need a retry')
  raise ndb.Return(task_is_stale or visited_all)
| |
| |
def _freshen_up_queues_memcache(queues):
  """Marks the given queues as alive in memcache.

  The records are read by the probably_has_capacity(...) check.

  Arguments:
    queues: a list of integers with queue numbers (aka dimension hashes).
  """
  # TODO(vadimsh): Stop hammering memcache. These entries here easily end up
  # getting 2k-3k QPS each.
  liveness = dict.fromkeys((str(number) for number in queues), True)
  memcache.set_multi(liveness, time=61, namespace='task_queues_tasks')
| |
| |
| ### Some generic helpers. |
| |
| |
class _Logger(object):
  """Logging helper that tags messages with a prefix and the elapsed time.

  The elapsed time is counted from the creation of the logger. Loggers made
  via derive(...) inherit the start time of the logger they derive from.
  """

  def __init__(self, pfx, *args):
    self._ts = time.time()
    self._pfx = pfx % args

  def derive(self, sfx, *args):
    """Returns a new logger whose prefix is extended with the given message."""
    child = _Logger('%s: %s', self._pfx, sfx % args)
    # Keep measuring time from the start of the parent logger.
    child._ts = self._ts
    return child

  def _log(self, lvl, msg, args):
    # `lvl` is a logging function with the signature of logging.info(...).
    elapsed = time.time() - self._ts
    lvl('[%.2fs] %s: %s', elapsed, self._pfx, msg % args)

  def info(self, msg, *args):
    self._log(logging.info, msg, args)

  def warning(self, msg, *args):
    self._log(logging.warning, msg, args)

  def error(self, msg, *args):
    self._log(logging.error, msg, args)
| |
| |
def _diff_bot_dims(old, new):
  """Describes how bot dimensions changed, for logging.

  Arguments:
    old: old bot dimensions as a list of `k:v` strings.
    new: new bot dimensions as a list of `k:v` strings.

  Returns:
    A string listing added and removed dimensions (in their original order).
    Empty if nothing was added or removed.
  """
  before = set(old)
  after = set(new)

  summary = []
  for template, dims, other_side in (
      ('dims added [%s]', new, before),
      ('dims removed [%s]', old, after),
  ):
    changed = ' '.join(d for d in dims if d not in other_side)
    if changed:
      summary.append(template % changed)
  return ', '.join(summary)
| |
| |
def _random_timedelta_mins(a, b):
  """Picks a random duration between `a` and `b` minutes, as a timedelta."""
  minutes = random.uniform(a, b)
  return datetime.timedelta(minutes=minutes)
| |
| |
class _AsyncWorkQueue(object):
  """A queue of work items with limits on concurrency.

  Producers push batches of items via enqueue(...) and call done() once they
  have nothing more to add. A single consumer tasklet (consume_async) pops
  items, passes them to a callback and waits on futures the callback returns.

  Used internally by _map_async.
  """

  def __init__(self, producers):
    """Creates a queue that accepts items from a number of producers.

    Arguments:
      producers: number of producers that would enqueue items. Each one is
        expected to call done() when they are done. Once all producers are done,
        the queue is not accepting new items. This eventually causes
        consume_async tasklet to exit.
    """
    assert producers > 0
    # Items waiting to be passed to the consumer callback.
    self._queue = collections.deque()
    # Number of producers that have not called done() yet.
    self._producers = producers
    # A future the consumer is blocked on while the queue is empty, or None.
    self._waiter = None

  def _wakeup(self):
    """Unblocks the consumer if it is waiting for new items."""
    if self._waiter:
      # Clear the attribute before resolving the future, so the consumer can
      # install a new waiter.
      waiter, self._waiter = self._waiter, None
      waiter.set_result(None)

  def enqueue(self, items):
    """Enqueues a batch of items to be processed.

    Must not be called after all producers are done (this is asserted).

    Arguments:
      items: an iterable of items, each one will eventually be passed to the
        callback in `cb` which may decide to launch a tasklet to process this
        item.
    """
    assert self._producers > 0
    if items:
      self._queue.extend(items)
      self._wakeup()

  def done(self):
    """Must be called by a producer when they are done with the queue."""
    assert self._producers > 0
    self._producers -= 1
    # The last producer wakes up the consumer, so it can notice there will be
    # no more items and exit.
    if not self._producers:
      self._wakeup()

  @ndb.tasklet
  def consume_async(self, cb, max_concurrency):
    """Runs a loop that dequeues items and calls the callback for each item.

    See _map_async for expected behavior of the callback.

    Arguments:
      cb: a callback called with each dequeued item. If it returns an
        ndb.Future, the future is tracked and its result is treated as
        a success flag. Other return values are ignored.
      max_concurrency: once this many futures are tracked, the loop waits for
        the oldest one before launching more work.

    Returns:
      True if all futures returned by the callback resolved into True.
    """
    ok = True
    running = []
    while self._producers or self._queue:
      # While have stuff in the queue, keep launching and executing it.
      while self._queue:
        fut = cb(self._queue.popleft())
        # The callback processed the item synchronously, nothing to wait for.
        if not isinstance(fut, ndb.Future):
          continue
        running.append(fut)

        # If running too much stuff, wait for some of it to finish. What we'd
        # really like is ndb.Future.wait_any_async(running), but there's no
        # async variant of wait_any in Python 2 ndb library (there's one in
        # Python 3 ndb). So instead just wait for the oldest future to finish
        # and then collect all the ones that finished during that time.
        if len(running) >= max_concurrency:
          yield running[0]
          pending = []
          for f in running:
            if f.done():
              ok = ok and f.get_result()
            else:
              pending.append(f)
          running = pending

      # If the queue is drained and there are no more producers, we are done.
      if not self._producers:
        break

      # If the queue is drained, but there are producers, wait for more items
      # to appear or when the last producer drops.
      assert not self._waiter
      self._waiter = ndb.Future()
      yield self._waiter

    # Wait for the completion of the rest of the items.
    oks = yield running
    raise ndb.Return(ok and all(oks))
| |
| |
@ndb.tasklet
def _map_async(queries,
               cb,
               max_concurrency=200,
               page_size=1000,
               timeout=300,
               max_pages=None):
  """Applies a callback to results of a bunch of queries.

  This is roughly similar to ndb.Query.map_async, except it tries to make
  progress on errors and timeouts and it limits number of concurrent tasklets
  to avoid grinding to a halt when processing large sets.

  Arguments:
    queries: a non-empty list of (ndb.Query, _Logger) with queries to fetch
        results of. Each query is a producer of the internal work queue, which
        asserts it has at least one producer.
    cb: a callback that takes a fetched entity and optionally returns a future
        to wait on. The future must resolve in a boolean, where True indicates
        the item was processed successfully and False if the processing failed.
        If the callback doesn't return a future, the item is assumed to be
        processed successfully. The callback must not raise exceptions, they
        will abort the whole thing. The callback may be called multiple times
        with the same entity if this entity is returned by multiple queries.
    max_concurrency: a limit on number of concurrently pending futures.
    page_size: a page size for datastore queries.
    timeout: how long (in seconds) to run the query loop before giving up.
    max_pages: a limit on number of pages to fetch (mostly for tests).

  Returns:
    True if all queries finished to completion and all fetched items were
    processed successfully.
  """
  queue = _AsyncWorkQueue(len(queries))

  @ndb.tasklet
  def scan(q, logger):
    # Fetches the query page by page and feeds entities into the work queue.
    # Returns True if the query was scanned to completion, False if the scan
    # was cut short (deadline, page limit or a datastore error).
    #
    # For some reason queries that use has_next_async() API have a hard deadline
    # of 60s regardless of query options. This is not enough for busy servers.
    # Instead we'll use an explicitly paginated query.
    try:
      logger.info('scanning')
      deadline = utils.time_time() + timeout
      count = 0
      pages = 0
      cursor = None
      more = True
      while more:
        # Each RPC gets whatever time remains until the overall deadline.
        rpc_deadline = deadline - utils.time_time()
        if rpc_deadline < 0 or (max_pages and pages >= max_pages):
          raise ndb.Return(False)
        try:
          page, cursor, more = yield q.fetch_page_async(page_size,
                                                        start_cursor=cursor,
                                                        deadline=rpc_deadline)
          # Avoid favoring items that appear earlier in the page. It is
          # important when _map_async is retried, e.g. as a part of TQ task
          # retry. Gives some opportunity for tail items to be processed. Mostly
          # useful for larger page sizes.
          random.shuffle(page)
          queue.enqueue(page)
          count += len(page)
          pages += 1
        except (datastore_errors.BadRequestError, datastore_errors.Timeout):
          logger.error('scan timed out, visited %d items', count)
          raise ndb.Return(False)
      logger.info('scan completed, visited %d items', count)
      raise ndb.Return(True)
    finally:
      # Always drop the producer, otherwise the consumer would wait forever.
      queue.done()

  # Run scans that enqueue items into a queue, and in parallel consume items
  # from this queue.
  futs = [scan(q, logger) for q, logger in queries]
  futs.append(queue.consume_async(cb, max_concurrency))
  ok = yield futs
  raise ndb.Return(all(ok))
| |
| |
| ### Public APIs. |
| |
| |
def expand_dimensions_to_flats(dimensions, is_bot_dim=False):
  """Expands |dimensions| into a list of dimensions_flat.

  A dimensions_flat is a sorted list of unique u'<key>:<value>' strings of
  basic (not OR-ed) dimensions. If no value uses OR_DIM_SEP, the result has
  exactly one element. Otherwise every combination of OR alternatives
  produces its own dimensions_flat.

  Arguments:
    dimensions: a dict {key: [values]}. When is_bot_dim is True a value may
        also be a bare string, which is treated as a one-element list.
    is_bot_dim: True if these are dimensions reported by a bot. Such dimensions
        may be invalid and are tolerated. For other dimensions the same
        problems trip assertions.

  Returns:
    A list of dimensions_flat expanded from |dimensions|.

  Notes on tolerating invalid bot dimensions:
    * Flattened pairs longer than the cutoff (key limit + 1 + value limit,
      i.e. 64+1+256=321 characters) are trimmed and get an ellipsis appended.
      Otherwise storing them in the datastore can throw.
    * The length is counted in unicode code points, not UTF-8 bytes. A UTF-8
      character takes at most 4 bytes, so a valid pair is at most
      64+1+1024=1089 bytes.
    * On UCS2 builds of Python characters outside of the BMP are represented
      by two code points (surrogate pairs), so the limit is effectively halved
      for such characters.
    * Duplicate pairs are silently removed, for the same reason as long ones.
  """
  # Normalize into a list of (key, [values]) pairs.
  pairs = []
  for name, raw in dimensions.items():
    if isinstance(raw, (str, unicode)):
      assert is_bot_dim, (name, raw)
      raw = [raw]
    pairs.append((name, raw))

  max_len = config.DIMENSION_KEY_LENGTH + 1 + config.DIMENSION_VALUE_LENGTH
  expanded = []
  # Flattened pairs picked on the current path of the traversal.
  chosen = []

  def clip(flat):
    if len(flat) <= max_len:
      return flat
    assert is_bot_dim, flat
    # An ellipsis is codepoint U+2026 which is encoded with 3 bytes in UTF-8.
    # We're still well below the 1500 bytes limit. Datastore uses UTF-8.
    return flat[:max_len] + u'…'

  def walk(pair_idx, value_idx):
    if pair_idx == len(pairs):
      # Remove duplicate dimensions. While invalid, we want to make sure they
      # can be stored without throwing an exception.
      expanded.append(sorted(set(chosen)))
      return

    name, values = pairs[pair_idx]
    if value_idx == len(values):
      # Done with all values of this key, move on to the next key.
      walk(pair_idx + 1, 0)
      return

    # Branch on every OR alternative of the current value.
    for alternative in values[value_idx].split(OR_DIM_SEP):
      chosen.append(clip(u'%s:%s' % (name, alternative)))
      walk(pair_idx, value_idx + 1)
      chosen.pop()

  walk(0, 0)
  return expanded
| |
| |
def bot_dimensions_to_flat(dimensions):
  """Returns a flat '<key>:<value>' sorted list of dimensions.

  Arguments:
    dimensions: a dict with bot dimensions, `{key: [values]}`.

  Returns:
    The single dimensions_flat produced by expand_dimensions_to_flats(...).

  Raises:
    AttributeError: if expand_dimensions_to_flats(...) raised it. The error is
        logged with the offending dimensions before being re-raised.
  """
  try:
    expanded = expand_dimensions_to_flats(dimensions, is_bot_dim=True)
  except AttributeError:
    logging.exception(
        "crbug.com/1133117: failed to call expand_dimensions_to_flats for %s",
        dimensions)
    # Use a bare `raise` to preserve the original traceback. `raise e` would
    # replace it (on Python 2) with a traceback pointing at this line.
    raise
  # Bot dimensions are not expected to expand into multiple alternatives.
  assert len(expanded) == 1, dimensions
  return expanded[0]
| |
| |
def hash_dimensions(dimensions):
  """Returns a 32 bits int that is a hash of the request dimensions specified.

  Dimensions values still have "|" in them, i.e. this is calculated prior to
  expansion of OR-ed dimensions into a disjunction of dimension sets.

  Arguments:
    dimensions: dict(str, [str])

  The return value is guaranteed to be a non-zero int so it can be used as a key
  id in a ndb.Key.
  """
  # TODO(maruel): This is incorrect, as it can confuse keys and values. But
  # changing the algo is non-trivial.
  #
  # The hashed blob is every key and every value (in sorted key order) encoded
  # as UTF-8 and followed by a NUL byte. Use explicit byte strings instead of
  # relying on implicit mixing of text and bytes.
  chunks = []
  for k, values in sorted(dimensions.items()):
    chunks.append(k.encode('utf8'))
    assert isinstance(values, (list, tuple)), values
    for v in values:
      chunks.append(v.encode('utf8'))
  data = b''.join(chunk + b'\000' for chunk in chunks)
  digest = hashlib.md5(data).digest()
  # With the '<' prefix struct uses standard sizes, so 'L' is exactly 4 bytes
  # (little-endian unsigned). Replace 0 with 1 to keep the result non-zero.
  return int(struct.unpack('<L', digest[:4])[0]) or 1
| |
| |
def assert_bot(bot_dimensions):
  """Registers the bot in the task queues system, fetches matching queues.

  Together with assert_task_async() this implements assignment of tasks to
  bots: tasks are put into logical queues (represented by queue numbers) and
  each bot gets a list of queues it needs to poll tasks from.

  Arguments:
    bot_dimensions: dictionary of the bot dimensions.

  Returns:
    A list of integers with queues to poll.
  """
  # IDs of TaskDimensionsSets entities matching the bot.
  matching_ids = _assert_bot_dimensions_async(bot_dimensions).get_result()
  queue_numbers = TaskDimensionsSets.ids_to_queue_numbers(matching_ids)

  # Bump the TTL of the queues in memcache. It signals there's still a bot
  # polling these particular queues.
  _freshen_up_queues_memcache(queue_numbers)
  return queue_numbers
| |
| |
def freshen_up_queues(bot_id):
  """Marks queues polled by the bot as alive in memcache.

  The memcache entries are read by the probably_has_capacity(...) check.

  Arguments:
    bot_id: ID of the polling bot.

  Returns:
    A list of queues consumed by the bot (empty if the bot has no stored
    BotDimensionsMatches entity).
  """
  entity = ndb.Key(BotDimensionsMatches, bot_id).get()
  if entity:
    queues = TaskDimensionsSets.ids_to_queue_numbers(entity.matches)
  else:
    queues = []
  _freshen_up_queues_memcache(queues)
  return queues
| |
| |
def cleanup_after_bot(bot_id):
  """Unregisters the bot from the task queues system.

  Deletes the BotDimensionsMatches entity of the bot.

  Arguments:
    bot_id: ID of the bot to unregister.
  """
  matches_key = ndb.Key(BotDimensionsMatches, bot_id)
  matches_key.delete()
| |
| |
@ndb.tasklet
def assert_task_async(request):
  """Registers dimensions of every task slice of the TaskRequest as a queue.

  This function must be called before storing the TaskRequest in the DB.

  When a cache miss occurs, a task queue is triggered.

  Warning: the task will not be run until the task queue ran, which causes a
  user visible delay. There is no SLA but expected range is normally seconds at
  worst. This only occurs on new kind of requests, which is not that often in
  practice.
  """
  # Note that TaskRequest may not be stored in the datastore yet, but it is
  # fully populated at this point.
  #
  # Slice expirations accumulate: each slice expires after all the previous
  # ones did.
  expiry = request.created_ts
  pending = []
  for idx in range(request.num_task_slices):
    task_slice = request.task_slice(idx)
    expiry = expiry + datetime.timedelta(seconds=task_slice.expiration_secs)
    pending.append(
        _assert_task_dimensions_async(task_slice.properties.dimensions, expiry))
  yield pending
| |
| |
def probably_has_capacity(dimensions):
  """Tells whether there is likely a live bot to serve this request.

  Only the memcache records of task queues are consulted, not the actual
  bots. Queues are identified by the hash of dimensions, so on a hash
  collision this can report capacity even if there is none.

  Returns:
    True or False if the capacity is cached, None otherwise.
  """
  # Sadly, a missing key doesn't mean that this task queue is dead. For
  # example:
  # - memcache could have been cleared manually or could be malfunctioning.
  # - in the case where a single bot can service the dimensions, the bot may not
  #   have been polling for N+1 seconds.
  cache_key = str(hash_dimensions(dimensions))
  return memcache.get(cache_key, namespace='task_queues_tasks')
| |
| |
def set_has_capacity(dimensions, seconds):
  """Records in memcache that this set of request dimensions has capacity.

  Arguments:
    dimensions: task request dimensions, hashed via hash_dimensions(...).
    seconds: the amount of time this 'fact' should be kept.
  """
  cache_key = str(hash_dimensions(dimensions))
  memcache.set(cache_key, True, time=seconds, namespace='task_queues_tasks')
| |
| |
def update_bot_matches_async(payload):
  """Assigns new task dimension set to matching bots.

  Task queue task handler, part of assert_task_async(...) implementation.

  Arguments:
    payload: a JSON string with `task_sets_id`, `dimensions` and `enqueued_ts`
        keys.

  Returns:
    Whatever _tq_update_bot_matches_async(...) returns.
  """
  logging.info('TQ task payload:\n%s', payload)
  params = json.loads(payload)
  task_sets_id = params['task_sets_id']
  dimensions = params['dimensions']
  enqueued_ts = utils.parse_datetime(params['enqueued_ts'])
  return _tq_update_bot_matches_async(task_sets_id, dimensions, enqueued_ts)
| |
| |
def rescan_matching_task_sets_async(payload):
  """A task queue task that finds all matching TaskDimensionsSets for a bot.

  Arguments:
    payload: a JSON string with `bot_id`, `rescan_counter` and `rescan_reason`
        keys.

  Returns:
    Whatever _tq_rescan_matching_task_sets_async(...) returns.
  """
  logging.info('TQ task payload:\n%s', payload)
  params = json.loads(payload)
  bot_id = params['bot_id']
  rescan_counter = params['rescan_counter']
  rescan_reason = params['rescan_reason']
  return _tq_rescan_matching_task_sets_async(bot_id, rescan_counter,
                                             rescan_reason)
| |
| |
@ndb.tasklet
def tidy_task_dimension_sets_async():
  """Cleans up expired task dimension sets in the datastore.

  Visits all TaskDimensionsInfo entities which are due for a cleanup.

  Returns:
    True if cleaned up everything, False if something failed.
  """
  log = _Logger('tidy_task_dims')

  # IDs of the sets that were actually cleaned, for logging.
  cleaned_ids = []

  @ndb.tasklet
  def cleanup_async(dim_info):
    sets_id = dim_info.key.parent().string_id()
    try:
      if (yield _cleanup_task_dimensions_async(dim_info, log)):
        cleaned_ids.append(sets_id)
      raise ndb.Return(True)
    except _TXN_EXCEPTIONS:
      log.warning('error cleaning %s', sets_id)
      raise ndb.Return(False)

  now = utils.utcnow()
  due = TaskDimensionsInfo.query(TaskDimensionsInfo.next_cleanup_ts < now)
  all_ok = yield _map_async([(due, log)], cleanup_async)
  log.info('cleaned %d', len(cleaned_ids))

  # Ask for a retry of the whole thing if something failed or the query timed
  # out. This is faster than waiting for the next cron tick. Will also give
  # some monitoring signal.
  if not all_ok:
    log.error('need a retry')
  raise ndb.Return(all_ok)