| import errno |
| import socket |
| import unittest |
| |
# Placeholder application handed to the server factories by default. It is a
# bare ``object()`` and therefore not callable.
dummy_app = object()
| |
| |
class TestWSGIServer(unittest.TestCase):
    """Exercise servers produced by ``waitress.server`` over inet sockets.

    The ``_make*`` helpers store the server they build on ``self.inst`` so
    that ``tearDown`` can close it afterwards.
    """

    # Class-level default: keeps tearDown safe when a helper raised before it
    # could assign ``self.inst``.
    inst = None

    def _makeOne(
        self,
        application=dummy_app,
        host="127.0.0.1",
        port=0,
        _dispatcher=None,
        adj=None,
        map=None,
        _start=True,
        _sock=None,
        _server=None,
    ):
        """Build a server with ``create_server`` and keep it on ``self.inst``.

        ``adj`` and ``_server`` are accepted but are not forwarded to
        ``create_server``.
        """
        from waitress.server import create_server

        self.inst = create_server(
            application,
            host=host,
            port=port,
            map=map,
            _dispatcher=_dispatcher,
            _start=_start,
            _sock=_sock,
        )
        return self.inst

    def _makeOneWithMap(
        self, adj=None, _start=True, host="127.0.0.1", port=0, app=dummy_app
    ):
        """Build a server backed by a DummySock, a dummy dispatcher and an empty map.

        ``adj`` is accepted but unused.
        """
        return self._makeOne(
            app,
            host=host,
            port=port,
            map={},
            _sock=DummySock(),
            _dispatcher=DummyTaskDispatcher(),
            _start=_start,
        )

    def _makeOneWithMulti(
        self, adj=None, _start=True, app=dummy_app, listen="127.0.0.1:0 127.0.0.1:0"
    ):
        """Build a server from a ``listen`` string, using dummy collaborators.

        ``adj`` is accepted but unused.
        """
        from waitress.server import create_server

        self.inst = create_server(
            app,
            listen=listen,
            map={},
            _dispatcher=DummyTaskDispatcher(),
            _start=_start,
            _sock=DummySock(),
        )
        return self.inst

    def _makeWithSockets(
        self,
        application=dummy_app,
        _dispatcher=None,
        map=None,
        _start=True,
        _sock=None,
        _server=None,
        sockets=None,
    ):
        """Build a server from pre-made sockets (an empty list when None).

        ``_server`` is accepted but is not forwarded to ``create_server``.
        """
        from waitress.server import create_server

        self.inst = create_server(
            application,
            map=map,
            _dispatcher=_dispatcher,
            _start=_start,
            _sock=_sock,
            sockets=[] if sockets is None else sockets,
        )
        return self.inst

    def tearDown(self):
        """Close the server built by the test, if one was built."""
        if self.inst is not None:
            self.inst.close()

    def test_ctor_app_is_None(self):
        # No server gets created here, so there is nothing to close.
        self.inst = None
        self.assertRaises(ValueError, self._makeOneWithMap, app=None)

    def test_ctor_start_true(self):
        inst = self._makeOneWithMap(_start=True)
        self.assertEqual(inst.accepting, True)
        self.assertEqual(inst.socket.listened, 1024)

    def test_ctor_makes_dispatcher(self):
        # No dispatcher is passed in, so the server must supply its own.
        inst = self._makeOne(_start=False, map={})
        self.assertEqual(
            inst.task_dispatcher.__class__.__name__, "ThreadedTaskDispatcher"
        )

    def test_ctor_start_false(self):
        inst = self._makeOneWithMap(_start=False)
        self.assertEqual(inst.accepting, False)

    def test_get_server_multi(self):
        inst = self._makeOneWithMulti()
        self.assertEqual(inst.__class__.__name__, "MultiSocketServer")

    def test_run(self):
        # DummyAsyncore.loop raises SystemExit; run() is expected to respond
        # by shutting the dispatcher down.
        inst = self._makeOneWithMap(_start=False)
        inst.asyncore = DummyAsyncore()
        inst.task_dispatcher = DummyTaskDispatcher()
        inst.run()
        self.assertTrue(inst.task_dispatcher.was_shutdown)

    def test_run_base_server(self):
        # Same check as test_run, but on the server built from a listen string.
        inst = self._makeOneWithMulti(_start=False)
        inst.asyncore = DummyAsyncore()
        inst.task_dispatcher = DummyTaskDispatcher()
        inst.run()
        self.assertTrue(inst.task_dispatcher.was_shutdown)

    def test_pull_trigger(self):
        inst = self._makeOneWithMap(_start=False)
        # Close the trigger the server created before swapping in the dummy.
        inst.trigger.close()
        inst.trigger = DummyTrigger()
        inst.pull_trigger()
        self.assertEqual(inst.trigger.pulled, True)

    def test_add_task(self):
        task = DummyTask()
        inst = self._makeOneWithMap()
        inst.add_task(task)
        self.assertEqual(inst.task_dispatcher.tasks, [task])
        # The task is only queued on the dispatcher, not serviced.
        self.assertFalse(task.serviced)

    def test_readable_not_accepting(self):
        inst = self._makeOneWithMap()
        inst.accepting = False
        self.assertFalse(inst.readable())

    def test_readable_maplen_gt_connection_limit(self):
        # Two map entries against DummyAdj.connection_limit of 1.
        inst = self._makeOneWithMap()
        inst.accepting = True
        inst.adj = DummyAdj
        inst._map = {"a": 1, "b": 2}
        self.assertFalse(inst.readable())
        self.assertTrue(inst.in_connection_overflow)

    def test_readable_maplen_lt_connection_limit(self):
        # An empty map is below DummyAdj.connection_limit of 1.
        inst = self._makeOneWithMap()
        inst.accepting = True
        inst.adj = DummyAdj
        inst._map = {}
        self.assertTrue(inst.readable())
        self.assertFalse(inst.in_connection_overflow)

    def test_readable_maplen_toggles_connection_overflow(self):
        inst = self._makeOneWithMap()
        inst.accepting = True
        inst.adj = DummyAdj
        inst._map = {"a": 1, "b": 2}
        # The flag starts cleared and is only raised by calling readable().
        self.assertFalse(inst.in_connection_overflow)
        self.assertFalse(inst.readable())
        self.assertTrue(inst.in_connection_overflow)
        # Emptying the map clears the flag on the next readable() call.
        inst._map = {}
        self.assertTrue(inst.readable())
        self.assertFalse(inst.in_connection_overflow)

    def test_readable_maintenance_false(self):
        import time

        inst = self._makeOneWithMap()
        # A cleanup deadline far in the future: maintenance must not run.
        deadline = time.time() + 1000
        inst.next_channel_cleanup = deadline
        calls = []
        inst.maintenance = lambda t: calls.append(t)
        inst.readable()
        self.assertEqual(calls, [])
        self.assertEqual(inst.next_channel_cleanup, deadline)

    def test_readable_maintenance_true(self):
        inst = self._makeOneWithMap()
        # A cleanup deadline of 0: maintenance must run once and the deadline
        # must be moved.
        inst.next_channel_cleanup = 0
        calls = []
        inst.maintenance = lambda t: calls.append(t)
        inst.readable()
        self.assertEqual(len(calls), 1)
        self.assertNotEqual(inst.next_channel_cleanup, 0)

    def test_writable(self):
        inst = self._makeOneWithMap()
        self.assertFalse(inst.writable())

    def test_handle_read(self):
        inst = self._makeOneWithMap()
        self.assertIsNone(inst.handle_read())

    def test_handle_connect(self):
        inst = self._makeOneWithMap()
        self.assertIsNone(inst.handle_connect())

    def test_handle_accept_wouldblock_socket_error(self):
        inst = self._makeOneWithMap()
        inst.socket = DummySock(toraise=socket.error(errno.EWOULDBLOCK))
        inst.handle_accept()
        self.assertEqual(inst.socket.accepted, False)

    def test_handle_accept_other_socket_error(self):
        inst = self._makeOneWithMap()
        inst.socket = DummySock(toraise=socket.error(errno.ECONNABORTED))
        inst.adj = DummyAdj

        def failing_accept():
            raise OSError

        # The server's own accept() is replaced, so the error comes from it
        # rather than from the DummySock.
        inst.accept = failing_accept
        inst.logger = DummyLogger()
        inst.handle_accept()
        self.assertEqual(inst.socket.accepted, False)
        self.assertEqual(len(inst.logger.logged), 1)

    def test_handle_accept_noerror(self):
        inst = self._makeOneWithMap()
        innersock = DummySock()
        inst.socket = DummySock(acceptresult=(innersock, None))
        inst.adj = DummyAdj
        created = []
        # Record the positional arguments the channel would be built with.
        inst.channel_class = lambda *arg, **kw: created.append(arg)
        inst.handle_accept()
        self.assertEqual(inst.socket.accepted, True)
        # The accepted socket received the option listed in DummyAdj.
        self.assertEqual(innersock.opts, [("level", "optname", "value")])
        self.assertEqual(created, [(inst, innersock, None, inst.adj)])

    def test_maintenance(self):
        inst = self._makeOneWithMap()

        class DummyChannel:
            requests = []

        # A channel with no activity since time 0 and no running tasks.
        zombie = DummyChannel()
        zombie.last_activity = 0
        zombie.running_tasks = False
        inst.active_channels[100] = zombie
        inst.maintenance(10000)
        self.assertEqual(zombie.will_close, True)

    def test_backward_compatibility(self):
        from waitress.adjustments import Adjustments
        from waitress.server import TcpWSGIServer, WSGIServer

        self.assertIs(WSGIServer, TcpWSGIServer)
        self.inst = WSGIServer(None, _start=False, port=1234)
        # Ensure the adjustment was actually applied.
        self.assertNotEqual(Adjustments.port, 1234)
        self.assertEqual(self.inst.adj.port, 1234)

    def test_create_with_one_tcp_socket(self):
        from waitress.server import TcpWSGIServer

        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Release the real socket even if server creation fails; closing an
        # already-closed socket is harmless.
        self.addCleanup(sock.close)
        sock.bind(("127.0.0.1", 0))
        inst = self._makeWithSockets(_start=False, sockets=[sock])
        self.assertIsInstance(inst, TcpWSGIServer)

    def test_create_with_multiple_tcp_sockets(self):
        from waitress.server import MultiSocketServer

        sockets = [
            socket.socket(socket.AF_INET, socket.SOCK_STREAM),
            socket.socket(socket.AF_INET, socket.SOCK_STREAM),
        ]
        for sock in sockets:
            # Release the real sockets even if server creation fails.
            self.addCleanup(sock.close)
            sock.bind(("127.0.0.1", 0))
        inst = self._makeWithSockets(_start=False, sockets=sockets)
        self.assertIsInstance(inst, MultiSocketServer)
        self.assertEqual(len(inst.effective_listen), 2)

    def test_create_with_one_socket_should_not_bind_socket(self):
        innersock = DummySock()
        sockets = [DummySock(acceptresult=(innersock, None))]
        sockets[0].bind(("127.0.0.1", 80))
        # Reset the flag so any bind() made during server creation shows up.
        sockets[0].bind_called = False
        inst = self._makeWithSockets(_start=False, sockets=sockets)
        self.assertEqual(inst.socket.bound, ("127.0.0.1", 80))
        self.assertFalse(inst.socket.bind_called)

    def test_create_with_one_socket_handle_accept_noerror(self):
        innersock = DummySock()
        sockets = [DummySock(acceptresult=(innersock, None))]
        sockets[0].bind(("127.0.0.1", 80))
        inst = self._makeWithSockets(sockets=sockets)
        created = []
        # Record the positional arguments the channel would be built with.
        inst.channel_class = lambda *arg, **kw: created.append(arg)
        inst.adj = DummyAdj
        inst.handle_accept()
        self.assertEqual(sockets[0].accepted, True)
        self.assertEqual(innersock.opts, [("level", "optname", "value")])
        self.assertEqual(created, [(inst, innersock, None, inst.adj)])
