blob: ed4e9ceedbd9b28b925ce5da7ce2fba0779af603 [file] [log] [blame]
"""Internal utilities for determining cache expiration and other cache actions.
.. automodsumm:: requests_cache.cache_control
:classes-only:
:nosignatures:
.. automodsumm:: requests_cache.cache_control
:functions-only:
:nosignatures:
"""
from __future__ import annotations
from datetime import datetime
from logging import getLogger
from typing import Dict, MutableMapping, Optional, Tuple, Union
from attr import define, field
from requests import PreparedRequest, Response
from requests.models import CaseInsensitiveDict
from ._utils import coalesce, try_int
from .expiration import (
DO_NOT_CACHE,
NEVER_EXPIRE,
ExpirationTime,
get_expiration_datetime,
get_url_expiration,
)
from .models import CachedResponse
from .settings import CacheSettings, RequestSettings
__all__ = ['CacheActions']
# Temporary header only used to support the 'refresh' option in CachedSession.request()
REFRESH_TEMP_HEADER = 'requests-cache-refresh'
CacheDirective = Union[None, int, bool]
logger = getLogger(__name__)
@define
class CacheActions:
"""Translates cache settings and headers into specific actions to take for a given cache item.
This class defines the caching policy, and resulting actions are handled in
:py:meth:`CachedSession.send`.
.. rubric:: Notes
* See :ref:`precedence` for behavior if multiple sources provide an expiration
* See :ref:`headers` for more details about header behavior
* The following arguments/properties are the outputs of this class:
Args:
cache_key: The cache key created based on the initial request
error_504: Indicates the request cannot be fulfilled based on cache settings
expire_after: User or header-provided expiration value
send_request: Send a new request
resend_request: Send a new request to refresh a stale cache item
skip_read: Skip reading from the cache
skip_write: Skip writing to the cache
"""
# Outputs
cache_key: str = field(default=None)
error_504: bool = field(default=False)
expire_after: ExpirationTime = field(default=None)
resend_request: bool = field(default=False)
send_request: bool = field(default=False)
skip_read: bool = field(default=False)
skip_write: bool = field(default=False)
# Inputs/internal attributes
_settings: RequestSettings = field(default=None, repr=False, init=False)
_validation_headers: Dict[str, str] = field(factory=dict, repr=False, init=False)
@classmethod
def from_request(
cls,
cache_key: str,
request: PreparedRequest,
settings: CacheSettings,
**kwargs,
):
"""Initialize from request info and cache settings"""
request.headers = request.headers or CaseInsensitiveDict()
directives = get_cache_directives(request.headers)
logger.debug(f'Cache directives from request headers: {directives}')
# Merge session settings and request settings
settings = RequestSettings(settings, skip_invalid=True, **kwargs)
settings.only_if_cached = settings.only_if_cached or 'only-if-cached' in directives
settings.refresh = settings.refresh or bool(request.headers.pop(REFRESH_TEMP_HEADER, False))
settings.revalidate = settings.revalidate or 'no-cache' in directives
# Check expiration values in order of precedence
expire_after = coalesce(
directives.get('max-age'),
settings.request_expire_after,
get_url_expiration(request.url, settings.urls_expire_after),
settings.expire_after,
)
# Check conditions for reading from the cache
skip_read = any(
[
settings.refresh,
settings.disabled,
'no-store' in directives,
try_int(expire_after) == DO_NOT_CACHE,
]
)
actions = cls(
cache_key=cache_key,
expire_after=expire_after,
skip_read=skip_read,
skip_write='no-store' in directives,
)
actions._settings = settings
actions._validation_headers = {}
return actions
@property
def expires(self) -> Optional[datetime]:
"""Convert the user/header-provided expiration value to a datetime"""
return get_expiration_datetime(self.expire_after)
def update_from_cached_response(self, cached_response: CachedResponse):
"""Check for relevant cache headers from a cached response, and set headers for a
conditional request, if possible.
Used after fetching a cached response, but before potentially sending a new request.
"""
# Determine if we need to send a new request or respond with an error
is_expired = getattr(cached_response, 'is_expired', False)
invalid_response = cached_response is None or is_expired
if invalid_response and self._settings.only_if_cached and not self._settings.stale_if_error:
self.error_504 = True
elif cached_response is None:
self.send_request = True
elif is_expired and not (self._settings.only_if_cached and self._settings.stale_if_error):
self.resend_request = True
if cached_response is not None:
self._update_validation_headers(cached_response)
def _update_validation_headers(self, response: CachedResponse):
# Revalidation may be triggered by either stale response or request/cached response headers
directives = get_cache_directives(response.headers)
revalidate = _has_validator(response.headers) and any(
[
response.is_expired,
self._settings.revalidate,
'no-cache' in directives,
'must-revalidate' in directives and directives.get('max-age') == 0,
]
)
# Add the appropriate validation headers, if needed
if revalidate:
self.send_request = True
if response.headers.get('ETag'):
self._validation_headers['If-None-Match'] = response.headers['ETag']
if response.headers.get('Last-Modified'):
self._validation_headers['If-Modified-Since'] = response.headers['Last-Modified']
def update_from_response(self, response: Response):
"""Update expiration + actions based on headers and other details from a new response.
Used after receiving a new response, but before saving it to the cache.
"""
if self._settings.cache_control:
self._update_from_response_headers(response)
# If "expired" but there's a validator, save it to the cache and revalidate on use
expire_immediately = try_int(self.expire_after) == DO_NOT_CACHE
has_validator = _has_validator(response.headers)
self.skip_write = self.skip_write or (expire_immediately and not has_validator)
# Apply filter callback, if any
callback = self._settings.filter_fn
filtered_out = callback is not None and not callback(response)
# Apply and log remaining checks needed to determine if the response should be cached
cache_criteria = {
'disabled cache': self._settings.disabled,
'disabled method': str(response.request.method) not in self._settings.allowable_methods,
'disabled status': response.status_code not in self._settings.allowable_codes,
'disabled by filter': filtered_out,
'disabled by headers or expiration params': self.skip_write,
}
logger.debug(f'Pre-cache checks for response from {response.url}: {cache_criteria}')
self.skip_write = any(cache_criteria.values())
def _update_from_response_headers(self, response: Response):
"""Check response headers for expiration and other cache directives"""
directives = get_cache_directives(response.headers)
logger.debug(f'Cache directives from response headers: {directives}')
if directives.get('immutable'):
self.expire_after = NEVER_EXPIRE
else:
self.expire_after = coalesce(
directives.get('max-age'),
directives.get('expires'),
self.expire_after,
)
self.skip_write = self.skip_write or 'no-store' in directives
def update_request(self, request: PreparedRequest) -> PreparedRequest:
"""Apply validation headers (if any) before sending a request"""
request.headers.update(self._validation_headers)
return request
def update_revalidated_response(
self, response: Response, cached_response: CachedResponse
) -> CachedResponse:
"""After revalidation, update the cached response's headers and reset its expiration"""
logger.debug(
f'Response for URL {response.request.url} has not been modified; updating and using cached response'
)
cached_response.expires = self.expires
cached_response.headers.update(response.headers)
self.update_from_response(cached_response)
return cached_response
def append_directive(
headers: Optional[MutableMapping[str, str]], directive: str
) -> MutableMapping[str, str]:
"""Append a Cache-Control directive to existing headers (if any)"""
headers = CaseInsensitiveDict(headers)
directives = headers['Cache-Control'].split(',') if headers.get('Cache-Control') else []
directives.append(directive)
headers['Cache-Control'] = ','.join(directives)
return headers
def get_cache_directives(headers: MutableMapping) -> Dict[str, CacheDirective]:
"""Get all Cache-Control directives as a dict. Handle duplicate headers and comma-separated
lists. Key-only directives are returned as ``{key: True}``.
"""
if not headers:
return {}
kv_directives = {}
if headers.get('Cache-Control'):
cache_directives = headers['Cache-Control'].split(',')
kv_directives = dict([_split_kv_directive(value) for value in cache_directives])
if 'Expires' in headers:
kv_directives['expires'] = headers['Expires']
return kv_directives
def _split_kv_directive(header_value: str) -> Tuple[str, CacheDirective]:
"""Split a cache directive into a ``(key, int)`` pair, if possible; otherwise just
``(key, True)``.
"""
header_value = header_value.strip()
if '=' in header_value:
k, v = header_value.split('=', 1)
return k, try_int(v)
else:
return header_value, True
def _has_validator(headers: MutableMapping) -> bool:
return bool(headers.get('ETag') or headers.get('Last-Modified'))