[buildbucket] Generalize bulkproc.py

Generalize the task handlers in bulkproc to run arbitrary registered
processors over all builds, not just tag index backfilling.
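
The new contract, as a rough sketch:

    bulkproc.register(name, processor)  # at import time
    bulkproc.start(name, payload)       # fans out over all builds

backfill_tag_index.py becomes the first processor built on top of it.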

Bug: 856724
Change-Id: I71ec2d51f075c88f7ea58c6e63d5d7a2b7de0f42
Reviewed-on: https://chromium-review.googlesource.com/1115431
Commit-Queue: Nodir Turakulov <nodir@chromium.org>
Reviewed-by: Vadim Shtayura <vadimsh@chromium.org>
diff --git a/appengine/cr-buildbucket/api.py b/appengine/cr-buildbucket/api.py
index b3f71d0..6e9b192 100644
--- a/appengine/cr-buildbucket/api.py
+++ b/appengine/cr-buildbucket/api.py
@@ -18,7 +18,7 @@
 import gae_ts_mon
 
 import api_common
-import bulkproc
+import backfill_tag_index
 import config
 import creation
 import errors
@@ -740,27 +740,13 @@
   @buildbucket_api_method(
       endpoints.ResourceContainer(
           message_types.VoidMessage,
-          tag=messages.StringField(1, required=True),
-          shards=messages.IntegerField(2, required=True),
+          tag_key=messages.StringField(1, required=True),
       ), message_types.VoidMessage
   )
   @auth.require(auth.is_admin)
   def backfill_tag_index(self, request):
     """Backfills TagIndex entites from builds."""
-    if request.shards <= 0:
-      raise endpoints.BadRequestException('shards must be positive')
-    enqueue_task(
-        'backfill-tag-index',
-        bulkproc.PATH_PREFIX + 'start',
-        utils.encode_to_json({
-            'tag': request.tag,
-            'shards': request.shards,
-        }),
-    )
+    if ':' in request.tag_key:
+      raise endpoints.BadRequestException('invalid tag_key')
+    backfill_tag_index.launch(request.tag_key)
     return message_types.VoidMessage()
-
-
-# mocked in tests.
-def enqueue_task(queue_name, url, payload):  # pragma: no cover
-  task = taskqueue.Task(url=url, payload=payload)
-  return task.add(queue_name=queue_name)
diff --git a/appengine/cr-buildbucket/backfill_tag_index.py b/appengine/cr-buildbucket/backfill_tag_index.py
new file mode 100644
index 0000000..d9e6e6b
--- /dev/null
+++ b/appengine/cr-buildbucket/backfill_tag_index.py
@@ -0,0 +1,137 @@
+# Copyright 2018 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Tag index backfilling."""
+
+import collections
+import json
+import logging
+
+from google.appengine.ext import deferred
+from google.appengine.ext import ndb
+import webapp2
+
+import buildtags
+import bulkproc
+import search
+
+# Maximum number of entries to collect in a single iteration. This helps
+# avoid hitting the task size limit and caps the number of transactions we
+# need to perform in a flush task.
+ENTRY_LIMIT = 1000
+
+PROC_NAME = 'backfill_tag_index'
+
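+# Registered at import time; bulkproc looks the processor up by PROC_NAME
+# when segment tasks run.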
+bulkproc.register(
+    PROC_NAME,
+    lambda builds, payload: _process_builds(
+        builds, payload['tag_key'], ENTRY_LIMIT)
+)
+
+
+def launch(tag_key):  # pragma: no cover
+  assert isinstance(tag_key, basestring)
+  assert tag_key
+  assert ':' not in tag_key
+  bulkproc.start(PROC_NAME, {'tag_key': tag_key})
+
+
+def _process_builds(builds, tag_key, entry_limit):
+  entry_count = 0
+  new_entries = collections.defaultdict(list)
+
+  for b in builds:  # pragma: no branch
+    for t in b.tags:
+      k, v = buildtags.parse(t)
+      if k == tag_key:
+        new_entries[v].append([b.bucket, b.key.id()])
+        entry_count += 1
+        if entry_count >= entry_limit:
+          break
+    if entry_count >= entry_limit:
+      break
+
+  logging.info('collected %d entries', entry_count)
+  _enqueue_flush_entries(tag_key, new_entries)
+
+
+def _enqueue_flush_entries(tag_key, new_entries):  # pragma: no cover
+  if new_entries:
+    deferred.defer(
+        _flush_entries,
+        tag_key,
+        new_entries,
+        _queue=bulkproc.QUEUE_NAME,
+    )
+
+
+def _flush_entries(tag_key, new_entries):
+  """Adds new entries to TagIndex entities.
+
+  new_entries is {tag_value: [[bucket, build_id]]}.
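+
+  For example, with tag_key 'buildset' (tag values illustrative):
+    {'0': [['chromium', 51]], '1': [['chromium', 52]]}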
+  """
+  logging.info(
+      'flushing %d tag entries in %d TagIndex entities for tag key %s',
+      sum(len(es) for es in new_entries.itervalues()),
+      len(new_entries),
+      tag_key,
+  )
+
+  new_entry_items = new_entries.items()
+  futs = [
+      _add_index_entries_async(buildtags.unparse(tag_key, tag_value), entries)
+      for tag_value, entries in new_entry_items
+  ]
+  retry_entries = {}
+  updated = 0
+  for (tag_value, entries), f in zip(new_entry_items, futs):
+    ex = f.get_exception()
+    if ex:
+      logging.warning('failed to update TagIndex for "%s": %s', tag_value, ex)
+      retry_entries[tag_value] = entries
+    elif f.get_result():
+      updated += 1
+  logging.info('updated %d TagIndex entities', updated)
+  if retry_entries:
+    logging.warning(
+        'failed to update %d TagIndex entities, retrying...',
+        len(retry_entries),
+    )
+    _enqueue_flush_entries(tag_key, retry_entries)
+
+
+@ndb.transactional_tasklet
+def _add_index_entries_async(tag, entries):
+  """Adds TagIndexEntries to one TagIndex.
+
+  entries is [[bucket, build_id]].
+
+  Returns True if it made changes.
+  """
+  idx_key = search.TagIndex.random_shard_key(tag)
+  idx = (yield idx_key.get_async()) or search.TagIndex(key=idx_key)
+  if idx.permanently_incomplete:
+    # no point in adding entries to an incomplete index.
+    raise ndb.Return(False)
+
+  existing = {e.build_id for e in idx.entries}
+  added = False
+  for bucket, build_id in entries:
+    if build_id not in existing:
+      if len(idx.entries) >= search.TagIndex.MAX_ENTRY_COUNT:
+        logging.warning((
+            'refusing to store more than %d entries in TagIndex(%s); '
+            'marking as incomplete.'
+        ), search.TagIndex.MAX_ENTRY_COUNT, idx_key.id())
+        idx.permanently_incomplete = True
+        idx.entries = []
+        yield idx.put_async()
+        raise ndb.Return(True)
+
+      idx.entries.append(search.TagIndexEntry(bucket=bucket, build_id=build_id))
+      added = True
+  if not added:
+    raise ndb.Return(False)
+  yield idx.put_async()
+  raise ndb.Return(True)
diff --git a/appengine/cr-buildbucket/bulkproc.py b/appengine/cr-buildbucket/bulkproc.py
index e1134ec..8b2a16b 100644
--- a/appengine/cr-buildbucket/bulkproc.py
+++ b/appengine/cr-buildbucket/bulkproc.py
@@ -2,9 +2,8 @@
 # Use of this source code is governed by a BSD-style license that can be
 # found in the LICENSE file.
 
-"""Bulk processing of Build entities."""
+"""Build bulk processing of builds, a miniature map-reduce."""
 
-import collections
 import datetime
 import json
 import logging
@@ -15,16 +14,64 @@
 from google.appengine.ext import ndb
 import webapp2
 
+from components import decorators
 from components import utils
 
 
-from v2 import api as v2_api
-import config
 import model
-import search
 
-PATH_PREFIX = '/internal/task/backfill-tag-index/'
+QUEUE_NAME = 'bulkproc'
+PATH_PREFIX = '/internal/task/bulkproc/'
+SHARD_COUNT = 64
+
+# See register().
+PROCESSOR_REGISTRY = {}
+
+
+def register(name, processor, keys_only=False):
+  """Registers a processor.
+
+  Args:
+    name: identifies the processor.
+    processor: function (builds, payload), where builds is an iterable of
+      builds and payload is the payload specified in start(). Builds not read
+      from the iterable will be rescheduled for processing in a separate job.
+      The processor is eventually executed on all builds in the datastore.
+    keys_only: whether the builds passed to the processor are ndb keys only,
+      not full Build entities.
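+
+  Example (a hypothetical processor, for illustration only):
+
+    def print_build_ids(builds, payload):
+      for b in builds:
+        logging.info('%s %d', payload['prefix'], b.key.id())
+
+    bulkproc.register('print_build_ids', print_build_ids)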
+  """
+
+  assert name not in PROCESSOR_REGISTRY
+  PROCESSOR_REGISTRY[name] = {
+      'func': processor,
+      'keys_only': keys_only,
+  }
+
+
+def start(name, payload):  # pragma: no cover
+  """Starts a processor by name. See register docstring for params.
+
+  It should be called by a module that calls register().
+  """
+  assert name in PROCESSOR_REGISTRY
+  task = (
+      None,
+      PATH_PREFIX + 'start',
+      utils.encode_to_json({
+          'shards': SHARD_COUNT,
+          'proc': {
+              'name': name,
+              'payload': payload,
+          },
+      }),
+  )
+  enqueue_tasks(QUEUE_NAME, [task])
+
+
+def _get_proc(name):  # pragma: no cover
+  return PROCESSOR_REGISTRY[name]
 
 
 class TaskBase(webapp2.RequestHandler):
@@ -62,17 +109,15 @@
   Assumes that build creation rate was about the same forever.
 
   Payload properties:
-    tag: tag to reindex. Required.
     shards: number of workers to create. Must be positive. Required.
+    proc: processor to run, see TaskSegment docstring.
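+
+    For example (as assembled by start(), with the default shard count):
+      {'shards': 64,
+       'proc': {'name': 'backfill_tag_index',
+                'payload': {'tag_key': 'buildset'}}}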
   """
 
   def do(self, payload):
-    tag = payload['tag']
     shards = payload['shards']
-    assert isinstance(tag, basestring), tag
-    assert tag
     assert isinstance(shards, int)
     assert shards > 0
+    proc = payload['proc']
 
     first, = model.Build.query().fetch(1, keys_only=True) or [None]
     if not first:  # pragma: no cover
@@ -102,28 +147,26 @@
           None,
           'segment/seg:{seg_index}-percent:0',
           {
-              'tag': tag,
               'job_id': self.request.headers['X-AppEngine-TaskName'],
               'iteration': 0,
               'seg_index': len(tasks),
               'seg_start': seg_start,
               'seg_end': seg_end,
               'started_ts': utils.datetime_to_timestamp(utils.utcnow()),
+              'proc': proc,
           },
       ))
     self._recurse(tasks)
-    logging.info('enqueued %d segment tasks for tag %s', len(tasks), tag)
+    logging.info('enqueued %d segment tasks with proc %r', len(tasks), proc)
 
 
 class TaskSegment(TaskBase):
   """Processes a chunk of builds in a segment.
 
-  When finished, enqueues a flush task to persist new tag index entries.
-  If there are more builds in the segment to process, enqueues itself with a
+  If it didn't finish processing the entire segment, it enqueues itself with a
   new query cursor.
 
   Payload properties:
-    tag: tag to reindex. Required.
     job_id: id of this backfill job. Required.
     iteration: segment task iteration. Required.
     seg_index: index of this shard. Required.
@@ -131,12 +174,11 @@
     seg_end: id of the first build in the next segment. Required.
     start_from: start from this build towards seg_end. Defaults to seg_start.
     started_ts: timestamp when we started to process this segment.
+    proc: processor to run on the segment. A JSON object with two properties:
+      name: name of the processor, see register().
+      payload: processor payload, see register() and start().
   """
 
-  # Maximum number of entries to collect in a single iteration.
-  # This helps avoiding hitting the limit of task size and caps the number of
-  # transactions we need to do in a flush task.
-  ENTRY_LIMIT = 500
   # Maximum number of builds to process in a single iteration.
   # Value 3000 is derived from experimentation on the dev server.
   # It prevents "Exceeded soft private memory limit" error.
@@ -144,10 +186,11 @@
 
   def do(self, payload):
     attempt = int(self.request.headers.get('X-AppEngine-TaskExecutionCount', 0))
-
     seg_start = payload['seg_start']
     seg_end = payload['seg_end']
     start_from = payload.get('start_from', seg_start)
+    proc = payload['proc']
+    proc_def = _get_proc(proc['name'])
 
     logging.info('range %d-%d', seg_start, seg_end)
     logging.info('starting from %s', start_from)
@@ -159,43 +202,30 @@
         model.Build.key >= ndb.Key(model.Build, start_from),
         model.Build.key < ndb.Key(model.Build, seg_end)
     )
-    iterator = q.iter()
+    iterator = q.iter(keys_only=proc_def['keys_only'])
 
-    entry_count = 0
-    build_count = 0
-    new_entries = collections.defaultdict(list)
+    # One-element list so the nested generator below can mutate it
+    # (Python 2 has no nonlocal).
+    build_count = [0]
 
-    # Datastore query timeout is 60 sec. Limit it to 50 sec.
-    deadline = utils.utcnow() + datetime.timedelta(seconds=50)
-    while (utils.utcnow() < deadline and entry_count < self.ENTRY_LIMIT and
-           build_count < self.BUILD_LIMIT and iterator.has_next()):
-      b = iterator.next()
-      build_count += 1
-      for t in b.tags:
-        k, v = t.split(':', 1)
-        if k == payload['tag']:
-          new_entries[v].append([b.bucket, b.key.id()])
-          entry_count += 1
-    logging.info(
-        'collected %d entries from %d builds', entry_count, build_count
-    )
+    def iterate_segment():
+      # Datastore query timeout is 60 sec. Limit it to 50 sec.
+      deadline = utils.utcnow() + datetime.timedelta(seconds=50)
+      while (utils.utcnow() < deadline and build_count[0] < self.BUILD_LIMIT and
+             iterator.has_next()):
+        b = iterator.next()
+        yield b
+        build_count[0] += 1
 
-    if new_entries:  # pragma: no branch
-      logging.info(
-          'enqueuing a task to flush %d tag entries in %d TagIndex entities...',
-          entry_count, len(new_entries)
-      )
-      flush_payload = {
-          'tag': payload['tag'],
-          'new_entries': new_entries,
-      }
-      self._recurse([(None, 'flush', flush_payload)])
+    proc_def['func'](iterate_segment(), proc['payload'])
+    logging.info('processed %d builds', build_count[0])
+
     if iterator.has_next():
       logging.info('enqueuing a task for the next iteration...')
 
       p = payload.copy()
       p['iteration'] += 1
-      p['start_from'] = iterator.next().key.id()
+      p['start_from'] = (
+          # With keys_only, the iterator yields ndb.Key objects, so take id().
+          iterator.next().id()
+          if proc_def['keys_only'] else iterator.next().key.id()
+      )
 
       seg_len = seg_end - seg_start
       percent = 100 * (p['start_from'] - seg_start) / seg_len
@@ -218,90 +248,17 @@
     )
 
 
-class TaskFlushTagIndexEntries(TaskBase):
-  """Saves new tag index entries.
-
-  Payload properties:
-    tag: tag to reindex. Required.
-    new_entries: a dict {tag_value: [[bucket, id}]]} of new index entries to
-      add. Required.
-  """
-
-  def do(self, payload):
-    new_entries = payload['new_entries']
-    logging.info(
-        'flushing %d tag entries in %d TagIndex entities',
-        sum(len(es) for es in new_entries.itervalues()), len(new_entries)
-    )
-
-    futs = [
-        self._add_index_entries_async(
-            payload['tag'] + ':' + tag_value, entries
-        ) for tag_value, entries in new_entries.iteritems()
-    ]
-    ndb.Future.wait_all(futs)
-
-    retry_entries = {}
-    updated = 0
-    for (tag, entries), f in zip(new_entries.iteritems(), futs):
-      ex = f.get_exception()
-      if ex:
-        logging.warning('failed to update TagIndex for %r: %s', tag, ex)
-        retry_entries[tag] = entries
-      elif f.get_result():
-        updated += 1
-    logging.info('updated %d TagIndex entities', updated)
-    if retry_entries:
-      logging.warning(
-          'failed to update %d TagIndex entities, retrying...',
-          len(retry_entries)
-      )
-      p = payload.copy()
-      p['new_entries'] = retry_entries
-      self._recurse([(None, 'flush', p)])
-
-  @staticmethod
-  @ndb.transactional_tasklet
-  def _add_index_entries_async(tag, entries):
-    idx_key = search.TagIndex.random_shard_key(tag)
-    idx = (yield idx_key.get_async()) or search.TagIndex(key=idx_key)
-    if idx.permanently_incomplete:
-      # no point in adding entries to an incomplete index.
-      raise ndb.Return(False)
-    existing = {e.build_id for e in idx.entries}
-    added = False
-    for bucket, build_id in entries:
-      if build_id not in existing:
-        if len(idx.entries) >= search.TagIndex.MAX_ENTRY_COUNT:
-          logging.warning((
-              'refusing to store more than %d entries in TagIndex(%s); '
-              'marking as incomplete.'
-          ), search.TagIndex.MAX_ENTRY_COUNT, idx_key.id())
-          idx.permanently_incomplete = True
-          idx.entries = []
-          yield idx.put_async()
-          raise ndb.Return(True)
-
-        idx.entries.append(
-            search.TagIndexEntry(bucket=bucket, build_id=build_id)
-        )
-        added = True
-    if not added:
-      raise ndb.Return(False)
-    yield idx.put_async()
-    raise ndb.Return(True)
-
-
 # mocked in tests.
 def enqueue_tasks(queue_name, tasks):  # pragma: no cover
   """Adds tasks to the queue.
 
   tasks must be a list of tuples (name, url, payload).
   """
-  taskqueue.Queue(queue_name).add([
-      taskqueue.Task(name=name, url=url, payload=payload)
-      for name, url, payload in tasks
-  ])
+  if tasks:
+    taskqueue.Queue(queue_name).add([
+        taskqueue.Task(name=name, url=url, payload=payload)
+        for name, url, payload in tasks
+    ])
 
 
 def get_routes():  # pragma: no cover
@@ -309,5 +266,4 @@
   return [
       webapp2.Route(PATH_PREFIX + r'start', TaskStart),
       webapp2.Route(PATH_PREFIX + r'segment/<rest>', TaskSegment),
-      webapp2.Route(PATH_PREFIX + r'flush', TaskFlushTagIndexEntries),
   ]
diff --git a/appengine/cr-buildbucket/queue.yaml b/appengine/cr-buildbucket/queue.yaml
index b797273..4868f23 100644
--- a/appengine/cr-buildbucket/queue.yaml
+++ b/appengine/cr-buildbucket/queue.yaml
@@ -3,7 +3,7 @@
 - name: backend-default
   target: backend
   rate: 100/s
-- name: backfill-tag-index
+- name: bulkproc
   target: backend
   rate: 100/s
   max_concurrent_requests: 128
diff --git a/appengine/cr-buildbucket/test/api_test.py b/appengine/cr-buildbucket/test/api_test.py
index d0eb97e..f0f44a1 100644
--- a/appengine/cr-buildbucket/test/api_test.py
+++ b/appengine/cr-buildbucket/test/api_test.py
@@ -23,6 +23,7 @@
 from test import config_test
 from test.test_util import future, future_exception
 import api
+import backfill_tag_index
 import config
 import creation
 import errors
@@ -763,27 +764,14 @@
 
   ####### BACKFILL_TAG_INDEX ###################################################
 
-  @mock.patch('api.enqueue_task')
-  def test_backfill_tag_index(self, enqueue_task):
+  @mock.patch('backfill_tag_index.launch')
+  def test_backfill_tag_index(self, launch_tag_index_backfilling):
     auth.bootstrap_group(auth.ADMIN_GROUP, [auth.Anonymous])
-    req = {
-        'tag': 'buildset',
-        'shards': '64',
-    }
+    req = {'tag_key': 'buildset'}
     self.call_api('backfill_tag_index', req, status=(200, 204))
-    enqueue_task.assert_called_once_with(
-        'backfill-tag-index',
-        '/internal/task/backfill-tag-index/start',
-        utils.encode_to_json({
-            'tag': 'buildset',
-            'shards': 64,
-        }),
-    )
+    launch_tag_index_backfilling.assert_called_once_with('buildset')
 
   def test_backfill_tag_index_fails(self):
     auth.bootstrap_group(auth.ADMIN_GROUP, [auth.Anonymous])
     self.call_api('backfill_tag_index', {}, status=400)
-    self.call_api('backfill_tag_index', {'tag': 'buildset'}, status=400)
-    self.call_api(
-        'backfill_tag_index', {'tag': 'buildset', 'shards': 0}, status=400
-    )
+    self.call_api('backfill_tag_index', {'tag_key': 'a:b'}, status=400)
diff --git a/appengine/cr-buildbucket/test/backfill_tag_index_test.py b/appengine/cr-buildbucket/test/backfill_tag_index_test.py
new file mode 100644
index 0000000..c6eff03
--- /dev/null
+++ b/appengine/cr-buildbucket/test/backfill_tag_index_test.py
@@ -0,0 +1,143 @@
+# Copyright 2018 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import mock
+
+from test.test_util import future_exception
+from testing_utils import testing
+import backfill_tag_index
+import bulkproc
+import main
+import model
+import search
+import v2
+
+
+class BackfillTagIndexTest(testing.AppengineTestCase):
+
+  def setUp(self):
+    super(BackfillTagIndexTest, self).setUp()
+    self.patch('search.TagIndex.random_shard_index', return_value=0)
+    self.patch('backfill_tag_index._enqueue_flush_entries')
+
+  def test_process(self):
+    builds = [
+        model.Build(
+            id=i, bucket='chromium', tags=[
+                'buildset:%d' % (i % 3),
+                'a:b',
+            ]
+        ) for i in xrange(50, 60)
+    ]
+
+    backfill_tag_index._process_builds(builds, 'buildset', 5)
+
+    backfill_tag_index._enqueue_flush_entries.assert_called_with(
+        'buildset', {
+            '0': [
+                ['chromium', 51],
+                ['chromium', 54],
+            ],
+            '1': [['chromium', 52]],
+            '2': [
+                ['chromium', 50],
+                ['chromium', 53],
+            ],
+        }
+    )
+
+  def test_flush_entries(self):
+    search.TagIndex(
+        id='buildset:0',
+        entries=[
+            search.TagIndexEntry(bucket='chromium', build_id=51),
+        ]
+    ).put()
+    search.TagIndex(
+        id='buildset:2',
+        entries=[
+            search.TagIndexEntry(bucket='chromium', build_id=1),
+            search.TagIndexEntry(bucket='chromium', build_id=100),
+        ]
+    ).put()
+
+    backfill_tag_index._flush_entries(
+        'buildset',
+        {
+            '0': [['chromium', 51]],
+            '1': [['chromium', 52]],
+            '2': [['chromium', 50]],
+        },
+    )
+
+    idx0 = search.TagIndex.get_by_id('buildset:0')
+    self.assertIsNotNone(idx0)
+    self.assertEqual(len(idx0.entries), 1)
+    self.assertEqual(idx0.entries[0].build_id, 51)
+
+    idx1 = search.TagIndex.get_by_id('buildset:1')
+    self.assertIsNotNone(idx1)
+    self.assertEqual(len(idx1.entries), 1)
+    self.assertEqual(idx1.entries[0].build_id, 52)
+
+    idx2 = search.TagIndex.get_by_id('buildset:2')
+    self.assertIsNotNone(idx2)
+    self.assertEqual(len(idx2.entries), 3)
+    self.assertEqual({e.build_id for e in idx2.entries}, {1, 50, 100})
+
+  def test_flush_entries_retry(self):
+    orig_add_async = backfill_tag_index._add_index_entries_async
+
+    def add_async(tag, entries):
+      if tag == 'buildset:1':
+        return future_exception(Exception('transient error'))
+      return orig_add_async(tag, entries)
+
+    with mock.patch(
+        'backfill_tag_index._add_index_entries_async',
+        side_effect=add_async,
+    ):
+      backfill_tag_index._flush_entries(
+          'buildset',
+          {
+              '0': [['chromium', 51]],
+              '1': [['chromium', 52]],
+              '2': [['chromium', 50]],
+          },
+      )
+
+    idx0 = search.TagIndex.get_by_id('buildset:0')
+    self.assertIsNotNone(idx0)
+    self.assertEqual(len(idx0.entries), 1)
+    self.assertEqual(idx0.entries[0].build_id, 51)
+
+    idx2 = search.TagIndex.get_by_id('buildset:2')
+    self.assertIsNotNone(idx2)
+    self.assertEqual(len(idx2.entries), 1)
+    self.assertEqual(idx2.entries[0].build_id, 50)
+
+    backfill_tag_index._enqueue_flush_entries.assert_called_with(
+        'buildset', {'1': [['chromium', 52]]}
+    )
+
+  def test_flush_entries_too_many(self):
+    backfill_tag_index._flush_entries(
+        'buildset',
+        {'0': [['chromium', i] for i in xrange(1, 2001)]},
+    )
+
+    idx0 = search.TagIndex.get_by_id('buildset:0')
+    self.assertIsNotNone(idx0)
+    self.assertTrue(idx0.permanently_incomplete)
+    self.assertEqual(len(idx0.entries), 0)
+
+    # once more for coverage
+    backfill_tag_index._flush_entries(
+        'buildset',
+        {'0': [['chromium', 1]]},
+    )
diff --git a/appengine/cr-buildbucket/test/bulkproc_test.py b/appengine/cr-buildbucket/test/bulkproc_test.py
index 3e6c849..a5d9618 100644
--- a/appengine/cr-buildbucket/test/bulkproc_test.py
+++ b/appengine/cr-buildbucket/test/bulkproc_test.py
@@ -4,6 +4,7 @@
 
 import contextlib
 import datetime
+import itertools
 import mock
 
 from google.appengine.ext import ndb
@@ -30,12 +31,11 @@
     super(TestBase, self).setUp()
     self.now = datetime.datetime(2017, 1, 1)
     self.patch('components.utils.utcnow', side_effect=lambda: self.now)
-    self.patch('search.TagIndex.random_shard_index', return_value=0)
 
   def post(self, payload, headers=None):
     assert self.path_suffix
     headers = headers or {}
-    headers['X-AppEngine-QueueName'] = 'backfill-tag-index'
+    headers['X-AppEngine-QueueName'] = bulkproc.QUEUE_NAME
     headers['X-AppEngine-TaskName'] = 'taskname'
     task_url = bulkproc.PATH_PREFIX + self.path_suffix
     return self.test_app.post(
@@ -46,7 +46,7 @@
 class StartTest(TestBase):
   path_suffix = 'start'
 
-  @mock.patch('bulkproc.enqueue_tasks')
+  @mock.patch('bulkproc.enqueue_tasks', autospec=True)
   def test_start(self, enqueue_tasks):
     ndb.put_multi([
         model.Build(
@@ -55,67 +55,71 @@
             create_time=self.now - datetime.timedelta(minutes=i)
         ) for i in xrange(1, 11)
     ])
-    self.post({'tag': 'buildset', 'shards': 3})
+    proc = {'name': 'foo', 'payload': 'bar'}
+    self.post({
+        'shards': 3,
+        'proc': proc,
+    })
 
     seg_path_prefix = bulkproc.PATH_PREFIX + 'segment/'
     enqueue_tasks.assert_called_with(
-        'backfill-tag-index', [
+        'bulkproc', [
             (
                 None,
                 seg_path_prefix + 'seg:0-percent:0',
                 utils.encode_to_json({
-                    'tag': 'buildset',
                     'job_id': 'taskname',
                     'iteration': 0,
                     'seg_index': 0,
                     'seg_start': 1,
                     'seg_end': 4,
                     'started_ts': utils.datetime_to_timestamp(self.now),
+                    'proc': proc,
                 }),
             ),
             (
                 None,
                 seg_path_prefix + 'seg:1-percent:0',
                 utils.encode_to_json({
-                    'tag': 'buildset',
                     'job_id': 'taskname',
                     'iteration': 0,
                     'seg_index': 1,
                     'seg_start': 4,
                     'seg_end': 7,
                     'started_ts': utils.datetime_to_timestamp(self.now),
+                    'proc': proc,
                 }),
             ),
             (
                 None,
                 seg_path_prefix + 'seg:2-percent:0',
                 utils.encode_to_json({
-                    'tag': 'buildset',
                     'job_id': 'taskname',
                     'iteration': 0,
                     'seg_index': 2,
                     'seg_start': 7,
                     'seg_end': 10,
                     'started_ts': utils.datetime_to_timestamp(self.now),
+                    'proc': proc,
                 }),
             ),
             (
                 None,
                 seg_path_prefix + 'seg:3-percent:0',
                 utils.encode_to_json({
-                    'tag': 'buildset',
                     'job_id': 'taskname',
                     'iteration': 0,
                     'seg_index': 3,
                     'seg_start': 10,
                     'seg_end': 11,
                     'started_ts': utils.datetime_to_timestamp(self.now),
+                    'proc': proc,
                 }),
             ),
         ]
     )
 
-  @mock.patch('bulkproc.enqueue_tasks')
+  @mock.patch('bulkproc.enqueue_tasks', autospec=True)
   def test_start_many_shards(self, enqueue_tasks):
     ndb.put_multi([
         model.Build(
@@ -125,8 +129,8 @@
         ) for i in xrange(1, 150)
     ])
     self.post({
-        'tag': 'buildset',
         'shards': 100,
+        'proc': {'name': 'foo', 'payload': 'bar'},
     })
 
     self.assertEqual(enqueue_tasks.call_count, 2)
@@ -135,16 +139,15 @@
 class SegmentTest(TestBase):
   path_suffix = 'segment/rest'
 
-  @contextlib.contextmanager
-  def entry_limit(self, limit):
-    orig_entry_limit = bulkproc.TaskSegment.ENTRY_LIMIT
-    bulkproc.TaskSegment.ENTRY_LIMIT = limit
-    try:
-      yield
-    finally:
-      bulkproc.TaskSegment.ENTRY_LIMIT = orig_entry_limit
+  def setUp(self):
+    super(SegmentTest, self).setUp()
+    self.proc = {
+        'func': lambda builds, _: list(builds),  # process all
+        'keys_only': False,
+    }
+    self.patch('bulkproc._get_proc', return_value=self.proc)
 
-  @mock.patch('bulkproc.enqueue_tasks')
+  @mock.patch('bulkproc.enqueue_tasks', autospec=True)
   def test_segment_partial(self, enqueue_tasks):
     ndb.put_multi([
         model.Build(
@@ -155,40 +158,25 @@
         ) for i in xrange(50, 60)
     ])
 
-    with self.entry_limit(5):
-      self.post({
-          'tag': 'buildset',
-          'job_id': 'jobid',
-          'iteration': 0,
-          'seg_index': 0,
-          'seg_start': 50,
-          'seg_end': 60,
-          'started_ts': utils.datetime_to_timestamp(self.now),
-      })
+    def process(builds, payload):
+      # process 5 builds
+      page = list(itertools.islice(builds, 5))
+      self.assertEqual([b.key.id() for b in page], range(50, 55))
+      self.assertEqual(payload, 'bar')
 
-    enqueue_tasks.assert_any_call(
-        'backfill-tag-index', [(
-            None,
-            bulkproc.PATH_PREFIX + 'flush',
-            utils.encode_to_json({
-                'tag': 'buildset',
-                'new_entries': {
-                    '0': [
-                        ['chromium', 51],
-                        ['chromium', 54],
-                    ],
-                    '1': [['chromium', 52]],
-                    '2': [
-                        ['chromium', 50],
-                        ['chromium', 53],
-                    ],
-                },
-            }),
-        )]
-    )
+    self.proc['func'] = process
+
+    self.post({
+        'job_id': 'jobid',
+        'iteration': 0,
+        'seg_index': 0,
+        'seg_start': 50,
+        'seg_end': 60,
+        'started_ts': utils.datetime_to_timestamp(self.now),
+        'proc': {'name': 'foo', 'payload': 'bar'},
+    })
 
     expected_next_payload = {
-        'tag': 'buildset',
         'job_id': 'jobid',
         'iteration': 1,
         'seg_index': 0,
@@ -196,10 +184,10 @@
         'seg_end': 60,
         'start_from': 55,
         'started_ts': utils.datetime_to_timestamp(self.now),
+        'proc': {'name': 'foo', 'payload': 'bar'},
     }
-    print enqueue_tasks.call_args_list
-    enqueue_tasks.assert_any_call(
-        'backfill-tag-index',
+    enqueue_tasks.assert_called_with(
+        'bulkproc',
         [(
             'jobid-0-1',
             bulkproc.PATH_PREFIX + 'segment/seg:0-percent:50',
@@ -207,177 +195,63 @@
         )],
     )
 
-    self.post(expected_next_payload)
-
-  @mock.patch('bulkproc.enqueue_tasks')
+  @mock.patch('bulkproc.enqueue_tasks', autospec=True)
   def test_segment_full(self, enqueue_tasks):
     ndb.put_multi([
         model.Build(id=i, bucket='chromium', tags=['buildset:%d' % (i % 3)])
         for i in xrange(50, 52)
     ])
     self.post({
-        'tag': 'buildset',
+        'job_id': 'jobid',
+        'iteration': 0,
         'seg_index': 0,
         'seg_start': 50,
         'seg_end': 60,
         'started_ts': utils.datetime_to_timestamp(self.now),
+        'proc': {'name': 'foo', 'payload': 'bar'},
     })
 
-    self.assertEqual(enqueue_tasks.call_count, 1)
-    enqueue_tasks.assert_called_with(
-        'backfill-tag-index',
-        [(
-            None,
-            bulkproc.PATH_PREFIX + 'flush',
-            utils.encode_to_json({
-                'tag': 'buildset',
-                'new_entries': {
-                    '0': [['chromium', 51]],
-                    '2': [['chromium', 50]],
-                },
-            }),
-        )],
-    )
+    self.assertEqual(enqueue_tasks.call_count, 0)
 
-  @mock.patch('bulkproc.enqueue_tasks')
+  @mock.patch('bulkproc.enqueue_tasks', autospec=True)
   def test_segment_attempt_2(self, enqueue_tasks):
     ndb.put_multi([
         model.Build(id=i, bucket='chromium', tags=['buildset:%d' % (i % 3)])
         for i in xrange(50, 60)
     ])
 
-    with self.entry_limit(1):
-      headers = {
-          'X-AppEngine-TaskExecutionCount': '1',
-      }
-      self.post(
-          {
-              'tag': 'buildset',
-              'job_id': 'jobid',
-              'iteration': 0,
-              'seg_index': 0,
-              'seg_start': 50,
-              'seg_end': 60,
-              'started_ts': utils.datetime_to_timestamp(self.now),
-          },
-          headers=headers,
-      )
+    # process 5 builds
+    self.proc['func'] = lambda builds, _: list(itertools.islice(builds, 5))
 
-    enqueue_tasks.assert_any_call(
-        'backfill-tag-index', [(
+    self.post(
+        {
+            'job_id': 'jobid',
+            'iteration': 0,
+            'seg_index': 0,
+            'seg_start': 50,
+            'seg_end': 60,
+            'started_ts': utils.datetime_to_timestamp(self.now),
+            'proc': {'name': 'foo', 'payload': 'bar'},
+        },
+        headers={
+            'X-AppEngine-TaskExecutionCount': '1',
+        },
+    )
+
+    enqueue_tasks.assert_called_with(
+        'bulkproc',
+        [(
             'jobid-0-1',
-            bulkproc.PATH_PREFIX + 'segment/seg:0-percent:10',
+            bulkproc.PATH_PREFIX + 'segment/seg:0-percent:50',
             utils.encode_to_json({
-                'tag': 'buildset',
                 'job_id': 'jobid',
                 'iteration': 1,
                 'seg_index': 0,
                 'seg_start': 50,
                 'seg_end': 60,
-                'start_from': 51,
+                'start_from': 55,
                 'started_ts': utils.datetime_to_timestamp(self.now),
+                'proc': {'name': 'foo', 'payload': 'bar'},
             }),
-        )]
+        )],
     )
-
-
-class FlushTest(TestBase):
-  path_suffix = 'flush'
-
-  def test_flush(self):
-    search.TagIndex(
-        id='buildset:0',
-        entries=[
-            search.TagIndexEntry(bucket='chormium', build_id=51),
-        ]
-    ).put()
-    search.TagIndex(
-        id='buildset:2',
-        entries=[
-            search.TagIndexEntry(bucket='chormium', build_id=1),
-            search.TagIndexEntry(bucket='chormium', build_id=100),
-        ]
-    ).put()
-    self.post({
-        'tag': 'buildset',
-        'new_entries': {
-            '0': [['chromium', 51]],
-            '1': [['chromium', 52]],
-            '2': [['chromium', 50]],
-        },
-    })
-
-    idx0 = search.TagIndex.get_by_id('buildset:0')
-    self.assertIsNotNone(idx0)
-    self.assertEqual(len(idx0.entries), 1)
-    self.assertEqual(idx0.entries[0].build_id, 51)
-
-    idx1 = search.TagIndex.get_by_id('buildset:1')
-    self.assertIsNotNone(idx1)
-    self.assertEqual(len(idx1.entries), 1)
-    self.assertEqual(idx1.entries[0].build_id, 52)
-
-    idx2 = search.TagIndex.get_by_id('buildset:2')
-    self.assertIsNotNone(idx2)
-    self.assertEqual(len(idx2.entries), 3)
-    self.assertEqual({e.build_id for e in idx2.entries}, {1, 50, 100})
-
-  @mock.patch('bulkproc.enqueue_tasks')
-  def test_flush_retry(self, enqueue_tasks):
-    orig_add = bulkproc.TaskFlushTagIndexEntries._add_index_entries_async
-
-    def add(tag, entries):
-      if tag == 'buildset:1':
-        return future_exception(Exception('transient error'))
-      return orig_add(tag, entries)
-
-    with mock.patch(
-        'bulkproc.TaskFlushTagIndexEntries._add_index_entries_async',
-        side_effect=add,
-    ):
-      self.post({
-          'tag': 'buildset',
-          'new_entries': {
-              '0': [['chromium', 51]],
-              '1': [['chromium', 52]],
-              '2': [['chromium', 50]],
-          },
-      })
-
-    idx0 = search.TagIndex.get_by_id('buildset:0')
-    self.assertIsNotNone(idx0)
-    self.assertEqual(len(idx0.entries), 1)
-    self.assertEqual(idx0.entries[0].build_id, 51)
-
-    idx2 = search.TagIndex.get_by_id('buildset:2')
-    self.assertIsNotNone(idx2)
-    self.assertEqual(len(idx2.entries), 1)
-    self.assertEqual(idx2.entries[0].build_id, 50)
-
-    enqueue_tasks.assert_called_with(
-        'backfill-tag-index', [(
-            None,
-            bulkproc.PATH_PREFIX + 'flush',
-            utils.encode_to_json({
-                'tag': 'buildset',
-                'new_entries': {'1': [['chromium', 52]]},
-            }),
-        )]
-    )
-
-  def test_flush_too_many(self):
-    self.post({
-        'tag': 'buildset',
-        'new_entries': {'0': [['chromium', i] for i in xrange(1, 2001)]},
-    })
-
-    idx0 = search.TagIndex.get_by_id('buildset:0')
-    self.assertIsNotNone(idx0)
-    self.assertTrue(idx0.permanently_incomplete)
-    self.assertEqual(len(idx0.entries), 0)
-
-    # Again, for code coverage.
-    self.post({
-        'tag': 'buildset',
-        'new_entries': {'0': [['chromium', 1]]},
-    })