#!/usr/bin/env python
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Main logics for archiving raw logs into unified archives."""
import hashlib
import logging
import os
import re
import shutil
import tempfile
import time
import uuid
import yaml
from subprocess import PIPE, Popen
from twisted.internet import reactor
import common
from common import GetMetadataPath, METADATA_DIRECTORY
SNAPSHOT = '.snapshot'
# These suffixes indicate the file is still being written by another process.
SKIP_SUFFIX = ['.part', '.inprogress', '.lock', '.swp']
ARCHIVER_SOURCE_FILES = ['archiver.py', 'archiver_exception.py',
'archiver_cli.py', 'archiver_config.py']
# Global variable to keep locked file open during process life-cycle
locks = []
def _ListEligibleFiles(dir_path):
"""Returns a list of files consider to be archived.
Corrupted or missing metadata will be fixed during the travesal process.
Args:
dir_path: direcotry path wehre to search eligible files.
Returns:
A list contains tuples for eligible files, having appended bytes since
last archiving cycle. The tuple consist of three fields, namely
(last_completed_bytes, current_size, full path of the file).
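  Example:
    An illustrative sketch (the path below is hypothetical):
      eligible = _ListEligibleFiles('/raw-logs/eventlog')
      for last_completed_bytes, current_size, path in eligible:
        logging.debug('%r has %d new bytes', path,
                      current_size - last_completed_bytes)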
"""
ret = []
for current_dir, _, filenames in os.walk(dir_path):
    # The second element is the list of sub-directories, which we don't care
    # about because we focus on files and os.walk will traverse all of them
    # eventually.
if os.path.basename(current_dir) == METADATA_DIRECTORY:
logging.debug('Metadata directory %r found, skipped.', current_dir)
continue
logging.debug('Scanning directory %r...', current_dir)
for filename in filenames:
# Check its suffix.
if any(filename.endswith(x) for x in SKIP_SUFFIX):
logging.debug(
            'File %r is possibly being used by another process, skipped.',
filename)
continue
# Get the bytes information
full_path = os.path.join(current_dir, filename)
appended_range = _GetRangeOfAppendedBytes(full_path, current_dir)
if (appended_range[1] - appended_range[0]) <= 0:
logging.debug(
'No bytes appended to file %r since last archiving. Skipped.',
full_path)
else:
ret.append((appended_range[0], appended_range[1], full_path))
return ret
def _GetRangeOfAppendedBytes(file_path, dir_path=None):
"""Returns a tuple indicating range of appended bytes.
Regenerate metadata if
1) completed_bytes field is unreasonable (completed_bytes > filesize).
2) completed_bytes field is missing.
Args:
file_path: The path to the file of interested.
dir_path:
The directory path of the file_path. If the caller has the infomation
of its directory name in place, we can save a call of calling
os.path.dirname() by assigning this.
Returns:
A tuple indicates the appended range [start_pos, end_pos).
The start_pos is included but not the end_pos. It doesn't garantee to be
reasonable (i.e. end_pos > start_pos). It levaes to caller to decide
how to cope with these tuple.
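  Example:
    An illustrative sketch (the file path is hypothetical):
      start, end = _GetRangeOfAppendedBytes('/raw-logs/foo.log')
      if end - start > 0:
        # Bytes in [start, end) were appended since the last archiving cycle.
        pass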
"""
metadata_path = GetMetadataPath(file_path, dir_path)
metadata = common.GetOrCreateArchiverMetadata(metadata_path)
current_size = os.path.getsize(file_path)
  # Default to current_size + 1 so that the check below also triggers when
  # completed_bytes is missing from the dictionary.
completed_bytes = metadata.get('completed_bytes', current_size + 1)
if completed_bytes > current_size:
logging.info(
'Metadata %r seems corrupted. Either completed_bytes cannot '
'be found or is larger than current file size. Reconstruct it.',
metadata_path)
common.RegenerateArchiverMetadataFile(metadata_path)
completed_bytes = 0
return (completed_bytes, current_size)
def _UpdateArchiverMetadata(new_metadatas, config):
"""Updates the metadata of files.
Args:
    new_metadatas:
      A dictionary from the archive's metadata under key 'files', which
      indicates the successfully archived files. The value of 'files' is in
      a format like below:
        {
          'filename1': {'start': start position, 'end': end position},
          'filename2': {'start': start position, 'end': end position},
        }
      filename is the relative path from config.source_dir or
      dirname(config.source_file). The start and end positions are the
      archived range of the file.
    config: An ArchiverConfig that provides source_dir or source_file.
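  Example:
    An illustrative sketch (names and offsets are made up):
      _UpdateArchiverMetadata(
          {'20140220/foo.log': {'start': 0, 'end': 1024}}, config)
    This rewrites the .archiver metadata of foo.log so that its
    completed_bytes becomes 1024.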
"""
root_dir = (os.path.dirname(config.source_file) if
config.source_file else config.source_dir)
for filename, archived_range in new_metadatas.iteritems():
full_path = os.path.join(root_dir, filename)
metadata_path = GetMetadataPath(full_path)
logging.info('Updating %s', metadata_path)
with open(metadata_path, 'w') as fd:
fd.write(common.GenerateArchiverMetadata(
completed_bytes=archived_range['end']))
def _CopyCompleteChunks(files, tmp_dir, config):
"""Identifies chunks and copies them into tmp_dir.
  This function copies the non-archived, completed chunks into a temporary
  directory for later processing. If no delimiter is assigned in config, all
  non-archived bytes are treated as one complete chunk.
  The directory structure of the files is preserved. For example:
    abcd/foo.txt -> /tmp_dir/abcd/foo.txt
  Args:
    files:
      A list of tuples describing non-archived bytes, in the form
      (start position, end position, full path of the file).
    tmp_dir: Path to the temporary directory where new files are created.
    config: An ArchiverConfig to indicate delimiter information.
  Returns:
    The metadata of the copied bytes, in the form of a dictionary:
      {'files': {
         'filename1': {'start': start position, 'end': end position},
         'filename2': {'start': start position, 'end': end position},
         ...
       },
       'archiver': dictionary form of ArchiverConfig,
       'umpire': {}
      }
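  Example:
    An illustrative sketch (the path and offsets are made up): with a
    delimiter configured, the call
      _CopyCompleteChunks([(0, 2048, '/raw-logs/20140220/foo')], tmp_dir,
                          config)
    copies the bytes of foo up to the end of the last delimiter match into
    tmp_dir/20140220/foo and records the copied range under 'files' in the
    returned metadata.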
"""
archive_metadata = {'files': {},
'archiver': config.ToDictionary(),
'umpire': {}}
source_dir = (
os.path.dirname(config.source_file) if config.source_file else
config.source_dir)
for start, end, filename in files:
logging.debug(
'Identifying appended bytes of %r into temporary directory %r',
filename, tmp_dir)
last_complete_chunk_pos = appended_length = end - start
with open(filename, 'r') as fd:
fd.seek(start)
appended = fd.read(appended_length)
# Search for the complete chunk
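    # Example (illustrative): if the appended bytes are 'a\nb\nc' and the
    # delimiter matches '\n', the last match ends right after 'b\n', so 'c'
    # is left for the next archiving cycle.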
if config.delimiter:
      # Flags such as re.MULTILINE must be compiled into the delimiter
      # pattern; the second argument of finditer() on a compiled pattern is a
      # start position, not a flag.
      matches = list(config.delimiter.finditer(appended))
      if not matches:
logging.debug('No complete chunk in appended bytes for %r', filename)
continue
last_complete_chunk_pos = matches[-1].end()
if (start + last_complete_chunk_pos) < end:
logging.debug('One incomplete chunk will be skipped: %r',
appended[last_complete_chunk_pos:])
end = start + last_complete_chunk_pos
relative_path = os.path.relpath(filename, source_dir)
# Create related sub-directories
tmp_full_path = os.path.join(tmp_dir, relative_path)
try:
common.TryMakeDirs(os.path.dirname(tmp_full_path), raise_exception=True)
except Exception:
logging.error(
'Failed to create sub-directory for %r in temporary folder %r',
tmp_full_path, tmp_dir)
with open(tmp_full_path, 'w') as output_fd:
output_fd.write(appended[:last_complete_chunk_pos])
# Add to the metadata
archive_metadata['files'][relative_path] = {'start': start, 'end': end}
logging.debug('%r for archiving prepared.', filename)
return archive_metadata
def _GenerateArchiveName(config):
"""Returns an archive name based on the ArchiverConfig.
The name follows the format:
<project>,<data_types>,<formatted_time>,<hash>.<compress_format>
For example:
spring,report,20130205T1415Z,c7c62ea462.zip
spring,regcode,20130307T1932Z,c7a462a462.tar.xz.pgp
spring,eventlog,20130307T1945Z,c7a462c5a4.tar.xz
  The hash has 10 hex digits, split into two parts: the first 2 digits
  identify the host (Umpire) and the following 8 digits are generated
  randomly.
"""
# pylint: disable=E1101
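  # The leading 2 hex digits of the hash identify this host (derived from the
  # MAC address via uuid.getnode()); the trailing 8 characters come from a
  # random uuid4.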
prefix = ','.join(
[config.project, config.data_type,
time.strftime('%Y%m%dT%H%MZ', time.gmtime(time.time())),
(hashlib.sha256(str(uuid.getnode())).hexdigest()[:2] +
str(uuid.uuid4())[:8])])
  # TODO(itspeter): If config.encrypt_key is set, append an additional
  #                 string to the suffix.
suffix = config.compress_format
return prefix + suffix
def _Recycle(config):
"""Recycles directory from raw source if possible.
A directory under config.source_dir will be identified as recyclable if
time criteria and snapshot criteria are both met.
config.source_file will not be recycled because of potential racing writing.
1) Time criteria:
The directory name can be recognized by _ConvertToTimestamp() and the
time difference between current time are larger than
config.save_to_recycle_duration.
2) Snapshop criteria:
The .archiver/.snapshot has identical content as the one we are going to
create.
Although it shouldn't happen logically (unless the system time is drifted
a lot), we still have a rule to deal with recycle failure. When recycle
failed (i.e. a directory with same name exists), a 4 digits random hash
will be added as a suffix with one retry.
Returns:
True if recycle run until the end, False if terminate prematurely.
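  Example:
    An illustrative .archiver/.snapshot content (values are made up):
      - mtime: 1393320000.0
        path: foo.log
        size: 1024
      - mtime: 1393320100.0
        path: bar.log
        size: 2048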
"""
def _ConvertToTimestamp(dir_name):
"""Converts to time in seconds since the epoch.
The following format are supported:
Format Example
YYYYMMDD/ 20130929/
logs.YYYYMMDD/ logs.20140220/
logs.YYYYMMDD-hh/ logs.20140220-14/
Returns:
If dir_name can be converted, a time in seconds as a floating point will
be returned. Otherwise, return None.
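    Example:
      An illustrative sketch:
        _ConvertToTimestamp('logs.20140220-14')  # -> seconds since the epoch
        _ConvertToTimestamp('not-a-timestamp')   # -> None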
"""
ret = None
support_formats = ['logs.%Y%m%d-%H', 'logs.%Y%m%d', '%Y%m%d']
for format_str in support_formats:
try:
ret = time.strptime(dir_name, format_str)
break
except ValueError:
pass # Not able to parse the string
if ret is not None:
ret = time.mktime(ret)
return ret
def _WriteSnapshot(file_path, snapshot_to_write):
with open(file_path, 'w') as fd:
fd.write(yaml.dump(snapshot_to_write, default_flow_style=False))
logging.debug('Snapshot %r created.', file_path)
if not config.recycle_dir:
logging.debug('No recycle_dir assigned, skipped.')
return False
if config.source_file:
    logging.debug('config.source_file %r will not be recycled because of '
                  'potential write races, skipped.', config.source_file)
return False
for current_dir, sub_dirs, filenames in os.walk(config.source_dir):
if os.path.basename(current_dir) == METADATA_DIRECTORY:
logging.debug('Metadata directory %r found, skipped.', current_dir)
continue
    # Check if the directory name format is recognizable.
dir_time_in_secs = _ConvertToTimestamp(os.path.basename(current_dir))
if dir_time_in_secs is None:
      logging.debug('Cannot recognize the timestamp of %r. Will be skipped.',
                    current_dir)
continue
    # Make sure the directory is the deepest (i.e. no more sub-directories).
if not (len(sub_dirs) == 1 and sub_dirs[0] == METADATA_DIRECTORY):
      logging.debug('Directory %r still has sub-directories %r, not suitable '
                    'for snapshot creation. Will be skipped.',
                    current_dir, sub_dirs)
continue
    # Establish the new snapshot.
new_snapshot = []
for filename in filenames:
full_path = os.path.join(current_dir, filename)
mtime = os.path.getmtime(full_path)
size = os.path.getsize(full_path)
new_snapshot.append({'path': filename, 'mtime': mtime, 'size': size})
# Sort the new_snapshot by filename
new_snapshot.sort(key=lambda _dict: _dict['path'])
snapshot_path = os.path.join(current_dir, METADATA_DIRECTORY, SNAPSHOT)
    # Check if the time criterion is already met.
if time.time() - dir_time_in_secs < config.save_to_recycle_duration:
logging.debug(
'Directory %r still immature for recycling. %.2f secs left.',
current_dir,
config.save_to_recycle_duration - (time.time() - dir_time_in_secs))
# Update snapshot
_WriteSnapshot(snapshot_path, new_snapshot)
continue
# Read the current snapshot to check if we meet snapshot criteria.
current_snapshot = []
if os.path.isfile(snapshot_path):
with open(snapshot_path) as fd:
current_snapshot = yaml.load(fd.read())
_WriteSnapshot(snapshot_path, new_snapshot) # Override the current.
# Compare the snapshot.
if current_snapshot != new_snapshot:
continue
# Move to recycle_dir. Example of the moving path:
# raw-logs/eventlog/20130307/* -> recycle/raw-logs/eventlog/20130307/*
target_dir = os.path.join(config.recycle_dir,
os.path.relpath(current_dir, config.source_dir))
    logging.info('Both time and snapshot criteria are met, moving %r '
                 'to %r', current_dir, target_dir)
try:
os.rename(current_dir, target_dir)
except OSError: # Might already have a directory with same name
target_dir = target_dir.rstrip('/') + '_' + str(uuid.uuid4())[-4:]
      logging.info('A directory with the same name might already exist. Retry '
                   'to move %r to %r instead.', current_dir, target_dir)
try:
os.rename(current_dir, target_dir)
except OSError: # Fail again.
        logging.error('Failed to recycle directory %r, will try again in the '
                      'next archiving cycle.', current_dir)
continue
logging.info('Successfully recycled %r to %r', current_dir, target_dir)
return True
def Archive(config, next_cycle=True):
"""Archives the files based on the ArchiverConfig.
  This is the main logic of the archiver. Each config calls this function to
  set up its own long-lived archiving cycle.
  Args:
    config: An ArchiverConfig that instructs how to do the archiving.
    next_cycle:
      True to schedule the next cycle after config.duration; False to make
      this a one-time execution, usually used in run-once mode or unit tests.
  Returns:
    Full path of the generated archive, or None if nothing was archived.
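  Example:
    An illustrative sketch for a one-time run (building the config is
    omitted; configs are presumably constructed via archiver_config.py):
      generated_archive = Archive(config, next_cycle=False)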
"""
# TODO(itspeter): Special treat for zip (prepare file list instead of chunks)
started_time = common.TimeString()
generated_archive = None
def _UpdateMetadata(archive_metadata, started_time):
"""Writes other information about current archiver and its enviornment."""
archive_metadata['archiver']['started'] = started_time
archive_metadata['archiver']['version_md5'] = (
common.GetMD5ForFiles(ARCHIVER_SOURCE_FILES, os.path.dirname(__file__)))
archive_metadata['archiver']['host'] = hex(uuid.getnode())
# TODO(itspeter): Write Umpire related information if available
def _CompressIntoTarXz(tmp_dir, archive_metadata, config):
"""Writes metadata into tmp_dir and compresses it into archived_dir."""
with open(os.path.join(tmp_dir, 'archive.metadata'), 'w') as fd:
fd.write(yaml.dump(archive_metadata, default_flow_style=False))
# Compress it.
generated_archive = os.path.join(
config.archived_dir, _GenerateArchiveName(config))
tmp_archive = generated_archive + '.part'
logging.info('Compressing %r into %r', tmp_dir, tmp_archive)
# TODO(itspeter): Handle the failure cases.
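    # tar flags: -c create, -v verbose, -J compress with xz, -f write to the
    # given file; -C changes into tmp_dir so '.' archives its whole content.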
output_tuple = Popen( # pylint: disable=W0612
['tar', '-cvJf', tmp_archive, '-C', tmp_dir, '.'],
stdout=PIPE, stderr=PIPE).communicate()
# Remove .part suffix.
os.rename(tmp_archive, generated_archive)
return generated_archive
try:
tmp_dir = tempfile.mkdtemp(prefix='FactoryArchiver_',
suffix='_' + config.data_type)
logging.info('%r created for archiving data_type[%s]',
tmp_dir, config.data_type)
if config.source_dir:
eligible_files = _ListEligibleFiles(config.source_dir)
else: # Single file archiving.
appended_range = _GetRangeOfAppendedBytes(config.source_file)
if (appended_range[1] - appended_range[0]) <= 0:
logging.debug(
'No bytes appended to file %r since last archiving. Skipped.',
config.source_file)
eligible_files = []
else:
eligible_files = [
(appended_range[0], appended_range[1], config.source_file)]
archive_metadata = _CopyCompleteChunks(eligible_files, tmp_dir, config)
if len(archive_metadata['files']) > 0:
_UpdateMetadata(archive_metadata, started_time) # Write other info
# TODO(itspeter): Handle the .zip compress_format.
generated_archive = _CompressIntoTarXz(tmp_dir, archive_metadata, config)
      # Update the metadata for archived files.
_UpdateArchiverMetadata(archive_metadata['files'], config)
else:
logging.info('No data available for archiving for %r', config.data_type)
finally:
shutil.rmtree(tmp_dir)
logging.info('%r deleted', tmp_dir)
    # Create snapshots and recycle directories in source_dir.
_Recycle(config)
if next_cycle:
reactor.callLater(config.duration, Archive, config) # pylint: disable=E1101
logging.info('%r created for archiving data_type[%s]',
generated_archive, config.data_type)
return generated_archive