Add HttpsMonitor

BUG=625226
R=sergeyberezin@chromium.org

Review-Url: https://codereview.chromium.org/2114073002
Cr-Mirrored-From: https://chromium.googlesource.com/infra/infra
Cr-Mirrored-Commit: 2f08d02cb4cb2bff8b9c8885487792cc213675cf
diff --git a/__init__.py b/__init__.py
index 7257843..8af1717 100644
--- a/__init__.py
+++ b/__init__.py
@@ -6,7 +6,7 @@
 from infra_libs.httplib2_utils import AuthError
 from infra_libs.httplib2_utils import get_authenticated_http
 from infra_libs.httplib2_utils import get_signed_jwt_assertion_credentials
-from infra_libs.httplib2_utils import InstrumentedHttp, HttpMock
+from infra_libs.httplib2_utils import RetriableHttp, InstrumentedHttp, HttpMock
 from infra_libs.httplib2_utils import SERVICE_ACCOUNTS_CREDS_ROOT
 from infra_libs.utils import read_json_as_utf8
 from infra_libs.utils import rmtree
diff --git a/httplib2_utils.py b/httplib2_utils.py
index fcb9fe3..ff8a985 100644
--- a/httplib2_utils.py
+++ b/httplib2_utils.py
@@ -15,6 +15,7 @@
 import httplib2
 import oauth2client.client
 
+from googleapiclient import errors
 from infra_libs.ts_mon.common import http_metrics
 
 DEFAULT_SCOPES = ['email']
@@ -161,6 +162,46 @@
     http = httplib2.Http(timeout=timeout)
   return creds.authorize(http)
 
+class RetriableHttp(httplib2.Http):
+  """A httplib2.Http object that retries failed requests up to max_tries."""
+
+  def __init__(self, max_tries=5, retrying_statuses_fn=None, **kwargs):
+    """
+    Args:
+      max_tries: maximum number of attempts per request; must be >= 1
+      retrying_statuses_fn: a function that returns True if a given status
+                            should be retried; defaults to any 5xx status
+      **kwargs: forwarded unchanged to httplib2.Http.__init__
+    """
+    if max_tries < 1:
+      raise ValueError('max_tries must be >= 1, got %r' % (max_tries,))
+    super(RetriableHttp, self).__init__(**kwargs)
+    self._max_tries = max_tries
+    self._retrying_statuses_fn = (retrying_statuses_fn or
+                                  set(range(500, 600)).__contains__)
+
+  def request(self, uri, method='GET', body=None, *args, **kwargs):
+    """Issues a request with retries; returns the last (response, content)."""
+    for i in range(1, self._max_tries + 1):
+      # The suffix is identical for the status and the exception log lines.
+      note = 'final attempt' if i == self._max_tries else 'will retry'
+      try:
+        response, content = super(RetriableHttp, self).request(
+            uri, method, body, *args, **kwargs)
+        if not self._retrying_statuses_fn(response.status):
+          break
+        logging.info('RetriableHttp: attempt %d receiving status %d, %s',
+                     i, response.status, note)
+      except (ValueError, errors.Error,
+              socket.timeout, socket.error, socket.herror, socket.gaierror,
+              httplib2.HttpLib2Error) as error:
+        logging.info('RetriableHttp: attempt %d received exception: %s, %s',
+                     i, error, note)
+        if i == self._max_tries:
+          raise  # out of attempts: propagate the last error
+
+    return response, content
+
 
 class InstrumentedHttp(httplib2.Http):
   """A httplib2.Http object that reports ts_mon metrics about its requests."""
diff --git a/test/httplib2_utils_test.py b/test/httplib2_utils_test.py
index 0725656..0c6bdc8 100644
--- a/test/httplib2_utils_test.py
+++ b/test/httplib2_utils_test.py
@@ -129,6 +129,43 @@
         'creds_malformed.json',
         service_accounts_creds_root=DATA_DIR)
 
