blob: 0f03ab06c47ef5e7bd014c3999d6ec30f3fbb2a9 [file] [log] [blame] [edit]
# Copyright 2018 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
"""Swarming bot named cache management, i.e. list of known named cache and their
state on each bot.
+--------------+
|NamedCacheRoot|
|id=<pool> |
+--------------+
|
+----------------+
| |
| v
+----------+ +----------+
|NamedCache| |NamedCache|
|id=<name> | ... |id=<name> |
+----------+ +----------+
The cached size hint for each named cache is the 95th percentile for the caches
found on the fleet. It is updated every 24 hours, so that if a large cache is
not re-observed for 24h, it will be lowered. When a higher size hint is
observed that is more than about 10% above the previous one, the value is
immediately updated.
Entries for named caches that haven't been updated for 8 days are deleted.
The caches will only be precalculated for the pools defined in pools.cfg.
"""
import datetime
import json
import logging
from google.appengine.ext import ndb
from components import utils
from server import bot_management
from server import pools_config
### Models.
class NamedCacheRoot(ndb.Model):
  """There is one NamedCacheRoot entity per pool.

  It carries no properties of its own; it exists only to serve as the
  ancestor key grouping all NamedCache entities for a pool, so ancestor
  queries can enumerate them (see get_hints and task_update_pool).
  """
class NamedCache(ndb.Model):
  """Represents the state of a single named cache.
  Parent is NamedCacheRoot for the pool.
  This entity is not stored in a transaction, as we want it to be hot in
  memcache, even if the hint is not exactly right.
  """
  # Time the hint was last (re)computed; entities with ts older than 8 days
  # are deleted by task_update_pool().
  ts = ndb.DateTimeProperty()
  # Reduced OS family as computed by _reduce_oses(), e.g. 'Windows', 'Mac',
  # 'Android', 'Linux' or 'unknown'.
  os = ndb.StringProperty()
  # Named cache name as reported by the bots' state.
  name = ndb.StringProperty()
  # Size hint in bytes; an adhoc ~95th percentile of the sizes observed on
  # the fleet. Unindexed since it is only read back by key / ancestor query.
  hint = ndb.IntegerProperty(indexed=False, default=0)
### Private APIs.
def _named_cache_key(pool, os, name):
  """Constructs the ndb.Key for one NamedCache entity.

  The entity is keyed '<os>:<name>' under the pool's NamedCacheRoot, so all
  caches of a pool share one entity group and can be ancestor-queried.
  """
  assert isinstance(pool, basestring), repr(pool)
  assert isinstance(os, basestring), repr(os)
  assert isinstance(name, basestring), repr(name)
  entity_id = '%s:%s' % (os, name)
  return ndb.Key(NamedCacheRoot, pool, NamedCache, entity_id)
def _update_named_cache(pool, os, name, hint):
  """Opportunistically updates a named cache's size hint.

  Arguments:
  - pool: pool name
  - os: reduced 'os' value
  - name: named cache name
  - hint: observed size hint to use on the fleet

  The stored entity is overwritten when any of these hold:
  - No entity exists yet for this (pool, os, name).
  - The stored entity is older than 24 hours (its old hint is discarded).
  - The new hint exceeds the stored one by roughly 10% or more.

  Returns:
    True if the entity was (re)written, False if it was left untouched.
  """
  assert isinstance(pool, basestring), repr(pool)
  assert isinstance(os, basestring), repr(os)
  assert isinstance(name, basestring), repr(name)
  assert isinstance(hint, (int, long)), repr(hint)
  key = _named_cache_key(pool, os, name)
  now = utils.utcnow().replace(microsecond=0)
  existing = key.get()
  cutoff = now - datetime.timedelta(hours=24)
  # Skip the write only when a fresh entity exists whose hint is still in the
  # same ballpark (not outgrown by ~10%).
  if existing and existing.hint > hint * 0.9 and existing.ts >= cutoff:
    return False
  NamedCache(key=key, ts=now, os=os, name=name, hint=hint).put()
  return True
def _reduce_oses(oses):
"""Returns a single OS key."""
assert isinstance(oses, list), repr(oses)
if not oses:
return 'unknown'
# TODO(maruel): It's a bit adhoc. Revisit if it's not good enough.
if 'Windows' in oses:
return 'Windows'
if 'Mac' in oses:
return 'Mac'
if 'Android' in oses:
return 'Android'
if 'Linux' in oses:
return 'Linux'
return min(oses)
### Public APIs.
def get_hints(pool, oses, names):
  """Returns the size hints for the requested named caches.

  Looks up each cache under the reduced OS for this pool; when no entity
  exists for that OS, falls back to the largest hint recorded for the same
  cache name under any other OS in the pool.

  Returns:
    list of hints in bytes for each named cache, or -1 when there's no hint
    available.
  """
  assert isinstance(oses, list), repr(oses)
  assert isinstance(names, list), repr(names)
  os = _reduce_oses(oses)
  keys = [_named_cache_key(pool, os, n) for n in names]
  hints = [e.hint if e else -1 for e in ndb.get_multi(keys)]
  root = ndb.Key(NamedCacheRoot, pool)
  for i, hint in enumerate(hints):
    if hint > -1:
      continue
    # Nothing recorded for this OS; look for the same named cache under any
    # OS in the pool and take the largest observed value.
    q = NamedCache.query(NamedCache.name == names[i], ancestor=root)
    candidates = q.fetch()
    if candidates:
      hints[i] = max(e.hint for e in candidates)
    # else: TODO(maruel): We could define default hints in the pool.
  return hints
