# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import importlib
import logging
import os
import re
import yaml
from autotest_lib.client.common_lib import error
class DeviceCapability(object):
Generate capabilities status on DUT from yaml files in a given path.
Answer from the capabilities whether some capability is satisfied on DUT.
def __init__(self, settings_path='/usr/local/etc/autotest-capability'):
@param settings_path: string, the base directory for autotest
capability. There should be yaml files.
self.capabilities = self.__get_autotest_capability(settings_path)"Capabilities:\n%r", self.capabilities)
def __get_autotest_capability(self, settings_path):
Generate and summarize capabilities from yaml files in
settings_path with detectors.
@param settings_path: string, the base directory for autotest
capability. There should be yaml files.
@returns dict:
The capabilities on DUT.
Its key is string denoting a capability. Its value is 'yes', 'no' or
def run_detector(name):
Run a detector in the detector directory. (i.e.
Return the result of the detector.
@param name: string, the name of running detector.
@returns string, a result of detect() in the detector script.
if name not in detect_results:
detector = importlib.import_module(
% name)
detect_results[name] = detector.detect()"Detector result (%s): %s",
name, detect_results[name])
return detect_results[name]
managed_cap_fpath = os.path.join(settings_path,
if not os.path.exists(managed_cap_fpath):
raise error.TestFail("%s is not installed" % managed_cap_fpath)
managed_caps = yaml.load(file(managed_cap_fpath))
cap_files = [f for f in os.listdir(settings_path)
if re.match(r'^[0-9]+-.*\.yaml$', f)]
cap_files.sort(key=lambda f: int(f.split('-', 1)[0]))
detect_results = {}
autotest_caps = dict.fromkeys(managed_caps, 'no')
for fname in cap_files:
logging.debug('Processing caps: %s', fname)
fname = os.path.join(settings_path, fname)
for rule in yaml.load(file(fname)):
# The type of rule is string or dict
# If the type is a string, it is a capability (e.g. webcam).
# If a specific condition (e.g. kepler, cpu type) is required,
# rule would be dict, for example,
# {'detector': 'intel_cpu',
# 'match': ['intel_celeron_1007U'],
# 'capabilities': ['no hw_h264_enc_1080_30'] }.
logging.debug("%r", rule)
caps = []
if isinstance(rule, dict):
if run_detector(rule['detector']) in rule['match']:
caps = rule['capabilities']
caps = [rule]
for capability in caps:
m = re.match(r'(?:(disable|no)\s+)?([\w\-]+)$', capability)
prefix, capability = m.groups()
if capability in managed_caps:
autotest_caps[capability] = prefix or 'yes'
raise error.TestFail(
"Unexpected capability: %s" % capability)
return autotest_caps
def get_managed_caps(self):
return self.capabilities.keys()
def get_capability_results(self):
return self.capabilities
def get_capability(self, cap):
Decide if a device satisfies a required capability for an autotest.
@param cap: string, denoting one capability. It must be one in
settings_path + 'managed-capabilities.yaml.'
@returns 'yes', 'no', or 'disable.'
return self.capabilities[cap]
except KeyError:
raise error.TestFail("Unexpected capability: %s" % cap)
def ensure_capability(self, cap):
Raise TestNAError if a device doesn't satisfy cap.
if self.get_capability(cap) != 'yes':
raise error.TestNAError("Missing Capability: %s" % cap)
def have_capability(self, cap):
Return whether cap is available.
return self.get_capability(cap) == 'yes'