blob: e39cdfe63503584c8622f1c96bc837f7d77fdfc3 [file] [log] [blame]
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Classes representing the monitoring interface for tasks or devices."""
import base64
import httplib2
import json
import logging
import socket
import traceback
from googleapiclient import discovery
from googleapiclient import errors
from infra_libs import httplib2_utils
from infra_libs.ts_mon.common import http_metrics
from infra_libs.ts_mon.common import pb_to_popo
from infra_libs.ts_mon.protos import metrics_pb2
from oauth2client import gce
from oauth2client.client import GoogleCredentials
from oauth2client.file import Storage
# Sentinel values accepted in place of a credentials file path. Passing one of
# these selects the ambient service account (GCE metadata server or App Engine
# app identity) instead of reading credentials from disk.
GCE_CREDENTIALS = ':gce'
APPENGINE_CREDENTIALS = ':appengine'
class Monitor(object):
  """Abstract base class encapsulating the ability to collect and send metrics.

  This is a singleton class. There should only be one instance of a Monitor at
  a time. It will be created and initialized by process_argparse_options. It
  must exist in order for any metrics to be sent, although both Targets and
  Metrics may be initialized before the underlying Monitor. If it does not exist
  at the time that a Metric is sent, an exception will be raised.
  """

  # OAuth2 scopes requested when loading credentials; subclasses override.
  _SCOPES = []

  @staticmethod
  def _wrap_proto(data):
    """Normalize MetricsData, a sequence of MetricsData, and MetricsCollection.

    Args:
      data: A MetricsData, a list or tuple of MetricsData, or a
        MetricsCollection.

    Returns:
      A MetricsCollection with the appropriate data attribute set. A
      MetricsCollection argument is returned unchanged.
    """
    if isinstance(data, metrics_pb2.MetricsCollection):
      return data
    if isinstance(data, (list, tuple)):
      return metrics_pb2.MetricsCollection(data=list(data))
    return metrics_pb2.MetricsCollection(data=[data])

  def _load_credentials(self, credentials_file_path):
    """Load credentials scoped to this monitor's _SCOPES.

    Args:
      credentials_file_path: GCE_CREDENTIALS, APPENGINE_CREDENTIALS, or the
        path to a JSON credentials file.

    Returns:
      A credentials object. For files without a 'type' field this is whatever
      oauth2client's Storage(...).get() returns.

    Raises:
      EnvironmentError: if the credentials file cannot be opened.
      ValueError: if the file is not valid JSON or does not hold a JSON object.
    """
    if credentials_file_path == GCE_CREDENTIALS:
      return gce.AppAssertionCredentials(self._SCOPES)
    if credentials_file_path == APPENGINE_CREDENTIALS:  # pragma: no cover
      # This import doesn't work outside appengine, so delay it until it's used.
      from oauth2client import appengine
      from google.appengine.api import app_identity
      logging.info('Initializing with service account %s',
                   app_identity.get_service_account_name())
      return appengine.AppAssertionCredentials(self._SCOPES)

    with open(credentials_file_path, 'r') as credentials_file:
      credentials_json = json.load(credentials_file)
    # A non-object document (e.g. a JSON list) would otherwise fail below with
    # AttributeError, which callers do not catch; report it as ValueError like
    # other malformed-JSON failures.
    if not isinstance(credentials_json, dict):
      raise ValueError('Credentials file %s does not contain a JSON object'
                       % credentials_file_path)
    # Files carrying a 'type' field are parsed by GoogleCredentials; anything
    # else is read through oauth2client's Storage.
    if credentials_json.get('type'):
      credentials = GoogleCredentials.from_stream(credentials_file_path)
      return credentials.create_scoped(self._SCOPES)
    return Storage(credentials_file_path).get()

  def send(self, metric_pb):
    """Send metric_pb to the backend. Subclasses must implement this."""
    raise NotImplementedError()
class HttpsMonitor(Monitor):
  """Monitor that POSTs metrics, encoded as JSON, to an HTTPS endpoint."""

  _SCOPES = ['https://www.googleapis.com/auth/prodxmon']

  def __init__(self, endpoint, credentials_file_path, http=None):
    """Load credentials and authorize the HTTP transport.

    Args:
      endpoint: URL that metrics are POSTed to.
      credentials_file_path: passed to _load_credentials.
      http: optional transport to authorize; when None, an
        httplib2_utils.RetriableHttp wrapping an InstrumentedHttp is used.
    """
    self._endpoint = endpoint
    creds = self._load_credentials(credentials_file_path)
    transport = http if http is not None else httplib2_utils.RetriableHttp(
        httplib2_utils.InstrumentedHttp('acq-mon-api'))
    self._http = creds.authorize(transport)

  def encodeToJson(self, metric_pb):
    """Return the JSON request body for metric_pb."""
    payload = {'resource': pb_to_popo.convert(metric_pb)}
    return json.dumps(payload)

  def send(self, metric_pb):
    """POST metric_pb to the endpoint.

    Non-200 responses and transport errors are logged as warnings and
    otherwise ignored.
    """
    collection = self._wrap_proto(metric_pb)
    request_body = self.encodeToJson(collection)
    try:
      response, response_content = self._http.request(
          self._endpoint, method='POST', body=request_body)
      if response.status != 200:
        logging.warning('HttpsMonitor.send received status %d: %s',
                        response.status, response_content)
    except (ValueError, errors.Error,
            socket.timeout, socket.error, socket.herror, socket.gaierror,
            httplib2.HttpLib2Error):
      logging.warning('HttpsMonitor.send failed: %s\n',
                      traceback.format_exc())
class PubSubMonitor(Monitor):
  """Class which publishes metrics to a Cloud Pub/Sub topic."""

  _SCOPES = [
      'https://www.googleapis.com/auth/pubsub',
  ]

  TIMEOUT = 10  # seconds

  def _initialize(self):
    """Authorize the HTTP transport (at most once) and build the API client.

    Raises:
      Whatever _load_credentials, credentials.authorize or discovery.build
      raise; _check_initialize decides which of those are tolerated.
    """
    # authorize() is used purely for its side effect on self._http (its
    # return value is ignored), i.e. it modifies the transport in place.
    # Repeating it on every retry after a failed discovery.build() would wrap
    # the same object again and again, so authorize only once per instance.
    if not getattr(self, '_http_authorized', False):
      creds = self._load_credentials(self._credsfile)
      creds.authorize(self._http)
      self._http_authorized = True
    self._api = discovery.build('pubsub', 'v1', http=self._http)

  def _update_init_metrics(self, status):
    """Record the outcome of an _initialize attempt as an HTTP status metric.

    Does nothing unless the instance was created with use_instrumented_http.
    """
    if not self._use_instrumented_http:
      return
    fields = {'name': 'acq-mon-api-pubsub',
              'client': 'discovery',
              'status': status}
    http_metrics.response_status.increment(fields=fields)

  def _check_initialize(self):
    """Lazily initialize the API client.

    Returns:
      True if the API client is ready, False if initialization failed. The
      failure is logged, and initialization is attempted again on the next
      call.
    """
    if self._api:
      return True
    try:
      self._initialize()
    except (ValueError, errors.Error,
            socket.timeout, socket.error, socket.herror, socket.gaierror,
            httplib2.HttpLib2Error, EnvironmentError):
      # Log a warning, not error, to avoid false alarms in AppEngine apps.
      logging.warning('PubSubMonitor._initialize failed:\n%s',
                      traceback.format_exc())
      self._api = None
      self._update_init_metrics(http_metrics.STATUS_ERROR)
      return False

    self._update_init_metrics(http_metrics.STATUS_OK)
    return True

  def __init__(self, credsfile, project, topic, use_instrumented_http=True):
    """Process monitoring related command line flags and initialize api.

    Args:
      credsfile (str): path to the credentials json file
      project (str): the name of the Pub/Sub project to publish to.
      topic (str): the name of the Pub/Sub topic to publish to.
      use_instrumented_http (bool): whether to record monitoring metrics for
          HTTP requests made to the pubsub API.
    """
    # Do not call self._check_initialize() in the constructor. This
    # class is constructed during app initialization on AppEngine, and
    # network calls are especially flaky during that time.
    self._api = None
    # Becomes True once _initialize has authorized self._http.
    self._http_authorized = False
    self._use_instrumented_http = use_instrumented_http
    if use_instrumented_http:
      self._http = httplib2_utils.InstrumentedHttp(
          'acq-mon-api-pubsub', timeout=self.TIMEOUT)
    else:
      self._http = httplib2.Http(timeout=self.TIMEOUT)
    self._credsfile = credsfile
    self._topic = 'projects/%s/topics/%s' % (project, topic)

  def send(self, metric_pb):
    """Send a metric proto to the monitoring api.

    Args:
      metric_pb (MetricsData or MetricsCollection): the metric protobuf to send
    """
    if not self._check_initialize():
      return
    proto = self._wrap_proto(metric_pb)
    logging.debug('ts_mon: sending %d metrics to PubSub', len(proto.data))
    # b64encode returns bytes on Python 3, which cannot be serialized into the
    # JSON request body; decode to text (on Python 2 this yields an equal
    # unicode string).
    encoded = base64.b64encode(proto.SerializeToString()).decode('ascii')
    body = {
        'messages': [
            {'data': encoded},
        ],
    }
    # Occasionally, client fails to receive a proper internal JSON
    # from the server and raises ValueError trying to parse it. Other
    # times we may fail with a network error. This is not fatal, we'll
    # resend metrics next time.
    try:
      self._api.projects().topics().publish(
          topic=self._topic,
          body=body).execute(num_retries=5)
    except (ValueError, errors.Error,
            socket.timeout, socket.error, socket.herror, socket.gaierror,
            httplib2.HttpLib2Error):
      # Log a warning, not error, to avoid false alarms in AppEngine apps.
      logging.warning('PubSubMonitor.send failed:\n%s',
                      traceback.format_exc())
class DebugMonitor(Monitor):
  """Monitor that logs metrics and, optionally, appends them to a local file."""

  def __init__(self, filepath=None):
    """Create the monitor.

    Args:
      filepath: optional path; when given, it is opened in append mode and
        every sent batch of metrics is written to it.
    """
    self._fh = None if filepath is None else open(filepath, 'a')

  def send(self, metric_pb):
    """Log metric_pb and, if a file was configured, append it there too."""
    rendered = str(self._wrap_proto(metric_pb))
    logging.info('Flushing monitoring metrics:\n%s', rendered)
    out = self._fh
    if out is None:
      return
    out.write(rendered + '\n\n')
    out.flush()
class NullMonitor(Monitor):
  """Monitor that silently discards every metric it is given."""

  def send(self, metric_pb):
    """Drop metric_pb; nothing is recorded or transmitted."""
    del metric_pb  # Intentionally unused.