def _observed_cache_size(bot, value):
  """Extracts the observed size in bytes from one bot-reported cache entry.

  The bot reports each named cache as an adhoc JSON value expected to be
  [[<something>, <size int>], ...]; returns the size, or None (with an error
  logged) when the value doesn't have that shape.
  """
  # TODO(maruel): Use structured data instead of adhoc json.
  if not value or not isinstance(value, list) or len(value) != 2:
    logging.error('%s has bad cache (A): %s', bot.id, value)
    return None
  if not value[0] or not isinstance(value[0], list) or len(value[0]) != 2:
    logging.error('%s has bad cache (B): %s', bot.id, value)
    return None
  if not value[0][1] or not isinstance(value[0][1], (int, long)):
    logging.error('%s has bad cache (C): %s', bot.id, value)
    return None
  return value[0][1]


def _collect_cache_sizes(pool):
  """Scans the live bots in a pool and gathers named cache sizes.

  Returns:
    dict {os: {cache name: [observed sizes in bytes]}}.
  """
  q = bot_management.BotInfo.query()
  found = {}
  bots = 0
  # Ignore very dead bots; they haven't pinged for 4 hours.
  exp = utils.utcnow().replace(microsecond=0) - datetime.timedelta(hours=4)
  for bot in bot_management.filter_dimensions(q, [u'pool:'+pool]):
    if bot.last_seen_ts < exp:
      continue
    bots += 1
    # Some bots do not have 'os' correctly specified. They fall into the
    # 'unknown' OS bucket.
    os = _reduce_oses(bot.dimensions.get('os') or [])
    state = bot.state
    if not state or not isinstance(state, dict):
      logging.debug('%s has no state', bot.id)
      continue
    c = state.get('named_caches')
    if not isinstance(c, dict):
      continue
    d = found.setdefault(os, {})
    for key, value in sorted(c.items()):
      s = _observed_cache_size(bot, value)
      if s is not None:
        d.setdefault(key, []).append(s)
  logging.info(
      'Found %d bots, %d caches in %d distinct OSes in pool %r',
      bots, sum(len(f) for f in found.values()), len(found), pool)
  return found


def _store_hints(pool, found):
  """Computes and opportunistically stores a size hint per (os, cache)."""
  # TODO(maruel): Parallelise.
  for os, d in sorted(found.items()):
    for name, sizes in sorted(d.items()):
      # Adhoc calculation to take the ~95th percentile. int() truncates, and
      # 0.95*n < n, so the index is always within range for non-empty sizes.
      sizes.sort()
      hint = sizes[int(float(len(sizes)) * 0.95)]
      if _update_named_cache(pool, os, name, hint):
        logging.debug('Pool %r OS %r Cache %r hint=%d', pool, os, name, hint)


def _delete_stale_caches(pool):
  """Deletes NamedCache entities not updated for 8 days in this pool."""
  exp = utils.utcnow().replace(microsecond=0) - datetime.timedelta(days=8)
  ancestor = ndb.Key(NamedCacheRoot, pool)
  logging.info('Exp: %s', exp)
  q = NamedCache.query(NamedCache.ts < exp, ancestor=ancestor)
  keys = q.fetch(keys_only=True)
  if keys:
    logging.info('Deleting %d stale entities', len(keys))
    ndb.delete_multi(keys)


def task_update_pool(pool):
  """Updates the NamedCache entities for a pool.

  This needs to be able to scale for several thousands bots and hundreds of
  different caches.
  - Query all the bots in a pool.
  - Calculate the named caches for the bots in this pool.
  - Update the entities.
  - Delete entities that haven't been refreshed for 8 days.

  Returns:
    True; kept for the task queue handler's benefit.
  """
  found = _collect_cache_sizes(pool)
  _store_hints(pool, found)
  _delete_stale_caches(pool)
  return True
def cron_update_named_caches():
  """Triggers one task queue task per pool to update NamedCache entities.

  Returns:
    Number of tasks successfully enqueued.
  """
  total = 0
  for pool in pools_config.known():
    enqueued = utils.enqueue_task(
        '/internal/taskqueue/important/named_cache/update-pool',
        'named-cache-task',
        payload=json.dumps({'pool': pool}))
    if enqueued:
      logging.debug('Enqueued task for pool %s', pool)
      total += 1
    else:
      logging.error('Failed to enqueue task for pool %s', pool)
  return total