| # Copyright 2016 The LUCI Authors. All rights reserved. |
| # Use of this source code is governed under the Apache License, Version 2.0 |
| # that can be found in the LICENSE file. |
| |
| """Timeseries metrics.""" |
| |
| from collections import defaultdict |
| import datetime |
| import json |
| import logging |
| |
| from google.appengine.datastore.datastore_query import Cursor |
| |
| from components import utils |
| import gae_ts_mon |
| |
| from server import bot_management |
| from server import task_result |
| |
# - android_devices is a side effect of the health of each Android device
#   connected to the bot.
# - caches has an unbounded set of possible values.
# - server_version is the current server version. It'd be good to have but the
#   current monitoring pipeline is not adapted for this.
# - id is unique for each bot.
# - temp_band is Android-specific.
| # Keep in sync with ../swarming_bot/bot_code/bot_main.py |
| _IGNORED_DIMENSIONS = ('android_devices', 'caches', 'id', 'server_version', |
| 'temp_band') |
# The real timeout is 60s; keep this slightly under it to bail out early.
| _REQUEST_TIMEOUT_SEC = 50 |
# Cap the max number of items per taskqueue task, to keep the total
# number of collected streams manageable within each instance.
| _EXECUTORS_PER_SHARD = 500 |
| _JOBS_PER_SHARD = 500 |
| |
| # Override default target fields for app-global metrics. |
| _TARGET_FIELDS = { |
| 'job_name': '', # module name |
| 'hostname': '', # version |
| 'task_num': 0, # instance ID |
| } |
| |
| |
| ### All the metrics. |
| |
| |
| # Custom bucketer with 12% resolution in the range of 1..10**5. Used for job |
| # cycle times. |
| _bucketer = gae_ts_mon.GeometricBucketer(growth_factor=10**0.05, |
| num_finite_buckets=100) |
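
# Sanity check of the bucketer's shape (illustrative arithmetic only): each
# bucket boundary grows by a factor of 10**0.05 ~= 1.122, i.e. ~12%
# resolution, and 100 finite buckets span 10**(0.05 * 100) = 10**5, hence the
# 1..10**5 range quoted above.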
| |
# Regular (instance-local) metrics: jobs/completed and jobs/durations.
# Both have the following metric fields:
# - project_id: e.g. 'chromium'.
# - subproject_id: e.g. 'blink'. Set to empty string if not used.
# - pool: e.g. 'Chrome'.
# - spec_name: name of a job specification.
# - result: one of 'success', 'failure', or 'infra-failure'.
# jobs/completed additionally has:
# - status: the final task state, as produced by task_result.State.to_string().
| _jobs_completed = gae_ts_mon.CounterMetric( |
| 'jobs/completed', |
| 'Number of completed jobs.', [ |
| gae_ts_mon.StringField('spec_name'), |
| gae_ts_mon.StringField('project_id'), |
| gae_ts_mon.StringField('subproject_id'), |
| gae_ts_mon.StringField('pool'), |
| gae_ts_mon.StringField('result'), |
| gae_ts_mon.StringField('status'), |
| ]) |
| |
| |
| _jobs_durations = gae_ts_mon.CumulativeDistributionMetric( |
| 'jobs/durations', |
| 'Cycle times of completed jobs, in seconds.', [ |
| gae_ts_mon.StringField('spec_name'), |
| gae_ts_mon.StringField('project_id'), |
| gae_ts_mon.StringField('subproject_id'), |
| gae_ts_mon.StringField('pool'), |
| gae_ts_mon.StringField('result'), |
| ], |
| bucketer=_bucketer) |
| |
| |
# Similar to jobs/completed and jobs/durations, but with a dedup field.
| # - project_id: e.g. 'chromium' |
| # - subproject_id: e.g. 'blink'. Set to empty string if not used. |
| # - pool: e.g. 'Chrome' |
| # - spec_name: name of a job specification. |
| # - deduped: boolean describing whether the job was deduped or not. |
| _jobs_requested = gae_ts_mon.CounterMetric( |
| 'jobs/requested', |
| 'Number of requested jobs over time.', [ |
| gae_ts_mon.StringField('spec_name'), |
| gae_ts_mon.StringField('project_id'), |
| gae_ts_mon.StringField('subproject_id'), |
| gae_ts_mon.StringField('pool'), |
| gae_ts_mon.BooleanField('deduped'), |
| ]) |
| |
| |
| # Swarming-specific metric. Metric fields: |
| # - project_id: e.g. 'chromium'. |
| # - subproject_id: e.g. 'blink'. Set to empty string if not used. |
| # - pool: e.g. 'Chrome'. |
| # - spec_name: name of a job specification. |
| _tasks_expired = gae_ts_mon.CounterMetric( |
    'swarming/tasks/expired', 'Number of expired tasks.', [
| gae_ts_mon.StringField('spec_name'), |
| gae_ts_mon.StringField('project_id'), |
| gae_ts_mon.StringField('subproject_id'), |
| gae_ts_mon.StringField('pool'), |
| ]) |
| |
| |
| # Swarming-specific metric. Metric fields: |
| # - project_id: e.g. 'chromium-swarm' |
| _tasks_expiration_delay = gae_ts_mon.CumulativeDistributionMetric( |
| 'swarming/tasks/expiration_delay', |
| 'Delay of task expiration, in seconds.', [ |
| gae_ts_mon.StringField('project_id'), |
| ]) |
| |
| |
# Swarming-specific metric. Metric fields:
# - project_id: e.g. 'chromium-swarm'
# - slice_index: index of the task slice that expired.
# Buckets are 30 seconds wide (see the FixedWidthBucketer below).
| _tasks_slice_expiration_delay = gae_ts_mon.CumulativeDistributionMetric( |
| 'swarming/tasks/slice_expiration_delay', |
| 'Delay of task slice expiration, in seconds.', |
| [ |
| gae_ts_mon.StringField('project_id'), |
| gae_ts_mon.IntegerField('slice_index'), |
| ], |
| bucketer=gae_ts_mon.FixedWidthBucketer(width=30), |
| ) |
| |
| |
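# Swarming-specific metric. Metric fields:
# - pool: e.g. 'Chrome'.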
| _task_bots_runnable = gae_ts_mon.CumulativeDistributionMetric( |
| 'swarming/tasks/bots_runnable', |
| 'Number of bots available to run tasks.', [ |
| gae_ts_mon.StringField('pool'), |
| ], |
| ) |
| |
| |
# Global metric. Metric fields:
| # - project_id: e.g. 'chromium'. |
| # - subproject_id: e.g. 'blink'. Set to empty string if not used. |
| # - pool: e.g. 'Chrome'. |
| # - spec_name: name of a job specification. |
| # Override target field: |
| # - hostname: 'autogen:<executor_id>': name of the bot that executed a job, |
| # or an empty string. e.g. 'autogen:swarm42-m4'. |
# The value is True while the job is running. Completed / canceled jobs should
# not send this metric.
| _jobs_running = gae_ts_mon.BooleanMetric( |
| 'jobs/running', 'Presence metric for a running job.', [ |
| gae_ts_mon.StringField('spec_name'), |
| gae_ts_mon.StringField('project_id'), |
| gae_ts_mon.StringField('subproject_id'), |
| gae_ts_mon.StringField('pool'), |
| ]) |
| |
| # Global metric. Metric fields: |
| # - project_id: e.g. 'chromium'. |
| # - subproject_id: e.g. 'blink'. Set to empty string if not used. |
| # - pool: e.g. 'Chrome'. |
| # - spec_name: name of a job specification. |
| # - status: 'pending' or 'running'. |
| _jobs_active = gae_ts_mon.GaugeMetric( |
| 'jobs/active', 'Number of running, pending or otherwise active jobs.', [ |
| gae_ts_mon.StringField('spec_name'), |
| gae_ts_mon.StringField('project_id'), |
| gae_ts_mon.StringField('subproject_id'), |
| gae_ts_mon.StringField('pool'), |
| gae_ts_mon.StringField('status'), |
| ]) |
| |
| |
| # Global metric. Target field: hostname = 'autogen:<executor_id>' (bot id). |
| _executors_pool = gae_ts_mon.StringMetric( |
| 'executors/pool', |
| 'Pool name for a given job executor.', |
| None) |
| |
| |
| # Global metric. Target fields: |
| # - hostname = 'autogen:<executor_id>' (bot id). |
# The value is 'ready', 'running', or anything else, possibly
# Swarming-specific, when the executor cannot run a job, e.g. 'quarantined',
# 'dead' or 'maintenance'.
| _executors_status = gae_ts_mon.StringMetric( |
| 'executors/status', |
| 'Status of a job executor.', |
| None) |
| |
| |
# Global metric. Metric fields:
# - project_id: e.g. 'chromium'.
# - subproject_id: e.g. 'blink'. Set to empty string if not used.
# - pool: e.g. 'Chrome'.
# - spec_name: name of a job specification.
# - status: 'pending' or 'running'.
# Note that 'running' will keep reporting data as long as the job is running,
# so it is best to restrict queries to status == 'pending'.
| _jobs_pending_durations = gae_ts_mon.NonCumulativeDistributionMetric( |
| 'jobs/pending_durations', |
| 'Pending times of active jobs, in seconds.', [ |
| gae_ts_mon.StringField('spec_name'), |
| gae_ts_mon.StringField('project_id'), |
| gae_ts_mon.StringField('subproject_id'), |
| gae_ts_mon.StringField('pool'), |
| gae_ts_mon.StringField('status'), |
| ], |
| bucketer=_bucketer) |
| |
| |
# Global metric. Metric fields:
# - project_id: e.g. 'chromium'.
# - subproject_id: e.g. 'blink'. Set to empty string if not used.
# - pool: e.g. 'Chrome'.
# - spec_name: name of a job specification.
# - status: 'pending' or 'running'.
# Note that 'running' will keep reporting data as long as the job is running,
# so it is best to restrict queries to status == 'pending'.
| _jobs_max_pending_duration = gae_ts_mon.FloatMetric( |
| 'jobs/max_pending_duration', 'Maximum pending seconds of pending jobs.', [ |
| gae_ts_mon.StringField('spec_name'), |
| gae_ts_mon.StringField('project_id'), |
| gae_ts_mon.StringField('subproject_id'), |
| gae_ts_mon.StringField('pool'), |
| gae_ts_mon.StringField('status'), |
| ]) |
| |
| |
| # Instance metric. Metric fields: |
| # - auth_method = one of 'luci_token', 'service_account', 'ip_whitelist'. |
| # - condition = depends on the auth method (e.g. email for 'service_account'). |
| _bot_auth_successes = gae_ts_mon.CounterMetric( |
| 'swarming/bot_auth/success', |
    'Number of successful bot authentication events.', [
| gae_ts_mon.StringField('auth_method'), |
| gae_ts_mon.StringField('condition'), |
| ]) |
| |
| |
| ### Private stuff. |
| |
| |
| def _pool_from_dimensions(dimensions): |
| """Return a canonical string of flattened dimensions.""" |
| pairs = [] |
| for key, values in dimensions.items(): |
| if key in _IGNORED_DIMENSIONS: |
| continue |
    # Keep only values that are not a prefix of a later value; values is
    # already sorted.
| for i, value in enumerate(values): |
| if not any(v.startswith(value) for v in values[i+1:]): |
| pairs.append(u'%s:%s' % (key, value)) |
| return u'|'.join(sorted(pairs)) |
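
# Illustrative example (assumed dimension values): with
#   {'id': ['bot1'], 'os': ['Linux', 'Linux-Ubuntu'], 'pool': ['Chrome']}
# 'id' is ignored, 'Linux' is dropped because it is a prefix of
# 'Linux-Ubuntu', and the result is u'os:Linux-Ubuntu|pool:Chrome'.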
| |
| |
| def _set_jobs_metrics(payload): |
| params = _ShardParams(payload) |
| |
| state_map = {task_result.State.RUNNING: 'running', |
| task_result.State.PENDING: 'pending'} |
  jobs_counts = defaultdict(int)
  jobs_total = 0
  jobs_pending_distributions = defaultdict(
      lambda: gae_ts_mon.Distribution(_bucketer))
  jobs_max_pending_durations = defaultdict(float)
| |
| query_iter = task_result.get_result_summaries_query( |
| None, None, 'created_ts', 'pending_running', None).iter( |
| produce_cursors=True, start_cursor=params.cursor) |
| |
| while query_iter.has_next(): |
| runtime = (utils.utcnow() - params.start_time).total_seconds() |
| if jobs_total >= _JOBS_PER_SHARD or runtime > _REQUEST_TIMEOUT_SEC: |
| params.cursor = query_iter.cursor_after() |
| params.task_count += 1 |
| utils.enqueue_task( |
| '/internal/taskqueue/monitoring/tsmon/jobs', |
| 'tsmon', |
| payload=params.json()) |
| params.task_count -= 1 # For accurate logging below. |
| break |
| |
| params.count += 1 |
| jobs_total += 1 |
| summary = query_iter.next() |
| status = state_map.get(summary.state, '') |
| fields = _extract_job_fields(_tags_to_dict(summary.tags)) |
| target_fields = dict(_TARGET_FIELDS) |
| if summary.bot_id: |
| target_fields['hostname'] = 'autogen:' + summary.bot_id |
| if summary.bot_id and status == 'running': |
| _jobs_running.set(True, target_fields=target_fields, fields=fields) |
| fields['status'] = status |
| |
| key = tuple(sorted(fields.items())) |
| |
| jobs_counts[key] += 1 |
| |
| pending_duration = summary.pending_now(utils.utcnow()) |
| if pending_duration is not None: |
| jobs_pending_distributions[key].add(pending_duration.total_seconds()) |
| jobs_max_pending_durations[key] = max( |
| jobs_max_pending_durations[key], |
| pending_duration.total_seconds()) |
| |
| logging.debug( |
| '_set_jobs_metrics: task %d started at %s, processed %d jobs (%d total)', |
| params.task_count, params.task_start, jobs_total, params.count) |
| |
| # Global counts are sharded by task_num and aggregated in queries. |
| target_fields = dict(_TARGET_FIELDS) |
| target_fields['task_num'] = params.task_count |
| |
| for key, count in jobs_counts.items(): |
| _jobs_active.set(count, target_fields=target_fields, fields=dict(key)) |
| |
| for key, distribution in jobs_pending_distributions.items(): |
| _jobs_pending_durations.set( |
| distribution, target_fields=target_fields, fields=dict(key)) |
| |
| for key, val in jobs_max_pending_durations.items(): |
| _jobs_max_pending_duration.set( |
| val, target_fields=target_fields, fields=dict(key)) |
| |
| |
| def _set_executors_metrics(payload): |
| params = _ShardParams(payload) |
| query_iter = bot_management.BotInfo.query().iter( |
| produce_cursors=True, start_cursor=params.cursor) |
| |
| executors_count = 0 |
| while query_iter.has_next(): |
| runtime = (utils.utcnow() - params.start_time).total_seconds() |
| if (executors_count >= _EXECUTORS_PER_SHARD or |
| runtime > _REQUEST_TIMEOUT_SEC): |
| params.cursor = query_iter.cursor_after() |
| params.task_count += 1 |
| utils.enqueue_task( |
| '/internal/taskqueue/monitoring/tsmon/executors', |
| 'tsmon', |
| payload=params.json()) |
| params.task_count -= 1 # For accurate logging below. |
| break |
| |
| params.count += 1 |
| executors_count += 1 |
| bot_info = query_iter.next() |
| status = 'ready' |
| if bot_info.task_id: |
| status = 'running' |
| elif bot_info.quarantined: |
| status = 'quarantined' |
| elif bot_info.is_dead: |
| status = 'dead' |
| elif bot_info.state and bot_info.state.get('maintenance', False): |
| status = 'maintenance' |
| |
| target_fields = dict(_TARGET_FIELDS) |
| target_fields['hostname'] = 'autogen:' + bot_info.id |
| |
| _executors_status.set(status, target_fields=target_fields) |
| _executors_pool.set( |
| _pool_from_dimensions(bot_info.dimensions), target_fields=target_fields) |
| |
  logging.debug(
      '_set_executors_metrics: task %d started at %s, processed %d bots '
      '(%d total)',
      params.task_count, params.task_start, executors_count, params.count)
| |
| |
| def _set_global_metrics(): |
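  """Enqueues the initial 'jobs' and 'executors' metrics tasks.

  The taskqueue handlers behind these URLs are expected to call
  set_global_metrics() with the matching kind; each handler then chains
  additional shards via _ShardParams as needed.
  """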
| utils.enqueue_task('/internal/taskqueue/monitoring/tsmon/jobs', 'tsmon') |
| utils.enqueue_task('/internal/taskqueue/monitoring/tsmon/executors', 'tsmon') |
| |
| |
| class _ShardParams(object): |
| """Parameters for a chain of taskqueue tasks.""" |
| def __init__(self, payload): |
| self.start_time = utils.utcnow() |
| self.cursor = None |
| self.task_start = self.start_time |
| self.task_count = 0 |
| self.count = 0 |
| if not payload: |
| return |
| try: |
| params = json.loads(payload) |
| if params['cursor']: |
| self.cursor = Cursor(urlsafe=params['cursor']) |
| self.task_start = datetime.datetime.strptime( |
| params['task_start'], utils.DATETIME_FORMAT) |
| self.task_count = params['task_count'] |
| self.count = params['count'] |
| except (ValueError, KeyError) as e: |
| logging.error('_ShardParams: bad JSON: %s: %s', payload, e) |
| # Stop the task chain and let the request fail. |
| raise |
| |
| def json(self): |
| return utils.encode_to_json({ |
| 'cursor': self.cursor.urlsafe() if self.cursor else None, |
| 'task_start': self.task_start, |
| 'task_count': self.task_count, |
| 'count': self.count, |
| }) |
| |
| |
| def _tags_to_dict(tags): |
| """Converts list of string tags to dict. |
| |
| Args: |
| tags (list of str): list of 'key:value' strings. |
| """ |
| tags_dict = {} |
| for tag in tags: |
| try: |
| key, value = tag.split(':', 1) |
| tags_dict[key] = value |
| except ValueError: |
| pass |
| return tags_dict |
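
# Illustrative example (assumed tag values):
#   _tags_to_dict(['pool:Chrome', 'priority:50', 'malformed'])
# returns {'pool': 'Chrome', 'priority': '50'}; 'malformed' has no ':' and is
# skipped.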
| |
| |
| def _extract_job_fields(tags_dict): |
| """Extracts common job's metric fields from TaskResultSummary. |
| |
| Args: |
| tags_dict: tags dictionary. |
| """ |
| spec_name = tags_dict.get('spec_name') |
| if not spec_name: |
| spec_name = tags_dict.get('buildername', '') |
| if tags_dict.get('build_is_experimental') == 'true': |
| spec_name += ':experimental' |
| |
| fields = { |
| 'project_id': tags_dict.get('project', ''), |
| 'subproject_id': tags_dict.get('subproject', ''), |
| 'pool': tags_dict.get('pool', ''), |
| 'spec_name': spec_name, |
| } |
| return fields |
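
# Illustrative example (assumed tag values): a tags dict of
#   {'project': 'chromium', 'pool': 'Chrome', 'buildername': 'linux_rel',
#    'build_is_experimental': 'true'}
# yields {'project_id': 'chromium', 'subproject_id': '', 'pool': 'Chrome',
#         'spec_name': 'linux_rel:experimental'}.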
| |
| |
| ### Public API. |
| |
| |
| def on_task_requested(summary, deduped): |
| """When a task is created.""" |
| fields = _extract_job_fields(_tags_to_dict(summary.tags)) |
| fields['deduped'] = deduped |
| _jobs_requested.increment(fields=fields) |
| |
| |
| def on_task_completed(summary): |
| """When a task is stopped from being processed.""" |
| fields = _extract_job_fields(_tags_to_dict(summary.tags)) |
| if summary.state == task_result.State.EXPIRED: |
| _tasks_expired.increment(fields=fields) |
| return |
| |
| if summary.internal_failure: |
| fields['result'] = 'infra-failure' |
| elif summary.failure: |
| fields['result'] = 'failure' |
| else: |
| fields['result'] = 'success' |
| |
| completed_fields = fields.copy() |
| completed_fields['status'] = task_result.State.to_string(summary.state) |
| _jobs_completed.increment(fields=completed_fields) |
| if summary.duration is not None: |
| _jobs_durations.add(summary.duration, fields=fields) |
| |
| |
| def on_task_expired(summary, task_to_run): |
| """When a task slice is expired.""" |
| tags_dict = _tags_to_dict(summary.tags) |
| fields = {'project_id': tags_dict.get('project', '')} |
| |
| # slice expiration delay |
| _tasks_slice_expiration_delay.add( |
| task_to_run.expiration_delay, |
| fields=dict(fields, slice_index=task_to_run.task_slice_index)) |
| |
| # task expiration delay |
| if summary.expiration_delay: |
| _tasks_expiration_delay.add(summary.expiration_delay, fields=fields) |
| |
| |
| def on_bot_auth_success(auth_method, condition): |
| _bot_auth_successes.increment(fields={ |
| 'auth_method': auth_method, |
| 'condition': condition, |
| }) |
| |
| |
| def set_global_metrics(kind, payload=None): |
| if kind == 'jobs': |
| _set_jobs_metrics(payload) |
| elif kind == 'executors': |
| _set_executors_metrics(payload) |
| else: |
| logging.error('set_global_metrics(kind=%s): unknown kind.', kind) |
| |
| |
| def initialize(): |
  # These metrics are the ones that are reset every time they are flushed.
| gae_ts_mon.register_global_metrics([ |
| _executors_pool, |
| _executors_status, |
| _jobs_active, |
| _jobs_max_pending_duration, |
| _jobs_pending_durations, |
| _jobs_running, |
| ]) |
| gae_ts_mon.register_global_metrics_callback('callback', _set_global_metrics) |