| import io |
| import unittest |
| |
| |
class TestThreadedTaskDispatcher(unittest.TestCase):
    """Exercise ``ThreadedTaskDispatcher`` by driving its methods directly."""

    def _makeOne(self):
        from waitress.task import ThreadedTaskDispatcher

        dispatcher = ThreadedTaskDispatcher()
        return dispatcher

    def test_handler_thread_task_raises(self):
        # A task whose service() raises is expected to be logged exactly once
        # while the dispatcher's counters and thread set are left cleared.
        dispatcher = self._makeOne()
        dispatcher.threads.add(0)
        dispatcher.logger = DummyLogger()

        class BadDummyTask(DummyTask):
            def service(self):
                super().service()
                # Bump stop_count before failing; the test expects it to be
                # back at 0 once handler_thread() returns.
                dispatcher.stop_count += 1
                raise Exception

        failing_task = BadDummyTask()
        dispatcher.queue.append(failing_task)
        dispatcher.active_count += 1
        dispatcher.handler_thread(0)
        self.assertEqual(dispatcher.stop_count, 0)
        self.assertEqual(dispatcher.active_count, 0)
        self.assertEqual(dispatcher.threads, set())
        self.assertEqual(len(dispatcher.logger.logged), 1)

    def test_set_thread_count_increase(self):
        dispatcher = self._makeOne()
        started = []

        def record_start(*call_args):
            started.append(call_args)

        dispatcher.start_new_thread = record_start
        dispatcher.set_thread_count(1)
        self.assertEqual(started, [(dispatcher.handler_thread, 0)])

    def test_set_thread_count_increase_with_existing(self):
        dispatcher = self._makeOne()
        started = []

        def record_start(*call_args):
            started.append(call_args)

        # Thread number 0 already exists, so only number 1 should be started.
        dispatcher.threads = {0}
        dispatcher.start_new_thread = record_start
        dispatcher.set_thread_count(2)
        self.assertEqual(started, [(dispatcher.handler_thread, 1)])

    def test_set_thread_count_decrease(self):
        dispatcher = self._makeOne()
        dispatcher.threads = {0, 1}
        dispatcher.set_thread_count(1)
        self.assertEqual(dispatcher.stop_count, 1)

    def test_set_thread_count_same(self):
        dispatcher = self._makeOne()
        started = []

        def record_start(*call_args):
            started.append(call_args)

        dispatcher.start_new_thread = record_start
        dispatcher.threads = {0}
        dispatcher.set_thread_count(1)
        self.assertEqual(started, [])

    def test_add_task_with_idle_threads(self):
        # With one registered thread, queuing a single task should not warn.
        pending = DummyTask()
        dispatcher = self._makeOne()
        dispatcher.threads.add(0)
        dispatcher.queue_logger = DummyLogger()
        dispatcher.add_task(pending)
        self.assertEqual(len(dispatcher.queue), 1)
        self.assertEqual(len(dispatcher.queue_logger.logged), 0)

    def test_add_task_with_all_busy_threads(self):
        # With no registered threads, every add_task() call should log once.
        pending = DummyTask()
        dispatcher = self._makeOne()
        dispatcher.queue_logger = DummyLogger()
        dispatcher.add_task(pending)
        self.assertEqual(len(dispatcher.queue_logger.logged), 1)
        dispatcher.add_task(pending)
        self.assertEqual(len(dispatcher.queue_logger.logged), 2)

    def test_shutdown_one_thread(self):
        dispatcher = self._makeOne()
        dispatcher.threads.add(0)
        dispatcher.logger = DummyLogger()
        pending = DummyTask()
        dispatcher.queue.append(pending)
        result = dispatcher.shutdown(timeout=0.01)
        self.assertEqual(result, True)
        expected_messages = [
            "1 thread(s) still running",
            "Canceling 1 pending task(s)",
        ]
        self.assertEqual(dispatcher.logger.logged, expected_messages)
        self.assertEqual(pending.cancelled, True)

    def test_shutdown_no_threads(self):
        dispatcher = self._makeOne()
        result = dispatcher.shutdown(timeout=0.01)
        self.assertEqual(result, True)

    def test_shutdown_no_cancel_pending(self):
        dispatcher = self._makeOne()
        result = dispatcher.shutdown(cancel_pending=False, timeout=0.01)
        self.assertEqual(result, False)
