blob: e2633217ea06745486575d854a2912b8a115cbd0 [file] [log] [blame] [edit]
import sys
from tests.compat import unittest
from webob import Request, Response
import warnings
from unittest import mock
from io import StringIO
from webtest import TestApp
from webtest.compat import to_bytes
from webtest.lint import check_headers
from webtest.lint import check_content_type
from webtest.lint import check_environ
from webtest.lint import IteratorWrapper
from webtest.lint import WriteWrapper
from webtest.lint import ErrorWrapper
from webtest.lint import InputWrapper
from webtest.lint import to_string
from webtest.lint import middleware
from webtest.lint import _assert_latin1_str
from six import BytesIO
def application(environ, start_response):
req = Request(environ)
resp = Response()
env_input = environ['wsgi.input']
len_body = len(req.body)
env_input.input.seek(0)
if req.path_info == '/read':
resp.body = env_input.read(len_body)
elif req.path_info == '/read_line':
resp.body = env_input.readline(len_body)
elif req.path_info == '/read_lines':
resp.body = b'-'.join(env_input.readlines(len_body))
elif req.path_info == '/close':
resp.body = env_input.close()
return resp(environ, start_response)
class TestLatin1Assertion(unittest.TestCase):
def test_valid_type(self):
value = "useful-inførmation-5"
assert value == _assert_latin1_str(value, "fail")
def test_invalid_type(self):
value = b"useful-information-5"
self.assertRaises(AssertionError, _assert_latin1_str, value, "fail")
class TestToString(unittest.TestCase):
def test_to_string(self):
self.assertEqual(to_string('foo'), 'foo')
self.assertEqual(to_string(b'foo'), 'foo')
class TestMiddleware(unittest.TestCase):
@unittest.skipIf(sys.flags.optimize > 0, "skip assert tests if optimize is enabled")
def test_lint_too_few_args(self):
linter = middleware(application)
with self.assertRaisesRegexp(AssertionError, "Two arguments required"):
linter()
with self.assertRaisesRegexp(AssertionError, "Two arguments required"):
linter({})
@unittest.skipIf(sys.flags.optimize > 0, "skip assert tests if optimize is enabled")
def test_lint_no_keyword_args(self):
linter = middleware(application)
with self.assertRaisesRegexp(AssertionError, "No keyword arguments "
"allowed"):
linter({}, 'foo', baz='baz')
# TODO: test start_response_wrapper
@mock.patch.multiple('webtest.lint',
check_environ=lambda x: True, # don't block too early
InputWrapper=lambda x: True)
def test_lint_iterator_returned(self):
linter = middleware(lambda x, y: None) # None is not an iterator
msg = "The application must return an iterator, if only an empty list"
with self.assertRaisesRegexp(AssertionError, msg):
linter({'wsgi.input': 'foo', 'wsgi.errors': 'foo'}, 'foo')
class TestInputWrapper(unittest.TestCase):
def test_read(self):
app = TestApp(application)
resp = app.post('/read', 'hello')
self.assertEqual(resp.body, b'hello')
def test_readline(self):
app = TestApp(application)
resp = app.post('/read_line', 'hello\n')
self.assertEqual(resp.body, b'hello\n')
def test_readlines(self):
app = TestApp(application)
resp = app.post('/read_lines', 'hello\nt\n')
self.assertEqual(resp.body, b'hello\n-t\n')
def test_close(self):
input_wrapper = InputWrapper(None)
self.assertRaises(AssertionError, input_wrapper.close)
def test_iter(self):
data = to_bytes("A line\nAnother line\nA final line\n")
input_wrapper = InputWrapper(BytesIO(data))
self.assertEquals(to_bytes("").join(input_wrapper), data, '')
def test_seek(self):
data = to_bytes("A line\nAnother line\nA final line\n")
input_wrapper = InputWrapper(BytesIO(data))
input_wrapper.seek(0)
self.assertEquals(to_bytes("").join(input_wrapper), data, '')
class TestMiddleware2(unittest.TestCase):
def test_exc_info(self):
def application_exc_info(environ, start_response):
body = to_bytes('body stuff')
headers = [
('Content-Type', 'text/plain; charset=utf-8'),
('Content-Length', str(len(body)))]
# PEP 3333 requires native strings:
headers = [(str(k), str(v)) for k, v in headers]
start_response(to_bytes('200 OK'), headers, ('stuff',))
return [body]
app = TestApp(application_exc_info)
app.get('/')
# don't know what to assert here... a bit cheating, just covers code
class TestCheckContentType(unittest.TestCase):
def test_no_content(self):
status = "204 No Content"
headers = [
('Content-Type', 'text/plain; charset=utf-8'),
('Content-Length', '4')
]
self.assertRaises(AssertionError, check_content_type, status, headers)
def test_no_content_type(self):
status = "200 OK"
headers = [
('Content-Length', '4')
]
self.assertRaises(AssertionError, check_content_type, status, headers)
class TestCheckHeaders(unittest.TestCase):
def test_header_bytes_name(self):
headers = [(b'X-Price', '100')]
self.assertRaises(AssertionError, check_headers, headers)
def test_header_bytes_value(self):
headers = [('X-Price', b'100')]
self.assertRaises(AssertionError, check_headers, headers)
def test_header_non_latin1_value(self):
headers = [('X-Price', '100€')]
self.assertRaises(AssertionError, check_headers, headers)
def test_header_non_latin1_name(self):
headers = [('X-€', 'foo')]
self.assertRaises(AssertionError, check_headers, headers)
class TestCheckEnviron(unittest.TestCase):
def test_no_query_string(self):
environ = {
'REQUEST_METHOD': 'GET',
'SERVER_NAME': 'localhost',
'SERVER_PORT': '80',
'wsgi.version': (1, 0, 1),
'wsgi.input': StringIO('test'),
'wsgi.errors': StringIO(),
'wsgi.multithread': None,
'wsgi.multiprocess': None,
'wsgi.run_once': None,
'wsgi.url_scheme': 'http',
'PATH_INFO': '/',
}
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
check_environ(environ)
self.assertEqual(len(w), 1, "We should have only one warning")
self.assertTrue(
"QUERY_STRING" in str(w[-1].message),
"The warning message should say something about QUERY_STRING")
def test_no_valid_request(self):
environ = {
'REQUEST_METHOD': 'PROPFIND',
'SERVER_NAME': 'localhost',
'SERVER_PORT': '80',
'wsgi.version': (1, 0, 1),
'wsgi.input': StringIO('test'),
'wsgi.errors': StringIO(),
'wsgi.multithread': None,
'wsgi.multiprocess': None,
'wsgi.run_once': None,
'wsgi.url_scheme': 'http',
'PATH_INFO': '/',
'QUERY_STRING': '',
}
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
check_environ(environ)
self.assertEqual(len(w), 1, "We should have only one warning")
self.assertTrue(
"REQUEST_METHOD" in str(w[-1].message),
"The warning message should say something "
"about REQUEST_METHOD")
def test_handles_native_strings_in_variables(self):
path = '/umläut'
environ = {
'REQUEST_METHOD': 'GET',
'SERVER_NAME': 'localhost',
'SERVER_PORT': '80',
'wsgi.version': (1, 0, 1),
'wsgi.input': StringIO('test'),
'wsgi.errors': StringIO(),
'wsgi.multithread': None,
'wsgi.multiprocess': None,
'wsgi.run_once': None,
'wsgi.url_scheme': 'http',
'PATH_INFO': path,
'QUERY_STRING': '',
}
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
check_environ(environ)
self.assertEqual(0, len(w), "We should have no warning")
class TestIteratorWrapper(unittest.TestCase):
def test_close(self):
class MockIterator:
def __init__(self):
self.closed = False
def __iter__(self):
return self
def __next__(self):
return None
next = __next__
def close(self):
self.closed = True
mock = MockIterator()
wrapper = IteratorWrapper(mock, None)
wrapper.close()
self.assertTrue(mock.closed, "Original iterator has not been closed")
class TestWriteWrapper(unittest.TestCase):
@unittest.skipIf(sys.flags.optimize > 0, "skip assert tests if optimize is enabled")
def test_wrong_type(self):
write_wrapper = WriteWrapper(None)
self.assertRaises(AssertionError, write_wrapper, 'not a binary')
def test_normal(self):
class MockWriter:
def __init__(self):
self.written = []
def __call__(self, s):
self.written.append(s)
data = to_bytes('foo')
mock = MockWriter()
write_wrapper = WriteWrapper(mock)
write_wrapper(data)
self.assertEqual(
mock.written, [data],
"WriterWrapper should call original writer when data is binary "
"type")
class TestErrorWrapper(unittest.TestCase):
def test_dont_close(self):
error_wrapper = ErrorWrapper(None)
self.assertRaises(AssertionError, error_wrapper.close)
class FakeError:
def __init__(self):
self.written = []
self.flushed = False
def write(self, s):
self.written.append(s)
def writelines(self, lines):
for line in lines:
self.write(line)
def flush(self):
self.flushed = True
def test_writelines(self):
fake_error = self.FakeError()
error_wrapper = ErrorWrapper(fake_error)
data = [to_bytes('a line'), to_bytes('another line')]
error_wrapper.writelines(data)
self.assertEqual(fake_error.written, data,
"ErrorWrapper should call original writer")
def test_flush(self):
fake_error = self.FakeError()
error_wrapper = ErrorWrapper(fake_error)
error_wrapper.flush()
self.assertTrue(
fake_error.flushed,
"ErrorWrapper should have called original wsgi_errors's flush")