blob: 1b0578a72a970ec18a31ce6bee20fe3908619181 [file]
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Defines the period to wait for a new database entry to appear."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import datetime
import pytz
class GracePeriod(object):
  """Describes how long importers wait for a missing database row to appear.

  Imported rows use autoincrement integer primary keys, so IDs are normally
  contiguous, but gaps ("ID skips") can be observed while scanning a table:

  * Concurrent transactions: the row with ID=7 may be committed after the row
    with ID=8 if its transaction takes longer. Such a gap is filled eventually.
  * Rolled-back transactions: the ID is consumed but never committed. Such a
    gap stays forever.

  To avoid dropping rows, an importer that meets a gap holds back every row
  after it for a while (the "grace period") before giving up on the gap.

  The grace period is bounded by two optional constraints:

  1. timeout: a duration. Once the insertion timestamp of the row right after
     a gap is older than this, the gap is assumed to be permanent.
  2. capacity: a count. Once the number of committed rows after a gap
     (counting the row right after the gap) reaches this number, the gap is
     assumed to be permanent.

  At least one of the two constraints must be set. Prefer the timeout
  constraint whenever possible, because the maximum insertion rate needed to
  pick a good capacity is hard to estimate.
  """

  def __init__(self, id_column, insertion_timestamp_column, timeout, capacity):
    """Initializes the grace period settings.

    Args:
      id_column: Name of the ID column.
      insertion_timestamp_column: Name of the insertion timestamp column.
      timeout: datetime.timedelta giving the grace period timeout, or None to
          disable the timeout constraint.
      capacity: Number of entries giving the grace period capacity, or None to
          disable the capacity constraint.
    """
    # A grace period with neither constraint would never expire.
    assert not (timeout is None and capacity is None)

    self._id_column = id_column
    self._insertion_timestamp_column = insertion_timestamp_column
    self._timeout = timeout
    self._capacity = capacity

  def FilterEntries(self, entries, now=None):
    """Drops the entries that are still inside a grace period.

    This method must be called in AbstractImporter implementations.

    Args:
      entries: A list of dictionaries representing entries.
      now: The current datetime. Defaults to the current system time in UTC.

    Returns:
      The entries sorted by ID, truncated just before the first ID skip whose
      grace period has not finished yet.
    """
    if now is None:
      now = datetime.datetime.now(pytz.utc)

    ordered = list(entries)
    ordered.sort(key=lambda row: row[self._id_column])
    total = len(ordered)

    for index, row in enumerate(ordered):
      # The very first entry is always treated as if it followed a skip, to
      # be on the safe side; later entries follow a skip when their ID is not
      # exactly one more than the preceding ID.
      follows_skip = (
          index == 0 or
          row[self._id_column] - ordered[index - 1][self._id_column] != 1)
      if not follows_skip:
        continue
      if self._IsGracePeriodFinished(row, total - index, now):
        continue
      # Still waiting for the gap to be filled: hold back this entry and
      # everything after it.
      return ordered[:index]

    return ordered

  def _IsGracePeriodFinished(self, next_entry, num_following_entries, now):
    """Tells whether the grace period of an ID skip is over.

    Args:
      next_entry: The first committed entry after a skip.
      num_following_entries: The number of committed entries after a skip.
      now: The current datetime.

    Returns:
      True if either configured constraint (timeout or capacity) says the
      grace period has finished, False otherwise.
    """
    if self._timeout is not None:
      inserted_at = next_entry[self._insertion_timestamp_column]
      if inserted_at < now - self._timeout:
        return True

    if self._capacity is None:
      return False
    if num_following_entries >= self._capacity:
      return True
    return False