blob: b0dbd539594061a7c3f0de49dd3e51c9d9d9236d [file] [log] [blame]
#!/usr/bin/env vpython
# coding: utf-8
# Copyright 2017 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
import datetime
import json
import logging
import os
import random
import sys
import unittest
# Set up the environment.
APP_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, APP_DIR)
import test_env_handlers
from parameterized import parameterized
import webtest
from google.appengine.api import datastore_errors
from google.appengine.api import memcache
from google.appengine.ext import ndb
import handlers_backend
from components import datastore_utils
from components import utils
from server import bot_management
from server import task_queues
from server import task_request
def _gen_properties(**kwargs):
  """Returns a TaskProperties built from test defaults overridden by kwargs.

  The `dimensions` entry (default or overridden) is handed to TaskProperties
  under the `dimensions_data` keyword.
  """
  props = dict(
      command=[u'command1'],
      dimensions={
          u'cpu': [u'x86-64'],
          u'os': [u'Ubuntu-16.04'],
          u'pool': [u'default'],
      },
      env={},
      execution_timeout_secs=24 * 60 * 60,
      io_timeout_secs=None,
  )
  props.update(kwargs)
  # TaskProperties takes the dimensions under a different keyword.
  props[u'dimensions_data'] = props.pop(u'dimensions')
  return task_request.TaskProperties(**props)
def _gen_request(properties=None):
  """Returns an initialized TaskRequest with a single 60 second task slice.

  Args:
    properties: TaskProperties of the only slice; when falsy,
        _gen_properties() is used.
  """
  now = utils.utcnow()
  slices = [
      task_request.TaskSlice(
          expiration_secs=60, properties=properties or _gen_properties()),
  ]
  req = task_request.TaskRequest(
      created_ts=now,
      manual_tags=[u'tag:1'],
      name='Request name',
      priority=50,
      task_slices=slices,
      user='Jesus')
  task_request.init_new_request(req, True, task_request.TEMPLATE_AUTO)
  return req
def _to_timestamp(dt):
  """Converts `dt` to an int timestamp, truncating the fractional part.

  NOTE(review): assumes utils.datetime_to_timestamp returns microseconds
  (hence the 1e6 divisor), so the result is seconds; confirm.
  """
  micros = utils.datetime_to_timestamp(dt)
  return int(micros / 1e6)
class TaskQueuesApiTest(test_env_handlers.AppTestBase):
def setUp(self):
  """Creates the backend test app and installs deterministic mocks."""
  super(TaskQueuesApiTest, self).setUp()
  # The backend application handles task queue tasks.
  backend = handlers_backend.create_application(True)
  self.app = webtest.TestApp(
      backend,
      extra_environ={
          'REMOTE_ADDR': self.source_ip,
          'SERVER_SOFTWARE': os.environ['SERVER_SOFTWARE'],
      })

  # The dedicated module is not mocked in tests, so force
  # use_dedicated_module=False on every enqueue.
  real_enqueue = utils.enqueue_task_async

  def _enqueue_without_dedicated_module(*args, **kwargs):
    return real_enqueue(*args, use_dedicated_module=False, **kwargs)

  self.mock(utils, 'enqueue_task_async', _enqueue_without_dedicated_module)

  # Make "random" durations deterministic: always the midpoint of [a, b]
  # minutes.
  def _midpoint_minutes(a, b):
    return datetime.timedelta(minutes=(a + b) / 2.0)

  self.mock(task_queues, '_random_timedelta_mins', _midpoint_minutes)
def _assert_bot(self, bot_id=u'bot1', dimensions=None):
  """Registers a bot via a 'request_sleep' event, then asserts it in queues.

  Args:
    bot_id: bot ID; also used as the `id` dimension and as authenticated_as.
    dimensions: optional dict merged over the default bot dimensions.

  Returns:
    Tuple (result of self.execute_tasks(), queues from
    task_queues.assert_bot()). Note that assert_bot() runs before the TQ tasks
    are executed.
  """
  dims = {
      u'cpu': [u'x86-64', u'x64'],
      u'id': [bot_id],
      u'os': [u'Ubuntu-16.04', u'Ubuntu'],
      u'pool': [u'default'],
  }
  if dimensions:
    dims.update(dimensions)
  bot_management.bot_event(
      event_type='request_sleep',
      bot_id=bot_id,
      external_ip='1.2.3.4',
      authenticated_as=bot_id,
      dimensions=dims,
      state={},
      version='1234',
      register_dimensions=True)
  matched_queues = task_queues.assert_bot(dims)
  tq_result = self.execute_tasks()
  return tq_result, matched_queues
def _assert_task(self, dimensions=None):
  """Creates one pending TaskRequest and asserts it in task_queues.

  Args:
    dimensions: optional task dimensions; when falsy, the default properties
        of _gen_request() are used.

  Returns:
    The result of self.execute_tasks() run after the assertion.
  """
  properties = None
  if dimensions:
    properties = _gen_properties(dimensions=dimensions)
  request = _gen_request(properties=properties)
  task_queues.assert_task_async(request).get_result()
  return self.execute_tasks()
def test_all_apis_are_tested(self):
  """Every public function of task_queues must have a test_<name> method."""
  prefix = 'test_'
  tested = frozenset(
      name[len(prefix):] for name in dir(self) if name.startswith(prefix))
  # Public APIs: non-underscore names whose object has `func_name` (i.e.
  # Python 2 functions).
  public_apis = frozenset(
      name for name in dir(task_queues)
      if not name.startswith('_') and
      hasattr(getattr(task_queues, name), 'func_name'))
  self.assertFalse(public_apis - tested)
def assert_count(self, count, entity):
  """Asserts that exactly `count` entities of model class `entity` exist.

  On mismatch, fails with the expected and actual counts plus the dicts of
  all stored entities of that kind, to ease debugging.

  Args:
    count: expected number of entities.
    entity: ndb model class to query.
  """
  actual = entity.query().count()
  if actual != count:
    # self.fail() raises, so no further assertion is needed after this.
    self.fail('Expected %d %s entities, got %d: %s' % (
        count, entity.__name__, actual,
        [i.to_dict() for i in entity.query()]))
def test_assert_bot(self):
  """Asserting a bot creates only a BotDimensionsMatches entity."""
  # Nothing is stored initially.
  self.assert_count(0, task_queues.BotDimensionsMatches)
  self.assert_count(0, task_queues.TaskDimensionsInfo)
  self.assert_count(0, task_queues.TaskDimensionsSets)
  self._assert_bot()
  # The bot got its BotDimensionsMatches; no task-side entities were created.
  self.assert_count(1, task_queues.BotDimensionsMatches)
  self.assert_count(0, task_queues.TaskDimensionsInfo)
  self.assert_count(0, task_queues.TaskDimensionsSets)
def test_assert_task_async(self):
  """Asserting a task creates one TaskDimensionsInfo and TaskDimensionsSets."""
  # Nothing is stored initially.
  self.assert_count(0, task_queues.BotDimensionsMatches)
  self.assert_count(0, task_queues.TaskDimensionsInfo)
  self.assert_count(0, task_queues.TaskDimensionsSets)
  self._assert_task()
  # Only task-side entities were created.
  self.assert_count(0, task_queues.BotDimensionsMatches)
  self.assert_count(1, task_queues.TaskDimensionsInfo)
  self.assert_count(1, task_queues.TaskDimensionsSets)