+class RetriableHttplib2Test(unittest.TestCase):
+  """RetriableHttp tests; mock_calls equality pins the exact attempt count."""
+  _MOCK_REQUEST = mock.call(*([mock.ANY] * 9))  # one _request call, any args
+
+  def setUp(self):
+    super(RetriableHttplib2Test, self).setUp()
+    self.http = infra_libs.RetriableHttp()
+    self.http._request = mock.create_autospec(self.http._request, spec_set=True)
+
+  def test_succeed(self):
+    self.http._request.return_value = (
+        httplib2.Response({'status': 400}), 'content')  # 4xx: not retriable
+    response, _ = self.http.request('http://foo/')
+    self.assertEqual(400, response.status)
+    self.assertEqual([self._MOCK_REQUEST], self.http._request.mock_calls)
+
+  def test_retry_succeed(self):
+    self.http._request.side_effect = iter([
+      (httplib2.Response({'status': 500}), 'content'), httplib2.HttpLib2Error,
+      (httplib2.Response({'status': 200}), 'content')
+    ])
+    response, _ = self.http.request('http://foo/')
+    self.assertEqual(200, response.status)
+    self.assertEqual([self._MOCK_REQUEST] * 3, self.http._request.mock_calls)
+
+  def test_fail_exception(self):
+    self.http._request.side_effect = httplib2.HttpLib2Error()
+    self.assertRaises(httplib2.HttpLib2Error, self.http.request, 'http://foo/')
+    self.assertEqual([self._MOCK_REQUEST] * 5, self.http._request.mock_calls)
+
+  def test_fail_status_code(self):
+    self.http._request.return_value = (
+        httplib2.Response({'status': 500}), 'content')
+    response, _ = self.http.request('http://foo/')
+    self.assertEqual(500, response.status)
+    self.assertEqual([self._MOCK_REQUEST] * 5, self.http._request.mock_calls)
+
 
 class InstrumentedHttplib2Test(unittest.TestCase):
   def setUp(self):
diff --git a/ts_mon/common/monitors.py b/ts_mon/common/monitors.py
index 110508a..bed8f18 100644
--- a/ts_mon/common/monitors.py
+++ b/ts_mon/common/monitors.py
@@ -6,6 +6,7 @@
 
 
 import base64
+import httplib2
 import json
 import logging
 import socket
@@ -13,15 +14,13 @@
 
 from googleapiclient import discovery
 from googleapiclient import errors
+from infra_libs import httplib2_utils
+from infra_libs.ts_mon.common import http_metrics
+from infra_libs.ts_mon.common import pb_to_popo
+from infra_libs.ts_mon.protos import metrics_pb2
 from oauth2client import gce
 from oauth2client.client import GoogleCredentials
 from oauth2client.file import Storage
-import httplib2
-
-from infra_libs import httplib2_utils
-from infra_libs.ts_mon.common import http_metrics
-from infra_libs.ts_mon.protos import metrics_pb2
-
 
 # Special string that can be passed through as the credentials path to use the
 # default Appengine or GCE service account.
@@ -38,6 +37,9 @@
   Metrics may be initialized before the underlying Monitor. If it does not exist
   at the time that a Metric is sent, an exception will be raised.
   """
+
+  _SCOPES = []
+
   @staticmethod
   def _wrap_proto(data):
     """Normalize MetricsData, list(MetricsData), and MetricsCollection.
@@ -56,19 +58,6 @@
       ret = metrics_pb2.MetricsCollection(data=[data])
     return ret
 
-  def send(self, metric_pb):
-    raise NotImplementedError()
-
-
-class PubSubMonitor(Monitor):
-  """Class which publishes metrics to a Cloud Pub/Sub topic."""
-
-  _SCOPES = [
-      'https://www.googleapis.com/auth/pubsub',
-  ]
-
-  TIMEOUT = 10  # seconds
-
   def _load_credentials(self, credentials_file_path):
     if credentials_file_path == GCE_CREDENTIALS:
       return gce.AppAssertionCredentials(self._SCOPES)
@@ -88,6 +77,51 @@
       return credentials
     return Storage(credentials_file_path).get()
 
