# Copyright 2018 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Tag index backfilling."""
import collections
import logging
from google.appengine.ext import deferred
from google.appengine.ext import ndb
import buildtags
import bulkproc
import search

# Maximum number of entries to collect in a single iteration. This helps
# avoid hitting the task size limit and caps the number of transactions we
# need to do in a flush task.
ENTRY_LIMIT = 1000
PROC_NAME = 'backfill_tag_index'

bulkproc.register(
PROC_NAME,
lambda builds, payload: _process_builds(
builds, payload['tag_key'], ENTRY_LIMIT)
)
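

# Example (hypothetical): an operator kicks off a backfill by calling
# launch('buildset'); bulkproc then iterates over all builds and
# _process_builds collects the matching tag entries in batches.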
def launch(tag_key): # pragma: no cover
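  """Starts a bulkproc job that backfills the TagIndex for tag_key."""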
assert tag_key
assert ':' not in tag_key
assert isinstance(tag_key, basestring)
bulkproc.start(PROC_NAME, {'tag_key': tag_key})


def _process_builds(builds, tag_key, entry_limit):
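  """Collects index entries for tag_key from a page of builds.

  Scans the tags of each build, collects up to entry_limit
  (bucket_id, build_id) pairs grouped by tag value, and enqueues a flush
  task that persists them into TagIndex entities.
  """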
entry_count = 0
new_entries = collections.defaultdict(list)
for b in builds: # pragma: no branch
for t in b.tags:
k, v = buildtags.parse(t)
if k == tag_key:
new_entries[v].append([b.bucket_id, b.key.id()])
entry_count += 1
if entry_count >= entry_limit:
break
if entry_count >= entry_limit:
break
logging.info('collected %d entries', entry_count)
_enqueue_flush_entries(tag_key, new_entries)


def _enqueue_flush_entries(tag_key, new_entries): # pragma: no cover
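  """Enqueues a deferred task that adds new_entries to TagIndex entities."""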
if new_entries:
deferred.defer(
_flush_entries,
tag_key,
new_entries,
_queue=bulkproc.QUEUE_NAME,
)


def _flush_entries(tag_key, new_entries):
"""Adds new entries to TagIndex entities.
new_entries is {tag_value: [[bucket_id, build_id]]}.
"""
logging.info(
'flushing %d tag entries in %d TagIndex entities for tag key %s',
sum(len(es) for es in new_entries.itervalues()),
len(new_entries),
tag_key,
)
new_entry_items = new_entries.items()
futs = [
_add_index_entries_async(buildtags.unparse(tag_key, tag_value), entries)
for tag_value, entries in new_entry_items
]
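  # Entries whose transaction failed are collected into retry_entries and
  # re-enqueued in a fresh flush task below, so they are retried rather
  # than lost.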
retry_entries = {}
updated = 0
for (tag_value, entries), f in zip(new_entry_items, futs):
ex = f.get_exception()
if ex:
logging.warning('failed to update TagIndex for "%s" %s', tag_value, ex)
retry_entries[tag_value] = entries
elif f.get_result():
updated += 1
logging.info('updated %d TagIndex entities', updated)
if retry_entries:
logging.warning(
'failed to update %d TagIndex entities, retrying...',
len(retry_entries),
)
_enqueue_flush_entries(tag_key, retry_entries)


@ndb.transactional_tasklet
def _add_index_entries_async(tag, entries):
"""Adds TagIndexEntries to one TagIndex.
entries is [[bucket_id, build_id]].
Returns True if made changes.
"""
idx_key = search.TagIndex.random_shard_key(tag)
idx = (yield idx_key.get_async()) or search.TagIndex(key=idx_key)
if idx.permanently_incomplete:
# no point in adding entries to an incomplete index.
raise ndb.Return(False)
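  # Only add entries for builds that are not already in this shard.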
existing = {e.build_id for e in idx.entries}
added = False
for bucket_id, build_id in entries:
if build_id not in existing:
if len(idx.entries) >= search.TagIndex.MAX_ENTRY_COUNT:
logging.warning((
'refusing to store more than %d entries in TagIndex(%s); '
'marking as incomplete.'
), search.TagIndex.MAX_ENTRY_COUNT, idx_key.id())
idx.permanently_incomplete = True
idx.entries = []
yield idx.put_async()
raise ndb.Return(True)
idx.entries.append(
search.TagIndexEntry(bucket_id=bucket_id, build_id=build_id)
)
added = True
if not added:
raise ndb.Return(False)
yield idx.put_async()
raise ndb.Return(True)