def test_assert_task_async_no_update(self):
  """Asserting the same task dimensions twice enqueues TQ tasks only once."""
  # Runs one TQ task to register the new dimension set.
  tq_tasks = self._assert_task()
  self.assertEqual(tq_tasks, 1)
  # Already seen it, no new tasks.
  tq_tasks = self._assert_task()
  self.assertEqual(tq_tasks, 0)
def test_assert_task_async_or_dims(self):
  """Same as the test above, but with "|" (OR) dimension values."""
  # Runs one TQ task to register the new dimension set.
  tq_tasks = self._assert_task({
      u'pool': [u'default'],
      u'os': [u'v1|v2'],
      u'gpu': [u'nv|amd'],
  })
  self.assertEqual(tq_tasks, 1)
  # Already seen it, no new tasks.
  tq_tasks = self._assert_task({
      u'pool': [u'default'],
      u'os': [u'v1|v2'],
      u'gpu': [u'nv|amd'],
  })
  self.assertEqual(tq_tasks, 0)
def test_freshen_up_queues(self):
  # Exists so test_all_apis_are_tested finds a test for this API; the actual
  # coverage is in the more complex tests below (e.g. test_or_dimensions_*).
  pass
def test_or_dimensions_new_tasks(self):
  """Already-registered bots are matched to newly asserted OR-dims tasks."""
  # There are two bots that can handle two OR alternatives of a task.
  self._assert_bot(bot_id=u'bot1',
                   dimensions={
                       u'os': [u'v1', u'v2'],
                       u'gpu': [u'nv'],
                   })
  self._assert_bot(bot_id=u'bot2',
                   dimensions={
                       u'os': [u'v2'],
                       u'gpu': [u'amd'],
                   })
  # A task whose dimensions use "|" (OR) alternatives.
  self._assert_task({
      u'pool': [u'default'],
      u'os': [u'v1|v2'],
      u'gpu': [u'nv|amd'],
  })
  # Both bots should be able to handle it. The numbers are the queue
  # identifiers returned by freshen_up_queues().
  self.assertEqual([787294789], task_queues.freshen_up_queues('bot1'))
  self.assertEqual([787294789], task_queues.freshen_up_queues('bot2'))
  # Another task comes in that matches only bot1 (bot2 lacks os:v1).
  self._assert_task({
      u'pool': [u'default'],
      u'os': [u'v1'],
      u'gpu': [u'nv|amd'],
  })
  # Only bot1 can handle the second task.
  self.assertEqual([787294789, 1873318153],
                   task_queues.freshen_up_queues('bot1'))
  self.assertEqual([787294789], task_queues.freshen_up_queues('bot2'))
def test_or_dimensions_new_bots(self):
  """Bots registered after OR-dims tasks get the same matches as above."""
  # Register a task that uses "|" dimensions.
  self._assert_task({
      u'pool': [u'default'],
      u'os': [u'v1|v2'],
      u'gpu': [u'nv|amd'],
  })
  # Register a task that matches only bot1.
  self._assert_task({
      u'pool': [u'default'],
      u'os': [u'v1'],
      u'gpu': [u'nv|amd'],
  })
  # Two bots show up matching two different OR alternatives of the first task.
  self._assert_bot(bot_id=u'bot1',
                   dimensions={
                       u'os': [u'v1', u'v2'],
                       u'gpu': [u'nv'],
                   })
  self._assert_bot(bot_id=u'bot2',
                   dimensions={
                       u'os': [u'v2'],
                       u'gpu': [u'amd'],
                   })
  # The bots got correct matches (same queue numbers as in
  # test_or_dimensions_new_tasks, regardless of registration order).
  self.assertEqual([787294789, 1873318153],
                   task_queues.freshen_up_queues('bot1'))
  self.assertEqual([787294789], task_queues.freshen_up_queues('bot2'))
def test_or_dimensions_same_hash(self):
  """The order of OR alternatives does not affect the resulting queue."""
  self._assert_bot(bot_id=u'bot1', dimensions={u'os': [u'v1']})
  self._assert_bot(bot_id=u'bot2', dimensions={u'os': [u'v2']})
  self._assert_bot(bot_id=u'bot3', dimensions={u'os': [u'v3']})
  # Both requests should have the same dimension_hash.
  self._assert_task({
      u'pool': [u'default'],
      u'os': [u'v1|v2|v3'],
  })
  self._assert_task({
      u'pool': [u'default'],
      u'os': [u'v3|v2|v1'],
  })
  # All 3 bots are matched to the same queue.
  self.assertEqual([1060356668], task_queues.freshen_up_queues('bot1'))
  self.assertEqual([1060356668], task_queues.freshen_up_queues('bot2'))
  self.assertEqual([1060356668], task_queues.freshen_up_queues('bot3'))
def test_bot_dimensions_to_flat(self):
  """bot_dimensions_to_flat returns a sorted list of 'key:value' strings."""
  actual = task_queues.bot_dimensions_to_flat({
      u'a': [u'c', u'bee'],
      u'cee': [u'zee']
  })
  self.assertEqual([u'a:bee', u'a:c', u'cee:zee'], actual)
def test_dimensions_to_flat_long_ascii(self):
  """Values over 256 characters are cut to 256 and suffixed with u'…'."""
  key = u'a' * 64
  actual = task_queues.bot_dimensions_to_flat({
      key: [
          # Too long.
          u'b' * 257,
          # Ok.
          u'c' * 256,
      ],
  })
  expected = [
      key + u':' + u'b' * 256 + u'…',
      key + u':' + u'c' * 256,
  ]
  self.assertEqual(expected, actual)
def test_dimensions_to_flat_long_unicode(self):
  """Truncation at 256 characters also applies to BMP non-ASCII values.

  Both emojis below are in the Basic Multilingual Plane, so each counts as a
  single character (see test_python_len_non_BMP).
  """
  key = u'a' * 64
  actual = task_queues.bot_dimensions_to_flat({
      key: [
          # Ok.
          u'⌛' * 256,
          # Too long. Must be the same character as in `expected` below (it
          # was previously a mis-encoded 3-character sequence).
          u'⛔' * 257,
      ],
  })
  expected = [
      key + u':' + u'⌛' * 256,
      key + u':' + u'⛔' * 256 + u'…',
  ]
  self.assertEqual(expected, actual)
def test_dimensions_to_flat_long_unicode_non_BMP(self):
  """Truncation with supplementary-plane (non-BMP) characters."""
  # For non-BMP characters, the length is effectively halved for now.
  key = u'a' * 64
  # Python considers emoji in the supplemental plane to have length 2 on UCS2
  # builds, and length 1 on UCS4 builds.
  l = 128 if sys.maxunicode < 65536 else 256
  actual = task_queues.bot_dimensions_to_flat({
      key: [
          # Too long.
          u'💥' * (l + 1),
          # Ok.
          u'😬' * l,
      ],
  })
  expected = [
      key + u':' + u'💥' * l + u'…',
      key + u':' + u'😬' * l,
  ]
  self.assertEqual(expected, actual)
def test_dimensions_to_flat_duplicate_value(self):
  """Duplicate values of the same key are emitted only once."""
  actual = task_queues.bot_dimensions_to_flat({u'a': [u'c', u'c']})
  self.assertEqual([u'a:c'], actual)
