# blob: f72552faf65ea53eb173bc9e2fa2ab97624a2310 [file] [log] [blame]
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import collections
import copy
import json
import logging
import os
import re
import socket
import sys
import time
from six.moves import http_client
from six.moves import urllib
from six import string_types
import httplib2
import oauth2client.client
from googleapiclient import errors
from infra_libs.ts_mon.common import http_metrics
from infra_libs.utils import parse_rfc3339_epoch
# Timeout applied to HTTP requests when the caller does not pass one, in
# seconds.
DEFAULT_TIMEOUT = 30

# This is part of the API.
# Directory holding service account credentials; the location depends on the
# platform the process is running on.
SERVICE_ACCOUNTS_CREDS_ROOT = (
    'C:\\creds\\service_accounts' if sys.platform.startswith('win')
    else '/creds/service_accounts')
class AuthError(Exception):
  """Raised when an access token cannot be obtained."""
class DelegateServiceAccountCredentials(
    oauth2client.client.OAuth2Credentials):
  """Authorizes an HTTP client with a service account for which we are an actor.

  This class uses the IAM:generateAccessToken API to obtain an access token for
  a service account for which we have the "Service Account Token Creator" role.
  """

  def __init__(self, http, service_account_email, scopes, project='-'):
    """
    Args:
      http: An httplib2.Http object that is authorized by another
        oauth2client.client.OAuth2Credentials with credentials that have the
        service account token creator role on the service_account_email.
      service_account_email: The email address of the service account for which
        to obtain an access token.
      scopes: The desired scopes for the token. Either a single scope string
        or a sequence of scope strings.
      project: The cloud project to which service_account_email belongs. The
        default of '-' makes the IAM API figure it out for us.
    """
    # None of the regular OAuth2 fields apply here: the token is always
    # obtained through the IAM API in _refresh().
    super(DelegateServiceAccountCredentials, self).__init__(
        None, None, None, None, None, None, None)
    self._http = http
    self._sa_email = service_account_email
    self._scopes = self._canonicalize_scopes(scopes)
    self._project = project
    self.token_expiry = None
    self.access_token = None

  def _canonicalize_scopes(self, scopes):
    """Returns `scopes` unchanged, except a lone string is wrapped in a list."""
    if isinstance(scopes, string_types):
      return [scopes]
    return scopes

  def _refresh(self, _):
    """Obtains a new access token through the IAM:generateAccessToken API.

    On success, stores the token in self.access_token and the value that
    parse_rfc3339_epoch() returns for the response's 'expireTime' in
    self.token_expiry.

    Args:
      _: ignored; the request is always sent through the http object that was
        given to the constructor.

    Raises:
      AuthError: if the API answers with a status other than 200.
    """
    # Project and email are URL-quoted because they are path components.
    uri = ('https://iamcredentials.googleapis.com/v1/projects/%s/'
           'serviceAccounts/%s:generateAccessToken' % (
               urllib.parse.quote_plus(self._project),
               urllib.parse.quote_plus(self._sa_email)))
    response, content = self._http.request(
        uri=uri,
        method='POST',
        body=json.dumps({'scope': self._scopes}),
        headers={'Content-Type': 'application/json'},
    )
    if response.status != 200:
      # The response content is logged only; the exception carries just the
      # status code and reason.
      logging.error(
          'Unable to generate access token, http error code %d, content: %s',
          response.status, content)
      raise AuthError('Code: %d, Reason: %s'
                      % (response.status, response.reason))
    resp = json.loads(content)
    self.access_token = resp['accessToken']
    self.token_expiry = parse_rfc3339_epoch(resp['expireTime'])