| |
| |
class TestTask(unittest.TestCase):
    # Unit tests for ``waitress.task.Task`` driven through DummyChannel and
    # DummyParser stand-ins; output is inspected via ``channel.written``.

    def _makeOne(self, channel=None, request=None):
        # Build a Task, substituting dummy collaborators for any not supplied.
        if channel is None:
            channel = DummyChannel()
        if request is None:
            request = DummyParser()
        from waitress.task import Task

        return Task(channel, request)

    def test_ctor_version_not_in_known(self):
        # A request version of "8.4" is expected to leave the task at "1.0".
        request = DummyParser()
        request.version = "8.4"
        inst = self._makeOne(request=request)
        self.assertEqual(inst.version, "1.0")

    def test_build_response_header_bad_http_version(self):
        # Forcing an unknown version onto the task after construction is
        # expected to trip an assertion when the header is built.
        inst = self._makeOne()
        inst.request = DummyParser()
        inst.version = "8.4"
        self.assertRaises(AssertionError, inst.build_response_header)

    def test_build_response_header_v10_keepalive_no_content_length(self):
        # HTTP/1.0 keep-alive request, no Content-Length on the response:
        # the expected header says "Connection: close".
        inst = self._makeOne()
        inst.request = DummyParser()
        inst.request.headers["CONNECTION"] = "keep-alive"
        inst.version = "1.0"
        result = inst.build_response_header()
        lines = filter_lines(result)
        self.assertEqual(len(lines), 4)
        self.assertEqual(lines[0], b"HTTP/1.0 200 OK")
        self.assertEqual(lines[1], b"Connection: close")
        self.assertTrue(lines[2].startswith(b"Date:"))
        self.assertEqual(lines[3], b"Server: waitress")
        self.assertEqual(inst.close_on_finish, True)
        self.assertTrue(("Connection", "close") in inst.response_headers)

    def test_build_response_header_v10_keepalive_with_content_length(self):
        # HTTP/1.0 keep-alive request with a Content-Length response header:
        # the connection is expected to be kept alive.
        inst = self._makeOne()
        inst.request = DummyParser()
        inst.request.headers["CONNECTION"] = "keep-alive"
        inst.response_headers = [("Content-Length", "10")]
        inst.version = "1.0"
        # NOTE(review): content_length is set to 0 although the header says
        # 10; the assertions below expect the header value to be emitted.
        inst.content_length = 0
        result = inst.build_response_header()
        lines = filter_lines(result)
        self.assertEqual(len(lines), 5)
        self.assertEqual(lines[0], b"HTTP/1.0 200 OK")
        self.assertEqual(lines[1], b"Connection: Keep-Alive")
        self.assertEqual(lines[2], b"Content-Length: 10")
        self.assertTrue(lines[3].startswith(b"Date:"))
        self.assertEqual(lines[4], b"Server: waitress")
        self.assertEqual(inst.close_on_finish, False)

    def test_build_response_header_v11_connection_closed_by_client(self):
        # HTTP/1.1 request carrying "Connection: close": expect a chunked
        # response that also closes the connection.
        inst = self._makeOne()
        inst.request = DummyParser()
        inst.version = "1.1"
        inst.request.headers["CONNECTION"] = "close"
        result = inst.build_response_header()
        lines = filter_lines(result)
        self.assertEqual(len(lines), 5)
        self.assertEqual(lines[0], b"HTTP/1.1 200 OK")
        self.assertEqual(lines[1], b"Connection: close")
        self.assertTrue(lines[2].startswith(b"Date:"))
        self.assertEqual(lines[3], b"Server: waitress")
        self.assertEqual(lines[4], b"Transfer-Encoding: chunked")
        self.assertTrue(("Connection", "close") in inst.response_headers)
        self.assertEqual(inst.close_on_finish, True)

    def test_build_response_header_v11_connection_keepalive_by_client(self):
        # HTTP/1.1 request carrying "Connection: keep-alive" but no response
        # Content-Length: the expected header is still "Connection: close".
        inst = self._makeOne()
        inst.request = DummyParser()
        inst.request.headers["CONNECTION"] = "keep-alive"
        inst.version = "1.1"
        result = inst.build_response_header()
        lines = filter_lines(result)
        self.assertEqual(len(lines), 5)
        self.assertEqual(lines[0], b"HTTP/1.1 200 OK")
        self.assertEqual(lines[1], b"Connection: close")
        self.assertTrue(lines[2].startswith(b"Date:"))
        self.assertEqual(lines[3], b"Server: waitress")
        self.assertEqual(lines[4], b"Transfer-Encoding: chunked")
        self.assertTrue(("Connection", "close") in inst.response_headers)
        self.assertEqual(inst.close_on_finish, True)

    def test_build_response_header_v11_200_no_content_length(self):
        # HTTP/1.1, no Connection request header, no Content-Length:
        # expect a chunked response with "Connection: close".
        inst = self._makeOne()
        inst.request = DummyParser()
        inst.version = "1.1"
        result = inst.build_response_header()
        lines = filter_lines(result)
        self.assertEqual(len(lines), 5)
        self.assertEqual(lines[0], b"HTTP/1.1 200 OK")
        self.assertEqual(lines[1], b"Connection: close")
        self.assertTrue(lines[2].startswith(b"Date:"))
        self.assertEqual(lines[3], b"Server: waitress")
        self.assertEqual(lines[4], b"Transfer-Encoding: chunked")
        self.assertEqual(inst.close_on_finish, True)
        self.assertTrue(("Connection", "close") in inst.response_headers)

    def test_build_response_header_v11_204_no_content_length_or_transfer_encoding(self):
        # RFC 7230: MUST NOT send Transfer-Encoding or Content-Length
        # for any response with a status code of 1xx or 204.
        inst = self._makeOne()
        inst.request = DummyParser()
        inst.version = "1.1"
        inst.status = "204 No Content"
        result = inst.build_response_header()
        lines = filter_lines(result)
        self.assertEqual(len(lines), 4)
        self.assertEqual(lines[0], b"HTTP/1.1 204 No Content")
        self.assertEqual(lines[1], b"Connection: close")
        self.assertTrue(lines[2].startswith(b"Date:"))
        self.assertEqual(lines[3], b"Server: waitress")
        self.assertEqual(inst.close_on_finish, True)
        self.assertTrue(("Connection", "close") in inst.response_headers)

    def test_build_response_header_v11_1xx_no_content_length_or_transfer_encoding(self):
        # RFC 7230: MUST NOT send Transfer-Encoding or Content-Length
        # for any response with a status code of 1xx or 204.
        inst = self._makeOne()
        inst.request = DummyParser()
        inst.version = "1.1"
        inst.status = "100 Continue"
        result = inst.build_response_header()
        lines = filter_lines(result)
        self.assertEqual(len(lines), 4)
        self.assertEqual(lines[0], b"HTTP/1.1 100 Continue")
        self.assertEqual(lines[1], b"Connection: close")
        self.assertTrue(lines[2].startswith(b"Date:"))
        self.assertEqual(lines[3], b"Server: waitress")
        self.assertEqual(inst.close_on_finish, True)
        self.assertTrue(("Connection", "close") in inst.response_headers)

    def test_build_response_header_v11_304_no_content_length_or_transfer_encoding(self):
        # RFC 7230: MUST NOT send Transfer-Encoding or Content-Length
        # for any response with a status code of 1xx, 204 or 304.
        inst = self._makeOne()
        inst.request = DummyParser()
        inst.version = "1.1"
        inst.status = "304 Not Modified"
        result = inst.build_response_header()
        lines = filter_lines(result)
        self.assertEqual(len(lines), 4)
        self.assertEqual(lines[0], b"HTTP/1.1 304 Not Modified")
        self.assertEqual(lines[1], b"Connection: close")
        self.assertTrue(lines[2].startswith(b"Date:"))
        self.assertEqual(lines[3], b"Server: waitress")
        self.assertEqual(inst.close_on_finish, True)
        self.assertTrue(("Connection", "close") in inst.response_headers)

    def test_build_response_header_via_added(self):
        # A pre-set Server header is expected to be kept as-is, with a
        # "Via: waitress" line emitted in addition.
        inst = self._makeOne()
        inst.request = DummyParser()
        inst.version = "1.0"
        inst.response_headers = [("Server", "abc")]
        result = inst.build_response_header()
        lines = filter_lines(result)
        self.assertEqual(len(lines), 5)
        self.assertEqual(lines[0], b"HTTP/1.0 200 OK")
        self.assertEqual(lines[1], b"Connection: close")
        self.assertTrue(lines[2].startswith(b"Date:"))
        self.assertEqual(lines[3], b"Server: abc")
        self.assertEqual(lines[4], b"Via: waitress")

    def test_build_response_header_date_exists(self):
        # A pre-set Date header must not produce a second Date line: the
        # total stays at four header lines.
        inst = self._makeOne()
        inst.request = DummyParser()
        inst.version = "1.0"
        inst.response_headers = [("Date", "date")]
        result = inst.build_response_header()
        lines = filter_lines(result)
        self.assertEqual(len(lines), 4)
        self.assertEqual(lines[0], b"HTTP/1.0 200 OK")
        self.assertEqual(lines[1], b"Connection: close")
        self.assertTrue(lines[2].startswith(b"Date:"))
        self.assertEqual(lines[3], b"Server: waitress")

    def test_build_response_header_preexisting_content_length(self):
        # With content_length already set on the task, the header is expected
        # to carry it and to omit both Connection and Transfer-Encoding.
        inst = self._makeOne()
        inst.request = DummyParser()
        inst.version = "1.1"
        inst.content_length = 100
        result = inst.build_response_header()
        lines = filter_lines(result)
        self.assertEqual(len(lines), 4)
        self.assertEqual(lines[0], b"HTTP/1.1 200 OK")
        self.assertEqual(lines[1], b"Content-Length: 100")
        self.assertTrue(lines[2].startswith(b"Date:"))
        self.assertEqual(lines[3], b"Server: waitress")

    def test_remove_content_length_header(self):
        inst = self._makeOne()
        inst.response_headers = [("Content-Length", "70")]
        inst.remove_content_length_header()
        self.assertEqual(inst.response_headers, [])

    def test_remove_content_length_header_with_other(self):
        # Only the Content-Length entry is removed; other headers survive.
        inst = self._makeOne()
        inst.response_headers = [
            ("Content-Length", "70"),
            ("Content-Type", "text/html"),
        ]
        inst.remove_content_length_header()
        self.assertEqual(inst.response_headers, [("Content-Type", "text/html")])

    def test_start(self):
        # start() is expected to leave a truthy start_time on the task.
        inst = self._makeOne()
        inst.start()
        self.assertTrue(inst.start_time)

    def test_finish_didnt_write_header(self):
        # finish() with the header still unwritten should write something.
        inst = self._makeOne()
        inst.wrote_header = False
        inst.complete = True
        inst.finish()
        self.assertTrue(inst.channel.written)

    def test_finish_wrote_header(self):
        # finish() with the header already written should write nothing more.
        inst = self._makeOne()
        inst.wrote_header = True
        inst.finish()
        self.assertFalse(inst.channel.written)

    def test_finish_chunked_response(self):
        # For a chunked response, finish() should emit the terminating
        # zero-length chunk.
        inst = self._makeOne()
        inst.wrote_header = True
        inst.chunked_response = True
        inst.finish()
        self.assertEqual(inst.channel.written, b"0\r\n\r\n")

    def test_write_wrote_header(self):
        # With the header already written, write() passes the body through
        # unchanged.
        inst = self._makeOne()
        inst.wrote_header = True
        inst.complete = True
        inst.content_length = 3
        inst.write(b"abc")
        self.assertEqual(inst.channel.written, b"abc")

    def test_write_header_not_written(self):
        # write() with the header unwritten should flip wrote_header to True.
        inst = self._makeOne()
        inst.wrote_header = False
        inst.complete = True
        inst.write(b"abc")
        self.assertTrue(inst.channel.written)
        self.assertEqual(inst.wrote_header, True)

    def test_write_start_response_uncalled(self):
        # Writing on a fresh task (``complete`` is never set here) is
        # expected to raise RuntimeError.
        inst = self._makeOne()
        self.assertRaises(RuntimeError, inst.write, b"")

    def test_write_chunked_response(self):
        # In chunked mode the data is framed as "<hex length>\r\n<data>\r\n".
        inst = self._makeOne()
        inst.wrote_header = True
        inst.chunked_response = True
        inst.complete = True
        inst.write(b"abc")
        self.assertEqual(inst.channel.written, b"3\r\nabc\r\n")

    def test_write_preexisting_content_length(self):
        # Writing three bytes against a content_length of 1 should log one
        # message and set logged_write_excess.
        inst = self._makeOne()
        inst.wrote_header = True
        inst.complete = True
        inst.content_length = 1
        inst.logger = DummyLogger()
        inst.write(b"abc")
        self.assertTrue(inst.channel.written)
        self.assertEqual(inst.logged_write_excess, True)
        self.assertEqual(len(inst.logger.logged), 1)
