| import time |
| from datetime import datetime |
| |
| from ._compat import text_type |
| from .encoding import base64_decode |
| from .encoding import base64_encode |
| from .encoding import bytes_to_int |
| from .encoding import int_to_bytes |
| from .encoding import want_bytes |
| from .exc import BadSignature |
| from .exc import BadTimeSignature |
| from .exc import SignatureExpired |
| from .serializer import Serializer |
| from .signer import Signer |
| |
| |
| class TimestampSigner(Signer): |
| """Works like the regular :class:`.Signer` but also records the time |
| of the signing and can be used to expire signatures. The |
| :meth:`unsign` method can raise :exc:`.SignatureExpired` if the |
| unsigning failed because the signature is expired. |
| """ |
| |
| def get_timestamp(self): |
| """Returns the current timestamp. The function must return an |
| integer. |
| """ |
| return int(time.time()) |
| |
| def timestamp_to_datetime(self, ts): |
| """Used to convert the timestamp from :meth:`get_timestamp` into |
| a datetime object. |
| """ |
| return datetime.utcfromtimestamp(ts) |
| |
| def sign(self, value): |
| """Signs the given string and also attaches time information.""" |
| value = want_bytes(value) |
| timestamp = base64_encode(int_to_bytes(self.get_timestamp())) |
| sep = want_bytes(self.sep) |
| value = value + sep + timestamp |
| return value + sep + self.get_signature(value) |
| |
| def unsign(self, value, max_age=None, return_timestamp=False): |
| """Works like the regular :meth:`.Signer.unsign` but can also |
| validate the time. See the base docstring of the class for |
| the general behavior. If ``return_timestamp`` is ``True`` the |
| timestamp of the signature will be returned as a naive |
| :class:`datetime.datetime` object in UTC. |
| """ |
| try: |
| result = Signer.unsign(self, value) |
| sig_error = None |
| except BadSignature as e: |
| sig_error = e |
| result = e.payload or b"" |
| sep = want_bytes(self.sep) |
| |
| # If there is no timestamp in the result there is something |
| # seriously wrong. In case there was a signature error, we raise |
| # that one directly, otherwise we have a weird situation in |
| # which we shouldn't have come except someone uses a time-based |
| # serializer on non-timestamp data, so catch that. |
| if sep not in result: |
| if sig_error: |
| raise sig_error |
| raise BadTimeSignature("timestamp missing", payload=result) |
| |
| value, timestamp = result.rsplit(sep, 1) |
| try: |
| timestamp = bytes_to_int(base64_decode(timestamp)) |
| except Exception: |
| timestamp = None |
| |
| # Signature is *not* okay. Raise a proper error now that we have |
| # split the value and the timestamp. |
| if sig_error is not None: |
| raise BadTimeSignature( |
| text_type(sig_error), payload=value, date_signed=timestamp |
| ) |
| |
| # Signature was okay but the timestamp is actually not there or |
| # malformed. Should not happen, but we handle it anyway. |
| if timestamp is None: |
| raise BadTimeSignature("Malformed timestamp", payload=value) |
| |
| # Check timestamp is not older than max_age |
| if max_age is not None: |
| age = self.get_timestamp() - timestamp |
| if age > max_age: |
| raise SignatureExpired( |
| "Signature age %s > %s seconds" % (age, max_age), |
| payload=value, |
| date_signed=self.timestamp_to_datetime(timestamp), |
| ) |
| |
| if return_timestamp: |
| return value, self.timestamp_to_datetime(timestamp) |
| return value |
| |
| def validate(self, signed_value, max_age=None): |
| """Only validates the given signed value. Returns ``True`` if |
| the signature exists and is valid.""" |
| try: |
| self.unsign(signed_value, max_age=max_age) |
| return True |
| except BadSignature: |
| return False |
| |
| |
| class TimedSerializer(Serializer): |
| """Uses :class:`TimestampSigner` instead of the default |
| :class:`.Signer`. |
| """ |
| |
| default_signer = TimestampSigner |
| |
| def loads(self, s, max_age=None, return_timestamp=False, salt=None): |
| """Reverse of :meth:`dumps`, raises :exc:`.BadSignature` if the |
| signature validation fails. If a ``max_age`` is provided it will |
| ensure the signature is not older than that time in seconds. In |
| case the signature is outdated, :exc:`.SignatureExpired` is |
| raised. All arguments are forwarded to the signer's |
| :meth:`~TimestampSigner.unsign` method. |
| """ |
| s = want_bytes(s) |
| last_exception = None |
| for signer in self.iter_unsigners(salt): |
| try: |
| base64d, timestamp = signer.unsign(s, max_age, return_timestamp=True) |
| payload = self.load_payload(base64d) |
| if return_timestamp: |
| return payload, timestamp |
| return payload |
| # If we get a signature expired it means we could read the |
| # signature but it's invalid. In that case we do not want to |
| # try the next signer. |
| except SignatureExpired: |
| raise |
| except BadSignature as err: |
| last_exception = err |
| raise last_exception |
| |
| def loads_unsafe(self, s, max_age=None, salt=None): |
| load_kwargs = {"max_age": max_age} |
| load_payload_kwargs = {} |
| return self._loads_unsafe_impl(s, salt, load_kwargs, load_payload_kwargs) |