blob: 31b4099f5a207eaed260346d527b89340e9c7ddb [file] [log] [blame]
#!/usr/bin/env python3
# Copyright 2020 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Unittests for the parser module."""
import os
import parser # pylint: disable=wrong-import-order
import shutil
import tempfile
import unittest
from importlib import resources
import arch
import bpf
# 64-bit test architecture description (syscall numbers, constants, syscall
# groups), loaded once from the packaged testdata and shared by every test
# case below.
ARCH_64 = arch.Arch.load_from_json_bytes(
    resources.files("testdata").joinpath("arch_64.json").read_bytes()
)
class TokenizerTests(unittest.TestCase):
    """Tests for ParserState.tokenize."""
    @staticmethod
    def _tokenize(line):
        """Tokenize a single line of policy text and return its tokens."""
        state = parser.ParserState("<memory>")
        return list(state.tokenize([line]))[0]
    @staticmethod
    def _token_pairs(line):
        """Return the (type, value) pair of every token on the line."""
        return [
            (token.type, token.value)
            for token in TokenizerTests._tokenize(line)
        ]
    def test_tokenize(self):
        """Accept valid tokens."""
        self.assertEqual(
            self._token_pairs("@include /minijail.policy"),
            [("INCLUDE", "@include"), ("PATH", "/minijail.policy")],
        )
        self.assertEqual(
            self._token_pairs("@include ./minijail.policy"),
            [("INCLUDE", "@include"), ("PATH", "./minijail.policy")],
        )
        # A full filter statement: trailing '# ignored' comment must be
        # dropped by the tokenizer.
        self.assertEqual(
            self._token_pairs(
                "read: arg0 in ~0xffff || arg0 & (1|2) && arg0 == 0755; "
                "return ENOSYS # ignored"
            ),
            [
                ("IDENTIFIER", "read"),
                ("COLON", ":"),
                ("ARGUMENT", "arg0"),
                ("OP", "in"),
                ("BITWISE_COMPLEMENT", "~"),
                ("NUMERIC_CONSTANT", "0xffff"),
                ("OR", "||"),
                ("ARGUMENT", "arg0"),
                ("OP", "&"),
                ("LPAREN", "("),
                ("NUMERIC_CONSTANT", "1"),
                ("BITWISE_OR", "|"),
                ("NUMERIC_CONSTANT", "2"),
                ("RPAREN", ")"),
                ("AND", "&&"),
                ("ARGUMENT", "arg0"),
                ("OP", "=="),
                ("NUMERIC_CONSTANT", "0755"),
                ("SEMICOLON", ";"),
                ("RETURN", "return"),
                ("IDENTIFIER", "ENOSYS"),
            ],
        )
        # Identifiers that merely start with a keyword ('return', 'kill',
        # 'trace', ...) must still be matched as whole identifiers.
        self.assertEqual(
            self._token_pairs("inotify_wait return_sys killall trace_sys"),
            [
                ("IDENTIFIER", "inotify_wait"),
                ("IDENTIFIER", "return_sys"),
                ("IDENTIFIER", "killall"),
                ("IDENTIFIER", "trace_sys"),
            ],
        )
    def test_tokenize_invalid_token(self):
        """Reject tokenizer errors."""
        expected_message = (
            r"<memory>\(1:1\): invalid token\n"
            r"    %invalid-token%\n"
            r"    \^"
        )
        with self.assertRaisesRegex(parser.ParseException, expected_message):
            TokenizerTests._tokenize("%invalid-token%")