def test_python_len_non_BMP(self):
  """Documents how Python measures BMP vs supplementary-plane characters."""
  # Here are emojis in the base plane. They are 1 character. (The second one
  # was previously a mis-encoded 3-character sequence, which made this fail.)
  self.assertEqual(1, len(u'⌛'))
  self.assertEqual(1, len(u'⛔'))
  # Python considers emoji in the supplemental plane to have length 2 on UCS2
  # builds, and length 1 on UCS4 builds.
  l = 2 if sys.maxunicode < 65536 else 1
  self.assertEqual(l, len(u'😬'))
  self.assertEqual(l, len(u'💥'))
def test_probably_has_capacity_empty(self):
  """With no data at all, probably_has_capacity() returns None."""
  # The bot can service this dimensions.
  d = {u'pool': [u'default'], u'os': [u'Ubuntu-16.04']}
  # By default, nothing has capacity.
  self.assertEqual(None, task_queues.probably_has_capacity(d))
def test_probably_has_capacity(self):
  """A bot coming online alone does not make capacity known."""
  d = {u'pool': [u'default'], u'os': [u'Ubuntu-16.04']}
  # A bot coming online doesn't register capacity automatically.
  self._assert_bot()
  self.assertEqual(1, bot_management.BotInfo.query().count())
  self.assertEqual(None, task_queues.probably_has_capacity(d))
def test_probably_has_capacity_freshen_up_queues(self):
  """Capacity becomes known once a matching bot calls freshen_up_queues()."""
  d = {u'pool': [u'default'], u'os': [u'Ubuntu-16.04']}
  # Capacity registers there only once there's a request enqueued and
  # freshen_up_queues() is called.
  self._assert_bot()
  request = _gen_request(properties=_gen_properties(dimensions=d))
  task_queues.assert_task_async(request).get_result()
  self.execute_tasks()
  self.assertEqual(None, task_queues.probably_has_capacity(d))
  # It gets set only once freshen_up_queues() is called.
  task_queues.freshen_up_queues('bot1')
  self.assertEqual(True, task_queues.probably_has_capacity(d))
def test_set_has_capacity(self):
  """set_has_capacity(d, 2) keeps the capacity value for 2 seconds."""
  d = {u'pool': [u'default'], u'os': [u'Ubuntu-16.04']}
  # By default, nothing has capacity. None means no data.
  now = utils.utcnow()
  # NOTE(review): the second mock_now argument presumably is an offset in
  # seconds added to `now`; confirm in test_env_handlers.
  self.mock_now(now, 0)
  self.assertEqual(None, task_queues.probably_has_capacity(d))
  # Keep the value for 2 seconds, exclusive.
  task_queues.set_has_capacity(d, 2)
  self.assertEqual(True, task_queues.probably_has_capacity(d))
  self.mock_now(now, 1)
  self.assertEqual(True, task_queues.probably_has_capacity(d))
  # The value expired.
  self.mock_now(now, 2)
  self.assertEqual(None, task_queues.probably_has_capacity(d))
def test_assert_bot_then_task(self):
  """Bot asserted before the task ends up matched to the task's queue."""
  self._assert_bot()
  self._assert_task()
  self.assertEqual([2980491642], task_queues.freshen_up_queues('bot1'))
def test_assert_task_async_then_bot(self):
  """Task asserted before the bot gives the same match as the reverse order."""
  self._assert_task()
  self._assert_bot()
  self.assertEqual([2980491642], task_queues.freshen_up_queues('bot1'))
def test_cleanup_after_bot(self):
  """cleanup_after_bot removes BotDimensionsMatches but not BotInfo."""
  self._assert_bot()
  self._assert_task()
  task_queues.cleanup_after_bot('bot1')
  # BotInfo is deleted separately.
  self.assert_count(0, task_queues.BotDimensionsMatches)
  self.assert_count(1, bot_management.BotInfo)
def test_assert_bot_dimensions_changed(self):
  """A bot whose dimensions change stops matching the old task's queue."""
  now = datetime.datetime(2010, 1, 2, 3, 4, 5)
  self.mock_now(now)
  self._assert_task({
      u'cpu': [u'x86-64'],
      u'os': [u'Amiga'],
      u'pool': [u'default'],
  })
  self._assert_bot(dimensions={u'os': [u'Amiga']})
  # The bot is matched to the task now.
  self.assertEqual([2828582055], task_queues.freshen_up_queues('bot1'))
  # One hour later, the bot changes dimensions. Dimension values are lists of
  # strings, as everywhere else; a bare string would be iterated per
  # character.
  now += datetime.timedelta(hours=1)
  self.mock_now(now)
  self._assert_bot(dimensions={u'os': [u'Commodore']})
  # It is no longer matched to the task.
  self.assertEqual([], task_queues.freshen_up_queues('bot1'))
def test_assert_bot_queue_numbers(self):
  """assert_bot returns matching queues only after TQ tasks have run."""
  self._assert_task({
      u'os': [u'Ubuntu'],
      u'pool': [u'default'],
  })
  self._assert_task({
      u'os': [u'Ubuntu'],
      u'id': [u'bot-id'],
      u'pool': [u'default'],
  })
  # This registers the bot for the first time and executes TQ tasks to match
  # it to available tasks. Since TQ tasks happen after assert_bot call, there
  # are no matching queues yet.
  tq, queues = self._assert_bot('bot-id')
  self.assertEqual(1, tq)
  self.assertEqual([], queues)
  # Now all datastore structures are up-to-date and we can check what
  # queues are actually matching the bot.
  # Matches both submitted tasks.
  tq, queues = self._assert_bot('bot-id')
  self.assertEqual(0, tq)
  self.assertEqual([203088291, 3402762422], queues)
def test_task_dimensions_expiry(self):
  """Once tidied up after expiry, task dimensions no longer match the bot."""
  now = datetime.datetime(2010, 1, 2, 3, 4, 5)
  self.mock_now(now)
  self._assert_task()
  self._assert_bot()
  # The bot is matched to the task now.
  self.assertEqual([2980491642], task_queues.freshen_up_queues('bot1'))
  # Some time later the task dimensions set expires and gets cleaned up.
  now += datetime.timedelta(hours=5)
  self.mock_now(now)
  task_queues.tidy_task_dimension_sets_async().get_result()
  # The bot is no longer matching the task.
  self._assert_bot()
  self.assertEqual([], task_queues.freshen_up_queues('bot1'))
def test_hash_dimensions(self):
  """hash_dimensions rejects non-dicts and hashes {} to a non-zero value."""
  # A str is not accepted; it fails with AttributeError.
  with self.assertRaises(AttributeError):
    task_queues.hash_dimensions('this is not dict')
  # Assert it doesn't return 0.
  self.assertEqual(3649838548, task_queues.hash_dimensions({}))
def test_hash_or_dimensions(self):
  """Hash ignores the order of values and of OR ("|") alternatives."""
  dim1 = _gen_request(
      properties=_gen_properties(dimensions={
          u'pool': [u'pool-a'],
          u'foo': [u'a|c|b', u'xyz']
      })).task_slice(0).properties.dimensions
  dim2 = _gen_request(
      properties=_gen_properties(dimensions={
          u'pool': [u'pool-a'],
          u'foo': [u'xyz', u'b|c|a']
      })).task_slice(0).properties.dimensions
  self.assertEqual(
      task_queues.hash_dimensions(dim1), task_queues.hash_dimensions(dim2))
