| import dis |
| import os.path |
| import re |
| import subprocess |
| import sys |
| import sysconfig |
| import types |
| import unittest |
| |
| from test import support |
| from test.support import findfile, MS_WINDOWS |
| |
| |
| if not support.has_subprocess_support: |
| raise unittest.SkipTest("test module requires subprocess") |
| |
| |
| def abspath(filename): |
| return os.path.abspath(findfile(filename, subdir="dtracedata")) |
| |
| |
| def normalize_trace_output(output): |
| """Normalize DTrace output for comparison. |
| |
| DTrace keeps a per-CPU buffer, and when showing the fired probes, buffers |
| are concatenated. So if the operating system moves our thread around, the |
| straight result can be "non-causal". So we add timestamps to the probe |
| firing, sort by that field, then strip it from the output""" |
| |
| # When compiling with '--with-pydebug', strip '[# refs]' debug output. |
| output = re.sub(r"\[[0-9]+ refs\]", "", output) |
| try: |
| result = [ |
| row.split("\t") |
| for row in output.splitlines() |
| if row and not row.startswith('#') and not row.startswith('@') |
| ] |
| result.sort(key=lambda row: int(row[0])) |
| result = [row[1] for row in result] |
| # Normalize paths to basenames (bpftrace outputs full paths) |
| normalized = [] |
| for line in result: |
| # Replace full paths with just the filename |
| line = re.sub(r'/[^:]+/([^/:]+\.py)', r'\1', line) |
| normalized.append(line) |
| return "\n".join(normalized) |
| except (IndexError, ValueError): |
| raise AssertionError( |
| "tracer produced unparsable output:\n{}".format(output) |
| ) |
| |
| |
| class TraceBackend: |
| EXTENSION = None |
| COMMAND = None |
| COMMAND_ARGS = [] |
| |
| def run_case(self, name, optimize_python=None): |
| actual_output = normalize_trace_output(self.trace_python( |
| script_file=abspath(name + self.EXTENSION), |
| python_file=abspath(name + ".py"), |
| optimize_python=optimize_python)) |
| |
| with open(abspath(name + self.EXTENSION + ".expected")) as f: |
| expected_output = f.read().rstrip() |
| |
| return (expected_output, actual_output) |
| |
| def generate_trace_command(self, script_file, subcommand=None): |
| command = self.COMMAND + [script_file] |
| if subcommand: |
| command += ["-c", subcommand] |
| return command |
| |
| def trace(self, script_file, subcommand=None): |
| command = self.generate_trace_command(script_file, subcommand) |
| stdout, _ = subprocess.Popen(command, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.STDOUT, |
| universal_newlines=True).communicate() |
| return stdout |
| |
| def trace_python(self, script_file, python_file, optimize_python=None): |
| python_flags = [] |
| if optimize_python: |
| python_flags.extend(["-O"] * optimize_python) |
| subcommand = " ".join([sys.executable] + python_flags + [python_file]) |
| return self.trace(script_file, subcommand) |
| |
| def assert_usable(self): |
| try: |
| output = self.trace(abspath("assert_usable" + self.EXTENSION)) |
| output = output.strip() |
| except (FileNotFoundError, NotADirectoryError, PermissionError) as fnfe: |
| output = str(fnfe) |
| if output != "probe: success": |
| raise unittest.SkipTest( |
| "{}(1) failed: {}".format(self.COMMAND[0], output) |
| ) |
| |
| |
| class DTraceBackend(TraceBackend): |
| EXTENSION = ".d" |
| COMMAND = ["dtrace", "-q", "-s"] |
| if sys.platform == "sunos5": |
| COMMAND.insert(2, "-Z") |
| |
| |
| class SystemTapBackend(TraceBackend): |
| EXTENSION = ".stp" |
| COMMAND = ["stap", "-g"] |
| |
| |
| class BPFTraceBackend(TraceBackend): |
| EXTENSION = ".bt" |
| COMMAND = ["bpftrace"] |
| |
| # Inline bpftrace programs for each test case |
| PROGRAMS = { |
| "call_stack": """ |
| usdt:{python}:python:function__entry {{ |
| printf("%lld\\tfunction__entry:%s:%s:%d\\n", |
| nsecs, str(arg0), str(arg1), arg2); |
| }} |
| usdt:{python}:python:function__return {{ |
| printf("%lld\\tfunction__return:%s:%s:%d\\n", |
| nsecs, str(arg0), str(arg1), arg2); |
| }} |
| """, |
| "gc": """ |
| usdt:{python}:python:function__entry {{ |
| if (str(arg1) == "start") {{ @tracing = 1; }} |
| }} |
| usdt:{python}:python:function__return {{ |
| if (str(arg1) == "start") {{ @tracing = 0; }} |
| }} |
| usdt:{python}:python:gc__start {{ |
| if (@tracing) {{ |
| printf("%lld\\tgc__start:%d\\n", nsecs, arg0); |
| }} |
| }} |
| usdt:{python}:python:gc__done {{ |
| if (@tracing) {{ |
| printf("%lld\\tgc__done:%lld\\n", nsecs, arg0); |
| }} |
| }} |
| END {{ clear(@tracing); }} |
| """, |
| } |
| |
| # Which test scripts to filter by filename (None = use @tracing flag) |
| FILTER_BY_FILENAME = {"call_stack": "call_stack.py"} |
| |
| @staticmethod |
| def _filter_probe_rows(output): |
| return "\n".join( |
| line for line in output.splitlines() |
| if line.partition("\t")[0].isdigit() |
| ) |
| |
| # Expected outputs for each test case |
| # Note: bpftrace captures <module> entry/return and may have slight timing |
| # differences compared to SystemTap due to probe firing order |
| EXPECTED = { |
| "call_stack": """function__entry:call_stack.py:<module>:0 |
| function__entry:call_stack.py:start:23 |
| function__entry:call_stack.py:function_1:1 |
| function__entry:call_stack.py:function_3:9 |
| function__return:call_stack.py:function_3:10 |
| function__return:call_stack.py:function_1:2 |
| function__entry:call_stack.py:function_2:5 |
| function__entry:call_stack.py:function_1:1 |
| function__entry:call_stack.py:function_3:9 |
| function__return:call_stack.py:function_3:10 |
| function__return:call_stack.py:function_1:2 |
| function__return:call_stack.py:function_2:6 |
| function__entry:call_stack.py:function_3:9 |
| function__return:call_stack.py:function_3:10 |
| function__entry:call_stack.py:function_4:13 |
| function__return:call_stack.py:function_4:14 |
| function__entry:call_stack.py:function_5:18 |
| function__return:call_stack.py:function_5:21 |
| function__return:call_stack.py:start:28 |
| function__return:call_stack.py:<module>:30""", |
| "gc": """gc__start:0 |
| gc__done:0 |
| gc__start:1 |
| gc__done:0 |
| gc__start:2 |
| gc__done:0 |
| gc__start:2 |
| gc__done:1""", |
| } |
| |
| def run_case(self, name, optimize_python=None): |
| if name not in self.PROGRAMS: |
| raise unittest.SkipTest(f"No bpftrace program for {name}") |
| |
| python_file = abspath(name + ".py") |
| python_flags = [] |
| if optimize_python: |
| python_flags.extend(["-O"] * optimize_python) |
| |
| subcommand = [sys.executable] + python_flags + [python_file] |
| program = self.PROGRAMS[name].format(python=sys.executable) |
| |
| try: |
| proc = subprocess.Popen( |
| ["bpftrace", "-e", program, "-c", " ".join(subcommand)], |
| stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE, |
| universal_newlines=True, |
| ) |
| stdout, stderr = proc.communicate(timeout=60) |
| except subprocess.TimeoutExpired: |
| proc.kill() |
| raise AssertionError("bpftrace timed out") |
| except (FileNotFoundError, PermissionError) as e: |
| raise unittest.SkipTest(f"bpftrace not available: {e}") |
| |
| if proc.returncode != 0: |
| raise AssertionError( |
| f"bpftrace failed with code {proc.returncode}:\n{stderr}" |
| ) |
| |
| stdout = self._filter_probe_rows(stdout) |
| |
| # Filter output by filename if specified (bpftrace captures everything) |
| if name in self.FILTER_BY_FILENAME: |
| filter_filename = self.FILTER_BY_FILENAME[name] |
| filtered_lines = [ |
| line for line in stdout.splitlines() |
| if filter_filename in line |
| ] |
| stdout = "\n".join(filtered_lines) |
| |
| actual_output = normalize_trace_output(stdout) |
| expected_output = self.EXPECTED[name].strip() |
| |
| return (expected_output, actual_output) |
| |
| def assert_usable(self): |
| # Check if bpftrace is available and can attach to USDT probes |
| program = f'usdt:{sys.executable}:python:function__entry {{ printf("probe: success\\n"); exit(); }}' |
| try: |
| proc = subprocess.Popen( |
| ["bpftrace", "-e", program, "-c", f"{sys.executable} -c pass"], |
| stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE, |
| universal_newlines=True, |
| ) |
| stdout, stderr = proc.communicate(timeout=10) |
| except subprocess.TimeoutExpired: |
| proc.kill() |
| proc.communicate() # Clean up |
| raise unittest.SkipTest("bpftrace timed out during usability check") |
| except OSError as e: |
| raise unittest.SkipTest(f"bpftrace not available: {e}") |
| |
| # Check for permission errors (bpftrace usually requires root) |
| if proc.returncode != 0: |
| raise unittest.SkipTest( |
| f"bpftrace(1) failed with code {proc.returncode}: {stderr}" |
| ) |
| |
| if "probe: success" not in stdout: |
| raise unittest.SkipTest( |
| f"bpftrace(1) failed: stdout={stdout!r} stderr={stderr!r}" |
| ) |
| |
| |
| class BPFTraceOutputTests(unittest.TestCase): |
| def test_filter_probe_rows_ignores_warnings(self): |
| output = """stdin:1-19: WARNING: found external warnings |
| HINT: include/vmlinux.h:1439:3: warning: declaration does not declare anything |
| 4623214882928\tgc__start:0 |
| 4623214885575\tgc__done:0 |
| """ |
| self.assertEqual( |
| BPFTraceBackend._filter_probe_rows(output), |
| "4623214882928\tgc__start:0\n4623214885575\tgc__done:0", |
| ) |
| |
| |
| @unittest.skipIf(MS_WINDOWS, "Tests not compliant with trace on Windows.") |
| class TraceTests: |
| # unittest.TestCase options |
| maxDiff = None |
| |
| # TraceTests options |
| backend = None |
| optimize_python = 0 |
| |
| @classmethod |
| def setUpClass(self): |
| self.backend.assert_usable() |
| |
| def run_case(self, name): |
| actual_output, expected_output = self.backend.run_case( |
| name, optimize_python=self.optimize_python) |
| self.assertEqual(actual_output, expected_output) |
| |
| def test_function_entry_return(self): |
| self.run_case("call_stack") |
| |
| def test_verify_call_opcodes(self): |
| """Ensure our call stack test hits all function call opcodes""" |
| |
| # Modern Python uses CALL, CALL_KW, and CALL_FUNCTION_EX |
| opcodes = set(["CALL", "CALL_FUNCTION_EX", "CALL_KW"]) |
| |
| with open(abspath("call_stack.py")) as f: |
| code_string = f.read() |
| |
| def get_function_instructions(funcname): |
| # Recompile with appropriate optimization setting |
| code = compile(source=code_string, |
| filename="<string>", |
| mode="exec", |
| optimize=self.optimize_python) |
| |
| for c in code.co_consts: |
| if isinstance(c, types.CodeType) and c.co_name == funcname: |
| return dis.get_instructions(c) |
| return [] |
| |
| for instruction in get_function_instructions('start'): |
| opcodes.discard(instruction.opname) |
| |
| self.assertEqual(set(), opcodes) |
| |
| def test_gc(self): |
| self.run_case("gc") |
| |
| |
| class DTraceNormalTests(TraceTests, unittest.TestCase): |
| backend = DTraceBackend() |
| optimize_python = 0 |
| |
| |
| class DTraceOptimizedTests(TraceTests, unittest.TestCase): |
| backend = DTraceBackend() |
| optimize_python = 2 |
| |
| |
| class SystemTapNormalTests(TraceTests, unittest.TestCase): |
| backend = SystemTapBackend() |
| optimize_python = 0 |
| |
| |
| class SystemTapOptimizedTests(TraceTests, unittest.TestCase): |
| backend = SystemTapBackend() |
| optimize_python = 2 |
| |
| |
| class BPFTraceNormalTests(TraceTests, unittest.TestCase): |
| backend = BPFTraceBackend() |
| optimize_python = 0 |
| |
| |
| class BPFTraceOptimizedTests(TraceTests, unittest.TestCase): |
| backend = BPFTraceBackend() |
| optimize_python = 2 |
| |
| |
| class CheckDtraceProbes(unittest.TestCase): |
| @classmethod |
| def setUpClass(cls): |
| if sysconfig.get_config_var('WITH_DTRACE'): |
| readelf_major_version, readelf_minor_version = cls.get_readelf_version() |
| if support.verbose: |
| print(f"readelf version: {readelf_major_version}.{readelf_minor_version}") |
| else: |
| raise unittest.SkipTest("CPython must be configured with the --with-dtrace option.") |
| |
| |
| @staticmethod |
| def get_readelf_version(): |
| try: |
| cmd = ["readelf", "--version"] |
| proc = subprocess.Popen( |
| cmd, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE, |
| universal_newlines=True, |
| ) |
| with proc: |
| version, stderr = proc.communicate() |
| |
| if proc.returncode: |
| raise Exception( |
| f"Command {' '.join(cmd)!r} failed " |
| f"with exit code {proc.returncode}: " |
| f"stdout={version!r} stderr={stderr!r}" |
| ) |
| except OSError: |
| raise unittest.SkipTest("Couldn't find readelf on the path") |
| |
| # Regex to parse: |
| # 'GNU readelf (GNU Binutils) 2.40.0\n' -> 2.40 |
| match = re.search(r"^(?:GNU) readelf.*?\b(\d+)\.(\d+)", version) |
| if match is None: |
| raise unittest.SkipTest(f"Unable to parse readelf version: {version}") |
| |
| return int(match.group(1)), int(match.group(2)) |
| |
| def get_readelf_output(self): |
| command = ["readelf", "-n", sys.executable] |
| stdout, _ = subprocess.Popen( |
| command, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.STDOUT, |
| universal_newlines=True, |
| ).communicate() |
| return stdout |
| |
| def test_check_probes(self): |
| readelf_output = self.get_readelf_output() |
| |
| available_probe_names = [ |
| "Name: import__find__load__done", |
| "Name: import__find__load__start", |
| "Name: audit", |
| "Name: gc__start", |
| "Name: gc__done", |
| "Name: function__entry", |
| "Name: function__return", |
| ] |
| |
| for probe_name in available_probe_names: |
| with self.subTest(probe_name=probe_name): |
| self.assertIn(probe_name, readelf_output) |
| |
| @unittest.expectedFailure |
| def test_missing_probes(self): |
| readelf_output = self.get_readelf_output() |
| |
| # Missing probes will be added in the future. |
| missing_probe_names = [ |
| "Name: line", |
| ] |
| |
| for probe_name in missing_probe_names: |
| with self.subTest(probe_name=probe_name): |
| self.assertIn(probe_name, readelf_output) |
| |
| |
| if __name__ == '__main__': |
| unittest.main() |