blob: fca95888c2715fa2a7586c4499ef7ffaf2e5617e [file] [log] [blame]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Independent general functions useful for most other code."""
import collections
import logging
import re
import time
import yaml
from subprocess import Popen, PIPE
class Error(Exception):
"""Generic fatal error."""
pass
class TimeoutError(Error):
"""Timeout error."""
pass
class Obj(object):
"""Generic wrapper allowing dot-notation dict access."""
def __init__(self, **field_dict):
self.__dict__.update(field_dict)
def __repr__(self):
return repr(self.__dict__)
def Shell(cmd, stdin=None, log=True):
"""Run cmd in a shell, return Obj containing stdout, stderr, and status.
The cmd stdout and stderr output is debug-logged.
Args:
cmd: Full shell command line as a string, which can contain
redirection (popes, etc).
stdin: String that will be passed as stdin to the command.
log: log command and result.
"""
process = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE, shell=True)
stdout, stderr = process.communicate(input=stdin) # pylint: disable=E1123
if log:
logging.debug('running %s' % repr(cmd) +
(', stdout: %s' % repr(stdout.strip()) if stdout else '') +
(', stderr: %s' % repr(stderr.strip()) if stderr else ''))
status = process.poll()
return Obj(stdout=stdout, stderr=stderr, status=status, success=(status == 0))
def CompactStr(data):
"""Converts data to string with compressed white space.
Args:
data: Single string or a list/tuple of strings.
Returns:
If data is a string, compress all contained contiguous spaces to
single spaces. If data is a list or tuple, space-join and then
treat like string input.
"""
if isinstance(data, list) or isinstance(data, tuple):
data = ' '.join(x for x in data if x)
return re.sub('\s+', ' ', data).strip()
def SetupLogging(level=logging.WARNING, log_file_name=None):
"""Configure logging level, format, and target file/stream.
Args:
level: The logging.{DEBUG,INFO,etc} level of verbosity to show.
log_file_name: File for appending log data.
"""
logging.basicConfig(
format='%(levelname)-8s %(asctime)-8s %(message)s',
datefmt='%H:%M:%S',
level=level,
**({'filename': log_file_name} if log_file_name else {}))
logging.Formatter.converter = time.gmtime
logging.info(time.strftime('%Y.%m.%d %Z', time.gmtime()))
def YamlWrite(structured_data):
"""Wrap yaml.dump to make calling convention consistent."""
return yaml.dump(structured_data, default_flow_style=False)
def YamlRead(serialized_data):
"""Wrap yaml.load to make calling convention consistent."""
return yaml.safe_load(serialized_data)
def ParseKeyValueData(pattern, data):
"""Converts structured text into a {(key, value)} dict.
Args:
pattern: A regex pattern to decode key/value pairs
data: The text to be parsed.
Returns:
A { key: value, ... } dict.
Raises:
ValueError: When the input is invalid.
"""
parsed_list = {}
for line in data.splitlines():
matched = re.match(pattern, line.strip())
if not matched:
raise ValueError('Invalid data: %s' % line)
(name, value) = (matched.group(1), matched.group(2))
if name in parsed_list:
raise ValueError('Duplicate key: %s' % name)
parsed_list[name] = value
return parsed_list
def MakeList(value):
"""Converts the given value to a list.
Returns:
A list of elements from "value" if it is iterable (except string);
otherwise, a list contains only one element.
"""
if (isinstance(value, collections.Iterable) and
not isinstance(value, basestring)):
return list(value)
return [value]
def MakeSet(value):
"""Converts the given value to a set.
Returns:
A set of elements from "value" if it is iterable (except string);
otherwise, a set contains only one element.
"""
if (isinstance(value, collections.Iterable) and
not isinstance(value, basestring)):
return set(value)
return set([value])
def CheckDictKeys(dict_to_check, allowed_keys):
"""Makes sure that a dictionary's keys are valid.
Args:
dict_to_check: A dictionary.
allowed_keys: The set of allowed keys in the dictionary.
"""
if not isinstance(dict_to_check, dict):
raise TypeError('Expected dict but found %s' % type(dict_to_check))
extra_keys = set(dict_to_check) - set(allowed_keys)
if extra_keys:
raise ValueError('Found extra keys: %s' % list(extra_keys))
class AttrDict(dict):
"""Attribute dictionary.
Use subclassed dict to store attributes. On __init__, the values inside
initial iterable will be converted to AttrDict if its type is a builtin
dict or builtin list.
Example:
foo = AttrDict()
foo['xyz'] = 'abc'
assertEqual(foo.xyz, 'abc')
bar = AttrDict({'x': {'y': 'value_x_y'},
'z': [{'m': 'value_z_0_m'}]})
assertEqual(bar.x.y, 'value_x_y')
assertEqual(bar.z[0].m, 'value_z_0_m')
"""
def _IsBuiltinDict(self, item):
return (isinstance(item, dict) and
item.__class__.__module__ == '__builtin__' and
item.__class__.__name__ == 'dict')
def _IsBuiltinList(self, item):
return (isinstance(item, list) and
item.__class__.__module__ == '__builtin__' and
item.__class__.__name__ == 'list')
def _ConvertList(self, itemlist):
converted = []
for item in itemlist:
if self._IsBuiltinDict(item):
converted.append(AttrDict(item))
elif self._IsBuiltinList(item):
converted.append(self._ConvertList(item))
else:
converted.append(item)
return converted
def __init__(self, *args, **kwargs):
super(AttrDict, self).__init__(*args, **kwargs)
for key, value in self.iteritems():
if self._IsBuiltinDict(value):
self[key] = AttrDict(value)
elif self._IsBuiltinList(value):
self[key] = self._ConvertList(value)
self.__dict__ = self
class Singleton(type):
"""Singleton metaclass.
Set __metaclass__ to Singleton to make it a singleton class. The instances
are stored in:
Singleton._instances[CLASSNAME]
Example:
class C(object):
__metaclass__ = Singleton
foo = C()
bar = C() # foo == bar
"""
_instances = {}
def __call__(mcs, *args, **kwargs):
if mcs not in mcs._instances:
mcs._instances[mcs] = super(Singleton, mcs).__call__(*args, **kwargs)
return mcs._instances[mcs]