| import io |
| import itertools |
| import json |
| import os |
| import re |
| import signal |
| import socket |
| import subprocess |
| import sys |
| import textwrap |
| import unittest |
| import unittest.mock |
| from contextlib import closing, contextmanager, redirect_stdout, redirect_stderr, ExitStack |
| from test.support import is_wasi, cpython_only, force_color, requires_subprocess, SHORT_TIMEOUT, subTests |
| from test.support.os_helper import TESTFN, unlink |
| from typing import List |
| |
| import pdb |
| from pdb import _PdbServer, _PdbClient |
| |
| |
# Raising SkipTest at import time makes unittest skip this whole module when
# the running interpreter reports remote debugging as disabled.
if not sys.is_remote_debug_enabled():
    raise unittest.SkipTest('remote debugging is disabled')
| |
| |
@contextmanager
def kill_on_error(proc):
    """Manage ``proc`` as a context manager, killing it if the body raises.

    The body runs inside ``with proc:``.  Any exception escaping the body
    (``KeyboardInterrupt`` and ``SystemExit`` included) causes ``proc.kill()``
    to be called before the exception is re-raised unchanged.
    """
    with proc:
        try:
            yield proc
        except BaseException:
            # Kill first, then let the original exception keep propagating.
            proc.kill()
            raise
| |
| |
class MockSocketFile:
    """In-memory stand-in for a socket file, for testing _PdbServer offline.

    Lines queued with add_input() are handed out by readline(); everything
    passed to write() is recorded and can be decoded with get_output().
    """

    def __init__(self):
        self.input_queue = []    # encoded lines still to be returned by readline()
        self.output_buffer = []  # raw values passed to write(), in call order

    def write(self, data: bytes) -> None:
        """Record *data* as if it had been written to the socket."""
        self.output_buffer.append(data)

    def flush(self) -> None:
        """Do nothing; there is no underlying buffer to flush."""

    def readline(self) -> bytes:
        """Return the oldest queued input line, or b"" when none is queued."""
        if self.input_queue:
            return self.input_queue.pop(0)
        return b""

    def close(self) -> None:
        """Do nothing; there is no underlying resource to release."""

    def add_input(self, data: dict) -> None:
        """Queue *data*, JSON-encoded and newline-terminated, for readline()."""
        encoded = json.dumps(data).encode()
        self.input_queue.append(encoded + b"\n")

    def get_output(self) -> List[dict]:
        """Return the decoded JSON payloads written so far and reset the buffer.

        Only bytes values ending in a newline are considered, and values that
        are not valid JSON are skipped.
        """
        decoded = []
        for chunk in self.output_buffer:
            if not isinstance(chunk, bytes) or not chunk.endswith(b"\n"):
                continue
            try:
                payload = json.loads(chunk.decode().strip())
            except json.JSONDecodeError:
                continue  # Ignore non-JSON output
            decoded.append(payload)
        self.output_buffer = []
        return decoded
