# blob: 346bda1a8f79a814826226db96576f7e1a883fae [file] [log] [blame]
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""SystemLogManager scans system logs periodically and clear/sync them."""
import glob
import logging
import os
import Queue
import threading
import time
from collections import namedtuple
from urlparse import urlparse
import factory_common # pylint: disable=W0611
from cros.factory import event_log
from cros.factory.test import shopfloor
from cros.factory.utils.debug_utils import CatchException
from cros.factory.utils.process_utils import Spawn, TerminateOrKillProcess
# Smallest value, in seconds, accepted for a non-empty sync_log_period_secs.
MIN_SYNC_LOG_PERIOD_SECS = 120

# A request queued for the SystemLogManager thread.
#   extra_files: A list of extra files to sync.
#   callback: Called with extra_files after a successful sync, or None.
#   clear_only: True to only clear logs without syncing.
KickRequest = namedtuple(
    'KickRequest', ('extra_files', 'callback', 'clear_only'))
class SystemLogManagerException(Exception):
  """Error raised by SystemLogManager on bad settings or invalid calls."""
class SystemLogManager(object):
  """The manager that takes care of system log files.

  A background thread wakes up every scan_log_period_secs, or as soon as a
  request is queued through KickToClear/KickToSync. On each wake-up it removes
  the files matched by clear_log_paths and, when a sync is due or requested,
  rsyncs the files matched by sync_log_paths to the shopfloor server.

  Properties set from __init__ arguments:
    sync_log_paths: A list of log paths to sync. Each item in the list can be
      a glob pattern.
    sync_log_period_secs: The time period in seconds between consecutive syncs.
      Set it to None to disable periodic syncs. If it is not None, it should
      not be less than MIN_SYNC_LOG_PERIOD_SECS.
    scan_log_period_secs: The time period in seconds between consecutive scans.
      A scan includes clearing logs and optionally syncing logs. It should not
      be greater than sync_log_period_secs when periodic syncs are enabled.
    shopfloor_timeout: Timeout to get shopfloor instance.
    rsync_io_timeout: I/O timeout argument in rsync command. When file is
      large, system needs more time to compute the difference. The default of
      20 seconds is enough to compute the difference on a 300M file.
    polling_period: The period to poll rsync subprocess.
    clear_log_paths: A list of log paths to clear. Each item in the list can be
      a pattern. E.g. ['/var/log/messages*', '/var/log/net.log',
      '/var/log/connectivity.log'].
    clear_log_excluded_paths: A list of log path patterns to be excluded from
      clearing. E.g. ['/var/log/messages', '/var/log/messages.1',
      '/var/log/messages.2'], then SystemLogManager will
      preserve these files while they match '/var/log/messages*' in
      clear_log_paths.

  Other properties:
    main_thread: The thread that scans logs periodically.
    aborted: The event to abort main_thread.
    queue: The queue to store all the sync requests.
  """

  def __init__(self, sync_log_paths, sync_log_period_secs=300,
               scan_log_period_secs=120, shopfloor_timeout=5,
               rsync_io_timeout=20, polling_period=1, clear_log_paths=None,
               clear_log_excluded_paths=None):
    """Stores the settings and validates them.

    See the class docstring for the meaning of each argument.

    Raises:
      SystemLogManagerException: If the settings are not valid.
    """
    self._sync_log_paths = sync_log_paths
    self._sync_log_period_secs = sync_log_period_secs
    self._scan_log_period_secs = scan_log_period_secs
    self._shopfloor_timeout = shopfloor_timeout
    self._rsync_io_timeout = rsync_io_timeout
    self._polling_period = polling_period
    # Falsy values (None, empty list) are normalized to a fresh empty list.
    self._clear_log_paths = clear_log_paths if clear_log_paths else []
    self._clear_log_excluded_paths = (
        clear_log_excluded_paths if clear_log_excluded_paths else [])

    self._main_thread = None
    self._aborted = threading.Event()
    self._queue = Queue.Queue()

    self._CheckSettings()

  def _CheckSettings(self):
    """Checks the parameters are valid.

    Raises:
      SystemLogManagerException: If sync_log_period_secs is set but smaller
        than MIN_SYNC_LOG_PERIOD_SECS or smaller than scan_log_period_secs,
        if clear_log_paths or clear_log_excluded_paths is not a list, or if
        clear_log_paths shares an entry with sync_log_paths or with
        clear_log_excluded_paths.
    """
    # The period checks only apply when periodic syncing is enabled.
    if self._sync_log_period_secs:
      if self._sync_log_period_secs < MIN_SYNC_LOG_PERIOD_SECS:
        raise SystemLogManagerException(
            'sync_log_period_secs should not'
            ' be less than %d.' % MIN_SYNC_LOG_PERIOD_SECS)
      if self._scan_log_period_secs > self._sync_log_period_secs:
        raise SystemLogManagerException(
            'scan_log_period_secs should not'
            ' be greater than sync_log_period_seconds.')

    for list_name in ['_clear_log_paths', '_clear_log_excluded_paths']:
      list_attribute = getattr(self, list_name)
      if list_attribute and not isinstance(list_attribute, list):
        # Format the name into the message; passing it as a second exception
        # argument would leave the '%r' placeholder unexpanded.
        raise SystemLogManagerException('%r should be a list.' % list_name)

    # Only literal entries are compared here; glob patterns are not expanded.
    if self._clear_log_paths and self._sync_log_paths:
      if set(self._clear_log_paths) & set(self._sync_log_paths):
        raise SystemLogManagerException(
            'clear_log_paths should not be '
            'overlapped with sync_log_paths.')

    if self._clear_log_paths and self._clear_log_excluded_paths:
      if set(self._clear_log_paths) & set(self._clear_log_excluded_paths):
        raise SystemLogManagerException(
            'clear_log_paths should not be '
            'overlapped with clear_log_excluded_paths.')

  @staticmethod
  def _ExpandPatterns(patterns):
    """Returns the files matched by glob patterns, in pattern order.

    Args:
      patterns: An iterable of glob patterns.

    Returns:
      A flat list with the matches of every pattern. A file matched by more
      than one pattern appears once per matching pattern.
    """
    return [path for pattern in patterns for path in glob.glob(pattern)]

  def IsThreadRunning(self):
    """Returns True if _main_thread is running, False otherwise."""
    # bool() keeps the result a real boolean even when no thread was created.
    return bool(self._main_thread and self._main_thread.is_alive())

  def KickToClear(self):
    """Kicks _main_thread to clear logs.

    Stores the request into _queue and kicks _main_thread to handle the request.

    Raises:
      SystemLogManagerException: If thread is not running.
    """
    if not self.IsThreadRunning():
      raise SystemLogManagerException('Thread is not running.')
    self._queue.put(KickRequest([], None, True))
    logging.debug('Puts a clear request.')

  def KickToSync(self, extra_files=None, callback=None):
    """Kicks _main_thread to sync logs with extra files and a callback.

    Stores the request into _queue and kicks _main_thread to handle the request.
    SystemLogManager will clear logs, sync logs, and execute the callback.

    Args:
      extra_files: A list of extra files to sync.
      callback: A callback to call after sync succeeds. This is to serve the
        case such as deleting the files after syncing them to server. It is
        called with extra_files as its only argument.

    Raises:
      SystemLogManagerException: If thread is not running.
    """
    if not self.IsThreadRunning():
      raise SystemLogManagerException('Thread is not running.')
    if extra_files is None:
      extra_files = []
    self._queue.put(KickRequest(extra_files, callback, False))
    logging.debug('Puts extra_files: %r.', extra_files)

  def Start(self):
    """Starts SystemLogManager _main_thread with _RunForever method.

    Logs are cleared once in the calling thread before the thread is started.
    """
    logging.info('Start SystemLogManager thread.')
    # A previous Stop() leaves the event set. Without clearing it, a restarted
    # thread would quit on its first request and abort every sync immediately.
    self._aborted.clear()
    self._ClearLogs()
    self._main_thread = threading.Thread(target=self._RunForever,
                                         name='SystemLogManager')
    self._main_thread.start()

  def Stop(self):
    """Stops SystemLogManager _main_thread.

    Blocks until the thread has exited.

    Raises:
      SystemLogManagerException: If thread is not running.
    """
    if not self.IsThreadRunning():
      raise SystemLogManagerException('Thread is not running.')
    logging.debug('Sets aborted event to SystemLogManager main thread.')
    self._aborted.set()
    # Puts a request to kick _main_thread, which may be blocked on the queue.
    self._queue.put(KickRequest([], None, False))
    self._main_thread.join()
    self._main_thread = None
    logging.info('SystemLogManager main thread stopped.')

  def _RsyncDestination(self):
    """Gets rsync destination including server url, module, and folder name.

    Returns:
      A one-element list holding the rsync URL. The folder name is built from
      the device ID (with colons removed) and the reimage ID.
    """
    url = (shopfloor.get_server_url() or
           shopfloor.detect_default_server_url())
    proxy = shopfloor.get_instance(detect=True, timeout=self._shopfloor_timeout)
    factory_log_port = proxy.GetFactoryLogPort()
    folder_name = ('%s_%s' %
                   (event_log.GetDeviceId().replace(':', ''),
                    event_log.GetReimageId()))
    return ['rsync://%s:%s/system_logs/%s' %
            (urlparse(url).hostname, factory_log_port, folder_name)]

  @CatchException('SystemLogManager')
  def _ClearLogs(self):
    """Clears system logs.

    Clear logs listed in _clear_log_paths, excluding files in
    _clear_log_excluded_paths. A file that cannot be removed is logged and
    skipped so that the remaining files are still processed.
    """
    clear_files = self._ExpandPatterns(self._clear_log_paths)
    excluded_files = self._ExpandPatterns(self._clear_log_excluded_paths)
    file_list = list(set(clear_files) - set(excluded_files))
    logging.debug('Clearing %r', file_list)
    for f in file_list:
      try:
        os.unlink(f)
      except OSError:
        # Best effort: e.g. the file vanished or is not removable.
        logging.exception('Fails to remove file %s.', f)
    logging.debug('Logs cleared.')

  @CatchException('SystemLogManager')
  def _SyncLogs(self, extra_files, callback, abort_time):
    """Syncs system logs and extra files to server with a callback.

    If the threads gets kicked, terminates the running subprocess.
    If rsync takes too long and exceeds abort_time, terminates the running
    subprocess.

    Args:
      extra_files: A list of extra files to sync.
      callback: A callback function to call after sync succeeds. It is called
        with extra_files as its only argument.
      abort_time: The time to abort rsync subprocess if abort_time is not None.
    """
    logging.debug('Starts _SyncLogs.')
    rsync_command = ['rsync', '-azR', '--stats', '--chmod=o-t',
                     '--timeout=%s' % self._rsync_io_timeout]
    rsync_command += self._ExpandPatterns(self._sync_log_paths)
    rsync_command += extra_files
    rsync_command += self._RsyncDestination()

    rsync = Spawn(rsync_command, ignore_stdout=True, ignore_stderr=True)
    while rsync.poll() is None:
      logging.debug('Polls once.')
      # Sleeps for one polling period, but wakes up early on abort.
      self._aborted.wait(self._polling_period)
      # Aborts rsync if rsync takes too long and exceeds abort_time.
      if self._aborted.is_set() or (abort_time and time.time() > abort_time):
        TerminateOrKillProcess(rsync)
        logging.warning('System log rsync aborted.')
        return

    if rsync.returncode:
      logging.error('Factory log rsync returned status %d.',
                    rsync.returncode)
    else:
      # rsync succeeded, invoke callback function.
      logging.info('rsync succeeded.')
      if callback:
        callback(extra_files)

  def _RunForever(self):
    """The main method that SystemLogManager runs forever until being stopped.

    SystemLogManager clears system logs periodically and syncs them optionally.
    If user calls KickToSync method, SystemLogManager will clear logs and then
    deal with the request.
    If user calls KickToClear method, SystemLogManager will clear logs.
    """
    last_sync_time = None
    while True:
      try:
        extra_files, callback, clear_only = self._queue.get(
            block=True, timeout=self._scan_log_period_secs)
      except Queue.Empty:
        # clears obsolete logs.
        self._ClearLogs()
        # There is no sync request.
        # Syncs if periodic syncing is enabled and the time
        # difference from last_sync_time to current time is greater than
        # _sync_log_period_secs. Note that last_sync_time is the starting time
        # of the last sync, not the end time. Also, the last sync might fail.
        if (self._sync_log_period_secs and
            (last_sync_time is None or
             (time.time() - last_sync_time > self._sync_log_period_secs))):
          last_sync_time = time.time()
          self._SyncLogs([], None, time.time() + self._scan_log_period_secs)
      else:
        logging.debug('Gets a request with extra_files, callback, clear_only:'
                      '%r, %r, %r.', extra_files, callback, clear_only)
        # There is a request from queue. But it might be set from Stop().
        if self._aborted.is_set():
          self._queue.task_done()
          logging.info('SystemLogManager got aborted.')
          return
        else:
          # Always clears obsolete logs.
          self._ClearLogs()
          if not clear_only:
            # Syncs logs according to the request.
            last_sync_time = time.time()
            self._SyncLogs(extra_files, callback,
                           time.time() + self._scan_log_period_secs)
          # Only a dequeued request is acknowledged; the timeout path above
          # took nothing from the queue.
          self._queue.task_done()