| #!/usr/bin/python |
| # |
| # Copyright (C) 2007 SIOS Technology, Inc. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| __author__ = 'tmatsuo@sios.com (Takashi MATSUO)' |
| |
| try: |
| from xml.etree import cElementTree as ElementTree |
| except ImportError: |
| try: |
| import cElementTree as ElementTree |
| except ImportError: |
| try: |
| from xml.etree import ElementTree |
| except ImportError: |
| from elementtree import ElementTree |
| import urllib |
| import gdata |
| import atom.service |
| import gdata.service |
| import gdata.apps |
| import atom |
| |
# Version segment inserted into every Provisioning API feed URI.
API_VER = "2.0"
HTTP_OK = 200

# Numeric error codes; AppsForYourDomainException.error_code is parsed from
# the 'errorCode' attribute of the error response and can be compared
# against these values.
UNKOWN_ERROR = 1000  # historical misspelling, kept for backward compatibility
UNKNOWN_ERROR = UNKOWN_ERROR  # correctly spelled alias
USER_DELETED_RECENTLY = 1100
USER_SUSPENDED = 1101
DOMAIN_USER_LIMIT_EXCEEDED = 1200
DOMAIN_ALIAS_LIMIT_EXCEEDED = 1201
DOMAIN_SUSPENDED = 1202
DOMAIN_FEATURE_UNAVAILABLE = 1203
ENTITY_EXISTS = 1300
ENTITY_DOES_NOT_EXIST = 1301
ENTITY_NAME_IS_RESERVED = 1302
ENTITY_NAME_NOT_VALID = 1303
INVALID_GIVEN_NAME = 1400
INVALID_FAMILY_NAME = 1401
INVALID_PASSWORD = 1402
INVALID_USERNAME = 1403
INVALID_HASH_FUNCTION_NAME = 1404
INVALID_HASH_DIGGEST_LENGTH = 1405  # historical misspelling, kept as-is
INVALID_HASH_DIGEST_LENGTH = INVALID_HASH_DIGGEST_LENGTH  # correct alias
INVALID_EMAIL_ADDRESS = 1406
INVALID_QUERY_PARAMETER_VALUE = 1407
TOO_MANY_RECIPIENTS_ON_EMAIL_LIST = 1500

# Default quota limit, stored as a string.
# NOTE(review): unit is presumably megabytes -- confirm against the API docs.
DEFAULT_QUOTA_LIMIT = '2048'
| |
| |
class Error(Exception):
    """Base class for the exceptions defined in this module."""
| |
| |
class AppsForYourDomainException(Error):
    """Error built from a failed Provisioning API response.

    Attributes:
      element_tree: parsed XML of ``response['body']``, or None when the body
          could not be parsed.
      error_code: int taken from the ``errorCode`` attribute of the first
          child element, or UNKOWN_ERROR when the response could not be
          fully decoded.
      reason: value of the ``reason`` attribute, or None if unavailable.
      invalidInput: value of the ``invalidInput`` attribute, or None if
          unavailable.
    """

    def __init__(self, response):
        """Decode the error details carried by ``response``.

        Args:
          response: the payload of the underlying request error; expected to
              be a mapping with a 'body' key holding the XML error document.
              Anything else results in error_code == UNKOWN_ERROR.
        """
        Error.__init__(self, response)
        # Give every attribute a value up front so that callers can always
        # read them, even when decoding the response fails part-way.
        self.element_tree = None
        self.error_code = UNKOWN_ERROR
        self.reason = None
        self.invalidInput = None
        try:
            self.element_tree = ElementTree.fromstring(response['body'])
            error_attrs = self.element_tree[0].attrib
            self.error_code = int(error_attrs['errorCode'])
            self.reason = error_attrs['reason']
            self.invalidInput = error_attrs['invalidInput']
        except Exception:
            # Deliberately broad: the response may be of the wrong type, lack
            # a body, contain malformed XML or miss attributes. Unlike a bare
            # ``except:`` this lets KeyboardInterrupt/SystemExit propagate.
            self.error_code = UNKOWN_ERROR