def test_expand_dimensions_to_flats(self):
  """expand_dimensions_to_flats expands OR ("|") values into sorted flats."""
  expand = task_queues.expand_dimensions_to_flats
  # Without OR: a single flat with all key:value pairs, sorted.
  actual = set(
      tuple(f) for f in expand({
          u'foo': [u'c', u'a', u'b'],
          u'bar': [u'x', u'z', u'y']
      }))
  expected = {
      (u'bar:x', u'bar:y', u'bar:z', u'foo:a', u'foo:b', u'foo:c'),
  }
  self.assertEqual(actual, expected)
  # With OR: one flat per combination of alternatives (3 * 2 * 2 = 12).
  actual = set(
      tuple(f) for f in expand({
          u'foo': [u'a|b|c', u'def'],
          u'bar': [u'x|y', u'z|w']
      }))
  expected = {
      (u'bar:w', u'bar:x', u'foo:a', u'foo:def'),
      (u'bar:w', u'bar:x', u'foo:b', u'foo:def'),
      (u'bar:w', u'bar:x', u'foo:c', u'foo:def'),
      (u'bar:w', u'bar:y', u'foo:a', u'foo:def'),
      (u'bar:w', u'bar:y', u'foo:b', u'foo:def'),
      (u'bar:w', u'bar:y', u'foo:c', u'foo:def'),
      (u'bar:x', u'bar:z', u'foo:a', u'foo:def'),
      (u'bar:x', u'bar:z', u'foo:b', u'foo:def'),
      (u'bar:x', u'bar:z', u'foo:c', u'foo:def'),
      (u'bar:y', u'bar:z', u'foo:a', u'foo:def'),
      (u'bar:y', u'bar:z', u'foo:b', u'foo:def'),
      (u'bar:y', u'bar:z', u'foo:c', u'foo:def'),
  }
  # This case previously computed `actual`/`expected` without checking them.
  self.assertEqual(actual, expected)
  # flats are sorted
  actual = set(tuple(f) for f in expand({
      u'foo': [u'a|y|b|z', u'c'],
  }))
  expected = {
      (u'foo:a', u'foo:c'),
      (u'foo:b', u'foo:c'),
      (u'foo:c', u'foo:y'),
      (u'foo:c', u'foo:z'),
  }
  self.assertEqual(actual, expected)
def test_expiry_map(self):
  """_sets_to_expiry_map and _expiry_map_to_sets round-trip.

  Passing False as the second argument of _expiry_map_to_sets drops the
  'expiry' keys.
  """
  sets = [
      {
          'dimensions': ('a:b', 'a:c'),
          'expiry': 1663040000
      },
      {
          'dimensions': ('a:d', 'a:e'),
          'expiry': 1663041111
      },
  ]
  expiry_map = task_queues._sets_to_expiry_map(sets)
  self.assertEqual(sets, task_queues._expiry_map_to_sets(expiry_map))
  without_ts = task_queues._expiry_map_to_sets(expiry_map, False)
  self.assertEqual([
      {
          'dimensions': ('a:b', 'a:c')
      },
      {
          'dimensions': ('a:d', 'a:e')
      },
  ], without_ts)
@staticmethod
def _stored_task_dims_expiry(expiry):
  """Returns the expiry stored in TaskDimensionsSets for a task slice expiry.

  The stored value is `expiry` plus 15 minutes of (mocked) randomization plus
  4 hours of extra time added in the transaction.
  """
  mocked_randomization = datetime.timedelta(minutes=15)
  txn_extra_time = datetime.timedelta(hours=4)
  return expiry + mocked_randomization + txn_extra_time
# Each case: (task dimensions, expected TaskDimensionsSets ID). Tasks with an
# `id` dimension are keyed by bot ("bot:<id>:..."), others by pool
# ("pool:<pool>:..."). Pool names are URL-escaped (':' becomes '%3A').
@parameterized.expand([
    ({
        'pool': ['p'],
        'k': ['v1']
    }, 'pool:p:1496061212'),
    ({
        'pool': ['p'],
        'k': ['v1|v2']
    }, 'pool:p:3930364299'),
    ({
        'pool': ['a:b'],
        'k': ['v1']
    }, 'pool:a%3Ab:2715169585'),
    ({
        'id': ['b'],
        'pool': ['p'],
        'k': ['v1']
    }, 'bot:b:174937362'),
    ({
        'id': ['b'],
        'pool': ['p'],
        'k': ['v1|v2']
    }, 'bot:b:903320086'),
])
def test_assert_task_dimensions_async(self, task_dims, expected_sets_id):
  """Checks entities, TQ tasks and expiry bumps of task dims assertion."""
  now = datetime.datetime(2010, 1, 2, 3, 4, 5)
  self.mock_now(now)
  expected_sets = task_queues.expand_dimensions_to_flats(task_dims)
  # Records one entry per call of the mocked TQ task.
  tq_tasks = []

  @ndb.tasklet
  def mocked_tq_task(task_sets_id, task_sets_dims, enqueued_ts):
    self.assertEqual(task_sets_id, expected_sets_id)
    self.assertEqual(task_sets_dims, expected_sets)
    self.assertEqual(enqueued_ts, utils.utcnow())
    tq_tasks.append(1)
    raise ndb.Return(True)

  self.mock(task_queues, '_tq_update_bot_matches_async', mocked_tq_task)

  # Asserts the task dims with the given expiry, runs pending TQ tasks and
  # returns how many mocked TQ tasks ran (resetting the counter).
  def assert_task(expiry):
    task_queues._assert_task_dimensions_async(task_dims, expiry).get_result()
    self.execute_tasks()
    seen = len(tq_tasks)
    del tq_tasks[:]
    return seen

  requested_exp = now + datetime.timedelta(hours=1)
  stored_exp = self._stored_task_dims_expiry(requested_exp)
  # Add a never seen before task, should enqueue a TQ task.
  self.assertEqual(assert_task(requested_exp), 1)
  # Check entities are correct and have correct expiry.
  sets_key = ndb.Key(task_queues.TaskDimensionsSets, expected_sets_id)
  sets_ent = sets_key.get()
  self.assertEqual(sets_ent.to_dict(), {
      'sets': [{
          'dimensions': dims
      } for dims in expected_sets],
  })
  info_key = ndb.Key(task_queues.TaskDimensionsInfo, 1, parent=sets_key)
  info_ent = info_key.get()
  self.assertEqual(
      info_ent.to_dict(), {
          'sets': [{
              'dimensions': dims,
              'expiry': _to_timestamp(stored_exp)
          } for dims in expected_sets],
          'next_cleanup_ts':
          stored_exp + datetime.timedelta(minutes=5),
      })
  # A bit later (before the stored expiry) add the exact same task. Should
  # result in no TQ tasks and untouched entities.
  now += datetime.timedelta(hours=1.5)
  self.mock_now(now)
  self.assertEqual(assert_task(now + datetime.timedelta(hours=1)), 0)
  self.assertEqual(sets_key.get(), sets_ent)
  self.assertEqual(info_key.get(), info_ent)
  # Adding a task even later results in the bump to the stored expiry (but no
  # new TQ tasks).
  now += datetime.timedelta(hours=6)
  self.mock_now(now)
  self.assertEqual(assert_task(now + datetime.timedelta(hours=1)), 0)
  self.assertEqual(
      info_key.get().next_cleanup_ts,
      self._stored_task_dims_expiry(now + datetime.timedelta(hours=1)) +
      datetime.timedelta(minutes=5))