class ParseConstantTests(unittest.TestCase):
    """Tests for PolicyParser.parse_value."""
    def setUp(self):
        # All tests here run against the shared 64-bit test arch, so the
        # arch.bits == 32 branches below are effectively dead in this run;
        # they are kept so the suite would also hold for a 32-bit arch.
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess()
        )
    def _tokenize(self, line):
        """Tokenize one line via the parser's internal tokenizer state."""
        # pylint: disable=protected-access
        return list(self.parser._parser_state.tokenize([line]))[0]
    def test_parse_constant_unsigned(self):
        """Accept reasonably-sized unsigned constants."""
        self.assertEqual(
            self.parser.parse_value(self._tokenize("0x80000000")), 0x80000000
        )
        if self.arch.bits == 64:
            self.assertEqual(
                self.parser.parse_value(self._tokenize("0x8000000000000000")),
                0x8000000000000000,
            )
    def test_parse_constant_unsigned_too_big(self):
        """Reject unreasonably-sized unsigned constants."""
        if self.arch.bits == 32:
            with self.assertRaisesRegex(
                parser.ParseException, "unsigned overflow"
            ):
                self.parser.parse_value(self._tokenize("0x100000000"))
        # 2**64 does not fit even on a 64-bit arch.
        with self.assertRaisesRegex(parser.ParseException, "unsigned overflow"):
            self.parser.parse_value(self._tokenize("0x10000000000000000"))
    def test_parse_constant_signed(self):
        """Accept reasonably-sized signed constants."""
        # Negative constants wrap to the arch's unsigned representation:
        # -1 becomes the all-ones value.
        self.assertEqual(
            self.parser.parse_value(self._tokenize("-1")),
            self.arch.max_unsigned,
        )
    def test_parse_constant_signed_too_negative(self):
        """Reject unreasonably-sized signed constants."""
        if self.arch.bits == 32:
            with self.assertRaisesRegex(
                parser.ParseException, "signed underflow"
            ):
                self.parser.parse_value(self._tokenize("-0x800000001"))
        # One below -(2**63): out of range for a 64-bit signed value.
        with self.assertRaisesRegex(parser.ParseException, "signed underflow"):
            self.parser.parse_value(self._tokenize("-0x8000000000000001"))
    def test_parse_mask(self):
        """Accept parsing a mask value."""
        self.assertEqual(
            self.parser.parse_value(self._tokenize("0x1|0x2|0x4|0x8")), 0xF
        )
    def test_parse_parenthesized_expressions(self):
        """Accept parsing parenthesized expressions."""
        # Expressions that are malformed outright: parse_value must raise.
        bad_expressions = [
            "(1",
            "|(1)",
            "(1)|",
            "()",
            "(",
            "((",
            "(()",
            "(()1",
        ]
        for expression in bad_expressions:
            with self.assertRaises(parser.ParseException, msg=expression):
                self.parser.parse_value(self._tokenize(expression))
        # Expressions with a valid prefix: parse_value consumes the prefix
        # and leaves the trailing tokens unconsumed rather than raising.
        bad_partial_expressions = [
            "1)",
            "(1)1",
            "1(0)",
        ]
        for expression in bad_partial_expressions:
            tokens = self._tokenize(expression)
            self.parser.parse_value(tokens)
            self.assertNotEqual(tokens, [])
        # All of these evaluate to 3 regardless of parenthesization.
        good_expressions = [
            "(3)",
            "(1)|2",
            "1|(2)",
            "(1)|(2)",
            "((3))",
            "0|(1|2)",
            "(0|1|2)",
        ]
        for expression in good_expressions:
            self.assertEqual(
                self.parser.parse_value(self._tokenize(expression)), 3
            )
    def test_parse_constant_complements(self):
        """Accept complementing constants."""
        self.assertEqual(
            self.parser.parse_value(self._tokenize("~0")),
            self.arch.max_unsigned,
        )
        self.assertEqual(
            self.parser.parse_value(self._tokenize("~0|~0")),
            self.arch.max_unsigned,
        )
        # Complement is applied per-operand before OR-ing, at arch width.
        if self.arch.bits == 32:
            self.assertEqual(
                self.parser.parse_value(
                    self._tokenize("~0x005AF0FF|~0xFFA50FFF")
                ),
                0xFFFFFF00,
            )
            self.assertEqual(
                self.parser.parse_value(
                    self._tokenize("0x0F|~(0x005AF000|0x00A50FFF)|0xF0")
                ),
                0xFF0000FF,
            )
        else:
            self.assertEqual(
                self.parser.parse_value(
                    self._tokenize("~0x00005A5AF0F0FFFF|~0xFFFFA5A50F0FFFFF")
                ),
                0xFFFFFFFFFFFF0000,
            )
            self.assertEqual(
                self.parser.parse_value(
                    self._tokenize(
                        "0x00FF|~(0x00005A5AF0F00000|0x0000A5A50F0FFFFF)|0xFF00"
                    )
                ),
                0xFFFF00000000FFFF,
            )
    def test_parse_double_complement(self):
        """Reject double-complementing constants."""
        with self.assertRaisesRegex(parser.ParseException, "double complement"):
            self.parser.parse_value(self._tokenize("~~0"))
    def test_parse_empty_complement(self):
        """Reject complementing nothing."""
        with self.assertRaisesRegex(parser.ParseException, "empty complement"):
            self.parser.parse_value(self._tokenize("0|~"))
    def test_parse_named_constant(self):
        """Accept parsing a named constant."""
        # O_RDONLY resolves through the test arch's constant table.
        self.assertEqual(self.parser.parse_value(self._tokenize("O_RDONLY")), 0)
    def test_parse_empty_constant(self):
        """Reject parsing nothing."""
        with self.assertRaisesRegex(parser.ParseException, "empty constant"):
            self.parser.parse_value([])
        with self.assertRaisesRegex(parser.ParseException, "empty constant"):
            self.parser.parse_value(self._tokenize("0|"))
    def test_parse_invalid_constant(self):
        """Reject parsing invalid constants."""
        with self.assertRaisesRegex(parser.ParseException, "invalid constant"):
            self.parser.parse_value(self._tokenize("foo"))
