blob: d3a0ca1029f804553e5652ea73c7b36df485045a [file] [log] [blame]
# -*- coding: utf-8 -*-
"""
webapp2_extras.auth
===================
Utilities for authentication and authorization.
:copyright: 2011 by tipfy.org.
:license: Apache Sotware License, see LICENSE for details.
"""
import logging
import time
import webapp2
from webapp2_extras import security
from webapp2_extras import sessions
#: Default configuration values for this module. Keys are:
#:
#: user_model
#: User model which authenticates custom users and tokens.
#: Can also be a string in dotted notation to be lazily imported.
#: Default is :class:`webapp2_extras.appengine.auth.models.User`.
#:
#: session_backend
#: Name of the session backend to be used. Default is `securecookie`.
#:
#: cookie_name
#: Name of the cookie to save the auth session. Default is `auth`.
#:
#: token_max_age
#: Number of seconds of inactivity after which an auth token is
#: invalidated. The same value is used to set the ``max_age`` for
#: persistent auth sessions. Default is 86400 * 7 * 3 (3 weeks).
#:
#: token_new_age
#: Number of seconds after which a new token is created and written to
#: the database, and the old one is invalidated.
#: Use this to limit database writes; set to None to write on all requests.
#: Default is 86400 (1 day).
#:
#: token_cache_age
#: Number of seconds after which a token must be checked in the database.
#: Use this to limit database reads; set to None to read on all requests.
#: Default is 3600 (1 hour).
#:
#: user_attributes
#: A list of extra user attributes to be stored in the session.
# The user object must provide all of them as attributes.
#: Default is an empty list.
default_config = {
'user_model': 'webapp2_extras.appengine.auth.models.User',
'session_backend': 'securecookie',
'cookie_name': 'auth',
'token_max_age': 86400 * 7 * 3,
'token_new_age': 86400,
'token_cache_age': 3600,
'user_attributes': [],
}
#: Internal flag for anonymous users.
_anon = object()
class AuthError(Exception):
"""Base auth exception."""
class InvalidAuthIdError(AuthError):
"""Raised when a user can't be fetched given an auth_id."""
class InvalidPasswordError(AuthError):
"""Raised when a user password doesn't match."""
class AuthStore(object):
"""Provides common utilities and configuration for :class:`Auth`."""
#: Configuration key.
config_key = __name__
#: Required attributes stored in a session.
_session_attributes = ['user_id', 'remember',
'token', 'token_ts', 'cache_ts']
def __init__(self, app, config=None):
"""Initializes the session store.
:param app:
A :class:`webapp2.WSGIApplication` instance.
:param config:
A dictionary of configuration values to be overridden. See
the available keys in :data:`default_config`.
"""
self.app = app
# Base configuration.
self.config = app.config.load_config(self.config_key,
default_values=default_config, user_values=config)
# User data we're interested in -------------------------------------------
@webapp2.cached_property
def session_attributes(self):
"""The list of attributes stored in a session.
This must be an ordered list of unique elements.
"""
seen = set()
attrs = self._session_attributes + self.user_attributes
return [a for a in attrs if a not in seen and not seen.add(a)]
@webapp2.cached_property
def user_attributes(self):
"""The list of attributes retrieved from the user model.
This must be an ordered list of unique elements.
"""
seen = set()
attrs = self.config['user_attributes']
return [a for a in attrs if a not in seen and not seen.add(a)]
# User model related ------------------------------------------------------
@webapp2.cached_property
def user_model(self):
"""Configured user model."""
cls = self.config['user_model']
if isinstance(cls, basestring):
cls = self.config['user_model'] = webapp2.import_string(cls)
return cls
def get_user_by_auth_password(self, auth_id, password, silent=False):
"""Returns a user dict based on auth_id and password.
:param auth_id:
Authentication id.
:param password:
User password.
:param silent:
If True, raises an exception if auth_id or password are invalid.
:returns:
A dictionary with user data.
:raises:
``InvalidAuthIdError`` or ``InvalidPasswordError``.
"""
try:
user = self.user_model.get_by_auth_password(auth_id, password)
return self.user_to_dict(user)
except (InvalidAuthIdError, InvalidPasswordError):
if not silent:
raise
return None
def get_user_by_auth_token(self, user_id, token):
"""Returns a user dict based on user_id and auth token.
:param user_id:
User id.
:param token:
Authentication token.
:returns:
A tuple ``(user_dict, token_timestamp)``. Both values can be None.
The token timestamp will be None if the user is invalid or it
is valid but the token requires renewal.
"""
user, ts = self.user_model.get_by_auth_token(user_id, token)
return self.user_to_dict(user), ts
def create_auth_token(self, user_id):
"""Creates a new authentication token.
:param user_id:
Authentication id.
:returns:
A new authentication token.
"""
return self.user_model.create_auth_token(user_id)
def delete_auth_token(self, user_id, token):
"""Deletes an authentication token.
:param user_id:
User id.
:param token:
Authentication token.
"""
return self.user_model.delete_auth_token(user_id, token)
def user_to_dict(self, user):
"""Returns a dictionary based on a user object.
Extra attributes to be retrieved must be set in this module's
configuration.
:param user:
User object: an instance the custom user model.
:returns:
A dictionary with user data.
"""
if not user:
return None
user_dict = dict((a, getattr(user, a)) for a in self.user_attributes)
user_dict['user_id'] = user.get_id()
return user_dict
# Session related ---------------------------------------------------------
def get_session(self, request):
"""Returns an auth session.
:param request:
A :class:`webapp2.Request` instance.
:returns:
A session dict.
"""
store = sessions.get_store(request=request)
return store.get_session(self.config['cookie_name'],
backend=self.config['session_backend'])
def serialize_session(self, data):
"""Serializes values for a session.
:param data:
A dict with session data.
:returns:
A list with session data.
"""
try:
assert len(data) >= len(self.session_attributes)
return [data.get(k) for k in self.session_attributes]
except AssertionError:
logging.warning(
'Invalid user data: %r. Expected attributes: %r.' %
(data, self.session_attributes))
return None
def deserialize_session(self, data):
"""Deserializes values for a session.
:param data:
A list with session data.
:returns:
A dict with session data.
"""
try:
assert len(data) >= len(self.session_attributes)
return dict(zip(self.session_attributes, data))
except AssertionError:
logging.warning(
'Invalid user data: %r. Expected attributes: %r.' %
(data, self.session_attributes))
return None
# Validators --------------------------------------------------------------
def validate_password(self, auth_id, password, silent=False):
"""Validates a password.
Passwords are used to log-in using forms or to request auth tokens
from services.
:param auth_id:
Authentication id.
:param password:
Password to be checked.
:param silent:
If True, raises an exception if auth_id or password are invalid.
:returns:
user or None
:raises:
``InvalidAuthIdError`` or ``InvalidPasswordError``.
"""
return self.get_user_by_auth_password(auth_id, password, silent=silent)
def validate_token(self, user_id, token, token_ts=None):
"""Validates a token.
Tokens are random strings used to authenticate temporarily. They are
used to validate sessions or service requests.
:param user_id:
User id.
:param token:
Token to be checked.
:param token_ts:
Optional token timestamp used to pre-validate the token age.
:returns:
A tuple ``(user_dict, token)``.
"""
now = int(time.time())
delete = token_ts and ((now - token_ts) > self.config['token_max_age'])
create = False
if not delete:
# Try to fetch the user.
user, ts = self.get_user_by_auth_token(user_id, token)
if user:
# Now validate the real timestamp.
delete = (now - ts) > self.config['token_max_age']
create = (now - ts) > self.config['token_new_age']
if delete or create or not user:
if delete or create:
# Delete token from db.
self.delete_auth_token(user_id, token)
if delete:
user = None
token = None
return user, token
def validate_cache_timestamp(self, cache_ts, token_ts=None):
"""Validates a cache timestamp.
:param cache_ts:
Token timestamp to validate the cache age.
:param token_ts:
Token timestamp to validate the token age.
:returns:
True if it is valid, False otherwise.
"""
now = int(time.time())
valid = (now - cache_ts) < self.config['token_cache_age']
if valid and token_ts:
valid2 = (now - token_ts) < self.config['token_max_age']
valid3 = (now - token_ts) < self.config['token_new_age']
valid = valid2 and valid3
return valid
class Auth(object):
"""Authentication provider for a single request."""
#: A :class:`webapp2.Request` instance.
request = None
#: An :class:`AuthStore` instance.
store = None
#: Cached user for the request.
_user = None
def __init__(self, request):
"""Initializes the auth provider for a request.
:param request:
A :class:`webapp2.Request` instance.
"""
self.request = request
self.store = get_store(app=request.app)
# Retrieving a user -------------------------------------------------------
def _user_or_none(self):
return self._user if self._user is not _anon else None
def get_user_by_session(self, save_session=True):
"""Returns a user based on the current session.
:param save_session:
If True, saves the user in the session if authentication succeeds.
:returns:
A user dict or None.
"""
if self._user is None:
data = self.get_session_data(pop=True)
if not data:
self._user = _anon
else:
self._user = self.get_user_by_token(
user_id=data['user_id'], token=data['token'],
token_ts=data['token_ts'], cache=data,
cache_ts=data['cache_ts'], remember=data['remember'],
save_session=save_session)
return self._user_or_none()
def get_user_by_token(self, user_id, token, token_ts=None, cache=None,
cache_ts=None, remember=False, save_session=True):
"""Returns a user based on an authentication token.
:param user_id:
User id.
:param token:
Authentication token.
:param token_ts:
Token timestamp, used to perform pre-validation.
:param cache:
Cached user data (from the session).
:param cache_ts:
Cache timestamp.
:param remember:
If True, saves permanent sessions.
:param save_session:
If True, saves the user in the session if authentication succeeds.
:returns:
A user dict or None.
"""
if self._user is not None:
assert (self._user is not _anon and
self._user['user_id'] == user_id and
self._user['token'] == token)
return self._user_or_none()
if cache and cache_ts:
valid = self.store.validate_cache_timestamp(cache_ts, token_ts)
if valid:
self._user = cache
else:
cache_ts = None
if self._user is None:
# Fetch and validate the token.
self._user, token = self.store.validate_token(user_id, token,
token_ts=token_ts)
if self._user is None:
self._user = _anon
elif save_session:
if not token:
token_ts = None
self.set_session(self._user, token=token, token_ts=token_ts,
cache_ts=cache_ts, remember=remember)
return self._user_or_none()
def get_user_by_password(self, auth_id, password, remember=False,
save_session=True, silent=False):
"""Returns a user based on password credentials.
:param auth_id:
Authentication id.
:param password:
User password.
:param remember:
If True, saves permanent sessions.
:param save_session:
If True, saves the user in the session if authentication succeeds.
:param silent:
If True, raises an exception if auth_id or password are invalid.
:returns:
A user dict or None.
:raises:
``InvalidAuthIdError`` or ``InvalidPasswordError``.
"""
if save_session:
# During a login attempt, invalidate current session.
self.unset_session()
self._user = self.store.validate_password(auth_id, password,
silent=silent)
if not self._user:
self._user = _anon
elif save_session:
# This always creates a new token with new timestamp.
self.set_session(self._user, remember=remember)
return self._user_or_none()
# Storing and removing user from session ----------------------------------
@webapp2.cached_property
def session(self):
"""Auth session."""
return self.store.get_session(self.request)
def set_session(self, user, token=None, token_ts=None, cache_ts=None,
remember=False, **session_args):
"""Saves a user in the session.
:param user:
A dictionary with user data.
:param token:
A unique token to be persisted. If None, a new one is created.
:param token_ts:
Token timestamp. If None, a new one is created.
:param cache_ts:
Token cache timestamp. If None, a new one is created.
:remember:
If True, session is set to be persisted.
:param session_args:
Keyword arguments to set the session arguments.
"""
now = int(time.time())
token = token or self.store.create_auth_token(user['user_id'])
token_ts = token_ts or now
cache_ts = cache_ts or now
if remember:
max_age = self.store.config['token_max_age']
else:
max_age = None
session_args.setdefault('max_age', max_age)
# Create a new dict or just update user?
# We are doing the latter, and so the user dict will always have
# the session metadata (token, timestamps etc). This is easier to test.
# But we could store only user_id and custom user attributes instead.
user.update({
'token': token,
'token_ts': token_ts,
'cache_ts': cache_ts,
'remember': int(remember),
})
self.set_session_data(user, **session_args)
self._user = user
def unset_session(self):
"""Removes a user from the session and invalidates the auth token."""
self._user = None
data = self.get_session_data(pop=True)
if data:
# Invalidate current token.
self.store.delete_auth_token(data['user_id'], data['token'])
def get_session_data(self, pop=False):
"""Returns the session data as a dictionary.
:param pop:
If True, removes the session.
:returns:
A deserialized session, or None.
"""
func = self.session.pop if pop else self.session.get
rv = func('_user', None)
if rv is not None:
data = self.store.deserialize_session(rv)
if data:
return data
elif not pop:
self.session.pop('_user', None)
return None
def set_session_data(self, data, **session_args):
"""Sets the session data as a list.
:param data:
Deserialized session data.
:param session_args:
Extra arguments for the session.
"""
data = self.store.serialize_session(data)
if data is not None:
self.session['_user'] = data
self.session.container.session_args.update(session_args)
# Factories -------------------------------------------------------------------
#: Key used to store :class:`AuthStore` in the app registry.
_store_registry_key = 'webapp2_extras.auth.Auth'
#: Key used to store :class:`Auth` in the request registry.
_auth_registry_key = 'webapp2_extras.auth.Auth'
def get_store(factory=AuthStore, key=_store_registry_key, app=None):
"""Returns an instance of :class:`AuthStore` from the app registry.
It'll try to get it from the current app registry, and if it is not
registered it'll be instantiated and registered. A second call to this
function will return the same instance.
:param factory:
The callable used to build and register the instance if it is not yet
registered. The default is the class :class:`AuthStore` itself.
:param key:
The key used to store the instance in the registry. A default is used
if it is not set.
:param app:
A :class:`webapp2.WSGIApplication` instance used to store the instance.
The active app is used if it is not set.
"""
app = app or webapp2.get_app()
store = app.registry.get(key)
if not store:
store = app.registry[key] = factory(app)
return store
def set_store(store, key=_store_registry_key, app=None):
"""Sets an instance of :class:`AuthStore` in the app registry.
:param store:
An instance of :class:`AuthStore`.
:param key:
The key used to retrieve the instance from the registry. A default
is used if it is not set.
:param request:
A :class:`webapp2.WSGIApplication` instance used to retrieve the
instance. The active app is used if it is not set.
"""
app = app or webapp2.get_app()
app.registry[key] = store
def get_auth(factory=Auth, key=_auth_registry_key, request=None):
"""Returns an instance of :class:`Auth` from the request registry.
It'll try to get it from the current request registry, and if it is not
registered it'll be instantiated and registered. A second call to this
function will return the same instance.
:param factory:
The callable used to build and register the instance if it is not yet
registered. The default is the class :class:`Auth` itself.
:param key:
The key used to store the instance in the registry. A default is used
if it is not set.
:param request:
A :class:`webapp2.Request` instance used to store the instance. The
active request is used if it is not set.
"""
request = request or webapp2.get_request()
auth = request.registry.get(key)
if not auth:
auth = request.registry[key] = factory(request)
return auth
def set_auth(auth, key=_auth_registry_key, request=None):
"""Sets an instance of :class:`Auth` in the request registry.
:param auth:
An instance of :class:`Auth`.
:param key:
The key used to retrieve the instance from the registry. A default
is used if it is not set.
:param request:
A :class:`webapp2.Request` instance used to retrieve the instance. The
active request is used if it is not set.
"""
request = request or webapp2.get_request()
request.registry[key] = auth