# blob: b2a9025c0cf3c702707a020a1c629ab2a6cf3fcd [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright (C) 2007, 2008 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__author__ = 'api.jscudder (Jeff Scudder)'
import re
import unittest
import urllib
import gdata.auth
# OAuth consumer key and secret handed to gdata.auth.OAuthInputParams by the
# test cases in this file.
CONSUMER_KEY = 'www.yourwebapp.com'
CONSUMER_SECRET = 'qB1P2kCFDpRjF+/Iww4'
# PEM-encoded RSA private key passed as rsa_key to OAuthInputParams and as
# the first argument to SecureAuthSubToken in the tests below.
# NOTE(review): presumably a throwaway key generated for these tests --
# confirm it is not used by any real service.
RSA_KEY = """-----BEGIN RSA PRIVATE KEY-----
MIICXAIBAAKBgQDVbOaFW+KXecfFJn1PIzYHnNXFxhaQ36QM0K5uSb0Y8NeQUlD2
6t8aKgnm6mcb4vaopHjjdIGWgAzM5Dt0oPIiDXo+jSQbvCIXRduuAt+0cFGb2d+L
hALk4AwB8IVIkDJWwgo5Z2OLsP2r/wQlUYKm/tnvQaevK24jNYMLWVJl2QIDAQAB
AoGAU93ERBlUVEPFjaJPUX67p4gotNvfWDSZiXOjZ7FQPnG9s3e1WyH2Y5irZXMs
61dnp+NhobfRiGtvHEB/YJgyLRk/CJDnMKslo95e7o65IE9VkcyY6Yvt7YTslsRX
Eu7T0xLEA7ON46ypCwNLeWxpJ9SWisEKu2yZJnWauCXEsgUCQQD7b2ZuhGx3msoP
YEnwvucp0UxneCvb68otfERZ1J6NfNP47QJw6OwD3r1sWCJ27QZmpvtQH1f8sCk9
t22anGG7AkEA2UzXdtQ8H1uLAN/XXX2qoLuvJK5jRswHS4GeOg4pnnDSiHg3Vbva
AxmMIL93ufvIy/xdoENwDPfcI4CbYlrDewJAGWy7W+OSIEoLsqBW+bwkHetnIXNa
ZAOkzxKoyrigS8hamupEe+xhqUaFuwXyfjobkpfCA+kXeZrKoM4CjEbR7wJAHMbf
Vd4/ZAu0edYq6DenLAgO5rWtcge9A5PTx25utovMZcQ917273mM4unGAwoGEkvcF
0x57LUx5u73hVgIdFwJBAKWGuHRwGPgTWYvhpHM0qveH+8KdU9BUt/kV4ONxIVDB
ftetEmJirqOGLECbImoLcUwQrgfMW4ZCxOioJMz/gY0=
-----END RSA PRIVATE KEY-----
"""
class AuthModuleUtilitiesTest(unittest.TestCase):
    """Tests for the ClientLogin helper functions in gdata.auth."""

    def testGenerateClientLoginRequestBody(self):
        # Required arguments only: the default account type is expected.
        body = gdata.auth.GenerateClientLoginRequestBody(
            'jo@gmail.com', 'password', 'test service', 'gdata.auth test')
        expected_parameters = {
            'Email': r'jo%40gmail.com',
            'Passwd': 'password',
            'service': 'test+service',
            'source': 'gdata.auth+test',
            'accountType': 'HOSTED_OR_GOOGLE',
        }
        self.__matchBody(body, expected_parameters)

        # Explicit account type plus a captcha challenge and response.
        body = gdata.auth.GenerateClientLoginRequestBody(
            'jo@gmail.com', 'password', 'test service', 'gdata.auth test',
            account_type='A TEST', captcha_token='12345',
            captcha_response='test')
        expected_parameters['accountType'] = 'A+TEST'
        expected_parameters['logintoken'] = '12345'
        expected_parameters['logincaptcha'] = 'test'
        self.__matchBody(body, expected_parameters)

    def __matchBody(self, body, expected_name_value_pairs):
        """Assert that a form-encoded body holds exactly the expected pairs.

        Args:
          body: str of '&'-separated 'name=value' pairs.
          expected_name_value_pairs: dict mapping each expected parameter
              name to its (still URL-encoded) value.

        The comparison is two-way: a parameter missing from the body fails
        the test just like an unexpected or wrong one does.
        """
        actual_pairs = {}
        for param in body.split('&'):
            # Split on the first '=' only so '=' inside a value is kept.
            name, value = param.split('=', 1)
            # setdefault keeps the first value seen, so a repeated name with
            # a conflicting value is reported here instead of being masked.
            self.assertEqual(actual_pairs.setdefault(name, value), value)
        self.assertEqual(expected_name_value_pairs, actual_pairs)

    def testGenerateClientLoginAuthToken(self):
        # Only the Auth line of the response body ends up in the header value.
        http_body = ('SID=DQAAAGgA7Zg8CTN\r\n'
                     'LSID=DQAAAGsAlk8BBbG\r\n'
                     'Auth=DQAAAGgAdk3fA5N')
        self.assertEqual(
            gdata.auth.GenerateClientLoginAuthToken(http_body),
            'GoogleLogin auth=DQAAAGgAdk3fA5N')