| |
| |
class TestWSGITask(unittest.TestCase):
    # Unit tests for ``waitress.task.WSGITask``. Each test installs a small
    # WSGI app on ``inst.channel.server.application`` (or stubs ``execute``)
    # and inspects the task and the DummyChannel afterwards.

    def _makeOne(self, channel=None, request=None):
        # Build a WSGITask, substituting dummy collaborators where omitted.
        if channel is None:
            channel = DummyChannel()
        if request is None:
            request = DummyParser()
        from waitress.task import WSGITask

        return WSGITask(channel, request)

    def test_service(self):
        # service() with a stubbed execute(): the stub must run and the task
        # must end up started, flagged to close, and with output written.
        inst = self._makeOne()

        def execute():
            inst.executed = True

        inst.execute = execute
        inst.complete = True
        inst.service()
        self.assertTrue(inst.start_time)
        self.assertTrue(inst.close_on_finish)
        self.assertTrue(inst.channel.written)
        self.assertEqual(inst.executed, True)

    def test_service_server_raises_socket_error(self):
        # An OSError from execute() must propagate out of service()
        # (socket.error is an alias of OSError on Python 3), with nothing
        # written to the channel.
        import socket

        inst = self._makeOne()

        def execute():
            raise OSError

        inst.execute = execute
        self.assertRaises(socket.error, inst.service)
        self.assertTrue(inst.start_time)
        self.assertTrue(inst.close_on_finish)
        self.assertFalse(inst.channel.written)

    def test_execute_app_calls_start_response_twice_wo_exc_info(self):
        # Calling start_response twice without exc_info is an error.
        def app(environ, start_response):
            start_response("200 OK", [])
            start_response("200 OK", [])

        inst = self._makeOne()
        inst.channel.server.application = app
        self.assertRaises(AssertionError, inst.execute)

    def test_execute_app_calls_start_response_w_exc_info_complete(self):
        # start_response with exc_info while the header is still unwritten
        # (``complete`` pre-set to True) is expected to succeed.
        def app(environ, start_response):
            start_response("200 OK", [], [ValueError, ValueError(), None])
            return [b"a"]

        inst = self._makeOne()
        inst.complete = True
        inst.channel.server.application = app
        inst.execute()
        self.assertTrue(inst.complete)
        self.assertEqual(inst.status, "200 OK")
        self.assertTrue(inst.channel.written)

    def test_execute_app_calls_start_response_w_excinf_headers_unwritten(self):
        # With exc_info and an unwritten header, previously set response
        # headers are expected to be discarded.
        def app(environ, start_response):
            start_response("200 OK", [], [ValueError, None, None])
            return [b"a"]

        inst = self._makeOne()
        inst.wrote_header = False
        inst.channel.server.application = app
        inst.response_headers = [("a", "b")]
        inst.execute()
        self.assertTrue(inst.complete)
        self.assertEqual(inst.status, "200 OK")
        self.assertTrue(inst.channel.written)
        self.assertFalse(("a", "b") in inst.response_headers)

    def test_execute_app_calls_start_response_w_excinf_headers_written(self):
        # With exc_info and the header already written, the supplied
        # exception is expected to be raised from execute().
        def app(environ, start_response):
            start_response("200 OK", [], [ValueError, ValueError(), None])

        inst = self._makeOne()
        inst.complete = True
        inst.wrote_header = True
        inst.channel.server.application = app
        self.assertRaises(ValueError, inst.execute)

    def test_execute_bad_header_key(self):
        # A None header name is rejected with AssertionError.
        def app(environ, start_response):
            start_response("200 OK", [(None, "a")])

        inst = self._makeOne()
        inst.channel.server.application = app
        self.assertRaises(AssertionError, inst.execute)

    def test_execute_bad_header_value(self):
        # A None header value is rejected with AssertionError.
        def app(environ, start_response):
            start_response("200 OK", [("a", None)])

        inst = self._makeOne()
        inst.channel.server.application = app
        self.assertRaises(AssertionError, inst.execute)

    def test_execute_hopbyhop_header(self):
        # An application-supplied "Connection" header is rejected.
        def app(environ, start_response):
            start_response("200 OK", [("Connection", "close")])

        inst = self._makeOne()
        inst.channel.server.application = app
        self.assertRaises(AssertionError, inst.execute)

    def test_execute_bad_header_value_control_characters(self):
        # A newline in a header value is rejected with ValueError.
        def app(environ, start_response):
            start_response("200 OK", [("a", "\n")])

        inst = self._makeOne()
        inst.channel.server.application = app
        self.assertRaises(ValueError, inst.execute)

    def test_execute_bad_header_name_control_characters(self):
        # A carriage return in a header name is rejected with ValueError.
        def app(environ, start_response):
            start_response("200 OK", [("a\r", "value")])

        inst = self._makeOne()
        inst.channel.server.application = app
        self.assertRaises(ValueError, inst.execute)

    def test_execute_bad_status_control_characters(self):
        # A carriage return in the status string is rejected with ValueError.
        def app(environ, start_response):
            start_response("200 OK\r", [])

        inst = self._makeOne()
        inst.channel.server.application = app
        self.assertRaises(ValueError, inst.execute)

    def test_preserve_header_value_order(self):
        # Headers come out sorted by name, but the two "A" values must keep
        # the order in which the application supplied them.
        def app(environ, start_response):
            write = start_response("200 OK", [("C", "b"), ("A", "b"), ("A", "a")])
            write(b"abc")
            return []

        inst = self._makeOne()
        inst.channel.server.application = app
        inst.execute()
        self.assertTrue(b"A: b\r\nA: a\r\nC: b\r\n" in inst.channel.written)

    def test_execute_bad_status_value(self):
        # A None status is rejected with AssertionError.
        def app(environ, start_response):
            start_response(None, [])

        inst = self._makeOne()
        inst.channel.server.application = app
        self.assertRaises(AssertionError, inst.execute)

    def test_execute_with_content_length_header(self):
        # The app's Content-Length header is parsed into an int on the task.
        def app(environ, start_response):
            start_response("200 OK", [("Content-Length", "1")])
            return [b"a"]

        inst = self._makeOne()
        inst.channel.server.application = app
        inst.execute()
        self.assertEqual(inst.content_length, 1)

    def test_execute_app_calls_write(self):
        # Data passed to the write() callable ends up at the tail of the
        # channel output.
        def app(environ, start_response):
            write = start_response("200 OK", [("Content-Length", "3")])
            write(b"abc")
            return []

        inst = self._makeOne()
        inst.channel.server.application = app
        inst.execute()
        self.assertEqual(inst.channel.written[-3:], b"abc")

    def test_execute_app_returns_len1_chunk_without_cl(self):
        # A single-chunk body with no Content-Length header: content_length
        # is expected to be taken from the chunk's size.
        def app(environ, start_response):
            start_response("200 OK", [])
            return [b"abc"]

        inst = self._makeOne()
        inst.channel.server.application = app
        inst.execute()
        self.assertEqual(inst.content_length, 3)

    def test_execute_app_returns_empty_chunk_as_first(self):
        # A two-item body starting with an empty chunk: content_length is
        # expected to stay None.
        def app(environ, start_response):
            start_response("200 OK", [])
            return ["", b"abc"]

        inst = self._makeOne()
        inst.channel.server.application = app
        inst.execute()
        self.assertEqual(inst.content_length, None)

    def test_execute_app_returns_too_many_bytes(self):
        # Body longer than the declared Content-Length: one log entry and
        # the connection is flagged to close.
        def app(environ, start_response):
            start_response("200 OK", [("Content-Length", "1")])
            return [b"abc"]

        inst = self._makeOne()
        inst.channel.server.application = app
        inst.logger = DummyLogger()
        inst.execute()
        self.assertEqual(inst.close_on_finish, True)
        self.assertEqual(len(inst.logger.logged), 1)

    def test_execute_app_returns_too_few_bytes(self):
        # Body shorter than the declared Content-Length: one log entry and
        # the connection is flagged to close.
        def app(environ, start_response):
            start_response("200 OK", [("Content-Length", "3")])
            return [b"a"]

        inst = self._makeOne()
        inst.channel.server.application = app
        inst.logger = DummyLogger()
        inst.execute()
        self.assertEqual(inst.close_on_finish, True)
        self.assertEqual(len(inst.logger.logged), 1)

    def test_execute_app_do_not_warn_on_head(self):
        # For a HEAD request, an empty body against a non-zero
        # Content-Length must not be logged.
        def app(environ, start_response):
            start_response("200 OK", [("Content-Length", "3")])
            return [b""]

        inst = self._makeOne()
        inst.request.command = "HEAD"
        inst.channel.server.application = app
        inst.logger = DummyLogger()
        inst.execute()
        self.assertEqual(inst.close_on_finish, True)
        self.assertEqual(len(inst.logger.logged), 0)

    def test_execute_app_without_body_204_logged(self):
        # A 204 response that supplies a body and Content-Length: neither
        # the body nor any length/encoding header may be written, and the
        # situation is logged once.
        def app(environ, start_response):
            start_response("204 No Content", [("Content-Length", "3")])
            return [b"abc"]

        inst = self._makeOne()
        inst.channel.server.application = app
        inst.logger = DummyLogger()
        inst.execute()
        self.assertEqual(inst.close_on_finish, True)
        self.assertNotIn(b"abc", inst.channel.written)
        self.assertNotIn(b"Content-Length", inst.channel.written)
        self.assertNotIn(b"Transfer-Encoding", inst.channel.written)
        self.assertEqual(len(inst.logger.logged), 1)

    def test_execute_app_without_body_304_logged(self):
        # Same expectations as the 204 case, for a 304 response.
        def app(environ, start_response):
            start_response("304 Not Modified", [("Content-Length", "3")])
            return [b"abc"]

        inst = self._makeOne()
        inst.channel.server.application = app
        inst.logger = DummyLogger()
        inst.execute()
        self.assertEqual(inst.close_on_finish, True)
        self.assertNotIn(b"abc", inst.channel.written)
        self.assertNotIn(b"Content-Length", inst.channel.written)
        self.assertNotIn(b"Transfer-Encoding", inst.channel.written)
        self.assertEqual(len(inst.logger.logged), 1)

    def test_execute_app_returns_closeable(self):
        # An app iterable exposing close() must have it called by execute().
        class closeable(list):
            def close(self):
                self.closed = True

        foo = closeable([b"abc"])

        def app(environ, start_response):
            start_response("200 OK", [("Content-Length", "3")])
            return foo

        inst = self._makeOne()
        inst.channel.server.application = app
        inst.execute()
        self.assertEqual(foo.closed, True)

    def test_execute_app_returns_filewrapper_prepare_returns_True(self):
        # A ReadOnlyFileBasedBuffer body is handed to the channel as an
        # object (it lands in ``otherdata``) rather than as bytes.
        from waitress.buffers import ReadOnlyFileBasedBuffer

        f = io.BytesIO(b"abc")
        app_iter = ReadOnlyFileBasedBuffer(f, 8192)

        def app(environ, start_response):
            start_response("200 OK", [("Content-Length", "3")])
            return app_iter

        inst = self._makeOne()
        inst.channel.server.application = app
        inst.execute()
        self.assertTrue(inst.channel.written)  # header
        self.assertEqual(inst.channel.otherdata, [app_iter])

    def test_execute_app_returns_filewrapper_prepare_returns_True_nocl(self):
        # Same as above without a Content-Length header: content_length is
        # expected to end up as 3, the size of the wrapped file.
        from waitress.buffers import ReadOnlyFileBasedBuffer

        f = io.BytesIO(b"abc")
        app_iter = ReadOnlyFileBasedBuffer(f, 8192)

        def app(environ, start_response):
            start_response("200 OK", [])
            return app_iter

        inst = self._makeOne()
        inst.channel.server.application = app
        inst.execute()
        self.assertTrue(inst.channel.written)  # header
        self.assertEqual(inst.channel.otherdata, [app_iter])
        self.assertEqual(inst.content_length, 3)

    def test_execute_app_returns_filewrapper_prepare_returns_True_badcl(self):
        # A pre-set, wrong content length (10) is expected to be corrected
        # to 3 both on the task and in the Content-Length header.
        from waitress.buffers import ReadOnlyFileBasedBuffer

        f = io.BytesIO(b"abc")
        app_iter = ReadOnlyFileBasedBuffer(f, 8192)

        def app(environ, start_response):
            start_response("200 OK", [])
            return app_iter

        inst = self._makeOne()
        inst.channel.server.application = app
        inst.content_length = 10
        inst.response_headers = [("Content-Length", "10")]
        inst.execute()
        self.assertTrue(inst.channel.written)  # header
        self.assertEqual(inst.channel.otherdata, [app_iter])
        self.assertEqual(inst.content_length, 3)
        self.assertEqual(dict(inst.response_headers)["Content-Length"], "3")

    def test_get_environment_already_cached(self):
        # A pre-set ``environ`` attribute is returned as-is.
        inst = self._makeOne()
        inst.environ = object()
        self.assertEqual(inst.get_environment(), inst.environ)

    def test_get_environment_path_startswith_more_than_one_slash(self):
        # Multiple leading slashes in the path collapse to one in PATH_INFO.
        inst = self._makeOne()
        request = DummyParser()
        request.path = "///abc"
        inst.request = request
        environ = inst.get_environment()
        self.assertEqual(environ["PATH_INFO"], "/abc")

    def test_get_environment_path_empty(self):
        inst = self._makeOne()
        request = DummyParser()
        request.path = ""
        inst.request = request
        environ = inst.get_environment()
        self.assertEqual(environ["PATH_INFO"], "")

    def test_get_environment_no_query(self):
        inst = self._makeOne()
        request = DummyParser()
        inst.request = request
        environ = inst.get_environment()
        self.assertEqual(environ["QUERY_STRING"], "")

    def test_get_environment_with_query(self):
        inst = self._makeOne()
        request = DummyParser()
        request.query = "abc"
        inst.request = request
        environ = inst.get_environment()
        self.assertEqual(environ["QUERY_STRING"], "abc")

    def test_get_environ_with_url_prefix_miss(self):
        # Path outside the configured url_prefix: PATH_INFO is untouched but
        # SCRIPT_NAME is still expected to be the prefix.
        inst = self._makeOne()
        inst.channel.server.adj.url_prefix = "/foo"
        request = DummyParser()
        request.path = "/bar"
        inst.request = request
        environ = inst.get_environment()
        self.assertEqual(environ["PATH_INFO"], "/bar")
        self.assertEqual(environ["SCRIPT_NAME"], "/foo")

    def test_get_environ_with_url_prefix_hit(self):
        # Path under the configured url_prefix: the prefix moves from
        # PATH_INFO to SCRIPT_NAME.
        inst = self._makeOne()
        inst.channel.server.adj.url_prefix = "/foo"
        request = DummyParser()
        request.path = "/foo/fuz"
        inst.request = request
        environ = inst.get_environment()
        self.assertEqual(environ["PATH_INFO"], "/fuz")
        self.assertEqual(environ["SCRIPT_NAME"], "/foo")

    def test_get_environ_with_url_prefix_empty_path(self):
        # Path exactly equal to the url_prefix: PATH_INFO becomes empty.
        inst = self._makeOne()
        inst.channel.server.adj.url_prefix = "/foo"
        request = DummyParser()
        request.path = "/foo"
        inst.request = request
        environ = inst.get_environment()
        self.assertEqual(environ["PATH_INFO"], "")
        self.assertEqual(environ["SCRIPT_NAME"], "/foo")

    def test_get_environment_values(self):
        # Pin both the exact key set and the individual values of the
        # environ built from the dummy request, channel and server.
        import sys

        inst = self._makeOne()
        request = DummyParser()
        request.headers = {
            "CONTENT_TYPE": "abc",
            "CONTENT_LENGTH": "10",
            "X_FOO": "BAR",
            "CONNECTION": "close",
        }
        request.query = "abc"
        inst.request = request
        environ = inst.get_environment()

        # nail the keys of environ
        self.assertEqual(
            sorted(environ.keys()),
            [
                "CONTENT_LENGTH",
                "CONTENT_TYPE",
                "HTTP_CONNECTION",
                "HTTP_X_FOO",
                "PATH_INFO",
                "QUERY_STRING",
                "REMOTE_ADDR",
                "REMOTE_HOST",
                "REMOTE_PORT",
                "REQUEST_METHOD",
                "SCRIPT_NAME",
                "SERVER_NAME",
                "SERVER_PORT",
                "SERVER_PROTOCOL",
                "SERVER_SOFTWARE",
                "waitress.client_disconnected",
                "wsgi.errors",
                "wsgi.file_wrapper",
                "wsgi.input",
                "wsgi.input_terminated",
                "wsgi.multiprocess",
                "wsgi.multithread",
                "wsgi.run_once",
                "wsgi.url_scheme",
                "wsgi.version",
            ],
        )

        self.assertEqual(environ["REQUEST_METHOD"], "GET")
        self.assertEqual(environ["SERVER_PORT"], "80")
        self.assertEqual(environ["SERVER_NAME"], "localhost")
        self.assertEqual(environ["SERVER_SOFTWARE"], "waitress")
        self.assertEqual(environ["SERVER_PROTOCOL"], "HTTP/1.0")
        self.assertEqual(environ["SCRIPT_NAME"], "")
        self.assertEqual(environ["HTTP_CONNECTION"], "close")
        self.assertEqual(environ["PATH_INFO"], "/")
        self.assertEqual(environ["QUERY_STRING"], "abc")
        self.assertEqual(environ["REMOTE_ADDR"], "127.0.0.1")
        self.assertEqual(environ["REMOTE_HOST"], "127.0.0.1")
        self.assertEqual(environ["REMOTE_PORT"], "39830")
        self.assertEqual(environ["CONTENT_TYPE"], "abc")
        self.assertEqual(environ["CONTENT_LENGTH"], "10")
        self.assertEqual(environ["HTTP_X_FOO"], "BAR")
        self.assertEqual(environ["wsgi.version"], (1, 0))
        self.assertEqual(environ["wsgi.url_scheme"], "http")
        self.assertEqual(environ["wsgi.errors"], sys.stderr)
        self.assertEqual(environ["wsgi.multithread"], True)
        self.assertEqual(environ["wsgi.multiprocess"], False)
        self.assertEqual(environ["wsgi.run_once"], False)
        # "stream" is what DummyParser.get_body_stream() returns.
        self.assertEqual(environ["wsgi.input"], "stream")
        self.assertEqual(environ["wsgi.input_terminated"], True)
        # The computed environ is also cached on the task.
        self.assertEqual(inst.environ, environ)
