| """sqlite3 CLI tests.""" |
| import sqlite3 |
| import sys |
| import textwrap |
| import unittest |
| import unittest.mock |
| import os |
| |
| from sqlite3.__main__ import main as cli |
| from test.support.import_helper import import_module |
| from test.support.os_helper import TESTFN, unlink |
| from test.support.pty_helper import run_pty |
| from test.support import ( |
| captured_stdout, |
| captured_stderr, |
| captured_stdin, |
| force_not_colorized_test_class, |
| requires_subprocess, |
| verbose, |
| ) |
| |
| |
@force_not_colorized_test_class
class CommandLineInterface(unittest.TestCase):
    """Tests for one-shot invocations of the sqlite3 command-line interface.

    Each test calls ``sqlite3.__main__.main`` (imported as ``cli``) in-process
    with the standard streams captured, then inspects the captured text and
    the ``SystemExit`` code.
    """

    def _do_test(self, *args, expect_success=True):
        """Run the CLI with *args* and return ``(stdout, stderr, exit_code)``.

        The CLI must terminate by raising ``SystemExit``; the calling test
        fails otherwise.  *expect_success* is accepted so existing callers
        keep working, but it is not consulted here: ``expect_success()`` and
        ``expect_failure()`` are the ones that check the exit code.
        """
        with (
            captured_stdout() as out,
            captured_stderr() as err,
            self.assertRaises(SystemExit) as cm
        ):
            cli(args)
        return out.getvalue(), err.getvalue(), cm.exception.code

    def expect_success(self, *args):
        """Run the CLI with *args*; require exit code 0 and an empty stderr.

        Returns the captured stdout.
        """
        out, err, code = self._do_test(*args)
        self.assertEqual(code, 0,
                         "\n".join([f"Unexpected failure: {args=}", out, err]))
        self.assertEqual(err, "")
        return out

    def expect_failure(self, *args):
        """Run the CLI with *args*; require a non-zero exit and empty stdout.

        Returns the captured stderr.
        """
        out, err, code = self._do_test(*args, expect_success=False)
        # This assertion only fails when the exit code *is* 0, i.e. when the
        # CLI succeeded although a failure was expected.
        self.assertNotEqual(code, 0,
                            "\n".join([f"Unexpected success: {args=}", out, err]))
        self.assertEqual(out, "")
        return err

    def test_cli_help(self):
        # -h prints the usage text on stdout and exits with 0.
        out = self.expect_success("-h")
        self.assertIn("usage: ", out)
        self.assertIn(" [-h] [-v] [filename] [sql]", out)
        self.assertIn("Python sqlite3 CLI", out)

    def test_cli_version(self):
        # -v reports the version of the underlying SQLite library.
        out = self.expect_success("-v")
        self.assertIn(sqlite3.sqlite_version, out)

    def test_cli_execute_sql(self):
        # A statement given on the command line is run and its row printed.
        out = self.expect_success(":memory:", "select 1")
        self.assertIn("(1,)", out)

    def test_cli_execute_too_much_sql(self):
        # Two statements in the single SQL argument are rejected.
        stderr = self.expect_failure(":memory:", "select 1; select 2")
        err = "ProgrammingError: You can only execute one statement at a time"
        self.assertIn(err, stderr)

    def test_cli_execute_incomplete_sql(self):
        stderr = self.expect_failure(":memory:", "sel")
        self.assertIn("OperationalError (SQLITE_ERROR)", stderr)

    def test_cli_on_disk_db(self):
        # The table created by the first invocation must be visible to the
        # second one, which only works if the database file was written.
        self.addCleanup(unlink, TESTFN)
        out = self.expect_success(TESTFN, "create table t(t)")
        self.assertEqual(out, "")
        out = self.expect_success(TESTFN, "select count(t) from t")
        self.assertIn("(0,)", out)