class RetriableHttp(object):
  """A httplib2.Http object that retries on failure.

  Wraps another http object: request() is retried here, every other attribute
  read or write is forwarded to the wrapped object.
  """

  def __init__(self, http, max_tries=5, backoff_time=1,
               retrying_statuses_fn=None):
    """
    Args:
      http: an httplib2.Http instance
      max_tries: a number of maximum tries, must be at least 1
      backoff_time: a number of seconds to sleep between retries
      retrying_statuses_fn: a function that returns True if a given status
        should be retried. Defaults to retrying every 5xx status (500-599).

    Raises:
      ValueError: if max_tries is less than 1, since request() would then have
        no response to return.
    """
    if max_tries < 1:
      raise ValueError('max_tries must be at least 1, got %r' % (max_tries,))
    self._http = http
    self._max_tries = max_tries
    self._backoff_time = backoff_time
    self._retrying_statuses_fn = (
        retrying_statuses_fn or frozenset(range(500, 600)).__contains__)

  def request(self, uri, method='GET', body=None, *args, **kwargs):
    """Sends a request through the wrapped http object, retrying on failure.

    An attempt fails when it raises one of the exceptions caught below or when
    retrying_statuses_fn accepts the response status. Failed attempts are
    separated by a sleep of backoff_time seconds.

    Args:
      uri: passed to the wrapped request().
      method: passed to the wrapped request().
      body: passed to the wrapped request().
      *args: passed to the wrapped request().
      **kwargs: passed to the wrapped request().

    Returns:
      The (response, content) pair of the last attempt. This can still carry
      a retriable status when every attempt returned one.

    Raises:
      Whatever the last attempt raised, if it raised.
    """
    for attempt in range(1, self._max_tries + 1):
      is_final = attempt == self._max_tries
      outlook = 'final attempt' if is_final else 'will retry'
      try:
        response, content = self._http.request(uri, method, body, *args,
                                               **kwargs)
        if not self._retrying_statuses_fn(response.status):
          break
        logging.info('RetriableHttp: attempt %d receiving status %d, %s',
                     attempt, response.status, outlook)
      except (ValueError, errors.Error,
              socket.timeout, socket.error, socket.herror, socket.gaierror,
              httplib2.HttpLib2Error) as error:
        logging.info('RetriableHttp: attempt %d received exception: %s, %s',
                     attempt, error, outlook)
        if is_final:
          raise
      # Only back off when another attempt follows; sleeping after the final
      # attempt would just delay handing the result back to the caller.
      if not is_final:
        time.sleep(self._backoff_time)
    return response, content

  def __getattr__(self, name):
    # Only reached when normal lookup fails. If '_http' itself is missing
    # (instance not initialized yet) delegating would recurse forever, so
    # report it as a plain missing attribute.
    if name == '_http':
      raise AttributeError(name)
    return getattr(self._http, name)

  def __setattr__(self, name, value):
    # The wrapper's own state lives on the wrapper; everything else is set on
    # the wrapped http object.
    if name in ('request', '_http', '_max_tries', '_backoff_time',
                '_retrying_statuses_fn'):
      self.__dict__[name] = value
    else:
      setattr(self._http, name, value)
