blob: 449a381303ef8e4a38121fb17dd9540555edae46 [file] [log] [blame]
from __future__ import with_statement
from webob import Request, Response
import sys, logging, threading, random, urllib2, socket, cgi
from contextlib import contextmanager
from nose.tools import assert_raises, eq_ as eq
from wsgiref.simple_server import make_server, WSGIRequestHandler, WSGIServer, ServerHandler
from Queue import Queue, Empty
log = logging.getLogger(__name__)
__test__ = (sys.version >= '2.6') # skip these tests on py2.5
def test_request_reading():
"""
Test actual request/response cycle in the presence of Request.copy()
and other methods that can potentially hang.
"""
with serve(_test_app_req_reading) as server:
for key in _test_ops_req_read:
resp = urllib2.urlopen(server.url+key, timeout=3)
assert resp.read() == "ok"
def _test_app_req_reading(env, sr):
req = Request(env)
log.debug('starting test operation: %s', req.path_info)
test_op = _test_ops_req_read[req.path_info]
test_op(req)
log.debug('done')
r = Response("ok")
return r(env, sr)
_test_ops_req_read = {
'/copy': lambda req: req.copy(),
'/read-all': lambda req: req.body_file.read(),
'/read-0': lambda req: req.body_file.read(0),
'/make-seekable': lambda req: req.make_body_seekable()
}
# TODO: remove server logging for interrupted requests
# TODO: test interrupted body directly
def test_interrupted_request():
with serve(_test_app_req_interrupt) as server:
for path in _test_ops_req_interrupt:
_send_interrupted_req(server, path)
try:
assert _global_res.get(timeout=1)
except Empty:
raise AssertionError("Error during test %s", path)
_global_res = Queue()
def _test_app_req_interrupt(env, sr):
req = Request(env)
assert req.content_length == 100000
op = _test_ops_req_interrupt[req.path_info]
log.info("Running test: %s", req.path_info)
assert_raises(IOError, op, req)
_global_res.put(True)
sr('200 OK', [])
return []
def _req_int_cgi(req):
assert req.body_file.read(0) == ''
#req.environ.setdefault('CONTENT_LENGTH', '0')
d = cgi.FieldStorage(
fp=req.body_file,
environ=req.environ,
)
def _req_int_readline(req):
try:
eq(req.body_file.readline(), 'a=b\n')
req.body_file.readline()
except IOError:
# too early to detect disconnect
raise AssertionError
req.body_file.readline()
_test_ops_req_interrupt = {
'/copy': lambda req: req.copy(),
'/read-body': lambda req: req.body,
'/read-post': lambda req: req.POST,
'/read-all': lambda req: req.body_file.read(),
'/read-too-much': lambda req: req.body_file.read(1<<30),
'/readline': _req_int_readline,
'/readlines': lambda req: req.body_file.readlines(),
'/read-cgi': _req_int_cgi,
'/make-seekable': lambda req: req.make_body_seekable()
}
def _send_interrupted_req(server, path='/'):
sock = socket.socket()
sock.connect(('localhost', server.server_port))
f = sock.makefile('wb')
f.write(_interrupted_req % path)
f.flush()
f.close()
sock.close()
_interrupted_req = (
"POST %s HTTP/1.0\r\n"
"content-type: application/x-www-form-urlencoded\r\n"
"content-length: 100000\r\n"
"\r\n"
)
_interrupted_req += 'a=b\nz='+'x'*10000
@contextmanager
def serve(app):
server = _make_test_server(app)
try:
#worker = threading.Thread(target=server.handle_request)
worker = threading.Thread(target=server.serve_forever)
worker.setDaemon(True)
worker.start()
server.url = "http://localhost:%d" % server.server_port
log.debug("server started on %s", server.url)
yield server
finally:
log.debug("shutting server down")
server.shutdown()
worker.join(1)
if worker.isAlive():
log.warning('worker is hanged')
else:
log.debug("server stopped")
class QuietHanlder(WSGIRequestHandler):
def log_request(self, *args):
pass
ServerHandler.handle_error = lambda: None
class QuietServer(WSGIServer):
def handle_error(self, req, addr):
pass
def _make_test_server(app):
maxport = ((1<<16)-1)
# we'll make 3 attempts to find a free port
for i in range(3, 0, -1):
try:
port = random.randint(maxport/2, maxport)
server = make_server('localhost', port, app,
server_class=QuietServer,
handler_class=QuietHanlder
)
server.timeout = 5
return server
except:
if i == 1:
raise
if __name__ == '__main__':
#test_request_reading()
test_interrupted_request()