class GenerateClientLoginRequestBodyTest(unittest.TestCase):
    """Checks the POST body built by GenerateClientLoginRequestBody."""

    def testPostBodyShouldMatchShortExample(self):
        # Required fields only; each must appear URL-encoded in the body.
        auth_body = gdata.auth.GenerateClientLoginRequestBody(
            'johndoe@gmail.com', 'north23AZ', 'cl', 'Gulp-CalGulp-1.05')
        self.assertTrue('Email=johndoe%40gmail.com' in auth_body)
        self.assertTrue('Passwd=north23AZ' in auth_body)
        self.assertTrue('service=cl' in auth_body)
        self.assertTrue('source=Gulp-CalGulp-1.05' in auth_body)

    def testPostBodyShouldMatchLongExample(self):
        # Captcha token and response must be forwarded under the
        # logintoken / logincaptcha parameter names.
        auth_body = gdata.auth.GenerateClientLoginRequestBody(
            'johndoe@gmail.com', 'north23AZ', 'cl', 'Gulp-CalGulp-1.05',
            captcha_token='DQAAAGgA...dkI1', captcha_response='brinmar')
        self.assertTrue('logintoken=DQAAAGgA...dkI1' in auth_body)
        self.assertTrue('logincaptcha=brinmar' in auth_body)

    def testEquivalenceWithOldLogic(self):
        # urlencode moved to urllib.parse in Python 3; resolve it locally so
        # this test does not depend on the Python 2 module layout.
        try:
            from urllib import urlencode
        except ImportError:
            from urllib.parse import urlencode

        email = 'jo@gmail.com'
        password = 'password'
        account_type = 'HOSTED'
        service = 'test'
        source = 'auth test'
        old_request_body = urlencode({'Email': email,
                                      'Passwd': password,
                                      'accountType': account_type,
                                      'service': service,
                                      'source': source})
        new_request_body = gdata.auth.GenerateClientLoginRequestBody(
            email, password, service, source, account_type=account_type)
        # Parameter order is not significant, so compare pair by pair.
        for parameter in old_request_body.split('&'):
            self.assertTrue(parameter in new_request_body)
class GenerateAuthSubUrlTest(unittest.TestCase):
    """Checks the query parameters produced by GenerateAuthSubUrl."""

    def testDefaultParameters(self):
        # With no keyword arguments the URL must carry secure=0, session=1.
        url = gdata.auth.GenerateAuthSubUrl(
            'http://example.com/xyz?x=5', 'http://www.google.com/test/feeds')
        self.assertTrue(
            r'scope=http%3A%2F%2Fwww.google.com%2Ftest%2Ffeeds' in url)
        self.assertTrue(
            r'next=http%3A%2F%2Fexample.com%2Fxyz%3Fx%3D5' in url)
        self.assertTrue('secure=0' in url)
        self.assertTrue('session=1' in url)

    def testAllParameters(self):
        # Every optional argument is overridden, including the request URL.
        url = gdata.auth.GenerateAuthSubUrl(
            'http://example.com/xyz?x=5', 'http://www.google.com/test/feeds',
            secure=True, session=False,
            request_url='https://example.com/auth')
        self.assertTrue(
            r'scope=http%3A%2F%2Fwww.google.com%2Ftest%2Ffeeds' in url)
        self.assertTrue(
            r'next=http%3A%2F%2Fexample.com%2Fxyz%3Fx%3D5' in url)
        self.assertTrue('secure=1' in url)
        self.assertTrue('session=0' in url)
        self.assertTrue(url.startswith('https://example.com/auth'))
