| #!/usr/bin/env python |
| # |
| # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import collections |
| import logging |
| import os |
| import shelve |
| import threading |
| |
| from cros.factory import event_log |
| from cros.factory.test import factory |
| from cros.factory.test import utils |
| from cros.factory.utils.shelve_utils import OpenShelfOrBackup |
| |
| EVENT_SEPARATOR = '\n---\n' |
| KEY_OFFSET = 'offset' |
| EVENT_LOG_DB_FILE = os.path.join(factory.get_state_root(), 'event_log_db') |
| |
| |
class ScanException(Exception):
  '''Raised when scanning an event log (or its upload callback) fails.'''
| |
| |
class Chunk(collections.namedtuple('Chunk', 'log_name chunk pos')):
  '''A chunk of log data scanned by the log watcher.

  Fields:
    log_name: Name of the log.
    chunk: Value of the chunk.
    pos: Position of the chunk within the file.
  '''
  # pylint: disable=W0232
  # pylint: disable=E1101

  def __str__(self):
    return 'Chunk(log_name={!r}, len={}, pos={})'.format(
        self.log_name, len(self.chunk), self.pos)
| |
| |
class EventLogWatcher(object):
  '''Watches the event log directory and feeds new log data to a callback.

  Bookkeeping of how much of each log has already been handled is kept
  either in a shelve database (event_log_db_file) or, when that is None,
  in memory plus sync markers embedded in the log files themselves
  (see event_log.py).
  '''

  def __init__(self,
               watch_period_sec=30,
               event_log_dir=event_log.EVENT_LOG_DIR,
               event_log_db_file=EVENT_LOG_DB_FILE,
               handle_event_logs_callback=None,
               num_log_per_callback=0):
    '''Constructor.

    Args:
      watch_period_sec: The time period in seconds between consecutive
          watches.
      event_log_dir: The directory to watch for event log files.
      event_log_db_file: The file in which to store the DB of event logs,
          or None to use sync markers instead (see event_log.py).
      handle_event_logs_callback: The callback to trigger after new event logs
          found.
      num_log_per_callback: The maximum number of log files per callback, or 0
          for unlimited number of log files.
    '''
    self._watch_period_sec = watch_period_sec
    self._event_log_dir = event_log_dir
    self._event_log_db_file = event_log_db_file
    self._handle_event_logs_callback = handle_event_logs_callback
    self._num_log_per_callback = num_log_per_callback
    self._watch_thread = None
    self._aborted = threading.Event()
    self._kick = threading.Event()
    self._scan_lock = threading.Lock()

    self._use_sync_markers = event_log_db_file is None
    # With sync markers there is no persistent DB: offsets live in memory
    # and are recovered from the markers inside the log files on startup.
    self._db = {} if self._use_sync_markers else self.GetOrCreateDb()

  def StartWatchThread(self):
    '''Starts a thread to watch event logs.'''
    logging.info('Watching event logs...')
    self._watch_thread = threading.Thread(target=self.WatchForever,
                                          name='EventLogWatcher')
    self._watch_thread.start()

  def IsThreadStarted(self):
    '''Returns True if the watch thread is currently running.'''
    return self._watch_thread is not None

  def IsScanning(self):
    '''Returns True if currently scanning (i.e., the lock is held).'''
    # Pass the blocking flag positionally: Python 2's lock.acquire() does
    # not accept keyword arguments, so acquire(blocking=False) would raise
    # TypeError there.  acquire(False) works on both Python 2 and 3.
    if self._scan_lock.acquire(False):
      self._scan_lock.release()
      return False
    return True

  def FlushEventLogs(self):
    '''Flushes event logs.

    Calls ScanEventLogs with the suppress_error flag set to false, so any
    handler failure propagates to the caller as ScanException.
    '''
    with self._scan_lock:
      self.ScanEventLogs(False)

  def _CallEventLogHandler(self, chunks, suppress_error):
    '''Invokes the event log handler callback and records the new offsets.

    Args:
      chunks: A list of Chunks.
      suppress_error: If set to true then any exception from the handler
          (or from the bookkeeping that follows) is logged and swallowed
          instead of raised.

    Raises:
      ScanException: If the upload handler throws an exception and
          suppress_error is false.
    '''
    try:
      if self._handle_event_logs_callback is not None:
        self._handle_event_logs_callback(chunks)
      if self._use_sync_markers:
        # Rewrite the last sync marker in each chunk so that a future
        # restart resumes from this point (see event_log.py).
        for chunk in chunks:
          last_sync_marker = chunk.chunk.rfind(event_log.SYNC_MARKER_SEARCH)
          # rfind returns -1 when the marker is absent.  (The previous
          # "if not last_sync_marker" check skipped a marker at offset 0
          # and fell through on -1, seeking to pos - 1.)
          if last_sync_marker == -1:
            continue
          with open(os.path.join(self._event_log_dir, chunk.log_name),
                    'r+') as f:
            f.seek(chunk.pos + last_sync_marker)
            f.write(event_log.SYNC_MARKER_REPLACE)
            f.flush()
            os.fdatasync(f.fileno())
    except Exception:
      if suppress_error:
        logging.exception('Upload handler error')
        # Do not advance offsets: the chunks were not handled successfully.
        return
      raise ScanException(utils.FormatExceptionOnly())

    try:
      # Advance the recorded offset of each successfully handled chunk.
      for chunk in chunks:
        log_state = self._db.setdefault(chunk.log_name, {KEY_OFFSET: 0})
        log_state[KEY_OFFSET] += len(chunk.chunk)
        self._db[chunk.log_name] = log_state
      if not self._use_sync_markers:
        self._db.sync()
    except Exception:
      if suppress_error:
        logging.exception('Upload handler error')
      else:
        raise ScanException(utils.FormatExceptionOnly())

  def ScanEventLogs(self, suppress_error=True):
    '''Scans event logs and dispatches new chunks to the callback.

    Args:
      suppress_error: If set to true then any exception from the handle
          event log callback will be ignored.

    Raises:
      ScanException: If at least one ScanEventLog call throws an exception
          and suppress_error is false.
    '''
    if not os.path.exists(self._event_log_dir):
      logging.warning("Event log directory %s does not exist yet",
                      self._event_log_dir)
      return

    chunks = []

    # Sorts dirs by their names, as their modification time is changed when
    # the files inside are changed/added/removed.  Their names are more
    # reliable than the time.
    dir_name = lambda w: w[0]
    for dir_path, _, file_names in sorted(os.walk(self._event_log_dir),
                                          key=dir_name):
      # Sorts files by their modification time.
      file_mtime = lambda f: os.lstat(os.path.join(dir_path, f)).st_mtime
      for file_name in sorted(file_names, key=file_mtime):
        file_path = os.path.join(dir_path, file_name)
        if not os.path.isfile(file_path):
          continue
        relative_path = os.path.relpath(file_path, self._event_log_dir)
        # Scan a log if we have no record of it, or if it has grown past
        # the recorded offset.
        if (relative_path not in self._db or
            self._db[relative_path][KEY_OFFSET] != os.path.getsize(file_path)):
          try:
            chunk_info = self.ScanEventLog(relative_path)
            if chunk_info is not None:
              chunks.append(chunk_info)
          except Exception:
            msg = relative_path + ': ' + utils.FormatExceptionOnly()
            if suppress_error:
              logging.info(msg)
            else:
              raise ScanException(msg)
        if (self._num_log_per_callback and
            len(chunks) >= self._num_log_per_callback):
          self._CallEventLogHandler(chunks, suppress_error)
          chunks = []
          # Skip remaining when abort.  We don't want shutdown to wait for
          # the remaining files to finish.
          if self._aborted.is_set():
            return

    if chunks:
      self._CallEventLogHandler(chunks, suppress_error)

  def StopWatchThread(self):
    '''Stops the event logs watching thread.'''
    self._aborted.set()
    self._kick.set()
    self._watch_thread.join()
    self._watch_thread = None
    logging.info('Stopped watching.')
    self.Close()

  def KickWatchThread(self):
    '''Wakes the watch thread up for an immediate scan.'''
    self._kick.set()

  def Close(self):
    '''Closes the database.'''
    if not self._use_sync_markers:
      self._db.close()

  def WatchForever(self):
    '''Watches event logs forever until aborted.'''
    while True:
      # Flush the event logs once every watch period, or sooner if kicked.
      self._kick.wait(self._watch_period_sec)
      self._kick.clear()
      if self._aborted.is_set():
        return
      try:
        with self._scan_lock:
          self.ScanEventLogs()
      except Exception:
        logging.exception('Error in event log watcher thread')

  def GetOrCreateDb(self):
    '''Gets the database or recreates one if an exception occurs.'''
    assert not self._use_sync_markers

    try:
      db = OpenShelfOrBackup(self._event_log_db_file)
    except Exception:
      logging.exception('Corrupted database, recreating')
      os.unlink(self._event_log_db_file)
      db = shelve.open(self._event_log_db_file)
    return db

  def ScanEventLog(self, log_name):
    '''Scans one event log file for complete new events.

    Reads from the recorded offset and returns everything up to (and
    including) the last event separator.  If the log has no record yet,
    its starting offset is determined first: just after the last replaced
    sync marker when sync markers are in use, otherwise zero.

    Args:
      log_name: Name of the log file, relative to the event log directory.

    Returns:
      A Chunk of new event data, or None if no complete event is available.
    '''
    log_state = self._db.get(log_name)
    if not log_state:
      # We haven't seen this file yet since starting up.
      offset = 0
      if self._use_sync_markers:
        # Read in the file and set the offset to just after the last
        # instance of the replaced sync marker ("\n#S\n---\n"), if any.
        with open(os.path.join(self._event_log_dir, log_name)) as f:
          contents = f.read()
        replace_pos = contents.rfind(event_log.SYNC_MARKER_REPLACE)
        if replace_pos != -1:
          offset = replace_pos + len(event_log.SYNC_MARKER_REPLACE)
      log_state = {KEY_OFFSET: offset}
      self._db[log_name] = log_state

    with open(os.path.join(self._event_log_dir, log_name)) as f:
      f.seek(log_state[KEY_OFFSET])
      chunk = f.read()

    # Only hand over whole events; anything after the last separator is
    # presumably still being written.
    last_separator = chunk.rfind(EVENT_SEPARATOR)
    if last_separator == -1:
      return None

    chunk = chunk[0:(last_separator + len(EVENT_SEPARATOR))]
    return Chunk(log_name, chunk, log_state[KEY_OFFSET])

  def GetEventLog(self, log_name):
    '''Gets the recorded state ({offset: n} dict) for the given log name.'''
    return self._db.get(log_name)