blob: ea7880d85ff4dae7ac0e5defb197c8ce1347801f [file] [log] [blame]
# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
from google.appengine.ext import ndb
from components import utils
import metrics
class NumberSequence(ndb.Model):
"""A named number sequence.
Entity key:
ID is the name of the sequence.
NumberSequence has no parent.
# Next number in the sequence.
next_number = ndb.IntegerProperty(default=1, indexed=False)
def _migrate_entity_async(seq_name):
"""Migrates NumberSequence from old name to the new name."""
parts = seq_name.split('/', 2) # project, bucket, builder
if len(parts) != 3:
new = yield NumberSequence.get_by_id_async(seq_name)
if new:
# New entity exists, so there is nothing to migrate.
old_name = 'luci.%s.%s/%s' % tuple(parts)
@ndb.transactional_tasklet(xg=True) # pylint: disable=no-value-for-parameter
def txn_async():
old, new = yield (
if new or not old:
new = NumberSequence(id=seq_name, next_number=old.next_number)
yield (new.put_async(), old.key.delete_async())
yield txn_async()
def generate_async(seq_name, count):
"""Generates sequence numbers.
Supports up to 5 QPS. If we need more, we will need to implement something
more advanced.
name: name of the sequence.
count: number of sequence numbers to allocate.
The generated number. For a returned number i, numbers [i, i+count) can be
used by the caller.
yield _migrate_entity_async(seq_name)
def txn():
seq = ((yield NumberSequence.get_by_id_async(seq_name)) or
result = seq.next_number
seq.next_number += count
yield seq.put_async()
raise ndb.Return(result)
started = utils.utcnow()
number = yield txn()
ellapsed_ms = (utils.utcnow() - started).total_seconds() * 1000
if ellapsed_ms > 1000: # pragma: no cover
'sequence number generation took > 1s\n'
'it took %dms\n'
'sequence: %s', ellapsed_ms, seq_name
else:'sequence number generation took %dms', ellapsed_ms)
ellapsed_ms, fields={'sequence': seq_name}
raise ndb.Return(number)
def set_next(seq_name, next_number):
"""Sets the next number to generate.
name: name of the sequence.
next_number: the next number. Cannot be less than the number
that would be generated otherwise.
ValueError if the supplied number is too small.
def txn():
assert isinstance(next_number, int)
seq = NumberSequence.get_by_id(seq_name) or NumberSequence(id=seq_name)
if next_number == seq.next_number:
elif next_number < seq.next_number:
raise ValueError('next number must be at least %d' % seq.next_number)
seq.next_number = next_number
def try_return_async(seq_name, number):
"""Attempts to return the generated number back to the sequence.
Returns False if the number wasn't returned and cannot be reused by someone
yield _migrate_entity_async(seq_name)
def txn_async():
seq = yield NumberSequence.get_by_id_async(seq_name)
if not seq:
# If there is not sequence entity, the number can be used by someone else.
raise ndb.Return(True)
if seq.next_number != number + 1:
raise ndb.Return(False)
seq.next_number = number
yield seq.put_async()
raise ndb.Return(True)
ret = yield txn_async()
raise ndb.Return(ret)
def builder_seq_name(bucket_id, builder): # pragma: no cover
"""Returns name of a number sequence for the builder."""
return '%s/%s' % (bucket_id, builder)