class GenerateOAuthRequestTokenUrlTest(unittest.TestCase):
    """Checks the URL returned by GenerateOAuthRequestTokenUrl."""

    def testDefaultParameters(self):
        # RSA-SHA1 signing against the default request-token endpoint.
        oauth_input_params = gdata.auth.OAuthInputParams(
            gdata.auth.OAuthSignatureMethod.RSA_SHA1, CONSUMER_KEY,
            rsa_key=RSA_KEY)
        scopes = [
            'http://abcd.example.com/feeds',
            'http://www.example.com/abcd/feeds',
        ]
        url = gdata.auth.GenerateOAuthRequestTokenUrl(
            oauth_input_params, scopes=scopes)
        self.assertEqual('https', url.protocol)
        self.assertEqual('www.google.com', url.host)
        self.assertEqual('/accounts/OAuthGetRequestToken', url.path)
        self.assertEqual('1.0', url.params['oauth_version'])
        self.assertEqual('RSA-SHA1', url.params['oauth_signature_method'])
        # Nonce, timestamp and signature vary per call; only require them.
        self.assertTrue(url.params['oauth_nonce'])
        self.assertTrue(url.params['oauth_timestamp'])
        # Order-insensitive comparison; unlike a length-plus-membership
        # check, this also rejects one scope being emitted twice.
        actual_scopes = url.params['scope'].split(' ')
        self.assertEqual(sorted(scopes), sorted(actual_scopes))
        self.assertEqual(CONSUMER_KEY, url.params['oauth_consumer_key'])
        self.assertTrue(url.params['oauth_signature'])

    def testAllParameters(self):
        # HMAC-SHA1 signing, a custom endpoint and caller-supplied extras.
        oauth_input_params = gdata.auth.OAuthInputParams(
            gdata.auth.OAuthSignatureMethod.HMAC_SHA1, CONSUMER_KEY,
            consumer_secret=CONSUMER_SECRET)
        scopes = ['http://abcd.example.com/feeds']
        url = gdata.auth.GenerateOAuthRequestTokenUrl(
            oauth_input_params, scopes=scopes,
            request_token_url=(
                'https://www.example.com/accounts/OAuthRequestToken'),
            extra_parameters={'oauth_version': '2.0',
                              'my_param': 'my_value'})
        self.assertEqual('https', url.protocol)
        self.assertEqual('www.example.com', url.host)
        self.assertEqual('/accounts/OAuthRequestToken', url.path)
        # The extra parameter must override the default oauth_version.
        self.assertEqual('2.0', url.params['oauth_version'])
        self.assertEqual('HMAC-SHA1', url.params['oauth_signature_method'])
        self.assertTrue(url.params['oauth_nonce'])
        self.assertTrue(url.params['oauth_timestamp'])
        actual_scopes = url.params['scope'].split(' ')
        self.assertEqual(sorted(scopes), sorted(actual_scopes))
        self.assertEqual(CONSUMER_KEY, url.params['oauth_consumer_key'])
        self.assertTrue(url.params['oauth_signature'])
        self.assertEqual('my_value', url.params['my_param'])
class GenerateOAuthAuthorizationUrlTest(unittest.TestCase):
    """Checks the URL returned by GenerateOAuthAuthorizationUrl."""

    def testDefaultParameters(self):
        # Only the request token is supplied; the default endpoint is used.
        token_key = 'ABCDDSFFDSG'
        token_secret = 'SDFDSGSDADADSAF'
        request_token = gdata.auth.OAuthToken(
            key=token_key, secret=token_secret)
        url = gdata.auth.GenerateOAuthAuthorizationUrl(request_token)
        self.assertEqual('https', url.protocol)
        self.assertEqual('www.google.com', url.host)
        self.assertEqual('/accounts/OAuthAuthorizeToken', url.path)
        self.assertEqual(token_key, url.params['oauth_token'])

    def testAllParameters(self):
        # Custom endpoint, and a callback URL that must carry the token's
        # scopes under the requested parameter prefix.
        token_key = 'ABCDDSFFDSG'
        token_secret = 'SDFDSGSDADADSAF'
        scopes = [
            'http://abcd.example.com/feeds',
            'http://www.example.com/abcd/feeds',
        ]
        request_token = gdata.auth.OAuthToken(
            key=token_key, secret=token_secret, scopes=scopes)
        url = gdata.auth.GenerateOAuthAuthorizationUrl(
            request_token,
            authorization_url=(
                'https://www.example.com/accounts/OAuthAuthToken'),
            callback_url='http://www.yourwebapp.com/print',
            extra_params={'permission': '1'},
            include_scopes_in_callback=True,
            scopes_param_prefix='token_scope')
        self.assertEqual('https', url.protocol)
        self.assertEqual('www.example.com', url.host)
        self.assertEqual('/accounts/OAuthAuthToken', url.path)
        self.assertEqual(token_key, url.params['oauth_token'])
        expected_callback_url = (
            'http://www.yourwebapp.com/print?'
            'token_scope=http%3A%2F%2Fabcd.example.com%2Ffeeds'
            '+http%3A%2F%2Fwww.example.com%2Fabcd%2Ffeeds')
        self.assertEqual(expected_callback_url, url.params['oauth_callback'])