class ParseFilterExpressionTests(unittest.TestCase):
    """Tests for PolicyParser.parse_argument_expression."""
    def setUp(self):
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess()
        )
    def _tokenize(self, line):
        """Tokenize one line via the parser's internal tokenizer state."""
        # pylint: disable=protected-access
        return list(self.parser._parser_state.tokenize([line]))[0]
    def test_parse_argument_expression(self):
        """Accept valid argument expressions."""
        # The result is disjunctive normal form: a list of OR-ed clauses,
        # each clause a list of AND-ed Atoms. PROT_EXEC and PROT_WRITE
        # resolve to 4 and 2 via the test arch's constant table.
        self.assertEqual(
            self.parser.parse_argument_expression(
                self._tokenize(
                    "arg0 in 0xffff || arg0 == PROT_EXEC && arg1 == PROT_WRITE"
                )
            ),
            [
                [parser.Atom(0, "in", 0xFFFF)],
                [parser.Atom(0, "==", 4), parser.Atom(1, "==", 2)],
            ],
        )
    def test_parse_number_argument_expression(self):
        """Accept argument expressions with any octal/decimal/hex number."""
        # 4607 == 010777 == 0x11ff
        self.assertEqual(
            self.parser.parse_argument_expression(
                self._tokenize("arg0 in 4607")
            ),
            [
                [parser.Atom(0, "in", 4607)],
            ],
        )
        self.assertEqual(
            self.parser.parse_argument_expression(
                self._tokenize("arg0 in 010777")
            ),
            [
                [parser.Atom(0, "in", 4607)],
            ],
        )
        self.assertEqual(
            self.parser.parse_argument_expression(
                self._tokenize("arg0 in 0x11ff")
            ),
            [
                [parser.Atom(0, "in", 4607)],
            ],
        )
    def test_parse_empty_argument_expression(self):
        """Reject empty argument expressions."""
        # Trailing '||' with no right-hand side.
        with self.assertRaisesRegex(
            parser.ParseException, "empty argument expression"
        ):
            self.parser.parse_argument_expression(
                self._tokenize("arg0 in 0xffff ||")
            )
    def test_parse_empty_clause(self):
        """Reject empty clause."""
        # Trailing '&&' with no right-hand side.
        with self.assertRaisesRegex(parser.ParseException, "empty clause"):
            self.parser.parse_argument_expression(
                self._tokenize("arg0 in 0xffff &&")
            )
    def test_parse_invalid_argument(self):
        """Reject invalid argument."""
        with self.assertRaisesRegex(parser.ParseException, "invalid argument"):
            self.parser.parse_argument_expression(
                self._tokenize("argX in 0xffff")
            )
    def test_parse_invalid_operator(self):
        """Reject invalid operator."""
        # Single '=' is not a valid comparison operator.
        with self.assertRaisesRegex(parser.ParseException, "invalid operator"):
            self.parser.parse_argument_expression(
                self._tokenize("arg0 = 0xffff")
            )
    def test_parse_missing_operator(self):
        """Reject missing operator."""
        with self.assertRaisesRegex(parser.ParseException, "missing operator"):
            self.parser.parse_argument_expression(self._tokenize("arg0"))
    def test_parse_missing_operand(self):
        """Reject missing operand."""
        with self.assertRaisesRegex(parser.ParseException, "empty constant"):
            self.parser.parse_argument_expression(self._tokenize("arg0 =="))
