blob: efe968552f8ab71c67960555e13a66d8280f7994 [file] [log] [blame]
class Range(object):
"""
Represents the Range header.
This only represents ``bytes`` ranges, which are the only kind
specified in HTTP. This can represent multiple sets of ranges,
but no place else is this multi-range facility supported.
"""
def __init__(self, ranges): # expect non-inclusive
for begin, end in ranges:
assert end is None or end >= 0, "Bad ranges: %r" % ranges
self.ranges = ranges
def satisfiable(self, length):
"""
Returns true if this range can be satisfied by the resource
with the given byte length.
"""
return self.range_for_length(length) is not None
def range_for_length(self, length):
"""
*If* there is only one range, and *if* it is satisfiable by
the given length, then return a (begin, end) non-inclusive range
of bytes to serve. Otherwise return None
"""
if length is None or len(self.ranges) != 1:
return None
start, end = self.ranges[0]
if end is None:
end = length
if start < 0:
start += length
if _is_content_range_valid(start, end, length):
stop = min(end, length)
return (start, stop)
else:
return None
def content_range(self, length):
"""
Works like range_for_length; returns None or a ContentRange object
You can use it like::
response.content_range = req.range.content_range(response.content_length)
Though it's still up to you to actually serve that content range!
"""
range = self.range_for_length(length)
if range is None:
return None
return ContentRange(range[0], range[1], length)
def __str__(self):
parts = []
for begin, end in self.ranges:
if end is None:
if begin >= 0:
parts.append('%s-' % begin)
else:
parts.append(str(begin))
else:
if begin < 0:
raise ValueError("(%r, %r) should have a non-negative first value"
% (begin, end))
if end <= 0:
raise ValueError("(%r, %r) should have a positive second value"
% (begin, end))
parts.append('%s-%s' % (begin, end-1))
return 'bytes=%s' % ','.join(parts)
def __repr__(self):
return '<%s ranges=%s>' % (
self.__class__.__name__,
', '.join(map(repr, self.ranges)))
@classmethod
def parse(cls, header):
"""
Parse the header; may return None if header is invalid
"""
bytes = cls.parse_bytes(header)
if bytes is None:
return None
units, ranges = bytes
if units != 'bytes' or ranges is None:
return None
return cls(ranges)
@staticmethod
def parse_bytes(header):
"""
Parse a Range header into (bytes, list_of_ranges).
ranges in list_of_ranges are non-inclusive (unlike the HTTP header).
Will return None if the header is invalid
"""
if not header:
raise TypeError("The header must not be empty")
ranges = []
last_end = 0
try:
(units, range) = header.split("=", 1)
units = units.strip().lower()
for item in range.split(","):
if '-' not in item:
raise ValueError()
if item.startswith('-'):
# This is a range asking for a trailing chunk.
if last_end < 0:
raise ValueError('too many end ranges')
begin = int(item)
end = None
last_end = -1
else:
(begin, end) = item.split("-", 1)
begin = int(begin)
if begin < last_end or last_end < 0:
raise ValueError('begin<last_end, or last_end<0')
if end.strip():
end = int(end) + 1 # return val is non-inclusive
if begin >= end:
raise ValueError('begin>end')
else:
end = None
last_end = end
ranges.append((begin, end))
except ValueError, e:
# In this case where the Range header is malformed,
# section 14.16 says to treat the request as if the
# Range header was not present. How do I log this?
return None
return (units, ranges)
class ContentRange(object):
"""
Represents the Content-Range header
This header is ``start-stop/length``, where start-stop and length
can be ``*`` (represented as None in the attributes).
"""
def __init__(self, start, stop, length):
if not _is_content_range_valid(start, stop, length):
raise ValueError("Bad start:stop/length: %r-%r/%r" % (start, stop, length))
self.start = start
self.stop = stop # this is python-style range end (non-inclusive)
self.length = length
def __repr__(self):
return '<%s %s>' % (self.__class__.__name__, self)
def __str__(self):
if self.length is None:
length = '*'
else:
length = self.length
if self.start is None:
assert self.stop is None
return 'bytes */%s' % length
stop = self.stop - 1 # from non-inclusive to HTTP-style
return 'bytes %s-%s/%s' % (self.start, stop, length)
def __iter__(self):
"""
Mostly so you can unpack this, like:
start, stop, length = res.content_range
"""
return iter([self.start, self.stop, self.length])
@classmethod
def parse(cls, value):
"""
Parse the header. May return None if it cannot parse.
"""
if value is None:
return None
value = value.strip()
if not value.startswith('bytes '):
# Unparseable
return None
value = value[len('bytes '):].strip()
if '/' not in value:
# Invalid, no length given
return None
range, length = value.split('/', 1)
if length == '*':
length = None
elif length.isdigit():
length = int(length)
else:
return None # invalid length
if range == '*':
return cls(None, None, length)
elif '-' not in range:
# Invalid, no range
return None
else:
start, stop = range.split('-', 1)
try:
start = int(start)
stop = int(stop)
stop += 1 # convert to non-inclusive
except ValueError:
# Parse problem
return None
if _is_content_range_valid(start, stop, length, response=True):
return cls(start, stop, length)
return None
def _is_content_range_valid(start, stop, length, response=False):
if (start is None) != (stop is None):
return False
elif start is None:
return length is None or length >= 0
elif length is None:
return 0 <= start < stop
elif start >= stop:
return False
elif response and stop > length:
# "content-range: bytes 0-50/10" is invalid for a response
# "range: bytes 0-50" is valid for a request to a 10-bytes entity
return False
else:
return 0 <= start < length