| |
| |
| class PdbClientTestCase(unittest.TestCase): |
| """Tests for the _PdbClient class.""" |
| |
| def do_test( |
| self, |
| *, |
| incoming, |
| simulate_send_failure=False, |
| simulate_sigint_during_stdout_write=False, |
| use_interrupt_socket=False, |
| expected_outgoing=None, |
| expected_outgoing_signals=None, |
| expected_completions=None, |
| expected_exception=None, |
| expected_stdout="", |
| expected_stdout_substring="", |
| expected_state=None, |
| ): |
| if expected_outgoing is None: |
| expected_outgoing = [] |
| if expected_outgoing_signals is None: |
| expected_outgoing_signals = [] |
| if expected_completions is None: |
| expected_completions = [] |
| if expected_state is None: |
| expected_state = {} |
| |
| expected_state.setdefault("write_failed", False) |
| messages = [m for source, m in incoming if source == "server"] |
| prompts = [m["prompt"] for source, m in incoming if source == "user"] |
| |
| input_iter = (m for source, m in incoming if source == "user") |
| completions = [] |
| |
| def mock_input(prompt): |
| message = next(input_iter, None) |
| if message is None: |
| raise EOFError |
| |
| if req := message.get("completion_request"): |
| readline_mock = unittest.mock.Mock() |
| readline_mock.get_line_buffer.return_value = req["line"] |
| readline_mock.get_begidx.return_value = req["begidx"] |
| readline_mock.get_endidx.return_value = req["endidx"] |
| unittest.mock.seal(readline_mock) |
| with unittest.mock.patch.dict(sys.modules, {"readline": readline_mock}): |
| for param in itertools.count(): |
| prefix = req["line"][req["begidx"] : req["endidx"]] |
| completion = client.complete(prefix, param) |
| if completion is None: |
| break |
| completions.append(completion) |
| |
| reply = message["input"] |
| if isinstance(reply, BaseException): |
| raise reply |
| if isinstance(reply, str): |
| return reply |
| return reply() |
| |
| with ExitStack() as stack: |
| client_sock, server_sock = socket.socketpair() |
| stack.enter_context(closing(client_sock)) |
| stack.enter_context(closing(server_sock)) |
| |
| server_sock = unittest.mock.Mock(wraps=server_sock) |
| |
| client_sock.sendall( |
| b"".join( |
| (m if isinstance(m, bytes) else json.dumps(m).encode()) + b"\n" |
| for m in messages |
| ) |
| ) |
| client_sock.shutdown(socket.SHUT_WR) |
| |
| if simulate_send_failure: |
| server_sock.sendall = unittest.mock.Mock( |
| side_effect=OSError("sendall failed") |
| ) |
| client_sock.shutdown(socket.SHUT_RD) |
| |
| stdout = io.StringIO() |
| |
| if simulate_sigint_during_stdout_write: |
| orig_stdout_write = stdout.write |
| |
| def sigint_stdout_write(s): |
| signal.raise_signal(signal.SIGINT) |
| return orig_stdout_write(s) |
| |
| stdout.write = sigint_stdout_write |
| |
| input_mock = stack.enter_context( |
| unittest.mock.patch("pdb.input", side_effect=mock_input) |
| ) |
| stack.enter_context(redirect_stdout(stdout)) |
| |
| if use_interrupt_socket: |
| interrupt_sock = unittest.mock.Mock(spec=socket.socket) |
| mock_kill = None |
| else: |
| interrupt_sock = None |
| mock_kill = stack.enter_context( |
| unittest.mock.patch("os.kill", spec=os.kill) |
| ) |
| |
| client = _PdbClient( |
| pid=12345, |
| server_socket=server_sock, |
| interrupt_sock=interrupt_sock, |
| ) |
| |
| if expected_exception is not None: |
| exception = expected_exception["exception"] |
| msg = expected_exception["msg"] |
| stack.enter_context(self.assertRaises(exception, msg=msg)) |
| |
| client.cmdloop() |
| |
| sent_msgs = [msg.args[0] for msg in server_sock.sendall.mock_calls] |
| for msg in sent_msgs: |
| assert msg.endswith(b"\n") |
| actual_outgoing = [json.loads(msg) for msg in sent_msgs] |
| |
| self.assertEqual(actual_outgoing, expected_outgoing) |
| self.assertEqual(completions, expected_completions) |
| if expected_stdout_substring and not expected_stdout: |
| self.assertIn(expected_stdout_substring, stdout.getvalue()) |
| else: |
| self.assertEqual(stdout.getvalue(), expected_stdout) |
| input_mock.assert_has_calls([unittest.mock.call(p) for p in prompts]) |
| actual_state = {k: getattr(client, k) for k in expected_state} |
| self.assertEqual(actual_state, expected_state) |
| |
| if use_interrupt_socket: |
| outgoing_signals = [ |
| signal.Signals(int.from_bytes(call.args[0])) |
| for call in interrupt_sock.sendall.call_args_list |
| ] |
| else: |
| assert mock_kill is not None |
| outgoing_signals = [] |
| for call in mock_kill.call_args_list: |
| pid, signum = call.args |
| self.assertEqual(pid, 12345) |
| outgoing_signals.append(signal.Signals(signum)) |
| self.assertEqual(outgoing_signals, expected_outgoing_signals) |
| |
| def test_remote_immediately_closing_the_connection(self): |
| """Test the behavior when the remote closes the connection immediately.""" |
| incoming = [] |
| expected_outgoing = [] |
| self.do_test( |
| incoming=incoming, |
| expected_outgoing=expected_outgoing, |
| ) |
| |
| def test_handling_command_list(self): |
| """Test handling the command_list message.""" |
| incoming = [ |
| ("server", {"command_list": ["help", "list", "continue"]}), |
| ] |
| self.do_test( |
| incoming=incoming, |
| expected_outgoing=[], |
| expected_state={ |
| "pdb_commands": {"help", "list", "continue"}, |
| }, |
| ) |
| |
| def test_handling_info_message(self): |
| """Test handling a message payload with type='info'.""" |
| incoming = [ |
| ("server", {"message": "Some message or other\n", "type": "info"}), |
| ] |
| self.do_test( |
| incoming=incoming, |
| expected_outgoing=[], |
| expected_stdout="Some message or other\n", |
| ) |
| |
| def test_handling_error_message(self): |
| """Test handling a message payload with type='error'.""" |
| incoming = [ |
| ("server", {"message": "Some message or other.", "type": "error"}), |
| ] |
| self.do_test( |
| incoming=incoming, |
| expected_outgoing=[], |
| expected_stdout="*** Some message or other.\n", |
| ) |
| |
| def test_handling_other_message(self): |
| """Test handling a message payload with an unrecognized type.""" |
| incoming = [ |
| ("server", {"message": "Some message.\n", "type": "unknown"}), |
| ] |
| self.do_test( |
| incoming=incoming, |
| expected_outgoing=[], |
| expected_stdout="Some message.\n", |
| ) |
| |
| @unittest.skipIf(sys.flags.optimize >= 2, "Help not available for -OO") |
| @subTests( |
| "help_request,expected_substring", |
| [ |
| # a request to display help for a command |
| ({"help": "ll"}, "Usage: ll | longlist"), |
| # a request to display a help overview |
| ({"help": ""}, "type help <topic>"), |
| # a request to display the full PDB manual |
| ({"help": "pdb"}, ">>> import pdb"), |
| ], |
| ) |
| def test_handling_help_when_available(self, help_request, expected_substring): |
| """Test handling help requests when help is available.""" |
| incoming = [ |
| ("server", help_request), |
| ] |
| self.do_test( |
| incoming=incoming, |
| expected_outgoing=[], |
| expected_stdout_substring=expected_substring, |
| ) |
| |
| @unittest.skipIf(sys.flags.optimize < 2, "Needs -OO") |
| @subTests( |
| "help_request,expected_substring", |
| [ |
| # a request to display help for a command |
| ({"help": "ll"}, "No help for 'll'"), |
| # a request to display a help overview |
| ({"help": ""}, "Undocumented commands"), |
| # a request to display the full PDB manual |
| ({"help": "pdb"}, "No help for 'pdb'"), |
| ], |
| ) |
| def test_handling_help_when_not_available(self, help_request, expected_substring): |
| """Test handling help requests when help is not available.""" |
| incoming = [ |
| ("server", help_request), |
| ] |
| self.do_test( |
| incoming=incoming, |
| expected_outgoing=[], |
| expected_stdout_substring=expected_substring, |
| ) |
| |
| def test_handling_pdb_prompts(self): |
| """Test responding to pdb's normal prompts.""" |
| incoming = [ |
| ("server", {"command_list": ["b"]}), |
| ("server", {"prompt": "(Pdb) ", "state": "pdb"}), |
| ("user", {"prompt": "(Pdb) ", "input": "lst ["}), |
| ("user", {"prompt": "... ", "input": "0 ]"}), |
| ("server", {"prompt": "(Pdb) ", "state": "pdb"}), |
| ("user", {"prompt": "(Pdb) ", "input": ""}), |
| ("server", {"prompt": "(Pdb) ", "state": "pdb"}), |
| ("user", {"prompt": "(Pdb) ", "input": "b ["}), |
| ("server", {"prompt": "(Pdb) ", "state": "pdb"}), |
| ("user", {"prompt": "(Pdb) ", "input": "! b ["}), |
| ("user", {"prompt": "... ", "input": "b ]"}), |
| ] |
| self.do_test( |
| incoming=incoming, |
| expected_outgoing=[ |
| {"reply": "lst [\n0 ]"}, |
| {"reply": ""}, |
| {"reply": "b ["}, |
| {"reply": "!b [\nb ]"}, |
| ], |
| expected_state={"state": "pdb"}, |
| ) |
| |
| def test_handling_interact_prompts(self): |
| """Test responding to pdb's interact mode prompts.""" |
| incoming = [ |
| ("server", {"command_list": ["b"]}), |
| ("server", {"prompt": ">>> ", "state": "interact"}), |
| ("user", {"prompt": ">>> ", "input": "lst ["}), |
| ("user", {"prompt": "... ", "input": "0 ]"}), |
| ("server", {"prompt": ">>> ", "state": "interact"}), |
| ("user", {"prompt": ">>> ", "input": ""}), |
| ("server", {"prompt": ">>> ", "state": "interact"}), |
| ("user", {"prompt": ">>> ", "input": "b ["}), |
| ("user", {"prompt": "... ", "input": "b ]"}), |
| ] |
| self.do_test( |
| incoming=incoming, |
| expected_outgoing=[ |
| {"reply": "lst [\n0 ]"}, |
| {"reply": ""}, |
| {"reply": "b [\nb ]"}, |
| ], |
| expected_state={"state": "interact"}, |
| ) |
| |
| def test_retry_pdb_prompt_on_syntax_error(self): |
| """Test re-prompting after a SyntaxError in a Python expression.""" |
| incoming = [ |
| ("server", {"prompt": "(Pdb) ", "state": "pdb"}), |
| ("user", {"prompt": "(Pdb) ", "input": " lst ["}), |
| ("user", {"prompt": "(Pdb) ", "input": "lst ["}), |
| ("user", {"prompt": "... ", "input": " 0 ]"}), |
| ] |
| self.do_test( |
| incoming=incoming, |
| expected_outgoing=[ |
| {"reply": "lst [\n 0 ]"}, |
| ], |
| expected_stdout_substring="*** IndentationError", |
| expected_state={"state": "pdb"}, |
| ) |
| |
| def test_retry_interact_prompt_on_syntax_error(self): |
| """Test re-prompting after a SyntaxError in a Python expression.""" |
| incoming = [ |
| ("server", {"prompt": ">>> ", "state": "interact"}), |
| ("user", {"prompt": ">>> ", "input": "!lst ["}), |
| ("user", {"prompt": ">>> ", "input": "lst ["}), |
| ("user", {"prompt": "... ", "input": " 0 ]"}), |
| ] |
| self.do_test( |
| incoming=incoming, |
| expected_outgoing=[ |
| {"reply": "lst [\n 0 ]"}, |
| ], |
| expected_stdout_substring="*** SyntaxError", |
| expected_state={"state": "interact"}, |
| ) |
| |
| def test_handling_unrecognized_prompt_type(self): |
| """Test fallback to "dumb" single-line mode for unknown states.""" |
| incoming = [ |
| ("server", {"prompt": "Do it? ", "state": "confirm"}), |
| ("user", {"prompt": "Do it? ", "input": "! ["}), |
| ("server", {"prompt": "Do it? ", "state": "confirm"}), |
| ("user", {"prompt": "Do it? ", "input": "echo hello"}), |
| ("server", {"prompt": "Do it? ", "state": "confirm"}), |
| ("user", {"prompt": "Do it? ", "input": ""}), |
| ("server", {"prompt": "Do it? ", "state": "confirm"}), |
| ("user", {"prompt": "Do it? ", "input": "echo goodbye"}), |
| ] |
| self.do_test( |
| incoming=incoming, |
| expected_outgoing=[ |
| {"reply": "! ["}, |
| {"reply": "echo hello"}, |
| {"reply": ""}, |
| {"reply": "echo goodbye"}, |
| ], |
| expected_state={"state": "dumb"}, |
| ) |
| |
| def test_sigint_at_prompt(self): |
| """Test signaling when a prompt gets interrupted.""" |
| incoming = [ |
| ("server", {"prompt": "(Pdb) ", "state": "pdb"}), |
| ( |
| "user", |
| { |
| "prompt": "(Pdb) ", |
| "input": lambda: signal.raise_signal(signal.SIGINT), |
| }, |
| ), |
| ] |
| self.do_test( |
| incoming=incoming, |
| expected_outgoing=[ |
| {"signal": "INT"}, |
| ], |
| expected_state={"state": "pdb"}, |
| ) |
| |
| def test_sigint_at_continuation_prompt(self): |
| """Test signaling when a continuation prompt gets interrupted.""" |
| incoming = [ |
| ("server", {"prompt": "(Pdb) ", "state": "pdb"}), |
| ("user", {"prompt": "(Pdb) ", "input": "if True:"}), |
| ( |
| "user", |
| { |
| "prompt": "... ", |
| "input": lambda: signal.raise_signal(signal.SIGINT), |
| }, |
| ), |
| ] |
| self.do_test( |
| incoming=incoming, |
| expected_outgoing=[ |
| {"signal": "INT"}, |
| ], |
| expected_state={"state": "pdb"}, |
| ) |
| |
| def test_sigint_when_writing(self): |
| """Test siginaling when sys.stdout.write() gets interrupted.""" |
| incoming = [ |
| ("server", {"message": "Some message or other\n", "type": "info"}), |
| ] |
| for use_interrupt_socket in [False, True]: |
| with self.subTest(use_interrupt_socket=use_interrupt_socket): |
| self.do_test( |
| incoming=incoming, |
| simulate_sigint_during_stdout_write=True, |
| use_interrupt_socket=use_interrupt_socket, |
| expected_outgoing=[], |
| expected_outgoing_signals=[signal.SIGINT], |
| expected_stdout="Some message or other\n", |
| ) |
| |
| def test_eof_at_prompt(self): |
| """Test signaling when a prompt gets an EOFError.""" |
| incoming = [ |
| ("server", {"prompt": "(Pdb) ", "state": "pdb"}), |
| ("user", {"prompt": "(Pdb) ", "input": EOFError()}), |
| ] |
| self.do_test( |
| incoming=incoming, |
| expected_outgoing=[ |
| {"signal": "EOF"}, |
| ], |
| expected_state={"state": "pdb"}, |
| ) |
| |
| def test_unrecognized_json_message(self): |
| """Test failing after getting an unrecognized payload.""" |
| incoming = [ |
| ("server", {"monty": "python"}), |
| ("server", {"message": "Some message or other\n", "type": "info"}), |
| ] |
| self.do_test( |
| incoming=incoming, |
| expected_outgoing=[], |
| expected_exception={ |
| "exception": RuntimeError, |
| "msg": 'Unrecognized payload b\'{"monty": "python"}\'', |
| }, |
| ) |
| |
| def test_continuing_after_getting_a_non_json_payload(self): |
| """Test continuing after getting a non JSON payload.""" |
| incoming = [ |
| ("server", b"spam"), |
| ("server", {"message": "Something", "type": "info"}), |
| ] |
| self.do_test( |
| incoming=incoming, |
| expected_outgoing=[], |
| expected_stdout="\n".join( |
| [ |
| "*** Invalid JSON from remote: b'spam\\n'", |
| "Something", |
| ] |
| ), |
| ) |
| |
| def test_write_failing(self): |
| """Test terminating if write fails due to a half closed socket.""" |
| incoming = [ |
| ("server", {"prompt": "(Pdb) ", "state": "pdb"}), |
| ("user", {"prompt": "(Pdb) ", "input": KeyboardInterrupt()}), |
| ] |
| self.do_test( |
| incoming=incoming, |
| expected_outgoing=[{"signal": "INT"}], |
| simulate_send_failure=True, |
| expected_state={"write_failed": True}, |
| ) |
| |
| def test_completion_in_pdb_state(self): |
| """Test requesting tab completions at a (Pdb) prompt.""" |
| # GIVEN |
| incoming = [ |
| ("server", {"prompt": "(Pdb) ", "state": "pdb"}), |
| ( |
| "user", |
| { |
| "prompt": "(Pdb) ", |
| "completion_request": { |
| "line": " mod._", |
| "begidx": 8, |
| "endidx": 9, |
| }, |
| "input": "print(\n mod.__name__)", |
| }, |
| ), |
| ("server", {"completions": ["__name__", "__file__"]}), |
| ] |
| self.do_test( |
| incoming=incoming, |
| expected_outgoing=[ |
| { |
| "complete": { |
| "text": "_", |
| "line": "mod._", |
| "begidx": 4, |
| "endidx": 5, |
| } |
| }, |
| {"reply": "print(\n mod.__name__)"}, |
| ], |
| expected_completions=["__name__", "__file__"], |
| expected_state={"state": "pdb"}, |
| ) |
| |
| def test_multiline_completion_in_pdb_state(self): |
| """Test requesting tab completions at a (Pdb) continuation prompt.""" |
| # GIVEN |
| incoming = [ |
| ("server", {"prompt": "(Pdb) ", "state": "pdb"}), |
| ("user", {"prompt": "(Pdb) ", "input": "if True:"}), |
| ( |
| "user", |
| { |
| "prompt": "... ", |
| "completion_request": { |
| "line": " b", |
| "begidx": 4, |
| "endidx": 5, |
| }, |
| "input": " bool()", |
| }, |
| ), |
| ("server", {"completions": ["bin", "bool", "bytes"]}), |
| ("user", {"prompt": "... ", "input": ""}), |
| ] |
| self.do_test( |
| incoming=incoming, |
| expected_outgoing=[ |
| { |
| "complete": { |
| "text": "b", |
| "line": "! b", |
| "begidx": 2, |
| "endidx": 3, |
| } |
| }, |
| {"reply": "if True:\n bool()\n"}, |
| ], |
| expected_completions=["bin", "bool", "bytes"], |
| expected_state={"state": "pdb"}, |
| ) |
| |
| def test_completion_in_interact_state(self): |
| """Test requesting tab completions at a >>> prompt.""" |
| incoming = [ |
| ("server", {"prompt": ">>> ", "state": "interact"}), |
| ( |
| "user", |
| { |
| "prompt": ">>> ", |
| "completion_request": { |
| "line": " mod.__", |
| "begidx": 8, |
| "endidx": 10, |
| }, |
| "input": "print(\n mod.__name__)", |
| }, |
| ), |
| ("server", {"completions": ["__name__", "__file__"]}), |
| ] |
| self.do_test( |
| incoming=incoming, |
| expected_outgoing=[ |
| { |
| "complete": { |
| "text": "__", |
| "line": "mod.__", |
| "begidx": 4, |
| "endidx": 6, |
| } |
| }, |
| {"reply": "print(\n mod.__name__)"}, |
| ], |
| expected_completions=["__name__", "__file__"], |
| expected_state={"state": "interact"}, |
| ) |
| |
| def test_completion_in_unknown_state(self): |
| """Test requesting tab completions at an unrecognized prompt.""" |
| incoming = [ |
| ("server", {"command_list": ["p"]}), |
| ("server", {"prompt": "Do it? ", "state": "confirm"}), |
| ( |
| "user", |
| { |
| "prompt": "Do it? ", |
| "completion_request": { |
| "line": "_", |
| "begidx": 0, |
| "endidx": 1, |
| }, |
| "input": "__name__", |
| }, |
| ), |
| ] |
| self.do_test( |
| incoming=incoming, |
| expected_outgoing=[ |
| {"reply": "__name__"}, |
| ], |
| expected_state={"state": "dumb"}, |
| ) |
| |
| def test_write_failure_during_completion(self): |
| """Test failing to write to the socket to request tab completions.""" |
| incoming = [ |
| ("server", {"prompt": ">>> ", "state": "interact"}), |
| ( |
| "user", |
| { |
| "prompt": ">>> ", |
| "completion_request": { |
| "line": "xy", |
| "begidx": 0, |
| "endidx": 2, |
| }, |
| "input": "xyz", |
| }, |
| ), |
| ] |
| self.do_test( |
| incoming=incoming, |
| expected_outgoing=[ |
| { |
| "complete": { |
| "text": "xy", |
| "line": "xy", |
| "begidx": 0, |
| "endidx": 2, |
| } |
| }, |
| {"reply": "xyz"}, |
| ], |
| simulate_send_failure=True, |
| expected_completions=[], |
| expected_state={"state": "interact", "write_failed": True}, |
| ) |
| |
| def test_read_failure_during_completion(self): |
| """Test failing to read tab completions from the socket.""" |
| incoming = [ |
| ("server", {"prompt": ">>> ", "state": "interact"}), |
| ( |
| "user", |
| { |
| "prompt": ">>> ", |
| "completion_request": { |
| "line": "xy", |
| "begidx": 0, |
| "endidx": 2, |
| }, |
| "input": "xyz", |
| }, |
| ), |
| ] |
| self.do_test( |
| incoming=incoming, |
| expected_outgoing=[ |
| { |
| "complete": { |
| "text": "xy", |
| "line": "xy", |
| "begidx": 0, |
| "endidx": 2, |
| } |
| }, |
| {"reply": "xyz"}, |
| ], |
| expected_completions=[], |
| expected_state={"state": "interact"}, |
| ) |
| |
| def test_reading_invalid_json_during_completion(self): |
| """Test receiving invalid JSON when getting tab completions.""" |
| incoming = [ |
| ("server", {"prompt": ">>> ", "state": "interact"}), |
| ( |
| "user", |
| { |
| "prompt": ">>> ", |
| "completion_request": { |
| "line": "xy", |
| "begidx": 0, |
| "endidx": 2, |
| }, |
| "input": "xyz", |
| }, |
| ), |
| ("server", b'{"completions": '), |
| ("user", {"prompt": ">>> ", "input": "xyz"}), |
| ] |
| self.do_test( |
| incoming=incoming, |
| expected_outgoing=[ |
| { |
| "complete": { |
| "text": "xy", |
| "line": "xy", |
| "begidx": 0, |
| "endidx": 2, |
| } |
| }, |
| {"reply": "xyz"}, |
| ], |
| expected_stdout_substring="*** json.decoder.JSONDecodeError", |
| expected_completions=[], |
| expected_state={"state": "interact"}, |
| ) |
| |
| def test_reading_empty_json_during_completion(self): |
| """Test receiving an empty JSON object when getting tab completions.""" |
| incoming = [ |
| ("server", {"prompt": ">>> ", "state": "interact"}), |
| ( |
| "user", |
| { |
| "prompt": ">>> ", |
| "completion_request": { |
| "line": "xy", |
| "begidx": 0, |
| "endidx": 2, |
| }, |
| "input": "xyz", |
| }, |
| ), |
| ("server", {}), |
| ("user", {"prompt": ">>> ", "input": "xyz"}), |
| ] |
| self.do_test( |
| incoming=incoming, |
| expected_outgoing=[ |
| { |
| "complete": { |
| "text": "xy", |
| "line": "xy", |
| "begidx": 0, |
| "endidx": 2, |
| } |
| }, |
| {"reply": "xyz"}, |
| ], |
| expected_stdout=( |
| "*** RuntimeError: Failed to get valid completions." |
| " Got: {}\n" |
| ), |
| expected_completions=[], |
| expected_state={"state": "interact"}, |
| ) |
| |
| |
class RemotePdbTestCase(unittest.TestCase):
    """Tests for the _PdbServer class, driven through a MockSocketFile."""

    def setUp(self):
        self.sockfile = MockSocketFile()
        self.pdb = _PdbServer(self.sockfile)

        # Bdb creates these lazily when tracing starts; set them by hand.
        self.pdb.botframe = None
        self.pdb.quitting = False

        # Namespaces exposed through the fake frame built below.
        self.test_globals = {'a': 1, 'b': 2, '__pdb_convenience_variables': {'x': 100}}
        self.test_locals = {'c': 3, 'd': 4}

        # A mock code object and frame to act as the current frame.
        fake_code = unittest.mock.Mock()
        fake_code.co_filename = "test_file.py"
        fake_code.co_name = "test_function"

        fake_frame = unittest.mock.Mock()
        fake_frame.f_globals = self.test_globals
        fake_frame.f_locals = self.test_locals
        fake_frame.f_lineno = 42
        fake_frame.f_code = fake_code

        self.pdb.curframe = fake_frame

    def test_message_and_error(self):
        """Test message and error methods send correct JSON."""
        self.pdb.message("Test message")
        self.pdb.error("Test error")

        sent = self.sockfile.get_output()
        self.assertEqual(len(sent), 2)
        info_payload, error_payload = sent
        self.assertEqual(info_payload, {"message": "Test message\n", "type": "info"})
        self.assertEqual(error_payload, {"message": "Test error", "type": "error"})

    def test_read_command(self):
        """Test reading commands from the socket."""
        self.sockfile.add_input({"reply": "help"})

        self.assertEqual(self.pdb._read_reply(), "help")

    def test_read_command_EOF(self):
        """Test reading EOF command."""
        # Simulate socket closure
        self.pdb._write_failed = True
        self.assertRaises(EOFError, self.pdb._read_reply)

    def test_completion(self):
        """Test handling completion requests."""
        completion_request = {
            "complete": {
                "text": "c",
                "line": "c",
                "begidx": 0,
                "endidx": 1
            }
        }
        # Make completenames return fixed values for the duration of the test.
        with unittest.mock.patch.object(self.pdb, 'completenames',
                                        return_value=["continue", "clear"]):
            self.sockfile.add_input(completion_request)
            # A regular reply ends the read loop.
            self.sockfile.add_input({"reply": "help"})

            # The completion request is processed before the reply is returned.
            reply = self.pdb._read_reply()

            sent = self.sockfile.get_output()
            self.assertEqual(len(sent), 1)
            self.assertEqual(sent[0], {"completions": ["continue", "clear"]})

            self.assertEqual(reply, "help")

    def test_do_help(self):
        """Test that do_help sends the help message."""
        self.pdb.do_help("break")

        sent = self.sockfile.get_output()
        self.assertEqual(len(sent), 1)
        self.assertEqual(sent[0], {"help": "break"})

    def test_interact_mode(self):
        """Test interaction mode setup and execution."""
        self.pdb.do_interact("")

        # Entering interact mode must leave a dict in _interact_state.
        interact_state = self.pdb._interact_state
        self.assertIsNotNone(interact_state)
        self.assertIsInstance(interact_state, dict)

        with unittest.mock.patch.object(self.pdb, '_error_exc') as error_exc:
            # Valid code reports no error...
            self.pdb._run_in_python_repl("print('test')")
            error_exc.assert_not_called()

            # ...while a syntax error is reported exactly once.
            self.pdb._run_in_python_repl("if:")
            error_exc.assert_called_once()

    def test_registering_commands(self):
        """Test registering breakpoint commands."""
        replies = ["commands 1", "silent", "print('hi')", "end"]
        pdb_prompt = {"prompt": "(Pdb) ", "state": "pdb"}
        com_prompt = {"prompt": "(com) ", "state": "commands"}
        # Payloads expected after the leading command_list message.
        expected_after_command_list = [
            pdb_prompt,
            com_prompt,
            com_prompt,
            com_prompt,
            pdb_prompt,
            {"message": "\n", "type": "info"},
        ]

        with unittest.mock.patch.object(self.pdb, 'get_bpbynumber'):
            for reply in replies:
                self.sockfile.add_input({"reply": reply})
            self.sockfile.add_input({"signal": "EOF"})

            self.pdb.cmdloop()

            sent = self.sockfile.get_output()
            self.assertIn('command_list', sent[0])
            for position, wanted in enumerate(expected_after_command_list, start=1):
                self.assertEqual(sent[position], wanted)
            self.assertEqual(len(sent), 7)

            self.assertEqual(
                self.pdb.commands[1],
                ["_pdbcmd_silence_frame_status", "print('hi')"],
            )

    def test_detach(self):
        """Test the detach method."""
        with unittest.mock.patch.object(self.sockfile, 'close') as close_mock:
            self.pdb.detach()
            close_mock.assert_called_once()
            self.assertFalse(self.pdb.quitting)

    def test_cmdloop(self):
        """Test the command loop with various commands."""
        # onecmd is mocked to record each command; it returns True, which
        # ends the loop, only for "quit".
        with unittest.mock.patch.object(
            self.pdb, 'onecmd', return_value=False,
            side_effect=lambda line: line == 'quit',
        ) as onecmd_mock:
            # Two commands already queued locally...
            self.pdb.cmdqueue = ['help', 'list']

            # ...and two more delivered over the socket once the queue is empty.
            self.sockfile.add_input({"reply": "next"})
            self.sockfile.add_input({"reply": "quit"})

            # Set this by hand because we don't want to really call set_trace()
            self.pdb.quitting = False
            self.pdb.cmdloop()

            # Should have processed 4 commands: 2 from cmdqueue, 2 from socket
            self.assertEqual(onecmd_mock.call_count, 4)
            for command in ('help', 'list', 'next', 'quit'):
                onecmd_mock.assert_any_call(command)

            # Only the two socket-driven commands required a prompt.
            prompt_payloads = [
                item for item in self.sockfile.get_output() if 'prompt' in item
            ]
            self.assertEqual(len(prompt_payloads), 2)