class GenerateOAuthAccessTokenUrlTest(unittest.TestCase):
    """Checks the URL returned by GenerateOAuthAccessTokenUrl."""

    def testDefaultParameters(self):
        # HMAC-SHA1 signing against the default access-token endpoint.
        token_key = 'ABCDDSFFDSG'
        token_secret = 'SDFDSGSDADADSAF'
        authorized_request_token = gdata.auth.OAuthToken(
            key=token_key, secret=token_secret)
        oauth_input_params = gdata.auth.OAuthInputParams(
            gdata.auth.OAuthSignatureMethod.HMAC_SHA1, CONSUMER_KEY,
            consumer_secret=CONSUMER_SECRET)
        url = gdata.auth.GenerateOAuthAccessTokenUrl(
            authorized_request_token, oauth_input_params)
        self.assertEqual('https', url.protocol)
        self.assertEqual('www.google.com', url.host)
        self.assertEqual('/accounts/OAuthGetAccessToken', url.path)
        self.assertEqual(token_key, url.params['oauth_token'])
        self.assertEqual('1.0', url.params['oauth_version'])
        self.assertEqual('HMAC-SHA1', url.params['oauth_signature_method'])
        # Nonce, timestamp and signature vary per call; only require them.
        self.assertTrue(url.params['oauth_nonce'])
        self.assertTrue(url.params['oauth_timestamp'])
        self.assertEqual(CONSUMER_KEY, url.params['oauth_consumer_key'])
        self.assertTrue(url.params['oauth_signature'])

    def testAllParameters(self):
        # RSA-SHA1 signing with a custom endpoint and OAuth version.
        token_key = 'ABCDDSFFDSG'
        authorized_request_token = gdata.auth.OAuthToken(key=token_key)
        oauth_input_params = gdata.auth.OAuthInputParams(
            gdata.auth.OAuthSignatureMethod.RSA_SHA1, CONSUMER_KEY,
            rsa_key=RSA_KEY)
        url = gdata.auth.GenerateOAuthAccessTokenUrl(
            authorized_request_token, oauth_input_params,
            access_token_url=(
                'https://www.example.com/accounts/OAuthGetAccessToken'),
            oauth_version='2.0')
        self.assertEqual('https', url.protocol)
        self.assertEqual('www.example.com', url.host)
        self.assertEqual('/accounts/OAuthGetAccessToken', url.path)
        self.assertEqual(token_key, url.params['oauth_token'])
        self.assertEqual('2.0', url.params['oauth_version'])
        self.assertEqual('RSA-SHA1', url.params['oauth_signature_method'])
        self.assertTrue(url.params['oauth_nonce'])
        self.assertTrue(url.params['oauth_timestamp'])
        self.assertEqual(CONSUMER_KEY, url.params['oauth_consumer_key'])
        self.assertTrue(url.params['oauth_signature'])