| |
| |
class AppsService(gdata.service.GDataService):
    """Client for the Google Apps Provisioning service.

    Unless noted otherwise, every method that issues a request converts a
    gdata.service.RequestError into AppsForYourDomainException, built from
    the first argument of the original error.
    """

    def __init__(self, email=None, password=None, domain=None, source=None,
                 server='apps-apis.google.com', additional_headers=None,
                 **kwargs):
        """Creates a client for the Google Apps Provisioning service.

        Args:
          email: string (optional) The user's email address, used for
              authentication.
          password: string (optional) The user's password.
          domain: string (optional) The Google Apps domain name.
          source: string (optional) The name of the user's application.
          server: string (optional) The name of the server to which a
              connection will be opened. Default value:
              'apps-apis.google.com'.
          additional_headers: (optional) Passed through to the
              gdata.service.GDataService constructor.
          **kwargs: The other parameters to pass to
              gdata.service.GDataService constructor.
        """
        gdata.service.GDataService.__init__(
            self, email=email, password=password, service='apps',
            source=source, server=server,
            additional_headers=additional_headers, **kwargs)
        # Requests are always sent over SSL on port 443.
        self.ssl = True
        self.port = 443
        self.domain = domain

    def _baseURL(self):
        """Return the feed path prefix for the configured domain."""
        return "/a/feeds/%s" % self.domain

    def AddAllElementsFromAllPages(self, link_finder, func):
        """Follow every 'next' link and merge all entries into link_finder.

        Args:
          link_finder: the first feed page; its ``entry`` list is extended in
              place with the entries of all following pages.
          func: converter used to parse each following page.

        Returns:
          The same ``link_finder`` object, now holding all entries.

        Note:
          Requests issued here are not wrapped, so a
          gdata.service.RequestError propagates unchanged.
        """
        next_link = link_finder.GetNextLink()
        while next_link is not None:
            next_feed = self.Get(next_link.href, converter=func)
            link_finder.entry.extend(next_feed.entry)
            next_link = next_feed.GetNextLink()
        return link_finder

    def RetrievePageOfEmailLists(
            self, start_email_list_name=None,
            num_retries=gdata.service.DEFAULT_NUM_RETRIES,
            delay=gdata.service.DEFAULT_DELAY,
            backoff=gdata.service.DEFAULT_BACKOFF):
        """Retrieve one page of email lists.

        Args:
          start_email_list_name: (optional) value for the
              ``startEmailListName`` query parameter.
          num_retries, delay, backoff: passed through to GetWithRetries.

        Returns:
          The feed parsed by gdata.apps.EmailListFeedFromString.

        Raises:
          AppsForYourDomainException: if the request fails.
        """
        uri = "%s/emailList/%s" % (self._baseURL(), API_VER)
        if start_email_list_name is not None:
            uri += "?startEmailListName=%s" % start_email_list_name
        try:
            return gdata.apps.EmailListFeedFromString(str(self.GetWithRetries(
                uri, num_retries=num_retries, delay=delay, backoff=backoff)))
        except gdata.service.RequestError as e:
            raise AppsForYourDomainException(e.args[0])

    def GetGeneratorForAllEmailLists(
            self, num_retries=gdata.service.DEFAULT_NUM_RETRIES,
            delay=gdata.service.DEFAULT_DELAY,
            backoff=gdata.service.DEFAULT_BACKOFF):
        """Retrieve a generator for all email lists in this domain.

        Args:
          num_retries, delay, backoff: retry settings used for the first page
              and handed to GetGeneratorFromLinkFinder for the rest.

        Returns:
          Whatever GetGeneratorFromLinkFinder returns for the first page.
        """
        first_page = self.RetrievePageOfEmailLists(num_retries=num_retries,
                                                   delay=delay,
                                                   backoff=backoff)
        # Follow-up pages are email-list feeds too, so they must be parsed
        # with the email-list converter (not the recipient-feed converter).
        return self.GetGeneratorFromLinkFinder(
            first_page, gdata.apps.EmailListFeedFromString,
            num_retries=num_retries, delay=delay, backoff=backoff)

    def RetrieveAllEmailLists(self):
        """Retrieve all email lists of a domain, following pagination."""
        first_page = self.RetrievePageOfEmailLists()
        return self.AddAllElementsFromAllPages(
            first_page, gdata.apps.EmailListFeedFromString)

    def RetrieveEmailList(self, list_name):
        """Retrieve a single email list by the list's name.

        Args:
          list_name: name of the email list, used as the last URI segment.

        Returns:
          The entry produced by gdata.apps.EmailListEntryFromString.

        Raises:
          AppsForYourDomainException: if the request fails.
        """
        uri = "%s/emailList/%s/%s" % (self._baseURL(), API_VER, list_name)
        try:
            return self.Get(uri, converter=gdata.apps.EmailListEntryFromString)
        except gdata.service.RequestError as e:
            raise AppsForYourDomainException(e.args[0])

    def RetrieveEmailLists(self, recipient):
        """Retrieve all email list subscriptions for an email address.

        Args:
          recipient: value for the ``recipient`` query parameter.

        Returns:
          An email-list feed holding the entries of every page.

        Raises:
          AppsForYourDomainException: if the first request fails.
        """
        uri = "%s/emailList/%s?recipient=%s" % (
            self._baseURL(), API_VER, recipient)
        try:
            first_page = gdata.apps.EmailListFeedFromString(str(self.Get(uri)))
        except gdata.service.RequestError as e:
            raise AppsForYourDomainException(e.args[0])

        # pagination
        return self.AddAllElementsFromAllPages(
            first_page, gdata.apps.EmailListFeedFromString)

    def RemoveRecipientFromEmailList(self, recipient, list_name):
        """Remove a recipient from an email list.

        Raises:
          AppsForYourDomainException: if the request fails.
        """
        uri = "%s/emailList/%s/%s/recipient/%s" % (
            self._baseURL(), API_VER, list_name, recipient)
        try:
            self.Delete(uri)
        except gdata.service.RequestError as e:
            raise AppsForYourDomainException(e.args[0])

    def RetrievePageOfRecipients(
            self, list_name, start_recipient=None,
            num_retries=gdata.service.DEFAULT_NUM_RETRIES,
            delay=gdata.service.DEFAULT_DELAY,
            backoff=gdata.service.DEFAULT_BACKOFF):
        """Retrieve one page of recipients of an email list.

        Args:
          list_name: name of the email list.
          start_recipient: (optional) value for the ``startRecipient`` query
              parameter.
          num_retries, delay, backoff: passed through to GetWithRetries.

        Returns:
          The feed parsed by gdata.apps.EmailListRecipientFeedFromString.

        Raises:
          AppsForYourDomainException: if the request fails.
        """
        uri = "%s/emailList/%s/%s/recipient" % (
            self._baseURL(), API_VER, list_name)
        if start_recipient is not None:
            uri += "?startRecipient=%s" % start_recipient
        try:
            return gdata.apps.EmailListRecipientFeedFromString(str(
                self.GetWithRetries(
                    uri, num_retries=num_retries, delay=delay,
                    backoff=backoff)))
        except gdata.service.RequestError as e:
            raise AppsForYourDomainException(e.args[0])

    def GetGeneratorForAllRecipients(
            self, list_name, num_retries=gdata.service.DEFAULT_NUM_RETRIES,
            delay=gdata.service.DEFAULT_DELAY,
            backoff=gdata.service.DEFAULT_BACKOFF):
        """Retrieve a generator for all recipients of a particular list."""
        first_page = self.RetrievePageOfRecipients(list_name,
                                                   num_retries=num_retries,
                                                   delay=delay,
                                                   backoff=backoff)
        return self.GetGeneratorFromLinkFinder(
            first_page, gdata.apps.EmailListRecipientFeedFromString,
            num_retries=num_retries, delay=delay, backoff=backoff)

    def RetrieveAllRecipients(self, list_name):
        """Retrieve all recipients of an email list, following pagination."""
        first_page = self.RetrievePageOfRecipients(list_name)
        return self.AddAllElementsFromAllPages(
            first_page, gdata.apps.EmailListRecipientFeedFromString)

    def AddRecipientToEmailList(self, recipient, list_name):
        """Add a recipient to an email list.

        Args:
          recipient: email address stored in the posted entry's ``who``.
          list_name: name of the email list.

        Returns:
          The entry parsed by gdata.apps.EmailListRecipientEntryFromString.

        Raises:
          AppsForYourDomainException: if the request fails.
        """
        uri = "%s/emailList/%s/%s/recipient" % (
            self._baseURL(), API_VER, list_name)
        recipient_entry = gdata.apps.EmailListRecipientEntry()
        recipient_entry.who = gdata.apps.Who(email=recipient)

        try:
            return gdata.apps.EmailListRecipientEntryFromString(
                str(self.Post(recipient_entry, uri)))
        except gdata.service.RequestError as e:
            raise AppsForYourDomainException(e.args[0])

    def DeleteEmailList(self, list_name):
        """Delete an email list.

        Raises:
          AppsForYourDomainException: if the request fails.
        """
        uri = "%s/emailList/%s/%s" % (self._baseURL(), API_VER, list_name)
        try:
            self.Delete(uri)
        except gdata.service.RequestError as e:
            raise AppsForYourDomainException(e.args[0])

    def CreateEmailList(self, list_name):
        """Create an email list.

        Returns:
          The entry parsed by gdata.apps.EmailListEntryFromString.

        Raises:
          AppsForYourDomainException: if the request fails.
        """
        uri = "%s/emailList/%s" % (self._baseURL(), API_VER)
        email_list_entry = gdata.apps.EmailListEntry()
        email_list_entry.email_list = gdata.apps.EmailList(name=list_name)
        try:
            return gdata.apps.EmailListEntryFromString(
                str(self.Post(email_list_entry, uri)))
        except gdata.service.RequestError as e:
            raise AppsForYourDomainException(e.args[0])

    def DeleteNickname(self, nickname):
        """Delete a nickname.

        Raises:
          AppsForYourDomainException: if the request fails.
        """
        uri = "%s/nickname/%s/%s" % (self._baseURL(), API_VER, nickname)
        try:
            self.Delete(uri)
        except gdata.service.RequestError as e:
            raise AppsForYourDomainException(e.args[0])

    def RetrievePageOfNicknames(
            self, start_nickname=None,
            num_retries=gdata.service.DEFAULT_NUM_RETRIES,
            delay=gdata.service.DEFAULT_DELAY,
            backoff=gdata.service.DEFAULT_BACKOFF):
        """Retrieve one page of nicknames in the domain.

        Args:
          start_nickname: (optional) value for the ``startNickname`` query
              parameter.
          num_retries, delay, backoff: passed through to GetWithRetries.

        Returns:
          The feed parsed by gdata.apps.NicknameFeedFromString.

        Raises:
          AppsForYourDomainException: if the request fails.
        """
        uri = "%s/nickname/%s" % (self._baseURL(), API_VER)
        if start_nickname is not None:
            uri += "?startNickname=%s" % start_nickname
        try:
            return gdata.apps.NicknameFeedFromString(str(self.GetWithRetries(
                uri, num_retries=num_retries, delay=delay, backoff=backoff)))
        except gdata.service.RequestError as e:
            raise AppsForYourDomainException(e.args[0])

    def GetGeneratorForAllNicknames(
            self, num_retries=gdata.service.DEFAULT_NUM_RETRIES,
            delay=gdata.service.DEFAULT_DELAY,
            backoff=gdata.service.DEFAULT_BACKOFF):
        """Retrieve a generator for all nicknames in this domain."""
        first_page = self.RetrievePageOfNicknames(num_retries=num_retries,
                                                  delay=delay,
                                                  backoff=backoff)
        return self.GetGeneratorFromLinkFinder(
            first_page, gdata.apps.NicknameFeedFromString,
            num_retries=num_retries, delay=delay, backoff=backoff)

    def RetrieveAllNicknames(self):
        """Retrieve all nicknames in the domain, following pagination."""
        first_page = self.RetrievePageOfNicknames()
        return self.AddAllElementsFromAllPages(
            first_page, gdata.apps.NicknameFeedFromString)

    def GetGeneratorForAllNicknamesOfAUser(
            self, user_name, num_retries=gdata.service.DEFAULT_NUM_RETRIES,
            delay=gdata.service.DEFAULT_DELAY,
            backoff=gdata.service.DEFAULT_BACKOFF):
        """Retrieve a generator for all nicknames of a particular user.

        Args:
          user_name: value for the ``username`` query parameter.
          num_retries, delay, backoff: retry settings used for the first page
              and handed to GetGeneratorFromLinkFinder for the rest.

        Raises:
          AppsForYourDomainException: if the first request fails.
        """
        uri = "%s/nickname/%s?username=%s" % (
            self._baseURL(), API_VER, user_name)
        try:
            first_page = gdata.apps.NicknameFeedFromString(
                str(self.GetWithRetries(
                    uri, num_retries=num_retries, delay=delay,
                    backoff=backoff)))
        except gdata.service.RequestError as e:
            raise AppsForYourDomainException(e.args[0])
        return self.GetGeneratorFromLinkFinder(
            first_page, gdata.apps.NicknameFeedFromString,
            num_retries=num_retries, delay=delay, backoff=backoff)

    def RetrieveNicknames(self, user_name):
        """Retrieve all nicknames of the user, following pagination.

        Raises:
          AppsForYourDomainException: if the first request fails.
        """
        uri = "%s/nickname/%s?username=%s" % (
            self._baseURL(), API_VER, user_name)
        try:
            first_page = gdata.apps.NicknameFeedFromString(str(self.Get(uri)))
        except gdata.service.RequestError as e:
            raise AppsForYourDomainException(e.args[0])

        # pagination
        return self.AddAllElementsFromAllPages(
            first_page, gdata.apps.NicknameFeedFromString)

    def RetrieveNickname(self, nickname):
        """Retrieve a nickname.

        Args:
          nickname: string The nickname to retrieve

        Returns:
          gdata.apps.NicknameEntry

        Raises:
          AppsForYourDomainException: if the request fails.
        """
        uri = "%s/nickname/%s/%s" % (self._baseURL(), API_VER, nickname)
        try:
            return gdata.apps.NicknameEntryFromString(str(self.Get(uri)))
        except gdata.service.RequestError as e:
            raise AppsForYourDomainException(e.args[0])

    def CreateNickname(self, user_name, nickname):
        """Create a nickname for a user.

        Args:
          user_name: user name stored in the posted entry's ``login``.
          nickname: the nickname to create.

        Returns:
          The entry parsed by gdata.apps.NicknameEntryFromString.

        Raises:
          AppsForYourDomainException: if the request fails.
        """
        uri = "%s/nickname/%s" % (self._baseURL(), API_VER)
        nickname_entry = gdata.apps.NicknameEntry()
        nickname_entry.login = gdata.apps.Login(user_name=user_name)
        nickname_entry.nickname = gdata.apps.Nickname(name=nickname)

        try:
            return gdata.apps.NicknameEntryFromString(
                str(self.Post(nickname_entry, uri)))
        except gdata.service.RequestError as e:
            raise AppsForYourDomainException(e.args[0])

    def DeleteUser(self, user_name):
        """Delete a user account.

        Returns:
          The result of the underlying Delete call.

        Raises:
          AppsForYourDomainException: if the request fails.
        """
        uri = "%s/user/%s/%s" % (self._baseURL(), API_VER, user_name)
        try:
            return self.Delete(uri)
        except gdata.service.RequestError as e:
            raise AppsForYourDomainException(e.args[0])

    def UpdateUser(self, user_name, user_entry):
        """Update a user account.

        Args:
          user_name: user name identifying the account in the URI.
          user_entry: the entry to PUT.

        Returns:
          The entry parsed by gdata.apps.UserEntryFromString.

        Raises:
          AppsForYourDomainException: if the request fails.
        """
        uri = "%s/user/%s/%s" % (self._baseURL(), API_VER, user_name)
        try:
            return gdata.apps.UserEntryFromString(
                str(self.Put(user_entry, uri)))
        except gdata.service.RequestError as e:
            raise AppsForYourDomainException(e.args[0])

    def CreateUser(self, user_name, family_name, given_name, password,
                   suspended='false', quota_limit=None,
                   password_hash_function=None,
                   change_password=None):
        """Create a user account.

        Args:
          user_name, password, suspended, change_password: stored in the
              posted entry's ``login``.
          password_hash_function: stored as the login's
              ``hash_function_name``.
          family_name, given_name: stored in the posted entry's ``name``.
          quota_limit: (optional) when not None, converted with str() and
              stored as the entry's quota limit.

        Returns:
          The entry parsed by gdata.apps.UserEntryFromString.

        Raises:
          AppsForYourDomainException: if the request fails.
        """
        uri = "%s/user/%s" % (self._baseURL(), API_VER)
        user_entry = gdata.apps.UserEntry()
        user_entry.login = gdata.apps.Login(
            user_name=user_name, password=password, suspended=suspended,
            hash_function_name=password_hash_function,
            change_password=change_password)
        user_entry.name = gdata.apps.Name(family_name=family_name,
                                          given_name=given_name)
        if quota_limit is not None:
            user_entry.quota = gdata.apps.Quota(limit=str(quota_limit))

        try:
            return gdata.apps.UserEntryFromString(
                str(self.Post(user_entry, uri)))
        except gdata.service.RequestError as e:
            raise AppsForYourDomainException(e.args[0])

    def SuspendUser(self, user_name):
        """Mark a user as suspended, updating only if not already suspended.

        Returns:
          The user entry: the updated one if an update was sent, otherwise
          the one that was retrieved.
        """
        user_entry = self.RetrieveUser(user_name)
        if user_entry.login.suspended != 'true':
            user_entry.login.suspended = 'true'
            user_entry = self.UpdateUser(user_name, user_entry)
        return user_entry

    def RestoreUser(self, user_name):
        """Clear a user's suspended flag, updating only if it is set.

        Returns:
          The user entry: the updated one if an update was sent, otherwise
          the one that was retrieved.
        """
        user_entry = self.RetrieveUser(user_name)
        if user_entry.login.suspended != 'false':
            user_entry.login.suspended = 'false'
            user_entry = self.UpdateUser(user_name, user_entry)
        return user_entry

    def RetrieveUser(self, user_name):
        """Retrieve a user account.

        Args:
          user_name: string The user name to retrieve

        Returns:
          gdata.apps.UserEntry

        Raises:
          AppsForYourDomainException: if the request fails.
        """
        uri = "%s/user/%s/%s" % (self._baseURL(), API_VER, user_name)
        try:
            return gdata.apps.UserEntryFromString(str(self.Get(uri)))
        except gdata.service.RequestError as e:
            raise AppsForYourDomainException(e.args[0])

    def RetrievePageOfUsers(
            self, start_username=None,
            num_retries=gdata.service.DEFAULT_NUM_RETRIES,
            delay=gdata.service.DEFAULT_DELAY,
            backoff=gdata.service.DEFAULT_BACKOFF):
        """Retrieve one page of users in this domain.

        Args:
          start_username: (optional) value for the ``startUsername`` query
              parameter.
          num_retries, delay, backoff: passed through to GetWithRetries.

        Returns:
          The feed parsed by gdata.apps.UserFeedFromString.

        Raises:
          AppsForYourDomainException: if the request fails.
        """
        uri = "%s/user/%s" % (self._baseURL(), API_VER)
        if start_username is not None:
            uri += "?startUsername=%s" % start_username
        try:
            return gdata.apps.UserFeedFromString(str(self.GetWithRetries(
                uri, num_retries=num_retries, delay=delay, backoff=backoff)))
        except gdata.service.RequestError as e:
            raise AppsForYourDomainException(e.args[0])

    def GetGeneratorForAllUsers(
            self, num_retries=gdata.service.DEFAULT_NUM_RETRIES,
            delay=gdata.service.DEFAULT_DELAY,
            backoff=gdata.service.DEFAULT_BACKOFF):
        """Retrieve a generator for all users in this domain."""
        first_page = self.RetrievePageOfUsers(num_retries=num_retries,
                                              delay=delay,
                                              backoff=backoff)
        return self.GetGeneratorFromLinkFinder(
            first_page, gdata.apps.UserFeedFromString,
            num_retries=num_retries, delay=delay, backoff=backoff)

    def RetrieveAllUsers(self):
        """Retrieve all users in this domain. OBSOLETE"""
        first_page = self.RetrievePageOfUsers()
        return self.AddAllElementsFromAllPages(
            first_page, gdata.apps.UserFeedFromString)