| |
| |
| @requires_subprocess() |
| @unittest.skipIf(is_wasi, "WASI does not support TCP sockets") |
| class PdbConnectTestCase(unittest.TestCase): |
| """Tests for the _connect mechanism using direct socket communication.""" |
| |
def setUp(self):
    """Open a listening socket on an OS-assigned localhost port.

    The socket is stored as ``self.server_sock`` and its port number as
    ``self.port``.
    """
    # Create a server socket that will wait for the debugger to connect
    self.server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Register the close immediately so the socket is released even if
    # bind()/listen() below fail; closing a socket twice is harmless.
    self.addCleanup(self.server_sock.close)
    # Bound the wait in accept() so a child that never connects fails the
    # test instead of hanging it forever.
    self.server_sock.settimeout(SHORT_TIMEOUT)
    self.server_sock.bind(('127.0.0.1', 0))  # Let OS assign port
    self.server_sock.listen(1)
    self.port = self.server_sock.getsockname()[1]
| |
def _create_script(self, script=None):
    """Write the subprocess script to disk and record its path.

    Parameters:
        script: source text to write.  When ``None``, a default program is
            generated that calls ``pdb._connect()`` against
            ``127.0.0.1:self.port`` from inside a nested function and then
            prints the returned value.

    The path is stored in ``self.script_path``.
    """
    if script is None:
        # The nesting inside this literal is significant: it is the source
        # of the program the subprocess runs.
        script = textwrap.dedent(
            f"""
            import pdb
            import sys
            import time

            def foo():
                x = 42
                return bar()

            def bar():
                return 42

            def connect_to_debugger():
                # Create a frame to debug
                def dummy_function():
                    x = 42
                    # Call connect to establish connection
                    # with the test server
                    frame = sys._getframe()  # Get the current frame
                    pdb._connect(
                        host='127.0.0.1',
                        port={self.port},
                        frame=frame,
                        commands="",
                        version=pdb._PdbServer.protocol_version(),
                        signal_raising_thread=False,
                        colorize=False,
                    )
                    return x  # This line won't be reached in debugging

                return dummy_function()

            result = connect_to_debugger()
            foo()
            print(f"Function returned: {{result}}")
            """)

    self.script_path = TESTFN + "_connect_test.py"
    with open(self.script_path, 'w') as f:
        f.write(script)
