| """Test the interactive interpreter.""" |
| |
| import os |
| import select |
| import subprocess |
| import sys |
| import unittest |
| from textwrap import dedent |
| from test import support |
| from test.support import ( |
| cpython_only, |
| has_subprocess_support, |
| os_helper, |
| SuppressCrashReport, |
| SHORT_TIMEOUT, |
| ) |
| from test.support.script_helper import kill_python |
| from test.support.import_helper import import_module |
| |
| try: |
| import pty |
| except ImportError: |
| pty = None |
| |
| |
| if not has_subprocess_support: |
| raise unittest.SkipTest("test module requires subprocess") |
| |
| |
| def spawn_repl(*args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kw): |
| """Run the Python REPL with the given arguments. |
| |
| kw is extra keyword args to pass to subprocess.Popen. Returns a Popen |
| object. |
| """ |
| |
| # To run the REPL without using a terminal, spawn python with the command |
| # line option '-i' and the process name set to '<stdin>'. |
| # The directory of argv[0] must match the directory of the Python |
| # executable for the Popen() call to python to succeed as the directory |
| # path may be used by PyConfig_Get("module_search_paths") to build the |
| # default module search path. |
| stdin_fname = os.path.join(os.path.dirname(sys.executable), "<stdin>") |
| cmd_line = [stdin_fname, '-I', '-i'] |
| cmd_line.extend(args) |
| |
| # Set TERM=vt100, for the rationale see the comments in spawn_python() of |
| # test.support.script_helper. |
| env = kw.setdefault('env', dict(os.environ)) |
| env['TERM'] = 'vt100' |
| return subprocess.Popen(cmd_line, |
| executable=sys.executable, |
| text=True, |
| stdin=subprocess.PIPE, |
| stdout=stdout, stderr=stderr, |
| **kw) |
| |
| def run_on_interactive_mode(source): |
| """Spawn a new Python interpreter, pass the given |
| input source code from the stdin and return the |
| result back. If the interpreter exits non-zero, it |
| raises a ValueError.""" |
| |
| process = spawn_repl() |
| process.stdin.write(source) |
| output = kill_python(process) |
| |
| if process.returncode != 0: |
| raise ValueError("Process didn't exit properly.") |
| return output |
| |
| |
| @support.force_not_colorized_test_class |
| class TestInteractiveInterpreter(unittest.TestCase): |
| |
| @cpython_only |
| # Python built with Py_TRACE_REFS fail with a fatal error in |
| # _PyRefchain_Trace() on memory allocation error. |
| @unittest.skipIf(support.Py_TRACE_REFS, 'cannot test Py_TRACE_REFS build') |
| def test_no_memory(self): |
| import_module("_testcapi") |
| # Issue #30696: Fix the interactive interpreter looping endlessly when |
| # no memory. Check also that the fix does not break the interactive |
| # loop when an exception is raised. |
| user_input = """ |
| import sys, _testcapi |
| 1/0 |
| print('After the exception.') |
| _testcapi.set_nomemory(0) |
| sys.exit(0) |
| """ |
| user_input = dedent(user_input) |
| p = spawn_repl() |
| with SuppressCrashReport(): |
| p.stdin.write(user_input) |
| output = kill_python(p) |
| self.assertIn('After the exception.', output) |
| # Exit code 120: Py_FinalizeEx() failed to flush stdout and stderr. |
| self.assertIn(p.returncode, (1, 120)) |
| |
| @cpython_only |
| def test_multiline_string_parsing(self): |
| # bpo-39209: Multiline string tokens need to be handled in the tokenizer |
| # in two places: the interactive path and the non-interactive path. |
| user_input = '''\ |
| x = """<?xml version="1.0" encoding="iso-8859-1"?> |
| <test> |
| <Users> |
| <fun25> |
| <limits> |
| <total>0KiB</total> |
| <kbps>0</kbps> |
| <rps>1.3</rps> |
| <connections>0</connections> |
| </limits> |
| <usages> |
| <total>16738211KiB</total> |
| <kbps>237.15</kbps> |
| <rps>1.3</rps> |
| <connections>0</connections> |
| </usages> |
| <time_to_refresh>never</time_to_refresh> |
| <limit_exceeded_URL>none</limit_exceeded_URL> |
| </fun25> |
| </Users> |
| </test>""" |
| ''' |
| user_input = dedent(user_input) |
| p = spawn_repl() |
| p.stdin.write(user_input) |
| output = kill_python(p) |
| self.assertEqual(p.returncode, 0) |
| |
| def test_close_stdin(self): |
| user_input = dedent(''' |
| import os |
| print("before close") |
| os.close(0) |
| ''') |
| prepare_repl = dedent(''' |
| from test.support import suppress_msvcrt_asserts |
| suppress_msvcrt_asserts() |
| ''') |
| process = spawn_repl('-c', prepare_repl) |
| output = process.communicate(user_input)[0] |
| self.assertEqual(process.returncode, 0) |
| self.assertIn('before close', output) |
| |
| def test_interactive_traceback_reporting(self): |
| user_input = "1 / 0 / 3 / 4" |
| p = spawn_repl() |
| p.stdin.write(user_input) |
| output = kill_python(p) |
| self.assertEqual(p.returncode, 0) |
| |
| traceback_lines = output.splitlines()[-6:-1] |
| expected_lines = [ |
| "Traceback (most recent call last):", |
| " File \"<stdin>\", line 1, in <module>", |
| " 1 / 0 / 3 / 4", |
| " ~~^~~", |
| "ZeroDivisionError: division by zero", |
| ] |
| self.assertEqual(traceback_lines, expected_lines) |
| |
| def test_interactive_traceback_reporting_multiple_input(self): |
| user_input1 = dedent(""" |
| def foo(x): |
| 1 / x |
| |
| """) |
| p = spawn_repl() |
| p.stdin.write(user_input1) |
| user_input2 = "foo(0)" |
| p.stdin.write(user_input2) |
| output = kill_python(p) |
| self.assertEqual(p.returncode, 0) |
| |
| traceback_lines = output.splitlines()[-8:-1] |
| expected_lines = [ |
| ' File "<stdin>", line 1, in <module>', |
| ' foo(0)', |
| ' ~~~^^^', |
| ' File "<stdin>", line 2, in foo', |
| ' 1 / x', |
| ' ~~^~~', |
| 'ZeroDivisionError: division by zero' |
| ] |
| self.assertEqual(traceback_lines, expected_lines) |
| |
| def test_pythonstartup_error_reporting(self): |
| # errors based on https://github.com/python/cpython/issues/137576 |
| |
| def make_repl(env): |
| return subprocess.Popen( |
| [os.path.join(os.path.dirname(sys.executable), '<stdin>'), "-i"], |
| executable=sys.executable, |
| text=True, |
| stdin=subprocess.PIPE, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.STDOUT, |
| env=env, |
| ) |
| |
| # case 1: error in user input, but PYTHONSTARTUP is fine |
| with os_helper.temp_dir() as tmpdir: |
| script = os.path.join(tmpdir, "pythonstartup.py") |
| with open(script, "w") as f: |
| f.write("print('from pythonstartup')" + os.linesep) |
| |
| env = os.environ.copy() |
| env['PYTHONSTARTUP'] = script |
| env["PYTHON_HISTORY"] = os.path.join(tmpdir, ".pythonhist") |
| p = make_repl(env) |
| p.stdin.write("1/0") |
| output = kill_python(p) |
| expected = dedent(""" |
| Traceback (most recent call last): |
| File "<stdin>", line 1, in <module> |
| 1/0 |
| ~^~ |
| ZeroDivisionError: division by zero |
| """) |
| self.assertIn("from pythonstartup", output) |
| self.assertIn(expected, output) |
| |
| # case 2: error in PYTHONSTARTUP triggered by user input |
| with os_helper.temp_dir() as tmpdir: |
| script = os.path.join(tmpdir, "pythonstartup.py") |
| with open(script, "w") as f: |
| f.write("def foo():\n 1/0\n") |
| |
| env = os.environ.copy() |
| env['PYTHONSTARTUP'] = script |
| env["PYTHON_HISTORY"] = os.path.join(tmpdir, ".pythonhist") |
| p = make_repl(env) |
| p.stdin.write('foo()') |
| output = kill_python(p) |
| expected = dedent(""" |
| Traceback (most recent call last): |
| File "<stdin>", line 1, in <module> |
| foo() |
| ~~~^^ |
| File "%s", line 2, in foo |
| 1/0 |
| ~^~ |
| ZeroDivisionError: division by zero |
| """) % script |
| self.assertIn(expected, output) |
| |
| |
| |
| def test_runsource_show_syntax_error_location(self): |
| user_input = dedent("""def f(x, x): ... |
| """) |
| p = spawn_repl() |
| p.stdin.write(user_input) |
| output = kill_python(p) |
| expected_lines = [ |
| ' def f(x, x): ...', |
| ' ^', |
| "SyntaxError: duplicate parameter 'x' in function definition" |
| ] |
| self.assertEqual(output.splitlines()[4:-1], expected_lines) |
| |
| def test_interactive_source_is_in_linecache(self): |
| user_input = dedent(""" |
| def foo(x): |
| return x + 1 |
| |
| def bar(x): |
| return foo(x) + 2 |
| """) |
| p = spawn_repl() |
| p.stdin.write(user_input) |
| user_input2 = dedent(""" |
| import linecache |
| print(linecache._interactive_cache[linecache._make_key(foo.__code__)]) |
| """) |
| p.stdin.write(user_input2) |
| output = kill_python(p) |
| self.assertEqual(p.returncode, 0) |
| expected = "(30, None, [\'def foo(x):\\n\', \' return x + 1\\n\', \'\\n\'], \'<stdin>\')" |
| self.assertIn(expected, output, expected) |
| |
| def test_asyncio_repl_reaches_python_startup_script(self): |
| with os_helper.temp_dir() as tmpdir: |
| script = os.path.join(tmpdir, "pythonstartup.py") |
| with open(script, "w") as f: |
| f.write("print('pythonstartup done!')" + os.linesep) |
| f.write("exit(0)" + os.linesep) |
| |
| env = os.environ.copy() |
| env["PYTHON_HISTORY"] = os.path.join(tmpdir, ".asyncio_history") |
| env["PYTHONSTARTUP"] = script |
| subprocess.check_call( |
| [sys.executable, "-m", "asyncio"], |
| stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE, |
| env=env, |
| timeout=SHORT_TIMEOUT, |
| ) |
| |
| @unittest.skipUnless(pty, "requires pty") |
| def test_asyncio_repl_is_ok(self): |
| m, s = pty.openpty() |
| cmd = [sys.executable, "-I", "-m", "asyncio"] |
| env = os.environ.copy() |
| proc = subprocess.Popen( |
| cmd, |
| stdin=s, |
| stdout=s, |
| stderr=s, |
| text=True, |
| close_fds=True, |
| env=env, |
| ) |
| os.close(s) |
| os.write(m, b"await asyncio.sleep(0)\n") |
| os.write(m, b"exit()\n") |
| output = [] |
| while select.select([m], [], [], SHORT_TIMEOUT)[0]: |
| try: |
| data = os.read(m, 1024).decode("utf-8") |
| if not data: |
| break |
| except OSError: |
| break |
| output.append(data) |
| os.close(m) |
| try: |
| exit_code = proc.wait(timeout=SHORT_TIMEOUT) |
| except subprocess.TimeoutExpired: |
| proc.kill() |
| exit_code = proc.wait() |
| |
| self.assertEqual(exit_code, 0, "".join(output)) |
| |
| |
| @support.force_not_colorized_test_class |
| class TestInteractiveModeSyntaxErrors(unittest.TestCase): |
| |
| def test_interactive_syntax_error_correct_line(self): |
| output = run_on_interactive_mode(dedent("""\ |
| def f(): |
| print(0) |
| return yield 42 |
| """)) |
| |
| traceback_lines = output.splitlines()[-4:-1] |
| expected_lines = [ |
| ' return yield 42', |
| ' ^^^^^', |
| 'SyntaxError: invalid syntax' |
| ] |
| self.assertEqual(traceback_lines, expected_lines) |
| |
| |
| class TestAsyncioREPL(unittest.TestCase): |
| def test_multiple_statements_fail_early(self): |
| user_input = "1 / 0; print(f'afterwards: {1+1}')" |
| p = spawn_repl("-m", "asyncio") |
| p.stdin.write(user_input) |
| output = kill_python(p) |
| self.assertIn("ZeroDivisionError", output) |
| self.assertNotIn("afterwards: 2", output) |
| |
| def test_toplevel_contextvars_sync(self): |
| user_input = dedent("""\ |
| from contextvars import ContextVar |
| var = ContextVar("var", default="failed") |
| var.set("ok") |
| """) |
| p = spawn_repl("-m", "asyncio") |
| p.stdin.write(user_input) |
| user_input2 = dedent(""" |
| print(f"toplevel contextvar test: {var.get()}") |
| """) |
| p.stdin.write(user_input2) |
| output = kill_python(p) |
| self.assertEqual(p.returncode, 0) |
| expected = "toplevel contextvar test: ok" |
| self.assertIn(expected, output, expected) |
| |
| def test_toplevel_contextvars_async(self): |
| user_input = dedent("""\ |
| from contextvars import ContextVar |
| var = ContextVar('var', default='failed') |
| """) |
| p = spawn_repl("-m", "asyncio") |
| p.stdin.write(user_input) |
| user_input2 = "async def set_var(): var.set('ok')\n" |
| p.stdin.write(user_input2) |
| user_input3 = "await set_var()\n" |
| p.stdin.write(user_input3) |
| user_input4 = "print(f'toplevel contextvar test: {var.get()}')\n" |
| p.stdin.write(user_input4) |
| output = kill_python(p) |
| self.assertEqual(p.returncode, 0) |
| expected = "toplevel contextvar test: ok" |
| self.assertIn(expected, output, expected) |
| |
| |
| if __name__ == "__main__": |
| unittest.main() |