Add HttpsMonitor
BUG=625226
R=sergeyberezin@chromium.org
Review-Url: https://codereview.chromium.org/2114073002
Cr-Mirrored-From: https://chromium.googlesource.com/infra/infra
Cr-Mirrored-Commit: 2f08d02cb4cb2bff8b9c8885487792cc213675cf
diff --git a/__init__.py b/__init__.py
index 7257843..8af1717 100644
--- a/__init__.py
+++ b/__init__.py
@@ -6,7 +6,7 @@
from infra_libs.httplib2_utils import AuthError
from infra_libs.httplib2_utils import get_authenticated_http
from infra_libs.httplib2_utils import get_signed_jwt_assertion_credentials
-from infra_libs.httplib2_utils import InstrumentedHttp, HttpMock
+from infra_libs.httplib2_utils import RetriableHttp, InstrumentedHttp, HttpMock
from infra_libs.httplib2_utils import SERVICE_ACCOUNTS_CREDS_ROOT
from infra_libs.utils import read_json_as_utf8
from infra_libs.utils import rmtree
diff --git a/httplib2_utils.py b/httplib2_utils.py
index fcb9fe3..ff8a985 100644
--- a/httplib2_utils.py
+++ b/httplib2_utils.py
@@ -15,6 +15,7 @@
import httplib2
import oauth2client.client
+from googleapiclient import errors
from infra_libs.ts_mon.common import http_metrics
DEFAULT_SCOPES = ['email']
@@ -161,6 +162,46 @@
http = httplib2.Http(timeout=timeout)
return creds.authorize(http)
+class RetriableHttp(httplib2.Http):
+  """A httplib2.Http object that retries on failure.
+
+  A request is attempted again when it raises one of the errors caught in
+  request(), or when its response status satisfies retrying_statuses_fn. No
+  delay is inserted between attempts.
+  """
+
+  def __init__(self, max_tries=5, retrying_statuses_fn=None, **kwargs):
+    """
+    Args:
+      max_tries: maximum number of attempts per request; must be at least 1.
+      retrying_statuses_fn: a function that returns True if a given status
+                            should be retried. Defaults to retrying every 5xx
+                            status (500 to 599 inclusive).
+      **kwargs: forwarded unchanged to httplib2.Http.__init__.
+
+    Raises:
+      ValueError: if max_tries is smaller than 1.
+    """
+    if max_tries < 1:
+      # With no attempt at all request() would have nothing to return.
+      raise ValueError('max_tries must be at least 1, got %r' % (max_tries,))
+    super(RetriableHttp, self).__init__(**kwargs)
+    self._max_tries = max_tries
+    self._retrying_statuses_fn = (
+        retrying_statuses_fn or self._is_server_error)
+
+  @staticmethod
+  def _is_server_error(status):
+    """Default retry predicate: True for any 5xx status, 599 included."""
+    return 500 <= status <= 599
+
+  def request(self, uri, method='GET', body=None, *args, **kwargs):
+    """Issues the request, making at most max_tries attempts.
+
+    Returns:
+      The (response, content) pair of the first attempt whose status is not
+      retriable, or of the last attempt if every status was retriable.
+
+    Raises:
+      The exception of the last attempt, if that attempt failed with one of
+      the error types caught below. Any other exception propagates at once.
+    """
+    for i in range(1, self._max_tries + 1):
+      outcome = 'final attempt' if i == self._max_tries else 'will retry'
+      try:
+        response, content = super(RetriableHttp, self).request(
+            uri, method, body, *args, **kwargs)
+
+        if not self._retrying_statuses_fn(response.status):
+          break
+        logging.info('RetriableHttp: attempt %d receiving status %d, %s',
+                     i, response.status, outcome)
+      except (ValueError, errors.Error,
+              socket.timeout, socket.error, socket.herror, socket.gaierror,
+              httplib2.HttpLib2Error) as error:
+        logging.info('RetriableHttp: attempt %d received exception: %s, %s',
+                     i, error, outcome)
+        if i == self._max_tries:
+          raise
+
+    return response, content
+
class InstrumentedHttp(httplib2.Http):
"""A httplib2.Http object that reports ts_mon metrics about its requests."""
diff --git a/test/httplib2_utils_test.py b/test/httplib2_utils_test.py
index 0725656..0c6bdc8 100644
--- a/test/httplib2_utils_test.py
+++ b/test/httplib2_utils_test.py
@@ -129,6 +129,43 @@
'creds_malformed.json',
service_accounts_creds_root=DATA_DIR)
+class RetriableHttplib2Test(unittest.TestCase):
+  """Tests RetriableHttp.request() with httplib2.Http._request mocked out."""
+
+  # Matches a call made with nine positional arguments of any value.
+  _MOCK_REQUEST = mock.call(*([mock.ANY] * 9))
+
+  def setUp(self):
+    super(RetriableHttplib2Test, self).setUp()
+    self.http = infra_libs.RetriableHttp()
+    self.http._request = mock.create_autospec(self.http._request, spec_set=True)
+
+  def _assert_attempts(self, expected):
+    """Checks that _request was called exactly `expected` times.
+
+    assert_has_calls only verifies that the given calls appear in the call
+    list, so on its own it would not notice additional attempts.
+    """
+    self.http._request.assert_has_calls([ self._MOCK_REQUEST ] * expected)
+    self.assertEqual(expected, self.http._request.call_count)
+
+  def test_succeed(self):
+    # 400 is outside the default retried range, so one attempt is enough.
+    self.http._request.return_value = (
+        httplib2.Response({'status': 400}), 'content')
+    response, _ = self.http.request('http://foo/')
+    self.assertEqual(400, response.status)
+    self._assert_attempts(1)
+
+  def test_retry_succeed(self):
+    self.http._request.side_effect = iter([
+        (httplib2.Response({'status': 500}), 'content'),
+        httplib2.HttpLib2Error,
+        (httplib2.Response({'status': 200}), 'content')
+    ])
+    response, _ = self.http.request('http://foo/')
+    self.assertEqual(200, response.status)
+    self._assert_attempts(3)
+
+  def test_fail_exception(self):
+    self.http._request.side_effect = httplib2.HttpLib2Error()
+    self.assertRaises(httplib2.HttpLib2Error, self.http.request, 'http://foo/')
+    self._assert_attempts(5)
+
+  def test_fail_status_code(self):
+    self.http._request.return_value = (
+        httplib2.Response({'status': 500}), 'content')
+    response, _ = self.http.request('http://foo/')
+    self.assertEqual(500, response.status)
+    self._assert_attempts(5)
+
class InstrumentedHttplib2Test(unittest.TestCase):
def setUp(self):
diff --git a/ts_mon/common/monitors.py b/ts_mon/common/monitors.py
index 110508a..bed8f18 100644
--- a/ts_mon/common/monitors.py
+++ b/ts_mon/common/monitors.py
@@ -6,6 +6,7 @@
import base64
+import httplib2
import json
import logging
import socket
@@ -13,15 +14,13 @@
from googleapiclient import discovery
from googleapiclient import errors
+from infra_libs import httplib2_utils
+from infra_libs.ts_mon.common import http_metrics
+from infra_libs.ts_mon.common import pb_to_popo
+from infra_libs.ts_mon.protos import metrics_pb2
from oauth2client import gce
from oauth2client.client import GoogleCredentials
from oauth2client.file import Storage
-import httplib2
-
-from infra_libs import httplib2_utils
-from infra_libs.ts_mon.common import http_metrics
-from infra_libs.ts_mon.protos import metrics_pb2
-
# Special string that can be passed through as the credentials path to use the
# default Appengine or GCE service account.
@@ -38,6 +37,9 @@
Metrics may be initialized before the underlying Monitor. If it does not exist
at the time that a Metric is sent, an exception will be raised.
"""
+
+ _SCOPES = []
+
@staticmethod
def _wrap_proto(data):
"""Normalize MetricsData, list(MetricsData), and MetricsCollection.
@@ -56,19 +58,6 @@
ret = metrics_pb2.MetricsCollection(data=[data])
return ret
- def send(self, metric_pb):
- raise NotImplementedError()
-
-
-class PubSubMonitor(Monitor):
- """Class which publishes metrics to a Cloud Pub/Sub topic."""
-
- _SCOPES = [
- 'https://www.googleapis.com/auth/pubsub',
- ]
-
- TIMEOUT = 10 # seconds
-
def _load_credentials(self, credentials_file_path):
if credentials_file_path == GCE_CREDENTIALS:
return gce.AppAssertionCredentials(self._SCOPES)
@@ -88,6 +77,51 @@
return credentials
return Storage(credentials_file_path).get()
+ def send(self, metric_pb):
+ """Abstract in the base class: always raises NotImplementedError.
+
+ Subclasses such as HttpsMonitor override this to deliver metric_pb.
+ """
+ raise NotImplementedError()
+
+class HttpsMonitor(Monitor):
+  """Class which POSTs metrics, encoded as JSON, to an HTTPS endpoint."""
+
+  _SCOPES = [
+    'https://www.googleapis.com/auth/prodxmon'
+  ]
+
+  def __init__(self, endpoint, credentials_file_path, http=None):
+    """
+    Args:
+      endpoint: URL that send() POSTs to.
+      credentials_file_path: handed to _load_credentials() to obtain the
+                             credentials that authorize the HTTP client.
+      http: HTTP client to authorize and use. When None, a RetriableHttp is
+            created.
+    """
+    self._endpoint = endpoint
+    credentials = self._load_credentials(credentials_file_path)
+    if http is None:
+      # RetriableHttp's first positional parameter is max_tries and it cannot
+      # wrap another Http object, so it must not be handed an InstrumentedHttp.
+      # TODO: restore 'acq-mon-api' request metrics once the two can be
+      # composed.
+      http = httplib2_utils.RetriableHttp()
+    self._http = credentials.authorize(http)
+
+  def encodeToJson(self, metric_pb):
+    """Returns the JSON request body for an already wrapped metric_pb."""
+    return json.dumps({ 'resource': pb_to_popo.convert(metric_pb) })
+
+  def send(self, metric_pb):
+    """POSTs metric_pb to the endpoint.
+
+    Non-200 responses and the transport errors caught below are logged as
+    warnings instead of being raised.
+    """
+    body = self.encodeToJson(self._wrap_proto(metric_pb))
+
+    try:
+      resp, content = self._http.request(self._endpoint, method='POST',
+                                         body=body)
+      if resp.status != 200:
+        # %d needs the numeric status, not the response object itself.
+        logging.warning('HttpsMonitor.send received status %d: %s',
+                        resp.status, content)
+    except (ValueError, errors.Error,
+            socket.timeout, socket.error, socket.herror, socket.gaierror,
+            httplib2.HttpLib2Error):
+      logging.warning('HttpsMonitor.send failed: %s\n',
+                      traceback.format_exc())
+
+
+class PubSubMonitor(Monitor):
+ """Class which publishes metrics to a Cloud Pub/Sub topic."""
+
+ _SCOPES = [
+ 'https://www.googleapis.com/auth/pubsub',
+ ]
+
+ TIMEOUT = 10 # seconds
+
def _initialize(self):
creds = self._load_credentials(self._credsfile)
creds.authorize(self._http)
diff --git a/ts_mon/common/pb_to_popo.py b/ts_mon/common/pb_to_popo.py
new file mode 100644
index 0000000..8266f9a
--- /dev/null
+++ b/ts_mon/common/pb_to_popo.py
@@ -0,0 +1,43 @@
+# Copyright 2015 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import logging
+
+from google.protobuf.descriptor import FieldDescriptor as fd
+
+def convert(pb):
+  """Convert protobuf to plain-old-python-object.
+
+  Args:
+    pb: a protobuf message.
+
+  Returns:
+    A dict keyed by field name with one entry per field reported by
+    pb.ListFields(). Repeated fields become lists; nested messages are
+    converted recursively into dicts.
+  """
+  obj = {}
+  for field, value in pb.ListFields():
+    # The converter depends only on the field type, so look it up once per
+    # field instead of once per repeated element.
+    to_json = _get_json_func(field.type)
+    if field.label == fd.LABEL_REPEATED:
+      obj[field.name] = [to_json(v) for v in value]
+    else:
+      obj[field.name] = to_json(value)
+  return obj
+
+def _get_json_func(field_type):
+  """Return the callable that converts a value of field_type for JSON.
+
+  A type missing from _FD_TO_JSON is reported with a warning and converted
+  with unicode.
+  """
+  converter = _FD_TO_JSON.get(field_type)
+  if converter is None:  # pragma: no cover
+    logging.warning("pb_to_popo doesn't support converting %s", field_type)
+    converter = unicode
+  return converter
+
+# Maps a protobuf field type to the callable that turns a field value into a
+# JSON-serializable Python value.
+# The fixed-width types (fixed32/64, sfixed32/64) are integers on the wire, so
+# they are converted like the other integer types; float would turn 1 into 1.0
+# and cannot represent every 64-bit value exactly.
+_FD_TO_JSON = {
+  fd.TYPE_BOOL: bool,
+  fd.TYPE_DOUBLE: float,
+  fd.TYPE_ENUM: int,
+  fd.TYPE_FIXED32: int,
+  fd.TYPE_FIXED64: long,
+  fd.TYPE_FLOAT: float,
+  fd.TYPE_INT32: int,
+  fd.TYPE_INT64: long,
+  fd.TYPE_SFIXED32: int,
+  fd.TYPE_SFIXED64: long,
+  fd.TYPE_SINT32: int,
+  fd.TYPE_SINT64: long,
+  fd.TYPE_STRING: unicode,
+  fd.TYPE_UINT32: int,
+  fd.TYPE_UINT64: long,
+  fd.TYPE_MESSAGE: convert,
+}
diff --git a/ts_mon/common/test/monitors_test.py b/ts_mon/common/test/monitors_test.py
index 289f0b3..4f9ef6a 100644
--- a/ts_mon/common/test/monitors_test.py
+++ b/ts_mon/common/test/monitors_test.py
@@ -3,6 +3,8 @@
# found in the LICENSE file.
import base64
+import httplib2
+import json
import os
import tempfile
import unittest
@@ -12,6 +14,7 @@
from infra_libs.ts_mon.common import interface
from infra_libs.ts_mon.common import monitors
+from infra_libs.ts_mon.common import pb_to_popo
from infra_libs.ts_mon.common import targets
from infra_libs.ts_mon.protos import metrics_pb2
import infra_libs
@@ -25,6 +28,69 @@
with self.assertRaises(NotImplementedError):
m.send(metric1)
+class HttpsMonitorTest(unittest.TestCase):
+  """Exercises HttpsMonitor.send() with the HTTP request call mocked out."""
+
+  def setUp(self):
+    super(HttpsMonitorTest, self).setUp()
+
+  def message(self, pb):
+    """Builds the JSON body HttpsMonitor is expected to POST for pb."""
+    wrapped = monitors.Monitor._wrap_proto(pb)
+    return json.dumps({'resource': pb_to_popo.convert(wrapped) })
+
+  def _make_monitor(self, request, http=None):
+    """Returns an HttpsMonitor whose authorized client uses `request`."""
+    mon = monitors.HttpsMonitor('endpoint', '/path/to/creds.p8.json', http=http)
+    mon._http.request = request
+    return mon
+
+  def _test_send(self, http):
+    ok_response = mock.MagicMock(spec=httplib2.Response, status=200)
+    mon = self._make_monitor(
+        mock.MagicMock(return_value=[ok_response, ""]), http=http)
+
+    metric1 = metrics_pb2.MetricsData(name='m1')
+    metric2 = metrics_pb2.MetricsData(name='m2')
+    collection = metrics_pb2.MetricsCollection(data=[metric1, metric2])
+
+    # One send per accepted input shape: single metric, list, collection.
+    payloads = [metric1, [metric1, metric2], collection]
+    for payload in payloads:
+      mon.send(payload)
+
+    mon._http.request.assert_has_calls([
+        mock.call('endpoint', method='POST', body=self.message(payload))
+        for payload in payloads
+    ])
+
+  @mock.patch('infra_libs.ts_mon.common.monitors.HttpsMonitor.'
+              '_load_credentials', autospec=True)
+  def test_default_send(self, _load_creds):
+    self._test_send(None)
+
+  @mock.patch('infra_libs.ts_mon.common.monitors.HttpsMonitor.'
+              '_load_credentials', autospec=True)
+  def test_nondefault_send(self, _load_creds):
+    self._test_send(httplib2.Http())
+
+  @mock.patch('infra_libs.ts_mon.common.monitors.HttpsMonitor.'
+              '_load_credentials', autospec=True)
+  def test_send_resp_failure(self, _load_creds):
+    bad_response = mock.MagicMock(spec=httplib2.Response, status=400)
+    mon = self._make_monitor(mock.MagicMock(return_value=[bad_response, ""]))
+
+    metric1 = metrics_pb2.MetricsData(name='m1')
+    # A non-200 status must not make send() raise.
+    mon.send(metric1)
+
+    mon._http.request.assert_called_once_with('endpoint', method='POST',
+                                              body=self.message(metric1))
+
+  @mock.patch('infra_libs.ts_mon.common.monitors.HttpsMonitor.'
+              '_load_credentials', autospec=True)
+  def test_send_http_failure(self, _load_creds):
+    mon = self._make_monitor(mock.MagicMock(side_effect=ValueError()))
+
+    metric1 = metrics_pb2.MetricsData(name='m1')
+    # The ValueError raised by the client must be swallowed by send().
+    mon.send(metric1)
+
+    mon._http.request.assert_called_once_with('endpoint', method='POST',
+                                              body=self.message(metric1))
+
class PubSubMonitorTest(unittest.TestCase):
diff --git a/ts_mon/common/test/pb_to_popo_test.py b/ts_mon/common/test/pb_to_popo_test.py
new file mode 100644
index 0000000..3373567
--- /dev/null
+++ b/ts_mon/common/test/pb_to_popo_test.py
@@ -0,0 +1,45 @@
+# Copyright 2015 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import unittest
+
+from infra_libs.ts_mon.common import monitors
+from infra_libs.ts_mon.common import pb_to_popo
+from infra_libs.ts_mon.protos import acquisition_network_device_pb2
+from infra_libs.ts_mon.protos import acquisition_task_pb2
+from infra_libs.ts_mon.protos import metrics_pb2
+
+class PbToPopoTest(unittest.TestCase):
+  """Checks pb_to_popo.convert() on a MetricsCollection with nested messages."""
+
+  def test_convert(self):
+    first = metrics_pb2.MetricsData(
+        name='m1',
+        counter=200,
+        task=acquisition_task_pb2.Task(service_name='service'),
+        units=metrics_pb2.MetricsData.Units.Value('SECONDS'))
+    second = metrics_pb2.MetricsData(
+        name='m2',
+        network_device=acquisition_network_device_pb2.NetworkDevice(
+            hostname='host', alertable=True),
+        cumulative_double_value=123.456)
+    collection = metrics_pb2.MetricsCollection(
+        data=[first, second], start_timestamp_us=12345)
+
+    expected_first = {
+      'name': 'm1',
+      'counter': 200L,
+      'task': { 'service_name': 'service' },
+      'units': 1
+    }
+    expected_second = {
+      'name': 'm2',
+      'cumulative_double_value': 123.456,
+      'network_device': { 'hostname': 'host', 'alertable': True }
+    }
+    expected = {
+      'data': [expected_first, expected_second],
+      'start_timestamp_us': 12345L
+    }
+
+    self.assertDictEqual(expected, pb_to_popo.convert(collection))
+
diff --git a/ts_mon/config.py b/ts_mon/config.py
index 8b8e75f..b13e21d 100644
--- a/ts_mon/config.py
+++ b/ts_mon/config.py
@@ -83,8 +83,9 @@
'whitelisting and deployment of credentials. (default: %(default)s)')
parser.add_argument(
'--ts-mon-endpoint',
- help='url (including file://, pubsub://project/topic) to post monitoring '
- 'metrics to. If set, overrides the value in --ts-mon-config-file')
+ help='url (including file://, pubsub://project/topic, https://) to post '
+ 'monitoring metrics to. If set, overrides the value in '
+ '--ts-mon-config-file')
parser.add_argument(
'--ts-mon-credentials',
help='path to a pkcs8 json credential file. If set, overrides the value '
@@ -232,6 +233,9 @@
else:
logging.error('ts_mon monitoring is disabled because credentials are not '
'available')
+ elif endpoint.startswith('https://'):
+ interface.state.global_monitor = monitors.HttpsMonitor(endpoint,
+ credentials)
elif endpoint.lower() == 'none':
logging.info('ts_mon monitoring has been explicitly disabled')
else:
diff --git a/ts_mon/test/config_test.py b/ts_mon/test/config_test.py
index 9bef02f..c736acd 100644
--- a/ts_mon/test/config_test.py
+++ b/ts_mon/test/config_test.py
@@ -65,6 +65,32 @@
@mock.patch('requests.get', autospec=True)
@mock.patch('socket.getfqdn', autospec=True)
+  @mock.patch('infra_libs.ts_mon.common.monitors.HttpsMonitor.'
+              '_load_credentials', autospec=True)
+  def test_https_monitor_args(self, _load_creds, fake_fqdn, fake_get):
+    """An https:// endpoint must install an HttpsMonitor as global monitor."""
+    fake_fqdn.return_value = 'slave1-a1.reg.tld'
+    fake_get.return_value.side_effect = requests.exceptions.ConnectionError
+    p = argparse.ArgumentParser()
+    config.add_argparse_options(p)
+    args = p.parse_args([
+        '--ts-mon-credentials', '/path/to/creds.p8.json',
+        '--ts-mon-endpoint', 'https://test/random:insert'])
+
+    config.process_argparse_options(args)
+
+    self.assertIsInstance(interface.state.global_monitor,
+                          monitors.HttpsMonitor)
+
+    self.assertIsInstance(interface.state.target, targets.DeviceTarget)
+    self.assertEquals(interface.state.target.hostname, 'slave1-a1')
+    self.assertEquals(interface.state.target.region, 'reg')
+    self.assertEquals(args.ts_mon_flush, 'auto')
+    self.assertIsNotNone(interface.state.flush_thread)
+    self.assertTrue(standard_metrics.up.get())
+
+ @mock.patch('requests.get', autospec=True)
+ @mock.patch('socket.getfqdn', autospec=True)
def test_default_target_uppercase_fqdn(self, fake_fqdn, fake_get):
fake_fqdn.return_value = 'SLAVE1-A1.REG.TLD'
fake_get.return_value.side_effect = requests.exceptions.ConnectionError