blob: 8d26783a1ef4ab6857ba6b13712d841ca1cc0774 [file] [log] [blame]
#!/usr/bin/python2
#
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Input archive plugin.
Import all events from archives which are made by output_archive.

The archive name:
  'InstalogEvents_' + year + month + day + hour + minute + second

The archive structure:
  InstalogEvents_YYYYmmddHHMMSS.tar.gz
    InstalogEvents_YYYYmmddHHMMSS/
      events.json
      attachments/  # Will not have this dir if no attachment.
        000_${EVENT_0_ATTACHMENT_0_NAME}
        000_${EVENT_0_ATTACHMENT_1_NAME}
        001_${EVENT_1_ATTACHMENT_0_NAME}
        001_${EVENT_1_ATTACHMENT_1_NAME}
        ...
"""
# TODO(kitching): Add a unittest.
from __future__ import print_function
import glob
import os
import tarfile
import instalog_common # pylint: disable=unused-import
from instalog import datatypes
from instalog import plugin_base
from instalog.utils.arg_utils import Arg
from instalog.utils import file_utils
class InputArchive(plugin_base.InputPlugin):
  """Input plugin that imports events from archives made by output_archive.

  Every archive matched by the `path` argument is unpacked into a temporary
  directory, its events.json is parsed one line at a time, and the resulting
  events are emitted in a single Emit call per archive.
  """

  ARGS = [
      Arg('path', (str, unicode),
          'Path to the set of archives on disk. Uses glob syntax. '
          'e.g. "/path/to/InstalogEvents_*.tar.gz"'),
  ]

  @staticmethod
  def _FindUnsafeMember(tar, dest_dir):
    """Returns the name of the first member that would land outside dest_dir.

    Only member names are inspected (absolute paths and '..' components);
    the targets of symlink/hardlink members are not vetted here.

    Args:
      tar: An opened tarfile.TarFile object.
      dest_dir: Directory the archive is about to be extracted into.

    Returns:
      The offending member name, or None if every name stays inside dest_dir.
    """
    root = os.path.realpath(dest_dir)
    for member in tar.getmembers():
      # os.path.join discards `root` when member.name is absolute, so the
      # same comparison catches both absolute names and '..' traversal.
      target = os.path.realpath(os.path.join(root, member.name))
      if target != root and not target.startswith(root + os.sep):
        return member.name
    return None

  def ExtractArchive(self, archive_path, tmp_path):
    """Extracts archive to tmp_path, and checks that events.json exists.

    Args:
      archive_path: Path of the gzip-compressed tar archive to unpack.
      tmp_path: Directory that receives the extracted contents.

    Returns:
      json_path: The path of log file in tmp_path.

    Raises:
      IOError: If a member name would be extracted outside of tmp_path, or
          if events.json is not found where the archive name says it should
          be.
    """
    archive_name = os.path.basename(archive_path)
    with tarfile.open(archive_path, 'r:gz') as tar:
      # extractall() trusts member names, so refuse archives whose entries
      # would escape the temporary directory before writing anything.
      unsafe_name = self._FindUnsafeMember(tar, tmp_path)
      if unsafe_name is not None:
        self.error('File "%s" has member "%s" outside of the archive root',
                   archive_name, unsafe_name)
        raise IOError('Unsafe member "%s" in archive "%s"'
                      % (unsafe_name, archive_name))
      tar.extractall(tmp_path)

    # Remove '.tar.gz': the top-level directory inside the archive is
    # expected to share the archive's base name.
    dir_name = archive_name.split(os.extsep)[0]
    json_path = os.path.join(tmp_path, dir_name, 'events.json')
    if not os.path.isfile(json_path):
      self.error('File "%s" does not have events.json', archive_name)
      raise IOError('Archive "%s" does not have events.json' % archive_name)
    return json_path

  def ProcessArchive(self, archive_path):
    """Extracts archive to tmp_path, then parses and emits events within.

    Args:
      archive_path: Path of the archive to import.

    Raises:
      IOError: If the events could not be emitted.
      Exception: Any error raised while extracting or parsing is logged and
          re-raised unchanged.
    """
    self.info('Processing archive %s...', archive_path)
    # Emission happens inside the `with` block, while the extracted files
    # (including attachments referenced by the events) still exist.
    with file_utils.TempDirectory(prefix='input_archive_') as tmp_path:
      try:
        json_path = self.ExtractArchive(archive_path, tmp_path)
        if not self.ParseAndEmit(json_path):
          self.error('Emit failed!')
          raise IOError('Emit failed for archive "%s"' % archive_path)
      except Exception:
        # We might not have permission to access this file, or there could be
        # some other IO problem, or the tarfile was broken.
        self.exception('Exception while accessing file, check permissions, '
                       'files in archive, and "path" argument.')
        raise

  def Main(self):
    """Main thread of the plugin.

    Expands the `path` glob and imports the matching archives in sorted
    order.  The first archive that fails aborts the run, since
    ProcessArchive re-raises its errors.
    """
    archive_paths = sorted(glob.glob(self.args.path))
    self.info('Scanned for archives, %d files detected', len(archive_paths))
    for archive_path in archive_paths:
      self.ProcessArchive(archive_path)
    self.info('Finished importing all archives')

  def ParseAndEmit(self, path):
    """Parses lines in path to events, and emits to Instalog.

    Args:
      path: Path to the extracted events.json file.

    Returns:
      Result from the Emit call (boolean representing its success).
    """
    events = []
    event_dir = os.path.dirname(path)
    with open(path, 'r') as f:
      for line in f:
        event = self.ParseEvent(path, line)
        # Attachment paths are stored relative to the directory holding
        # events.json; rebase them onto the extracted location.  Only
        # existing keys are reassigned, so the mapping's size is unchanged
        # during iteration (assumes attachments is a plain dict).
        for att_id, att_path in event.attachments.iteritems():
          event.attachments[att_id] = os.path.join(event_dir, att_path)
        events.append(event)
    self.info('Parsed %d events', len(events))
    return self.Emit(events)

  def ParseEvent(self, path, line):
    """Returns an Instalog Event parsed from line.

    Args:
      path: Path to the log file in question.
      line: The JSON line to be parsed.
            May include trailing \\r and \\n characters.

    Raises:
      Exception: Whatever datatypes.Event.Deserialize raises for an invalid
          line; it is logged and re-raised unchanged.
    """
    try:
      return datatypes.Event.Deserialize(line)
    except Exception:
      self.error('Encountered invalid line "%s" in %s, aborting import',
                 line.rstrip(), path, exc_info=True)
      raise
# When executed directly as a script, hand control to plugin_base's entry
# point.
if __name__ == '__main__':
  plugin_base.main()