def test_assert_task_dimensions_async_collision(self):
  """Different dims with the same hash share one TaskDimensionsSets entity."""
  now = datetime.datetime(2010, 1, 2, 3, 4, 5)
  self.mock_now(now)
  # Simulate a hash collision, since it is hard (but not impossible) to get
  # one for real.
  self.mock(task_queues, 'hash_dimensions', lambda _dims: 42)
  expected_sets_id = 'pool:p:42'
  sets_key = ndb.Key(task_queues.TaskDimensionsSets, expected_sets_id)
  info_key = ndb.Key(task_queues.TaskDimensionsInfo, 1, parent=sets_key)
  # Records the single flat passed to each mocked TQ task call.
  tq_tasks = []

  @ndb.tasklet
  def mocked_tq_task(task_sets_id, task_sets_dims, enqueued_ts):
    self.assertEqual(task_sets_id, expected_sets_id)
    self.assertEqual(len(task_sets_dims), 1)
    self.assertEqual(enqueued_ts, utils.utcnow())
    tq_tasks.append(tuple(task_sets_dims[0]))
    raise ndb.Return(True)

  self.mock(task_queues, '_tq_update_bot_matches_async', mocked_tq_task)

  # Asserts {'pool': ['p'], 'key': [dim]} with expiry `exp`, runs pending TQ
  # tasks and returns (then clears) the recorded flats.
  def assert_task(dim, exp):
    task_queues._assert_task_dimensions_async({
        'pool': ['p'],
        'key': [dim],
    }, exp).get_result()
    self.execute_tasks()
    seen = tq_tasks[:]
    del tq_tasks[:]
    return seen

  # Create a new TaskDimensionsSets.
  exp1 = now + datetime.timedelta(hours=1)
  stored_exp1 = self._stored_task_dims_expiry(exp1)
  self.assertEqual(assert_task('1', exp1), [('key:1', 'pool:p')])
  # Append a new set to an existing TaskDimensionsSets.
  now += datetime.timedelta(hours=1)
  self.mock_now(now)
  exp2 = now + datetime.timedelta(hours=1)
  stored_exp2 = self._stored_task_dims_expiry(exp2)
  self.assertEqual(assert_task('2', exp2), [('key:2', 'pool:p')])
  # Have both of them now.
  self.assertEqual(
      info_key.get().to_dict(), {
          'sets': [
              {
                  'dimensions': ['key:1', 'pool:p'],
                  'expiry': _to_timestamp(stored_exp1),
              },
              {
                  'dimensions': ['key:2', 'pool:p'],
                  'expiry': _to_timestamp(stored_exp2),
              },
          ],
          'next_cleanup_ts':
          stored_exp1 + datetime.timedelta(minutes=5),
      })
  # Refresh expiry of the second set, it should kick the first out as stale.
  now = stored_exp2 - datetime.timedelta(hours=0.5)
  self.mock_now(now)
  exp3 = now + datetime.timedelta(hours=1)
  stored_exp3 = self._stored_task_dims_expiry(exp3)
  self.assertEqual(assert_task('2', exp3), [])
  # Have only one set now.
  self.assertEqual(
      info_key.get().to_dict(), {
          'sets': [
              {
                  'dimensions': ['key:2', 'pool:p'],
                  'expiry': _to_timestamp(stored_exp3),
              },
          ],
          'next_cleanup_ts':
          stored_exp3 + datetime.timedelta(minutes=5),
      })
def test_update_bot_matches_async(self):
  # Exists so test_all_apis_are_tested finds a test for this API.
  # Tested as a part of the overall workflow.
  pass
def test_rescan_matching_task_sets_async(self):
  # Exists so test_all_apis_are_tested finds a test for this API.
  # Tested as a part of the overall workflow.
  pass
def test_tidy_task_dimension_sets_async(self):
  """tidy_task_dimension_sets_async drops expired sets and empty entities."""
  now = datetime.datetime(2010, 1, 2, 3, 4, 5)
  # Use a transaction to be able to use _put_task_dimensions_sets_async that
  # has an assert inside. We don't really care about atomicity here.
  @ndb.transactional_tasklet(xg=True)
  def prep():
    # Fully survives the cleanup.
    yield task_queues._put_task_dimensions_sets_async(
        'set0', {
            ('0:a', ): now + datetime.timedelta(hours=2),
            ('0:b', ): now + datetime.timedelta(hours=2),
        })
    # Partially survives the cleanup.
    yield task_queues._put_task_dimensions_sets_async(
        'set1', {
            ('1:a', ): now + datetime.timedelta(hours=1),
            ('1:b', ): now + datetime.timedelta(hours=2),
        })
    # Doesn't survive the cleanup.
    yield task_queues._put_task_dimensions_sets_async(
        'set2', {
            ('2:a', ): now + datetime.timedelta(hours=1),
            ('2:b', ): now + datetime.timedelta(hours=1),
        })

  prep().get_result()
  # Between the 1h and 2h expiries.
  self.mock_now(now + datetime.timedelta(hours=1.5))
  self.assertTrue(task_queues.tidy_task_dimension_sets_async().get_result())

  # Returns the expiry map of `sets_id`, or None if both entities are gone.
  # Also checks TaskDimensionsSets and TaskDimensionsInfo are consistent.
  def fetch(sets_id):
    sets_key = ndb.Key(task_queues.TaskDimensionsSets, sets_id)
    info_key = ndb.Key(task_queues.TaskDimensionsInfo, 1, parent=sets_key)
    sets, info = ndb.get_multi([sets_key, info_key])
    if not sets:
      # Both must be missing at the same time.
      self.assertIsNone(info)
      return None
    self.assertIsNotNone(info)
    # Must contain same sets (sans `expiry`).
    expless = [{'dimensions': s['dimensions']} for s in info.sets]
    self.assertEqual(sets.sets, expless)
    return task_queues._sets_to_expiry_map(info.sets)

  # Fully survived.
  self.assertEqual(
      fetch('set0'), {
          ('0:a', ): now + datetime.timedelta(hours=2),
          ('0:b', ): now + datetime.timedelta(hours=2),
      })
  # Partially survived.
  self.assertEqual(fetch('set1'), {
      ('1:b', ): now + datetime.timedelta(hours=2),
  })
  # Fully gone.
  self.assertIsNone(fetch('set2'))
  # Noop run for code coverage.
  self.assertTrue(task_queues.tidy_task_dimension_sets_async().get_result())
  # Full cleanup.
  self.mock_now(now + datetime.timedelta(hours=2, minutes=6))
  self.assertTrue(task_queues.tidy_task_dimension_sets_async().get_result())
  # All gone now.
  self.assertIsNone(fetch('set0'))
  self.assertIsNone(fetch('set1'))
@staticmethod
def _create_task_dims_set(sets_id, *task_dims):
  """Stores the given task dimension flats under `sets_id` in a transaction.

  Each flat (a list of 'key:value' strings) gets an expiry of 10 hours from
  utils.utcnow().
  """
  exp_map = {}
  for dims in task_dims:
    exp_map[tuple(dims)] = utils.utcnow() + datetime.timedelta(hours=10)

  def _txn():
    return task_queues._put_task_dimensions_sets_async(sets_id, exp_map)

  ndb.transaction_async(_txn).get_result()
@staticmethod
def _delete_task_dims_set(sets_id):
  """Calls _delete_task_dimensions_sets_async(sets_id) in a transaction."""

  def _txn():
    return task_queues._delete_task_dimensions_sets_async(sets_id)

  ndb.transaction_async(_txn).get_result()