| |
| def tearDown(self): |
| self.server_sock.close() |
| try: |
| unlink(self.script_path) |
| except OSError: |
| pass |
| |
def _connect_and_get_client_file(self):
    """Start the script in a subprocess and accept its connection.

    Returns:
        A ``(process, client_file)`` tuple: the ``subprocess.Popen`` object
        and a binary read/write file wrapping the accepted socket.  The
        file and the socket are closed by registered cleanups.
    """
    # Start the subprocess that will connect to our socket
    process = subprocess.Popen(
        [sys.executable, self.script_path],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True
    )

    # Accept the connection from the subprocess
    try:
        client_sock, _ = self.server_sock.accept()
    except BaseException:
        # No caller owns the child yet, so reap it here rather than leak
        # the process and its pipes.
        process.kill()
        process.communicate()
        raise

    # Cleanups run in reverse order: registering the socket first means the
    # file wrapper is closed before the socket it wraps.
    self.addCleanup(client_sock.close)
    client_file = client_sock.makefile('rwb')
    self.addCleanup(client_file.close)

    return process, client_file
| |
| def _read_until_prompt(self, client_file): |
| """Helper to read messages until a prompt is received.""" |
| messages = [] |
| while True: |
| data = client_file.readline() |
| if not data: |
| break |
| msg = json.loads(data.decode()) |
| messages.append(msg) |
| if 'prompt' in msg: |
| break |
| return messages |
| |
| def _send_command(self, client_file, command): |
| """Helper to send a command to the debugger.""" |
| client_file.write(json.dumps({"reply": command}).encode() + b"\n") |
| client_file.flush() |
| |
def test_connect_and_basic_commands(self):
    """Test connecting to a remote debugger and sending basic commands."""
    self._create_script()
    process, client_file = self._connect_and_get_client_file()

    with kill_on_error(process):
        # We should receive initial data from the debugger
        data = client_file.readline()
        initial_data = json.loads(data.decode())
        self.assertIn('message', initial_data)
        self.assertIn('pdb._connect', initial_data['message'])

        # First, look for command_list message
        data = client_file.readline()
        command_list = json.loads(data.decode())
        self.assertIn('command_list', command_list)

        # Then, look for the first prompt
        data = client_file.readline()
        prompt_data = json.loads(data.decode())
        self.assertIn('prompt', prompt_data)
        self.assertEqual(prompt_data['state'], 'pdb')

        # Send 'bt' (backtrace) command
        self._send_command(client_file, "bt")

        # Check for response - we should get some stack frames
        messages = self._read_until_prompt(client_file)

        # Extract text messages containing stack info
        text_msg = [msg['message'] for msg in messages
                    if 'message' in msg and 'connect_to_debugger' in msg['message']]

        # Check presence first: with no matching messages the strict zip
        # below would raise ValueError before this message could be shown.
        self.assertTrue(text_msg, "Should have received stack trace information")

        expected_stacks = [
            "<module>",
            "connect_to_debugger",
        ]

        for stack, msg in zip(expected_stacks, text_msg, strict=True):
            self.assertIn(stack, msg)

        # Send 'c' (continue) command to let the program finish
        self._send_command(client_file, "c")

        # Wait for process to finish
        stdout, _ = process.communicate(timeout=SHORT_TIMEOUT)

        # Check if we got the expected output
        self.assertIn("Function returned: 42", stdout)
        self.assertEqual(process.returncode, 0)
