blob: fa961531347d1b576fab8329ca8e651dbaef271e [file] [log] [blame]
import mimetypes
import os
from webob import exc
from webob.dec import wsgify
from webob.response import Response
__all__ = [
'FileApp', 'DirectoryApp',
]
mimetypes._winreg = None # do not load mimetypes from windows registry
mimetypes.add_type('text/javascript', '.js') # stdlib default is application/x-javascript
mimetypes.add_type('image/x-icon', '.ico') # not among defaults
BLOCK_SIZE = 1<<16
class FileApp(object):
"""An application that will send the file at the given filename.
Adds a mime type based on `mimetypes.guess_type()`.
"""
def __init__(self, filename, **kw):
self.filename = filename
content_type, content_encoding = mimetypes.guess_type(filename)
kw.setdefault('content_type', content_type)
kw.setdefault('content_encoding', content_encoding)
kw.setdefault('accept_ranges', 'bytes')
self.kw = kw
# Used for testing purpose
self._open = open
@wsgify
def __call__(self, req):
if req.method not in ('GET', 'HEAD'):
return exc.HTTPMethodNotAllowed("You cannot %s a file" %
req.method)
try:
stat = os.stat(self.filename)
except (IOError, OSError) as e:
msg = "Can't open %r: %s" % (self.filename, e)
return exc.HTTPNotFound(comment=msg)
try:
file = self._open(self.filename, 'rb')
except (IOError, OSError) as e:
msg = "You are not permitted to view this file (%s)" % e
return exc.HTTPForbidden(msg)
if 'wsgi.file_wrapper' in req.environ:
app_iter = req.environ['wsgi.file_wrapper'](file, BLOCK_SIZE)
else:
app_iter = FileIter(file)
return Response(
app_iter = app_iter,
content_length = stat.st_size,
last_modified = stat.st_mtime,
#@@ etag
**self.kw
).conditional_response_app
class FileIter(object):
def __init__(self, file):
self.file = file
def app_iter_range(self, seek=None, limit=None, block_size=None):
"""Iter over the content of the file.
You can set the `seek` parameter to read the file starting from a
specific position.
You can set the `limit` parameter to read the file up to specific
position.
Finally, you can change the number of bytes read at once by setting the
`block_size` parameter.
"""
if block_size is None:
block_size = BLOCK_SIZE
if seek:
self.file.seek(seek)
if limit is not None:
limit -= seek
try:
while True:
data = self.file.read(min(block_size, limit)
if limit is not None
else block_size)
if not data:
return
yield data
if limit is not None:
limit -= len(data)
if limit <= 0:
return
finally:
self.file.close()
__iter__ = app_iter_range
class DirectoryApp(object):
"""An application that serves up the files in a given directory.
This will serve index files (by default ``index.html``), or set
``index_page=None`` to disable this. If you set
``hide_index_with_redirect=True`` (it defaults to False) then
requests to, e.g., ``/index.html`` will be redirected to ``/``.
To customize `FileApp` instances creation (which is what actually
serves the responses), override the `make_fileapp` method.
"""
def __init__(self, path, index_page='index.html', hide_index_with_redirect=False,
**kw):
self.path = os.path.abspath(path)
if not self.path.endswith(os.path.sep):
self.path += os.path.sep
if not os.path.isdir(self.path):
raise IOError(
"Path does not exist or is not directory: %r" % self.path)
self.index_page = index_page
self.hide_index_with_redirect = hide_index_with_redirect
self.fileapp_kw = kw
def make_fileapp(self, path):
return FileApp(path, **self.fileapp_kw)
@wsgify
def __call__(self, req):
path = os.path.abspath(os.path.join(self.path,
req.path_info.lstrip('/')))
if os.path.isdir(path) and self.index_page:
return self.index(req, path)
if (self.index_page and self.hide_index_with_redirect
and path.endswith(os.path.sep + self.index_page)):
new_url = req.path_url.rsplit('/', 1)[0]
new_url += '/'
if req.query_string:
new_url += '?' + req.query_string
return Response(
status=301,
location=new_url)
if not os.path.isfile(path):
return exc.HTTPNotFound(comment=path)
elif not path.startswith(self.path):
return exc.HTTPForbidden()
else:
return self.make_fileapp(path)
def index(self, req, path):
index_path = os.path.join(path, self.index_page)
if not os.path.isfile(index_path):
return exc.HTTPNotFound(comment=index_path)
if not req.path_info.endswith('/'):
url = req.path_url + '/'
if req.query_string:
url += '?' + req.query_string
return Response(
status=301,
location=url)
return self.make_fileapp(index_path)