blob: 3db617ae5fdbd6af2b15ff98e4ba2457b83515ad [file] [log] [blame]
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import re
from six import iteritems
import factory_common # pylint: disable=unused-import
from cros.factory.probe import function
from cros.factory.utils.arg_utils import Arg
class MatchFunction(function.Function):
"""Filter the results which does not match the rule.
Description
-----------
The rule might be a dict or a string. A result is matched if every value of
the rule is matched. If the rule is a string, then the matched result should
only contain one item and the value is matched to the string.
If the string starts with ``!re``, then the remaining string is treated as a
regular expression.
If the string starts with ``!num``, the probed value will be treated as
floating point number, and the remaining of rule string should be
``< '==' | '>' | '<' | '>=' | '<=' | '!=' > ' ' NUMBER``, e.g.
``!num >= 10``.
Otherwise, the value of the result should be the same.
Examples
--------
This function is used by the probe framework itself to filter the outputs
of the evaluated functions by ``expect`` field in the probe statement.
Below is a probe statement which simply asks the probe framework to output
all usb devices::
{
"eval": "usb"
}
Let's assume the probed output is::
[
{
"idVendor": "01ab",
"idProduct": "1122",
...
},
{
"idVendor": "01ac",
"idProduct": "3344",
...
},
{
"idVendor": "23cd",
"idProduct": "3344",
...
}
]
If we modify the probe statement to::
{
"eval": "usb",
"expect": {
"idVendor": "01ab"
}
}
, then the probed results will become::
[
{
"idVendor": "01ab",
"idProduct": "1122",
...
}
]
We can also use the regular expression described in above. For example,
if we modify ``expect`` field in the probe statement to::
"expect": {
"idVendor": "!re ^01.*$"
}
, then the probed results will contain both the item with ``idVendor=01ab``
and the item with ``idVendor=01ac``.
"""
REGEX_PREFIX = '!re'
NUM_CMP_PREFIX = '!num'
NUM_CMP_OPERATOR = {
'==': lambda a, b: a == b,
'>': lambda a, b: a > b,
'<': lambda a, b: a < b,
'>=': lambda a, b: a >= b,
'<=': lambda a, b: a <= b,
'!=': lambda a, b: a != b,
}
ARGS = [
Arg('rule', (str, dict), 'The matched rule.')]
def __init__(self, **kwargs):
super(MatchFunction, self).__init__(**kwargs)
self.is_dict = isinstance(self.args.rule, dict)
if self.is_dict:
self.rule = {key: self.ConstructRule(value)
for key, value in iteritems(self.args.rule)}
else:
self.rule = self.ConstructRule(self.args.rule)
def Apply(self, data):
results = list(filter(self.Match, data))
return [{'values': res} for res in results]
def Match(self, item):
def _Match(matcher, value):
return matcher(value)
if not self.is_dict:
return len(item) == 1 and _Match(self.rule, next(iter(item.values())))
else:
return all([key in item and _Match(rule, item[key])
for key, rule in iteritems(self.rule)])
@classmethod
def ConstructRule(cls, rule):
assert isinstance(rule, str)
transformers = {
cls.REGEX_PREFIX: cls.TryTransferRegex,
cls.NUM_CMP_PREFIX: cls.TryTransferNumberCompare,
}
prefix, unused_sep, rest = rule.partition(' ')
if prefix in transformers:
return transformers[prefix](rest)
else:
return lambda v: v == rule
@classmethod
def TryTransferRegex(cls, value):
regexp = re.compile(value)
def matcher(v):
try:
return regexp.match(v) is not None
except TypeError:
return False
return matcher
@classmethod
def TryTransferNumberCompare(cls, value):
op, unused_sep, num = value.partition(' ')
num = float(num)
if op not in cls.NUM_CMP_OPERATOR:
raise ValueError('invalid operator %s' % op)
def matcher(v):
try:
v = float(v)
except ValueError:
return False
return cls.NUM_CMP_OPERATOR[op](v, num)
return matcher