| import sys |
| from tests.compat import unittest |
| from webob import Request, Response |
| |
| import warnings |
| from unittest import mock |
| from io import StringIO |
| |
| from webtest import TestApp |
| from webtest.compat import to_bytes |
| from webtest.lint import check_headers |
| from webtest.lint import check_content_type |
| from webtest.lint import check_environ |
| from webtest.lint import IteratorWrapper |
| from webtest.lint import WriteWrapper |
| from webtest.lint import ErrorWrapper |
| from webtest.lint import InputWrapper |
| from webtest.lint import to_string |
| from webtest.lint import middleware |
| from webtest.lint import _assert_latin1_str |
| |
| from six import BytesIO |
| |
| |
| def application(environ, start_response): |
| req = Request(environ) |
| resp = Response() |
| env_input = environ['wsgi.input'] |
| len_body = len(req.body) |
| env_input.input.seek(0) |
| if req.path_info == '/read': |
| resp.body = env_input.read(len_body) |
| elif req.path_info == '/read_line': |
| resp.body = env_input.readline(len_body) |
| elif req.path_info == '/read_lines': |
| resp.body = b'-'.join(env_input.readlines(len_body)) |
| elif req.path_info == '/close': |
| resp.body = env_input.close() |
| return resp(environ, start_response) |
| |
| |
| class TestLatin1Assertion(unittest.TestCase): |
| |
| def test_valid_type(self): |
| value = "useful-inførmation-5" |
| assert value == _assert_latin1_str(value, "fail") |
| |
| def test_invalid_type(self): |
| value = b"useful-information-5" |
| self.assertRaises(AssertionError, _assert_latin1_str, value, "fail") |
| |
| |
| class TestToString(unittest.TestCase): |
| |
| def test_to_string(self): |
| self.assertEqual(to_string('foo'), 'foo') |
| self.assertEqual(to_string(b'foo'), 'foo') |
| |
| |
| class TestMiddleware(unittest.TestCase): |
| |
| @unittest.skipIf(sys.flags.optimize > 0, "skip assert tests if optimize is enabled") |
| def test_lint_too_few_args(self): |
| linter = middleware(application) |
| with self.assertRaisesRegexp(AssertionError, "Two arguments required"): |
| linter() |
| with self.assertRaisesRegexp(AssertionError, "Two arguments required"): |
| linter({}) |
| |
| @unittest.skipIf(sys.flags.optimize > 0, "skip assert tests if optimize is enabled") |
| def test_lint_no_keyword_args(self): |
| linter = middleware(application) |
| with self.assertRaisesRegexp(AssertionError, "No keyword arguments " |
| "allowed"): |
| linter({}, 'foo', baz='baz') |
| |
| # TODO: test start_response_wrapper |
| |
| @mock.patch.multiple('webtest.lint', |
| check_environ=lambda x: True, # don't block too early |
| InputWrapper=lambda x: True) |
| def test_lint_iterator_returned(self): |
| linter = middleware(lambda x, y: None) # None is not an iterator |
| msg = "The application must return an iterator, if only an empty list" |
| with self.assertRaisesRegexp(AssertionError, msg): |
| linter({'wsgi.input': 'foo', 'wsgi.errors': 'foo'}, 'foo') |
| |
| |
| class TestInputWrapper(unittest.TestCase): |
| def test_read(self): |
| app = TestApp(application) |
| resp = app.post('/read', 'hello') |
| self.assertEqual(resp.body, b'hello') |
| |
| def test_readline(self): |
| app = TestApp(application) |
| resp = app.post('/read_line', 'hello\n') |
| self.assertEqual(resp.body, b'hello\n') |
| |
| def test_readlines(self): |
| app = TestApp(application) |
| resp = app.post('/read_lines', 'hello\nt\n') |
| self.assertEqual(resp.body, b'hello\n-t\n') |
| |
| def test_close(self): |
| input_wrapper = InputWrapper(None) |
| self.assertRaises(AssertionError, input_wrapper.close) |
| |
| def test_iter(self): |
| data = to_bytes("A line\nAnother line\nA final line\n") |
| input_wrapper = InputWrapper(BytesIO(data)) |
| self.assertEquals(to_bytes("").join(input_wrapper), data, '') |
| |
| def test_seek(self): |
| data = to_bytes("A line\nAnother line\nA final line\n") |
| input_wrapper = InputWrapper(BytesIO(data)) |
| input_wrapper.seek(0) |
| self.assertEquals(to_bytes("").join(input_wrapper), data, '') |
| |
| |
| class TestMiddleware2(unittest.TestCase): |
| def test_exc_info(self): |
| def application_exc_info(environ, start_response): |
| body = to_bytes('body stuff') |
| headers = [ |
| ('Content-Type', 'text/plain; charset=utf-8'), |
| ('Content-Length', str(len(body)))] |
| # PEP 3333 requires native strings: |
| headers = [(str(k), str(v)) for k, v in headers] |
| start_response(to_bytes('200 OK'), headers, ('stuff',)) |
| return [body] |
| |
| app = TestApp(application_exc_info) |
| app.get('/') |
| # don't know what to assert here... a bit cheating, just covers code |
| |
| |
| class TestCheckContentType(unittest.TestCase): |
| def test_no_content(self): |
| status = "204 No Content" |
| headers = [ |
| ('Content-Type', 'text/plain; charset=utf-8'), |
| ('Content-Length', '4') |
| ] |
| self.assertRaises(AssertionError, check_content_type, status, headers) |
| |
| def test_no_content_type(self): |
| status = "200 OK" |
| headers = [ |
| ('Content-Length', '4') |
| ] |
| self.assertRaises(AssertionError, check_content_type, status, headers) |
| |
| |
| class TestCheckHeaders(unittest.TestCase): |
| |
| def test_header_bytes_name(self): |
| headers = [(b'X-Price', '100')] |
| self.assertRaises(AssertionError, check_headers, headers) |
| |
| def test_header_bytes_value(self): |
| headers = [('X-Price', b'100')] |
| self.assertRaises(AssertionError, check_headers, headers) |
| |
| def test_header_non_latin1_value(self): |
| headers = [('X-Price', '100€')] |
| self.assertRaises(AssertionError, check_headers, headers) |
| |
| def test_header_non_latin1_name(self): |
| headers = [('X-€', 'foo')] |
| self.assertRaises(AssertionError, check_headers, headers) |
| |
| |
| class TestCheckEnviron(unittest.TestCase): |
| def test_no_query_string(self): |
| environ = { |
| 'REQUEST_METHOD': 'GET', |
| 'SERVER_NAME': 'localhost', |
| 'SERVER_PORT': '80', |
| 'wsgi.version': (1, 0, 1), |
| 'wsgi.input': StringIO('test'), |
| 'wsgi.errors': StringIO(), |
| 'wsgi.multithread': None, |
| 'wsgi.multiprocess': None, |
| 'wsgi.run_once': None, |
| 'wsgi.url_scheme': 'http', |
| 'PATH_INFO': '/', |
| } |
| with warnings.catch_warnings(record=True) as w: |
| warnings.simplefilter("always") |
| check_environ(environ) |
| self.assertEqual(len(w), 1, "We should have only one warning") |
| self.assertTrue( |
| "QUERY_STRING" in str(w[-1].message), |
| "The warning message should say something about QUERY_STRING") |
| |
| def test_no_valid_request(self): |
| environ = { |
| 'REQUEST_METHOD': 'PROPFIND', |
| 'SERVER_NAME': 'localhost', |
| 'SERVER_PORT': '80', |
| 'wsgi.version': (1, 0, 1), |
| 'wsgi.input': StringIO('test'), |
| 'wsgi.errors': StringIO(), |
| 'wsgi.multithread': None, |
| 'wsgi.multiprocess': None, |
| 'wsgi.run_once': None, |
| 'wsgi.url_scheme': 'http', |
| 'PATH_INFO': '/', |
| 'QUERY_STRING': '', |
| } |
| with warnings.catch_warnings(record=True) as w: |
| warnings.simplefilter("always") |
| check_environ(environ) |
| self.assertEqual(len(w), 1, "We should have only one warning") |
| self.assertTrue( |
| "REQUEST_METHOD" in str(w[-1].message), |
| "The warning message should say something " |
| "about REQUEST_METHOD") |
| |
| def test_handles_native_strings_in_variables(self): |
| path = '/umläut' |
| environ = { |
| 'REQUEST_METHOD': 'GET', |
| 'SERVER_NAME': 'localhost', |
| 'SERVER_PORT': '80', |
| 'wsgi.version': (1, 0, 1), |
| 'wsgi.input': StringIO('test'), |
| 'wsgi.errors': StringIO(), |
| 'wsgi.multithread': None, |
| 'wsgi.multiprocess': None, |
| 'wsgi.run_once': None, |
| 'wsgi.url_scheme': 'http', |
| 'PATH_INFO': path, |
| 'QUERY_STRING': '', |
| } |
| with warnings.catch_warnings(record=True) as w: |
| warnings.simplefilter("always") |
| check_environ(environ) |
| self.assertEqual(0, len(w), "We should have no warning") |
| |
| |
| class TestIteratorWrapper(unittest.TestCase): |
| def test_close(self): |
| class MockIterator: |
| |
| def __init__(self): |
| self.closed = False |
| |
| def __iter__(self): |
| return self |
| |
| def __next__(self): |
| return None |
| |
| next = __next__ |
| |
| def close(self): |
| self.closed = True |
| |
| mock = MockIterator() |
| wrapper = IteratorWrapper(mock, None) |
| wrapper.close() |
| |
| self.assertTrue(mock.closed, "Original iterator has not been closed") |
| |
| |
| class TestWriteWrapper(unittest.TestCase): |
| |
| @unittest.skipIf(sys.flags.optimize > 0, "skip assert tests if optimize is enabled") |
| def test_wrong_type(self): |
| write_wrapper = WriteWrapper(None) |
| self.assertRaises(AssertionError, write_wrapper, 'not a binary') |
| |
| def test_normal(self): |
| class MockWriter: |
| def __init__(self): |
| self.written = [] |
| |
| def __call__(self, s): |
| self.written.append(s) |
| |
| data = to_bytes('foo') |
| mock = MockWriter() |
| write_wrapper = WriteWrapper(mock) |
| write_wrapper(data) |
| self.assertEqual( |
| mock.written, [data], |
| "WriterWrapper should call original writer when data is binary " |
| "type") |
| |
| |
| class TestErrorWrapper(unittest.TestCase): |
| |
| def test_dont_close(self): |
| error_wrapper = ErrorWrapper(None) |
| self.assertRaises(AssertionError, error_wrapper.close) |
| |
| class FakeError: |
| def __init__(self): |
| self.written = [] |
| self.flushed = False |
| |
| def write(self, s): |
| self.written.append(s) |
| |
| def writelines(self, lines): |
| for line in lines: |
| self.write(line) |
| |
| def flush(self): |
| self.flushed = True |
| |
| def test_writelines(self): |
| fake_error = self.FakeError() |
| error_wrapper = ErrorWrapper(fake_error) |
| data = [to_bytes('a line'), to_bytes('another line')] |
| error_wrapper.writelines(data) |
| self.assertEqual(fake_error.written, data, |
| "ErrorWrapper should call original writer") |
| |
| def test_flush(self): |
| fake_error = self.FakeError() |
| error_wrapper = ErrorWrapper(fake_error) |
| error_wrapper.flush() |
| self.assertTrue( |
| fake_error.flushed, |
| "ErrorWrapper should have called original wsgi_errors's flush") |