| |
def test_breakpoints(self):
    """Test setting and hitting breakpoints."""
    self._create_script()
    process, client_file = self._connect_and_get_client_file()

    def first_text(msgs):
        # Earliest message that carries text.
        return next(m['message'] for m in msgs if 'message' in m)

    def last_text(msgs):
        # Latest message that carries text.
        return next(m['message'] for m in reversed(msgs) if 'message' in m)

    with kill_on_error(process):
        # Drain everything up to the first prompt.
        self._read_until_prompt(client_file)

        # Ask for a breakpoint on bar().
        self._send_command(client_file, "break bar")
        set_reply = first_text(self._read_until_prompt(client_file))
        self.assertIn("Breakpoint", set_reply)

        # Resume; the debugger should stop again inside bar().
        self._send_command(client_file, "c")
        stop_reply = first_text(self._read_until_prompt(client_file))
        self.assertIn("bar()", stop_reply)

        # Listing breakpoints must show ours with one recorded hit.
        self._send_command(client_file, "b")
        listing = last_text(self._read_until_prompt(client_file))
        self.assertIn("1 breakpoint", listing)
        self.assertIn("breakpoint already hit 1 time", listing)

        # Remove the breakpoint again.
        self._send_command(client_file, "clear 1")
        cleared = last_text(self._read_until_prompt(client_file))
        self.assertIn("Deleted breakpoint", cleared)

        # Let the program run to completion.
        self._send_command(client_file, "c")
        out, _ = process.communicate(timeout=SHORT_TIMEOUT)

        self.assertIn("Function returned: 42", out)
        self.assertEqual(process.returncode, 0)