| |
| |
@force_not_colorized_test_class
class InteractiveSession(unittest.TestCase):
    """Tests for the interactive mode of the sqlite3 command-line interface.

    Input lines are fed through a captured stdin; the tests then count the
    primary (``PS1``) and continuation (``PS2``) prompts written to stdout
    and check the messages written to stderr.
    """
    MEMORY_DB_MSG = "Connected to a transient in-memory database"
    PS1 = "sqlite> "
    PS2 = "... "

    def run_cli(self, *args, commands=()):
        """Run the CLI with *args*, feeding *commands* on stdin.

        Every item of *commands* is written as one input line (a newline is
        appended to each).  Pass a tuple or list: a bare string would be
        iterated character by character.  The CLI must exit with code 0.

        Returns the captured ``(stdout, stderr)`` text.
        """
        with (
            captured_stdin() as stdin,
            captured_stdout() as stdout,
            captured_stderr() as stderr,
            self.assertRaises(SystemExit) as cm
        ):
            for cmd in commands:
                stdin.write(cmd + "\n")
            # Rewind so the CLI reads the queued input from the beginning.
            stdin.seek(0)
            cli(args)

        out = stdout.getvalue()
        err = stderr.getvalue()
        self.assertEqual(cm.exception.code, 0,
                         f"Unexpected failure: {args=}\n{out}\n{err}")
        return out, err

    def test_interact(self):
        # No input at all: a single primary prompt, then exit.
        out, err = self.run_cli()
        self.assertIn(self.MEMORY_DB_MSG, err)
        self.assertEndsWith(out, self.PS1)
        self.assertEqual(out.count(self.PS1), 1)
        self.assertEqual(out.count(self.PS2), 0)

    def test_interact_quit(self):
        out, err = self.run_cli(commands=(".quit",))
        self.assertIn(self.MEMORY_DB_MSG, err)
        self.assertEndsWith(out, self.PS1)
        self.assertEqual(out.count(self.PS1), 1)
        self.assertEqual(out.count(self.PS2), 0)

    def test_interact_version(self):
        out, err = self.run_cli(commands=(".version",))
        self.assertIn(self.MEMORY_DB_MSG, err)
        self.assertIn(sqlite3.sqlite_version + "\n", out)
        self.assertEndsWith(out, self.PS1)
        self.assertEqual(out.count(self.PS1), 2)
        self.assertEqual(out.count(self.PS2), 0)

    def test_interact_empty_source(self):
        # Empty and whitespace-only lines each just produce a new PS1.
        out, err = self.run_cli(commands=("", " "))
        self.assertIn(self.MEMORY_DB_MSG, err)
        self.assertEndsWith(out, self.PS1)
        self.assertEqual(out.count(self.PS1), 3)
        self.assertEqual(out.count(self.PS2), 0)

    def test_interact_dot_commands_unknown(self):
        out, err = self.run_cli(commands=(".unknown_command", ))
        self.assertIn(self.MEMORY_DB_MSG, err)
        self.assertEndsWith(out, self.PS1)
        self.assertEqual(out.count(self.PS1), 2)
        self.assertEqual(out.count(self.PS2), 0)
        self.assertIn('Error: unknown command: "', err)
        # test "unknown_command" is pointed out in the error message
        self.assertIn("unknown_command", err)

    def test_interact_dot_commands_empty(self):
        # A lone "." is one input line; note the trailing comma that makes
        # this a one-element tuple rather than a parenthesised string.
        out, err = self.run_cli(commands=(".",))
        self.assertIn(self.MEMORY_DB_MSG, err)
        self.assertEndsWith(out, self.PS1)
        self.assertEqual(out.count(self.PS1), 2)
        self.assertEqual(out.count(self.PS2), 0)

    def test_interact_dot_commands_with_whitespaces(self):
        out, err = self.run_cli(commands=(".version ", ". version"))
        self.assertIn(self.MEMORY_DB_MSG, err)
        self.assertEqual(out.count(sqlite3.sqlite_version + "\n"), 2)
        self.assertEndsWith(out, self.PS1)
        self.assertEqual(out.count(self.PS1), 3)
        self.assertEqual(out.count(self.PS2), 0)

    def test_interact_valid_sql(self):
        out, err = self.run_cli(commands=("SELECT 1;",))
        self.assertIn(self.MEMORY_DB_MSG, err)
        self.assertIn("(1,)\n", out)
        self.assertEndsWith(out, self.PS1)
        self.assertEqual(out.count(self.PS1), 2)
        self.assertEqual(out.count(self.PS2), 0)

    def test_interact_incomplete_multiline_sql(self):
        # Without the terminating ";" the session ends on a PS2 prompt.
        out, err = self.run_cli(commands=("SELECT 1",))
        self.assertIn(self.MEMORY_DB_MSG, err)
        self.assertEndsWith(out, self.PS2)
        self.assertEqual(out.count(self.PS1), 1)
        self.assertEqual(out.count(self.PS2), 1)

    def test_interact_valid_multiline_sql(self):
        # The embedded newline splits the statement over two input lines.
        out, err = self.run_cli(commands=("SELECT 1\n;",))
        self.assertIn(self.MEMORY_DB_MSG, err)
        self.assertIn(self.PS2, out)
        self.assertIn("(1,)\n", out)
        self.assertEndsWith(out, self.PS1)
        self.assertEqual(out.count(self.PS1), 2)
        self.assertEqual(out.count(self.PS2), 1)

    def test_interact_invalid_sql(self):
        out, err = self.run_cli(commands=("sel;",))
        self.assertIn(self.MEMORY_DB_MSG, err)
        self.assertIn("OperationalError (SQLITE_ERROR)", err)
        self.assertEndsWith(out, self.PS1)
        self.assertEqual(out.count(self.PS1), 2)
        self.assertEqual(out.count(self.PS2), 0)

    def test_interact_on_disk_file(self):
        self.addCleanup(unlink, TESTFN)

        out, err = self.run_cli(TESTFN, commands=("CREATE TABLE t(t);",))
        self.assertIn(TESTFN, err)
        self.assertEndsWith(out, self.PS1)

        # A second session on the same file must see the table created above.
        out, _ = self.run_cli(TESTFN, commands=("SELECT count(t) FROM t;",))
        self.assertIn("(0,)\n", out)

    def test_color(self):
        with unittest.mock.patch("_colorize.can_colorize", return_value=True):
            # run_cli() writes each item as its own input line.  Sending the
            # text one character per line (plus a lone newline) produces
            # several incomplete lines in a row, which the PS2 assertion
            # below relies on: it expects a continuation prompt that is
            # directly followed by another escape sequence.
            out, err = self.run_cli(commands=("T", "E", "X", "T", "\n"))
            self.assertIn("\x1b[1;35msqlite> \x1b[0m", out)
            # NOTE(review): whitespace inside this literal is significant;
            # confirm it matches the CLI's continuation prompt exactly.
            self.assertIn("\x1b[1;35m ... \x1b[0m\x1b", out)
            out, err = self.run_cli(commands=("sel;",))
            self.assertIn('\x1b[1;35mOperationalError (SQLITE_ERROR)\x1b[0m: '
                          '\x1b[35mnear "sel": syntax error\x1b[0m', err)