+  def send(self, metric_pb):
+    raise NotImplementedError()  # abstract: concrete monitors override send()
+
+class HttpsMonitor(Monitor):
+  """Monitor that POSTs JSON-encoded metrics to an HTTPS endpoint."""
+
+  _SCOPES = ['https://www.googleapis.com/auth/prodxmon']
+
+  def __init__(self, endpoint, credentials_file_path, http=None):
+    """http: optional httplib2.Http to authorize; default is RetriableHttp."""
+    self._endpoint = endpoint
+    credentials = self._load_credentials(credentials_file_path)
+    if http is None:
+      # RetriableHttp is an Http subclass (1st arg: max_tries), not a wrapper.
+      http = httplib2_utils.RetriableHttp()
+    self._http = credentials.authorize(http)
+
+  def encodeToJson(self, metric_pb):
+    """Returns metric_pb as a JSON string under a top-level 'resource' key."""
+    return json.dumps({ 'resource': pb_to_popo.convert(metric_pb) })
+
+  def send(self, metric_pb):
+    """POSTs metric_pb; non-200 responses and caught errors are only logged."""
+    body = self.encodeToJson(self._wrap_proto(metric_pb))
+    try:
+      resp, content = self._http.request(self._endpoint, method='POST',
+                                         body=body)
+      if resp.status != 200:
+        logging.warning('HttpsMonitor.send received status %d: %s',
+                        resp.status, content)
+    except (ValueError, errors.Error,
+            socket.timeout, socket.error, socket.herror, socket.gaierror,
+            httplib2.HttpLib2Error):
+      logging.warning('HttpsMonitor.send failed: %s\n', traceback.format_exc())
+
+
+class PubSubMonitor(Monitor):
+  """Class which publishes metrics to a Cloud Pub/Sub topic."""
+
+  _SCOPES = [
+      'https://www.googleapis.com/auth/pubsub',
+  ]
+
+  TIMEOUT = 10  # seconds
+
   def _initialize(self):
     creds = self._load_credentials(self._credsfile)
     creds.authorize(self._http)
diff --git a/ts_mon/common/pb_to_popo.py b/ts_mon/common/pb_to_popo.py
new file mode 100644
index 0000000..8266f9a
--- /dev/null
+++ b/ts_mon/common/pb_to_popo.py
@@ -0,0 +1,43 @@
+# Copyright 2015 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import logging
+
+from google.protobuf.descriptor import FieldDescriptor as fd
+
+def convert(pb):
+  """Convert protobuf to plain-old-python-object: a dict of its set fields."""
+  obj = {}
+  for field, value in pb.ListFields():
+    # Resolve the converter once per field, not once per repeated element.
+    to_json = _get_json_func(field.type)
+    is_list = field.label == fd.LABEL_REPEATED
+    obj[field.name] = [to_json(v) for v in value] if is_list else to_json(value)
+  return obj
+
+def _get_json_func(field_type):
+  """Return the converter for field_type; unknown types map to unicode."""
+  if field_type not in _FD_TO_JSON:  # pragma: no cover
+    logging.warning("pb_to_popo doesn't support converting %s", field_type)
+    return unicode
+  return _FD_TO_JSON[field_type]
+
+_FD_TO_JSON = {  # protobuf field type -> converter to a JSON-safe value
+  fd.TYPE_BOOL: bool,
+  fd.TYPE_DOUBLE: float,
+  fd.TYPE_ENUM: int,
+  fd.TYPE_FIXED32: int,  # fixed-width types are integers, not floats
+  fd.TYPE_FIXED64: long,  # float would lose precision above 2**53
+  fd.TYPE_FLOAT: float,
+  fd.TYPE_INT32: int,
+  fd.TYPE_INT64: long,
+  fd.TYPE_SFIXED32: int,
+  fd.TYPE_SFIXED64: long,
+  fd.TYPE_SINT32: int,
+  fd.TYPE_SINT64: long,
+  fd.TYPE_STRING: unicode,
+  fd.TYPE_UINT32: int,
+  fd.TYPE_UINT64: long,
+  fd.TYPE_MESSAGE: convert  # nested messages are converted recursively
+}
diff --git a/ts_mon/common/test/monitors_test.py b/ts_mon/common/test/monitors_test.py
index 289f0b3..4f9ef6a 100644
--- a/ts_mon/common/test/monitors_test.py
+++ b/ts_mon/common/test/monitors_test.py
@@ -3,6 +3,8 @@
 # found in the LICENSE file.
 
 import base64
+import httplib2
+import json
 import os
 import tempfile
 import unittest
@@ -12,6 +14,7 @@
 
 from infra_libs.ts_mon.common import interface
 from infra_libs.ts_mon.common import monitors