def test_assert_bot_dimensions_async(self):
  """Checks BotDimensionsMatches updates and rescan TQ task enqueuing."""
  now = datetime.datetime(2010, 1, 2, 3, 4, 5)
  self.mock_now(now)
  # Records the arguments of each mocked rescan TQ task call.
  tq_tasks = []

  @ndb.tasklet
  def mocked_rescan(bot_id, rescan_counter, rescan_reason):
    tq_tasks.append({
        u'bot_id': bot_id,
        u'rescan_counter': rescan_counter,
        u'rescan_reason': rescan_reason
    })
    raise ndb.Return(True)

  self.mock(task_queues, '_tq_rescan_matching_task_sets_async', mocked_rescan)

  # Asserts no rescan TQ task is pending.
  def assert_no_tq_tasks():
    self.assertEqual(tq_tasks, [])
    self.execute_tasks()
    self.assertEqual(tq_tasks, [])

  # Asserts exactly one rescan TQ task with the given args is pending, runs it
  # and clears the record.
  def assert_tq_task(expected):
    self.assertEqual(tq_tasks, [])
    self.execute_tasks()
    self.assertEqual(tq_tasks, [expected])
    del tq_tasks[:]

  dims = {
      'id': ['bot-id'],
      'pool': ['pool1', 'pool2'],
      'dim': ['0'],
  }
  bot_matches_key = ndb.Key(task_queues.BotDimensionsMatches, 'bot-id')
  # The first call ever, no queues are assigned yet.
  queues = task_queues._assert_bot_dimensions_async(dims).get_result()
  self.assertEqual(queues, set())
  # Created the entity.
  bot_matches = bot_matches_key.get()
  self.assertEqual(
      bot_matches.to_dict(), {
          'dimensions':
          [u'dim:0', u'id:bot-id', u'pool:pool1', u'pool:pool2'],
          'last_addition_ts': None,
          'last_cleanup_ts': now,
          'last_rescan_enqueued_ts': now,
          'last_rescan_finished_ts': None,
          'matches': [],
          'next_rescan_ts': now + datetime.timedelta(minutes=30),
          'rescan_counter': 1
      })
  # Enqueued the rescan task.
  assert_tq_task({
      u'bot_id':
      u'bot-id',
      u'rescan_counter':
      1,
      u'rescan_reason':
      u'dims added [dim:0 id:bot-id pool:pool1 pool:pool2]',
  })
  # Mock assignment of the queues. This happens in the TQ task usually.
  self._create_task_dims_set('bot:bot-id:1', ['id:bot-id'])
  self._create_task_dims_set('pool:pool1:2', ['pool:pool1'])
  self._create_task_dims_set('pool:pool1:3', ['pool:pool1', 'dim:0'])
  self._create_task_dims_set('pool:pool2:4', ['pool:pool2'])
  self._create_task_dims_set('pool:pool2:5', ['pool:pool2', 'dim:0'])
  bot_matches.matches = [
      'bot:bot-id:1',
      'pool:pool1:2',
      'pool:pool1:3',
      'pool:pool2:4',
      'pool:pool2:5',
  ]
  bot_matches.put()
  # The next call discovers them and doesn't submit any TQ tasks.
  queues = task_queues._assert_bot_dimensions_async(dims).get_result()
  self.assertEqual(
      queues, {
          'bot:bot-id:1',
          'pool:pool1:2',
          'pool:pool1:3',
          'pool:pool2:4',
          'pool:pool2:5',
      })
  assert_no_tq_tasks()
  # Some time later (past next_rescan_ts) the rescan is triggered.
  now += datetime.timedelta(minutes=31)
  self.mock_now(now)
  queues = task_queues._assert_bot_dimensions_async(dims).get_result()
  self.assertEqual(
      queues, {
          'bot:bot-id:1',
          'pool:pool1:2',
          'pool:pool1:3',
          'pool:pool2:4',
          'pool:pool2:5',
      })
  # Enqueued the rescan task.
  assert_tq_task({
      u'bot_id': u'bot-id',
      u'rescan_counter': 2,
      u'rescan_reason': u'periodic',
  })
  # Bots dimensions change. Unmatching queues are no longer reported.
  dims['dim'] = ['1']
  queues = task_queues._assert_bot_dimensions_async(dims).get_result()
  self.assertEqual(queues, {
      'bot:bot-id:1',
      'pool:pool1:2',
      'pool:pool2:4',
  })
  # Enqueued the rescan task.
  assert_tq_task({
      u'bot_id':
      u'bot-id',
      u'rescan_counter':
      3,
      u'rescan_reason':
      u'dims added [dim:1], dims removed [dim:0]',
  })
  # Changes are reflected in the entity.
  bot_matches = bot_matches_key.get()
  self.assertEqual(
      bot_matches.to_dict(),
      {
          'dimensions':
          [u'dim:1', u'id:bot-id', u'pool:pool1', u'pool:pool2'],
          'last_addition_ts': None,
          'last_cleanup_ts': now,
          'last_rescan_enqueued_ts': now,
          'last_rescan_finished_ts': None,  # we mocked this out
          'matches': [u'bot:bot-id:1', u'pool:pool1:2', u'pool:pool2:4'],
          'next_rescan_ts': now + datetime.timedelta(minutes=30),
          'rescan_counter': 3
      })
  # Some task set disappears (e.g. gets cleaned up by the cron).
  now += datetime.timedelta(minutes=5)
  self.mock_now(now)
  self._delete_task_dims_set('pool:pool2:4')
  # It is no longer assigned to the bot. This doesn't enqueue a task.
  queues = task_queues._assert_bot_dimensions_async(dims).get_result()
  self.assertEqual(queues, {
      'bot:bot-id:1',
      'pool:pool1:2',
  })
  assert_no_tq_tasks()
  # Changes are reflected in the entity; rescan-related fields are unchanged.
  prev_state = bot_matches.to_dict()
  bot_matches = bot_matches_key.get()
  self.assertEqual(
      bot_matches.to_dict(),
      {
          'dimensions':
          [u'dim:1', u'id:bot-id', u'pool:pool1', u'pool:pool2'],
          'last_addition_ts': None,
          'last_cleanup_ts': now,
          'last_rescan_enqueued_ts': prev_state['last_rescan_enqueued_ts'],
          'last_rescan_finished_ts': None,  # we mocked this out
          'matches': [u'bot:bot-id:1', u'pool:pool1:2'],
          'next_rescan_ts': prev_state['next_rescan_ts'],
          'rescan_counter': prev_state['rescan_counter'],
      })