| |
| |
class TestErrorTask(unittest.TestCase):
    """Check the response ``ErrorTask`` writes for a request carrying an error."""

    def _makeDummyError(self):
        from waitress.utilities import Error

        err = Error("body")
        err.code = 432
        err.reason = "Too Ugly"
        return err

    def _makeOne(self, channel=None, request=None):
        if channel is None:
            channel = DummyChannel()
        if request is None:
            # Only a default request gets the dummy error attached.
            request = DummyParser()
            request.error = self._makeDummyError()
        from waitress.task import ErrorTask

        return ErrorTask(channel, request)

    def _assertErrorResponse(self, task, status_line):
        # Shared expectations: every scenario below differs only in the
        # status line; headers and body lines are the same.
        lines = filter_lines(task.channel.written)
        self.assertEqual(len(lines), 9)
        self.assertEqual(lines[0], status_line)
        self.assertEqual(lines[1], b"Connection: close")
        self.assertEqual(lines[2], b"Content-Length: 43")
        self.assertEqual(lines[3], b"Content-Type: text/plain")
        self.assertTrue(lines[4])
        self.assertEqual(lines[5], b"Server: waitress")
        self.assertEqual(lines[6], b"Too Ugly")
        self.assertEqual(lines[7], b"body")
        self.assertEqual(lines[8], b"(generated by waitress)")

    def test_execute_http_10(self):
        task = self._makeOne()
        task.execute()
        self._assertErrorResponse(task, b"HTTP/1.0 432 Too Ugly")

    def test_execute_http_11(self):
        task = self._makeOne()
        task.version = "1.1"
        task.execute()
        self._assertErrorResponse(task, b"HTTP/1.1 432 Too Ugly")

    def test_execute_http_11_close(self):
        task = self._makeOne()
        task.version = "1.1"
        task.request.headers["CONNECTION"] = "close"
        task.execute()
        self._assertErrorResponse(task, b"HTTP/1.1 432 Too Ugly")

    def test_execute_http_11_keep_forces_close(self):
        # Even when the client asks for keep-alive, the response is expected
        # to say "Connection: close".
        task = self._makeOne()
        task.version = "1.1"
        task.request.headers["CONNECTION"] = "keep-alive"
        task.execute()
        self._assertErrorResponse(task, b"HTTP/1.1 432 Too Ugly")