class ParseFilterTests(unittest.TestCase):
    """Tests for PolicyParser.parse_filter."""
    def setUp(self):
        # Allowlist-mode parser (denylist defaults to off).
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess()
        )
    def _tokenize(self, line):
        """Tokenize one line via the parser's internal tokenizer state."""
        # pylint: disable=protected-access
        return list(self.parser._parser_state.tokenize([line]))[0]
    def test_parse_filter(self):
        """Accept valid filters."""
        # A bare expression with no action implies Allow.
        self.assertEqual(
            self.parser.parse_filter(self._tokenize("arg0 == 0")),
            [
                parser.Filter([[parser.Atom(0, "==", 0)]], bpf.Allow()),
            ],
        )
        # Each named action maps to its bpf action class; the filter's
        # expression is None, meaning it matches unconditionally.
        self.assertEqual(
            self.parser.parse_filter(self._tokenize("kill-process")),
            [
                parser.Filter(None, bpf.KillProcess()),
            ],
        )
        self.assertEqual(
            self.parser.parse_filter(self._tokenize("kill-thread")),
            [
                parser.Filter(None, bpf.KillThread()),
            ],
        )
        self.assertEqual(
            self.parser.parse_filter(self._tokenize("trap")),
            [
                parser.Filter(None, bpf.Trap()),
            ],
        )
        self.assertEqual(
            self.parser.parse_filter(self._tokenize("return ENOSYS")),
            [
                parser.Filter(
                    None, bpf.ReturnErrno(self.arch.constants["ENOSYS"])
                ),
            ],
        )
        self.assertEqual(
            self.parser.parse_filter(self._tokenize("trace")),
            [
                parser.Filter(None, bpf.Trace()),
            ],
        )
        self.assertEqual(
            self.parser.parse_filter(self._tokenize("user-notify")),
            [
                parser.Filter(None, bpf.UserNotify()),
            ],
        )
        self.assertEqual(
            self.parser.parse_filter(self._tokenize("log")),
            [
                parser.Filter(None, bpf.Log()),
            ],
        )
        self.assertEqual(
            self.parser.parse_filter(self._tokenize("allow")),
            [
                parser.Filter(None, bpf.Allow()),
            ],
        )
        # The literal "1" is accepted as a synonym for allow.
        self.assertEqual(
            self.parser.parse_filter(self._tokenize("1")),
            [
                parser.Filter(None, bpf.Allow()),
            ],
        )
        # Braces group multiple comma-separated filters for one syscall;
        # 'expr; action' attaches an explicit action to an expression.
        self.assertEqual(
            self.parser.parse_filter(
                self._tokenize("{ arg0 == 0, arg0 == 1; return ENOSYS, trap }")
            ),
            [
                parser.Filter([[parser.Atom(0, "==", 0)]], bpf.Allow()),
                parser.Filter(
                    [[parser.Atom(0, "==", 1)]],
                    bpf.ReturnErrno(self.arch.constants["ENOSYS"]),
                ),
                parser.Filter(None, bpf.Trap()),
            ],
        )
    def test_parse_missing_return_value(self):
        """Reject missing return value."""
        with self.assertRaisesRegex(
            parser.ParseException, "missing return value"
        ):
            self.parser.parse_filter(self._tokenize("return"))
    def test_parse_invalid_return_value(self):
        """Reject invalid return value."""
        # 'arg0' is not a constant, so the return value fails to parse.
        with self.assertRaisesRegex(parser.ParseException, "invalid constant"):
            self.parser.parse_filter(self._tokenize("return arg0"))
    def test_parse_unclosed_brace(self):
        """Reject unclosed brace."""
        with self.assertRaisesRegex(parser.ParseException, "unclosed brace"):
            self.parser.parse_filter(self._tokenize("{ allow"))
class ParseFilterDenylistTests(unittest.TestCase):
    """Tests for PolicyParser.parse_filter with a denylist policy."""
    def setUp(self):
        """Build a denylist-mode parser over the 64-bit test arch."""
        self.arch = ARCH_64
        self.kill_action = bpf.KillProcess()
        self.parser = parser.PolicyParser(
            self.arch, kill_action=self.kill_action, denylist=True
        )
    def _tokenize(self, line):
        """Tokenize one line via the parser's internal tokenizer state."""
        # pylint: disable=protected-access
        return list(self.parser._parser_state.tokenize([line]))[0]
    def test_parse_filter(self):
        """Accept only filters that return an errno."""
        tokens = self._tokenize("arg0 == 0; return ENOSYS")
        enosys = self.arch.constants["ENOSYS"]
        expected = [
            parser.Filter(
                [[parser.Atom(0, "==", 0)]], bpf.ReturnErrno(enosys)
            ),
        ]
        self.assertEqual(self.parser.parse_filter(tokens), expected)