def test_tq_rescan_matching_task_sets_async(self):
  """Rescan replaces stale/missing matches with sets matching current dims."""
  now = datetime.datetime(2010, 1, 2, 3, 4, 5)
  self.mock_now(now)
  # Prepare all active task dims sets.
  self._create_task_dims_set('bot:bot-id:1', ['id:bot-id'])
  self._create_task_dims_set('bot:bot-id:2', ['id:bot-id', 'dim:0'])
  self._create_task_dims_set('bot:bot-id:3', ['id:bot-id', 'dim:1'])
  self._create_task_dims_set('pool:pool1:4', ['pool:pool1'])
  self._create_task_dims_set('pool:pool1:5', ['pool:pool1', 'dim:0'])
  self._create_task_dims_set('pool:pool1:6', ['pool:pool1', 'dim:1'])
  self._create_task_dims_set('pool:pool2:7', ['pool:pool2'])
  self._create_task_dims_set('pool:pool2:8', ['pool:pool2', 'dim:0'])
  self._create_task_dims_set('pool:pool2:9', ['pool:pool2', 'dim:1'])
  # Put some extra garbage that must not be visited since it is outside of
  # the range of the datastore scan query.
  self._create_task_dims_set('bot:bot-id:', ['id:bot-id'])
  self._create_task_dims_set('bot:bot-id:/', ['id:bot-id'])
  self._create_task_dims_set('bot:bot-id::', ['id:bot-id'])
  self._create_task_dims_set('pool:pool1:', ['pool:pool1'])
  self._create_task_dims_set('pool:pool1:/', ['pool:pool1'])
  self._create_task_dims_set('pool:pool1::', ['pool:pool1'])
  # Prepare BotDimensionsMatches in some initial pre-scan state: it has new
  # dimensions (dim:1), but matches are still for old ones (dim:0).
  task_queues.BotDimensionsMatches(
      id='bot-id',
      dimensions=[u'dim:1', u'id:bot-id', u'pool:pool1', u'pool:pool2'],
      matches=[
          u'bot:bot-id:1',
          u'bot:bot-id:2',
          u'bot:bot-id:444',  # missing, should be unmatched
          u'pool:missing:555',  # missing, should be unmatched
          u'pool:pool1:4',
          u'pool:pool1:5',
          u'pool:pool2:7',
          u'pool:pool2:8',
      ],
      last_rescan_enqueued_ts=now,
      next_rescan_ts=now + datetime.timedelta(hours=1),
      rescan_counter=555,
  ).put()
  # Run the rescan (rescan_counter matches the stored one).
  self.assertTrue(
      task_queues._tq_rescan_matching_task_sets_async('bot-id', 555,
                                                      'reason').get_result())
  # The state was updated correctly to match on `dim:1`. Old entries were
  # kicked out.
  ent = ndb.Key(task_queues.BotDimensionsMatches, 'bot-id').get()
  self.assertEqual(
      ent.to_dict(), {
          'dimensions':
          [u'dim:1', u'id:bot-id', u'pool:pool1', u'pool:pool2'],
          'last_addition_ts':
          None,
          'last_cleanup_ts':
          now,
          'last_rescan_enqueued_ts':
          now,
          'last_rescan_finished_ts':
          now,
          'matches': [
              u'bot:bot-id:1',
              u'bot:bot-id:3',
              u'pool:pool1:4',
              u'pool:pool1:6',
              u'pool:pool2:7',
              u'pool:pool2:9',
          ],
          'next_rescan_ts':
          now + datetime.timedelta(hours=1),
          'rescan_counter':
          555,
      })
def test_tq_update_bot_matches_async_pool(self):
  """A pool task set is added only to bots whose dims satisfy some flat."""
  now = datetime.datetime(2010, 1, 2, 3, 4, 5)
  self.mock_now(now)
  task_sets_id = 'pool:p:1'
  task_sets_dims = [['pool:p', 'dim:0'], ['pool:p', 'dim:1']]
  self._create_task_dims_set(task_sets_id, *task_sets_dims)

  # Stores a BotDimensionsMatches with `dims` plus its id dimension and
  # last_addition_ts=EPOCH, so an update is detectable via last_addition_ts.
  def register_bot(bot_id, dims, matches):
    task_queues.BotDimensionsMatches(
        id=bot_id,
        dimensions=sorted(dims + ['id:' + bot_id]),
        matches=matches,
        last_addition_ts=utils.EPOCH,
    ).put()

  # Bots that should be matched.
  register_bot('match-0', ['pool:p', 'dim:0'], ['pool:p:555'])
  register_bot('match-1', ['pool:p', 'dim:1'], [])
  register_bot('match-2', ['pool:p', 'dim:0', 'dim:1'], [])
  register_bot('match-3', ['pool:p', 'dim:0', 'extra:0'], [])
  # Will be visited, but not updated (since it already has the match)
  register_bot('untouched-0', ['pool:p', 'dim:0'], [task_sets_id])
  # Bots that should be left untouched.
  register_bot('mismatch-0', ['pool:p'], ['pool:p:555'])
  register_bot('mismatch-1', ['pool:p', 'dim:nope'], ['pool:p:555'])
  register_bot('mismatch-2', ['pool:another', 'dim:0'], ['pool:p:555'])
  self.assertTrue(
      task_queues._tq_update_bot_matches_async(task_sets_id, task_sets_dims,
                                               now).get_result())

  # The bot got updated: new matches and last_addition_ts bumped to `now`.
  def assert_matched(bot_id, matches):
    ent = ndb.Key(task_queues.BotDimensionsMatches, bot_id).get()
    self.assertEqual(ent.matches, matches)
    self.assertEqual(ent.last_addition_ts, now)

  # The bot was not written: last_addition_ts is still EPOCH.
  def assert_untouched(bot_id, matches):
    ent = ndb.Key(task_queues.BotDimensionsMatches, bot_id).get()
    self.assertEqual(ent.matches, matches)
    self.assertEqual(ent.last_addition_ts, utils.EPOCH)

  assert_matched('match-0', [task_sets_id, 'pool:p:555'])
  assert_matched('match-1', [task_sets_id])
  assert_matched('match-2', [task_sets_id])
  assert_matched('match-3', [task_sets_id])
  assert_untouched('untouched-0', [task_sets_id])
  assert_untouched('mismatch-0', ['pool:p:555'])
  assert_untouched('mismatch-1', ['pool:p:555'])
  assert_untouched('mismatch-2', ['pool:p:555'])
def test_tq_update_bot_matches_async_bot(self):
  # Bot-targeted task sets are checked against the single bot they name. Each
  # matching set ID is merged into the bot's existing matches; the final list
  # is asserted in sorted order.
  now = datetime.datetime(2010, 1, 2, 3, 4, 5)
  self.mock_now(now)
  task_queues.BotDimensionsMatches(
      id='bot-id',
      dimensions=['id:bot-id', 'dim:0', 'dim:1', 'pool:p'],
      matches=['pool:p:123', 'bot:bot-id:456'],
      last_addition_ts=utils.EPOCH,
  ).put()

  # (task sets ID, list of dimension lists), applied in this order.
  task_sets = [
      # Will be matched.
      ('bot:bot-id:1', [['id:bot-id', 'dim:0']]),
      ('bot:bot-id:2', [['id:bot-id', 'dim:1']]),
      ('bot:bot-id:3', [['id:bot-id', 'dim:0', 'dim:1']]),
      ('bot:bot-id:4', [['id:bot-id', 'dim:z'], ['id:bot-id', 'dim:0']]),
      ('bot:bot-id:5', [['id:bot-id']]),
      # Will not be matched, wrong dims.
      ('bot:bot-id:6', [['id:bot-id', 'dim:3']]),
      ('bot:bot-id:7', [['id:bot-id', 'dim:0', 'extra:1']]),
  ]
  for sets_id, sets_dims in task_sets:
    self._create_task_dims_set(sets_id, *sets_dims)
    future = task_queues._tq_update_bot_matches_async(sets_id, sets_dims, now)
    self.assertTrue(future.get_result())

  stored = ndb.Key(task_queues.BotDimensionsMatches, 'bot-id').get()
  self.assertEqual(stored.matches, [
      u'bot:bot-id:1',
      u'bot:bot-id:2',
      u'bot:bot-id:3',
      u'bot:bot-id:4',
      u'bot:bot-id:456',
      u'bot:bot-id:5',
      u'pool:p:123',
  ])
