blob: 109db6de67cabb28fa1d3d63b054b93ee226142f [file] [log] [blame]
# Copyright 2015 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
r"""Rules parser.
The input syntax is:
[{"comment": ignored_value},
{"rule_class_name1": {"arg1": value, "arg2": value, ...}},
{"rule_class_name2": {"arg1": value, "arg2": value, ...}},
...]
E.g.:
[{"comment": "this text is ignored"},
{"SendStatus": {"url": "example\\.com/ss.*", "status": 204}},
{"ModifyUrl": {"url": "(example\\.com)(/.*)", "new_url": "{1}"}}
]
"""
import json
import re
class Error(Exception):
pass
class Rules(object):
"""A parsed sequence of Rule objects."""
def __init__(self, file_obj=None, allowed_imports=None):
"""Initializes from the given file object.
Args:
file_obj: A file object.
allowed_imports: A set of strings, defaults to {'rules'}.
Use {'*'} to allow any import path.
"""
if allowed_imports is None:
allowed_imports = {'rules'}
self._rules = [] if file_obj is None else _Load(file_obj, allowed_imports)
def Contains(self, rule_type_name):
"""Returns true if any rule matches the given type name.
Args:
rule_type_name: a string.
Returns:
True if any rule matches, else False.
"""
return any(rule for rule in self._rules if rule.IsType(rule_type_name))
def Find(self, rule_type_name):
"""Returns a _Rule object containing all rules with the given type name.
Args:
rule_type_name: a string.
Returns:
A callable object that expects two arguments:
request: the httparchive ArchivedHttpRequest
response: the httparchive ArchivedHttpResponse
and returns the rule return_value of the first rule that returns
should_stop == True, or the last rule's return_value if all rules returns
should_stop == False.
"""
matches = [rule for rule in self._rules if rule.IsType(rule_type_name)]
return _Rule(matches)
def __str__(self):
return _ToString(self._rules)
def __repr__(self):
return str(self)
class _Rule(object):
"""Calls a sequence of Rule objects until one returns should_stop."""
def __init__(self, rules):
self._rules = rules
def __call__(self, request, response):
"""Calls the rules until one returns should_stop.
Args:
request: the httparchive ArchivedHttpRequest.
response: the httparchive ArchivedHttpResponse, which may be None.
Returns:
The rule return_value of the first rule that returns should_stop == True,
or the last rule's return_value if all rules return should_stop == False.
"""
return_value = None
for rule in self._rules:
should_stop, return_value = rule.ApplyRule(
return_value, request, response)
if should_stop:
break
return return_value
def __str__(self):
return _ToString(self._rules)
def __repr__(self):
return str(self)
def _ToString(rules):
"""Formats a sequence of Rule objects into a string."""
return '[\n%s\n]' % '\n'.join('%s' % rule for rule in rules)
def _Load(file_obj, allowed_imports):
"""Parses and evaluates all rules in the given file.
Args:
file_obj: a file object.
allowed_imports: a sequence of strings, e.g.: {'rules'}.
Returns:
a list of rules.
"""
rules = []
entries = json.load(file_obj)
if not isinstance(entries, list):
raise Error('Expecting a list, not %s', type(entries))
for i, entry in enumerate(entries):
if not isinstance(entry, dict):
raise Error('%s: Expecting a dict, not %s', i, type(entry))
if len(entry) != 1:
raise Error('%s: Expecting 1 item, not %d', i, len(entry))
name, args = next(entry.iteritems())
if not isinstance(name, basestring):
raise Error('%s: Expecting a string TYPE, not %s', i, type(name))
if not re.match(r'(\w+\.)*\w+$', name):
raise Error('%s: Expecting a classname TYPE, not %s', i, name)
if name == 'comment':
continue
if not isinstance(args, dict):
raise Error('%s: Expecting a dict ARGS, not %s', i, type(args))
fullname = str(name)
if '.' not in fullname:
fullname = 'rules.%s' % fullname
modulename, classname = fullname.rsplit('.', 1)
if '*' not in allowed_imports and modulename not in allowed_imports:
raise Error('%s: Package %r is not in allowed_imports', i, modulename)
module = __import__(modulename, fromlist=[classname])
clazz = getattr(module, classname)
missing = {s for s in ('IsType', 'ApplyRule') if not hasattr(clazz, s)}
if missing:
raise Error('%s: %s lacks %s', i, clazz.__name__, ' and '.join(missing))
rule = clazz(**args)
rules.append(rule)
return rules