| # Copyright 2016 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Wrapper library around ts_mon. |
| |
| This library provides some wrapper functionality around ts_mon, to make it more |
| friendly to developers. It also provides import safety, in case ts_mon is not |
| deployed with your code. |
| """ |
| |
| import collections |
| import contextlib |
| from functools import wraps |
| import logging |
| import queue as Queue |
| import ssl |
| import time |
| |
| |
| try: |
| from chromite.third_party.infra_libs import ts_mon |
| except (ImportError, RuntimeError): |
| ts_mon = None |
| |
| |
# This number is chosen because 1.16^100 seconds is about
# 32 days. This is a good compromise between bucket size
# and dynamic range.
_SECONDS_BUCKET_FACTOR = 1.16

# If None, we create metrics in this process. Otherwise, we send metrics via
# this Queue to a dedicated flushing process.
# These attributes are set by chromite.lib.ts_mon_config.SetupTsMonGlobalState.
FLUSHING_PROCESS = None
MESSAGE_QUEUE = None

# Sentinel distinguishing "argument not supplied" from an explicit None
# (used for field_spec defaults throughout this module).
_MISSING = object()
| |
# One deferred metric operation, shipped over MESSAGE_QUEUE to the flushing
# process: which metric constructor to invoke, and which method to call on
# the resulting metric.
MetricCall = collections.namedtuple(
    "MetricCall",
    "metric_name metric_args metric_kwargs"
    " method method_args method_kwargs reset_after",
)
| |
| |
def _FlushingProcessClosed():
    """Returns whether the metrics flushing process has been closed."""
    if FLUSHING_PROCESS is None:
        # No flushing process was ever started, so it cannot be closed.
        return False
    # multiprocessing sets .exitcode once the process has terminated.
    return FLUSHING_PROCESS.exitcode is not None
| |
| |
class ProxyMetric:
    """Stand-in metric that forwards all method calls to the message queue.

    Rather than touching ts_mon directly, every method invocation is packed
    into a MetricCall and enqueued for the dedicated flushing process.
    """

    def __init__(self, metric, metric_args, metric_kwargs) -> None:
        self.metric = metric
        self.metric_args = metric_args
        # |reset_after| is consumed here; it is not a real metric kwarg.
        self.reset_after = metric_kwargs.pop("reset_after", False)
        self.metric_kwargs = metric_kwargs

    def __getattr__(self, method_name: str):
        """Return a callable that enqueues |method_name| on MESSAGE_QUEUE."""

        def enqueue(*args, **kwargs) -> None:
            if _FlushingProcessClosed():
                # The consumer is gone; log (best effort) and drop the call.
                try:
                    exit_code = FLUSHING_PROCESS.exitcode
                except AttributeError:
                    exit_code = None
                logging.warning(
                    "Flushing process has been closed (exit code %s),"
                    " skipped sending metric '%s'",
                    exit_code,
                    self.metric,
                )
                return

            try:
                MESSAGE_QUEUE.put_nowait(
                    MetricCall(
                        metric_name=self.metric,
                        metric_args=self.metric_args,
                        metric_kwargs=self.metric_kwargs,
                        method=method_name,
                        method_args=args,
                        method_kwargs=kwargs,
                        reset_after=self.reset_after,
                    )
                )
            except Queue.Full:
                logging.warning(
                    "Metrics queue is full; skipped sending metric '%s'",
                    self.metric,
                )

        return enqueue
| |
| |
def _Indirect(fn):
    """Decorates a metric constructor to defer through MESSAGE_QUEUE.

    When MESSAGE_QUEUE is set, calls return a ProxyMetric that ships the
    work to the flushing process; otherwise the wrapped constructor runs
    directly in this process.
    """

    @wraps(fn)
    def AddToQueueIfPresent(*args, **kwargs):
        if not MESSAGE_QUEUE:
            # |reset_after| only has meaning for ProxyMetric, so drop it
            # before invoking the real constructor.
            kwargs.pop("reset_after", None)
            return fn(*args, **kwargs)
        return ProxyMetric(fn.__name__, args, kwargs)

    return AddToQueueIfPresent
| |
| |
class MockMetric:
    """Mock metric object, to be returned if ts_mon is not set up."""

    def _noop(self, *args, **kwargs) -> None:
        """Accept and ignore any arguments; always return None."""

    def __getattr__(self, _: str):
        # Any attribute access resolves to the shared no-op method.
        return self._noop
| |
| |
def _ImportSafe(fn):
    """Decorator causing |fn| to return MockMetric if ts_mon not imported."""

    @wraps(fn)
    def wrapper(*args, **kwargs):
        # Without ts_mon, every metric degrades to an inert mock object.
        if not ts_mon:
            return MockMetric()
        return fn(*args, **kwargs)

    return wrapper
| |
| |
class FieldSpecAdapter:
    """Infers the types of field values to work around field_spec requirement.

    See: https://chromium-review.googlesource.com/c/432120/ for the change
    which added a required field_spec argument. This class is a temporary
    workaround to allow inferring the field_spec if it is not provided.
    """

    # Maps a Python value type to the ts_mon Field class used to declare it.
    # (A duplicate `str` entry — a Python 2 `unicode` leftover — was removed.)
    FIELD_CLASSES = (
        {}
        if ts_mon is None
        else {
            bool: ts_mon.BooleanField,
            int: ts_mon.IntegerField,
            str: ts_mon.StringField,
        }
    )

    def __init__(self, metric_cls, *args, **kwargs) -> None:
        """Remember the constructor args; defer building the metric.

        Args:
            metric_cls: The metric constructor to invoke lazily.
            *args: Positional args for |metric_cls|.
            **kwargs: Keyword args for |metric_cls|; 'field_spec' is filled
                in later, once field values have been observed.
        """
        self._metric_cls = metric_cls
        self._args = args
        self._kwargs = kwargs
        # _MISSING until the first method call constructs the real metric.
        self._instance = _MISSING

    def __getattr__(self, prop: str):
        """Return a wrapper which constructs the metric object on demand.

        Args:
            prop: The property name

        Returns:
            If self._instance has been created, the instance's .|prop|
            property, otherwise, a wrapper function which creates the
            ._instance and then calls the |prop| method on the instance.
        """
        if self._instance is not _MISSING:
            return getattr(self._instance, prop)

        def func(*args, **kwargs):
            if self._instance is not _MISSING:
                return getattr(self._instance, prop)(*args, **kwargs)
            # First call: infer the field_spec from the observed fields,
            # then build the real metric and forward the call.
            fields = FieldSpecAdapter._InferFields(prop, args, kwargs)
            self._kwargs["field_spec"] = FieldSpecAdapter._InferFieldSpec(
                fields
            )
            self._instance = self._metric_cls(*self._args, **self._kwargs)
            return getattr(self._instance, prop)(*args, **kwargs)

        func.__name__ = prop
        return func

    @staticmethod
    def _InferFields(method_name, args, kwargs):
        """Infers the fields argument.

        Args:
            method_name: The method called.
            args: The args list
            kwargs: The keyword args

        Returns:
            The fields dict if one can be located, else None.
        """
        if "fields" in kwargs:
            return kwargs["fields"]

        # .increment(fields) takes fields as its first positional argument.
        if method_name == "increment" and args:
            return args[0]

        # Other mutators take (value, fields).
        if len(args) >= 2:
            return args[1]

        return None

    @staticmethod
    def _InferFieldSpec(fields):
        """Infers the field types from the given fields.

        Args:
            fields: A dictionary with metric fields.

        Returns:
            A list of ts_mon Field objects sorted by field name, or None if
            there are no fields or ts_mon is unavailable.
        """
        if not fields or not ts_mon:
            return None

        return [
            FieldSpecAdapter.FIELD_CLASSES[type(v)](field)
            for (field, v) in sorted(fields.items())
        ]
| |
| |
def _OptionalFieldSpec(fn):
    """Decorates a function to allow an optional description and field_spec."""

    @wraps(fn)
    def wrapper(*args, **kwargs):
        # Copy so the caller's kwargs dict is never mutated.
        kwargs = dict(**kwargs)
        # Unlike .setdefault, this also replaces falsy values: empty or None
        # descriptions are not allowed.
        kwargs["description"] = kwargs.get("description") or "No description."
        if kwargs.get("field_spec", _MISSING) is _MISSING:
            # No real field_spec supplied: defer construction so the spec
            # can be inferred from the first method call's fields.
            return FieldSpecAdapter(fn, *args, **kwargs)
        return fn(*args, **kwargs)

    return wrapper
| |
| |
def _Metric(fn):
    """A pipeline of decorators to apply to our metric constructors.

    At call time: _OptionalFieldSpec normalizes description/field_spec,
    _ImportSafe substitutes a MockMetric when ts_mon is unavailable, and
    _Indirect redirects construction through MESSAGE_QUEUE when it is set.
    """
    return _OptionalFieldSpec(_ImportSafe(_Indirect(fn)))
| |
| |
| # This is needed for the reset_after flag used by @Indirect. |
| # pylint: disable=unused-argument |
| |
| |
@_Metric
def CounterMetric(
    name,
    reset_after=False,
    description=None,
    field_spec=_MISSING,
    start_time=None,
):
    """Returns a metric handle for a counter named |name|.

    Args:
        name: The name of the metric to create.
        reset_after: Whether to reset the metric after flushing; consumed by
            the _Indirect/ProxyMetric machinery, not passed to ts_mon.
        description: A string description of the metric.
        field_spec: A sequence of ts_mon.Field objects to specify the field
            schema.
        start_time: Forwarded to ts_mon.CounterMetric.
    """
    return ts_mon.CounterMetric(
        name,
        description=description,
        field_spec=field_spec,
        start_time=start_time,
    )


# Convenience alias.
Counter = CounterMetric
| |
| |
@_Metric
def GaugeMetric(name, reset_after=False, description=None, field_spec=_MISSING):
    """Returns a metric handle for a gauge named |name|.

    Args:
        name: The name of the metric to create.
        reset_after: Whether to reset the metric after flushing; consumed by
            the _Indirect/ProxyMetric machinery, not passed to ts_mon.
        description: A string description of the metric.
        field_spec: A sequence of ts_mon.Field objects to specify the field
            schema.
    """
    return ts_mon.GaugeMetric(
        name, description=description, field_spec=field_spec
    )


# Convenience alias.
Gauge = GaugeMetric
| |
| |
@_Metric
def CumulativeMetric(
    name, reset_after=False, description=None, field_spec=_MISSING
):
    """Returns a metric handle for a cumulative float named |name|.

    Args:
        name: The name of the metric to create.
        reset_after: Whether to reset the metric after flushing; consumed by
            the _Indirect/ProxyMetric machinery, not passed to ts_mon.
        description: A string description of the metric.
        field_spec: A sequence of ts_mon.Field objects to specify the field
            schema.
    """
    return ts_mon.CumulativeMetric(
        name, description=description, field_spec=field_spec
    )
| |
| |
@_Metric
def StringMetric(
    name, reset_after=False, description=None, field_spec=_MISSING
):
    """Returns a metric handle for a string named |name|.

    Args:
        name: The name of the metric to create.
        reset_after: Whether to reset the metric after flushing; consumed by
            the _Indirect/ProxyMetric machinery, not passed to ts_mon.
        description: A string description of the metric.
        field_spec: A sequence of ts_mon.Field objects to specify the field
            schema.
    """
    return ts_mon.StringMetric(
        name, description=description, field_spec=field_spec
    )


# Convenience alias.
String = StringMetric
| |
| |
@_Metric
def BooleanMetric(
    name, reset_after=False, description=None, field_spec=_MISSING
):
    """Returns a metric handle for a boolean named |name|.

    Args:
        name: The name of the metric to create.
        reset_after: Whether to reset the metric after flushing; consumed by
            the _Indirect/ProxyMetric machinery, not passed to ts_mon.
        description: A string description of the metric.
        field_spec: A sequence of ts_mon.Field objects to specify the field
            schema.
    """
    return ts_mon.BooleanMetric(
        name, description=description, field_spec=field_spec
    )


# Convenience alias.
Boolean = BooleanMetric
| |
| |
@_Metric
def FloatMetric(name, reset_after=False, description=None, field_spec=_MISSING):
    """Returns a metric handle for a float named |name|.

    Args:
        name: The name of the metric to create.
        reset_after: Whether to reset the metric after flushing; consumed by
            the _Indirect/ProxyMetric machinery, not passed to ts_mon.
        description: A string description of the metric.
        field_spec: A sequence of ts_mon.Field objects to specify the field
            schema.
    """
    return ts_mon.FloatMetric(
        name, description=description, field_spec=field_spec
    )


# Convenience alias.
Float = FloatMetric
| |
| |
@_Metric
def CumulativeDistributionMetric(
    name,
    reset_after=False,
    description=None,
    bucketer=None,
    field_spec=_MISSING,
):
    """Returns a metric handle for a cumulative distribution named |name|.

    Args:
        name: The name of the metric to create.
        reset_after: Whether to reset the metric after flushing; consumed by
            the _Indirect/ProxyMetric machinery, not passed to ts_mon.
        description: A string description of the metric.
        bucketer: A ts_mon bucketer controlling how values are binned.
        field_spec: A sequence of ts_mon.Field objects to specify the field
            schema.
    """
    return ts_mon.CumulativeDistributionMetric(
        name, description=description, bucketer=bucketer, field_spec=field_spec
    )


# Convenience alias.
CumulativeDistribution = CumulativeDistributionMetric
| |
| |
@_Metric
def DistributionMetric(
    name,
    reset_after=False,
    description=None,
    bucketer=None,
    field_spec=_MISSING,
):
    """Returns a metric handle for a distribution named |name|.

    Despite the shorter name, this is backed by a *non*-cumulative
    ts_mon distribution.

    Args:
        name: The name of the metric to create.
        reset_after: Whether to reset the metric after flushing; consumed by
            the _Indirect/ProxyMetric machinery, not passed to ts_mon.
        description: A string description of the metric.
        bucketer: A ts_mon bucketer controlling how values are binned.
        field_spec: A sequence of ts_mon.Field objects to specify the field
            schema.
    """
    return ts_mon.NonCumulativeDistributionMetric(
        name, description=description, bucketer=bucketer, field_spec=field_spec
    )


# Convenience alias.
Distribution = DistributionMetric
| |
| |
@_Metric
def CumulativeSmallIntegerDistribution(
    name, reset_after=False, description=None, field_spec=_MISSING
):
    """Returns a metric handle for a cumulative distribution named |name|.

    This differs slightly from CumulativeDistribution, in that the underlying
    metric uses a uniform bucketer rather than a geometric one.

    This metric type is suitable for holding a distribution of numbers that are
    nonnegative integers in the range of 0 to 100.

    Args:
        name: The name of the metric to create.
        reset_after: Whether to reset the metric after flushing; consumed by
            the _Indirect/ProxyMetric machinery, not passed to ts_mon.
        description: A string description of the metric.
        field_spec: A sequence of ts_mon.Field objects to specify the field
            schema.
    """
    return ts_mon.CumulativeDistributionMetric(
        name,
        # Width-1 buckets: one bucket per integer value.
        bucketer=ts_mon.FixedWidthBucketer(1),
        description=description,
        field_spec=field_spec,
    )
| |
| |
@_Metric
def CumulativeSecondsDistribution(
    name, scale=1, reset_after=False, description=None, field_spec=_MISSING
):
    """Returns a metric handle for a cumulative distribution named |name|.

    The distribution handle returned by this method is better suited than the
    default one for recording handling times, in seconds.

    This metric handle has bucketing that is optimized for time intervals
    (in seconds) in the range of 1 second to 32 days. Use |scale| to adjust this
    (e.g. scale=0.1 covers a range from .1 seconds to 3.2 days).

    Args:
        name: string name of metric
        scale: scaling factor of buckets, and size of the first bucket.
            default: 1
        reset_after: Should the metric be reset after reporting.
        description: A string description of the metric.
        field_spec: A sequence of ts_mon.Field objects to specify the field
            schema.
    """
    # Geometric buckets growing by _SECONDS_BUCKET_FACTOR cover roughly
    # scale*1 second up to scale*32 days across 100 buckets.
    return ts_mon.CumulativeDistributionMetric(
        name,
        bucketer=ts_mon.GeometricBucketer(
            growth_factor=_SECONDS_BUCKET_FACTOR, scale=scale
        ),
        units=ts_mon.MetricsDataUnits.SECONDS,
        description=description,
        field_spec=field_spec,
    )


SecondsDistribution = CumulativeSecondsDistribution
| |
| |
@_Metric
def PercentageDistribution(
    name,
    num_buckets=1000,
    reset_after=False,
    description=None,
    field_spec=_MISSING,
):
    """Returns a metric handle for a cumulative distribution for percentage.

    The distribution handle returned by this method is better suited for
    reporting percentage values than the default one. The bucketing is optimized
    for values in [0,100].

    Args:
        name: The name of this metric.
        num_buckets: This metric buckets the percentage values before
            reporting. This argument controls the number of the bucket the range
            [0,100] is divided in. The default gives you 0.1% resolution.
        reset_after: Should the metric be reset after reporting.
        description: A string description of the metric.
        field_spec: A sequence of ts_mon.Field objects to specify the field
            schema.
    """
    # Divide [0, 100] into |num_buckets| fixed-width buckets. The last bucket
    # actually covers [100, 100 + 1.0/num_buckets), so it corresponds to
    # values that exactly match 100%.
    return ts_mon.CumulativeDistributionMetric(
        name,
        bucketer=ts_mon.FixedWidthBucketer(100 / num_buckets, num_buckets),
        description=description,
        field_spec=field_spec,
    )
| |
| |
@contextlib.contextmanager
def SecondsTimer(
    name,
    fields=None,
    description=None,
    field_spec=_MISSING,
    scale=1,
    record_on_exception=True,
    add_exception_field=False,
):
    """Record the time of an operation to a CumulativeSecondsDistributionMetric.

    Records the time taken inside the context block, to the
    CumulativeSecondsDistribution named |name|, with the given fields.

    Examples:
        # Time the doSomething() call, with field values that are independent of
        # the results of the operation.
        with SecondsTimer(
            "timer/name",
            fields={"foo": "bar"},
            description="My timer",
            field_spec=[
                ts_mon.StringField("foo"),
                ts_mon.BooleanField("success"),
            ],
        ):
            doSomething()

        # Time the doSomethingElse call, with field values that depend on the
        # results of that operation. Note that it is important that a default
        # value is specified for these fields, in case an exception is thrown by
        # doSomethingElse()
        f = {'success': False, 'foo': 'bar'}
        with SecondsTimer(
            "timer/name",
            fields=f,
            description="My timer",
            field_spec=[ts_mon.StringField("foo")],
        ) as c:
            doSomethingElse()
            c["success"] = True

        # Incorrect Usage!
        with SecondsTimer('timer/name', description='My timer') as c:
            doSomething()
            c['foo'] = bar # 'foo' is not a valid field, because no default
                           # value for it was specified in the context
                           # constructor. It will be silently ignored.

    Args:
        name: The name of the metric to create
        fields: The fields of the metric to create.
        description: A string description of the metric.
        field_spec: A sequence of ts_mon.Field objects to specify the field
            schema.
        scale: A float to scale the CumulativeSecondsDistribution buckets by.
        record_on_exception: Whether to record metrics if an exception is
            raised.
        add_exception_field: Whether to add a
            BooleanField('encountered_exception') to the FieldSpec provided, and
            set its value to True iff an exception was raised in the context.
    """
    # Only extend the field schema when the exception field was actually
    # requested (previously it was appended whenever field_spec was given,
    # producing a schema/fields mismatch for add_exception_field=False).
    # Copy the caller's list so repeated use of the same spec does not keep
    # appending duplicate fields to it.
    if (
        add_exception_field
        and field_spec is not None
        and field_spec is not _MISSING
    ):
        field_spec = list(field_spec) + [
            ts_mon.BooleanField("encountered_exception")
        ]

    m = CumulativeSecondsDistribution(
        name, scale=scale, description=description, field_spec=field_spec
    )
    # Copy so mutations via the yielded dict don't leak into the caller's
    # |fields| argument.
    f = dict(fields or {})
    keys = list(f)
    t0 = _GetSystemClock()

    error = True
    try:
        yield f
        error = False
    finally:
        if add_exception_field:
            # Record the exception field regardless of record_on_exception,
            # so the emitted fields always match the declared field_spec.
            keys.append("encountered_exception")
            f.setdefault("encountered_exception", error)
        # Filter out keys that were not part of the initial key set. This is
        # to avoid inconsistent fields.
        # TODO(akeshet): Doing this filtering isn't super efficient. Would be
        # better to implement some key-restricted subclass or wrapper around
        # dict, and just yield that above rather than yielding a regular dict.
        if record_on_exception or not error:
            dt = _GetSystemClock() - t0
            # TODO(ayatane): Handle backward clock jumps. See _GetSystemClock.
            if dt >= 0:
                m.add(dt, fields={k: f[k] for k in keys})
| |
| |
def SecondsTimerDecorator(
    name,
    fields=None,
    description=None,
    field_spec=_MISSING,
    scale=1,
    record_on_exception=True,
    add_exception_field=False,
):
    """Decorator to time the duration of function calls.

    Examples:
        @SecondsTimerDecorator(
            "timer/name",
            fields={"foo": "bar"},
            description="My timer",
            field_spec=[ts_mon.StringField("foo")],
        )
        def Foo(bar):
            return doStuff()

        is equivalent to

        def Foo(bar):
            with SecondsTimer(
                "timer/name",
                fields={"foo": "bar"},
                description="My timer",
                field_spec=[ts_mon.StringField("foo")],
            ):
                return doStuff()

    Args:
        name: The name of the metric to create.
        fields: The fields of the metric to create.
        description: A string description of the metric.
        field_spec: A sequence of ts_mon.Field objects to specify the field
            schema.
        scale: A float to scale the distribution by.
        record_on_exception: Whether to record metrics if an exception is
            raised.
        add_exception_field: Whether to add a
            BooleanField('encountered_exception') to the FieldSpec provided, and
            set its value to True iff an exception was raised in the context.
    """

    def decorator(fn):
        @wraps(fn)
        def timed_fn(*args, **kwargs):
            # A fresh SecondsTimer context per invocation; all the timing
            # and metric-recording logic lives in SecondsTimer.
            timer = SecondsTimer(
                name,
                fields=fields,
                description=description,
                field_spec=field_spec,
                scale=scale,
                record_on_exception=record_on_exception,
                add_exception_field=add_exception_field,
            )
            with timer:
                return fn(*args, **kwargs)

        return timed_fn

    return decorator
| |
| |
@contextlib.contextmanager
def SecondsInstanceTimer(
    name,
    fields=None,
    description=None,
    field_spec=_MISSING,
    record_on_exception=True,
    add_exception_field=False,
):
    """Record the time of an operation to a FloatMetric.

    Records the time taken inside of the context block, to the
    Float metric named |name|, with the given fields. This is
    a non-cumulative metric; this represents the absolute time
    taken for a specific block. The duration is stored in a float
    to provide flexibility in the future for higher accuracy.

    Examples:
        # Time the doSomething() call, with field values that are independent of
        # the results of the operation.
        with SecondsInstanceTimer(
            "timer/name",
            fields={"foo": "bar"},
            description="My timer",
            field_spec=[
                ts_mon.StringField("foo"),
                ts_mon.BooleanField("success"),
            ],
        ):
            doSomething()

        # Time the doSomethingElse call, with field values that depend on the
        # results of that operation. Note that it is important that a default
        # value is specified for these fields, in case an exception is thrown by
        # doSomethingElse()
        f = {"success": False, "foo": "bar"}
        with SecondsInstanceTimer(
            "timer/name",
            fields=f,
            description="My timer",
            field_spec=[ts_mon.StringField("foo")],
        ) as c:
            doSomethingElse()
            c["success"] = True

        # Incorrect Usage!
        with SecondsInstanceTimer('timer/name', description='My timer') as c:
            doSomething()
            c['foo'] = bar # 'foo' is not a valid field, because no default
                           # value for it was specified in the context
                           # constructor. It will be silently ignored.

    Args:
        name: The name of the metric to create
        fields: The fields of the metric to create.
        description: A string description of the metric.
        field_spec: A sequence of ts_mon.Field objects to specify the field
            schema.
        record_on_exception: Whether to record metrics if an exception is
            raised.
        add_exception_field: Whether to add a
            BooleanField('encountered_exception') to the FieldSpec provided, and
            set its value to True iff an exception was raised in the context.

    Yields:
        Float based metric measuring the duration of execution.
    """
    # Only extend the field schema when the exception field was actually
    # requested (previously it was appended whenever field_spec was given,
    # producing a schema/fields mismatch for add_exception_field=False).
    # Copy the caller's list so repeated use of the same spec does not keep
    # appending duplicate fields to it.
    if (
        add_exception_field
        and field_spec is not None
        and field_spec is not _MISSING
    ):
        field_spec = list(field_spec) + [
            ts_mon.BooleanField("encountered_exception")
        ]

    m = FloatMetric(name, description=description, field_spec=field_spec)
    f = dict(fields or {})
    keys = list(f)
    t0 = _GetSystemClock()

    error = True
    try:
        yield f
        error = False
    finally:
        if add_exception_field:
            # Record the exception field regardless of record_on_exception,
            # so the emitted fields always match the declared field_spec.
            keys.append("encountered_exception")
            f.setdefault("encountered_exception", error)
        # Filter out keys that were not part of the initial key set. This is
        # to avoid inconsistent fields.
        # TODO(akeshet): Doing this filtering isn't super efficient. Would be
        # better to implement some key-restricted subclass or wrapper around
        # dict, and just yield that above rather than yielding a regular dict.
        if record_on_exception or not error:
            dt = _GetSystemClock() - t0
            m.set(dt, fields={k: f[k] for k in keys})
| |
| |
def SecondsInstanceTimerDecorator(
    name,
    fields=None,
    description=None,
    field_spec=_MISSING,
    record_on_exception=True,
    add_exception_field=False,
):
    """Decorator to time the gauge duration of function calls.

    Examples:
        @SecondsInstanceTimerDecorator(
            "timer/name",
            fields={"foo": "bar"},
            description="My timer",
            field_spec=[
                ts_mon.StringField("foo"),
                ts_mon.BooleanField("success"),
            ],
        )
        def Foo(bar):
            return doStuff()

        is equivalent to

        def Foo(bar):
            with SecondsInstanceTimer(
                "timer/name",
                fields={"foo": "bar"},
                description="My timer",
                field_spec=[
                    ts_mon.StringField("foo"),
                    ts_mon.BooleanField("success"),
                ],
            ):
                return doStuff()

    Args:
        name: The name of the metric to create
        fields: The fields of the metric to create
        description: A string description of the metric.
        field_spec: A sequence of ts_mon.Field objects to specify the field
            schema.
        record_on_exception: Whether to record metrics if an exception is
            raised.
        add_exception_field: Whether to add a
            BooleanField('encountered_exception') to the FieldSpec provided, and
            set its value to True iff an exception was raised in the context.

    Returns:
        A SecondsInstanceTimer metric decorator.
    """

    def decorator(fn):
        @wraps(fn)
        def timed_fn(*args, **kwargs):
            # A fresh SecondsInstanceTimer context per invocation; all the
            # timing and metric-recording logic lives there.
            timer = SecondsInstanceTimer(
                name,
                fields=fields,
                description=description,
                field_spec=field_spec,
                record_on_exception=record_on_exception,
                add_exception_field=add_exception_field,
            )
            with timer:
                return fn(*args, **kwargs)

        return timed_fn

    return decorator
| |
| |
@contextlib.contextmanager
def SuccessCounter(name, fields=None, description=None, field_spec=_MISSING):
    """Create a counter that tracks if something succeeds.

    Args:
        name: The name of the metric to create
        fields: The fields of the metric
        description: A string description of the metric.
        field_spec: A sequence of ts_mon.Field objects to specify the field
            schema.

    NOTE(review): |description| and |field_spec| are accepted but not
    forwarded to the underlying counter — confirm whether this is intended.
    """
    counter = Counter(name)
    # Copy so mutations via the yielded dict don't leak into the caller's
    # |fields| argument.
    f = dict(fields or {})
    # We add in the additional field success.
    keys = list(f) + ["success"]
    success = False
    try:
        yield f
        success = True
    finally:
        # Only set success if the caller did not already set it inside the
        # context block.
        f.setdefault("success", success)
        counter.increment(fields={k: f[k] for k in keys})
| |
| |
@contextlib.contextmanager
def Presence(name, fields=None, description=None, field_spec=_MISSING):
    """A counter of 'active' things.

    This keeps track of how many names are active at any given time. However,
    it's only suitable for long-running tasks, since the initial true value may
    never be written out if the task doesn't run for at least a minute.

    Args:
        name: The name of the metric to create.
        fields: The fields of the metric.
        description: A string description of the metric.
        field_spec: A sequence of ts_mon.Field objects to specify the field
            schema.
    """
    # Fix: the caller's |description| was previously dropped (description=None
    # was hard-coded); forward it to the metric.
    b = Boolean(name, description=description, field_spec=field_spec)
    b.set(True, fields=fields)
    try:
        yield
    finally:
        # Mark inactive even if the guarded block raised.
        b.set(False, fields=fields)
| |
| |
class RuntimeBreakdownTimer:
    """Record the time of an operation and the breakdown into sub-steps.

    Examples:
        with RuntimeBreakdownTimer(
            "timer/name",
            fields={"foo": "bar"},
            description="My timer",
            field_spec=[ts_mon.StringField("foo")],
        ) as timer:
            with timer.Step("first_step"):
                doFirstStep()
            with timer.Step("second_step"):
                doSecondStep()
            # The time spent next will show up under
            # .../timer/name/breakdown_no_step
            doSomeNonStepWork()

    This will emit the following metrics:
    - .../timer/name/total_duration - A CumulativeSecondsDistribution metric for
        the time spent inside the outer with block.
    - .../timer/name/breakdown/first_step and
        .../timer/name/breakdown/second_step - PercentageDistribution metrics
        for the fraction of time devoted to each substep.
    - .../timer/name/breakdown_unaccounted - PercentageDistribution metric for
        the fraction of time that is not accounted for in any of the substeps.
    - .../timer/name/bucketing_loss - PercentageDistribution metric buckets
        values before reporting them as distributions. This causes small errors
        in the reported values because they are rounded to the reported buckets
        lower bound. This is a CumulativeMetric measuring the total rounding
        error accrued in reporting all the percentages. The worst case bucketing
        loss for x steps is (x+1)/10. So, if you time across 9 steps, you should
        expect no more than 1% rounding error.
        [experimental]
    - .../timer/name/duration_breakdown - A Float metric, with one stream per
        Step indicating the ratio of time spent in that step. The different
        steps are differentiated via a field with key 'step_name'. Since some of
        the time can be spent outside any steps, these ratios will sum to <= 1.

    NB: This helper can only be used if the field values are known at the
    beginning of the outer context and do not change as a result of any of the
    operations timed.
    """

    # Number of buckets used by the PercentageDistribution metrics below,
    # giving 0.1% resolution over [0, 100].
    PERCENT_BUCKET_COUNT = 1000

    # (name, seconds) record for one completed Step.
    _StepMetrics = collections.namedtuple("_StepMetrics", ["name", "time_s"])

    def __init__(
        self, name, fields=None, description=None, field_spec=_MISSING
    ) -> None:
        self._name = name
        self._fields = fields
        self._field_spec = field_spec
        self._description = description
        # Clock reading taken at __enter__; total duration is computed in
        # __exit__ via _RecordTotalTime.
        self._outer_t0 = None
        self._total_time_s = 0
        # Guards against nested Step() usage, which is unsupported.
        self._inside_step = False
        # List of _StepMetrics, one per completed Step.
        self._step_metrics = []

    def __enter__(self):
        self._outer_t0 = _GetSystemClock()
        return self

    def __exit__(self, _type, _value, _traceback) -> None:
        # All metrics are emitted here, once the outer block finishes.
        self._RecordTotalTime()

        # 1) Total wall time of the outer block.
        outer_timer = CumulativeSecondsDistribution(
            "%s/total_duration" % (self._name,),
            field_spec=self._field_spec,
            description=self._description,
        )
        outer_timer.add(self._total_time_s, fields=self._fields)

        # 2) Per-step percentage distribution plus the experimental
        #    ratio-valued Float stream keyed by 'step_name'.
        for name, percent in self._GetStepBreakdowns().items():
            step_metric = PercentageDistribution(
                "%s/breakdown/%s" % (self._name, name),
                num_buckets=self.PERCENT_BUCKET_COUNT,
                field_spec=self._field_spec,
                description=self._description,
            )
            step_metric.add(percent, fields=self._fields)

            fields = dict(self._fields) if self._fields is not None else {}
            fields["step_name"] = name
            # TODO(pprabhu): Convert _GetStepBreakdowns() to return ratios
            # instead of percentage when the old PercentageDistribution
            # reporting is deleted.
            Float("%s/duration_breakdown" % self._name).set(
                percent / 100, fields=fields
            )

        # 3) Percentage of time spent outside all steps.
        unaccounted_metric = PercentageDistribution(
            "%s/breakdown_unaccounted" % self._name,
            num_buckets=self.PERCENT_BUCKET_COUNT,
            field_spec=self._field_spec,
            description=self._description,
        )
        unaccounted_metric.add(
            self._GetUnaccountedBreakdown(), fields=self._fields
        )

        # 4) Total rounding error introduced by bucketing the percentages.
        bucketing_loss_metric = CumulativeMetric(
            "%s/bucketing_loss" % self._name,
            field_spec=self._field_spec,
            description=self._description,
        )
        bucketing_loss_metric.increment_by(
            self._GetBucketingLoss(), fields=self._fields
        )

    @contextlib.contextmanager
    def Step(self, step_name):
        """Start a new step named step_name in the timed operation.

        Note that it is not possible to start a step inside a step. i.e.,

        with RuntimeBreakdownTimer('timer') as timer:
            with timer.Step('outer_step'):
                with timer.Step('inner_step'):
                    # will by design raise an exception.

        Args:
            step_name: The name of the step being timed.
        """
        if self._inside_step:
            # Nested steps are dropped (timed as part of the enclosing step)
            # rather than raising, but an error is logged.
            logging.error(
                "RuntimeBreakdownTimer.Step is not reentrant. "
                "Dropping step: %s",
                step_name,
            )
            yield
            return

        self._inside_step = True
        t0 = _GetSystemClock()
        try:
            yield
        finally:
            # Record the step's duration even if the step body raised.
            self._inside_step = False
            step_time_s = _GetSystemClock() - t0
            # TODO(ayatane): Handle backward clock jumps. See _GetSystemClock.
            step_time_s = max(0, step_time_s)
            self._step_metrics.append(self._StepMetrics(step_name, step_time_s))

    def _GetStepBreakdowns(self):
        """Returns percentage of time spent in each step.

        Must be called after |_RecordTotalTime|.
        """
        if not self._total_time_s:
            # Zero (or unrecorded) total time: avoid division by zero.
            return {}
        return {
            x.name: (x.time_s * 100) / self._total_time_s
            for x in self._step_metrics
        }

    def _GetUnaccountedBreakdown(self):
        """Returns the percentage time spent outside all steps.

        Must be called after |_RecordTotalTime|.
        """
        breakdown_percentages = sum(self._GetStepBreakdowns().values())
        # Clamp at 0 in case step percentages overshoot 100 (rounding).
        return max(0, 100 - breakdown_percentages)

    def _GetBucketingLoss(self):
        """Compute the actual loss in reported percentages due to bucketing.

        Must be called after |_RecordTotalTime|.
        """
        reported = list(self._GetStepBreakdowns().values())
        reported.append(self._GetUnaccountedBreakdown())
        bucket_width = 100 / self.PERCENT_BUCKET_COUNT
        # Each reported value is rounded down to its bucket's lower bound;
        # the loss per value is the remainder modulo the bucket width.
        return sum(x % bucket_width for x in reported)

    def _RecordTotalTime(self) -> None:
        self._total_time_s = _GetSystemClock() - self._outer_t0
        # TODO(ayatane): Handle backward clock jumps. See _GetSystemClock.
        self._total_time_s = max(0, self._total_time_s)
| |
| |
| def _GetSystemClock(): |
| """Return a clock time. |
| |
| The only thing that the return value can be used for is to subtract from |
| other instances to determine time elapsed. |
| """ |
| # TODO(ayatane): We should use a monotonic clock to measure this, |
| # but Python 2 does not have one. |
| return time.time() |
| |
| |
def Flush(reset_after=()) -> None:
    """Flushes metrics, but warns on transient errors.

    Args:
        reset_after: A list of metrics to reset after flushing. NOTE: the
            list is consumed (popped) as metrics are reset; any metrics
            still in the list after an exception were not reset.
    """
    if not ts_mon:
        return

    try:
        ts_mon.flush()
        # Pop as we go so a metric leaves the list only once it has actually
        # been reset.
        while reset_after:
            reset_after.pop().reset()
    except ssl.SSLError as e:
        logging.warning("Caught transient network error while flushing: %s", e)
    except Exception as e:
        logging.error("Caught exception while flushing: %s", e)