blob: 2401598241b395a02748c801e19655d428ca9c41 [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Python datastore class User to be used as a datastore data type.
Classes defined here:
User: object representing a user. A user could be a Google Accounts user
or a federated user.
Error: base exception type
UserNotFoundError: UserService exception
RedirectTooLongError: UserService exception
NotAllowedError: UserService exception
"""
import os
from google.appengine.api import apiproxy_stub_map
from google.appengine.api import user_service_pb
from google.appengine.runtime import apiproxy_errors
class Error(Exception):
"""Base User error type."""
class UserNotFoundError(Error):
"""Raised by User.__init__() when there's no email argument and no user is
logged in."""
class RedirectTooLongError(Error):
"""Raised by UserService calls if the generated redirect URL was too long.
"""
class NotAllowedError(Error):
"""Raised by UserService calls if the requested redirect URL is not allowed.
"""
class User(object):
"""A user.
We provide the email address, nickname, and id for a user.
A nickname is a human-readable string which uniquely identifies a Google
user, akin to a username. It will be an email address for some users, but
not all.
A user could be a Google Accounts user or a federated login user.
federated_identity and federated_provider are only avaliable for
federated users.
"""
__user_id = None
__federated_identity = None
__federated_provider = None
def __init__(self, email=None, _auth_domain=None,
_user_id=None, federated_identity=None, federated_provider=None,
_strict_mode=True):
"""Constructor.
Args:
email: An optional string of the user's email address. It defaults to
the current user's email address.
federated_identity: federated identity of user. It defaults to the current
user's federated identity.
federated_provider: federated provider url of user.
Raises:
UserNotFoundError: Raised if the user is not logged in and both email
and federated identity are empty.
"""
if _auth_domain is None:
_auth_domain = os.environ.get('AUTH_DOMAIN')
assert _auth_domain
if email is None and federated_identity is None:
email = os.environ.get('USER_EMAIL', email)
_user_id = os.environ.get('USER_ID', _user_id)
federated_identity = os.environ.get('FEDERATED_IDENTITY',
federated_identity)
federated_provider = os.environ.get('FEDERATED_PROVIDER',
federated_provider)
if email is None:
email = ''
if not email and not federated_identity and _strict_mode:
raise UserNotFoundError
self.__email = email
self.__federated_identity = federated_identity
self.__federated_provider = federated_provider
self.__auth_domain = _auth_domain
self.__user_id = _user_id or None
def nickname(self):
"""Return this user's nickname.
The nickname will be a unique, human readable identifier for this user
with respect to this application. It will be an email address for some
users, part of the email address for some users, and the federated identity
for federated users who have not asserted an email address.
"""
if (self.__email and self.__auth_domain and
self.__email.endswith('@' + self.__auth_domain)):
suffix_len = len(self.__auth_domain) + 1
return self.__email[:-suffix_len]
elif self.__federated_identity:
return self.__federated_identity
else:
return self.__email
def email(self):
"""Return this user's email address."""
return self.__email
def user_id(self):
"""Return either a permanent unique identifying string or None.
If the email address was set explicity, this will return None.
"""
return self.__user_id
def auth_domain(self):
"""Return this user's auth domain.
This method is internal and should not be used by client applications.
"""
return self.__auth_domain
def federated_identity(self):
"""Return this user's federated identity, None if not a federated user."""
return self.__federated_identity
def federated_provider(self):
"""Return this user's federated provider, None if not a federated user."""
return self.__federated_provider
def __unicode__(self):
return unicode(self.nickname())
def __str__(self):
return str(self.nickname())
def __repr__(self):
values = []
if self.__email:
values.append("email='%s'" % self.__email)
if self.__federated_identity:
values.append("federated_identity='%s'" % self.__federated_identity)
if self.__user_id:
values.append("_user_id='%s'" % self.__user_id)
return 'users.User(%s)' % ','.join(values)
def __hash__(self):
if self.__federated_identity:
return hash((self.__federated_identity, self.__auth_domain))
else:
return hash((self.__email, self.__auth_domain))
def __cmp__(self, other):
if not isinstance(other, User):
return NotImplemented
if self.__federated_identity:
return cmp((self.__federated_identity, self.__auth_domain),
(other.__federated_identity, other.__auth_domain))
else:
return cmp((self.__email, self.__auth_domain),
(other.__email, other.__auth_domain))
def create_login_url(dest_url=None, _auth_domain=None,
federated_identity=None):
"""Computes the login URL for redirection.
Args:
dest_url: String that is the desired final destination URL for the user
once login is complete. If 'dest_url' does not have a host
specified, we will use the host from the current request.
federated_identity: federated_identity is used to trigger OpenId Login
flow, an empty value will trigger Google OpenID Login
by default.
Returns:
Login URL as a string. If federated_identity is set, this will be
a federated login using the specified identity. If not, this
will use Google Accounts.
"""
req = user_service_pb.CreateLoginURLRequest()
resp = user_service_pb.CreateLoginURLResponse()
if dest_url:
req.set_destination_url(dest_url)
else:
req.set_destination_url('')
if _auth_domain:
req.set_auth_domain(_auth_domain)
if federated_identity:
req.set_federated_identity(federated_identity)
try:
apiproxy_stub_map.MakeSyncCall('user', 'CreateLoginURL', req, resp)
except apiproxy_errors.ApplicationError, e:
if (e.application_error ==
user_service_pb.UserServiceError.REDIRECT_URL_TOO_LONG):
raise RedirectTooLongError
elif (e.application_error ==
user_service_pb.UserServiceError.NOT_ALLOWED):
raise NotAllowedError
else:
raise e
return resp.login_url()
CreateLoginURL = create_login_url
def create_logout_url(dest_url, _auth_domain=None):
"""Computes the logout URL for this request and specified destination URL,
for both federated login App and Google Accounts App.
Args:
dest_url: String that is the desired final destination URL for the user
once logout is complete. If 'dest_url' does not have a host
specified, we will use the host from the current request.
Returns:
Logout URL as a string
"""
req = user_service_pb.CreateLogoutURLRequest()
resp = user_service_pb.CreateLogoutURLResponse()
req.set_destination_url(dest_url)
if _auth_domain:
req.set_auth_domain(_auth_domain)
try:
apiproxy_stub_map.MakeSyncCall('user', 'CreateLogoutURL', req, resp)
except apiproxy_errors.ApplicationError, e:
if (e.application_error ==
user_service_pb.UserServiceError.REDIRECT_URL_TOO_LONG):
raise RedirectTooLongError
else:
raise e
return resp.logout_url()
CreateLogoutURL = create_logout_url
def get_current_user():
try:
return User()
except UserNotFoundError:
return None
GetCurrentUser = get_current_user
def is_current_user_admin():
"""Return true if the user making this request is an admin for this
application, false otherwise.
We specifically make this a separate function, and not a member function of
the User class, because admin status is not persisted in the datastore. It
only exists for the user making this request right now.
"""
return (os.environ.get('USER_IS_ADMIN', '0')) == '1'
IsCurrentUserAdmin = is_current_user_admin