class ExtractAuthSubTokensTest(unittest.TestCase):
    """Checks extraction of AuthSub tokens from URLs and response bodies."""

    def testGetTokenFromUrl(self):
        url = 'http://www.yourwebapp.com/showcalendar.html?token=CKF50YzIH'
        self.assertEqual(gdata.auth.AuthSubTokenFromUrl(url),
                         'AuthSub token=CKF50YzIH')
        self.assertEqual(gdata.auth.TokenFromUrl(url), 'CKF50YzIH')

        # A token value that itself contains '=' and the word 'token' must
        # come back intact.
        url = ('http://www.yourwebapp.com/showcalendar.html'
               '?token==tokenCKF50YzIH=')
        self.assertEqual(gdata.auth.AuthSubTokenFromUrl(url),
                         'AuthSub token==tokenCKF50YzIH=')
        self.assertEqual(gdata.auth.TokenFromUrl(url), '=tokenCKF50YzIH=')

    def testGetTokenFromHttpResponse(self):
        # Only the Token line is used; the Expiration line is ignored.
        response_body = ('Token=DQAA...7DCTN\r\n'
                         'Expiration=20061004T123456Z')
        self.assertEqual(gdata.auth.AuthSubTokenFromHttpBody(response_body),
                         'AuthSub token=DQAA...7DCTN')
class CreateAuthSubTokenFlowTest(unittest.TestCase):
    """Covers building an AuthSub request URL and parsing the 'next' URL."""

    def testGenerateRequest(self):
        request_url = gdata.auth.generate_auth_sub_url(
            next='http://example.com',
            scopes=['http://www.blogger.com/feeds/',
                    'http://www.google.com/base/feeds/'])
        self.assertEqual(request_url.protocol, 'https')
        self.assertEqual(request_url.host, 'www.google.com')
        self.assertEqual(
            request_url.params['scope'],
            'http://www.blogger.com/feeds/ http://www.google.com/base/feeds/')
        self.assertEqual(request_url.params['hd'], 'default')
        # The scopes must be appended to the 'next' URL under the
        # auth_sub_scopes parameter.
        next_url = request_url.params['next']
        self.assertTrue('auth_sub_scopes' in next_url)
        self.assertTrue(next_url.startswith('http://example.com'))

        # Use a more complicated 'next' URL that already has a query string.
        request_url = gdata.auth.generate_auth_sub_url(
            next=('http://example.com/'
                  '?token_scope=http://www.blogger.com/feeds/'),
            scopes=['http://www.blogger.com/feeds/',
                    'http://www.google.com/base/feeds/'])
        next_url = request_url.params['next']
        self.assertTrue('auth_sub_scopes' in next_url)
        self.assertTrue('token_scope' in next_url)
        self.assertTrue(next_url.startswith('http://example.com/'))

    def testParseNextUrl(self):
        url = ('http://example.com/?auth_sub_scopes=http%3A%2F%2Fwww.blogger.com'
               '%2Ffeeds%2F+http%3A%2F%2Fwww.google.com%2Fbase%2Ffeeds%2F&'
               'token=my_nifty_token')
        token = gdata.auth.extract_auth_sub_token_from_url(url)
        self.assertEqual(token.get_token_string(), 'my_nifty_token')
        self.assertTrue(isinstance(token, gdata.auth.AuthSubToken))
        self.assertTrue(token.valid_for_scope('http://www.blogger.com/feeds/'))
        self.assertTrue(
            token.valid_for_scope('http://www.google.com/base/feeds/'))
        self.assertTrue(
            not token.valid_for_scope('http://www.google.com/calendar/feeds/'))

        # Parse a more complicated response with an unrelated extra parameter.
        url = ('http://example.com/?auth_sub_scopes=http%3A%2F%2Fwww.blogger.com'
               '%2Ffeeds%2F+http%3A%2F%2Fwww.google.com%2Fbase%2Ffeeds%2F&'
               'token_scope=http%3A%2F%2Fwww.blogger.com%2Ffeeds%2F&'
               'token=second_token')
        token = gdata.auth.extract_auth_sub_token_from_url(url)
        self.assertEqual(token.get_token_string(), 'second_token')
        self.assertTrue(isinstance(token, gdata.auth.AuthSubToken))
        self.assertTrue(token.valid_for_scope('http://www.blogger.com/feeds/'))
        self.assertTrue(
            token.valid_for_scope('http://www.google.com/base/feeds/'))
        self.assertTrue(
            not token.valid_for_scope('http://www.google.com/calendar/feeds/'))

    def testParseNextWithNoToken(self):
        # No query string at all.
        token = gdata.auth.extract_auth_sub_token_from_url(
            'http://example.com/')
        self.assertTrue(token is None)
        # A query string without a 'token' parameter.
        token = gdata.auth.extract_auth_sub_token_from_url(
            'http://example.com/?no_token=foo&other=1')
        self.assertTrue(token is None)