| |
| |
if hasattr(socket, "AF_UNIX"):

    class TestUnixWSGIServer(unittest.TestCase):
        """Exercise servers produced by ``waitress.server`` over Unix sockets.

        Only defined when the platform's ``socket`` module has ``AF_UNIX``.
        """

        unix_socket = "/tmp/waitress.test.sock"

        # Class-level default: keeps tearDown safe when a helper raised before
        # it could assign ``self.inst``.
        inst = None

        def _makeOne(self, _start=True, _sock=None):
            """Build a server for ``self.unix_socket`` and keep it on ``self.inst``."""
            from waitress.server import create_server

            self.inst = create_server(
                dummy_app,
                map={},
                _start=_start,
                _sock=_sock,
                _dispatcher=DummyTaskDispatcher(),
                unix_socket=self.unix_socket,
                unix_socket_perms="600",
            )
            return self.inst

        def _makeWithSockets(
            self,
            application=dummy_app,
            _dispatcher=None,
            map=None,
            _start=True,
            _sock=None,
            _server=None,
            sockets=None,
        ):
            """Build a server from pre-made sockets (an empty list when None).

            ``_server`` is accepted but is not forwarded to ``create_server``.
            """
            from waitress.server import create_server

            self.inst = create_server(
                application,
                map=map,
                _dispatcher=_dispatcher,
                _start=_start,
                _sock=_sock,
                sockets=[] if sockets is None else sockets,
            )
            return self.inst

        def tearDown(self):
            """Close the server built by the test, if one was built."""
            if self.inst is not None:
                self.inst.close()

        def _makeDummy(self, *args, **kwargs):
            """Return a DummySock whose ``family`` is ``AF_UNIX``."""
            sock = DummySock(*args, **kwargs)
            sock.family = socket.AF_UNIX
            return sock

        def test_unix(self):
            inst = self._makeOne(_start=False)
            self.assertEqual(inst.socket.family, socket.AF_UNIX)
            self.assertEqual(inst.socket.getsockname(), self.unix_socket)

        def test_handle_accept(self):
            # Working on the assumption that we only have to test the happy path
            # for Unix domain sockets as the other paths should've been covered
            # by inet sockets.
            client = self._makeDummy()
            listen = self._makeDummy(acceptresult=(client, None))
            inst = self._makeOne(_sock=listen)
            self.assertEqual(inst.accepting, True)
            self.assertEqual(inst.socket.listened, 1024)
            created = []
            # Record the positional arguments the channel would be built with.
            inst.channel_class = lambda *arg, **kw: created.append(arg)
            inst.handle_accept()
            self.assertEqual(inst.socket.accepted, True)
            # No socket options are expected on the accepted Unix socket.
            self.assertEqual(client.opts, [])
            self.assertEqual(
                created, [(inst, client, ("localhost", None), inst.adj)]
            )

        def test_creates_new_sockinfo(self):
            from waitress.server import UnixWSGIServer

            self.inst = UnixWSGIServer(
                dummy_app, unix_socket=self.unix_socket, unix_socket_perms="600"
            )

            self.assertEqual(self.inst.sockinfo[0], socket.AF_UNIX)

        def test_create_with_unix_socket(self):
            from waitress.server import (
                BaseWSGIServer,
                MultiSocketServer,
                UnixWSGIServer,
            )

            sockets = [
                socket.socket(socket.AF_UNIX, socket.SOCK_STREAM),
                socket.socket(socket.AF_UNIX, socket.SOCK_STREAM),
            ]
            for sock in sockets:
                # Release the real sockets even if server creation fails;
                # closing an already-closed socket is harmless.
                self.addCleanup(sock.close)
            inst = self._makeWithSockets(sockets=sockets, _start=False)
            self.assertIsInstance(inst, MultiSocketServer)
            # Keep only the per-socket servers registered in the map.
            servers = [
                entry
                for entry in inst.map.values()
                if isinstance(entry, BaseWSGIServer)
            ]
            self.assertIsInstance(servers[0], UnixWSGIServer)
            self.assertIsInstance(servers[1], UnixWSGIServer)