| |
| |
class DummyTask:
    """Minimal task stand-in that records whether it was serviced or cancelled."""

    # Class-level defaults; each method shadows one with an instance attribute.
    cancelled = False
    serviced = False

    def cancel(self):
        """Mark this task as cancelled."""
        self.cancelled = True

    def service(self):
        """Mark this task as serviced."""
        self.serviced = True
| |
| |
class DummyAdj:
    """Fixed settings object used by the dummy server and channel."""

    host = "127.0.0.1"
    port = 80
    ident = "waitress"
    url_prefix = ""
    log_socket_errors = True
| |
| |
class DummyServer:
    """Server stand-in exposing a name, a port and a per-instance ``adj``."""

    effective_port = 80
    server_name = "localhost"

    def __init__(self):
        # A fresh DummyAdj per server, so a test that mutates it
        # (e.g. url_prefix) does not affect other instances.
        adjustments = DummyAdj()
        self.adj = adjustments
| |
| |
class DummyChannel:
    """Channel stand-in that captures whatever a task writes to it.

    Bytes accumulate in ``written``; any other object handed to
    ``write_soon`` is collected, in order, in ``otherdata``.
    """

    addr = ("127.0.0.1", 39830)
    creation_time = 0
    adj = DummyAdj()
    closed_when_done = False

    def __init__(self, server=None):
        self.server = DummyServer() if server is None else server
        self.otherdata = []
        self.written = b""

    def check_client_disconnected(self):
        # For now, until we have tests handling this feature
        return False

    def write_soon(self, data):
        """Record *data* and return its length."""
        if not isinstance(data, bytes):
            self.otherdata.append(data)
        else:
            self.written += data
        return len(data)