class ParseFilterStatementTests(unittest.TestCase):
    """Tests for PolicyParser.parse_filter_statement."""
    def setUp(self):
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess()
        )
    def _tokenize(self, line):
        """Tokenize one line via the parser's internal tokenizer state."""
        # pylint: disable=protected-access
        return list(self.parser._parser_state.tokenize([line]))[0]
    def assertEqualIgnoringToken(self, actual, expected, msg=None):
        """Similar to assertEqual, but ignores the token field."""
        # Only syscalls and filters are compared; the token carries source
        # position information that the expectations below cannot supply
        # (hence token=None in every expected ParsedFilterStatement).
        if (
            actual.syscalls != expected.syscalls
            or actual.filters != expected.filters
        ):
            self.fail("%r != %r: %s" % (actual, expected, msg))
    def test_parse_filter_statement(self):
        """Accept valid filter statements."""
        # Single syscall: 'read' resolves to number 0 on the test arch.
        self.assertEqualIgnoringToken(
            self.parser.parse_filter_statement(
                self._tokenize("read: arg0 == 0")
            ),
            parser.ParsedFilterStatement(
                syscalls=(parser.Syscall("read", 0),),
                filters=[
                    parser.Filter([[parser.Atom(0, "==", 0)]], bpf.Allow()),
                ],
                token=None,
            ),
        )
        # Brace-grouped syscalls share one filter list.
        self.assertEqualIgnoringToken(
            self.parser.parse_filter_statement(
                self._tokenize("{read, write}: arg0 == 0")
            ),
            parser.ParsedFilterStatement(
                syscalls=(
                    parser.Syscall("read", 0),
                    parser.Syscall("write", 1),
                ),
                filters=[
                    parser.Filter([[parser.Atom(0, "==", 0)]], bpf.Allow()),
                ],
                token=None,
            ),
        )
        # 'group@source' names expand to the group's syscalls; the test
        # arch's testdata maps io@libc to read and write.
        self.assertEqualIgnoringToken(
            self.parser.parse_filter_statement(
                self._tokenize("io@libc: arg0 == 0")
            ),
            parser.ParsedFilterStatement(
                syscalls=(
                    parser.Syscall("read", 0),
                    parser.Syscall("write", 1),
                ),
                filters=[
                    parser.Filter([[parser.Atom(0, "==", 0)]], bpf.Allow()),
                ],
                token=None,
            ),
        )
        self.assertEqualIgnoringToken(
            self.parser.parse_filter_statement(
                self._tokenize("file-io@systemd: arg0 == 0")
            ),
            parser.ParsedFilterStatement(
                syscalls=(
                    parser.Syscall("read", 0),
                    parser.Syscall("write", 1),
                ),
                filters=[
                    parser.Filter([[parser.Atom(0, "==", 0)]], bpf.Allow()),
                ],
                token=None,
            ),
        )
        # 'kill' is a syscall name here, not the kill action keyword.
        self.assertEqualIgnoringToken(
            self.parser.parse_filter_statement(
                self._tokenize("kill: arg0 == 0")
            ),
            parser.ParsedFilterStatement(
                syscalls=(parser.Syscall("kill", 62),),
                filters=[
                    parser.Filter([[parser.Atom(0, "==", 0)]], bpf.Allow()),
                ],
                token=None,
            ),
        )
    def test_parse_metadata(self):
        """Accept valid filter statements with metadata."""
        # [arch=...] metadata restricts an entry to the named arch; the
        # test arch is named 'test', so this entry is kept.
        self.assertEqualIgnoringToken(
            self.parser.parse_filter_statement(
                self._tokenize("read[arch=test]: arg0 == 0")
            ),
            parser.ParsedFilterStatement(
                syscalls=(parser.Syscall("read", 0),),
                filters=[
                    parser.Filter([[parser.Atom(0, "==", 0)]], bpf.Allow()),
                ],
                token=None,
            ),
        )
        # Entries tagged for another arch are silently dropped from the
        # resulting syscall tuple.
        self.assertEqualIgnoringToken(
            self.parser.parse_filter_statement(
                self._tokenize(
                    "{read, nonexistent[arch=nonexistent]}: arg0 == 0"
                )
            ),
            parser.ParsedFilterStatement(
                syscalls=(parser.Syscall("read", 0),),
                filters=[
                    parser.Filter([[parser.Atom(0, "==", 0)]], bpf.Allow()),
                ],
                token=None,
            ),
        )
    def test_parse_unclosed_brace(self):
        """Reject unclosed brace."""
        # NOTE(review): this duplicates ParseFilterTests.test_parse_unclosed_brace
        # and exercises parse_filter, not parse_filter_statement; the
        # statement-level unclosed-brace case is covered by
        # test_parse_invalid_syscall_group below.
        with self.assertRaisesRegex(parser.ParseException, "unclosed brace"):
            self.parser.parse_filter(self._tokenize("{ allow"))
    def test_parse_invalid_syscall_group(self):
        """Reject invalid syscall groups."""
        with self.assertRaisesRegex(parser.ParseException, "unclosed brace"):
            self.parser.parse_filter_statement(
                self._tokenize("{ read, write: arg0 == 0")
            )
    def test_parse_missing_colon(self):
        """Reject missing colon."""
        with self.assertRaisesRegex(parser.ParseException, "missing colon"):
            self.parser.parse_filter_statement(self._tokenize("read"))
    def test_parse_invalid_colon(self):
        """Reject invalid colon."""
        with self.assertRaisesRegex(parser.ParseException, "invalid colon"):
            self.parser.parse_filter_statement(self._tokenize("read arg0"))
    def test_parse_missing_filter(self):
        """Reject missing filter."""
        with self.assertRaisesRegex(parser.ParseException, "missing filter"):
            self.parser.parse_filter_statement(self._tokenize("read:"))