class InstrumentedHttp(httplib2.Http):
  """A httplib2.Http object that reports ts_mon metrics about its requests."""

  def __init__(self, name, time_fn=time.time, timeout=DEFAULT_TIMEOUT,
               **kwargs):
    """
    Args:
      name: An identifier for the HTTP requests made by this object.
      time_fn: Function returning the current time in seconds. Use for testing
        purposes only.
      timeout: Timeout handed to httplib2.Http, DEFAULT_TIMEOUT if omitted.
      **kwargs: Any further keyword arguments are handed to httplib2.Http.
    """
    super(InstrumentedHttp, self).__init__(timeout=timeout, **kwargs)
    self.fields = {'name': name, 'client': 'httplib2'}
    self.time_fn = time_fn

  def _update_metrics(self, status, start_time):
    """Records the outcome of one request.

    Increments the response status counter and adds the elapsed time since
    start_time, in milliseconds, to the durations metric.

    Args:
      status: HTTP status code, or one of the http_metrics.STATUS_* values.
      start_time: value time_fn returned just before the request was sent.
    """
    labelled = dict(status=status)
    labelled.update(self.fields)
    http_metrics.response_status.increment(fields=labelled)

    elapsed_sec = self.time_fn() - start_time
    http_metrics.durations.add(elapsed_sec * 1000, fields=self.fields)

  def request(self, uri, method="GET", body=None, *args, **kwargs):
    """Performs the request via httplib2.Http.request and reports metrics.

    Reports request size, response size, status and duration. When the request
    raises, a STATUS_TIMEOUT, STATUS_ERROR or STATUS_EXCEPTION status is
    reported instead and the exception is re-raised unchanged.

    Returns:
      The (response, content) pair returned by httplib2.Http.request.
    """
    sent_bytes = len(body) if body is not None else 0
    http_metrics.request_bytes.add(sent_bytes, fields=self.fields)

    started_at = self.time_fn()
    try:
      response, content = super(InstrumentedHttp, self).request(
          uri, method, body, *args, **kwargs)
    except socket.timeout:
      self._update_metrics(http_metrics.STATUS_TIMEOUT, started_at)
      raise
    except (socket.error, socket.herror, socket.gaierror):
      self._update_metrics(http_metrics.STATUS_ERROR, started_at)
      raise
    except (http_client.HTTPException, httplib2.HttpLib2Error) as ex:
      # Raised on Appengine (gae_override/httplib.py): the timeout surfaces as
      # a generic exception that can only be told apart by its message.
      deadline_hit = (
          'Deadline exceeded while waiting for HTTP response' in str(ex))
      self._update_metrics(
          http_metrics.STATUS_TIMEOUT if deadline_hit
          else http_metrics.STATUS_EXCEPTION,
          started_at)
      raise
    else:
      http_metrics.response_bytes.add(len(content), fields=self.fields)
      self._update_metrics(response.status, started_at)
      return response, content
class HttpMock(object):
  """Mock of httplib2.Http"""

  # Record of a single call to request(), stored in `requests_made`.
  HttpCall = collections.namedtuple('HttpCall', 'uri method body headers')

  def __init__(self, uris):
    """
    Args:
      uris(list): sequence of (uri, headers, body) triples. `uri` is a regexp
        for matching the requested uri, (headers, body) gives the values
        returned by the mock. Triples are tried in the order given in `uris`.
        `headers` is a dict mapping headers to value. The 'status' key is
        mandatory and is converted to int. `body` is a string.
        Ex: [('.*', {'status': 200}, 'nicely done.')]

    Raises:
      ValueError: if an entry is not a 3-element list or tuple, or if its
        headers have no 'status' key.
      TypeError: if headers is not a dict or body is not a string.
    """
    self._uris = []
    self.requests_made = []

    for entry in uris:
      if not (isinstance(entry, (list, tuple)) and len(entry) == 3):
        raise ValueError("'uris' must be a sequence of (uri, headers, body)")
      pattern, canned_headers, canned_body = entry
      matcher = re.compile(pattern)

      if not isinstance(canned_headers, dict):
        raise TypeError("'headers' must be a dict")
      if 'status' not in canned_headers:
        raise ValueError("'headers' must have 'status' as a key")
      # Work on a copy so the caller's dict is left untouched.
      canned_headers = copy.copy(canned_headers)
      canned_headers['status'] = int(canned_headers['status'])

      if not isinstance(canned_body, string_types):
        raise TypeError("'body' must be a string, got %s" % type(canned_body))
      self._uris.append((matcher, canned_headers, canned_body))

  # pylint: disable=unused-argument
  def request(self, uri,
              method='GET',
              body=None,
              headers=None,
              redirections=1,
              connection_type=None):
    """Records the call and returns the canned response for the first match.

    Returns:
      (httplib2.Response, body) built from the first entry whose regexp
      matches `uri`.

    Raises:
      AssertionError: if no entry matches `uri`.
    """
    self.requests_made.append(self.HttpCall(uri, method, body, headers))

    hit = next((entry for entry in self._uris if entry[0].match(uri)), None)
    if hit is not None:
      canned_headers, canned_body = hit[1], hit[2]
    else:
      canned_headers, canned_body = None, None

    if not canned_headers:
      raise AssertionError("Unexpected request to %s" % uri)
    return httplib2.Response(canned_headers), canned_body