blob: 945a05d5865e7f2de43c1dbe6f18bce4743e5165 [file] [log] [blame]
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import os
import random
import StringIO
import time
import unittest
import httplib2
import mock
import infra_libs
from infra_libs.event_mon import config
from infra_libs.event_mon import router
from infra_libs.event_mon.protos.chrome_infra_log_pb2 import ChromeInfraEvent
from infra_libs.event_mon.protos.log_request_lite_pb2 import LogRequestLite
import infra_libs.event_mon.monitoring
DATA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data')
class HttpRouterTests(unittest.TestCase):
def test_without_credentials_smoke(self):
# Use dry_run to avoid code that deals with http.
r = router._HttpRouter({}, 'https://any.where', dry_run=True)
self.assertIsInstance(r._http, infra_libs.InstrumentedHttp)
def test_with_credentials_smoke(self):
cache = {'service_account_creds':
os.path.join(DATA_DIR, 'valid_creds.json'),
'service_accounts_creds_root': 'whatever.the/other/is/absolute'}
r = router._HttpRouter(cache, 'https://any.where', dry_run=True)
self.assertIsInstance(r._http, infra_libs.InstrumentedHttp)
def test_with_nonexisting_credentials(self):
cache = {'service_account_creds':
os.path.join(DATA_DIR, 'no-such-file-8531.json'),
'service_accounts_creds_root': 'whatever.the/other/is/absolute'}
# things should work.
r = router._HttpRouter(cache, 'https://any.where', dry_run=True)
self.assertIsInstance(r._http, infra_libs.InstrumentedHttp)
def test_push_smoke(self):
r = router._HttpRouter({}, 'https://any.where', dry_run=True)
req = LogRequestLite.LogEventLite()
req.event_time_ms = router.time_ms()
req.event_code = 1
req.event_flow_id = 2
self.assertTrue(r.push_event(req))
def test_push_invalid_event(self):
r = router._HttpRouter({}, 'https://any.where', dry_run=True)
self.assertFalse(r.push_event(None))
def test_invalid_url(self):
event = LogRequestLite.LogEventLite()
event.event_time_ms = router.time_ms()
event.event_code = 1
event.event_flow_id = 2
r = router._HttpRouter({}, 'ftp://any.where', dry_run=True)
self.assertFalse(r.push_event(event))
# Below this line, test the send/retry loop.
def test_push_ok(self):
# Successfully push event the first time.
sleep = mock.create_autospec(time.sleep, auto_set=True)
r = router._HttpRouter({}, 'https://bla.bla', _sleep_fn=sleep)
r._http = infra_libs.HttpMock([('https://bla.bla', {'status': 200}, '')])
event = LogRequestLite.LogEventLite()
event.event_time_ms = router.time_ms()
event.event_code = 1
event.event_flow_id = 2
self.assertTrue(r.push_event(event))
self.assertEquals(len(sleep.call_args_list), 0)
def test_push_fail(self):
# Fail to push events even after all retries
sleep = mock.create_autospec(time.sleep, auto_set=True)
r = router._HttpRouter({}, 'https://bla.bla', _sleep_fn=sleep)
r._http = infra_libs.HttpMock([('https://bla.bla', {'status': 403}, '')])
event = LogRequestLite.LogEventLite()
event.event_time_ms = router.time_ms()
event.event_code = 1
event.event_flow_id = 2
self.assertFalse(r.push_event(event))
self.assertEquals(len(sleep.call_args_list), r.try_num)
def test_push_exception(self):
# Fail to push events even after all retries
sleep = mock.create_autospec(time.sleep, auto_set=True)
r = router._HttpRouter({}, 'https://bla.bla', _sleep_fn=sleep)
class FakeHttp(object):
# pylint: disable=unused-argument
def request(self, *args, **kwargs):
raise ValueError()
r._http = FakeHttp()
event = LogRequestLite.LogEventLite()
event.event_time_ms = router.time_ms()
event.event_code = 1
event.event_flow_id = 2
self.assertFalse(r.push_event(event))
self.assertEquals(len(sleep.call_args_list), 3)
@mock.patch('logging.debug', autospec=True)
def test_logs_success(self, logdebug):
r = router._HttpRouter({}, 'https://bla.bla')
r._http = infra_libs.HttpMock([('https://bla.bla', {'status': 200}, '')])
event = LogRequestLite.LogEventLite()
event.event_time_ms = router.time_ms()
event.event_code = 1
event.event_flow_id = 2
self.assertTrue(r.push_event(event))
self.assertIn(mock.call('Succeeded POSTing data after %d attempts', 1),
logdebug.call_args_list)
class TextStreamRouterTests(unittest.TestCase):
def test_stdout_smoke(self):
event = LogRequestLite.LogEventLite()
event.event_time_ms = router.time_ms()
event.event_code = 1
event.event_flow_id = 2
r = router._TextStreamRouter()
self.assertTrue(r.push_event(event))
def test_stringio_stream(self):
config._cache['default_event'] = ChromeInfraEvent()
log_event = infra_libs.event_mon.monitoring._get_service_event(
'START', timestamp_kind='POINT', event_timestamp=1234).log_event()
stream = StringIO.StringIO()
r = router._TextStreamRouter(stream=stream)
self.assertTrue(r.push_event(log_event))
stream.seek(0)
output = stream.read()
self.assertGreater(stream.len, 10)
# Minimal checking because if this is printed the rest is printed as well
# unless google.protobuf is completely broken.
self.assertIn('timestamp_kind: POINT', output)
def test_closed_stream(self):
event = LogRequestLite.LogEventLite()
event.event_time_ms = router.time_ms()
event.event_code = 1
event.event_flow_id = 2
stream = StringIO.StringIO()
stream.close()
r = router._TextStreamRouter(stream=stream)
self.assertFalse(r.push_event(event))
class LocalFileRouterTests(unittest.TestCase):
def test_existing_directory(self):
event = LogRequestLite.LogEventLite()
event.event_time_ms = router.time_ms()
event.event_code = 1
event.event_flow_id = 2
with infra_libs.temporary_directory(prefix='event-mon-') as tempdir:
filename = os.path.join(tempdir, 'output.db')
r = router._LocalFileRouter(filename, dry_run=False)
self.assertTrue(r.push_event(event))
# Check that the file is readable and contains the right data.
with open(filename, 'rb') as f:
req_read = LogRequestLite.FromString(f.read())
self.assertEqual(len(req_read.log_event), 1)
event_read = req_read.log_event[0]
self.assertEqual(event_read.event_time_ms, event.event_time_ms)
self.assertEqual(event_read.event_code, event.event_code)
self.assertEqual(event_read.event_flow_id, event.event_flow_id)
def test_existing_directory_dry_run(self):
event = LogRequestLite.LogEventLite()
event.event_time_ms = router.time_ms()
event.event_code = 1
event.event_flow_id = 2
with infra_libs.temporary_directory(prefix='event-mon-') as tempdir:
filename = os.path.join(tempdir, 'output.db')
r = router._LocalFileRouter(filename, dry_run=True)
self.assertTrue(r.push_event(event))
# Check that the file has not been written
self.assertFalse(os.path.exists(filename))
def test_non_existing_directory(self):
req = LogRequestLite.LogEventLite()
req.event_time_ms = router.time_ms()
req.event_code = 1
req.event_flow_id = 2
r = router._LocalFileRouter(
os.path.join('non_existing_d1r_31401789', 'output.db'), dry_run=False)
self.assertFalse(r.push_event(req))
class BackoffTest(unittest.TestCase):
def test_backoff_time_first_value(self):
t = router.backoff_time(attempt=0, retry_backoff=2.)
random.seed(0)
self.assertTrue(1.5 <= t <= 2.5)
def test_backoff_time_max_value(self):
t = router.backoff_time(attempt=10, retry_backoff=2., max_delay=5)
self.assertTrue(abs(t - 5.) < 0.0001)
class LoggingStreamRouterTests(unittest.TestCase):
@mock.patch('logging.log')
def test_events_are_logged_correctly(self, log_mock):
logger = router._LoggingStreamRouter()
events = []
for i in range(3):
event = LogRequestLite.LogEventLite()
event.event_time_ms = router.time_ms()
event.event_code = 1
event.event_flow_id = 2
infra_event = ChromeInfraEvent()
infra_event.cq_event.issue = str(i + 1)
event.source_extension = infra_event.SerializeToString()
events.append(event)
self.assertTrue(logger.push_event(events))
expected_calls = [
((logging.INFO,
'Sending event_mon event:\ncq_event {\n issue: "1"\n}\n'),),
((logging.INFO,
'Sending event_mon event:\ncq_event {\n issue: "2"\n}\n'),),
((logging.INFO,
'Sending event_mon event:\ncq_event {\n issue: "3"\n}\n'),)]
self.assertEqual(log_mock.call_args_list, expected_calls)
@mock.patch('logging.log')
def test_events_are_logged_with_specified_severity(self, log_mock):
logger = router._LoggingStreamRouter(logging.WARN)
event = LogRequestLite.LogEventLite()
event.event_time_ms = router.time_ms()
event.event_code = 1
event.event_flow_id = 2
self.assertTrue(logger.push_event(event))
self.assertEqual(log_mock.call_args[0], (logging.WARN, mock.ANY))
@mock.patch('logging.exception')
def test_fails_to_log(self, exception_mock):
logger = router._LoggingStreamRouter(logging.WARN)
event = LogRequestLite.LogEventLite()
event.event_time_ms = router.time_ms()
event.event_code = 1
event.event_flow_id = 2
event.source_extension = 'not-a-message'
self.assertFalse(logger.push_event(event))
self.assertEqual(exception_mock.call_args[0], ('Unable to log the events',))