+from infra_libs.ts_mon.common import pb_to_popo
 from infra_libs.ts_mon.common import targets
 from infra_libs.ts_mon.protos import metrics_pb2
 import infra_libs
@@ -25,6 +28,69 @@
     with self.assertRaises(NotImplementedError):
       m.send(metric1)
 
+class HttpsMonitorTest(unittest.TestCase):
+  """Exercises HttpsMonitor.send with credentials and HTTP transport mocked."""
+
+  def message(self, pb):
+    """Returns the JSON body HttpsMonitor is expected to POST for pb."""
+    wrapped = monitors.Monitor._wrap_proto(pb)
+    return json.dumps({'resource': pb_to_popo.convert(wrapped) })
+
+  def _test_send(self, http):
+    """Sends three payload shapes and checks the POST made for each one."""
+    mon = monitors.HttpsMonitor('endpoint', '/path/to/creds.p8.json', http=http)
+    ok = mock.MagicMock(spec=httplib2.Response, status=200)
+    mon._http.request = mock.MagicMock(return_value=[ok, ""])
+
+    metric1 = metrics_pb2.MetricsData(name='m1')
+    metric2 = metrics_pb2.MetricsData(name='m2')
+    collection = metrics_pb2.MetricsCollection(data=[metric1, metric2])
+    payloads = [metric1, [metric1, metric2], collection]
+    for payload in payloads:
+      mon.send(payload)
+
+    mon._http.request.assert_has_calls([
+      mock.call('endpoint', method='POST', body=self.message(payload))
+      for payload in payloads
+    ])
+
+  @mock.patch('infra_libs.ts_mon.common.monitors.HttpsMonitor.'
+              '_load_credentials', autospec=True)
+  def test_default_send(self, _load_creds):
+    self._test_send(None)
+
+  @mock.patch('infra_libs.ts_mon.common.monitors.HttpsMonitor.'
+              '_load_credentials', autospec=True)
+  def test_nondefault_send(self, _load_creds):
+    self._test_send(httplib2.Http())
+
+  @mock.patch('infra_libs.ts_mon.common.monitors.HttpsMonitor.'
+              '_load_credentials', autospec=True)
+  def test_send_resp_failure(self, _load_creds):
+    # A non-200 response must not make send() raise.
+    monitor = monitors.HttpsMonitor('endpoint', '/path/to/creds.p8.json')
+    rejected = mock.MagicMock(spec=httplib2.Response, status=400)
+    monitor._http.request = mock.MagicMock(return_value=[rejected, ""])
+
+    metric = metrics_pb2.MetricsData(name='m1')
+    monitor.send(metric)
+
+    monitor._http.request.assert_called_once_with('endpoint', method='POST',
+                                                  body=self.message(metric))
+
+  @mock.patch('infra_libs.ts_mon.common.monitors.HttpsMonitor.'
+              '_load_credentials', autospec=True)
+  def test_send_http_failure(self, _load_creds):
+    # ValueError is one of the exceptions send() catches instead of raising.
+    monitor = monitors.HttpsMonitor('endpoint', '/path/to/creds.p8.json')
+    monitor._http.request = mock.MagicMock(side_effect=ValueError())
+
+    metric = metrics_pb2.MetricsData(name='m1')
+    monitor.send(metric)
+
+    monitor._http.request.assert_called_once_with('endpoint', method='POST',
+                                                  body=self.message(metric))
+
 
 class PubSubMonitorTest(unittest.TestCase):
 