class ParseFileTests(unittest.TestCase):
    """Tests for PolicyParser.parse_file."""
    def setUp(self):
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess()
        )
        # Scratch directory for the policy/frequency files each test writes.
        self.tempdir = tempfile.mkdtemp()
    def tearDown(self):
        shutil.rmtree(self.tempdir)
    def _write_file(self, filename, contents):
        """Helper to write out a file for testing."""
        path = os.path.join(self.tempdir, filename)
        with open(path, "w", encoding="utf-8") as outf:
            outf.write(contents)
        return path
    def test_parse_simple(self):
        """Allow simple policy files."""
        path = self._write_file(
            "test.policy",
            """
                # Comment.
                read: allow
                write: allow
            """,
        )
        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall("read", 0),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ],
                    ),
                    parser.FilterStatement(
                        syscall=parser.Syscall("write", 1),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ],
                    ),
                ],
            ),
        )
    def test_parse_multiline(self):
        """Allow simple multi-line policy files."""
        # The double backslash is needed so that the policy file on disk
        # (not this Python literal) ends the 'read:' line with a single
        # backslash. With a lone backslash, Python's string-literal line
        # continuation would join the two lines before they were ever
        # written out, and the parser's continuation handling would never
        # actually be exercised.
        path = self._write_file(
            "test.policy",
            """
                # Comment.
                read: \\
                    allow
                write: allow
            """,
        )
        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall("read", 0),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ],
                    ),
                    parser.FilterStatement(
                        syscall=parser.Syscall("write", 1),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ],
                    ),
                ],
            ),
        )
    def test_parse_default(self):
        """Allow defining a default action."""
        path = self._write_file(
            "test.policy",
            """
                @default kill-thread
                read: allow
            """,
        )
        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillThread(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall("read", 0),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ],
                    ),
                ],
            ),
        )
    def test_parse_default_permissive(self):
        """Reject defining a permissive default action."""
        # 'log' as a default would silently allow everything; the parser
        # must refuse it.
        path = self._write_file(
            "test.policy",
            """
                @default log
                read: allow
            """,
        )
        with self.assertRaisesRegex(
            parser.ParseException, r"invalid permissive default action"
        ):
            self.parser.parse_file(path)
    def test_parse_simple_grouped(self):
        """Allow simple policy files."""
        path = self._write_file(
            "test.policy",
            """
                # Comment.
                {read, write}: allow
            """,
        )
        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall("read", 0),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ],
                    ),
                    parser.FilterStatement(
                        syscall=parser.Syscall("write", 1),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ],
                    ),
                ],
            ),
        )
    def test_parse_other_arch(self):
        """Allow entries that only target another architecture."""
        # The read entry is tagged for a different arch and must be dropped.
        path = self._write_file(
            "test.policy",
            """
                # Comment.
                read[arch=nonexistent]: allow
                write: allow
            """,
        )
        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall("write", 1),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ],
                    ),
                ],
            ),
        )
    def test_parse_include(self):
        """Allow including policy files."""
        path = self._write_file(
            "test.include.policy",
            """
                {read, write}: arg0 == 0; allow
            """,
        )
        path = self._write_file(
            "test.policy",
            """
                @include ./test.include.policy
                read: return ENOSYS
            """,
        )
        # Included filters come first; the including file's own filter for
        # 'read' is appended after them. 'write' falls through to the
        # default kill action.
        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall("read", 0),
                        frequency=1,
                        filters=[
                            parser.Filter(
                                [[parser.Atom(0, "==", 0)]], bpf.Allow()
                            ),
                            parser.Filter(
                                None,
                                bpf.ReturnErrno(self.arch.constants["ENOSYS"]),
                            ),
                        ],
                    ),
                    parser.FilterStatement(
                        syscall=parser.Syscall("write", 1),
                        frequency=1,
                        filters=[
                            parser.Filter(
                                [[parser.Atom(0, "==", 0)]], bpf.Allow()
                            ),
                            parser.Filter(None, bpf.KillProcess()),
                        ],
                    ),
                ],
            ),
        )
    def test_parse_invalid_include(self):
        """Reject including invalid policy files."""
        with self.assertRaisesRegex(
            parser.ParseException, r"empty include path"
        ):
            path = self._write_file(
                "test.policy",
                """
                    @include
                """,
            )
            self.parser.parse_file(path)
        with self.assertRaisesRegex(
            parser.ParseException, r"invalid include path"
        ):
            path = self._write_file(
                "test.policy",
                """
                    @include arg0
                """,
            )
            self.parser.parse_file(path)
        # A file including itself must hit the nesting limit, not recurse
        # forever.
        with self.assertRaisesRegex(
            parser.ParseException, r"@include statement nested too deep"
        ):
            path = self._write_file(
                "test.policy",
                """
                    @include ./test.policy
                """,
            )
            self.parser.parse_file(path)
        with self.assertRaisesRegex(
            parser.ParseException, r"Could not @include .*"
        ):
            path = self._write_file(
                "test.policy",
                """
                    @include ./nonexistent.policy
                """,
            )
            self.parser.parse_file(path)
    def test_parse_frequency(self):
        """Allow including frequency files."""
        self._write_file(
            "test.frequency",
            """
                read: 2
                write: 3
            """,
        )
        path = self._write_file(
            "test.policy",
            """
                @frequency ./test.frequency
                read: allow
            """,
        )
        # Only 'read' appears in the policy, so only its frequency (2) is
        # reflected in the parsed output.
        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall("read", 0),
                        frequency=2,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ],
                    ),
                ],
            ),
        )
    def test_parse_invalid_frequency(self):
        """Reject including invalid frequency files."""
        # One policy file referencing ./test.frequency is reused; each case
        # below rewrites the frequency file with a different defect.
        path = self._write_file(
            "test.policy", """@frequency ./test.frequency"""
        )
        with self.assertRaisesRegex(parser.ParseException, r"missing colon"):
            self._write_file(
                "test.frequency",
                """
                    read
                """,
            )
            self.parser.parse_file(path)
        with self.assertRaisesRegex(parser.ParseException, r"invalid colon"):
            self._write_file(
                "test.frequency",
                """
                    read foo
                """,
            )
            self.parser.parse_file(path)
        with self.assertRaisesRegex(parser.ParseException, r"missing number"):
            self._write_file(
                "test.frequency",
                """
                    read:
                """,
            )
            self.parser.parse_file(path)
        with self.assertRaisesRegex(parser.ParseException, r"invalid number"):
            self._write_file(
                "test.frequency",
                """
                    read: foo
                """,
            )
            self.parser.parse_file(path)
        with self.assertRaisesRegex(parser.ParseException, r"invalid number"):
            self._write_file(
                "test.frequency",
                """
                    read: -1
                """,
            )
            self.parser.parse_file(path)
        with self.assertRaisesRegex(
            parser.ParseException, r"empty frequency path"
        ):
            path = self._write_file(
                "test.policy",
                """
                    @frequency
                """,
            )
            self.parser.parse_file(path)
        with self.assertRaisesRegex(
            parser.ParseException, r"invalid frequency path"
        ):
            path = self._write_file(
                "test.policy",
                """
                    @frequency arg0
                """,
            )
            self.parser.parse_file(path)
        with self.assertRaisesRegex(
            parser.ParseException, r"Could not open frequency file.*"
        ):
            path = self._write_file(
                "test.policy",
                """
                    @frequency ./nonexistent.frequency
                """,
            )
            self.parser.parse_file(path)
    def test_parse_multiple_unconditional(self):
        """Reject actions after an unconditional action."""
        # The (3:17) in the expected message is the line/column of the
        # second entry inside the indented policy literal below.
        path = self._write_file(
            "test.policy",
            """
                read: allow
                read: allow
            """,
        )
        with self.assertRaisesRegex(
            parser.ParseException,
            (
                r"test.policy\(3:17\): "
                r"Syscall read.*already had an unconditional action "
                r"applied"
            ),
        ):
            self.parser.parse_file(path)
        path = self._write_file(
            "test.policy",
            """
                read: log
                read: arg0 == 0; log
            """,
        )
        with self.assertRaisesRegex(
            parser.ParseException,
            (
                r"test.policy\(3:17\): "
                r"Syscall read.*already had an unconditional action "
                r"applied"
            ),
        ):
            self.parser.parse_file(path)
    def test_parse_allowlist_denylist_header(self):
        """Reject trying to compile denylist policy file as allowlist."""
        with self.assertRaisesRegex(
            parser.ParseException,
            r"policy is denylist, but flag --denylist " "not passed in",
        ):
            path = self._write_file(
                "test.policy",
                """
                    @denylist
                """,
            )
            self.parser.parse_file(path)
