# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Sequence generator for testlog's seq field."""
import json
import logging
import os
import time
from .utils import file_utils
# Each boot, the sequence number increases by this amount to help
# ensure monotonicity.
#
# For example, say we write events #55 and #56 to the event file and
# sync them to the Shopfloor server, but then we have a power problem
# and lose those events before they are completely flushed to
# disk. On reboot, the last event we will find in the events file is
# #54, so if we started again with #55 we would violate monotonicity
# in the Shopfloor server record. But this way we will start with
# sequence number #1000055.
#
# This is not bulletproof: we could write and sync event #1000055,
# then have a similar power problem, and on the next reboot write and
# sync event #1000055 again. But this is much less likely than the
# scenario above.
SEQ_INCREMENT_ON_BOOT = 1000000
FILELOCK_WAITSECS = 0.5
class SeqGenerator(object):
"""Maintains a monotonically increasing sequence in best effort.
Currently, only support a sequence recovery (i.e. sequence file is missing
from disk) for JSON format log. FileLock is used to prevent racing across
different process.
Args:
path: Path to the sequence number file.
log_file_path: Path to JSON log while recovery tried when absence of
sequence number file.
_after_read: A function to call immediately after reading the sequence
number (for testing).
_filelock_waitsecs: It needs to be longer if lots of threads / processes
competing for the same sequence file. Half seconds should be sufficient
for most of the cases (expecting 1000 log accesses per second). For
special testing purposes, this can be tweaked.
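
  Example (illustrative; the paths are assumptions):
    seq = SeqGenerator('/var/factory/testlog.seq', '/var/factory/testlog.json')
    event['seq'] = seq.Next()  # Hold the log file's lock; see Next().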
"""
def __init__(self, path, log_file_path, _after_read=lambda: True,
_filelock_waitsecs=FILELOCK_WAITSECS):
self.path = path
self.log_file_path = log_file_path
self._after_read = _after_read
self._filelock_waitsecs = _filelock_waitsecs
self._Create()
def _Create(self):
"""Creates the file if it does not yet exist or is invalid."""
file_utils.TryMakeDirs(os.path.dirname(self.path))
with file_utils.FileLock(self.path, self._filelock_waitsecs):
      with open(self.path, 'r+') as f:  # FileLock verified the file exists.
contents = f.read()
if contents:
try:
_ = int(contents)
return # It's all good.
except ValueError:
logging.exception(
'Sequence number file %s contains non-integer %r',
self.path, contents)
value = self._FindNextSequenceNumber()
f.write(str(value))
logging.info('Created global sequence file %s with sequence number %d',
self.path, value)
def _GetLastSeq(self):
"""Finds the next sequence number based on the log file.
This is the current maximum sequence number (or 0 if none is found). Takes
the last line as the most recent event.
"""
try:
if not self.log_file_path:
        # No log file is assigned for recovery purposes.
        logging.warning(
            'No JSON log file is assigned for recovery; starting from 0')
return 0
if (not os.path.exists(self.log_file_path) or
os.path.getsize(self.log_file_path) == 0):
# There is no log file, or it is empty. It's safe to start at 0.
return 0
except os.error:
# If the size can't be accessed for some reason, let's still try to
# continue to the recovery phase.
pass
    try:
      max_seq = 0
      seq = None
      last_line = ''
      with open(self.log_file_path) as f:
        for line in f:
          # Attempt to load the JSON line to get the seq.
          try:
            seq = int(json.loads(line)['seq'])
            max_seq = max(max_seq, seq)
          except (KeyError, TypeError, ValueError):
            # Skip corrupted lines.
            pass
          last_line = line
      if seq is None:
        # This could potentially happen if the JSON file was very small
        # and every single line was corrupted.
        logging.error('JSON file %s is corrupted, last line is %r',
                      self.log_file_path, last_line)
        return None
      return max_seq + 1
    except (IOError, OSError):
      # This should really never happen; maybe the JSON file is
      # so corrupted that a read operation is failing.
      logging.exception('Failed to read JSON file %s', self.log_file_path)
      return None
def _FindNextSequenceNumber(self):
"""Recovers the sequence number and add SEQ_INCREMENT_ON_BOOT."""
recovery_seq = self._GetLastSeq()
if recovery_seq is None:
# This should really never happen; maybe the events file is
# so corrupted that a read operation is failing.
      logging.error('Unable to find next sequence number from '
                    'events file; using system time in ms')
return int(time.time() * 1000)
elif recovery_seq == 0:
# There is no log file. It's safe to start at 0.
return recovery_seq
else:
return recovery_seq + SEQ_INCREMENT_ON_BOOT
def _NextOrRaise(self):
"""Returns the next sequence number, raising an exception on failure."""
with file_utils.FileLock(self.path, self._filelock_waitsecs):
with open(self.path, 'r+') as f:
# The file will be closed, and the lock freed, as soon as this
# block goes out of scope.
value = int(f.read())
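        # _after_read is a hook called immediately after reading the
        # sequence number (used by tests; see the constructor docstring).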
self._after_read()
f.seek(0)
f.write(str(value + 1))
# Don't bother flushing to disk. If a reboot occurs before flushing, the
# sequence number will be increased by SEQ_INCREMENT_ON_BOOT,
# maintaining the monotonicity property.
return value
def Current(self):
"""Returns the last-used sequence number, or None on failure."""
try:
with file_utils.FileLock(self.path, self._filelock_waitsecs):
with open(self.path, 'r') as f:
value = int(f.read()) - 1
return value
except (IOError, OSError, ValueError):
      logging.exception('Unable to read global sequence number from %s',
                        self.path)
return None
def Next(self):
"""Returns the next sequence number.
This needs to be run in the context of the log file being locked.
Otherwise, there's a chance that the same `seq` number will be produced
by two separate processes.
"""
try:
return self._NextOrRaise()
except (IOError, OSError, ValueError):
logging.exception('Unable to read global sequence number from %s; '
'trying to re-create', self.path)
# This should really never happen (unless, say, some process
# corrupts or deletes the file). Try our best to re-create it;
# this is not completely safe but better than totally hosing the
# machine. On failure, we're really screwed, so just propagate
# any exception.
file_utils.TryUnlink(self.path)
self._Create()
return self._NextOrRaise()
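
# A minimal usage sketch, illustrative only: the paths and the
# AppendEventToLog helper below are assumptions, not part of this module.
# It shows the locking contract documented in Next(): hold the log file's
# lock while obtaining a sequence number and writing the event, so that
# seq values and log order stay consistent.
#
#   seq = SeqGenerator('/var/factory/testlog.seq', '/var/factory/testlog.json')
#   with file_utils.FileLock('/var/factory/testlog.json', FILELOCK_WAITSECS):
#     event['seq'] = seq.Next()
#     AppendEventToLog(event)  # hypothetical: appends one JSON line to the log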