| from datetime import ( |
| date, |
| datetime, |
| ) |
| |
| import re |
| |
| from webob.byterange import ( |
| ContentRange, |
| Range, |
| ) |
| |
| from webob.compat import ( |
| PY3, |
| text_type, |
| ) |
| |
| from webob.datetime_utils import ( |
| parse_date, |
| serialize_date, |
| ) |
| |
| from webob.util import ( |
| header_docstring, |
| warn_deprecation, |
| ) |
| |
| |
| CHARSET_RE = re.compile(r';\s*charset=([^;]*)', re.I) |
| SCHEME_RE = re.compile(r'^[a-z]+:', re.I) |
| |
| |
| _not_given = object() |
| |
| def environ_getter(key, default=_not_given, rfc_section=None): |
| if rfc_section: |
| doc = header_docstring(key, rfc_section) |
| else: |
| doc = "Gets and sets the ``%s`` key in the environment." % key |
| if default is _not_given: |
| def fget(req): |
| return req.environ[key] |
| def fset(req, val): |
| req.environ[key] = val |
| fdel = None |
| else: |
| def fget(req): |
| return req.environ.get(key, default) |
| def fset(req, val): |
| if val is None: |
| if key in req.environ: |
| del req.environ[key] |
| else: |
| req.environ[key] = val |
| def fdel(req): |
| del req.environ[key] |
| return property(fget, fset, fdel, doc=doc) |
| |
| |
| def environ_decoder(key, default=_not_given, rfc_section=None, |
| encattr=None): |
| if rfc_section: |
| doc = header_docstring(key, rfc_section) |
| else: |
| doc = "Gets and sets the ``%s`` key in the environment." % key |
| if default is _not_given: |
| def fget(req): |
| return req.encget(key, encattr=encattr) |
| def fset(req, val): |
| return req.encset(key, val, encattr=encattr) |
| fdel = None |
| else: |
| def fget(req): |
| return req.encget(key, default, encattr=encattr) |
| def fset(req, val): |
| if val is None: |
| if key in req.environ: |
| del req.environ[key] |
| else: |
| return req.encset(key, val, encattr=encattr) |
| def fdel(req): |
| del req.environ[key] |
| return property(fget, fset, fdel, doc=doc) |
| |
| def upath_property(key): |
| if PY3: # pragma: no cover |
| def fget(req): |
| encoding = req.url_encoding |
| return req.environ.get(key, '').encode('latin-1').decode(encoding) |
| def fset(req, val): |
| encoding = req.url_encoding |
| req.environ[key] = val.encode(encoding).decode('latin-1') |
| else: |
| def fget(req): |
| encoding = req.url_encoding |
| return req.environ.get(key, '').decode(encoding) |
| def fset(req, val): |
| encoding = req.url_encoding |
| if isinstance(val, text_type): |
| val = val.encode(encoding) |
| req.environ[key] = val |
| return property(fget, fset, doc='upath_property(%r)' % key) |
| |
| |
| def deprecated_property(attr, name, text, version): # pragma: no cover |
| """ |
| Wraps a descriptor, with a deprecation warning or error |
| """ |
| def warn(): |
| warn_deprecation('The attribute %s is deprecated: %s' |
| % (name, text), |
| version, |
| 3 |
| ) |
| def fget(self): |
| warn() |
| return attr.__get__(self, type(self)) |
| def fset(self, val): |
| warn() |
| attr.__set__(self, val) |
| def fdel(self): |
| warn() |
| attr.__delete__(self) |
| return property(fget, fset, fdel, |
| '<Deprecated attribute %s>' % name |
| ) |
| |
| |
| def header_getter(header, rfc_section): |
| doc = header_docstring(header, rfc_section) |
| key = header.lower() |
| |
| def fget(r): |
| for k, v in r._headerlist: |
| if k.lower() == key: |
| return v |
| |
| def fset(r, value): |
| fdel(r) |
| if value is not None: |
| if isinstance(value, text_type) and not PY3: |
| value = value.encode('latin-1') |
| r._headerlist.append((header, value)) |
| |
| def fdel(r): |
| items = r._headerlist |
| for i in range(len(items)-1, -1, -1): |
| if items[i][0].lower() == key: |
| del items[i] |
| |
| return property(fget, fset, fdel, doc) |
| |
| |
| |
| |
| def converter(prop, parse, serialize, convert_name=None): |
| assert isinstance(prop, property) |
| convert_name = convert_name or "``%s`` and ``%s``" % (parse.__name__, |
| serialize.__name__) |
| doc = prop.__doc__ or '' |
| doc += " Converts it using %s." % convert_name |
| hget, hset = prop.fget, prop.fset |
| def fget(r): |
| return parse(hget(r)) |
| def fset(r, val): |
| if val is not None: |
| val = serialize(val) |
| hset(r, val) |
| return property(fget, fset, prop.fdel, doc) |
| |
| |
| |
| def list_header(header, rfc_section): |
| prop = header_getter(header, rfc_section) |
| return converter(prop, parse_list, serialize_list, 'list') |
| |
| def parse_list(value): |
| if not value: |
| return None |
| return tuple(filter(None, [v.strip() for v in value.split(',')])) |
| |
| def serialize_list(value): |
| if isinstance(value, (text_type, bytes)): |
| return str(value) |
| else: |
| return ', '.join(map(str, value)) |
| |
| |
| |
| |
| def converter_date(prop): |
| return converter(prop, parse_date, serialize_date, 'HTTP date') |
| |
| def date_header(header, rfc_section): |
| return converter_date(header_getter(header, rfc_section)) |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| ######################## |
| ## Converter functions |
| ######################## |
| |
| |
| _rx_etag = re.compile(r'(?:^|\s)(W/)?"((?:\\"|.)*?)"') |
| |
| def parse_etag_response(value, strong=False): |
| """ |
| Parse a response ETag. |
| See: |
| * http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.19 |
| * http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html#sec3.11 |
| """ |
| if not value: |
| return None |
| m = _rx_etag.match(value) |
| if not m: |
| # this etag is invalid, but we'll just return it anyway |
| return value |
| elif strong and m.group(1): |
| # this is a weak etag and we want only strong ones |
| return None |
| else: |
| return m.group(2).replace('\\"', '"') |
| |
| def serialize_etag_response(value): #return '"%s"' % value.replace('"', '\\"') |
| strong = True |
| if isinstance(value, tuple): |
| value, strong = value |
| elif _rx_etag.match(value): |
| # this is a valid etag already |
| return value |
| # let's quote the value |
| r = '"%s"' % value.replace('"', '\\"') |
| if not strong: |
| r = 'W/' + r |
| return r |
| |
| def serialize_if_range(value): |
| if isinstance(value, (datetime, date)): |
| return serialize_date(value) |
| value = str(value) |
| return value or None |
| |
| def parse_range(value): |
| if not value: |
| return None |
| # Might return None too: |
| return Range.parse(value) |
| |
| def serialize_range(value): |
| if not value: |
| return None |
| elif isinstance(value, (list, tuple)): |
| return str(Range(*value)) |
| else: |
| assert isinstance(value, str) |
| return value |
| |
| def parse_int(value): |
| if value is None or value == '': |
| return None |
| return int(value) |
| |
| def parse_int_safe(value): |
| if value is None or value == '': |
| return None |
| try: |
| return int(value) |
| except ValueError: |
| return None |
| |
| serialize_int = str |
| |
| def parse_content_range(value): |
| if not value or not value.strip(): |
| return None |
| # May still return None |
| return ContentRange.parse(value) |
| |
| def serialize_content_range(value): |
| if isinstance(value, (tuple, list)): |
| if len(value) not in (2, 3): |
| raise ValueError( |
| "When setting content_range to a list/tuple, it must " |
| "be length 2 or 3 (not %r)" % value) |
| if len(value) == 2: |
| begin, end = value |
| length = None |
| else: |
| begin, end, length = value |
| value = ContentRange(begin, end, length) |
| value = str(value).strip() |
| if not value: |
| return None |
| return value |
| |
| |
| |
| |
| _rx_auth_param = re.compile(r'([a-z]+)[ \t]*=[ \t]*(".*?"|[^,]*?)[ \t]*(?:\Z|, *)') |
| |
| def parse_auth_params(params): |
| r = {} |
| for k, v in _rx_auth_param.findall(params): |
| r[k] = v.strip('"') |
| return r |
| |
| # see http://lists.w3.org/Archives/Public/ietf-http-wg/2009OctDec/0297.html |
| known_auth_schemes = ['Basic', 'Digest', 'WSSE', 'HMACDigest', 'GoogleLogin', |
| 'Cookie', 'OpenID'] |
| known_auth_schemes = dict.fromkeys(known_auth_schemes, None) |
| |
| def parse_auth(val): |
| if val is not None: |
| authtype, params = val.split(' ', 1) |
| if authtype in known_auth_schemes: |
| if authtype == 'Basic' and '"' not in params: |
| # this is the "Authentication: Basic XXXXX==" case |
| pass |
| else: |
| params = parse_auth_params(params) |
| return authtype, params |
| return val |
| |
| def serialize_auth(val): |
| if isinstance(val, (tuple, list)): |
| authtype, params = val |
| if isinstance(params, dict): |
| params = ', '.join(map('%s="%s"'.__mod__, params.items())) |
| assert isinstance(params, str) |
| return '%s %s' % (authtype, params) |
| return val |