blob: 25c5974d290389233010f93689b29f10f98740db [file] [log] [blame]
#!/usr/bin/env python2
#
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Tests for Instalog Event flow policy."""
from __future__ import print_function
import logging
import unittest
import instalog_common # pylint: disable=unused-import
from instalog import datatypes
from instalog import flow_policy
from instalog import log_utils
# Timestamp reused by both sample stages below.
_SAMPLE_TIME = 629467932.000012

# Stage appended to the sample event first; targets BUFFER.
_SAMPLE_PROCESS_STAGE1 = datatypes.ProcessStage(
    node_id='node_id1',
    plugin_id='plugin_id1',
    plugin_type='plugin_type1',
    target=datatypes.ProcessStage.BUFFER,
    time=_SAMPLE_TIME)

# Stage appended to the sample event second; targets EXTERNAL.
_SAMPLE_PROCESS_STAGE2 = datatypes.ProcessStage(
    node_id='node_id2',
    plugin_id='plugin_id2',
    plugin_type='plugin_type2',
    target=datatypes.ProcessStage.EXTERNAL,
    time=_SAMPLE_TIME)

# Event with an empty payload whose history is stage 1 followed by stage 2.
_SAMPLE_EVENT = datatypes.Event({})
for _stage in (_SAMPLE_PROCESS_STAGE1, _SAMPLE_PROCESS_STAGE2):
  _SAMPLE_EVENT.AppendStage(_stage)
del _stage  # Do not leave the loop variable behind as a module attribute.
class TestFlowPolicy(unittest.TestCase):
  """Exercises FlowPolicy built from `allow` / `deny` rule dictionaries."""

  def testFlowPolicyRulesCreation(self):
    """The first allow entry should equal an equivalent HistoryRule."""
    allow_rules = [{'rule': 'history', 'node_id': 'node_id2'}]
    fp = flow_policy.FlowPolicy(allow=allow_rules)
    expected_rule = flow_policy.HistoryRule(node_id='node_id2')
    self.assertEqual(fp.allow[0], expected_rule)

  def testEmptyPolicyMatch(self):
    """A policy constructed without arguments should not match."""
    fp = flow_policy.FlowPolicy()
    matched = fp.MatchEvent(_SAMPLE_EVENT)
    self.assertFalse(matched)

  def testAllRulePolicyMatch(self):
    """A lone 'all' allow rule should match the sample event."""
    fp = flow_policy.FlowPolicy(allow=[{'rule': 'all'}])
    matched = fp.MatchEvent(_SAMPLE_EVENT)
    self.assertTrue(matched)

  def testAllowSingleMatch(self):
    """One history allow rule on node_id2 should match."""
    allow_rules = [{'rule': 'history', 'node_id': 'node_id2'}]
    fp = flow_policy.FlowPolicy(allow=allow_rules)
    self.assertTrue(fp.MatchEvent(_SAMPLE_EVENT))

  def testAllowDoubleMatch(self):
    """Two history allow rules together should still match."""
    # NOTE(review): 'plugin_id' equals neither sample plugin ID
    # ('plugin_id1' / 'plugin_id2'); confirm whether the second rule is
    # intended to match on its own.
    allow_rules = [
        {'rule': 'history', 'node_id': 'node_id2'},
        {'rule': 'history', 'plugin_id': 'plugin_id'},
    ]
    fp = flow_policy.FlowPolicy(allow=allow_rules)
    self.assertTrue(fp.MatchEvent(_SAMPLE_EVENT))

  def testAllowDenyMatch(self):
    """A deny rule naming an unknown plugin should not block the match."""
    allow_rules = [{'rule': 'history', 'node_id': 'node_id2'}]
    deny_rules = [{'rule': 'history', 'plugin_id': 'non_existent'}]
    fp = flow_policy.FlowPolicy(allow=allow_rules, deny=deny_rules)
    self.assertTrue(fp.MatchEvent(_SAMPLE_EVENT))

  def testAllowDenyMismatch(self):
    """A deny rule on node_id1 should make the policy reject the event."""
    allow_rules = [{'rule': 'history', 'node_id': 'node_id2'}]
    deny_rules = [{'rule': 'history', 'node_id': 'node_id1'}]
    fp = flow_policy.FlowPolicy(allow=allow_rules, deny=deny_rules)
    self.assertFalse(fp.MatchEvent(_SAMPLE_EVENT))
class TestHistoryRule(unittest.TestCase):
  """Exercises HistoryRule.MatchEvent against the two-stage sample event."""

  def testSingleAttribute(self):
    """Filtering on node_id alone should match."""
    history_rule = flow_policy.HistoryRule(node_id='node_id2')
    matched = history_rule.MatchEvent(_SAMPLE_EVENT)
    self.assertTrue(matched)

  def testDoubleAttributeMatch(self):
    """node_id and plugin_id taken from the same stage should match."""
    history_rule = flow_policy.HistoryRule(
        node_id='node_id2', plugin_id='plugin_id2')
    matched = history_rule.MatchEvent(_SAMPLE_EVENT)
    self.assertTrue(matched)

  def testDoubleAttributeMismatch(self):
    """node_id and plugin_id taken from different stages should not match."""
    history_rule = flow_policy.HistoryRule(
        node_id='node_id2', plugin_id='plugin_id1')
    matched = history_rule.MatchEvent(_SAMPLE_EVENT)
    self.assertFalse(matched)

  def testPositionMatch(self):
    """node_id2 is expected at position 1 of the history."""
    history_rule = flow_policy.HistoryRule(node_id='node_id2', position=1)
    matched = history_rule.MatchEvent(_SAMPLE_EVENT)
    self.assertTrue(matched)

  def testNegativePosition(self):
    """Negative positions -2 and -1 should select node_id1 and node_id2."""
    expectations = (('node_id1', -2), ('node_id2', -1))
    for node, pos in expectations:
      history_rule = flow_policy.HistoryRule(node_id=node, position=pos)
      self.assertTrue(history_rule.MatchEvent(_SAMPLE_EVENT))

  def testPositionMismatch(self):
    """node_id2 is not expected at position 0 of the history."""
    history_rule = flow_policy.HistoryRule(node_id='node_id2', position=0)
    matched = history_rule.MatchEvent(_SAMPLE_EVENT)
    self.assertFalse(matched)
class TestTestlogRule(unittest.TestCase):
  """Exercises TestlogRule directly and through FlowPolicy allow/deny."""

  def testMatchEvent(self):
    """Only a payload whose 'type' is 'station.test_run' should match."""
    testlog_rule = flow_policy.TestlogRule(type='station.test_run')
    # (assertion, event payload) pairs, checked in this order.
    cases = (
        (self.assertFalse, {'type': 'station.message'}),
        (self.assertFalse, {}),
        (self.assertTrue, {'type': 'station.test_run'}),
    )
    for check, payload in cases:
      check(testlog_rule.MatchEvent(datatypes.Event(payload)))

  def testFlowPolicyAllow(self):
    """An allow-only testlog policy should admit just the named type."""
    fp = flow_policy.FlowPolicy(
        allow=[{'rule': 'testlog', 'type': 'station.test_run'}])
    # (assertion, event payload) pairs, checked in this order.
    cases = (
        (self.assertFalse, {'type': 'station.message'}),
        (self.assertFalse, {}),
        (self.assertTrue, {'type': 'station.test_run'}),
    )
    for check, payload in cases:
      check(fp.MatchEvent(datatypes.Event(payload)))

  def testFlowPolicyDeny(self):
    """Allow-all plus testlog deny rules should reject only denied types."""
    denied = [
        {'rule': 'testlog', 'type': 'station.message'},
        {'rule': 'testlog', 'type': 'station.status'},
    ]
    fp = flow_policy.FlowPolicy(allow=[{'rule': 'all'}], deny=denied)
    # (assertion, event payload) pairs, checked in this order.
    cases = (
        (self.assertFalse, {'type': 'station.message'}),
        (self.assertFalse, {'type': 'station.status'}),
        (self.assertTrue, {}),
        (self.assertTrue, {'type': 'station.test_run'}),
    )
    for check, payload in cases:
      check(fp.MatchEvent(datatypes.Event(payload)))
if __name__ == '__main__':
  # Set up logging with a stream handler created for logging.INFO, then hand
  # control to the unittest runner.
  stream_handler = log_utils.GetStreamHandler(logging.INFO)
  log_utils.InitLogging(stream_handler)
  unittest.main()