blob: 5b9eda1c2b012158625dae05c3cbab3630de4d58 [file] [log] [blame] [edit]
from io import BytesIO
from os.path import getmtime
import tempfile
from time import gmtime
import os
import shutil
import unittest
from webob import static
from webob.compat import bytes_
from webob.request import Request, environ_from_url
from webob.response import Response
def get_response(app, path='/', **req_kw):
"""Convenient function to query an application"""
req = Request(environ_from_url(path), **req_kw)
return req.get_response(app)
def create_file(content, *paths):
"""Convenient function to create a new file with some content"""
path = os.path.join(*paths)
with open(path, 'wb') as fp:
fp.write(bytes_(content))
return path
class TestFileApp(unittest.TestCase):
def setUp(self):
fp = tempfile.NamedTemporaryFile(suffix=".py", delete=False)
self.tempfile = fp.name
fp.write(b"import this\n")
fp.close()
def tearDown(self):
os.unlink(self.tempfile)
def test_fileapp(self):
app = static.FileApp(self.tempfile)
resp1 = get_response(app)
assert resp1.content_type in ('text/x-python', 'text/plain')
self.assertEqual(resp1.charset, 'UTF-8')
self.assertEqual(resp1.last_modified.timetuple(), gmtime(getmtime(self.tempfile)))
self.assertEqual(resp1.body, b"import this\n")
resp2 = get_response(app)
assert resp2.content_type in ('text/x-python', 'text/plain')
self.assertEqual(resp2.last_modified.timetuple(), gmtime(getmtime(self.tempfile)))
self.assertEqual(resp2.body, b"import this\n")
resp3 = get_response(app, range=(7, 11))
self.assertEqual(resp3.status_code, 206)
self.assertEqual(tuple(resp3.content_range)[:2], (7, 11))
self.assertEqual(resp3.last_modified.timetuple(), gmtime(getmtime(self.tempfile)))
self.assertEqual(resp3.body, bytes_('this'))
def test_unexisting_file(self):
app = static.FileApp('/tmp/this/doesnt/exist')
self.assertEqual(404, get_response(app).status_code)
def test_allowed_methods(self):
app = static.FileApp(self.tempfile)
# Alias
resp = lambda method: get_response(app, method=method)
self.assertEqual(200, resp(method='GET').status_code)
self.assertEqual(200, resp(method='HEAD').status_code)
self.assertEqual(405, resp(method='POST').status_code)
# Actually any other method is not allowed
self.assertEqual(405, resp(method='xxx').status_code)
def test_exception_while_opening_file(self):
# Mock the built-in ``open()`` function to allow finner control about
# what we are testing.
def open_ioerror(*args, **kwargs):
raise IOError()
def open_oserror(*args, **kwargs):
raise OSError()
app = static.FileApp(self.tempfile)
app._open = open_ioerror
self.assertEqual(403, get_response(app).status_code)
app._open = open_oserror
self.assertEqual(403, get_response(app).status_code)
def test_use_wsgi_filewrapper(self):
class TestWrapper(object):
def __init__(self, file, block_size):
self.file = file
self.block_size = block_size
environ = environ_from_url('/')
environ['wsgi.file_wrapper'] = TestWrapper
app = static.FileApp(self.tempfile)
app_iter = Request(environ).get_response(app).app_iter
self.assertTrue(isinstance(app_iter, TestWrapper))
self.assertEqual(bytes_('import this\n'), app_iter.file.read())
self.assertEqual(static.BLOCK_SIZE, app_iter.block_size)
class TestFileIter(unittest.TestCase):
def test_empty_file(self):
fp = BytesIO()
fi = static.FileIter(fp)
self.assertRaises(StopIteration, next, iter(fi))
def test_seek(self):
fp = BytesIO(bytes_("0123456789"))
i = static.FileIter(fp).app_iter_range(seek=4)
self.assertEqual(bytes_("456789"), next(i))
self.assertRaises(StopIteration, next, i)
def test_limit(self):
fp = BytesIO(bytes_("0123456789"))
i = static.FileIter(fp).app_iter_range(limit=4)
self.assertEqual(bytes_("0123"), next(i))
self.assertRaises(StopIteration, next, i)
def test_limit_and_seek(self):
fp = BytesIO(bytes_("0123456789"))
i = static.FileIter(fp).app_iter_range(limit=4, seek=1)
self.assertEqual(bytes_("123"), next(i))
self.assertRaises(StopIteration, next, i)
def test_multiple_reads(self):
fp = BytesIO(bytes_("012"))
i = static.FileIter(fp).app_iter_range(block_size=1)
self.assertEqual(bytes_("0"), next(i))
self.assertEqual(bytes_("1"), next(i))
self.assertEqual(bytes_("2"), next(i))
self.assertRaises(StopIteration, next, i)
def test_seek_bigger_than_limit(self):
fp = BytesIO(bytes_("0123456789"))
i = static.FileIter(fp).app_iter_range(limit=1, seek=2)
# XXX: this should not return anything actually, since we are starting
# to read after the place we wanted to stop.
self.assertEqual(bytes_("23456789"), next(i))
self.assertRaises(StopIteration, next, i)
def test_limit_is_zero(self):
fp = BytesIO(bytes_("0123456789"))
i = static.FileIter(fp).app_iter_range(limit=0)
self.assertRaises(StopIteration, next, i)
class TestDirectoryApp(unittest.TestCase):
def setUp(self):
self.test_dir = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.test_dir)
def test_empty_directory(self):
app = static.DirectoryApp(self.test_dir)
self.assertEqual(404, get_response(app).status_code)
self.assertEqual(404, get_response(app, '/foo').status_code)
def test_serve_file(self):
app = static.DirectoryApp(self.test_dir)
create_file('abcde', self.test_dir, 'bar')
self.assertEqual(404, get_response(app).status_code)
self.assertEqual(404, get_response(app, '/foo').status_code)
resp = get_response(app, '/bar')
self.assertEqual(200, resp.status_code)
self.assertEqual(bytes_('abcde'), resp.body)
def test_dont_serve_file_in_parent_directory(self):
# We'll have:
# /TEST_DIR/
# /TEST_DIR/bar
# /TEST_DIR/foo/ <- serve this directory
create_file('abcde', self.test_dir, 'bar')
serve_path = os.path.join(self.test_dir, 'foo')
os.mkdir(serve_path)
app = static.DirectoryApp(serve_path)
# The file exists, but is outside the served dir.
self.assertEqual(403, get_response(app, '/../bar').status_code)
def test_dont_leak_parent_directory_file_existance(self):
# We'll have:
# /TEST_DIR/
# /TEST_DIR/foo/ <- serve this directory
serve_path = os.path.join(self.test_dir, 'foo')
os.mkdir(serve_path)
app = static.DirectoryApp(serve_path)
# The file exists, but is outside the served dir.
self.assertEqual(403, get_response(app, '/../bar2').status_code)
def test_file_app_arguments(self):
app = static.DirectoryApp(self.test_dir, content_type='xxx/yyy')
create_file('abcde', self.test_dir, 'bar')
resp = get_response(app, '/bar')
self.assertEqual(200, resp.status_code)
self.assertEqual('xxx/yyy', resp.content_type)
def test_file_app_factory(self):
def make_fileapp(*args, **kwargs):
make_fileapp.called = True
return Response()
make_fileapp.called = False
app = static.DirectoryApp(self.test_dir)
app.make_fileapp = make_fileapp
create_file('abcde', self.test_dir, 'bar')
get_response(app, '/bar')
self.assertTrue(make_fileapp.called)
def test_must_serve_directory(self):
serve_path = create_file('abcde', self.test_dir, 'bar')
self.assertRaises(IOError, static.DirectoryApp, serve_path)
def test_index_page(self):
os.mkdir(os.path.join(self.test_dir, 'index-test'))
create_file(bytes_('index'), self.test_dir, 'index-test', 'index.html')
app = static.DirectoryApp(self.test_dir)
resp = get_response(app, '/index-test')
self.assertEqual(resp.status_code, 301)
self.assertTrue(resp.location.endswith('/index-test/'))
resp = get_response(app, '/index-test?test')
self.assertTrue(resp.location.endswith('/index-test/?test'))
resp = get_response(app, '/index-test/')
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.body, bytes_('index'))
self.assertEqual(resp.content_type, 'text/html')
resp = get_response(app, '/index-test/index.html')
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.body, bytes_('index'))
redir_app = static.DirectoryApp(self.test_dir, hide_index_with_redirect=True)
resp = get_response(redir_app, '/index-test/index.html')
self.assertEqual(resp.status_code, 301)
self.assertTrue(resp.location.endswith('/index-test/'))
resp = get_response(redir_app, '/index-test/index.html?test')
self.assertTrue(resp.location.endswith('/index-test/?test'))
page_app = static.DirectoryApp(self.test_dir, index_page='something-else.html')
self.assertEqual(get_response(page_app, '/index-test/').status_code, 404)