| |
| |
class DummySock(socket.socket):
    """Socket stand-in that records the calls made on it.

    The base-class ``__init__`` is not called; the constructor only sets the
    bookkeeping attributes used by the tests.
    """

    accepted = False
    blocking = False
    family = socket.AF_INET
    type = socket.SOCK_STREAM
    proto = 0

    def __init__(self, toraise=None, acceptresult=(None, None)):
        """Store the canned ``accept()`` outcome and reset the call records.

        toraise: exception that ``accept()`` raises when truthy.
        acceptresult: value that ``accept()`` returns otherwise.
        """
        self.toraise = toraise
        self.acceptresult = acceptresult
        self.bound = None
        self.opts = []
        self.bind_called = False
        # Defined up front so reading it before listen() yields None instead
        # of raising AttributeError.
        self.listened = None

    def bind(self, addr):
        """Record ``addr`` and note that bind() was called."""
        self.bind_called = True
        self.bound = addr

    def accept(self):
        """Raise ``toraise`` if set; otherwise mark accepted and return the result."""
        if self.toraise:
            raise self.toraise
        self.accepted = True
        return self.acceptresult

    def setblocking(self, x):
        """Set ``blocking`` to True; the argument itself is ignored."""
        self.blocking = True

    def fileno(self):
        """Return a fixed descriptor number."""
        return 10

    def getpeername(self):
        """Return a fixed peer address."""
        return "127.0.0.1"

    def setsockopt(self, *arg):
        """Append the argument tuple to ``opts``."""
        self.opts.append(arg)

    def getsockopt(self, *arg):
        """Return 1 regardless of the arguments."""
        return 1

    def listen(self, num):
        """Record the requested backlog in ``listened``."""
        self.listened = num

    def getsockname(self):
        """Return the address recorded by bind(), or None if never bound."""
        return self.bound

    def close(self):
        """Do nothing."""
        pass