class ExtractClientLoginTokenTest(unittest.TestCase):
    """Checks extract_client_login_token against a ClientLogin response."""

    def testExtractFromBodyWithScopes(self):
        http_body_string = ('SID=DQAAAGgA7Zg8CTN\r\n'
                            'LSID=DQAAAGsAlk8BBbG\r\n'
                            'Auth=DQAAAGgAdk3fA5N')
        token = gdata.auth.extract_client_login_token(
            http_body_string, ['http://docs.google.com/feeds/'])
        # The token string is taken from the Auth line of the body.
        self.assertEqual(token.get_token_string(), 'DQAAAGgAdk3fA5N')
        self.assertTrue(isinstance(token, gdata.auth.ClientLoginToken))
        # The token is valid for the scope it was created with, not others.
        self.assertTrue(token.valid_for_scope('http://docs.google.com/feeds/'))
        self.assertTrue(
            not token.valid_for_scope('http://www.blogger.com/feeds'))
class ExtractOAuthTokensTest(unittest.TestCase):
    """Checks extraction of OAuth tokens from URLs and response bodies."""

    def testOAuthTokenFromUrl(self):
        scope_1 = 'http://docs.google.com/feeds/'
        scope_2 = 'http://www.blogger.com/feeds/'

        # Case 1: token and scopes both are present.
        url = ('http://dummy.com/?oauth_token_scope=http%3A%2F%2Fwww.blogger.com'
               '%2Ffeeds%2F+http%3A%2F%2Fdocs.google.com%2Ffeeds%2F&'
               'oauth_token=CMns6t7MCxDz__8B')
        token = gdata.auth.OAuthTokenFromUrl(url)
        self.assertEqual('CMns6t7MCxDz__8B', token.key)
        self.assertEqual(2, len(token.scopes))
        self.assertTrue(scope_1 in token.scopes)
        self.assertTrue(scope_2 in token.scopes)

        # Case 2: token and scopes both are present but the
        # scopes_param_prefix passed does not match the one in the URL, so
        # the token is found while the scopes are not.
        url = ('http://dummy.com/?oauth_token_scope=http%3A%2F%2Fwww.blogger.com'
               '%2Ffeeds%2F+http%3A%2F%2Fdocs.google.com%2Ffeeds%2F&'
               'oauth_token=CMns6t7MCxDz__8B')
        token = gdata.auth.OAuthTokenFromUrl(
            url, scopes_param_prefix='token_scope')
        self.assertEqual('CMns6t7MCxDz__8B', token.key)
        self.assertTrue(not token.scopes)

        # Case 3: neither parameter is present under its expected name.
        url = ('http://dummy.com/?no_oauth_token_scope=http%3A%2F%2Fwww.blogger.com'
               '%2Ffeeds%2F+http%3A%2F%2Fdocs.google.com%2Ffeeds%2F&'
               'no_oauth_token=CMns6t7MCxDz__8B')
        token = gdata.auth.OAuthTokenFromUrl(url)
        self.assertTrue(token is None)

    def testOAuthTokenFromHttpBody(self):
        token_key = 'ABCD'
        token_secret = 'XYZ'
        # Token key and secret are each present exactly once.
        http_body = 'oauth_token=%s&oauth_token_secret=%s' % (
            token_key, token_secret)
        token = gdata.auth.OAuthTokenFromHttpBody(http_body)
        self.assertEqual(token_key, token.key)
        self.assertEqual(token_secret, token.secret)
