#!/usr/bin/env python | |
# | |
# Copyright (C) 2007, 2008 Google Inc. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
__author__ = 'api.jscudder (Jeff Scudder)' | |
import re | |
import unittest | |
import urllib | |
import gdata.auth | |
CONSUMER_KEY = 'www.yourwebapp.com' | |
CONSUMER_SECRET = 'qB1P2kCFDpRjF+/Iww4' | |
RSA_KEY = """-----BEGIN RSA PRIVATE KEY----- | |
MIICXAIBAAKBgQDVbOaFW+KXecfFJn1PIzYHnNXFxhaQ36QM0K5uSb0Y8NeQUlD2 | |
6t8aKgnm6mcb4vaopHjjdIGWgAzM5Dt0oPIiDXo+jSQbvCIXRduuAt+0cFGb2d+L | |
hALk4AwB8IVIkDJWwgo5Z2OLsP2r/wQlUYKm/tnvQaevK24jNYMLWVJl2QIDAQAB | |
AoGAU93ERBlUVEPFjaJPUX67p4gotNvfWDSZiXOjZ7FQPnG9s3e1WyH2Y5irZXMs | |
61dnp+NhobfRiGtvHEB/YJgyLRk/CJDnMKslo95e7o65IE9VkcyY6Yvt7YTslsRX | |
Eu7T0xLEA7ON46ypCwNLeWxpJ9SWisEKu2yZJnWauCXEsgUCQQD7b2ZuhGx3msoP | |
YEnwvucp0UxneCvb68otfERZ1J6NfNP47QJw6OwD3r1sWCJ27QZmpvtQH1f8sCk9 | |
t22anGG7AkEA2UzXdtQ8H1uLAN/XXX2qoLuvJK5jRswHS4GeOg4pnnDSiHg3Vbva | |
AxmMIL93ufvIy/xdoENwDPfcI4CbYlrDewJAGWy7W+OSIEoLsqBW+bwkHetnIXNa | |
ZAOkzxKoyrigS8hamupEe+xhqUaFuwXyfjobkpfCA+kXeZrKoM4CjEbR7wJAHMbf | |
Vd4/ZAu0edYq6DenLAgO5rWtcge9A5PTx25utovMZcQ917273mM4unGAwoGEkvcF | |
0x57LUx5u73hVgIdFwJBAKWGuHRwGPgTWYvhpHM0qveH+8KdU9BUt/kV4ONxIVDB | |
ftetEmJirqOGLECbImoLcUwQrgfMW4ZCxOioJMz/gY0= | |
-----END RSA PRIVATE KEY----- | |
""" | |
class AuthModuleUtilitiesTest(unittest.TestCase): | |
def testGenerateClientLoginRequestBody(self): | |
body = gdata.auth.GenerateClientLoginRequestBody('jo@gmail.com', | |
'password', 'test service', 'gdata.auth test') | |
expected_parameters = {'Email':r'jo%40gmail.com', 'Passwd':'password', | |
'service':'test+service', 'source':'gdata.auth+test', | |
'accountType':'HOSTED_OR_GOOGLE'} | |
self.__matchBody(body, expected_parameters) | |
body = gdata.auth.GenerateClientLoginRequestBody('jo@gmail.com', | |
'password', 'test service', 'gdata.auth test', account_type='A TEST', | |
captcha_token='12345', captcha_response='test') | |
expected_parameters['accountType'] = 'A+TEST' | |
expected_parameters['logintoken'] = '12345' | |
expected_parameters['logincaptcha'] = 'test' | |
self.__matchBody(body, expected_parameters) | |
def __matchBody(self, body, expected_name_value_pairs): | |
parameters = body.split('&') | |
for param in parameters: | |
(name, value) = param.split('=') | |
self.assert_(expected_name_value_pairs[name] == value) | |
def testGenerateClientLoginAuthToken(self): | |
http_body = ('SID=DQAAAGgA7Zg8CTN\r\n' | |
'LSID=DQAAAGsAlk8BBbG\r\n' | |
'Auth=DQAAAGgAdk3fA5N') | |
self.assert_(gdata.auth.GenerateClientLoginAuthToken(http_body) == | |
'GoogleLogin auth=DQAAAGgAdk3fA5N') | |
class GenerateClientLoginRequestBodyTest(unittest.TestCase): | |
def testPostBodyShouldMatchShortExample(self): | |
auth_body = gdata.auth.GenerateClientLoginRequestBody('johndoe@gmail.com', | |
'north23AZ', 'cl', 'Gulp-CalGulp-1.05') | |
self.assert_(-1 < auth_body.find('Email=johndoe%40gmail.com')) | |
self.assert_(-1 < auth_body.find('Passwd=north23AZ')) | |
self.assert_(-1 < auth_body.find('service=cl')) | |
self.assert_(-1 < auth_body.find('source=Gulp-CalGulp-1.05')) | |
def testPostBodyShouldMatchLongExample(self): | |
auth_body = gdata.auth.GenerateClientLoginRequestBody('johndoe@gmail.com', | |
'north23AZ', 'cl', 'Gulp-CalGulp-1.05', | |
captcha_token='DQAAAGgA...dkI1', captcha_response='brinmar') | |
self.assert_(-1 < auth_body.find('logintoken=DQAAAGgA...dkI1')) | |
self.assert_(-1 < auth_body.find('logincaptcha=brinmar')) | |
def testEquivalenceWithOldLogic(self): | |
email = 'jo@gmail.com' | |
password = 'password' | |
account_type = 'HOSTED' | |
service = 'test' | |
source = 'auth test' | |
old_request_body = urllib.urlencode({'Email': email, | |
'Passwd': password, | |
'accountType': account_type, | |
'service': service, | |
'source': source}) | |
new_request_body = gdata.auth.GenerateClientLoginRequestBody(email, | |
password, service, source, account_type=account_type) | |
for parameter in old_request_body.split('&'): | |
self.assert_(-1 < new_request_body.find(parameter)) | |
class GenerateAuthSubUrlTest(unittest.TestCase): | |
def testDefaultParameters(self): | |
url = gdata.auth.GenerateAuthSubUrl('http://example.com/xyz?x=5', | |
'http://www.google.com/test/feeds') | |
self.assert_(-1 < url.find( | |
r'scope=http%3A%2F%2Fwww.google.com%2Ftest%2Ffeeds')) | |
self.assert_(-1 < url.find( | |
r'next=http%3A%2F%2Fexample.com%2Fxyz%3Fx%3D5')) | |
self.assert_(-1 < url.find('secure=0')) | |
self.assert_(-1 < url.find('session=1')) | |
def testAllParameters(self): | |
url = gdata.auth.GenerateAuthSubUrl('http://example.com/xyz?x=5', | |
'http://www.google.com/test/feeds', secure=True, session=False, | |
request_url='https://example.com/auth') | |
self.assert_(-1 < url.find( | |
r'scope=http%3A%2F%2Fwww.google.com%2Ftest%2Ffeeds')) | |
self.assert_(-1 < url.find( | |
r'next=http%3A%2F%2Fexample.com%2Fxyz%3Fx%3D5')) | |
self.assert_(-1 < url.find('secure=1')) | |
self.assert_(-1 < url.find('session=0')) | |
self.assert_(url.startswith('https://example.com/auth')) | |
class GenerateOAuthRequestTokenUrlTest(unittest.TestCase): | |
def testDefaultParameters(self): | |
oauth_input_params = gdata.auth.OAuthInputParams( | |
gdata.auth.OAuthSignatureMethod.RSA_SHA1, CONSUMER_KEY, | |
rsa_key=RSA_KEY) | |
scopes = [ | |
'http://abcd.example.com/feeds', | |
'http://www.example.com/abcd/feeds' | |
] | |
url = gdata.auth.GenerateOAuthRequestTokenUrl( | |
oauth_input_params, scopes=scopes) | |
self.assertEquals('https', url.protocol) | |
self.assertEquals('www.google.com', url.host) | |
self.assertEquals('/accounts/OAuthGetRequestToken', url.path) | |
self.assertEquals('1.0', url.params['oauth_version']) | |
self.assertEquals('RSA-SHA1', url.params['oauth_signature_method']) | |
self.assert_(url.params['oauth_nonce']) | |
self.assert_(url.params['oauth_timestamp']) | |
actual_scopes = url.params['scope'].split(' ') | |
self.assertEquals(2, len(actual_scopes)) | |
for scope in actual_scopes: | |
self.assert_(scope in scopes) | |
self.assertEquals(CONSUMER_KEY, url.params['oauth_consumer_key']) | |
self.assert_(url.params['oauth_signature']) | |
def testAllParameters(self): | |
oauth_input_params = gdata.auth.OAuthInputParams( | |
gdata.auth.OAuthSignatureMethod.HMAC_SHA1, CONSUMER_KEY, | |
consumer_secret=CONSUMER_SECRET) | |
scopes = ['http://abcd.example.com/feeds'] | |
url = gdata.auth.GenerateOAuthRequestTokenUrl( | |
oauth_input_params, scopes=scopes, | |
request_token_url='https://www.example.com/accounts/OAuthRequestToken', | |
extra_parameters={'oauth_version': '2.0', 'my_param': 'my_value'}) | |
self.assertEquals('https', url.protocol) | |
self.assertEquals('www.example.com', url.host) | |
self.assertEquals('/accounts/OAuthRequestToken', url.path) | |
self.assertEquals('2.0', url.params['oauth_version']) | |
self.assertEquals('HMAC-SHA1', url.params['oauth_signature_method']) | |
self.assert_(url.params['oauth_nonce']) | |
self.assert_(url.params['oauth_timestamp']) | |
actual_scopes = url.params['scope'].split(' ') | |
self.assertEquals(1, len(actual_scopes)) | |
for scope in actual_scopes: | |
self.assert_(scope in scopes) | |
self.assertEquals(CONSUMER_KEY, url.params['oauth_consumer_key']) | |
self.assert_(url.params['oauth_signature']) | |
self.assertEquals('my_value', url.params['my_param']) | |
class GenerateOAuthAuthorizationUrlTest(unittest.TestCase): | |
def testDefaultParameters(self): | |
token_key = 'ABCDDSFFDSG' | |
token_secret = 'SDFDSGSDADADSAF' | |
request_token = gdata.auth.OAuthToken(key=token_key, secret=token_secret) | |
url = gdata.auth.GenerateOAuthAuthorizationUrl(request_token) | |
self.assertEquals('https', url.protocol) | |
self.assertEquals('www.google.com', url.host) | |
self.assertEquals('/accounts/OAuthAuthorizeToken', url.path) | |
self.assertEquals(token_key, url.params['oauth_token']) | |
def testAllParameters(self): | |
token_key = 'ABCDDSFFDSG' | |
token_secret = 'SDFDSGSDADADSAF' | |
scopes = [ | |
'http://abcd.example.com/feeds', | |
'http://www.example.com/abcd/feeds' | |
] | |
request_token = gdata.auth.OAuthToken(key=token_key, secret=token_secret, | |
scopes=scopes) | |
url = gdata.auth.GenerateOAuthAuthorizationUrl( | |
request_token, | |
authorization_url='https://www.example.com/accounts/OAuthAuthToken', | |
callback_url='http://www.yourwebapp.com/print', | |
extra_params={'permission': '1'}, | |
include_scopes_in_callback=True, scopes_param_prefix='token_scope') | |
self.assertEquals('https', url.protocol) | |
self.assertEquals('www.example.com', url.host) | |
self.assertEquals('/accounts/OAuthAuthToken', url.path) | |
self.assertEquals(token_key, url.params['oauth_token']) | |
expected_callback_url = ('http://www.yourwebapp.com/print?' | |
'token_scope=http%3A%2F%2Fabcd.example.com%2Ffeeds' | |
'+http%3A%2F%2Fwww.example.com%2Fabcd%2Ffeeds') | |
self.assertEquals(expected_callback_url, url.params['oauth_callback']) | |
class GenerateOAuthAccessTokenUrlTest(unittest.TestCase): | |
def testDefaultParameters(self): | |
token_key = 'ABCDDSFFDSG' | |
token_secret = 'SDFDSGSDADADSAF' | |
authorized_request_token = gdata.auth.OAuthToken(key=token_key, | |
secret=token_secret) | |
oauth_input_params = gdata.auth.OAuthInputParams( | |
gdata.auth.OAuthSignatureMethod.HMAC_SHA1, CONSUMER_KEY, | |
consumer_secret=CONSUMER_SECRET) | |
url = gdata.auth.GenerateOAuthAccessTokenUrl(authorized_request_token, | |
oauth_input_params) | |
self.assertEquals('https', url.protocol) | |
self.assertEquals('www.google.com', url.host) | |
self.assertEquals('/accounts/OAuthGetAccessToken', url.path) | |
self.assertEquals(token_key, url.params['oauth_token']) | |
self.assertEquals('1.0', url.params['oauth_version']) | |
self.assertEquals('HMAC-SHA1', url.params['oauth_signature_method']) | |
self.assert_(url.params['oauth_nonce']) | |
self.assert_(url.params['oauth_timestamp']) | |
self.assertEquals(CONSUMER_KEY, url.params['oauth_consumer_key']) | |
self.assert_(url.params['oauth_signature']) | |
def testAllParameters(self): | |
token_key = 'ABCDDSFFDSG' | |
authorized_request_token = gdata.auth.OAuthToken(key=token_key) | |
oauth_input_params = gdata.auth.OAuthInputParams( | |
gdata.auth.OAuthSignatureMethod.RSA_SHA1, CONSUMER_KEY, | |
rsa_key=RSA_KEY) | |
url = gdata.auth.GenerateOAuthAccessTokenUrl( | |
authorized_request_token, oauth_input_params, | |
access_token_url='https://www.example.com/accounts/OAuthGetAccessToken', | |
oauth_version= '2.0') | |
self.assertEquals('https', url.protocol) | |
self.assertEquals('www.example.com', url.host) | |
self.assertEquals('/accounts/OAuthGetAccessToken', url.path) | |
self.assertEquals(token_key, url.params['oauth_token']) | |
self.assertEquals('2.0', url.params['oauth_version']) | |
self.assertEquals('RSA-SHA1', url.params['oauth_signature_method']) | |
self.assert_(url.params['oauth_nonce']) | |
self.assert_(url.params['oauth_timestamp']) | |
self.assertEquals(CONSUMER_KEY, url.params['oauth_consumer_key']) | |
self.assert_(url.params['oauth_signature']) | |
class ExtractAuthSubTokensTest(unittest.TestCase): | |
def testGetTokenFromUrl(self): | |
url = 'http://www.yourwebapp.com/showcalendar.html?token=CKF50YzIH' | |
self.assert_(gdata.auth.AuthSubTokenFromUrl(url) == | |
'AuthSub token=CKF50YzIH') | |
self.assert_(gdata.auth.TokenFromUrl(url) == 'CKF50YzIH') | |
url = 'http://www.yourwebapp.com/showcalendar.html?token==tokenCKF50YzIH=' | |
self.assert_(gdata.auth.AuthSubTokenFromUrl(url) == | |
'AuthSub token==tokenCKF50YzIH=') | |
self.assert_(gdata.auth.TokenFromUrl(url) == '=tokenCKF50YzIH=') | |
def testGetTokenFromHttpResponse(self): | |
response_body = ('Token=DQAA...7DCTN\r\n' | |
'Expiration=20061004T123456Z') | |
self.assert_(gdata.auth.AuthSubTokenFromHttpBody(response_body) == | |
'AuthSub token=DQAA...7DCTN') | |
class CreateAuthSubTokenFlowTest(unittest.TestCase): | |
def testGenerateRequest(self): | |
request_url = gdata.auth.generate_auth_sub_url(next='http://example.com', | |
scopes=['http://www.blogger.com/feeds/', | |
'http://www.google.com/base/feeds/']) | |
self.assertEquals(request_url.protocol, 'https') | |
self.assertEquals(request_url.host, 'www.google.com') | |
self.assertEquals(request_url.params['scope'], | |
'http://www.blogger.com/feeds/ http://www.google.com/base/feeds/') | |
self.assertEquals(request_url.params['hd'], 'default') | |
self.assert_(request_url.params['next'].find('auth_sub_scopes') > -1) | |
self.assert_(request_url.params['next'].startswith('http://example.com')) | |
# Use a more complicated 'next' URL. | |
request_url = gdata.auth.generate_auth_sub_url( | |
next='http://example.com/?token_scope=http://www.blogger.com/feeds/', | |
scopes=['http://www.blogger.com/feeds/', | |
'http://www.google.com/base/feeds/']) | |
self.assert_(request_url.params['next'].find('auth_sub_scopes') > -1) | |
self.assert_(request_url.params['next'].find('token_scope') > -1) | |
self.assert_(request_url.params['next'].startswith('http://example.com/')) | |
def testParseNextUrl(self): | |
url = ('http://example.com/?auth_sub_scopes=http%3A%2F%2Fwww.blogger.com' | |
'%2Ffeeds%2F+http%3A%2F%2Fwww.google.com%2Fbase%2Ffeeds%2F&' | |
'token=my_nifty_token') | |
token = gdata.auth.extract_auth_sub_token_from_url(url) | |
self.assertEquals(token.get_token_string(), 'my_nifty_token') | |
self.assert_(isinstance(token, gdata.auth.AuthSubToken)) | |
self.assert_(token.valid_for_scope('http://www.blogger.com/feeds/')) | |
self.assert_(token.valid_for_scope('http://www.google.com/base/feeds/')) | |
self.assert_( | |
not token.valid_for_scope('http://www.google.com/calendar/feeds/')) | |
# Parse a more complicated response. | |
url = ('http://example.com/?auth_sub_scopes=http%3A%2F%2Fwww.blogger.com' | |
'%2Ffeeds%2F+http%3A%2F%2Fwww.google.com%2Fbase%2Ffeeds%2F&' | |
'token_scope=http%3A%2F%2Fwww.blogger.com%2Ffeeds%2F&' | |
'token=second_token') | |
token = gdata.auth.extract_auth_sub_token_from_url(url) | |
self.assertEquals(token.get_token_string(), 'second_token') | |
self.assert_(isinstance(token, gdata.auth.AuthSubToken)) | |
self.assert_(token.valid_for_scope('http://www.blogger.com/feeds/')) | |
self.assert_(token.valid_for_scope('http://www.google.com/base/feeds/')) | |
self.assert_( | |
not token.valid_for_scope('http://www.google.com/calendar/feeds/')) | |
def testParseNextWithNoToken(self): | |
token = gdata.auth.extract_auth_sub_token_from_url('http://example.com/') | |
self.assert_(token is None) | |
token = gdata.auth.extract_auth_sub_token_from_url( | |
'http://example.com/?no_token=foo&other=1') | |
self.assert_(token is None) | |
class ExtractClientLoginTokenTest(unittest.TestCase): | |
def testExtractFromBodyWithScopes(self): | |
http_body_string = ('SID=DQAAAGgA7Zg8CTN\r\n' | |
'LSID=DQAAAGsAlk8BBbG\r\n' | |
'Auth=DQAAAGgAdk3fA5N') | |
token = gdata.auth.extract_client_login_token(http_body_string, | |
['http://docs.google.com/feeds/']) | |
self.assertEquals(token.get_token_string(), 'DQAAAGgAdk3fA5N') | |
self.assert_(isinstance(token, gdata.auth.ClientLoginToken)) | |
self.assert_(token.valid_for_scope('http://docs.google.com/feeds/')) | |
self.assert_(not token.valid_for_scope('http://www.blogger.com/feeds')) | |
class ExtractOAuthTokensTest(unittest.TestCase): | |
def testOAuthTokenFromUrl(self): | |
scope_1 = 'http://docs.google.com/feeds/' | |
scope_2 = 'http://www.blogger.com/feeds/' | |
# Case 1: token and scopes both are present. | |
url = ('http://dummy.com/?oauth_token_scope=http%3A%2F%2Fwww.blogger.com' | |
'%2Ffeeds%2F+http%3A%2F%2Fdocs.google.com%2Ffeeds%2F&' | |
'oauth_token=CMns6t7MCxDz__8B') | |
token = gdata.auth.OAuthTokenFromUrl(url) | |
self.assertEquals('CMns6t7MCxDz__8B', token.key) | |
self.assertEquals(2, len(token.scopes)) | |
self.assert_(scope_1 in token.scopes) | |
self.assert_(scope_2 in token.scopes) | |
# Case 2: token and scopes both are present but scope_param_prefix | |
# passed does not match the one present in the URL. | |
url = ('http://dummy.com/?oauth_token_scope=http%3A%2F%2Fwww.blogger.com' | |
'%2Ffeeds%2F+http%3A%2F%2Fdocs.google.com%2Ffeeds%2F&' | |
'oauth_token=CMns6t7MCxDz__8B') | |
token = gdata.auth.OAuthTokenFromUrl(url, | |
scopes_param_prefix='token_scope') | |
self.assertEquals('CMns6t7MCxDz__8B', token.key) | |
self.assert_(not token.scopes) | |
# Case 3: None present. | |
url = ('http://dummy.com/?no_oauth_token_scope=http%3A%2F%2Fwww.blogger.com' | |
'%2Ffeeds%2F+http%3A%2F%2Fdocs.google.com%2Ffeeds%2F&' | |
'no_oauth_token=CMns6t7MCxDz__8B') | |
token = gdata.auth.OAuthTokenFromUrl(url) | |
self.assert_(token is None) | |
def testOAuthTokenFromHttpBody(self): | |
token_key = 'ABCD' | |
token_secret = 'XYZ' | |
# Case 1: token key and secret both present single time. | |
http_body = 'oauth_token=%s&oauth_token_secret=%s' % (token_key, | |
token_secret) | |
token = gdata.auth.OAuthTokenFromHttpBody(http_body) | |
self.assertEquals(token_key, token.key) | |
self.assertEquals(token_secret, token.secret) | |
class OAuthInputParametersTest(unittest.TestCase): | |
def setUp(self): | |
self.oauth_input_parameters_hmac = gdata.auth.OAuthInputParams( | |
gdata.auth.OAuthSignatureMethod.HMAC_SHA1, CONSUMER_KEY, | |
consumer_secret=CONSUMER_SECRET) | |
self.oauth_input_parameters_rsa = gdata.auth.OAuthInputParams( | |
gdata.auth.OAuthSignatureMethod.RSA_SHA1, CONSUMER_KEY, | |
rsa_key=RSA_KEY) | |
def testGetSignatureMethod(self): | |
self.assertEquals( | |
'HMAC-SHA1', | |
self.oauth_input_parameters_hmac.GetSignatureMethod().get_name()) | |
rsa_signature_method = self.oauth_input_parameters_rsa.GetSignatureMethod() | |
self.assertEquals('RSA-SHA1', rsa_signature_method.get_name()) | |
self.assertEquals(RSA_KEY, rsa_signature_method._fetch_private_cert(None)) | |
def testGetConsumer(self): | |
self.assertEquals(CONSUMER_KEY, | |
self.oauth_input_parameters_hmac.GetConsumer().key) | |
self.assertEquals(CONSUMER_KEY, | |
self.oauth_input_parameters_rsa.GetConsumer().key) | |
self.assertEquals(CONSUMER_SECRET, | |
self.oauth_input_parameters_hmac.GetConsumer().secret) | |
self.assert_(self.oauth_input_parameters_rsa.GetConsumer().secret is None) | |
class TokenClassesTest(unittest.TestCase): | |
def testClientLoginToAndFromString(self): | |
token = gdata.auth.ClientLoginToken() | |
token.set_token_string('foo') | |
self.assertEquals(token.get_token_string(), 'foo') | |
self.assertEquals(token.auth_header, '%s%s' % ( | |
gdata.auth.PROGRAMMATIC_AUTH_LABEL, 'foo')) | |
token.set_token_string(token.get_token_string()) | |
self.assertEquals(token.get_token_string(), 'foo') | |
def testAuthSubToAndFromString(self): | |
token = gdata.auth.AuthSubToken() | |
token.set_token_string('foo') | |
self.assertEquals(token.get_token_string(), 'foo') | |
self.assertEquals(token.auth_header, '%s%s' % ( | |
gdata.auth.AUTHSUB_AUTH_LABEL, 'foo')) | |
token.set_token_string(token.get_token_string()) | |
self.assertEquals(token.get_token_string(), 'foo') | |
def testSecureAuthSubToAndFromString(self): | |
# Case 1: no token. | |
token = gdata.auth.SecureAuthSubToken(RSA_KEY) | |
token.set_token_string('foo') | |
self.assertEquals(token.get_token_string(), 'foo') | |
token.set_token_string(token.get_token_string()) | |
self.assertEquals(token.get_token_string(), 'foo') | |
self.assertEquals(str(token), 'foo') | |
# Case 2: token is a string | |
token = gdata.auth.SecureAuthSubToken(RSA_KEY, token_string='foo') | |
self.assertEquals(token.get_token_string(), 'foo') | |
token.set_token_string(token.get_token_string()) | |
self.assertEquals(token.get_token_string(), 'foo') | |
self.assertEquals(str(token), 'foo') | |
def testOAuthToAndFromString(self): | |
token_key = 'ABCD' | |
token_secret = 'XYZ' | |
# Case 1: token key and secret both present single time. | |
token_string = 'oauth_token=%s&oauth_token_secret=%s' % (token_key, | |
token_secret) | |
token = gdata.auth.OAuthToken() | |
token.set_token_string(token_string) | |
self.assert_(-1 < token.get_token_string().find(token_string.split('&')[0])) | |
self.assert_(-1 < token.get_token_string().find(token_string.split('&')[1])) | |
self.assertEquals(token_key, token.key) | |
self.assertEquals(token_secret, token.secret) | |
# Case 2: token key and secret both present multiple times with unwanted | |
# parameters. | |
token_string = ('oauth_token=%s&oauth_token_secret=%s&' | |
'oauth_token=%s&ExtraParams=GarbageString' % (token_key, | |
token_secret, | |
'LMNO')) | |
token = gdata.auth.OAuthToken() | |
token.set_token_string(token_string) | |
self.assert_(-1 < token.get_token_string().find(token_string.split('&')[0])) | |
self.assert_(-1 < token.get_token_string().find(token_string.split('&')[1])) | |
self.assertEquals(token_key, token.key) | |
self.assertEquals(token_secret, token.secret) | |
# Case 3: Only token key present. | |
token_string = 'oauth_token=%s' % (token_key,) | |
token = gdata.auth.OAuthToken() | |
token.set_token_string(token_string) | |
self.assertEquals(token_string, token.get_token_string()) | |
self.assertEquals(token_key, token.key) | |
self.assert_(not token.secret) | |
# Case 4: Only token key present. | |
token_string = 'oauth_token_secret=%s' % (token_secret,) | |
token = gdata.auth.OAuthToken() | |
token.set_token_string(token_string) | |
self.assertEquals(token_string, token.get_token_string()) | |
self.assertEquals(token_secret, token.secret) | |
self.assert_(not token.key) | |
# Case 5: None present. | |
token_string = '' | |
token = gdata.auth.OAuthToken() | |
token.set_token_string(token_string) | |
self.assert_(token.get_token_string() is None) | |
self.assert_(not token.key) | |
self.assert_(not token.secret) | |
def testSecureAuthSubGetAuthHeader(self): | |
# Case 1: Presence of OAuth token (in case of 3-legged OAuth) | |
url = 'http://dummy.com/?q=notebook&s=true' | |
token = gdata.auth.SecureAuthSubToken(RSA_KEY, token_string='foo') | |
auth_header = token.GetAuthHeader('GET', url) | |
self.assert_('Authorization' in auth_header) | |
header_value = auth_header['Authorization'] | |
self.assert_(header_value.startswith(r'AuthSub token="foo"')) | |
self.assert_(-1 < header_value.find(r'sigalg="rsa-sha1"')) | |
self.assert_(-1 < header_value.find(r'data="')) | |
self.assert_(-1 < header_value.find(r'sig="')) | |
m = re.search(r'data="(.*?)"', header_value) | |
self.assert_(m is not None) | |
data = m.group(1) | |
self.assert_(data.startswith('GET')) | |
self.assert_(-1 < data.find(url)) | |
def testOAuthGetAuthHeader(self): | |
# Case 1: Presence of OAuth token (in case of 3-legged OAuth) | |
oauth_input_params = gdata.auth.OAuthInputParams( | |
gdata.auth.OAuthSignatureMethod.RSA_SHA1, CONSUMER_KEY, | |
rsa_key=RSA_KEY) | |
token = gdata.auth.OAuthToken(key='ABCDDSFFDSG', | |
oauth_input_params=oauth_input_params) | |
auth_header = token.GetAuthHeader('GET', | |
'http://dummy.com/?q=notebook&s=true', | |
realm='http://dummy.com') | |
self.assert_('Authorization' in auth_header) | |
header_value = auth_header['Authorization'] | |
self.assert_(-1 < header_value.find(r'OAuth realm="http://dummy.com"')) | |
self.assert_(-1 < header_value.find(r'oauth_version="1.0"')) | |
self.assert_(-1 < header_value.find(r'oauth_token="ABCDDSFFDSG"')) | |
self.assert_(-1 < header_value.find(r'oauth_nonce="')) | |
self.assert_(-1 < header_value.find(r'oauth_timestamp="')) | |
self.assert_(-1 < header_value.find(r'oauth_signature="')) | |
self.assert_(-1 < header_value.find( | |
r'oauth_consumer_key="%s"' % CONSUMER_KEY)) | |
self.assert_(-1 < header_value.find(r'oauth_signature_method="RSA-SHA1"')) | |
# Case 2: Absence of OAuth token (in case of 2-legged OAuth) | |
oauth_input_params = gdata.auth.OAuthInputParams( | |
gdata.auth.OAuthSignatureMethod.HMAC_SHA1, CONSUMER_KEY, | |
consumer_secret=CONSUMER_SECRET) | |
token = gdata.auth.OAuthToken(oauth_input_params=oauth_input_params) | |
auth_header = token.GetAuthHeader( | |
'GET', 'http://dummy.com/?xoauth_requestor_id=user@gmail.com&q=book') | |
self.assert_('Authorization' in auth_header) | |
header_value = auth_header['Authorization'] | |
self.assert_(-1 < header_value.find(r'OAuth realm=""')) | |
self.assert_(-1 < header_value.find(r'oauth_version="1.0"')) | |
self.assertEquals(-1, header_value.find(r'oauth_token=')) | |
self.assert_(-1 < header_value.find(r'oauth_nonce="')) | |
self.assert_(-1 < header_value.find(r'oauth_timestamp="')) | |
self.assert_(-1 < header_value.find(r'oauth_signature="')) | |
self.assert_(-1 < header_value.find( | |
r'oauth_consumer_key="%s"' % CONSUMER_KEY)) | |
self.assert_(-1 < header_value.find(r'oauth_signature_method="HMAC-SHA1"')) | |
if __name__ == '__main__': | |
unittest.main() |