# Copyright 2014 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
"""Swarming bot management, e.g. list of known bots and their state.

    +-----------+
    |BotRoot    |
    |id=<bot_id>|
    +-----------+
        |
        +------+--------------+
        |      |              |
        |      v              v
        |  +-----------+  +-------+
        |  |BotSettings|  |BotInfo|
        |  |id=settings|  |id=info|
        |  +-----------+  +-------+
        |
        +------+-----------+----- ... -----+
               |           |               |
               v           v               v
           +--------+  +--------+      +--------+
           |BotEvent|  |BotEvent|  ... |BotEvent|
           |id=fffff|  |id=ffffe|  ... |id=00000|
           +--------+  +--------+      +--------+

- BotEvent is a monotonically inserted entity that is added for each event
happening for the bot.
- BotInfo is a 'dump-only' entity used for the UI; it permits quickly showing
  the state of every bot in a single query. It is basically a cache of the last
  BotEvent and is additionally updated on poll. It doesn't need to be updated
  in a transaction.
- BotSettings contains bot-specific settings. It must be updated in a
  transaction and contains admin-provided settings, unlike the other entities,
  which are generated from data provided by the bot itself.
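
For reference, the hierarchy above maps to the following ndb keys (a sketch
only, not used by the code; 'bot1' is a hypothetical bot id, see the
get_*_key() helpers below):

  root_key = ndb.Key(BotRoot, 'bot1')
  info_key = ndb.Key(BotInfo, 'info', parent=root_key)
  settings_key = ndb.Key(BotSettings, 'settings', parent=root_key)

BotEvent entities are stored as children of the same BotRoot ancestor.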
"""
import datetime
import functools
import logging
import time
from google.appengine.api import datastore_errors
from google.appengine.ext import ndb
from components import datastore_utils
from components import utils
from proto.api import swarming_pb2 # pylint: disable=no-name-in-module
from server import config
from server import task_pack
from server import task_queues
from server.constants import OR_DIM_SEP
# BotEvent entities are deleted when they are older than the cutoff.
_OLD_BOT_EVENTS_CUT_OFF = datetime.timedelta(days=4 * 7)
# BotInfo entities are deleted when they are older than the cutoff.
_OLD_BOT_INFO_CUT_OFF = _OLD_BOT_EVENTS_CUT_OFF + datetime.timedelta(hours=4)
### Models.
class BotRoot(ndb.Model):
"""Root entity for BotEvent, BotInfo and BotSettings.
Doesn't actually exist in the datastore. Just used as a parent for other
entity keys.
"""
class _BotCommon(ndb.Model):
"""Common data between BotEvent and BotInfo.
Parent is BotRoot.
"""
# State is purely informative. It is completely free form.
state = datastore_utils.DeterministicJsonProperty(json_type=dict)
# The current bot session ID as reported when the bot connected.
session_id = ndb.StringProperty(indexed=False)
# IP address as seen by the HTTP handler.
external_ip = ndb.StringProperty(indexed=False)
# Bot identity as seen by the HTTP handler.
authenticated_as = ndb.StringProperty(indexed=False)
# Version of swarming_bot.zip the bot is currently running.
version = ndb.StringProperty(default='', indexed=False)
  # Set when any of the following applies:
  # - dimensions['quarantined'] or state['quarantined'] is set. This happens
  #   either via an internal python error (e.g. an exception while generating
  #   dimensions) or via a self-health check.
  # - dimensions['id'] is not exactly one item.
  # - the HTTP POST request contained invalid keys.
  # - BotSettings.quarantined was set at that moment.
# https://crbug.com/839415
quarantined = ndb.BooleanProperty(default=False, indexed=False)
# If set, the bot is rejecting tasks due to maintenance.
maintenance_msg = ndb.StringProperty(indexed=False)
# Affected by event_type == 'request_task', 'task_killed', 'task_completed',
# 'task_error'.
task_id = ndb.StringProperty(indexed=False)
# Deprecated. TODO(crbug/897355): Remove.
lease_id = ndb.StringProperty(indexed=False)
lease_expiration_ts = ndb.DateTimeProperty(indexed=False)
leased_indefinitely = ndb.BooleanProperty(indexed=False)
machine_type = ndb.StringProperty(indexed=False)
machine_lease = ndb.StringProperty(indexed=False)
# Dimensions are used for task selection. They are encoded as a list of
# key:value. Keep in mind that the same key can be used multiple times. The
# list must be sorted. It is indexed to enable searching for bots.
dimensions_flat = ndb.StringProperty(repeated=True)
  # Last time the bot pinged and this entity was updated.
  last_seen_ts = ndb.DateTimeProperty(indexed=False)
  # Time the bot started polling for the next task.
  # Set to None while the bot is running a task or hooks.
  idle_since_ts = ndb.DateTimeProperty(indexed=False)
# When this entity can be cleaned up via Cloud Datastore TTL policy.
expire_at = ndb.DateTimeProperty(indexed=False)
@property
def dimensions(self):
"""Returns a dict representation of self.dimensions_flat."""
out = {}
for i in self.dimensions_flat:
k, v = i.split(':', 1)
out.setdefault(k, []).append(v)
return out
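  # A sketch of the conversion done by the `dimensions` property above
  # (hypothetical values):
  #   dimensions_flat == ['id:bot1', 'os:Linux', 'os:Ubuntu', 'pool:default']
  # yields
  #   {'id': ['bot1'], 'os': ['Linux', 'Ubuntu'], 'pool': ['default']}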
@property
def id(self):
return self.key.parent().string_id()
@property
def task(self):
if not self.task_id:
return None
return task_pack.unpack_run_result_key(self.task_id)
def to_dict(self, exclude=None):
exclude = ['dimensions_flat'] + (exclude or [])
out = super(_BotCommon, self).to_dict(exclude=exclude)
out['dimensions'] = self.dimensions
return out
def to_proto(self, out):
"""Converts self to a swarming_pb2.Bot."""
# Used by BotEvent.to_proto() and BotInfo.to_proto().
if self.key:
out.bot_id = self.key.parent().string_id()
#out.session_id = '' # https://crbug.com/786735
for l in self.dimensions_flat:
if l.startswith(u'pool:'):
out.pools.append(l[len(u'pool:'):])
if self.is_dead:
out.status = swarming_pb2.MISSING
out.status_msg = ''
if self.is_idle:
out.status = swarming_pb2.IDLE
# https://crbug.com/757931: QUARANTINED_BY_SERVER
# https://crbug.com/870723: OVERHEAD_BOT_INTERNAL
# https://crbug.com/870723: HOST_REBOOTING
# https://crbug.com/913978: RESERVED
if self.quarantined:
out.status = swarming_pb2.QUARANTINED_BY_BOT
msg = (self.state or {}).get(u'quarantined')
if msg:
if not isinstance(msg, basestring):
# Having {'quarantined': True} is valid for the state, convert this to
# a string.
msg = 'true'
out.status_msg = msg
elif self.maintenance_msg:
out.status = swarming_pb2.OVERHEAD_MAINTENANCE_EXTERNAL
out.status_msg = self.maintenance_msg
elif self.task_id:
out.status = swarming_pb2.BUSY
if self.task_id:
out.current_task_id = self.task_id
for key, values in sorted(self.dimensions.items()):
d = out.dimensions.add()
d.key = key
for value in values:
d.values.append(value)
# The BotInfo part.
if self.state:
out.info.supplemental.update(self.state)
if self.version:
out.info.version = self.version
if self.authenticated_as:
out.info.authenticated_as = self.authenticated_as
if self.external_ip:
out.info.external_ip = self.external_ip
if self.is_dead and self.last_seen_ts:
out.info.last_seen_ts.FromDatetime(self.last_seen_ts)
# TODO(maruel): Populate bot.info.host and bot.info.devices.
# https://crbug.com/916570
def _pre_put_hook(self):
super(_BotCommon, self)._pre_put_hook()
self.dimensions_flat.sort()
class BotInfo(_BotCommon):
"""This entity declare the knowledge about a bot that successfully connected.
Parent is BotRoot. Key id is 'info'.
This entity is a cache of the last BotEvent and is additionally updated on
poll, which does not create a BotEvent.
"""
# One of:
NOT_IN_MAINTENANCE = 1 << 9 # 512
IN_MAINTENANCE = 1 << 8 # 256
# One of:
ALIVE = 1 << 7 # 128
DEAD = 1 << 6 # 64
# One of:
HEALTHY = 1 << 3 # 8
QUARANTINED = 1 << 2 # 4
# One of:
IDLE = 1<<1 # 2
BUSY = 1<<0 # 1
# First time this bot was seen.
first_seen_ts = ndb.DateTimeProperty(auto_now_add=True, indexed=False)
# Must only be set when self.task_id is set.
task_name = ndb.StringProperty(indexed=False)
  # Avoids having a huge number of indices to query by quarantined/idle.
composite = ndb.IntegerProperty(repeated=True)
def calc_composite(self):
"""Returns the value for BotInfo.composite, which permits quick searches."""
return [
self.IN_MAINTENANCE
if self.maintenance_msg else self.NOT_IN_MAINTENANCE,
self.DEAD if self._should_be_dead() else self.ALIVE,
self.QUARANTINED if self.quarantined else self.HEALTHY,
self.IDLE if self.idle_since_ts else self.BUSY,
]
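  # A sketch of calc_composite() above (hypothetical bot): an alive, healthy,
  # idle bot not in maintenance gets
  #   composite == [NOT_IN_MAINTENANCE, ALIVE, HEALTHY, IDLE] == [512, 128, 8, 2]
  # so each availability filter is a single equality on the repeated property,
  # e.g.
  #   BotInfo.query(BotInfo.composite == BotInfo.ALIVE,
  #                 BotInfo.composite == BotInfo.IDLE)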
def _should_be_dead(self, deadline=None):
if not self.last_seen_ts:
return False
return self.last_seen_ts <= (deadline or self._deadline())
@property
def is_dead(self):
assert self.composite, 'Please store first'
return self.DEAD in self.composite
def to_dict(self, exclude=None):
out = super(BotInfo, self).to_dict(exclude=exclude)
# Inject the bot id, since it's the entity key.
out['id'] = self.id
out['is_dead'] = self.is_dead
return out
#def to_proto(self, out):
# """Converts self to a swarming_pb2.Bot."""
# This populates most of the data.
# super(BotInfo, self).to_proto(out)
# https://crbug.com/757931: QUARANTINED_BY_SERVER
# https://crbug.com/870723: OVERHEAD_BOT_INTERNAL
# https://crbug.com/870723: HOST_REBOOTING
# https://crbug.com/913978: RESERVED
# TODO(maruel): Populate bot.info.host and bot.info.devices.
# https://crbug.com/916570
def _pre_put_hook(self):
super(BotInfo, self)._pre_put_hook()
if not self.task_id:
self.task_name = None
self.composite = self.calc_composite()
@classmethod
def yield_alive_bots(cls):
"""Yields BotInfo of all alive bots."""
q = cls.query(cls.composite == cls.ALIVE)
cursor = None
more = True
while more:
bots, cursor, more = q.fetch_page(1000, start_cursor=cursor)
for b in bots:
yield b
@staticmethod
def _deadline():
dt = datetime.timedelta(seconds=config.settings().bot_death_timeout_secs)
return utils.utcnow() - dt
class BotEvent(_BotCommon):
"""This entity is immutable.
Parent is BotRoot.
This entity is created on each bot state transition.
"""
_MAPPING = {
'bot_connected': swarming_pb2.BOT_NEW_SESSION,
'bot_error': swarming_pb2.BOT_HOOK_ERROR,
'bot_log': swarming_pb2.BOT_HOOK_LOG,
'bot_missing': swarming_pb2.BOT_MISSING,
'bot_rebooting': swarming_pb2.BOT_REBOOTING_HOST,
'bot_shutdown': swarming_pb2.BOT_SHUTDOWN,
'bot_terminate': swarming_pb2.INSTRUCT_TERMINATE_BOT,
'request_restart': swarming_pb2.INSTRUCT_RESTART_BOT,
# Shall only be stored when there is a significant difference in the bot
# state versus the previous event.
'request_sleep': swarming_pb2.INSTRUCT_IDLE,
'request_task': swarming_pb2.INSTRUCT_START_TASK,
'request_update': swarming_pb2.INSTRUCT_UPDATE_BOT_CODE,
'task_completed': swarming_pb2.TASK_COMPLETED,
'task_error': swarming_pb2.TASK_INTERNAL_FAILURE,
'task_killed': swarming_pb2.TASK_KILLED,
# These values are not registered in the API.
'bot_idle': None,
'bot_polling': None,
'task_update': None,
}
ALLOWED_EVENTS = {
# Bot specific events that are outside the scope of a task:
'bot_connected',
'bot_error',
'bot_idle',
'bot_log',
'bot_missing',
'bot_polling',
'bot_rebooting',
'bot_shutdown',
'bot_terminate',
# Bot polling result:
'request_restart',
'request_sleep',
'request_task',
'request_update',
# Task lifetime as processed by the bot:
'task_completed',
'task_error',
'task_killed',
'task_update',
}
# Common properties for all events (which includes everything in _BotCommon).
ts = ndb.DateTimeProperty(auto_now_add=True)
event_type = ndb.StringProperty(choices=ALLOWED_EVENTS, indexed=False)
# event_type == 'bot_error', 'request_restart', 'bot_rebooting', etc.
message = ndb.TextProperty()
@property
def is_dead(self):
return self.event_type == 'bot_missing'
@property
def is_idle(self):
return (self.event_type in ('request_sleep', 'bot_idle')
and not self.quarantined and not self.maintenance_msg)
def to_proto(self, out):
"""Converts self to a swarming_pb2.BotEvent."""
if self.ts:
out.event_time.FromDatetime(self.ts)
# Populates out.bot with _BotCommon.
_BotCommon.to_proto(self, out.bot)
e = self._MAPPING.get(self.event_type)
if e:
out.event = e
if self.message:
out.event_msg = self.message
class BotSettings(ndb.Model):
"""Contains all settings that are set by the administrator on the server.
Parent is BotRoot. Key id is 'settings'.
This entity must always be updated in a transaction.
"""
# If set to True, no task is handed out to this bot due to the bot being in a
# broken situation.
quarantined = ndb.BooleanProperty()
### Public APIs.
def get_root_key(bot_id):
"""Returns the BotRoot ndb.Key for a known bot."""
if not bot_id:
raise ValueError('Bad id')
return ndb.Key(BotRoot, bot_id)
def get_info_key(bot_id):
"""Returns the BotInfo ndb.Key for a known bot."""
return ndb.Key(BotInfo, 'info', parent=get_root_key(bot_id))
def get_events_query(bot_id):
"""Returns an ndb.Query for most recent events in reverse chronological order.
"""
# Disable the in-process local cache. This is important, as there can be up to
# a thousand entities loaded in memory, and this is a pure memory leak, as
# there's no chance this specific instance will need these again, therefore
# this leads to 'Exceeded soft memory limit' AppEngine errors.
q = BotEvent.query(
default_options=ndb.QueryOptions(use_cache=False),
ancestor=get_root_key(bot_id))
q = q.order(-BotEvent.ts)
return q
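# A minimal usage sketch for get_events_query() above (hypothetical bot id):
#
#   events = get_events_query('bot1').fetch(10)
#   for e in events:
#     logging.info('%s: %s', e.ts, e.event_type)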
def get_latest_info(bot_id):
"""Returns the last known BotInfo, reconstructing it from the event history
if necessary.
The returned BotInfo should not be used in any transactions since it may not
really exist.
Returns:
(BotInfo, exists) where `exists` is True if BotInfo exists for real and
False if the bot was already deleted and BotInfo was reconstructed from
the history.
"""
# A quick check for bots that exist.
info_key = get_info_key(bot_id)
bot = info_key.get(use_cache=False, use_memcache=False)
if bot:
return bot, True
  # If there is no BotInfo, it means the bot doesn't currently exist (i.e. it
  # was deleted or never existed). Look into the event history to get its last
  # known state. If the history is empty, it means the bot never existed or was
  # deleted a long time ago. If there's a history, it means the bot existed,
  # but was deleted.
events = get_events_query(bot_id).fetch(1)
if not events:
return None, False
# After the first get() and before get_events_query() a bot may have
# handshaked, which created a BotEvent record that get_events_query() fetched.
# Here, another get() is made to make sure the bot is still actually missing.
# If we skip this check, we may accidentally mark a very new bot as deleted.
#
# See https://crbug.com/1407381 for more information.
bot = info_key.get(use_cache=False)
if bot:
return bot, True
# Reconstruct BotInfo of the deleted bot based on the latest event.
bot = BotInfo(key=info_key,
dimensions_flat=events[0].dimensions_flat,
state=events[0].state,
session_id=events[0].session_id,
external_ip=events[0].external_ip,
authenticated_as=events[0].authenticated_as,
version=events[0].version,
quarantined=events[0].quarantined,
maintenance_msg=events[0].maintenance_msg,
task_id=events[0].task_id,
last_seen_ts=events[0].ts)
  # message_conversion.bot_info_to_rpc calls `is_dead` and this property
  # requires `composite` to be calculated. The calculation is normally done in
  # _pre_put_hook, but the BotInfo shouldn't be stored in this case, as the bot
  # is already deleted.
bot.composite = bot.calc_composite()
return bot, False
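# A minimal usage sketch for get_latest_info() above (hypothetical bot id):
#
#   bot, exists = get_latest_info('bot1')
#   if bot is None:
#     pass  # the bot never existed or its history already expired
#   elif not exists:
#     pass  # reconstructed from the last BotEvent; do not put() this entity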
def get_bot_pools(bot_id):
"""Returns pools the bot belongs to or should belong to once it appears.
Uses the latest BotInfo, falling back to the last recorded BotEvent for
  deleted bots that don't have a BotInfo. If there's no such bot at all,
  returns an empty list.
Returns:
List of pools or an empty list if the bot is unknown.
"""
# TODO(vadimsh): Check the static config first before hitting the datastore.
# Fallback to the datastore only if the config is empty (to support showing
# deleted bots).
  # Note: unlike get_latest_info(), here we don't care about carefully
  # detecting if a bot is dead. This allows skipping one get() when checking
  # deleted bots.
bot = get_info_key(bot_id).get(use_cache=False, use_memcache=False)
if bot:
return get_pools_from_dimensions_flat(bot.dimensions_flat)
events = get_events_query(bot_id).fetch(1)
if not events:
return []
return get_pools_from_dimensions_flat(events[0].dimensions_flat)
def get_settings_key(bot_id):
"""Returns the BotSettings ndb.Key for a known bot."""
return ndb.Key(BotSettings, 'settings', parent=get_root_key(bot_id))
def filter_dimensions(q, dimensions):
"""Filters a ndb.Query for BotInfo based on dimensions in the request."""
for d in dimensions:
parts = d.split(':', 1)
if len(parts) != 2 or any(i.strip() != i or not i for i in parts):
raise ValueError('Invalid dimensions')
# expand OR operator
# e.g. 'foo:A|B' -> ['foo:A', 'foo:B']
values = parts[1].split(OR_DIM_SEP)
dims = ['%s:%s' % (parts[0], v) for v in values]
q = q.filter(BotInfo.dimensions_flat.IN(dims))
return q
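# A sketch of the OR expansion done by filter_dimensions() above (hypothetical
# values): a request dimension 'os:Ubuntu|Debian' becomes
#   q.filter(BotInfo.dimensions_flat.IN(['os:Ubuntu', 'os:Debian']))
# while separate dimensions (e.g. 'os:...' and 'pool:...') stay ANDed together
# as successive filters.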
def filter_availability(q, quarantined, in_maintenance, is_dead, is_busy):
"""Filters a ndb.Query for BotInfo based on quarantined/is_dead/is_busy."""
if quarantined is not None:
if quarantined:
q = q.filter(BotInfo.composite == BotInfo.QUARANTINED)
else:
q = q.filter(BotInfo.composite == BotInfo.HEALTHY)
if in_maintenance is not None:
if in_maintenance:
q = q.filter(BotInfo.composite == BotInfo.IN_MAINTENANCE)
else:
q = q.filter(BotInfo.composite == BotInfo.NOT_IN_MAINTENANCE)
if is_busy is not None:
if is_busy:
q = q.filter(BotInfo.composite == BotInfo.BUSY)
else:
q = q.filter(BotInfo.composite == BotInfo.IDLE)
if is_dead:
q = q.filter(BotInfo.composite == BotInfo.DEAD)
elif is_dead is not None:
q = q.filter(BotInfo.composite == BotInfo.ALIVE)
# TODO(charliea): Add filtering based on the 'maintenance' field.
return q
def _apply_event_updates(bot_info, event_type, now, session_id, task_id,
task_name, external_ip, authenticated_as,
dimensions_flat, state, version, quarantined,
maintenance_msg, register_dimensions):
"""Mutates BotInfo based on event details passed to bot_event(...)."""
  # Bump the expiration time every time the entity is touched. Note that this
  # field is unindexed (the Cloud Datastore TTL policy doesn't need an index),
  # so the cost of updating it is negligible.
bot_info.expire_at = now + _OLD_BOT_INFO_CUT_OFF
  # The `bot_missing` event is created by the server (see cron_update_bot_info),
  # not by the bot, so it shouldn't update fields that are reported by the bot
  # itself.
  #
  # If `last_seen_ts` were updated, it would flip the bot composite to ALIVE.
  # And if `maintenance_msg` were cleared, it would flip the composite to
  # NOT_IN_MAINTENANCE and lose the message.
if event_type != 'bot_missing':
bot_info.last_seen_ts = now
if session_id is not None:
bot_info.session_id = session_id
if external_ip is not None:
bot_info.external_ip = external_ip
if authenticated_as is not None:
bot_info.authenticated_as = authenticated_as
bot_info.maintenance_msg = maintenance_msg
# Override values in BotInfo only if we have newer values.
if state is not None:
bot_info.state = state
if version is not None:
bot_info.version = version
if quarantined is not None:
bot_info.quarantined = quarantined
if task_id is not None:
bot_info.task_id = task_id
if task_name is not None:
bot_info.task_name = task_name
  # Remove the task from the BotInfo summary in the following cases:
  # 1) When the task finishes (event_type=task_XXX).
  #    In these cases the BotEvent keeps the task, since the event still
  #    refers to it.
  # 2) When the bot is polling (event_type=request_sleep|bot_idle|bot_polling).
  #    The bot has already finished the previous task, but it could have
  #    forgotten to remove the task from the BotInfo, so ensure the task is
  #    removed.
  # 3) When the bot is missing.
  #    We assume it can't process the assigned task anymore.
if event_type in ('task_completed', 'task_error', 'task_killed',
'request_sleep', 'bot_idle', 'bot_polling', 'bot_missing'):
bot_info.task_id = None
bot_info.task_name = None
  # idle_since_ts is updated only when the bot is idle and in a healthy state.
is_idle = (event_type in ('request_sleep', 'bot_idle')
and not bot_info.quarantined and not bot_info.maintenance_msg)
if is_idle:
bot_info.idle_since_ts = bot_info.idle_since_ts or now
else:
bot_info.idle_since_ts = None
# Update dimensions only if they are given and `register_dimensions` is True.
if (dimensions_flat and register_dimensions
and bot_info.dimensions_flat != dimensions_flat):
logging.debug('bot_event: Updating dimensions. from: %s, to: %s',
bot_info.dimensions_flat, dimensions_flat)
bot_info.dimensions_flat = dimensions_flat
def _snapshot_bot_info(bot_info):
"""Captures a subset of BotInfo used to detect it we need to emit an event."""
return (
bot_info.calc_composite(),
bot_info.dimensions_flat,
)
# Events that happen very often and are not worth reporting individually.
_FREQUENT_EVENTS = frozenset(
['request_sleep', 'task_update', 'bot_idle', 'bot_polling'])
# Events that may result in creation of new BotInfo entities (i.e. a new bot
# appearing). Most often this is just bot_connected.
_HEALTHY_BOT_EVENTS = frozenset([
'bot_connected',
'bot_idle',
'bot_polling',
'request_restart',
'request_sleep',
'request_task',
'request_update',
])
def _should_store_event(event_type, before, after):
"""Decides if we should store a new BotEvent entity.
Arguments:
event_type: event type, one of BotEvent.ALLOWED_EVENTS.
before: a result of _snapshot_bot_info() before any changes.
    after: a result of _snapshot_bot_info() after all changes have been applied.
Returns:
True or False.
"""
return (
# Record all "rare" events.
event_type not in _FREQUENT_EVENTS
# Record status changing events (crbug.com/952984, crbug.com/1040345).
or before[0] != after[0]
# Record changes to dimensions (crbug.com/1015365).
or before[1] != after[1])
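# For example (a sketch): a 'request_sleep' poll that changes nothing is not
# recorded by _should_store_event() above, but it is recorded if it flips the
# composite (e.g. BUSY -> IDLE) or if the reported dimensions differ from the
# stored ones.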
def _insert_bot_with_txn(root_key, bot_info, event):
"""Stores BotInfo and/or BotEvent (skipping None)."""
entities = []
if bot_info:
assert bot_info.key.root() == root_key
entities.append(bot_info)
if event:
assert event.key.root() == root_key
entities.append(event)
if not entities:
return
attempt = 1
delay = 0.1
what = 'event %s' % event.event_type if event else 'bot info'
while True:
try:
logging.info('Attempt %d to insert %s (%d entities) for bot_id %s',
attempt, what, len(entities), root_key.id())
attempt += 1
if len(entities) == 1:
entities[0].put()
else:
datastore_utils.transaction(lambda: ndb.put_multi(entities), retries=0)
break
except (datastore_utils.CommitError, datastore_errors.InternalError,
datastore_errors.Timeout,
datastore_errors.TransactionFailedError) as exc:
logging.warning('_insert_bot_with_txn: error inserting %s for %s: %s',
what, root_key.id(), exc)
delay = min(5.0, delay * 2.0)
time.sleep(delay)
def bot_event(event_type,
bot_id,
session_id=None,
task_id=None,
task_name=None,
external_ip=None,
authenticated_as=None,
dimensions=None,
state=None,
version=None,
quarantined=None,
maintenance_msg=None,
event_msg=None,
register_dimensions=False):
"""Updates the state of the bot in the datastore.
  This call usually means the bot is alive (not dead), except for the
  `bot_missing` event, which is created by the server. The bot may be
  quarantined, in which case it will be unregistered from the task queues.
If the bot is declaring maintenance, it will be kept in the task queues, as
maintenance is supposed to be temporary and expected to complete within a
reasonable time frame.
Arguments:
event_type: event type, one of BotEvent.ALLOWED_EVENTS. Required.
bot_id: bot id. Required.
session_id: the current bot session ID.
task_id: packed task id if relevant. Set to '' to zap the stored value. If
None, keep the previous value.
task_name: task name if relevant. Zapped when task_id is zapped. If None,
keep the previous value.
external_ip: IP address as seen by the HTTP handler. If None, keep the
previous value.
authenticated_as: bot identity as seen by the HTTP handler. If None, keep
the previous value.
dimensions: bot's dimensions as self-reported. If None, keep the previous
value.
state: a dict with ephemeral state of the bot. It is expected to change
constantly. If None or empty, keep the previous value.
version: swarming_bot.zip version as self-reported. Used to spot if a bot
failed to update promptly. If None, keep the previous value.
quarantined: bool to determine if the bot was declared quarantined. If None,
keep the previous value.
maintenance_msg: string describing why the bot is in maintenance. If None
or empty, the bot is not considered in maintenance.
event_msg: an optional informational message to store in BotEvent. Doesn't
affect bot state.
register_dimensions: bool to specify whether to update dimensions stored in
BotInfo. This affects discoverability of this bot by the scheduler. If
`dimensions` is empty, `register_dimensions` is ignored.
Returns:
ndb.Key to BotEvent entity if one was added.
"""
assert event_type in BotEvent.ALLOWED_EVENTS, event_type
if not bot_id:
return None
# BotInfo and BotEvent operate with flattened dimensions.
dimensions_flat = task_queues.bot_dimensions_to_flat(dimensions or {})
# Retrieve the previous BotInfo to update it. Note that most events are
# ultimately produced by a single serial bot process and there should not be
# concurrent events from the same bot, therefore there are no transactions
# here. In the worst case some intermediary state changes won't be properly
# recorded.
info_key = get_info_key(bot_id)
bot_info = info_key.get(use_cache=False, use_memcache=False)
store_bot_info = True
if not bot_info:
# Register only id and pool dimensions at the first handshake.
bot_info = BotInfo(
key=info_key,
dimensions_flat=[
d for d in dimensions_flat
if d.startswith('id:') or d.startswith('pool:')
],
)
# Create BotInfo only if this event indicates the bot is actually alive.
    # This check exists to work around race conditions when deleting bots. In
# particular, cron_update_bot_info marks the bot as deleted before it
# emits `bot_missing` event. If between these two calls, something
# (like GCE Provider) notices the bot is marked as dead and calls DeleteBot,
# the late `bot_missing` event ends up recreating the deleted bot. A similar
# thing can happen when a dead bot suddenly comes back to life just to
# report that it has failed a task and is terminating now (`bot_shutdown`
# event).
store_bot_info = event_type in _HEALTHY_BOT_EVENTS
if not store_bot_info:
logging.warning('No BotInfo(%s) when storing %s', bot_id, event_type)
# Use the exact same timestamp in both BotInfo and BotEvent for consistency.
now = utils.utcnow()
# Snapshot the state before any changes, used in _should_store_event.
state_before = _snapshot_bot_info(bot_info)
# Mutate BotInfo in place based on the event details.
_apply_event_updates(bot_info=bot_info,
event_type=event_type,
now=now,
session_id=session_id,
task_id=task_id,
task_name=task_name,
external_ip=external_ip,
authenticated_as=authenticated_as,
dimensions_flat=dimensions_flat,
state=state,
version=version,
quarantined=quarantined,
maintenance_msg=maintenance_msg,
register_dimensions=register_dimensions)
# Snapshot the state after changes, used in _should_store_event.
state_after = _snapshot_bot_info(bot_info)
# If the bot is quarantined, unregister it from scheduler queues.
if quarantined:
task_queues.cleanup_after_bot(bot_id)
# We always need to save BotInfo (in particular `last_seen_ts`) to keep the
# bot alive in the datastore, but we should record BotEvent only in case
# something interesting happens.
if _should_store_event(event_type, state_before, state_after):
event = BotEvent(parent=bot_info.key.parent(),
event_type=event_type,
ts=now,
expire_at=now + _OLD_BOT_EVENTS_CUT_OFF,
session_id=bot_info.session_id,
external_ip=bot_info.external_ip,
authenticated_as=bot_info.authenticated_as,
dimensions_flat=(dimensions_flat
or bot_info.dimensions_flat),
quarantined=bot_info.quarantined,
maintenance_msg=bot_info.maintenance_msg,
state=bot_info.state,
task_id=task_id or bot_info.task_id,
version=bot_info.version,
last_seen_ts=bot_info.last_seen_ts,
idle_since_ts=bot_info.idle_since_ts,
message=event_msg)
_insert_bot_with_txn(info_key.root(), bot_info if store_bot_info else None,
event)
return event.key
# No need to emit an event. Just update BotInfo on its own.
if store_bot_info:
_insert_bot_with_txn(info_key.root(), bot_info, None)
return None
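# A minimal usage sketch for bot_event() above (hypothetical values):
#
#   bot_event(
#       'request_sleep', 'bot1',
#       session_id='session-1',
#       external_ip='127.0.0.1',
#       authenticated_as='bot:bot1.example.com',
#       dimensions={'id': ['bot1'], 'pool': ['default']},
#       state={'sleep_streak': 8},
#       version='abc123',
#       quarantined=False,
#       register_dimensions=True)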
def has_capacity(dimensions):
"""Returns True if there's a reasonable chance for this task request
dimensions set to be serviced by a bot alive.
First look at the task queues, then look into the datastore to figure this
out.
"""
assert not ndb.in_transaction()
# Look at the fast path.
cap = task_queues.probably_has_capacity(dimensions)
if cap is not None:
return cap
  # Add the result to the 'quick cache' to improve performance. This cache is
  # kept for the same duration bots are considered alive without a ping. This
  # is useful if there's a single bot in the fleet for these dimensions and it
  # takes a long time to reboot, as is the case with Android devices with slow
  # initialization and some baremetal bots (thanks SCSI firmware!).
seconds = config.settings().bot_death_timeout_secs
@ndb.tasklet
def run_query(flat):
# Do a query. That's slower and it's eventually consistent.
q = BotInfo.query()
for f in flat:
q = q.filter(BotInfo.dimensions_flat == f)
num = yield q.count_async(limit=1)
if num:
logging.info('Found capacity via BotInfo: %s', flat)
raise ndb.Return(True)
    # Search a bit harder. In this case, we're looking for a BotEvent, which
    # would indicate a bot that existed recently.
cutoff = utils.utcnow() - datetime.timedelta(seconds=seconds)
q = BotEvent.query(BotEvent.ts > cutoff)
for f in flat:
q = q.filter(BotEvent.dimensions_flat == f)
num = yield q.count_async(limit=1)
if num:
logging.info('Found capacity via BotEvent: %s', flat)
raise ndb.Return(True)
raise ndb.Return(False)
futures = [
run_query(f) for f in task_queues.expand_dimensions_to_flats(dimensions)
]
ndb.tasklets.Future.wait_all(futures)
if any(f.get_result() for f in futures):
task_queues.set_has_capacity(dimensions, seconds)
return True
logging.warning('HAS NO CAPACITY: %s', dimensions)
return False
def get_pools_from_dimensions_flat(dimensions_flat):
"""Gets pools from dimensions_flat."""
return [
d.replace('pool:', '') for d in dimensions_flat if d.startswith('pool:')
]
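# For example (a sketch): get_pools_from_dimensions_flat(
#     ['id:bot1', 'pool:default', 'pool:extra']) returns ['default', 'extra'].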
def cron_update_bot_info():
"""Refreshes BotInfo.composite for dead bots."""
@ndb.tasklet
def run(bot_key):
bot = bot_key.get()
if not bot:
logging.debug('BotInfo %s deleted since query or query was stale',
bot_key)
raise ndb.Return(None)
if not bot.is_dead and bot._should_be_dead():
      # `is_dead` is updated in _pre_put_hook based on _should_be_dead().
logging.info('Changing Bot status to DEAD: %s', bot.id)
yield bot.put_async()
raise ndb.Return(bot)
logging.debug('BotInfo changed since query or query was stale, %r', bot)
raise ndb.Return(None)
# Note: tx_result can potentially block for a significant amount of time since
# it makes several datastore updates (including a transaction) in a blocking
# way.
def tx_result(future, stats):
try:
bot = future.get_result()
if not bot:
stats['stale'] += 1
return
stats['dead'] += 1
# Unregister the bot from task queues since it can't reap anything.
task_queues.cleanup_after_bot(bot.id)
# Note: this is best effort at this point. If it fails, there'll be no
# retry: the bot is already marked as dead.
logging.info('Sending bot_missing event: %s', bot.id)
bot_event('bot_missing', bot.id)
except datastore_utils.CommitError:
logging.warning('Failed to commit a Tx')
stats['failed'] += 1
# The assumption here is that a cron job can visit all alive bots fast enough.
# The number of bots is expected to be up to ~30k. It takes about a minute to
  # process them (assuming there's a negligible number of dead bots). See also
# cron.yaml `/internal/cron/monitoring/bots/update_bot_info` entry that
# depends on this timing.
cron_stats = {
'seen': 0,
'dead': 0,
'failed': 0,
'stale': 0,
}
# _deadline() hits the instance config cache. Do it only once here instead of
# several thousand times inside _should_be_dead() in the loop below.
deadline = BotInfo._deadline()
futures = []
logging.debug('Finding dead based on deadline %s...', deadline)
try:
for info in BotInfo.yield_alive_bots():
cron_stats['seen'] += 1
if cron_stats['seen'] % 500 == 0:
logging.debug('Visited %d bots so far', cron_stats['seen'])
# We visit all alive bots and check if any should be marked as dead now.
# Note that an alternative would be to have an index on `last_seen_ts`,
      # but this index turns out to be very hot (being updated on every poll).
# See https://chromium.googlesource.com/infra/luci/luci-py/+/4e9aecba.
if info.is_dead or not info._should_be_dead(deadline):
continue
      # Transactionally flip the state of the bot to DEAD. Retry more often
      # than the default of 1: we do not want to spam the logs with failures,
      # and there should be plenty of time to do the retries.
f = datastore_utils.transaction_async(functools.partial(run, info.key),
retries=5)
futures.append(f)
# Limit the number of concurrent transactions to avoid OOMs.
if len(futures) > 20:
ndb.Future.wait_any(futures)
pending = []
for f in futures:
if f.done():
tx_result(f, cron_stats)
else:
pending.append(f)
futures = pending
# Collect all remaining futures.
for f in futures:
tx_result(f, cron_stats)
finally:
logging.debug('Seen: %d, marked as dead: %d, stale: %d, failed: %d',
cron_stats['seen'], cron_stats['dead'], cron_stats['stale'],
cron_stats['failed'])
return cron_stats['dead']