class OAuthInputParametersTest(unittest.TestCase):
    """Checks the accessors of gdata.auth.OAuthInputParams."""

    def setUp(self):
        # One parameter set per supported signature method.
        self.oauth_input_parameters_hmac = gdata.auth.OAuthInputParams(
            gdata.auth.OAuthSignatureMethod.HMAC_SHA1, CONSUMER_KEY,
            consumer_secret=CONSUMER_SECRET)
        self.oauth_input_parameters_rsa = gdata.auth.OAuthInputParams(
            gdata.auth.OAuthSignatureMethod.RSA_SHA1, CONSUMER_KEY,
            rsa_key=RSA_KEY)

    def testGetSignatureMethod(self):
        hmac_signature_method = (
            self.oauth_input_parameters_hmac.GetSignatureMethod())
        self.assertEqual('HMAC-SHA1', hmac_signature_method.get_name())

        rsa_signature_method = (
            self.oauth_input_parameters_rsa.GetSignatureMethod())
        self.assertEqual('RSA-SHA1', rsa_signature_method.get_name())
        # The RSA method must hand back the key it was configured with.
        self.assertEqual(
            RSA_KEY, rsa_signature_method._fetch_private_cert(None))

    def testGetConsumer(self):
        hmac_params = self.oauth_input_parameters_hmac
        rsa_params = self.oauth_input_parameters_rsa
        self.assertEqual(CONSUMER_KEY, hmac_params.GetConsumer().key)
        self.assertEqual(CONSUMER_KEY, rsa_params.GetConsumer().key)
        self.assertEqual(CONSUMER_SECRET, hmac_params.GetConsumer().secret)
        # No consumer secret was supplied for the RSA parameter set.
        self.assertTrue(rsa_params.GetConsumer().secret is None)
