| import errno |
| import itertools |
| import os |
| import signal |
| import subprocess |
| import sys |
| import threading |
| import unittest |
| from functools import partial |
| from test import support |
| from test.support import os_helper, force_not_colorized_test_class |
| from test.support import script_helper, threading_helper |
| |
| from unittest import TestCase |
| from unittest.mock import MagicMock, call, patch, ANY, Mock |
| |
| from .support import handle_all_events, code_to_events |
| |
| try: |
| from _pyrepl.console import Event |
| from _pyrepl.unix_console import UnixConsole |
| except ImportError: |
| pass |
| |
| from _pyrepl.terminfo import _TERMINAL_CAPABILITIES |
| |
| TERM_CAPABILITIES = _TERMINAL_CAPABILITIES["ansi"] |
| |
| |
| def unix_console(events, **kwargs): |
| console = UnixConsole(term="xterm") |
| console.get_event = MagicMock(side_effect=events) |
| console.getpending = MagicMock(return_value=Event("key", "")) |
| |
| height = kwargs.get("height", 25) |
| width = kwargs.get("width", 80) |
| console.getheightwidth = MagicMock(side_effect=lambda: (height, width)) |
| console.wait = MagicMock() |
| |
| console.prepare() |
| for key, val in kwargs.items(): |
| setattr(console, key, val) |
| return console |
| |
| |
| handle_events_unix_console = partial( |
| handle_all_events, |
| prepare_console=unix_console, |
| ) |
| handle_events_narrow_unix_console = partial( |
| handle_all_events, |
| prepare_console=partial(unix_console, width=5), |
| ) |
| handle_events_short_unix_console = partial( |
| handle_all_events, |
| prepare_console=partial(unix_console, height=1), |
| ) |
| handle_events_unix_console_height_3 = partial( |
| handle_all_events, prepare_console=partial(unix_console, height=3) |
| ) |
| |
| |
| @unittest.skipIf(sys.platform == "win32", "No Unix event queue on Windows") |
| @patch( |
| "_pyrepl.terminfo.tparm", |
| lambda s, *args: s + b":" + b",".join(str(i).encode() for i in args), |
| ) |
| @patch( |
| "termios.tcgetattr", |
| lambda _: [ |
| 27394, |
| 3, |
| 19200, |
| 536872399, |
| 38400, |
| 38400, |
| [ |
| b"\x04", |
| b"\xff", |
| b"\xff", |
| b"\x7f", |
| b"\x17", |
| b"\x15", |
| b"\x12", |
| b"\x00", |
| b"\x03", |
| b"\x1c", |
| b"\x1a", |
| b"\x19", |
| b"\x11", |
| b"\x13", |
| b"\x16", |
| b"\x0f", |
| b"\x01", |
| b"\x00", |
| b"\x14", |
| b"\x00", |
| ], |
| ], |
| ) |
| @patch("termios.tcsetattr", lambda a, b, c: None) |
| @patch("os.write") |
| @force_not_colorized_test_class |
| class TestConsole(TestCase): |
| def test_simple_addition(self, _os_write): |
| code = "12+34" |
| events = code_to_events(code) |
| _, con = handle_events_unix_console(events) |
| _os_write.assert_any_call(ANY, b"1") |
| _os_write.assert_any_call(ANY, b"2") |
| _os_write.assert_any_call(ANY, b"+") |
| _os_write.assert_any_call(ANY, b"3") |
| _os_write.assert_any_call(ANY, b"4") |
| con.restore() |
| |
| def test_wrap(self, _os_write): |
| code = "12+34" |
| events = code_to_events(code) |
| _, con = handle_events_narrow_unix_console(events) |
| _os_write.assert_any_call(ANY, b"1") |
| _os_write.assert_any_call(ANY, b"2") |
| _os_write.assert_any_call(ANY, b"+") |
| _os_write.assert_any_call(ANY, b"3") |
| _os_write.assert_any_call(ANY, b"\\") |
| _os_write.assert_any_call(ANY, b"\n") |
| _os_write.assert_any_call(ANY, b"4") |
| con.restore() |
| |
| def test_cursor_left(self, _os_write): |
| code = "1" |
| events = itertools.chain( |
| code_to_events(code), |
| [Event(evt="key", data="left", raw=bytearray(b"\x1bOD"))], |
| ) |
| _, con = handle_events_unix_console(events) |
| _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cub"] + b":1") |
| con.restore() |
| |
| def test_cursor_left_right(self, _os_write): |
| code = "1" |
| events = itertools.chain( |
| code_to_events(code), |
| [ |
| Event(evt="key", data="left", raw=bytearray(b"\x1bOD")), |
| Event(evt="key", data="right", raw=bytearray(b"\x1bOC")), |
| ], |
| ) |
| _, con = handle_events_unix_console(events) |
| _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cub"] + b":1") |
| _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cuf"] + b":1") |
| con.restore() |
| |
| def test_cursor_up(self, _os_write): |
| code = "1\n2+3" |
| events = itertools.chain( |
| code_to_events(code), |
| [Event(evt="key", data="up", raw=bytearray(b"\x1bOA"))], |
| ) |
| _, con = handle_events_unix_console(events) |
| _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cuu"] + b":1") |
| con.restore() |
| |
| def test_cursor_up_down(self, _os_write): |
| code = "1\n2+3" |
| events = itertools.chain( |
| code_to_events(code), |
| [ |
| Event(evt="key", data="up", raw=bytearray(b"\x1bOA")), |
| Event(evt="key", data="down", raw=bytearray(b"\x1bOB")), |
| ], |
| ) |
| _, con = handle_events_unix_console(events) |
| _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cuu"] + b":1") |
| _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cud"] + b":1") |
| con.restore() |
| |
| def test_cursor_back_write(self, _os_write): |
| events = itertools.chain( |
| code_to_events("1"), |
| [Event(evt="key", data="left", raw=bytearray(b"\x1bOD"))], |
| code_to_events("2"), |
| ) |
| _, con = handle_events_unix_console(events) |
| _os_write.assert_any_call(ANY, b"1") |
| _os_write.assert_any_call(ANY, TERM_CAPABILITIES["cub"] + b":1") |
| _os_write.assert_any_call(ANY, b"2") |
| con.restore() |
| |
| def test_multiline_function_move_up_short_terminal(self, _os_write): |
| # fmt: off |
| code = ( |
| "def f():\n" |
| " foo" |
| ) |
| # fmt: on |
| |
| events = itertools.chain( |
| code_to_events(code), |
| [ |
| Event(evt="key", data="up", raw=bytearray(b"\x1bOA")), |
| Event(evt="scroll", data=None), |
| ], |
| ) |
| _, con = handle_events_short_unix_console(events) |
| _os_write.assert_any_call(ANY, TERM_CAPABILITIES["ri"] + b":") |
| con.restore() |
| |
| def test_multiline_function_move_up_down_short_terminal(self, _os_write): |
| # fmt: off |
| code = ( |
| "def f():\n" |
| " foo" |
| ) |
| # fmt: on |
| |
| events = itertools.chain( |
| code_to_events(code), |
| [ |
| Event(evt="key", data="up", raw=bytearray(b"\x1bOA")), |
| Event(evt="scroll", data=None), |
| Event(evt="key", data="down", raw=bytearray(b"\x1bOB")), |
| Event(evt="scroll", data=None), |
| ], |
| ) |
| _, con = handle_events_short_unix_console(events) |
| _os_write.assert_any_call(ANY, TERM_CAPABILITIES["ri"] + b":") |
| _os_write.assert_any_call(ANY, TERM_CAPABILITIES["ind"] + b":") |
| con.restore() |
| |
| def test_resize_bigger_on_multiline_function(self, _os_write): |
| # fmt: off |
| code = ( |
| "def f():\n" |
| " foo" |
| ) |
| # fmt: on |
| |
| events = itertools.chain(code_to_events(code)) |
| reader, console = handle_events_short_unix_console(events) |
| |
| console.height = 2 |
| console.getheightwidth = MagicMock(lambda _: (2, 80)) |
| |
| def same_reader(_): |
| return reader |
| |
| def same_console(events): |
| console.get_event = MagicMock(side_effect=events) |
| return console |
| |
| _, con = handle_all_events( |
| [Event(evt="resize", data=None)], |
| prepare_reader=same_reader, |
| prepare_console=same_console, |
| ) |
| _os_write.assert_has_calls( |
| [ |
| call(ANY, TERM_CAPABILITIES["ri"] + b":"), |
| call(ANY, TERM_CAPABILITIES["cup"] + b":0,0"), |
| call(ANY, b"def f():"), |
| ] |
| ) |
| console.restore() |
| con.restore() |
| |
| def test_resize_smaller_on_multiline_function(self, _os_write): |
| # fmt: off |
| code = ( |
| "def f():\n" |
| " foo" |
| ) |
| # fmt: on |
| |
| events = itertools.chain(code_to_events(code)) |
| reader, console = handle_events_unix_console_height_3(events) |
| |
| console.height = 1 |
| console.getheightwidth = MagicMock(lambda _: (1, 80)) |
| |
| def same_reader(_): |
| return reader |
| |
| def same_console(events): |
| console.get_event = MagicMock(side_effect=events) |
| return console |
| |
| _, con = handle_all_events( |
| [Event(evt="resize", data=None)], |
| prepare_reader=same_reader, |
| prepare_console=same_console, |
| ) |
| _os_write.assert_has_calls( |
| [ |
| call(ANY, TERM_CAPABILITIES["ind"] + b":"), |
| call(ANY, TERM_CAPABILITIES["cup"] + b":0,0"), |
| call(ANY, b" foo"), |
| ] |
| ) |
| console.restore() |
| con.restore() |
| |
| def test_getheightwidth_with_invalid_environ(self, _os_write): |
| # gh-128636 |
| console = UnixConsole(term="xterm") |
| with os_helper.EnvironmentVarGuard() as env: |
| env["LINES"] = "" |
| self.assertIsInstance(console.getheightwidth(), tuple) |
| env["COLUMNS"] = "" |
| self.assertIsInstance(console.getheightwidth(), tuple) |
| os.environ = [] |
| self.assertIsInstance(console.getheightwidth(), tuple) |
| |
| @unittest.skipUnless(sys.platform == "darwin", "requires macOS") |
| def test_restore_with_invalid_environ_on_macos(self, _os_write): |
| # gh-128636 for macOS |
| console = UnixConsole(term="xterm") |
| with os_helper.EnvironmentVarGuard(): |
| os.environ = [] |
| console.prepare() # needed to call restore() |
| console.restore() # this should succeed |
| |
| @threading_helper.reap_threads |
| @threading_helper.requires_working_threading() |
| def test_restore_in_thread(self, _os_write): |
| # gh-139391: ensure that console.restore() silently suppresses |
| # exceptions when calling signal.signal() from a non-main thread. |
| console = unix_console([]) |
| console.old_sigwinch = signal.SIG_DFL |
| thread = threading.Thread(target=console.restore) |
| thread.start() |
| thread.join() # this should not raise |
| |
| |
| @unittest.skipIf(sys.platform == "win32", "No Unix console on Windows") |
| class TestUnixConsoleEIOHandling(TestCase): |
| |
| @patch('_pyrepl.unix_console.tcsetattr') |
| @patch('_pyrepl.unix_console.tcgetattr') |
| def test_eio_error_handling_in_restore(self, mock_tcgetattr, mock_tcsetattr): |
| |
| import termios |
| mock_termios = Mock() |
| mock_termios.iflag = 0 |
| mock_termios.oflag = 0 |
| mock_termios.cflag = 0 |
| mock_termios.lflag = 0 |
| mock_termios.cc = [0] * 32 |
| mock_termios.copy.return_value = mock_termios |
| mock_tcgetattr.return_value = mock_termios |
| |
| console = UnixConsole(term="xterm") |
| console.prepare() |
| |
| mock_tcsetattr.side_effect = termios.error(errno.EIO, "Input/output error") |
| |
| # EIO error should be handled gracefully in restore() |
| console.restore() |
| |
| @unittest.skipUnless(sys.platform == "linux", "Only valid on Linux") |
| def test_repl_eio(self): |
| # Use the pty-based approach to simulate EIO error |
| script_path = os.path.join(os.path.dirname(__file__), "eio_test_script.py") |
| |
| proc = script_helper.spawn_python( |
| "-S", script_path, |
| stderr=subprocess.PIPE, |
| text=True |
| ) |
| |
| ready_line = proc.stdout.readline().strip() |
| if ready_line != "READY" or proc.poll() is not None: |
| self.fail("Child process failed to start properly") |
| |
| os.kill(proc.pid, signal.SIGUSR1) |
| # sleep for pty to settle |
| _, err = proc.communicate(timeout=support.LONG_TIMEOUT) |
| self.assertEqual( |
| proc.returncode, |
| 1, |
| f"Expected EIO/ENXIO error, got return code {proc.returncode}", |
| ) |
| self.assertTrue( |
| ( |
| "Got EIO:" in err |
| or "Got ENXIO:" in err |
| ), |
| f"Expected EIO/ENXIO error message in stderr: {err}", |
| ) |