| |
def test_keyboard_interrupt(self):
    """Test that sending keyboard interrupt breaks into pdb."""

    script = textwrap.dedent(f"""
        import time
        import sys
        import socket
        import pdb
        def bar():
            frame = sys._getframe()  # Get the current frame
            pdb._connect(
                host='127.0.0.1',
                port={self.port},
                frame=frame,
                commands="",
                version=pdb._PdbServer.protocol_version(),
                signal_raising_thread=True,
                colorize=False,
            )
            print("Connected to debugger")
            iterations = 50
            while iterations > 0:
                print("Iteration", iterations, flush=True)
                time.sleep(0.2)
                iterations -= 1
            return 42

        if __name__ == "__main__":
            print("Function returned:", bar())
        """)
    self._create_script(script=script)
    process, client_file = self._connect_and_get_client_file()

    with kill_on_error(process):
        # Accept a 2nd connection from the subprocess to tell it about
        # signals.  This happens inside kill_on_error() so that a failing
        # accept() does not leave the subprocess running.
        signal_sock, _ = self.server_sock.accept()
        self.addCleanup(signal_sock.close)

        # Skip initial messages until we get to the prompt
        self._read_until_prompt(client_file)

        # Continue execution
        self._send_command(client_file, "c")

        # Confirm that the remote is already in the while loop. We know
        # it's in bar() and we can exit the loop immediately by setting
        # iterations to 0.
        while line := process.stdout.readline():
            if line.startswith("Iteration"):
                break
        else:
            # stdout hit EOF: the subprocess went away without looping.
            self.fail("subprocess closed stdout before entering the loop")

        # Send the SIGINT signal number over the signal socket to
        # interrupt the running process
        signal_sock.sendall(signal.SIGINT.to_bytes())
        messages = self._read_until_prompt(client_file)

        # Verify that the debugger stopped again and reported a frame
        # in bar().
        interrupt_msgs = [msg['message'] for msg in messages if 'message' in msg]
        expected_msg = [msg for msg in interrupt_msgs if "bar()" in msg]
        self.assertGreater(len(expected_msg), 0)

        # Continue to end as fast as we can
        self._send_command(client_file, "iterations = 0")
        self._send_command(client_file, "c")
        stdout, _ = process.communicate(timeout=SHORT_TIMEOUT)
        self.assertIn("Function returned: 42", stdout)
        self.assertEqual(process.returncode, 0)