class TokenClassesTest(unittest.TestCase):
    """Exercises the token classes exposed by gdata.auth."""

    def testClientLoginToAndFromString(self):
        token = gdata.auth.ClientLoginToken()
        token.set_token_string('foo')
        self.assertEqual(token.get_token_string(), 'foo')
        self.assertEqual(
            token.auth_header,
            '%s%s' % (gdata.auth.PROGRAMMATIC_AUTH_LABEL, 'foo'))
        # Feeding the serialized form back in must leave the token unchanged.
        token.set_token_string(token.get_token_string())
        self.assertEqual(token.get_token_string(), 'foo')

    def testAuthSubToAndFromString(self):
        token = gdata.auth.AuthSubToken()
        token.set_token_string('foo')
        self.assertEqual(token.get_token_string(), 'foo')
        self.assertEqual(
            token.auth_header,
            '%s%s' % (gdata.auth.AUTHSUB_AUTH_LABEL, 'foo'))
        # Feeding the serialized form back in must leave the token unchanged.
        token.set_token_string(token.get_token_string())
        self.assertEqual(token.get_token_string(), 'foo')

    def testSecureAuthSubToAndFromString(self):
        # Case 1: no token string at construction; it is set afterwards.
        token = gdata.auth.SecureAuthSubToken(RSA_KEY)
        token.set_token_string('foo')
        self.assertEqual(token.get_token_string(), 'foo')
        token.set_token_string(token.get_token_string())
        self.assertEqual(token.get_token_string(), 'foo')
        self.assertEqual(str(token), 'foo')

        # Case 2: the token string is passed to the constructor.
        token = gdata.auth.SecureAuthSubToken(RSA_KEY, token_string='foo')
        self.assertEqual(token.get_token_string(), 'foo')
        token.set_token_string(token.get_token_string())
        self.assertEqual(token.get_token_string(), 'foo')
        self.assertEqual(str(token), 'foo')

    def testOAuthToAndFromString(self):
        token_key = 'ABCD'
        token_secret = 'XYZ'

        # Case 1: token key and secret both present single time.
        token_string = 'oauth_token=%s&oauth_token_secret=%s' % (
            token_key, token_secret)
        token = gdata.auth.OAuthToken()
        token.set_token_string(token_string)
        key_pair, secret_pair = token_string.split('&')[:2]
        # Pair order in the serialized form is not checked, only presence.
        self.assertTrue(key_pair in token.get_token_string())
        self.assertTrue(secret_pair in token.get_token_string())
        self.assertEqual(token_key, token.key)
        self.assertEqual(token_secret, token.secret)

        # Case 2: token key and secret both present multiple times with
        # unwanted parameters; the first key ('ABCD', not 'LMNO') must win.
        token_string = (
            'oauth_token=%s&oauth_token_secret=%s&'
            'oauth_token=%s&ExtraParams=GarbageString'
            % (token_key, token_secret, 'LMNO'))
        token = gdata.auth.OAuthToken()
        token.set_token_string(token_string)
        key_pair, secret_pair = token_string.split('&')[:2]
        self.assertTrue(key_pair in token.get_token_string())
        self.assertTrue(secret_pair in token.get_token_string())
        self.assertEqual(token_key, token.key)
        self.assertEqual(token_secret, token.secret)

        # Case 3: Only token key present.
        token_string = 'oauth_token=%s' % (token_key,)
        token = gdata.auth.OAuthToken()
        token.set_token_string(token_string)
        self.assertEqual(token_string, token.get_token_string())
        self.assertEqual(token_key, token.key)
        self.assertTrue(not token.secret)

        # Case 4: Only token secret present.
        token_string = 'oauth_token_secret=%s' % (token_secret,)
        token = gdata.auth.OAuthToken()
        token.set_token_string(token_string)
        self.assertEqual(token_string, token.get_token_string())
        self.assertEqual(token_secret, token.secret)
        self.assertTrue(not token.key)

        # Case 5: None present.
        token_string = ''
        token = gdata.auth.OAuthToken()
        token.set_token_string(token_string)
        self.assertTrue(token.get_token_string() is None)
        self.assertTrue(not token.key)
        self.assertTrue(not token.secret)

    def testSecureAuthSubGetAuthHeader(self):
        # A secure AuthSub token must produce a signed Authorization header.
        url = 'http://dummy.com/?q=notebook&s=true'
        token = gdata.auth.SecureAuthSubToken(RSA_KEY, token_string='foo')
        auth_header = token.GetAuthHeader('GET', url)
        self.assertTrue('Authorization' in auth_header)
        header_value = auth_header['Authorization']
        self.assertTrue(header_value.startswith(r'AuthSub token="foo"'))
        self.assertTrue(r'sigalg="rsa-sha1"' in header_value)
        self.assertTrue(r'data="' in header_value)
        self.assertTrue(r'sig="' in header_value)
        # The data field must start with the HTTP method and contain the URL.
        m = re.search(r'data="(.*?)"', header_value)
        self.assertTrue(m is not None)
        data = m.group(1)
        self.assertTrue(data.startswith('GET'))
        self.assertTrue(url in data)

    def testOAuthGetAuthHeader(self):
        # Case 1: Presence of OAuth token (in case of 3-legged OAuth)
        oauth_input_params = gdata.auth.OAuthInputParams(
            gdata.auth.OAuthSignatureMethod.RSA_SHA1, CONSUMER_KEY,
            rsa_key=RSA_KEY)
        token = gdata.auth.OAuthToken(
            key='ABCDDSFFDSG', oauth_input_params=oauth_input_params)
        auth_header = token.GetAuthHeader(
            'GET', 'http://dummy.com/?q=notebook&s=true',
            realm='http://dummy.com')
        self.assertTrue('Authorization' in auth_header)
        header_value = auth_header['Authorization']
        self.assertTrue(r'OAuth realm="http://dummy.com"' in header_value)
        self.assertTrue(r'oauth_version="1.0"' in header_value)
        self.assertTrue(r'oauth_token="ABCDDSFFDSG"' in header_value)
        self.assertTrue(r'oauth_nonce="' in header_value)
        self.assertTrue(r'oauth_timestamp="' in header_value)
        self.assertTrue(r'oauth_signature="' in header_value)
        self.assertTrue(
            r'oauth_consumer_key="%s"' % CONSUMER_KEY in header_value)
        self.assertTrue(r'oauth_signature_method="RSA-SHA1"' in header_value)

        # Case 2: Absence of OAuth token (in case of 2-legged OAuth)
        oauth_input_params = gdata.auth.OAuthInputParams(
            gdata.auth.OAuthSignatureMethod.HMAC_SHA1, CONSUMER_KEY,
            consumer_secret=CONSUMER_SECRET)
        token = gdata.auth.OAuthToken(oauth_input_params=oauth_input_params)
        auth_header = token.GetAuthHeader(
            'GET',
            'http://dummy.com/?xoauth_requestor_id=user@gmail.com&q=book')
        self.assertTrue('Authorization' in auth_header)
        header_value = auth_header['Authorization']
        self.assertTrue(r'OAuth realm=""' in header_value)
        self.assertTrue(r'oauth_version="1.0"' in header_value)
        # With no token key the header must not carry oauth_token at all.
        self.assertEqual(-1, header_value.find(r'oauth_token='))
        self.assertTrue(r'oauth_nonce="' in header_value)
        self.assertTrue(r'oauth_timestamp="' in header_value)
        self.assertTrue(r'oauth_signature="' in header_value)
        self.assertTrue(
            r'oauth_consumer_key="%s"' % CONSUMER_KEY in header_value)
        self.assertTrue(r'oauth_signature_method="HMAC-SHA1"' in header_value)
# Run every test case in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()