| |
| |
class DummyParser:
    """Request-parser stand-in describing a bare ``GET /`` over HTTP/1.0."""

    command = "GET"
    path = "/"
    query = ""
    version = "1.0"
    url_scheme = "http"
    headers_finished = False
    expect_continue = False

    def __init__(self):
        # Per-instance mapping so header edits in one test never leak.
        self.headers = dict()

    def get_body_stream(self):
        """Return the fixed placeholder used as the request body stream."""
        body = "stream"
        return body
| |
| |
def filter_lines(s):
    """Split *s* (bytes) on CRLF and return only the non-empty pieces."""
    return [piece for piece in s.split(b"\r\n") if piece]
| |
| |
class DummyLogger:
    """Logger stand-in that stores each formatted message in ``logged``.

    Only ``warning`` and ``exception`` are provided. As with the standard
    library logger, ``%``-interpolation is applied only when arguments are
    supplied, so a message containing a literal ``%`` and no arguments is
    recorded verbatim instead of raising ``ValueError``.
    """

    def __init__(self):
        self.logged = []

    def _record(self, msg, args):
        # Mirror stdlib logging: skip interpolation when there are no args.
        self.logged.append(msg % args if args else msg)

    def warning(self, msg, *args):
        """Record a warning-level message."""
        self._record(msg, args)

    def exception(self, msg, *args):
        """Record an exception-level message."""
        self._record(msg, args)