| |
def test_handle_eof(self):
    """Test that EOF signal properly exits the debugger."""
    self._create_script()
    process, client_file = self._connect_and_get_client_file()

    with kill_on_error(process):
        # Drain everything up to the first prompt.
        self._read_until_prompt(client_file)

        # Deliver an EOF signal message instead of a command reply.
        eof_line = json.dumps({"signal": "EOF"}).encode() + b"\n"
        client_file.write(eof_line)
        client_file.flush()

        # With the debugger gone the script should simply run to the end.
        out, err = process.communicate(timeout=SHORT_TIMEOUT)

        self.assertIn("Function returned: 42", out)
        self.assertEqual(process.returncode, 0)
        self.assertEqual(err, "")
| |
def test_protocol_version(self):
    """Test that incompatible protocol versions are properly detected."""
    # The subprocess connects while announcing a bogus protocol version.
    script = textwrap.dedent(f'''
        import sys
        import pdb

        def run_test():
            frame = sys._getframe()

            # Use a fake version number that's definitely incompatible
            fake_version = 0x01010101  # A fake version that doesn't match any real Python version

            # Connect with the wrong version
            pdb._connect(
                host='127.0.0.1',
                port={self.port},
                frame=frame,
                commands="",
                version=fake_version,
                signal_raising_thread=False,
                colorize=False,
            )

            # This should print if the debugger detaches correctly
            print("Debugger properly detected version mismatch")
            return True

        if __name__ == "__main__":
            print("Test result:", run_test())
        ''')
    self._create_script(script=script)
    process, client_file = self._connect_and_get_client_file()

    with kill_on_error(process):
        # The very first line must be an error describing the mismatch.
        first = json.loads(client_file.readline().decode())

        self.assertIn('message', first)
        self.assertEqual(first['type'], 'error')
        self.assertIn('incompatible', first['message'])
        self.assertIn('protocol version', first['message'])

        # Afterwards the script is expected to carry on and exit cleanly.
        out, _err = process.communicate(timeout=SHORT_TIMEOUT)

        self.assertIn("Test result: True", out)
        self.assertIn("Debugger properly detected version mismatch", out)
        self.assertEqual(process.returncode, 0)
| |
def test_help_system(self):
    """Test that the help system properly sends help text to the client."""
    self._create_script()
    process, client_file = self._connect_and_get_client_file()

    with kill_on_error(process):
        # Drain everything up to the first prompt.
        self._read_until_prompt(client_file)

        # Maps each request to the topic expected in the 'help' reply:
        # the word after "help", or an empty string for a bare "help".
        expected_topics = {
            "help": "",
            "help break": "break",
            "help continue": "continue",
            "help pdb": "pdb",
        }

        for request, topic in expected_topics.items():
            self._send_command(client_file, request)

            # The first line of the response carries the help message.
            reply = json.loads(client_file.readline().decode())

            self.assertIn('help', reply)
            self.assertEqual(reply['help'], topic)

            # Resynchronise on the next prompt before the next request.
            self._read_until_prompt(client_file)

        # Let the program run to completion.
        self._send_command(client_file, "c")

        out, _err = process.communicate(timeout=SHORT_TIMEOUT)
        self.assertIn("Function returned: 42", out)
        self.assertEqual(process.returncode, 0)
| |
def test_multi_line_commands(self):
    """Test that multi-line commands work properly over remote connection."""
    self._create_script()
    process, client_file = self._connect_and_get_client_file()

    with kill_on_error(process):
        # Skip initial messages until we get to the prompt
        self._read_until_prompt(client_file)

        # Each entry is one complete compound statement sent as a single
        # reply.  Nested bodies must be indented deeper than their
        # header, otherwise the remote side cannot compile them.
        multi_line_commands = [
            # Define a function
            "def test_func():\n    return 42",

            # For loop
            "for i in range(3):\n    print(i)",

            # If statement
            "if True:\n    x = 42\nelse:\n    x = 0",

            # Try/except
            "try:\n    result = 10/2\n    print(result)\n"
            "except ZeroDivisionError:\n    print('Error')",

            # Class definition
            "class TestClass:\n"
            "    def __init__(self):\n"
            "        self.value = 100\n"
            "    def get_value(self):\n"
            "        return self.value",
        ]

        for cmd in multi_line_commands:
            self._send_command(client_file, cmd)
            self._read_until_prompt(client_file)

        # Test executing the defined function
        self._send_command(client_file, "test_func()")
        messages = self._read_until_prompt(client_file)

        # Find the result message
        result_msg = next(msg['message'] for msg in messages if 'message' in msg)
        self.assertIn("42", result_msg)

        # Test creating an instance of the defined class
        self._send_command(client_file, "obj = TestClass()")
        self._read_until_prompt(client_file)

        # Test calling a method on the instance
        self._send_command(client_file, "obj.get_value()")
        messages = self._read_until_prompt(client_file)

        # Find the result message
        result_msg = next(msg['message'] for msg in messages if 'message' in msg)
        self.assertIn("100", result_msg)

        # Continue execution to finish
        self._send_command(client_file, "c")

        stdout, _ = process.communicate(timeout=SHORT_TIMEOUT)
        self.assertIn("Function returned: 42", stdout)
        self.assertEqual(process.returncode, 0)