| |
| |
class DummyTaskDispatcher:
    """Task dispatcher stand-in that records tasks instead of running them."""

    def __init__(self):
        self.tasks = []
        # Defined up front so a test asserting on it fails cleanly, instead of
        # raising AttributeError, when shutdown() was never called.
        self.was_shutdown = False

    def add_task(self, task):
        """Append ``task`` to ``tasks``."""
        self.tasks.append(task)

    def shutdown(self):
        """Record that shutdown was requested."""
        self.was_shutdown = True
| |
| |
class DummyTask:
    """Task stand-in that carries the flags and response fields tests inspect."""

    # Class-level defaults; service() shadows ``serviced`` on the instance.
    status = "200 OK"
    wrote_header = False
    start_response_called = False
    serviced = False

    def __init__(self):
        # Fresh per-instance containers for the response data.
        self.written = ""
        self.response_headers = {}

    def service(self):  # pragma: no cover
        """Flag this task as serviced."""
        self.serviced = True
| |
| |
class DummyAdj:
    """Fixed settings that tests assign to a server's ``adj`` attribute."""

    # Connection handling.
    connection_limit = 1
    socket_options = [("level", "optname", "value")]
    log_socket_errors = True

    # Channel timing values.
    channel_timeout = 300
    cleanup_interval = 900
| |
| |
class DummyAsyncore:
    """Event-loop stand-in whose ``loop`` stops immediately."""

    def loop(self, timeout=30.0, use_poll=False, map=None, count=None):
        """Ignore every argument and raise ``SystemExit``."""
        raise SystemExit()
| |
| |
class DummyTrigger:
    """Trigger stand-in that records whether it has been pulled."""

    # Default so reading ``pulled`` before pull_trigger() yields False instead
    # of raising AttributeError.
    pulled = False

    def pull_trigger(self):
        """Record that the trigger was pulled."""
        self.pulled = True

    def close(self):
        """Do nothing."""
        pass
| |
| |
class DummyLogger:
    """Logger stand-in that collects warning messages in ``logged``."""

    def __init__(self):
        self.logged = list()

    def warning(self, msg, **kw):
        """Store ``msg``; keyword arguments are accepted and discarded."""
        self.logged.extend((msg,))