blob: 635ef53dced563c33bbca0bf1d9d29ad4ec40249 [file] [log] [blame]
# Copyright 2023 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Common utils for issue tracker client."""
import functools
import json
import logging
import os
import urllib.parse

from flask import request as flask_request, make_response
import google.auth
import google_auth_httplib2
import httplib2
# Endpoint queried by AuthorizeBearerToken to look up the details (including
# the email) associated with an incoming bearer token.
TOKEN_INFO_ENDPOINT = 'https://oauth2.googleapis.com/tokeninfo'
# Default OAuth scope requested by ServiceAccountHttp.
EMAIL_SCOPE = 'https://www.googleapis.com/auth/userinfo.email'
# Value of the GOOGLE_CLOUD_PROJECT environment variable that identifies the
# staging deployment (see IsStagingEnvironment).
_STAGING_APP_ID = 'chromeperf-stage'
def IsStagingEnvironment():
  """Returns True if GOOGLE_CLOUD_PROJECT is set to the staging app id."""
  project_id = os.environ.get('GOOGLE_CLOUD_PROJECT')
  return project_id == _STAGING_APP_ID
def AllowList():
  """Returns the set of emails allowed to call this service.

  The set holds the single App Engine service account matching the current
  environment (staging or not, as reported by IsStagingEnvironment). A new set
  is built on every call.
  """
  if not IsStagingEnvironment():
    return {'chromeperf@appspot.gserviceaccount.com'}
  return {'chromeperf-stage@appspot.gserviceaccount.com'}
def ServiceAccountHttp(scope=EMAIL_SCOPE, timeout=None):
  """Returns an http client authorized with the service account credentials.

  Args:
    scope: OAuth scope to request for the credentials. Must be truthy.
    timeout: Optional timeout assigned to the returned client's `timeout`
      attribute. Falsy values (None, 0) leave the client's default untouched.

  Returns:
    A google_auth_httplib2.AuthorizedHttp wrapping whatever
    _GetAppDefaultCredentials(scope) returns. That helper returns None when
    the default credentials cannot be loaded, in which case None is what gets
    wrapped.
  """
  assert scope, "ServiceAccountHttp scope must not be None."
  authorized_http = google_auth_httplib2.AuthorizedHttp(
      _GetAppDefaultCredentials(scope))
  if not timeout:
    return authorized_http
  authorized_http.timeout = timeout
  return authorized_http
def _GetAppDefaultCredentials(scope=None):
  """Loads the application default credentials, optionally scoped.

  Args:
    scope: Optional OAuth scope. It is applied only when it is truthy and the
      loaded credentials report `requires_scopes`.

  Returns:
    The (possibly scoped) credentials, or None when google.auth raises
    DefaultCredentialsError; the error is logged in that case.
  """
  try:
    default_credentials, _project = google.auth.default()
    if not scope or not default_credentials.requires_scopes:
      return default_credentials
    return default_credentials.with_scopes([scope])
  except google.auth.exceptions.DefaultCredentialsError as error:
    logging.error('Error when getting the application default credentials: %s',
                  str(error))
    return None
def _ExtractBearerToken(authorization_header):
  """Returns the token of a `Bearer <token>` header value, or None if malformed."""
  scheme, _, token = authorization_header.partition(' ')
  token = token.strip()
  # HTTP auth schemes are case-insensitive, so accept e.g. "bearer" as well.
  if scheme.lower() != 'bearer' or not token:
    return None
  return token


def _IsTrueClaim(value):
  """Returns True only for boolean True or the string 'true' (any case)."""
  # The token info endpoint reports booleans as the strings "true"/"false";
  # a plain truthiness check would treat "false" as verified.
  return str(value).lower() == 'true'


def AuthorizeBearerToken(request):
  """Verifies that the request carries a bearer token of an allowed account.

  The token is read from the `Authorization` header and sent to
  TOKEN_INFO_ENDPOINT. The request is authorized only if the endpoint answers
  with status 200 and a JSON object whose `email` is verified and is a member
  of AllowList().

  Args:
    request: An object exposing a mapping-like `headers` attribute (the flask
      request in BearerTokenAuthorizer).

  Returns:
    True if the request is authorized, False otherwise. Missing or malformed
    headers and unusable token info responses yield False rather than raising.
    Errors raised by the HTTP call itself are not caught here.
  """
  # Check that the request includes the `Authorization` header.
  if 'Authorization' not in request.headers:
    logging.warning('Missing "Authorization" header in the request.')
    return False

  access_token = _ExtractBearerToken(request.headers['Authorization'])
  if access_token is None:
    logging.warning(
        'Malformed "Authorization" header; expected "Bearer <token>".')
    return False

  # The token comes from the caller, so encode it instead of interpolating it
  # raw; otherwise it could smuggle extra query parameters into the lookup.
  query = urllib.parse.urlencode({'bearer_token': access_token})
  token_info_url = '%s?%s' % (TOKEN_INFO_ENDPOINT, query)
  response, content = httplib2.Http(timeout=30).request(token_info_url)
  if response.status != 200:
    # Log only the endpoint: the full url embeds the caller's bearer token.
    logging.warning(
        'Invalid response status from token info endpoint: %s. Url: %s',
        response, TOKEN_INFO_ENDPOINT)
    return False

  try:
    content_dict = json.loads(content)
  except ValueError:
    logging.warning('Token info endpoint returned a non-JSON response body.')
    return False
  if not isinstance(content_dict, dict):
    logging.warning('Token info endpoint returned an unexpected JSON payload.')
    return False

  email = content_dict.get('email')
  if (email and _IsTrueClaim(content_dict.get('email_verified'))
      and email in AllowList()):
    return True
  logging.warning('No valid email is found in request token.')
  return False
def BearerTokenAuthorizer(wrapped_handler):
  """Decorator that rejects requests failing bearer token authorization.

  The wrapped handler runs only when AuthorizeBearerToken accepts the current
  flask request; otherwise a 403 response is returned and the handler is not
  called.
  """

  @functools.wraps(wrapped_handler)
  def Wrapper(*args, **kwargs):
    is_authorized = AuthorizeBearerToken(flask_request)
    if is_authorized:
      return wrapped_handler(*args, **kwargs)
    return make_response('Failed to validate the incoming request.', 403)

  return Wrapper