| |
| |
class PropertyService(gdata.service.GDataService):
    """Client for the Google Apps Property service.

    The private helpers translate between plain dicts and
    gdata.apps.PropertyEntry objects and convert gdata.service.RequestError
    into AppsForYourDomainException.
    """

    def __init__(self, email=None, password=None, domain=None, source=None,
                 server='apps-apis.google.com', additional_headers=None,
                 **kwargs):
        """Creates a client for the Google Apps Property service.

        Args:
          email: string (optional) The user's email address, used for
              authentication.
          password: string (optional) The user's password.
          domain: string (optional) The Google Apps domain name.
          source: string (optional) The name of the user's application.
          server: string (optional) The name of the server to which a
              connection will be opened. Default value:
              'apps-apis.google.com'.
          additional_headers: (optional) Passed through to the
              gdata.service.GDataService constructor.
          **kwargs: The other parameters to pass to
              gdata.service.GDataService constructor (accepted for parity
              with AppsService).
        """
        gdata.service.GDataService.__init__(
            self, email=email, password=password, service='apps',
            source=source, server=server,
            additional_headers=additional_headers, **kwargs)
        # Requests are always sent over SSL on port 443.
        self.ssl = True
        self.port = 443
        self.domain = domain

    def AddAllElementsFromAllPages(self, link_finder, func):
        """Follow every 'next' link and merge all entries into link_finder.

        Args:
          link_finder: the first feed page; its ``entry`` list is extended in
              place with the entries of all following pages.
          func: converter used to parse each following page.

        Returns:
          The same ``link_finder`` object, now holding all entries.

        Note:
          Requests issued here are not wrapped, so a
          gdata.service.RequestError propagates unchanged.
        """
        next_link = link_finder.GetNextLink()
        while next_link is not None:
            next_feed = self.Get(next_link.href, converter=func)
            link_finder.entry.extend(next_feed.entry)
            next_link = next_feed.GetNextLink()
        return link_finder

    def _GetPropertyEntry(self, properties):
        """Build a PropertyEntry from a name -> value mapping.

        Pairs whose name or value is None are skipped.
        """
        property_entry = gdata.apps.PropertyEntry()
        property_entry.property = [
            gdata.apps.Property(name=name, value=value)
            for name, value in properties.items()
            if name is not None and value is not None]
        return property_entry

    def _PropertyEntry2Dict(self, property_entry):
        """Flatten a PropertyEntry into a plain name -> value dict."""
        return dict((prop.name, prop.value)
                    for prop in property_entry.property)

    def _GetPropertyFeed(self, uri):
        """GET ``uri`` and parse the response as a property feed.

        Raises:
          AppsForYourDomainException: if the request fails.
        """
        try:
            return gdata.apps.PropertyFeedFromString(str(self.Get(uri)))
        except gdata.service.RequestError as e:
            raise AppsForYourDomainException(e.args[0])

    def _GetPropertiesList(self, uri):
        """Return one dict per entry of the (fully paginated) feed at uri."""
        property_feed = self._GetPropertyFeed(uri)
        # pagination
        property_feed = self.AddAllElementsFromAllPages(
            property_feed, gdata.apps.PropertyFeedFromString)
        return [self._PropertyEntry2Dict(property_entry)
                for property_entry in property_feed.entry]

    def _GetProperties(self, uri):
        """GET the single entry at ``uri`` and return it as a dict.

        Raises:
          AppsForYourDomainException: if the request fails.
        """
        try:
            return self._PropertyEntry2Dict(
                gdata.apps.PropertyEntryFromString(str(self.Get(uri))))
        except gdata.service.RequestError as e:
            raise AppsForYourDomainException(e.args[0])

    def _PostProperties(self, uri, properties):
        """POST ``properties`` to ``uri``; return the response as a dict.

        Raises:
          AppsForYourDomainException: if the request fails.
        """
        property_entry = self._GetPropertyEntry(properties)
        try:
            return self._PropertyEntry2Dict(
                gdata.apps.PropertyEntryFromString(
                    str(self.Post(property_entry, uri))))
        except gdata.service.RequestError as e:
            raise AppsForYourDomainException(e.args[0])

    def _PutProperties(self, uri, properties):
        """PUT ``properties`` to ``uri``; return the response as a dict.

        Raises:
          AppsForYourDomainException: if the request fails.
        """
        property_entry = self._GetPropertyEntry(properties)
        try:
            return self._PropertyEntry2Dict(
                gdata.apps.PropertyEntryFromString(
                    str(self.Put(property_entry, uri))))
        except gdata.service.RequestError as e:
            raise AppsForYourDomainException(e.args[0])

    def _DeleteProperties(self, uri):
        """DELETE the resource at ``uri``.

        Raises:
          AppsForYourDomainException: if the request fails.
        """
        try:
            self.Delete(uri)
        except gdata.service.RequestError as e:
            raise AppsForYourDomainException(e.args[0])
| |
| |
| def _bool2str(b): |
| if b is None: |
| return None |
| return str(b is True).lower() |