blob: 8b36b6f76c3a48407a1ba969336c79a57f0f50f3 [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Google Factory Tools Common Library
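This module (gft_common) provides several common utilities for Google Factory
Tools: message and log helpers, shell command execution, simple file
read/write helpers, and the ThreadSafe and Memorize decorators.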
"""
import functools
import os
import re
import subprocess
import sys
import tempfile
import threading
import time
########################################################################
# Global Variables
# Debug flag: set by SetDebugLevel(); when true, DebugMsg() prints to stderr.
_debug = False
# Verbose flag: set by SetVerboseLevel(); when true, VerboseMsg() prints to
# stderr in addition to logging.
_verbose = False
# Path of the file appended to by Log(); a false value disables file logging.
_log_path = None
# Log path installed by the GFTConsole decorator.
DEFAULT_CONSOLE_LOG_PATH = '/var/log/factory.log'
########################################################################
# Common Utilities
class GFTError(Exception):
    """Exception for unrecoverable errors in GFT related functions.

    Attributes:
      value: The payload given at construction. Callers in this module pass
          a message string.
    """

    def __init__(self, value):
        # Forward the payload to the base class so that e.args (and with it
        # repr() and pickling) carries the value on every Python version.
        super(GFTError, self).__init__(value)
        self.value = value

    def __str__(self):
        # Kept as repr() of the payload for compatibility with existing output.
        return repr(self.value)
def SetDebugLevel(level):
    """Stores level as the module-wide debug flag consulted by DebugMsg."""
    global _debug
    _debug = level
def SetVerboseLevel(level, log_path=None):
    """Updates the verbosity flag and, optionally, the log file path.

    Args:
      level: New value of the module-wide verbose flag.
      log_path: When given (and non-empty), becomes the path used by Log.
    """
    global _verbose, _log_path
    _verbose = level
    if not log_path:
        # No new path requested: leave the current log path untouched.
        return
    DebugMsg("Set log path to: " + log_path, log=False)
    _log_path = log_path
def Log(msg):
    """Appends a message to the pre-configured log file.

    Each line of msg becomes one log entry prefixed with "(GFT)" and a
    timestamp. Nothing happens when no log path is configured. Logging is
    best-effort: a failure is reported on stderr instead of being raised.

    Args:
      msg: Text to log; may span multiple lines.
    """
    if not _log_path:
        return
    try:
        # One timestamp per call keeps all lines of a message consistent.
        stamp = time.strftime("%Y%m%d %H:%M:%S")
        lines = ["(GFT) %s %s\n" % (stamp, entry) for entry in msg.splitlines()]
        with open(_log_path, "at") as log_handle:
            log_handle.writelines(lines)
    except Exception:
        # Catch Exception rather than using a bare except, so that
        # KeyboardInterrupt and SystemExit still propagate.
        sys.stderr.write("FAILED TO WRITE TO LOG FILE %s.\n" % _log_path)
def WarningMsg(msg):
    """Writes msg unchanged to stderr (newline appended), then logs it."""
    line = "%s\n" % msg
    sys.stderr.write(line)
    Log(msg)
def VerboseMsg(msg):
    """Shows msg on stderr when verbose mode is on; always logs it.

    With the verbose flag set the message goes through WarningMsg (stderr
    plus log); otherwise it is only passed to Log.
    """
    emit = WarningMsg if _verbose else Log
    emit(msg)
def DebugMsg(msg, log=True):
    """Emits msg line by line with a "(DEBUG)" prefix.

    Args:
      msg: Text to report; each line is emitted separately.
      log: When debugging is off, whether the lines are still written to the
          log file. Ignored when debugging is on (WarningMsg logs anyway).
    """
    lines = msg.splitlines()
    if _debug:
        emit = WarningMsg
    elif log:
        emit = Log
    else:
        # Debugging disabled and logging not requested: nothing to do.
        return
    for line in lines:
        emit("(DEBUG) %s" % line)
def ErrorMsg(msg):
    """Reports every line of msg through WarningMsg, prefixed with "ERROR: "."""
    prefixed = ["ERROR: %s" % line for line in msg.splitlines()]
    for text in prefixed:
        WarningMsg(text)
def ErrorDie(msg):
    """Aborts the current operation by raising GFTError carrying msg."""
    failure = GFTError(msg)
    raise failure
def GFTConsole(f):
    """Decorator for all Google Factory Test Tools console programs.

    The log path is redirected to DEFAULT_CONSOLE_LOG_PATH every time the
    decorated function is called. Any GFTError raised by it is caught,
    reported through ErrorMsg, and turned into exit status 1.

    Args:
      f: The console program entry point to wrap.

    Returns:
      A wrapper that forwards all arguments to f and returns its result.
    """
    @functools.wraps(f)
    def main_wrapper(*args, **kw):
        # Override the default log path.
        global _log_path
        _log_path = DEFAULT_CONSOLE_LOG_PATH
        try:
            return f(*args, **kw)
        except GFTError as e:
            ErrorMsg(e.value)
            sys.exit(1)
    return main_wrapper
def GetTemporaryFileName(prefix='gft', suffix=''):
    """Creates an empty temporary file and returns its path.

    The file is created through tempfile.mkstemp and its descriptor is closed
    right away; the file itself is left in place for the caller.

    Args:
      prefix: Leading part of the generated file name.
      suffix: Trailing part of the generated file name.
    """
    handle, path = tempfile.mkstemp(suffix=suffix, prefix=prefix)
    os.close(handle)
    return path
def ShellExecution(command,
                   ignore_status=False,
                   show_progress=False,
                   progress_message=None):
    """Executes a shell command and returns the results.

    Output is captured through temporary files rather than pipes.

    Args:
      command: Command line passed to the shell (shell=True).
      ignore_status: False to raise an exception when the exit code is not
          zero; True to only report the failure through DebugMsg.
      show_progress: Shows progress by printing a dot to stderr every second
          while the command runs.
      progress_message: Message printed to stderr before the dots; only used
          when show_progress is set.

    Returns:
      (exit_code, stdout_messages, stderr_messages)

    Raises:
      GFTError: (via ErrorDie) when the command exits non-zero and
          ignore_status is False.
    """
    DebugMsg("ShellExecution: %s" % command, log=False)
    # The with-blocks guarantee both temporary files are closed even when
    # starting or waiting for the process raises.
    with tempfile.TemporaryFile() as temp_stderr:
        with tempfile.TemporaryFile() as temp_stdout:
            proc = subprocess.Popen(command,
                                    stderr=temp_stderr,
                                    stdout=temp_stdout,
                                    shell=True)
            if show_progress:
                # Flush our own stdout first so it is not interleaved with
                # the progress output below.
                sys.stdout.flush()
                if progress_message:
                    sys.stderr.write(progress_message)
                while proc.poll() is None:
                    sys.stderr.write('.')
                    time.sleep(1)
                if progress_message:
                    sys.stderr.write('\n')
            exit_code = proc.wait()

            # Collect output.
            temp_stdout.seek(0)
            out = temp_stdout.read()
            temp_stderr.seek(0)
            err = temp_stderr.read()

    if exit_code:
        # Prepare to log the error message.
        message = ('Failed executing command: %s\n'
                   'Output and Error Messages: %s\n%s' % (command, out, err))
        if ignore_status:
            DebugMsg(message)
        else:
            ErrorDie(message)
    return (exit_code, out, err)
def SystemOutput(command,
                 ignore_status=False,
                 show_progress=False,
                 progress_message=None):
    """Runs a shell command and returns its stdout without trailing newlines.

    Arguments are forwarded unchanged to ShellExecution.
    """
    _, captured_stdout, _ = ShellExecution(command, ignore_status,
                                           show_progress, progress_message)
    return captured_stdout.rstrip('\n')
def System(command,
           ignore_status=False,
           show_progress=False,
           progress_message=None):
    """Runs a shell command and returns only its exit code.

    Arguments are forwarded unchanged to ShellExecution.
    """
    exit_code, _, _ = ShellExecution(command, ignore_status,
                                     show_progress, progress_message)
    return exit_code
def ReadOneLine(filename):
    """Returns the first line of a file, stripped of surrounding whitespace."""
    with open(filename) as source:
        first_line = source.readline()
    return first_line.strip()
def ReadFile(filename):
    """Returns the full text of a file, stripped of surrounding whitespace."""
    with open(filename) as source:
        content = source.read()
    return content.strip()
def ReadBinaryFile(filename):
    """Returns the complete, unmodified contents of a file opened in binary mode."""
    with open(filename, "rb") as source:
        payload = source.read()
    return payload
def WriteFile(filename, data):
    """Creates or overwrites a file with the given text data."""
    with open(filename, "w") as sink:
        sink.write(data)
def WriteBinaryFile(filename, data):
    """Creates or overwrites a file with the given data in binary mode."""
    with open(filename, "wb") as sink:
        sink.write(data)
def ThreadSafe(f):
    """Decorator for functions that need synchronization.

    Calls made through the returned wrapper are serialized by a lock created
    once per decorated function. The lock is a plain threading.Lock, which is
    not reentrant, so f must not call its own wrapper recursively.

    Args:
      f: The function to protect.

    Returns:
      A wrapper that forwards positional and keyword arguments to f while
      holding the lock, and returns f's result.
    """
    lock = threading.Lock()

    @functools.wraps(f)
    def threadsafe_call(*args, **kwargs):
        # "with" releases the lock only if it was actually acquired, unlike
        # acquiring inside a try/finally block.
        with lock:
            return f(*args, **kwargs)
    return threadsafe_call
def Memorize(f):
    """Decorator for functions that need memoization.

    Results are cached per decorated function, keyed by the repr() of the
    call arguments. Arguments therefore need not be hashable, but their
    repr() must distinguish distinct values. The cache is never evicted.

    Args:
      f: The function whose results should be cached.

    Returns:
      A wrapper that calls f only the first time a given set of arguments
      is seen and returns the cached result afterwards.
    """
    memorize_data = {}

    @functools.wraps(f)
    def memorize_call(*args, **kwargs):
        # sorted() makes the key independent of keyword argument order.
        index = repr((args, sorted(kwargs.items())))
        if index not in memorize_data:
            memorize_data[index] = f(*args, **kwargs)
        return memorize_data[index]
    return memorize_call
if __name__ == '__main__':
    # Call form of print: same output on Python 2, and valid on Python 3.
    print("Google Factory Tool Common Library.")