def test_tq_update_bot_matches_async_skip(self):
  # The update still reports success, but does not touch the bot's matches,
  # when the stored TaskDimensionsSets does not contain the requested sets.
  now = datetime.datetime(2010, 1, 2, 3, 4, 5)
  self.mock_now(now)
  bot_key = ndb.Key(task_queues.BotDimensionsMatches, 'bot-id')
  task_queues.BotDimensionsMatches(
      id='bot-id',
      dimensions=['id:bot-id', 'dim:0', 'dim:1', 'pool:p'],
      matches=[],
      last_addition_ts=utils.EPOCH,
  ).put()

  def update(sets_id, sets_dims):
    pending = task_queues._tq_update_bot_matches_async(sets_id, sets_dims, now)
    return pending.get_result()

  # No TaskDimensionsSets entity is stored yet, so nothing happens.
  self.assertTrue(update('bot:bot-id:1', [['id:bot-id']]))
  self.assertEqual(bot_key.get().matches, [])

  # The stored entity holds none of the requested sets, so nothing happens.
  self._create_task_dims_set('bot:bot-id:1', ['id:bot-id', 'dim:0'])
  self.assertTrue(
      update('bot:bot-id:1', [['id:bot-id'], ['id:bot-id', 'dim:1']]))
  self.assertEqual(bot_key.get().matches, [])
def test_tq_update_bot_matches_async_no_matches(self):
  # The update reports success even when no stored bot matches the task set,
  # both for a bot-targeted set and for a pool set.
  now = datetime.datetime(2010, 1, 2, 3, 4, 5)
  self.mock_now(now)
  scenarios = (
      # No matching bot.
      ('bot:bot-id:1', ['id:bot-id']),
      # No matches in a pool.
      ('pool:p:1', ['pool:p']),
  )
  for sets_id, single_dims in scenarios:
    self._create_task_dims_set(sets_id, single_dims)
    result = task_queues._tq_update_bot_matches_async(
        sets_id, [list(single_dims)], now).get_result()
    self.assertTrue(result)
class TestMapAsync(test_env_handlers.AppTestBase):
  """Tests task_queues._map_async over BotDimensionsMatches queries."""

  # Page size in queries.
  PAGE = 4

  @staticmethod
  def populate(bot_dims):
    """Stores one BotDimensionsMatches entity per bot.

    Args:
      bot_dims: dict {bot ID: list of dimension strings to store}.

    Returns:
      The sorted list of stored bot IDs.
    """
    ndb.put_multi([
        task_queues.BotDimensionsMatches(id=bot_id, dimensions=dims)
        for bot_id, dims in bot_dims.items()
    ])
    return sorted(bot_dims)

  @staticmethod
  def query(dim=None):
    """Returns a (query, logger) pair to pass to _map_async.

    Args:
      dim: optional dimension string; when set, the query is filtered on
        BotDimensionsMatches.dimensions == dim.
    """
    q = task_queues.BotDimensionsMatches.query()
    if dim:
      q = q.filter(task_queues.BotDimensionsMatches.dimensions == dim)
    return q, task_queues._Logger('')

  @staticmethod
  def call(queries, cb, max_pages=None):
    """Runs _map_async to completion and returns its result.

    Uses max_concurrency=3 and pages of PAGE entities.
    """
    return task_queues._map_async(queries,
                                  cb,
                                  max_concurrency=3,
                                  page_size=TestMapAsync.PAGE,
                                  max_pages=max_pages).get_result()

  @staticmethod
  def _sync_recorder():
    """Returns (seen, cb), where plain callback cb appends visited bot IDs."""
    seen = []

    def cb(ent):
      seen.append(ent.key.string_id())

    return seen, cb

  @staticmethod
  def _async_recorder(succeeds=None):
    """Returns (seen, cb), where ndb tasklet cb appends visited bot IDs.

    Args:
      succeeds: optional predicate on the bot ID giving the callback's
        result; the callback returns True when it is None.
    """
    seen = []

    @ndb.tasklet
    def cb(ent):
      bot_id = ent.key.string_id()
      seen.append(bot_id)
      # Yield for a random short time so concurrently running callbacks
      # interleave and finish in varying order.
      yield ndb.sleep(random.uniform(0.0, 0.1))
      raise ndb.Return(True if succeeds is None else succeeds(bot_id))

    return seen, cb

  def _populate_three_groups(self):
    """Stores 10 bots each with dimension k:v1, k:v2 and k:v3.

    Returns:
      The sorted list of all stored bot IDs. Concatenating the per-group
      sorted lists keeps the whole list sorted, since the 'bot1-' < 'bot2-' <
      'bot3-' prefixes already order the groups.
    """
    bots = []
    for group in (1, 2, 3):
      bots.extend(
          self.populate({
              'bot%d-%d' % (group, i): ['k:v%d' % group] for i in range(10)
          }))
    return bots

  def test_single_query_sync_cb(self):
    seen, cb = self._sync_recorder()
    bots = self.populate({'bot%d' % i: [] for i in range(10)})
    self.assertTrue(self.call([self.query()], cb))
    self.assertEqual(sorted(seen), bots)

  def test_many_queries_sync_cb(self):
    seen, cb = self._sync_recorder()
    bots = self._populate_three_groups()
    queries = [self.query('k:v%d' % group) for group in (1, 2, 3)]
    self.assertTrue(self.call(queries, cb))
    self.assertEqual(sorted(seen), bots)

  def test_single_query_async_cb(self):
    seen, cb = self._async_recorder()
    bots = self.populate({'bot%d' % i: [] for i in range(10)})
    self.assertTrue(self.call([self.query()], cb))
    self.assertEqual(sorted(seen), bots)

  def test_many_queries_async_cb(self):
    seen, cb = self._async_recorder()
    bots = self._populate_three_groups()
    queries = [self.query('k:v%d' % group) for group in (1, 2, 3)]
    self.assertTrue(self.call(queries, cb))
    self.assertEqual(sorted(seen), bots)

  def test_query_timeout(self):
    seen, cb = self._async_recorder()
    b1 = self.populate({'b1-%d' % i: ['k:v1'] for i in range(self.PAGE * 2)})
    b2 = self.populate({'b2-%d' % i: ['k:v2'] for i in range(self.PAGE * 2)})
    b3 = self.populate({'b3-%d' % i: ['k:v3'] for i in range(self.PAGE * 3)})
    queries = [self.query('k:v1'), self.query('k:v2'), self.query('k:v3')]
    self.assertFalse(self.call(queries, cb, max_pages=2))
    # k:v1 and k:v2 fit in 2 pages, but k:v3 spans 3 pages, so its last page
    # is never visited. The b3 slice relies on the query returning entities
    # in key order, which for these IDs matches sorted().
    expected = b1 + b2 + b3[:self.PAGE * 2]
    self.assertEqual(sorted(seen), expected)

  def test_processing_error(self):
    # When one callback reports failure, the whole map reports failure, but
    # every entity is still visited.
    seen, cb = self._async_recorder(succeeds=lambda bot_id: bot_id != 'bot5')
    bots = self.populate({'bot%d' % i: [] for i in range(10)})
    self.assertFalse(self.call([self.query()], cb))
    self.assertEqual(sorted(seen), bots)
if __name__ == '__main__':
  # '-v' turns on full assertion diffs and DEBUG logging; otherwise only
  # errors are logged.
  verbose = '-v' in sys.argv
  if verbose:
    unittest.TestCase.maxDiff = None
  log_level = logging.DEBUG if verbose else logging.ERROR
  logging.basicConfig(level=log_level)
  unittest.main()