[buildbucket] Generalize bulkproc.py
Generalize task handlers in bulkproc to allow any processing code, not
necessarily tag index backfilling.
Bug: 856724
Change-Id: I71ec2d51f075c88f7ea58c6e63d5d7a2b7de0f42
Reviewed-on: https://chromium-review.googlesource.com/1115431
Commit-Queue: Nodir Turakulov <nodir@chromium.org>
Reviewed-by: Vadim Shtayura <vadimsh@chromium.org>
diff --git a/appengine/cr-buildbucket/api.py b/appengine/cr-buildbucket/api.py
index b3f71d0..6e9b192 100644
--- a/appengine/cr-buildbucket/api.py
+++ b/appengine/cr-buildbucket/api.py
@@ -18,7 +18,7 @@
import gae_ts_mon
import api_common
-import bulkproc
+import backfill_tag_index
import config
import creation
import errors
@@ -740,27 +740,13 @@
@buildbucket_api_method(
endpoints.ResourceContainer(
message_types.VoidMessage,
- tag=messages.StringField(1, required=True),
- shards=messages.IntegerField(2, required=True),
+ tag_key=messages.StringField(1, required=True),
), message_types.VoidMessage
)
@auth.require(auth.is_admin)
def backfill_tag_index(self, request):
"""Backfills TagIndex entites from builds."""
- if request.shards <= 0:
- raise endpoints.BadRequestException('shards must be positive')
- enqueue_task(
- 'backfill-tag-index',
- bulkproc.PATH_PREFIX + 'start',
- utils.encode_to_json({
- 'tag': request.tag,
- 'shards': request.shards,
- }),
- )
+ if ':' in request.tag_key:
+ raise endpoints.BadRequestException('invalid tag_key')
+ backfill_tag_index.launch(request.tag_key)
return message_types.VoidMessage()
-
-
-# mocked in tests.
-def enqueue_task(queue_name, url, payload): # pragma: no cover
- task = taskqueue.Task(url=url, payload=payload)
- return task.add(queue_name=queue_name)
diff --git a/appengine/cr-buildbucket/backfill_tag_index.py b/appengine/cr-buildbucket/backfill_tag_index.py
new file mode 100644
index 0000000..d9e6e6b
--- /dev/null
+++ b/appengine/cr-buildbucket/backfill_tag_index.py
@@ -0,0 +1,137 @@
+# Copyright 2018 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Tag index backfilling."""
+
+import collections
+import json
+import logging
+
+from google.appengine.ext import deferred
+from google.appengine.ext import ndb
+import webapp2
+
+import buildtags
+import bulkproc
+import search
+
+# Maximum number of entries to collect in a single iteration. This helps
+# avoiding hitting the limit of task size and caps the number of transactions we
+# need to do in a flush task.
+ENTRY_LIMIT = 1000
+
+PROC_NAME = 'backfill_tag_index'
+
+bulkproc.register(
+ PROC_NAME,
+ lambda builds, payload: _process_builds(
+ builds, payload['tag_key'], ENTRY_LIMIT)
+)
+
+
+def launch(tag_key): # pragma: no cover
+ assert tag_key
+ assert ':' not in tag_key
+ assert isinstance(tag_key, basestring)
+ bulkproc.start(PROC_NAME, {'tag_key': tag_key})
+
+
+def _process_builds(builds, tag_key, entry_limit):
+ entry_count = 0
+ new_entries = collections.defaultdict(list)
+
+ for b in builds: # pragma: no branch
+ for t in b.tags:
+ k, v = buildtags.parse(t)
+ if k == tag_key:
+ new_entries[v].append([b.bucket, b.key.id()])
+ entry_count += 1
+ if entry_count >= entry_limit:
+ break
+ if entry_count >= entry_limit:
+ break
+
+ logging.info('collected %d entries', entry_count)
+ _enqueue_flush_entries(tag_key, new_entries)
+
+
+def _enqueue_flush_entries(tag_key, new_entries): # pragma: no cover
+ if new_entries:
+ deferred.defer(
+ _flush_entries,
+ tag_key,
+ new_entries,
+ _queue=bulkproc.QUEUE_NAME,
+ )
+
+
+def _flush_entries(tag_key, new_entries):
+ """Adds new entries to TagIndex entities.
+
+ new_entries is {tag_value: [[bucket, build_id]]}.
+ """
+ logging.info(
+ 'flushing %d tag entries in %d TagIndex entities for tag key %s',
+ sum(len(es) for es in new_entries.itervalues()),
+ len(new_entries),
+ tag_key,
+ )
+
+ new_entry_items = new_entries.items()
+ futs = [
+ _add_index_entries_async(buildtags.unparse(tag_key, tag_value), entries)
+ for tag_value, entries in new_entry_items
+ ]
+ retry_entries = {}
+ updated = 0
+ for (tag_value, entries), f in zip(new_entry_items, futs):
+ ex = f.get_exception()
+ if ex:
+ logging.warning('failed to update TagIndex for "%s" %s', tag_value, ex)
+ retry_entries[tag_value] = entries
+ elif f.get_result():
+ updated += 1
+ logging.info('updated %d TagIndex entities', updated)
+ if retry_entries:
+ logging.warning(
+ 'failed to update %d TagIndex entities, retrying...',
+ len(retry_entries),
+ )
+ _enqueue_flush_entries(tag_key, retry_entries)
+
+
+@ndb.transactional_tasklet
+def _add_index_entries_async(tag, entries):
+ """Adds TagIndexEntries to one TagIndex.
+
+ entries is [[bucket, build_id]].
+
+ Returns True if made changes.
+ """
+ idx_key = search.TagIndex.random_shard_key(tag)
+ idx = (yield idx_key.get_async()) or search.TagIndex(key=idx_key)
+ if idx.permanently_incomplete:
+ # no point in adding entries to an incomplete index.
+ raise ndb.Return(False)
+
+ existing = {e.build_id for e in idx.entries}
+ added = False
+ for bucket, build_id in entries:
+ if build_id not in existing:
+ if len(idx.entries) >= search.TagIndex.MAX_ENTRY_COUNT:
+ logging.warning((
+ 'refusing to store more than %d entries in TagIndex(%s); '
+ 'marking as incomplete.'
+ ), search.TagIndex.MAX_ENTRY_COUNT, idx_key.id())
+ idx.permanently_incomplete = True
+ idx.entries = []
+ yield idx.put_async()
+ raise ndb.Return(True)
+
+ idx.entries.append(search.TagIndexEntry(bucket=bucket, build_id=build_id))
+ added = True
+ if not added:
+ raise ndb.Return(False)
+ yield idx.put_async()
+ raise ndb.Return(True)
diff --git a/appengine/cr-buildbucket/bulkproc.py b/appengine/cr-buildbucket/bulkproc.py
index e1134ec..8b2a16b 100644
--- a/appengine/cr-buildbucket/bulkproc.py
+++ b/appengine/cr-buildbucket/bulkproc.py
@@ -2,9 +2,8 @@
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
-"""Bulk processing of Build entities."""
+"""Build bulk processing of builds, a miniature map-reduce."""
-import collections
import datetime
import json
import logging
@@ -15,16 +14,64 @@
from google.appengine.ext import ndb
import webapp2
+from components import decorators
from components import utils
import webapp2
-from v2 import api as v2_api
-import config
import model
-import search
-PATH_PREFIX = '/internal/task/backfill-tag-index/'
+QUEUE_NAME = 'bulkproc'
+PATH_PREFIX = '/internal/task/bulkproc/'
+SHARD_COUNT = 64
+
+# See register().
+PROCESSOR_REGISTRY = {}
+
+
+def register(name, processor, keys_only=False):
+ """Registers a processor.
+
+ Args:
+ name: identifies the processor.
+ processor: functiton (builds, payload),
+ where builds is an iterable of builds and payload is the payload specified
+ in start(). Builds not read from the iterable will be rescheduled for
+ processing in a separate job.
+ processor is eventually executed on all builds in the datastore.
+ keys_only: whether the builds passed to processor are only a ndb key, not
+ Build entity.
+ """
+
+ assert name not in PROCESSOR_REGISTRY
+ PROCESSOR_REGISTRY[name] = {
+ 'func': processor,
+ 'keys_only': keys_only,
+ }
+
+
+def start(name, payload): # pragma: no cover
+ """Starts a processor by name. See register docstring for params.
+
+ It should be called by a module that calls register().
+ """
+ assert name in PROCESSOR_REGISTRY
+ task = (
+ None,
+ PATH_PREFIX + 'start',
+ utils.encode_to_json({
+ 'shards': SHARD_COUNT,
+ 'proc': {
+ 'name': name,
+ 'payload': payload,
+ },
+ }),
+ )
+ enqueue_tasks(QUEUE_NAME, [task])
+
+
+def _get_proc(name): # pragma: no cover
+ return PROCESSOR_REGISTRY[name]
class TaskBase(webapp2.RequestHandler):
@@ -62,17 +109,15 @@
Assumes that build creation rate was about the same forever.
Payload properties:
- tag: tag to reindex. Required.
shards: number of workers to create. Must be positive. Required.
+ proc: processor to run, see TaskSegment docstring.
"""
def do(self, payload):
- tag = payload['tag']
shards = payload['shards']
- assert isinstance(tag, basestring), tag
- assert tag
assert isinstance(shards, int)
assert shards > 0
+ proc = payload['proc']
first, = model.Build.query().fetch(1, keys_only=True) or [None]
if not first: # pragma: no cover
@@ -102,28 +147,26 @@
None,
'segment/seg:{seg_index}-percent:0',
{
- 'tag': tag,
'job_id': self.request.headers['X-AppEngine-TaskName'],
'iteration': 0,
'seg_index': len(tasks),
'seg_start': seg_start,
'seg_end': seg_end,
'started_ts': utils.datetime_to_timestamp(utils.utcnow()),
+ 'proc': proc,
},
))
self._recurse(tasks)
- logging.info('enqueued %d segment tasks for tag %s', len(tasks), tag)
+ logging.info('enqueued %d segment tasks with proc %r', len(tasks), proc)
class TaskSegment(TaskBase):
"""Processes a chunk of builds in a segment.
- When finished, enqueues a flush task to persist new tag index entries.
- If there are more builds in the segment to process, enqueues itself with a
+ If didn't finish processing entire segment, enqueues itself with a
new query cursor.
Payload properties:
- tag: tag to reindex. Required.
job_id: id of this backfill job. Required.
iteration: segment task iteration. Required.
seg_index: index of this shard. Required.
@@ -131,12 +174,11 @@
seg_end: id of the first build in the next segment. Required.
start_from: start from this build towards seg_end. Defaults to seg_start.
started_ts: timestamp when we started to process this segment.
+ proc: processor to run on the segment. A JSON object with two properties:
+ name: name of the processor, see register()
+ payload: processor payload, see register() and start().
"""
- # Maximum number of entries to collect in a single iteration.
- # This helps avoiding hitting the limit of task size and caps the number of
- # transactions we need to do in a flush task.
- ENTRY_LIMIT = 500
# Maximum number of builds to process in a single iteration.
# Value 3000 is derived from experimentation on the dev server.
# It prevents "Exceeded soft private memory limit" error.
@@ -144,10 +186,11 @@
def do(self, payload):
attempt = int(self.request.headers.get('X-AppEngine-TaskExecutionCount', 0))
-
seg_start = payload['seg_start']
seg_end = payload['seg_end']
start_from = payload.get('start_from', seg_start)
+ proc = payload['proc']
+ proc_def = _get_proc(proc['name'])
logging.info('range %d-%d', seg_start, seg_end)
logging.info('starting from %s', start_from)
@@ -159,43 +202,30 @@
model.Build.key >= ndb.Key(model.Build, start_from),
model.Build.key < ndb.Key(model.Build, seg_end)
)
- iterator = q.iter()
+ iterator = q.iter(keys_only=proc_def['keys_only'])
- entry_count = 0
- build_count = 0
- new_entries = collections.defaultdict(list)
+ build_count = [0]
- # Datastore query timeout is 60 sec. Limit it to 50 sec.
- deadline = utils.utcnow() + datetime.timedelta(seconds=50)
- while (utils.utcnow() < deadline and entry_count < self.ENTRY_LIMIT and
- build_count < self.BUILD_LIMIT and iterator.has_next()):
- b = iterator.next()
- build_count += 1
- for t in b.tags:
- k, v = t.split(':', 1)
- if k == payload['tag']:
- new_entries[v].append([b.bucket, b.key.id()])
- entry_count += 1
- logging.info(
- 'collected %d entries from %d builds', entry_count, build_count
- )
+ def iterate_segment():
+ # Datastore query timeout is 60 sec. Limit it to 50 sec.
+ deadline = utils.utcnow() + datetime.timedelta(seconds=50)
+ while (utils.utcnow() < deadline and build_count[0] < self.BUILD_LIMIT and
+ iterator.has_next()):
+ b = iterator.next()
+ yield b
+ build_count[0] += 1
- if new_entries: # pragma: no branch
- logging.info(
- 'enqueuing a task to flush %d tag entries in %d TagIndex entities...',
- entry_count, len(new_entries)
- )
- flush_payload = {
- 'tag': payload['tag'],
- 'new_entries': new_entries,
- }
- self._recurse([(None, 'flush', flush_payload)])
+ proc_def['func'](iterate_segment(), proc['payload'])
+ logging.info('processed %d builds', build_count[0])
+
if iterator.has_next():
logging.info('enqueuing a task for the next iteration...')
p = payload.copy()
p['iteration'] += 1
- p['start_from'] = iterator.next().key.id()
+ p['start_from'] = (
+ iterator.next() if proc_def['keys_only'] else iterator.next().key.id()
+ )
seg_len = seg_end - seg_start
percent = 100 * (p['start_from'] - seg_start) / seg_len
@@ -218,90 +248,17 @@
)
-class TaskFlushTagIndexEntries(TaskBase):
- """Saves new tag index entries.
-
- Payload properties:
- tag: tag to reindex. Required.
- new_entries: a dict {tag_value: [[bucket, id}]]} of new index entries to
- add. Required.
- """
-
- def do(self, payload):
- new_entries = payload['new_entries']
- logging.info(
- 'flushing %d tag entries in %d TagIndex entities',
- sum(len(es) for es in new_entries.itervalues()), len(new_entries)
- )
-
- futs = [
- self._add_index_entries_async(
- payload['tag'] + ':' + tag_value, entries
- ) for tag_value, entries in new_entries.iteritems()
- ]
- ndb.Future.wait_all(futs)
-
- retry_entries = {}
- updated = 0
- for (tag, entries), f in zip(new_entries.iteritems(), futs):
- ex = f.get_exception()
- if ex:
- logging.warning('failed to update TagIndex for %r: %s', tag, ex)
- retry_entries[tag] = entries
- elif f.get_result():
- updated += 1
- logging.info('updated %d TagIndex entities', updated)
- if retry_entries:
- logging.warning(
- 'failed to update %d TagIndex entities, retrying...',
- len(retry_entries)
- )
- p = payload.copy()
- p['new_entries'] = retry_entries
- self._recurse([(None, 'flush', p)])
-
- @staticmethod
- @ndb.transactional_tasklet
- def _add_index_entries_async(tag, entries):
- idx_key = search.TagIndex.random_shard_key(tag)
- idx = (yield idx_key.get_async()) or search.TagIndex(key=idx_key)
- if idx.permanently_incomplete:
- # no point in adding entries to an incomplete index.
- raise ndb.Return(False)
- existing = {e.build_id for e in idx.entries}
- added = False
- for bucket, build_id in entries:
- if build_id not in existing:
- if len(idx.entries) >= search.TagIndex.MAX_ENTRY_COUNT:
- logging.warning((
- 'refusing to store more than %d entries in TagIndex(%s); '
- 'marking as incomplete.'
- ), search.TagIndex.MAX_ENTRY_COUNT, idx_key.id())
- idx.permanently_incomplete = True
- idx.entries = []
- yield idx.put_async()
- raise ndb.Return(True)
-
- idx.entries.append(
- search.TagIndexEntry(bucket=bucket, build_id=build_id)
- )
- added = True
- if not added:
- raise ndb.Return(False)
- yield idx.put_async()
- raise ndb.Return(True)
-
-
# mocked in tests.
def enqueue_tasks(queue_name, tasks): # pragma: no cover
"""Adds tasks to the queue.
tasks must be a list of tuples (name, url, payload).
"""
- taskqueue.Queue(queue_name).add([
- taskqueue.Task(name=name, url=url, payload=payload)
- for name, url, payload in tasks
- ])
+ if tasks:
+ taskqueue.Queue(queue_name).add([
+ taskqueue.Task(name=name, url=url, payload=payload)
+ for name, url, payload in tasks
+ ])
def get_routes(): # pragma: no cover
@@ -309,5 +266,4 @@
return [
webapp2.Route(PATH_PREFIX + r'start', TaskStart),
webapp2.Route(PATH_PREFIX + r'segment/<rest>', TaskSegment),
- webapp2.Route(PATH_PREFIX + r'flush', TaskFlushTagIndexEntries),
]
diff --git a/appengine/cr-buildbucket/queue.yaml b/appengine/cr-buildbucket/queue.yaml
index b797273..4868f23 100644
--- a/appengine/cr-buildbucket/queue.yaml
+++ b/appengine/cr-buildbucket/queue.yaml
@@ -3,7 +3,7 @@
- name: backend-default
target: backend
rate: 100/s
-- name: backfill-tag-index
+- name: bulkproc
target: backend
rate: 100/s
max_concurrent_requests: 128
diff --git a/appengine/cr-buildbucket/test/api_test.py b/appengine/cr-buildbucket/test/api_test.py
index d0eb97e..f0f44a1 100644
--- a/appengine/cr-buildbucket/test/api_test.py
+++ b/appengine/cr-buildbucket/test/api_test.py
@@ -23,6 +23,7 @@
from test import config_test
from test.test_util import future, future_exception
import api
+import backfill_tag_index
import config
import creation
import errors
@@ -763,27 +764,14 @@
####### BACKFILL_TAG_INDEX ###################################################
- @mock.patch('api.enqueue_task')
- def test_backfill_tag_index(self, enqueue_task):
+ @mock.patch('backfill_tag_index.launch')
+ def test_backfill_tag_index(self, launch_tag_index_backfilling):
auth.bootstrap_group(auth.ADMIN_GROUP, [auth.Anonymous])
- req = {
- 'tag': 'buildset',
- 'shards': '64',
- }
+ req = {'tag_key': 'buildset'}
self.call_api('backfill_tag_index', req, status=(200, 204))
- enqueue_task.assert_called_once_with(
- 'backfill-tag-index',
- '/internal/task/backfill-tag-index/start',
- utils.encode_to_json({
- 'tag': 'buildset',
- 'shards': 64,
- }),
- )
+ launch_tag_index_backfilling.assert_called_once_with('buildset')
def test_backfill_tag_index_fails(self):
auth.bootstrap_group(auth.ADMIN_GROUP, [auth.Anonymous])
self.call_api('backfill_tag_index', {}, status=400)
- self.call_api('backfill_tag_index', {'tag': 'buildset'}, status=400)
- self.call_api(
- 'backfill_tag_index', {'tag': 'buildset', 'shards': 0}, status=400
- )
+ self.call_api('backfill_tag_index', {'tag_key': 'a:b'}, status=400)
diff --git a/appengine/cr-buildbucket/test/backfill_tag_index_test.py b/appengine/cr-buildbucket/test/backfill_tag_index_test.py
new file mode 100644
index 0000000..c6eff03
--- /dev/null
+++ b/appengine/cr-buildbucket/test/backfill_tag_index_test.py
@@ -0,0 +1,143 @@
+# Copyright 2018 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import contextlib
+import datetime
+import mock
+
+from google.appengine.ext import ndb
+
+from test.test_util import future_exception
+from testing_utils import testing
+import backfill_tag_index
+import bulkproc
+import main
+import model
+import search
+import v2
+
+
+class BackfillTagIndexTest(testing.AppengineTestCase):
+
+ def setUp(self):
+ super(BackfillTagIndexTest, self).setUp()
+ self.patch('search.TagIndex.random_shard_index', return_value=0)
+ self.patch('backfill_tag_index._enqueue_flush_entries')
+
+ def test_process(self):
+ builds = [
+ model.Build(
+ id=i, bucket='chromium', tags=[
+ 'buildset:%d' % (i % 3),
+ 'a:b',
+ ]
+ ) for i in xrange(50, 60)
+ ]
+
+ backfill_tag_index._process_builds(builds, 'buildset', 5)
+
+ backfill_tag_index._enqueue_flush_entries.assert_called_with(
+ 'buildset', {
+ '0': [
+ ['chromium', 51],
+ ['chromium', 54],
+ ],
+ '1': [['chromium', 52]],
+ '2': [
+ ['chromium', 50],
+ ['chromium', 53],
+ ],
+ }
+ )
+
+ def test_flush_entries(self):
+ search.TagIndex(
+ id='buildset:0',
+ entries=[
+ search.TagIndexEntry(bucket='chormium', build_id=51),
+ ]
+ ).put()
+ search.TagIndex(
+ id='buildset:2',
+ entries=[
+ search.TagIndexEntry(bucket='chormium', build_id=1),
+ search.TagIndexEntry(bucket='chormium', build_id=100),
+ ]
+ ).put()
+
+ backfill_tag_index._flush_entries(
+ 'buildset',
+ {
+ '0': [['chromium', 51]],
+ '1': [['chromium', 52]],
+ '2': [['chromium', 50]],
+ },
+ )
+
+ idx0 = search.TagIndex.get_by_id('buildset:0')
+ self.assertIsNotNone(idx0)
+ self.assertEqual(len(idx0.entries), 1)
+ self.assertEqual(idx0.entries[0].build_id, 51)
+
+ idx1 = search.TagIndex.get_by_id('buildset:1')
+ self.assertIsNotNone(idx1)
+ self.assertEqual(len(idx1.entries), 1)
+ self.assertEqual(idx1.entries[0].build_id, 52)
+
+ idx2 = search.TagIndex.get_by_id('buildset:2')
+ self.assertIsNotNone(idx2)
+ self.assertEqual(len(idx2.entries), 3)
+ self.assertEqual({e.build_id for e in idx2.entries}, {1, 50, 100})
+
+ def test_flush_entries_retry(self):
+ orig_add_async = backfill_tag_index._add_index_entries_async
+
+ def add_async(tag, entries):
+ if tag == 'buildset:1':
+ return future_exception(Exception('transient error'))
+ return orig_add_async(tag, entries)
+
+ with mock.patch(
+ 'backfill_tag_index._add_index_entries_async',
+ side_effect=add_async,
+ ):
+ backfill_tag_index._flush_entries(
+ 'buildset',
+ {
+ '0': [['chromium', 51]],
+ '1': [['chromium', 52]],
+ '2': [['chromium', 50]],
+ },
+ )
+
+ idx0 = search.TagIndex.get_by_id('buildset:0')
+ self.assertIsNotNone(idx0)
+ self.assertEqual(len(idx0.entries), 1)
+ self.assertEqual(idx0.entries[0].build_id, 51)
+
+ idx2 = search.TagIndex.get_by_id('buildset:2')
+ self.assertIsNotNone(idx2)
+ self.assertEqual(len(idx2.entries), 1)
+ self.assertEqual(idx2.entries[0].build_id, 50)
+
+ backfill_tag_index._enqueue_flush_entries.assert_called_with(
+ 'buildset', {'1': [['chromium', 52]]}
+ )
+
+ def test_flush_entries_too_many(self):
+ backfill_tag_index._flush_entries(
+ 'buildset',
+ {'0': [['chromium', i] for i in xrange(1, 2001)]},
+ )
+
+ idx0 = search.TagIndex.get_by_id('buildset:0')
+ self.assertIsNotNone(idx0)
+ self.assertTrue(idx0.permanently_incomplete)
+ self.assertEqual(len(idx0.entries), 0)
+
+ # once more for coverage
+ backfill_tag_index._flush_entries(
+ 'buildset',
+ {'0': [['chromium', 1]]},
+ )
diff --git a/appengine/cr-buildbucket/test/bulkproc_test.py b/appengine/cr-buildbucket/test/bulkproc_test.py
index 3e6c849..a5d9618 100644
--- a/appengine/cr-buildbucket/test/bulkproc_test.py
+++ b/appengine/cr-buildbucket/test/bulkproc_test.py
@@ -4,6 +4,7 @@
import contextlib
import datetime
+import itertools
import mock
from google.appengine.ext import ndb
@@ -30,12 +31,11 @@
super(TestBase, self).setUp()
self.now = datetime.datetime(2017, 1, 1)
self.patch('components.utils.utcnow', side_effect=lambda: self.now)
- self.patch('search.TagIndex.random_shard_index', return_value=0)
def post(self, payload, headers=None):
assert self.path_suffix
headers = headers or {}
- headers['X-AppEngine-QueueName'] = 'backfill-tag-index'
+ headers['X-AppEngine-QueueName'] = bulkproc.QUEUE_NAME
headers['X-AppEngine-TaskName'] = 'taskname'
task_url = bulkproc.PATH_PREFIX + self.path_suffix
return self.test_app.post(
@@ -46,7 +46,7 @@
class StartTest(TestBase):
path_suffix = 'start'
- @mock.patch('bulkproc.enqueue_tasks')
+ @mock.patch('bulkproc.enqueue_tasks', autospec=True)
def test_start(self, enqueue_tasks):
ndb.put_multi([
model.Build(
@@ -55,67 +55,71 @@
create_time=self.now - datetime.timedelta(minutes=i)
) for i in xrange(1, 11)
])
- self.post({'tag': 'buildset', 'shards': 3})
+ proc = {'name': 'foo', 'payload': 'bar'}
+ self.post({
+ 'shards': 3,
+ 'proc': proc,
+ })
seg_path_prefix = bulkproc.PATH_PREFIX + 'segment/'
enqueue_tasks.assert_called_with(
- 'backfill-tag-index', [
+ 'bulkproc', [
(
None,
seg_path_prefix + 'seg:0-percent:0',
utils.encode_to_json({
- 'tag': 'buildset',
'job_id': 'taskname',
'iteration': 0,
'seg_index': 0,
'seg_start': 1,
'seg_end': 4,
'started_ts': utils.datetime_to_timestamp(self.now),
+ 'proc': proc,
}),
),
(
None,
seg_path_prefix + 'seg:1-percent:0',
utils.encode_to_json({
- 'tag': 'buildset',
'job_id': 'taskname',
'iteration': 0,
'seg_index': 1,
'seg_start': 4,
'seg_end': 7,
'started_ts': utils.datetime_to_timestamp(self.now),
+ 'proc': proc,
}),
),
(
None,
seg_path_prefix + 'seg:2-percent:0',
utils.encode_to_json({
- 'tag': 'buildset',
'job_id': 'taskname',
'iteration': 0,
'seg_index': 2,
'seg_start': 7,
'seg_end': 10,
'started_ts': utils.datetime_to_timestamp(self.now),
+ 'proc': proc,
}),
),
(
None,
seg_path_prefix + 'seg:3-percent:0',
utils.encode_to_json({
- 'tag': 'buildset',
'job_id': 'taskname',
'iteration': 0,
'seg_index': 3,
'seg_start': 10,
'seg_end': 11,
'started_ts': utils.datetime_to_timestamp(self.now),
+ 'proc': proc,
}),
),
]
)
- @mock.patch('bulkproc.enqueue_tasks')
+ @mock.patch('bulkproc.enqueue_tasks', autospec=True)
def test_start_many_shards(self, enqueue_tasks):
ndb.put_multi([
model.Build(
@@ -125,8 +129,8 @@
) for i in xrange(1, 150)
])
self.post({
- 'tag': 'buildset',
'shards': 100,
+ 'proc': {'name': 'foo', 'payload': 'bar'},
})
self.assertEqual(enqueue_tasks.call_count, 2)
@@ -135,16 +139,15 @@
class SegmentTest(TestBase):
path_suffix = 'segment/rest'
- @contextlib.contextmanager
- def entry_limit(self, limit):
- orig_entry_limit = bulkproc.TaskSegment.ENTRY_LIMIT
- bulkproc.TaskSegment.ENTRY_LIMIT = limit
- try:
- yield
- finally:
- bulkproc.TaskSegment.ENTRY_LIMIT = orig_entry_limit
+ def setUp(self):
+ super(SegmentTest, self).setUp()
+ self.proc = {
+ 'func': lambda builds, _: list(builds), # process all
+ 'keys_only': False,
+ }
+ self.patch('bulkproc._get_proc', return_value=self.proc)
- @mock.patch('bulkproc.enqueue_tasks')
+ @mock.patch('bulkproc.enqueue_tasks', autospec=True)
def test_segment_partial(self, enqueue_tasks):
ndb.put_multi([
model.Build(
@@ -155,40 +158,25 @@
) for i in xrange(50, 60)
])
- with self.entry_limit(5):
- self.post({
- 'tag': 'buildset',
- 'job_id': 'jobid',
- 'iteration': 0,
- 'seg_index': 0,
- 'seg_start': 50,
- 'seg_end': 60,
- 'started_ts': utils.datetime_to_timestamp(self.now),
- })
+ def process(builds, payload):
+ # process 5 builds
+ page = list(itertools.islice(builds, 5))
+ self.assertEqual([b.key.id() for b in page], range(50, 55))
+ self.assertEqual(payload, 'bar')
- enqueue_tasks.assert_any_call(
- 'backfill-tag-index', [(
- None,
- bulkproc.PATH_PREFIX + 'flush',
- utils.encode_to_json({
- 'tag': 'buildset',
- 'new_entries': {
- '0': [
- ['chromium', 51],
- ['chromium', 54],
- ],
- '1': [['chromium', 52]],
- '2': [
- ['chromium', 50],
- ['chromium', 53],
- ],
- },
- }),
- )]
- )
+ self.proc['func'] = process
+
+ self.post({
+ 'job_id': 'jobid',
+ 'iteration': 0,
+ 'seg_index': 0,
+ 'seg_start': 50,
+ 'seg_end': 60,
+ 'started_ts': utils.datetime_to_timestamp(self.now),
+ 'proc': {'name': 'foo', 'payload': 'bar'},
+ })
expected_next_payload = {
- 'tag': 'buildset',
'job_id': 'jobid',
'iteration': 1,
'seg_index': 0,
@@ -196,10 +184,10 @@
'seg_end': 60,
'start_from': 55,
'started_ts': utils.datetime_to_timestamp(self.now),
+ 'proc': {'name': 'foo', 'payload': 'bar'},
}
- print enqueue_tasks.call_args_list
- enqueue_tasks.assert_any_call(
- 'backfill-tag-index',
+ enqueue_tasks.assert_called_with(
+ 'bulkproc',
[(
'jobid-0-1',
bulkproc.PATH_PREFIX + 'segment/seg:0-percent:50',
@@ -207,177 +195,63 @@
)],
)
- self.post(expected_next_payload)
-
- @mock.patch('bulkproc.enqueue_tasks')
+ @mock.patch('bulkproc.enqueue_tasks', autospec=True)
def test_segment_full(self, enqueue_tasks):
ndb.put_multi([
model.Build(id=i, bucket='chromium', tags=['buildset:%d' % (i % 3)])
for i in xrange(50, 52)
])
self.post({
- 'tag': 'buildset',
+ 'job_id': 'jobid',
+ 'iteration': 0,
'seg_index': 0,
'seg_start': 50,
'seg_end': 60,
'started_ts': utils.datetime_to_timestamp(self.now),
+ 'proc': {'name': 'foo', 'payload': 'bar'},
})
- self.assertEqual(enqueue_tasks.call_count, 1)
- enqueue_tasks.assert_called_with(
- 'backfill-tag-index',
- [(
- None,
- bulkproc.PATH_PREFIX + 'flush',
- utils.encode_to_json({
- 'tag': 'buildset',
- 'new_entries': {
- '0': [['chromium', 51]],
- '2': [['chromium', 50]],
- },
- }),
- )],
- )
+ self.assertEqual(enqueue_tasks.call_count, 0)
- @mock.patch('bulkproc.enqueue_tasks')
+ @mock.patch('bulkproc.enqueue_tasks', autospec=True)
def test_segment_attempt_2(self, enqueue_tasks):
ndb.put_multi([
model.Build(id=i, bucket='chromium', tags=['buildset:%d' % (i % 3)])
for i in xrange(50, 60)
])
- with self.entry_limit(1):
- headers = {
- 'X-AppEngine-TaskExecutionCount': '1',
- }
- self.post(
- {
- 'tag': 'buildset',
- 'job_id': 'jobid',
- 'iteration': 0,
- 'seg_index': 0,
- 'seg_start': 50,
- 'seg_end': 60,
- 'started_ts': utils.datetime_to_timestamp(self.now),
- },
- headers=headers,
- )
+ # process 5 builds
+ self.proc['func'] = lambda builds, _: list(itertools.islice(builds, 5))
- enqueue_tasks.assert_any_call(
- 'backfill-tag-index', [(
+ self.post(
+ {
+ 'job_id': 'jobid',
+ 'iteration': 0,
+ 'seg_index': 0,
+ 'seg_start': 50,
+ 'seg_end': 60,
+ 'started_ts': utils.datetime_to_timestamp(self.now),
+ 'proc': {'name': 'foo', 'payload': 'bar'},
+ },
+ headers={
+ 'X-AppEngine-TaskExecutionCount': '1',
+ },
+ )
+
+ enqueue_tasks.assert_called_with(
+ 'bulkproc',
+ [(
'jobid-0-1',
- bulkproc.PATH_PREFIX + 'segment/seg:0-percent:10',
+ bulkproc.PATH_PREFIX + 'segment/seg:0-percent:50',
utils.encode_to_json({
- 'tag': 'buildset',
'job_id': 'jobid',
'iteration': 1,
'seg_index': 0,
'seg_start': 50,
'seg_end': 60,
- 'start_from': 51,
+ 'start_from': 55,
'started_ts': utils.datetime_to_timestamp(self.now),
+ 'proc': {'name': 'foo', 'payload': 'bar'},
}),
- )]
+ )],
)
-
-
-class FlushTest(TestBase):
- path_suffix = 'flush'
-
- def test_flush(self):
- search.TagIndex(
- id='buildset:0',
- entries=[
- search.TagIndexEntry(bucket='chormium', build_id=51),
- ]
- ).put()
- search.TagIndex(
- id='buildset:2',
- entries=[
- search.TagIndexEntry(bucket='chormium', build_id=1),
- search.TagIndexEntry(bucket='chormium', build_id=100),
- ]
- ).put()
- self.post({
- 'tag': 'buildset',
- 'new_entries': {
- '0': [['chromium', 51]],
- '1': [['chromium', 52]],
- '2': [['chromium', 50]],
- },
- })
-
- idx0 = search.TagIndex.get_by_id('buildset:0')
- self.assertIsNotNone(idx0)
- self.assertEqual(len(idx0.entries), 1)
- self.assertEqual(idx0.entries[0].build_id, 51)
-
- idx1 = search.TagIndex.get_by_id('buildset:1')
- self.assertIsNotNone(idx1)
- self.assertEqual(len(idx1.entries), 1)
- self.assertEqual(idx1.entries[0].build_id, 52)
-
- idx2 = search.TagIndex.get_by_id('buildset:2')
- self.assertIsNotNone(idx2)
- self.assertEqual(len(idx2.entries), 3)
- self.assertEqual({e.build_id for e in idx2.entries}, {1, 50, 100})
-
- @mock.patch('bulkproc.enqueue_tasks')
- def test_flush_retry(self, enqueue_tasks):
- orig_add = bulkproc.TaskFlushTagIndexEntries._add_index_entries_async
-
- def add(tag, entries):
- if tag == 'buildset:1':
- return future_exception(Exception('transient error'))
- return orig_add(tag, entries)
-
- with mock.patch(
- 'bulkproc.TaskFlushTagIndexEntries._add_index_entries_async',
- side_effect=add,
- ):
- self.post({
- 'tag': 'buildset',
- 'new_entries': {
- '0': [['chromium', 51]],
- '1': [['chromium', 52]],
- '2': [['chromium', 50]],
- },
- })
-
- idx0 = search.TagIndex.get_by_id('buildset:0')
- self.assertIsNotNone(idx0)
- self.assertEqual(len(idx0.entries), 1)
- self.assertEqual(idx0.entries[0].build_id, 51)
-
- idx2 = search.TagIndex.get_by_id('buildset:2')
- self.assertIsNotNone(idx2)
- self.assertEqual(len(idx2.entries), 1)
- self.assertEqual(idx2.entries[0].build_id, 50)
-
- enqueue_tasks.assert_called_with(
- 'backfill-tag-index', [(
- None,
- bulkproc.PATH_PREFIX + 'flush',
- utils.encode_to_json({
- 'tag': 'buildset',
- 'new_entries': {'1': [['chromium', 52]]},
- }),
- )]
- )
-
- def test_flush_too_many(self):
- self.post({
- 'tag': 'buildset',
- 'new_entries': {'0': [['chromium', i] for i in xrange(1, 2001)]},
- })
-
- idx0 = search.TagIndex.get_by_id('buildset:0')
- self.assertIsNotNone(idx0)
- self.assertTrue(idx0.permanently_incomplete)
- self.assertEqual(len(idx0.entries), 0)
-
- # Again, for code coverage.
- self.post({
- 'tag': 'buildset',
- 'new_entries': {'0': [['chromium', 1]]},
- })