| |
| |
@requires_subprocess()
@force_not_colorized_test_class
class Completion(unittest.TestCase):
    """Tab-completion tests that drive the CLI through a pseudo-terminal.

    Each test sends raw bytes (including Tab characters) via ``run_pty()``
    and inspects the terminal output that comes back.
    """
    PS1 = "sqlite> "

    @classmethod
    def setUpClass(cls):
        # The tests configure GNU readline settings; the libedit backend is
        # explicitly not supported, so skip the whole class for it.
        readline = import_module("readline")
        if readline.backend == "editline":
            raise unittest.SkipTest("libedit readline is not supported")

    def write_input(self, input_, env=None):
        """Run the CLI under ``run_pty()`` and return the terminal output.

        *input_* is the bytes object sent to the pseudo-terminal; *env* is
        passed through to ``run_pty()`` unchanged.
        """
        script = textwrap.dedent("""
            import readline
            from sqlite3.__main__ import main

            # Configure readline to ...:
            # - hide control sequences surrounding each candidate
            # - hide "Display all xxx possibilities? (y or n)"
            # - show candidates one per line
            readline.parse_and_bind("set colored-completion-prefix off")
            readline.parse_and_bind("set completion-query-items 0")
            readline.parse_and_bind("set page-completions off")
            readline.parse_and_bind("set completion-display-width 0")

            main()
        """)
        return run_pty(script, input_, env)

    def _prompt_indices(self, lines):
        """Return the indices of the entries of *lines* that start with PS1."""
        return [i for i, line in enumerate(lines)
                if line.startswith(self.PS1)]

    @staticmethod
    def _candidates_between(lines, start, end):
        """Return the stripped lines strictly between *start* and *end*."""
        return [line.strip() for line in lines[start + 1:end]]

    def test_complete_sql_keywords(self):
        _sqlite3 = import_module("_sqlite3")
        if not hasattr(_sqlite3, "SQLITE_KEYWORDS"):
            self.skipTest("unable to determine SQLite keywords")

        # List candidates starting with 'S', there should be multiple matches.
        input_ = b"S\t\tEL\t 1;\n.quit\n"
        output = self.write_input(input_)
        self.assertIn(b"SELECT", output)
        self.assertIn(b"SET", output)
        self.assertIn(b"SAVEPOINT", output)
        self.assertIn(b"(1,)", output)

        # Keywords are completed in upper case for even lower case user input.
        input_ = b"sel\t\t 1;\n.quit\n"
        output = self.write_input(input_)
        self.assertIn(b"SELECT", output)
        self.assertIn(b"(1,)", output)

        # .commands are completed without changing case
        input_ = b".ver\t\n.quit\n"
        output = self.write_input(input_)
        self.assertIn(b".version", output)

    def test_complete_table_indexes_triggers_views(self):
        input_ = textwrap.dedent("""\
            CREATE TABLE _Table (id);
            CREATE INDEX _Index ON _table (id);
            CREATE TRIGGER _Trigger BEFORE INSERT
            ON _Table BEGIN SELECT 1; END;
            CREATE VIEW _View AS SELECT 1;

            CREATE TEMP TABLE _Temp_table (id);
            CREATE INDEX temp._Temp_index ON _Temp_table (id);
            CREATE TEMP TRIGGER _Temp_trigger BEFORE INSERT
            ON _Table BEGIN SELECT 1; END;
            CREATE TEMP VIEW _Temp_view AS SELECT 1;

            ATTACH ':memory:' AS attached;
            CREATE TABLE attached._Attached_table (id);
            CREATE INDEX attached._Attached_index ON _Attached_table (id);
            CREATE TRIGGER attached._Attached_trigger BEFORE INSERT
            ON _Attached_table BEGIN SELECT 1; END;
            CREATE VIEW attached._Attached_view AS SELECT 1;

            SELECT id FROM _\t\tta\t;
            .quit\n""").encode()
        output = self.write_input(input_)
        lines = output.decode().splitlines()
        prompts = self._prompt_indices(lines)
        # The lines between the third-last and second-last prompt lines are
        # taken as the candidates displayed for the "_" prefix.
        candidates = self._candidates_between(lines, prompts[-3], prompts[-2])
        self.assertEqual(candidates,
            [
                "_Attached_index",
                "_Attached_table",
                "_Attached_trigger",
                "_Attached_view",
                "_Index",
                "_Table",
                "_Temp_index",
                "_Temp_table",
                "_Temp_trigger",
                "_Temp_view",
                "_Trigger",
                "_View",
            ],
        )
        # direct match with '_Table' completed, no candidates displayed
        candidates = self._candidates_between(lines, prompts[-2], prompts[-1])
        self.assertEqual(len(candidates), 0)

    @unittest.skipIf(sqlite3.sqlite_version_info < (3, 16, 0),
                     "PRAGMA table-valued function is not available until "
                     "SQLite 3.16.0")
    def test_complete_columns(self):
        input_ = textwrap.dedent("""\
            CREATE TABLE _table (_col_table);
            CREATE TEMP TABLE _temp_table (_col_temp);
            ATTACH ':memory:' AS attached;
            CREATE TABLE attached._attached_table (_col_attached);

            SELECT _col_\t\tta\tFROM _table;
            .quit\n""").encode()
        output = self.write_input(input_)
        lines = output.decode().splitlines()
        prompts = self._prompt_indices(lines)
        candidates = self._candidates_between(lines, prompts[-3], prompts[-2])

        self.assertEqual(
            candidates, ["_col_attached", "_col_table", "_col_temp"]
        )

    @unittest.skipIf(sqlite3.sqlite_version_info < (3, 30, 0),
                     "PRAGMA function_list is not available until "
                     "SQLite 3.30.0")
    def test_complete_functions(self):
        input_ = b"SELECT AV\t1);\n.quit\n"
        output = self.write_input(input_)
        self.assertIn(b"AVG(1);", output)
        self.assertIn(b"(1.0,)", output)

        # Functions are completed in upper case for even lower case user input.
        input_ = b"SELECT av\t1);\n.quit\n"
        output = self.write_input(input_)
        self.assertIn(b"AVG(1);", output)
        self.assertIn(b"(1.0,)", output)

    def test_complete_schemata(self):
        input_ = textwrap.dedent("""\
            ATTACH ':memory:' AS MixedCase;
            -- Test '_' is escaped in Like pattern filtering
            ATTACH ':memory:' AS _underscore;
            -- Let database_list pragma have a 'temp' schema entry
            CREATE TEMP TABLE _table (id);

            SELECT * FROM \t\tmIX\t.sqlite_master;
            SELECT * FROM _und\t.sqlite_master;
            .quit\n""").encode()
        output = self.write_input(input_)
        lines = output.decode().splitlines()
        prompts = self._prompt_indices(lines)
        candidates = self._candidates_between(lines, prompts[-4], prompts[-3])
        self.assertIn("MixedCase", candidates)
        self.assertIn("_underscore", candidates)
        self.assertIn("main", candidates)
        self.assertIn("temp", candidates)

    @unittest.skipIf(sys.platform.startswith("freebsd"),
                     "Two actual tabs are inserted when there are no matching"
                     " completions in the pseudo-terminal opened by run_pty()"
                     " on FreeBSD")
    def test_complete_no_match(self):
        input_ = b"xyzzy\t\t\b\b\b\b\b\b\b.quit\n"
        # Set NO_COLOR to disable coloring for self.PS1.
        output = self.write_input(input_, env={**os.environ, "NO_COLOR": "1"})
        lines = output.decode().splitlines()
        # 1-based numbers of the lines showing the prompt followed by "xyzzy".
        indices = (
            i for i, line in enumerate(lines, 1)
            if line.startswith(f"{self.PS1}xyzzy")
        )
        line_num = next(indices, -1)
        self.assertNotEqual(line_num, -1)
        # Completions occupy lines, assert no extra lines when there is nothing
        # to complete.
        self.assertEqual(line_num, len(lines))

    def test_complete_no_input(self):
        script = textwrap.dedent("""
            import readline
            from sqlite3.__main__ import main

            # Configure readline to ...:
            # - hide control sequences surrounding each candidate
            # - hide "Display all xxx possibilities? (y or n)"
            # - hide "--More--"
            # - show candidates one per line
            readline.parse_and_bind("set colored-completion-prefix off")
            readline.parse_and_bind("set colored-stats off")
            readline.parse_and_bind("set completion-query-items 0")
            readline.parse_and_bind("set page-completions off")
            readline.parse_and_bind("set completion-display-width 0")
            readline.parse_and_bind("set show-all-if-ambiguous off")
            readline.parse_and_bind("set show-all-if-unmodified off")

            main()
        """)
        input_ = b"\t\t.quit\n"
        output = run_pty(script, input_, env={**os.environ, "NO_COLOR": "1"})
        try:
            lines = output.decode().splitlines()
            prompts = self._prompt_indices(lines)
            self.assertEqual(len(prompts), 2)
            start, end = prompts
            candidates = self._candidates_between(lines, start, end)
            self.assertEqual(candidates, sorted(candidates))
        except BaseException:
            # Diagnostic aid only: dump the raw terminal output in verbose
            # runs, then always re-raise so the failure is still reported.
            if verbose:
                print(' PTY output: '.center(30, '-'))
                print(output.decode(errors='replace'))
                print(' end PTY output '.center(30, '-'))
            raise
| |
| |
| |
# Run every test case defined in this file when it is executed as a script.
if __name__ == "__main__":
    unittest.main()