| #!/usr/bin/env python |
| # Copyright 2013 The LUCI Authors. All rights reserved. |
| # Use of this source code is governed under the Apache License, Version 2.0 |
| # that can be found in the LICENSE file. |
| |
| """Client tool to perform various authentication related tasks.""" |
| |
| __version__ = '0.4' |
| |
| import logging |
| import optparse |
| import sys |
| |
| from utils import tools |
| tools.force_local_third_party() |
| |
| # third_party/ |
| import colorama |
| from depot_tools import fix_encoding |
| from depot_tools import subcommand |
| |
| # pylint: disable=ungrouped-imports |
| from utils import logging_utils |
| from utils import on_error |
| from utils import net |
| from utils import oauth |
| from utils import subprocess42 |
| |
| |
| class AuthServiceError(Exception): |
| """Unexpected response from authentication service.""" |
| |
| |
| class AuthService: |
| """Represents remote Authentication service.""" |
| |
| def __init__(self, url): |
| self._service = net.get_http_service(url) |
| |
| def login(self, allow_user_interaction): |
| """Refreshes cached access token or creates a new one.""" |
| return self._service.login(allow_user_interaction) |
| |
| def logout(self): |
| """Purges cached access token.""" |
| return self._service.logout() |
| |
| def get_current_identity(self): |
| """Returns identity associated with currently used credentials. |
| |
| Identity is a string: |
| user:<email> - if using OAuth or cookie based authentication. |
| bot:<id> - if using HMAC based authentication. |
| anonymous:anonymous - if not authenticated. |
| """ |
| identity = self._service.json_request('/auth/api/v1/accounts/self') |
| if not identity: |
| raise AuthServiceError('Failed to fetch identity') |
| return identity['identity'] |
| |
| |
| def add_auth_options(parser): |
| """Adds command line options related to authentication.""" |
| oauth.add_oauth_options(parser) |
| |
| |
| def process_auth_options(parser, options): |
| """Configures process-wide authentication parameters based on |options|.""" |
| try: |
| net.set_oauth_config(oauth.extract_oauth_config_from_options(options)) |
| except ValueError as exc: |
| parser.error(str(exc)) |
| |
| |
| def normalize_host_url(url): |
| """Makes sure URL starts with http:// or https://.""" |
| url = url.lower().rstrip('/') |
| if url.startswith('https://'): |
| return url |
| if url.startswith('http://'): |
| allowed = ('http://localhost:', 'http://127.0.0.1:', 'http://[::1]:') |
| if not url.startswith(allowed): |
| raise ValueError( |
| 'URL must start with https:// or be on localhost with port number') |
| return url |
| return 'https://' + url |
| |
| |
| def ensure_logged_in(server_url): |
| """Checks that user is logged in, asking to do it if not. |
| |
| Raises: |
| ValueError if the server_url is not acceptable. |
| """ |
| # It's just a waste of time on a headless bot (it can't do interactive login). |
| if tools.is_headless() or net.get_oauth_config().disabled: |
| return None |
| server_url = normalize_host_url(server_url) |
| service = AuthService(server_url) |
| try: |
| service.login(False) |
| except IOError: |
| raise ValueError('Failed to contact %s' % server_url) |
| try: |
| identity = service.get_current_identity() |
| except AuthServiceError: |
| raise ValueError('Failed to fetch identify from %s' % server_url) |
| if identity == 'anonymous:anonymous': |
| raise ValueError( |
| 'Please login to %s: \n' |
| ' python auth.py login --service=%s' % (server_url, server_url)) |
| email = identity.split(':')[1] |
| logging.info('Logged in to %s: %s', server_url, email) |
| return email |
| |
| |
| @subcommand.usage('[options]') |
| def CMDlogin(parser, args): |
| """Runs interactive login flow and stores auth token/cookie on disk.""" |
| (options, args) = parser.parse_args(args) |
| process_auth_options(parser, options) |
| service = AuthService(options.service) |
| if service.login(True): |
| print('Logged in as \'%s\'.' % service.get_current_identity()) |
| return 0 |
| print('Login failed or canceled.') |
| return 1 |
| |
| |
| @subcommand.usage('[options]') |
| def CMDlogout(parser, args): |
| """Purges cached auth token/cookie.""" |
| (options, args) = parser.parse_args(args) |
| process_auth_options(parser, options) |
| service = AuthService(options.service) |
| service.logout() |
| return 0 |
| |
| |
| @subcommand.usage('[options]') |
| def CMDcheck(parser, args): |
| """Shows identity associated with currently cached auth token/cookie.""" |
| (options, args) = parser.parse_args(args) |
| process_auth_options(parser, options) |
| service = AuthService(options.service) |
| service.login(False) |
| print(service.get_current_identity()) |
| return 0 |
| |
| |
| class OptionParserAuth(logging_utils.OptionParserWithLogging): |
| |
| def __init__(self, **kwargs): |
| logging_utils.OptionParserWithLogging.__init__( |
| self, prog='auth.py', **kwargs) |
| self.server_group = optparse.OptionGroup(self, 'Server') |
| self.server_group.add_option( |
| '-S', '--service', metavar='URL', default='', help='Service to use') |
| self.add_option_group(self.server_group) |
| add_auth_options(self) |
| |
| def parse_args(self, *args, **kwargs): |
| options, args = logging_utils.OptionParserWithLogging.parse_args( |
| self, *args, **kwargs) |
| if not options.service: |
| self.error('--service is required.') |
| try: |
| options.service = normalize_host_url(options.service) |
| except ValueError as exc: |
| self.error(str(exc)) |
| return options, args |
| |
| |
| def main(args): |
| dispatcher = subcommand.CommandDispatcher(__name__) |
| return dispatcher.execute(OptionParserAuth(version=__version__), args) |
| |
| |
| if __name__ == '__main__': |
| subprocess42.inhibit_os_error_reporting() |
| fix_encoding.fix_encoding() |
| tools.disable_buffering() |
| colorama.init() |
| sys.exit(main(sys.argv[1:])) |