class ParseFileDenylistTests(unittest.TestCase):
    """Tests for PolicyParser.parse_file."""
    def setUp(self):
        # Denylist-mode parser: listed syscalls are denied and everything
        # else is allowed by default.
        self.arch = ARCH_64
        self.kill_action = bpf.KillProcess()
        self.parser = parser.PolicyParser(
            self.arch, kill_action=self.kill_action, denylist=True
        )
        self.tempdir = tempfile.mkdtemp()
    def tearDown(self):
        shutil.rmtree(self.tempdir)
    def _write_file(self, filename, contents):
        """Helper to write out a file for testing."""
        path = os.path.join(self.tempdir, filename)
        with open(path, "w", encoding="utf-8") as outf:
            outf.write(contents)
        return path
    def test_parse_simple(self):
        """Allow simple denylist policy files."""
        path = self._write_file(
            "test.policy",
            """
                # Comment.
                @denylist
                read: return ENOSYS
                write: return ENOSYS
            """,
        )
        # In denylist mode the default action flips to Allow.
        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.Allow(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall("read", 0),
                        frequency=1,
                        filters=[
                            parser.Filter(
                                None,
                                bpf.ReturnErrno(self.arch.constants["ENOSYS"]),
                            ),
                        ],
                    ),
                    parser.FilterStatement(
                        syscall=parser.Syscall("write", 1),
                        frequency=1,
                        filters=[
                            parser.Filter(
                                None,
                                bpf.ReturnErrno(self.arch.constants["ENOSYS"]),
                            ),
                        ],
                    ),
                ],
            ),
        )
    def test_parse_simple_with_arg(self):
        """Allow simple denylist policy files."""
        path = self._write_file(
            "test.policy",
            """
                # Comment.
                @denylist
                read: return ENOSYS
                write: arg0 == 0 ; return ENOSYS
            """,
        )
        # A conditional deny gets an implicit unconditional Allow appended
        # so non-matching calls fall through.
        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.Allow(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall("read", 0),
                        frequency=1,
                        filters=[
                            parser.Filter(
                                None,
                                bpf.ReturnErrno(self.arch.constants["ENOSYS"]),
                            ),
                        ],
                    ),
                    parser.FilterStatement(
                        syscall=parser.Syscall("write", 1),
                        frequency=1,
                        filters=[
                            parser.Filter(
                                [[parser.Atom(0, "==", 0)]],
                                bpf.ReturnErrno(self.arch.constants["ENOSYS"]),
                            ),
                            parser.Filter(None, bpf.Allow()),
                        ],
                    ),
                ],
            ),
        )
    def test_parse_denylist_no_header(self):
        """Reject trying to compile denylist policy file as allowlist."""
        # A denylist-mode parser requires the @denylist marker in the file.
        with self.assertRaisesRegex(
            parser.ParseException,
            r"policy must contain @denylist flag to be "
            "compiled with --denylist flag",
        ):
            path = self._write_file(
                "test.policy",
                """
                    read: return ENOSYS
                """,
            )
            self.parser.parse_file(path)
# Run the whole suite when this file is executed directly.
if __name__ == "__main__":
    unittest.main()