blob: f971d3a637cd0f7d1fa1e87cdeab04105a3d97fb [file] [log] [blame]
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Utils class and functions. """
import collections
from contextlib import contextmanager
import inspect
import itertools
import math
import os
import tempfile
import time
from .default_setting import CONFIG_DIR
from .default_setting import LOG_DIR
from .default_setting import logger
def Enum(*args, **kwargs):
"""Create the immutable enumeration set.
Usage:
1. C-style enum. The value starts from 0 and increase orderly.
A = Enum('foo', 'bar')
then A.foo == 0, A.bar == 1
2. Key-value pair enum.
B = Enum(foo='FOO', bar='BAR')
then B.foo == 'FOO', B.bar == 'BAR'
"""
fields = dict(zip(args, itertools.count()), **kwargs)
enum_type = collections.namedtuple('Enum', fields.keys())
return enum_type(**fields)
def SearchConfig(filepath, search_dirs=None):
"""Find the config file and return the content.
The order of searching is:
1. relative path
2. config folder
3. search_dirs
"""
possible_dirs = ['', CONFIG_DIR]
if search_dirs is not None:
if type(search_dirs) != list:
search_dirs = [search_dirs]
possible_dirs += search_dirs
for possible_dir in possible_dirs:
path = os.path.abspath(os.path.join(possible_dir, filepath))
if os.path.exists(path):
return path
logger.error('Cannot find config file: %s', filepath)
raise IOError
def PrepareOutputFile(file_path):
""" Confirm the output file path is ok.
1. If the file_path is not absolute, then assign it to default log folder.
2. Check if the directory exists. If not, create the folder first.
"""
if not os.path.isabs(file_path):
logger.debug('file path %s is not absolute, assign to default log folder',
file_path)
file_path = os.path.join(LOG_DIR, file_path)
dir_path = os.path.dirname(file_path)
if not os.path.isdir(dir_path):
logger.debug('%s folder is not existed, create it.', dir_path)
os.mkdir(os.path.dirname(file_path))
return file_path
@contextmanager
def UnopenedTemporaryFile(**kwargs):
"""Yields an unopened temporary file.
The file is not opened, and it is deleted when the context manager
is closed if it still exists at that moment.
Args:
Any allowable arguments to tempfile.mkstemp (e.g., prefix,
suffix, dir).
"""
f, path = tempfile.mkstemp(**kwargs)
os.close(f)
try:
yield path
finally:
if os.path.exists(path):
os.unlink(path)
def IsInBound(results, bound):
"""Check the results meet the bound or not.
Args:
results: A number for SISO case, or a dict for MIMO case. The values of the
dict should be numbers.
bound: A tuple of the lower bound and uppper bound. The bound is a value or
None.
Returns:
True if all the result are between the lower bound and upper bound.
"""
def _CheckNumberType(value):
return isinstance(value, int) or isinstance(value, float)
def _OneValueInBound(value, bound):
if value is None:
return False
lower_bound, upper_bound = bound
return ((lower_bound is None or value >= lower_bound) and
(upper_bound is None or value <= upper_bound))
if isinstance(results, dict):
value_list = results.values()
else:
value_list = [results]
if not all([_CheckNumberType(value) for value in value_list]):
logger.error('The type of the result %s is invalid.', results)
return False
return all([_OneValueInBound(value, bound) for value in value_list])
def CalculateAverage(values, average_type='Linear'):
"""Calculate the average value.
Args:
values: A list of float value.
average_type: one of 'Linear', '10Log10', '20Log10'.
Returns: the average value.
"""
length = len(values)
values = map(float, values)
if length == 0:
return float('nan')
if length == 1:
return values[0]
if average_type == 'Linear':
return sum(values) / length
else:
deno = {'10Log10': 10,
'20Log10': 20}[average_type]
try:
actual_values = [math.pow(10, value / deno) for value in values]
average_value = sum(actual_values) / length
return deno * math.log10(average_value)
except ValueError:
return float('-inf')
except OverflowError:
logger.warning('The values exceed the range. Return NaN.')
return float('nan')
class TimeoutError(Exception):
"""Timeout error."""
def __init__(self, message, output=None):
Exception.__init__(self)
self.message = message
self.output = output
def __str__(self):
return repr(self.message)
def WaitFor(condition, timeout_secs, poll_interval_secs=0.1):
"""Wait for the given condition for at most the specified time.
Args:
condition: A function object.
timeout_secs: Timeout value in seconds.
poll_interval_secs: Interval to poll condition in seconds.
Raises:
ValueError: If condition is not a function.
TimeoutError: If cond does not become True after timeout_secs seconds.
"""
if not callable(condition):
raise ValueError('condition must be a callable object')
condition_name = condition.__name__
if condition_name == '<lambda>':
try:
condition_name = inspect.getsource(condition).strip()
except IOError:
pass
end_time = time.time() + timeout_secs if timeout_secs else float('inf')
while True:
if not math.isinf(end_time):
logger.info('[%ds left] %s', end_time - time.time(), condition_name)
ret = condition()
if ret:
return ret
if time.time() + poll_interval_secs > end_time:
error_msg = 'Timeout waiting for condition: %s' % condition_name
logger.error(error_msg)
raise TimeoutError(error_msg, ret)
time.sleep(poll_interval_secs)