diff --git a/ts_mon/common/test/pb_to_popo_test.py b/ts_mon/common/test/pb_to_popo_test.py
new file mode 100644
index 0000000..3373567
--- /dev/null
+++ b/ts_mon/common/test/pb_to_popo_test.py
@@ -0,0 +1,45 @@
+# Copyright 2015 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import unittest
+
+from infra_libs.ts_mon.common import monitors
+from infra_libs.ts_mon.common import pb_to_popo
+from infra_libs.ts_mon.protos import acquisition_network_device_pb2
+from infra_libs.ts_mon.protos import acquisition_task_pb2
+from infra_libs.ts_mon.protos import metrics_pb2
+
+class PbToPopoTest(unittest.TestCase):
+  """Checks pb_to_popo.convert on a collection with nested messages."""
+
+  def test_convert(self):
+    # Two metrics, each carrying a different nested target message.
+    seconds = metrics_pb2.MetricsData.Units.Value('SECONDS')
+    first = metrics_pb2.MetricsData(
+        name='m1', counter=200, units=seconds,
+        task=acquisition_task_pb2.Task(service_name='service'))
+    second = metrics_pb2.MetricsData(
+        name='m2', cumulative_double_value=123.456,
+        network_device=acquisition_network_device_pb2.NetworkDevice(
+            hostname='host', alertable=True))
+    collection = metrics_pb2.MetricsCollection(data=[first, second],
+                                               start_timestamp_us=12345)
+
+    expected_first = {
+      'name': 'm1',
+      'counter': 200L,
+      'task': { 'service_name': 'service' },
+      'units': 1
+    }
+    expected_second = {
+      'name': 'm2',
+      'cumulative_double_value': 123.456,
+      'network_device': { 'hostname': 'host', 'alertable': True }
+    }
+    expected = {
+      'data': [expected_first, expected_second],
+      'start_timestamp_us': 12345L
+    }
+    self.assertDictEqual(expected, pb_to_popo.convert(collection))
+
diff --git a/ts_mon/config.py b/ts_mon/config.py
index 8b8e75f..b13e21d 100644
--- a/ts_mon/config.py
+++ b/ts_mon/config.py
@@ -83,8 +83,9 @@
            'whitelisting and deployment of credentials. (default: %(default)s)')
   parser.add_argument(
       '--ts-mon-endpoint',
-      help='url (including file://, pubsub://project/topic) to post monitoring '
-           'metrics to. If set, overrides the value in --ts-mon-config-file')
+      help='url (including file://, pubsub://project/topic, https://) to post '
+           'monitoring metrics to. If set, overrides the value in '
+           '--ts-mon-config-file')
   parser.add_argument(
       '--ts-mon-credentials',
       help='path to a pkcs8 json credential file. If set, overrides the value '
@@ -232,6 +233,9 @@
     else:
       logging.error('ts_mon monitoring is disabled because credentials are not '
                     'available')
+  elif endpoint.startswith('https://'):
+    interface.state.global_monitor = monitors.HttpsMonitor(endpoint,
+                                                           credentials)
   elif endpoint.lower() == 'none':
     logging.info('ts_mon monitoring has been explicitly disabled')
   else:
diff --git a/ts_mon/test/config_test.py b/ts_mon/test/config_test.py
index 9bef02f..c736acd 100644
--- a/ts_mon/test/config_test.py
+++ b/ts_mon/test/config_test.py
@@ -65,6 +65,32 @@
 
   @mock.patch('requests.get', autospec=True)
   @mock.patch('socket.getfqdn', autospec=True)
+  @mock.patch('infra_libs.ts_mon.common.monitors.HttpsMonitor.'
+              '_load_credentials', autospec=True)
+  def test_https_monitor_args(self, _load_creds, fake_fqdn, fake_get):
+    """An https:// endpoint must select HttpsMonitor as the global monitor."""
+    fake_fqdn.return_value = 'slave1-a1.reg.tld'
+    fake_get.return_value.side_effect = requests.exceptions.ConnectionError
+    p = argparse.ArgumentParser()
+    config.add_argparse_options(p)
+    args = p.parse_args([
+        '--ts-mon-credentials', '/path/to/creds.p8.json',
+        '--ts-mon-endpoint', 'https://test/random:insert'])
+
+    config.process_argparse_options(args)
+
+    self.assertIsInstance(interface.state.global_monitor,
+                          monitors.HttpsMonitor)
+
+    self.assertIsInstance(interface.state.target, targets.DeviceTarget)
+    self.assertEquals(interface.state.target.hostname, 'slave1-a1')
+    self.assertEquals(interface.state.target.region, 'reg')
+    self.assertEquals(args.ts_mon_flush, 'auto')
+    self.assertIsNotNone(interface.state.flush_thread)
+    self.assertTrue(standard_metrics.up.get())
+
+  @mock.patch('requests.get', autospec=True)
+  @mock.patch('socket.getfqdn', autospec=True)
   def test_default_target_uppercase_fqdn(self, fake_fqdn, fake_get):
     fake_fqdn.return_value = 'SLAVE1-A1.REG.TLD'
     fake_get.return_value.side_effect = requests.exceptions.ConnectionError