blob: 44fe2d7739d2bb3219e2dc78ad710a618fda7297 [file] [log] [blame]
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import copy
import datetime
import gae_ts_mon
import mock
from .test_support import test_case
from infra_libs.ts_mon import config
from infra_libs.ts_mon import exporter
from infra_libs.ts_mon import shared
from infra_libs.ts_mon.common import interface
from infra_libs.ts_mon.common import targets
class ExporterTest(test_case.TestCase):
  """Tests for the ts_mon exporter: metric reset and flush scheduling."""

  def setUp(self):
    """Swaps in a private interface.state and a mocked HttpsMonitor."""
    super(ExporterTest, self).setUp()
    config.reset_for_unittest()
    target = targets.TaskTarget('test_service', 'test_job', 'test_region',
                                'test_host')
    self.mock_state = interface.State(target=target)
    # Start from a shallow copy of the currently registered metrics.
    self.mock_state.metrics = copy.copy(interface.state.metrics)
    # Both patches are started manually; tearDown() stops them via stopall().
    mock.patch(
        'infra_libs.ts_mon.common.interface.state',
        new=self.mock_state).start()
    mock.patch(
        'infra_libs.ts_mon.common.monitors.HttpsMonitor',
        autospec=True).start()

  def tearDown(self):
    """Resets the ts_mon config and stops every patch started in setUp()."""
    config.reset_for_unittest()
    mock.patch.stopall()
    super(ExporterTest, self).tearDown()

  def test_reset_cumulative_metrics(self):
    """Resetting cumulative metrics clears the counter but keeps the gauge."""
    gauge = gae_ts_mon.GaugeMetric('gauge', 'foo', None)
    counter = gae_ts_mon.CounterMetric('counter', 'foo', None)
    gauge.set(5)
    counter.increment()
    self.assertEqual(5, gauge.get())
    self.assertEqual(1, counter.get())
    exporter._reset_cumulative_metrics()
    self.assertEqual(5, gauge.get())
    self.assertIsNone(counter.get())

  def test_flush_metrics_without_target(self):
    """No flush is reported when the state has no target."""
    time_now = 10000
    interface.state.target = None
    self.assertFalse(exporter.flush_metrics_if_needed(time_now))

  def test_flush_metrics_no_task_num(self):
    """No flush is reported while task_num is still -1."""
    # We are not assigned task_num yet; cannot send metrics.
    time_now = 10000
    datetime_now = datetime.datetime.utcfromtimestamp(time_now)
    more_than_min_ago = datetime_now - datetime.timedelta(seconds=61)
    interface.state.last_flushed = more_than_min_ago
    entity = shared.get_instance_entity()
    entity.task_num = -1
    interface.state.target.task_num = -1
    self.assertFalse(exporter.flush_metrics_if_needed(time_now))

  def test_flush_metrics_no_task_num_too_long(self):
    """No flush is reported when task_num stayed -1 past the expected time."""
    # We are not assigned task_num for too long; cannot send metrics.
    time_now = 10000
    datetime_now = datetime.datetime.utcfromtimestamp(time_now)
    too_long_ago = datetime_now - datetime.timedelta(
        seconds=shared.INSTANCE_EXPECTED_TO_HAVE_TASK_NUM_SEC + 1)
    interface.state.last_flushed = too_long_ago
    entity = shared.get_instance_entity()
    entity.task_num = -1
    entity.last_updated = too_long_ago
    interface.state.target.task_num = -1
    self.assertFalse(exporter.flush_metrics_if_needed(time_now))

  def test_flush_metrics_purged(self):
    """No flush is reported when the entity's task_num was reset to -1."""
    # We lost our task_num; cannot send metrics.
    time_now = 10000
    datetime_now = datetime.datetime.utcfromtimestamp(time_now)
    more_than_min_ago = datetime_now - datetime.timedelta(seconds=61)
    interface.state.last_flushed = more_than_min_ago
    entity = shared.get_instance_entity()
    entity.task_num = -1
    interface.state.target.task_num = 2
    self.assertFalse(exporter.flush_metrics_if_needed(time_now))

  def test_flush_metrics_too_early(self):
    """No flush is reported when the last flush was 59 seconds ago."""
    # Too early to send metrics.
    time_now = 10000
    datetime_now = datetime.datetime.utcfromtimestamp(time_now)
    less_than_min_ago = datetime_now - datetime.timedelta(seconds=59)
    interface.state.last_flushed = less_than_min_ago
    entity = shared.get_instance_entity()
    entity.task_num = 2
    self.assertFalse(exporter.flush_metrics_if_needed(time_now))

  @mock.patch('infra_libs.ts_mon.common.interface.flush', autospec=True)
  def test_flush_metrics_successfully(self, mock_flush):
    """A due flush calls interface.flush() once and clears global metrics."""
    # We have task_num and due for sending metrics.
    time_now = 10000
    datetime_now = datetime.datetime.utcfromtimestamp(time_now)
    more_than_min_ago = datetime_now - datetime.timedelta(seconds=61)
    interface.state.last_flushed = more_than_min_ago
    entity = shared.get_instance_entity()
    entity.task_num = 2
    # Global metrics must be erased after exporter.
    test_global_metric = gae_ts_mon.GaugeMetric('test', 'foo', None)
    test_global_metric.set(42)
    interface.register_global_metrics([test_global_metric])
    self.assertEqual(42, test_global_metric.get())
    self.assertTrue(exporter.flush_metrics_if_needed(time_now))
    self.assertIsNone(test_global_metric.get())
    mock_flush.assert_called_once_with()

  @mock.patch('infra_libs.ts_mon.common.interface.flush', autospec=True)
  def test_flush_metrics_disabled(self, mock_flush):
    """interface.flush() is never called when flush_enabled_fn is False."""
    # We have task_num and due for sending metrics, but ts_mon is disabled.
    time_now = 10000
    datetime_now = datetime.datetime.utcfromtimestamp(time_now)
    more_than_min_ago = datetime_now - datetime.timedelta(seconds=61)
    interface.state.last_flushed = more_than_min_ago
    interface.state.flush_enabled_fn = lambda: False
    entity = shared.get_instance_entity()
    entity.task_num = 2
    self.assertFalse(exporter.flush_metrics_if_needed(time_now))
    self.assertEqual(0, mock_flush.call_count)

  @mock.patch('threading.Thread', autospec=True)
  @mock.patch(
      'gae_ts_mon.exporter.need_to_flush_metrics',
      autospec=True,
      return_value=True)
  def test_parallel_flush_start_thread(self, _, mock_thread):
    """parallel_flush() creates, starts and joins a thread when needed."""
    time_now = 10000
    with exporter.parallel_flush(time_now) as thread:
      self.assertIsNotNone(thread)
      # test if the thread instance was created with the expected param values.
      mock_thread.assert_called_once_with(
          args=(time_now,), target=exporter._flush_metrics)
      # test if thread.start() has been invoked.
      thread.start.assert_called_once()

    # thread.join() should be called when the context exits.
    thread.join.assert_called_once()

  @mock.patch('threading.Thread', autospec=True)
  @mock.patch(
      'gae_ts_mon.exporter.need_to_flush_metrics',
      autospec=True,
      return_value=False)
  def test_create_flush_thread_if_needed_skip_creating_thread(
      self, _, mock_thread):
    """parallel_flush() yields None and creates no thread when not needed."""
    time_now = 10000
    with exporter.parallel_flush(time_now) as thread:
      self.assertIsNone(thread)

    mock_thread.assert_not_called()