| |
| |
| def _supports_remote_attaching(): |
| PROCESS_VM_READV_SUPPORTED = False |
| |
| try: |
| from _remote_debugging import PROCESS_VM_READV_SUPPORTED |
| except ImportError: |
| pass |
| |
| return PROCESS_VM_READV_SUPPORTED |
| |
| |
@unittest.skipIf(not sys.is_remote_debug_enabled(), "Remote debugging is not enabled")
@unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux" and sys.platform != "win32",
                 "Test only runs on Linux, Windows and MacOS")
@unittest.skipIf(sys.platform == "linux" and not _supports_remote_attaching(),
                 "Testing on Linux requires process_vm_readv support")
@cpython_only
@requires_subprocess()
class PdbAttachTestCase(unittest.TestCase):
    """Integration tests that run ``pdb -p PID`` against a live subprocess."""

    def setUp(self):
        # Create a server socket that will wait for the debugger to connect
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.bind(('127.0.0.1', 0))  # Let OS assign port
        self.sock.listen(1)
        self.port = self.sock.getsockname()[1]
        self._create_script()

    def _create_script(self, script=None):
        """Write the subprocess script to disk and remember its path.

        When *script* is None a default script is written: it connects to
        the test's socket to announce that it has reached the attachment
        point, then loops (for at most 100 iterations of 0.1s) until the
        local ``x`` is changed from 1.  A caller-supplied *script* is
        written verbatim.
        """
        if script is None:
            script = textwrap.dedent(
                f"""
                import socket
                import time

                def foo():
                    return bar()

                def bar():
                    return baz()

                def baz():
                    x = 1
                    # Trigger attach
                    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    sock.connect(('127.0.0.1', {self.port}))
                    sock.close()
                    count = 0
                    while x == 1 and count < 100:
                        count += 1
                        time.sleep(0.1)
                    return x

                result = foo()
                print(f"Function returned: {{result}}")
                """
            )

        self.script_path = TESTFN + "_connect_test.py"
        with open(self.script_path, 'w') as f:
            f.write(script)

    def tearDown(self):
        self.sock.close()
        try:
            unlink(self.script_path)
        except OSError:
            # Best-effort removal of the generated script.
            pass

    def do_integration_test(self, client_stdin):
        """Attach pdb to the script subprocess and feed it *client_stdin*.

        Runs pdb.main() in this process with ``sys.argv`` set to
        ``pdb -p <pid>`` and with stdin/stdout/stderr replaced by in-memory
        streams.  The test is skipped if attaching needs permissions that
        are not available.  Asserts that the subprocess exits with status
        0 and returns the captured text as
        ``{"client": {"stdout", "stderr"}, "server": {"stdout", "stderr"}}``.
        """
        process = subprocess.Popen(
            [sys.executable, self.script_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )
        self.addCleanup(process.stdout.close)
        self.addCleanup(process.stderr.close)

        # Wait for the process to reach our attachment point
        self.sock.settimeout(10)
        conn, _ = self.sock.accept()
        conn.close()

        client_stdin = io.StringIO(client_stdin)
        client_stdout = io.StringIO()
        client_stderr = io.StringIO()

        self.addCleanup(client_stdin.close)
        self.addCleanup(client_stdout.close)
        self.addCleanup(client_stderr.close)
        self.addCleanup(process.wait)

        with (
            unittest.mock.patch("sys.stdin", client_stdin),
            redirect_stdout(client_stdout),
            redirect_stderr(client_stderr),
            unittest.mock.patch("sys.argv", ["pdb", "-p", str(process.pid)]),
            # Turn pdb's permission-help exit path into an exception that
            # can be converted into a skip below.
            unittest.mock.patch(
                "pdb.exit_with_permission_help_text", side_effect=PermissionError
            ),
        ):
            try:
                pdb.main()
            except PermissionError:
                self.skipTest("Insufficient permissions for remote execution")

        process.wait()
        server_stdout = process.stdout.read()
        server_stderr = process.stderr.read()

        if process.returncode != 0:
            # Show the server's output to make the failure below debuggable.
            print("server failed")
            print(f"server stdout:\n{server_stdout}")
            print(f"server stderr:\n{server_stderr}")

        self.assertEqual(process.returncode, 0)
        return {
            "client": {
                "stdout": client_stdout.getvalue(),
                "stderr": client_stderr.getvalue(),
            },
            "server": {
                "stdout": server_stdout,
                "stderr": server_stderr,
            },
        }

    def test_attach_to_process_without_colors(self):
        with force_color(False):
            # "ll" lists the current function; "x=42" ends the wait loop.
            output = self.do_integration_test("ll\nx=42\n")
        self.assertEqual(output["client"]["stderr"], "")
        self.assertEqual(output["server"]["stderr"], "")

        self.assertEqual(output["server"]["stdout"], "Function returned: 42\n")
        self.assertIn("while x == 1", output["client"]["stdout"])
        self.assertNotIn("\x1b", output["client"]["stdout"])

    def test_attach_to_process_with_colors(self):
        with force_color(True):
            output = self.do_integration_test("ll\nx=42\n")
        self.assertEqual(output["client"]["stderr"], "")
        self.assertEqual(output["server"]["stderr"], "")

        self.assertEqual(output["server"]["stdout"], "Function returned: 42\n")
        self.assertIn("\x1b", output["client"]["stdout"])
        # The source line is only recognisable once the escape sequences
        # embedded in it have been stripped.
        self.assertNotIn("while x == 1", output["client"]["stdout"])
        self.assertIn("while x == 1", re.sub("\x1b[^m]*m", "", output["client"]["stdout"]))

    def test_attach_to_non_existent_process(self):
        with force_color(False):
            result = subprocess.run([sys.executable, "-m", "pdb", "-p", "999999"], text=True, capture_output=True)
        self.assertNotEqual(result.returncode, 0)
        if sys.platform == "darwin":
            # On MacOS, attaching to a non-existent process gives PermissionError
            error = "The specified process cannot be attached to due to insufficient permissions"
        else:
            error = "Cannot attach to pid 999999, please make sure that the process exists"
        self.assertIn(error, result.stdout)
| |
| |
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()