# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Processes tests and creates new Anomaly entities.
This module contains the ProcessTests function, which searches the recent
points in a test for potential regressions or improvements, and creates
new Anomaly entities.
"""
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
import logging
from google.appengine.ext import deferred
from google.appengine.ext import ndb
from dashboard import email_sheriff
from dashboard import find_change_points
from dashboard.common import utils
from dashboard.models import alert_group
from dashboard.models import anomaly
from dashboard.models import anomaly_config
from dashboard.models import graph_data
from dashboard.models import histogram
from dashboard.models import subscription
from dashboard.sheriff_config_client import SheriffConfigClient
from tracing.value.diagnostics import reserved_infos
# Number of points to fetch and pass to FindChangePoints. A different number
# may be used if a test has a "max_window_size" anomaly config parameter.
DEFAULT_NUM_POINTS = 50
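# For example (illustrative value only), an anomaly config containing
# {'max_window_size': 100} would cause up to 100 points to be fetched for that
# test instead of the default above.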
@ndb.synctasklet
def ProcessTests(test_keys):
"""Processes a list of tests to find new anoamlies.
Args:
test_keys: A list of TestMetadata ndb.Key's.
"""
yield ProcessTestsAsync(test_keys)
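# Example usage (the test path below is hypothetical):
#   test_key = utils.TestKey('ChromiumPerf/linux-perf/sunspider/Total')
#   ProcessTests([test_key])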
@ndb.tasklet
def ProcessTestsAsync(test_keys):
  # Using a parallel yield here lets the tasklets for each _ProcessTest run
  # in parallel.
yield [_ProcessTest(k) for k in test_keys]
@ndb.tasklet
def _ProcessTest(test_key):
"""Processes a test to find new anomalies.
Args:
test_key: The ndb.Key for a TestMetadata.
"""
# We're dropping clank support, which goes through the old recipe_bisect
# system. For now, we're simply disabling alert generation and stopping
# bisects from getting kicked off. We'll follow up with a more thorough
# removal of all old bisect related code.
# crbug.com/937230
if test_key.id().startswith('ClankInternal'):
raise ndb.Return(None)
test = yield test_key.get_async()
config = yield anomaly_config.GetAnomalyConfigDictAsync(test)
max_num_rows = config.get('max_window_size', DEFAULT_NUM_POINTS)
rows_by_stat = yield GetRowsToAnalyzeAsync(test, max_num_rows)
ref_rows_by_stat = {}
ref_test = yield _CorrespondingRefTest(test_key)
if ref_test:
ref_rows_by_stat = yield GetRowsToAnalyzeAsync(ref_test, max_num_rows)
for s, rows in rows_by_stat.items():
if rows:
logging.info('Processing test: %s', test_key.id())
yield _ProcessTestStat(config, test, s, rows, ref_rows_by_stat.get(s))
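# Note: _EmailSheriff below is invoked through deferred.defer(), so it runs in
# a separate task and re-fetches the entities from their keys.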
def _EmailSheriff(sheriff, test_key, anomaly_key):
test_entity = test_key.get()
anomaly_entity = anomaly_key.get()
email_sheriff.EmailSheriff(sheriff, test_entity, anomaly_entity)
@ndb.tasklet
def _ProcessTestStat(config, test, stat, rows, ref_rows):
# If there were no rows fetched, then there's nothing to analyze.
if not rows:
logging.error('No rows fetched for %s', test.test_path)
raise ndb.Return(None)
# TODO(crbug/1158326): Use the data from the git-hosted anomaly configuration
# instead of the provided config.
  # Get all the subscriptions from sheriff-config that match this test path.
client = SheriffConfigClient()
subscriptions, err_msg = client.Match(test.test_path)
  # Stop processing when Match fails, so that find_anomaly makes a best effort
  # to find the subscribers. Leave retrying to the upstream caller.
if err_msg is not None:
raise RuntimeError(err_msg)
# If we don't find any subscriptions, then we shouldn't waste resources on
# trying to find anomalies that we aren't going to alert on anyway.
if not subscriptions:
logging.error('No subscription for %s', test.test_path)
raise ndb.Return(None)
subscription_names = [s.name for s in subscriptions or []]
configs = {
s.name: [c.to_dict() for c in s.anomaly_configs
] for s in subscriptions or [] if s.anomaly_configs
}
if configs:
logging.debug('matched anomaly configs: %s', configs)
  # Get anomalies and check whether they also happen in the ref build.
change_points = FindChangePointsForTest(rows, config)
test_key = test.key
if ref_rows:
ref_change_points = FindChangePointsForTest(ref_rows, config)
    # Filter out change points that also appear in the ref build.
change_points = _FilterAnomaliesFoundInRef(change_points, ref_change_points,
test_key)
anomalies = yield [
_MakeAnomalyEntity(c, test, stat, rows, config) for c in change_points
]
# If no new anomalies were found, then we're done.
if not anomalies:
raise ndb.Return(None)
logging.info('Created %d anomalies', len(anomalies))
logging.info(' Test: %s', test_key.id())
logging.info(' Stat: %s', stat)
for a in anomalies:
a.subscriptions = subscriptions
a.subscription_names = subscription_names
a.internal_only = (
any([
s.visibility != subscription.VISIBILITY.PUBLIC
for s in subscriptions
]) or test.internal_only)
a.groups = alert_group.AlertGroup.GetGroupsForAnomaly(a, subscriptions)
yield ndb.put_multi_async(anomalies)
# TODO(simonhatch): email_sheriff.EmailSheriff() isn't a tasklet yet, so this
# code will run serially.
# Email sheriff about any new regressions.
for anomaly_entity in anomalies:
if anomaly_entity.bug_id is None and not anomaly_entity.is_improvement:
deferred.defer(_EmailSheriff, anomaly_entity.subscriptions, test.key,
anomaly_entity.key)
@ndb.tasklet
def _FindLatestAlert(test, stat):
query = anomaly.Anomaly.query()
query = query.filter(anomaly.Anomaly.test == test.key)
query = query.filter(anomaly.Anomaly.statistic == stat)
query = query.order(-anomaly.Anomaly.end_revision)
results = yield query.get_async()
if not results:
raise ndb.Return(None)
raise ndb.Return(results)
@ndb.tasklet
def _FindMonitoredStatsForTest(test):
del test
# TODO: This will get filled out after refactor.
raise ndb.Return(['avg'])
@ndb.synctasklet
def GetRowsToAnalyze(test, max_num_rows):
"""Gets the Row entities that we want to analyze.
Args:
test: The TestMetadata entity to get data for.
max_num_rows: The maximum number of points to get.
  Returns:
    A dict mapping statistic names to lists of (revision, Row, value) tuples,
    containing the latest rows after the last alerted revision, in ascending
    order by revision. Rows for the 'avg' statistic are fetched with a
    projection query, so they only have the revision, value, and timestamp
    properties.
"""
result = yield GetRowsToAnalyzeAsync(test, max_num_rows)
raise ndb.Return(result)
@ndb.tasklet
def GetRowsToAnalyzeAsync(test, max_num_rows):
  # If this is a histogram-based test, there may be multiple statistics we
  # want to analyze.
alerted_stats = yield _FindMonitoredStatsForTest(test)
latest_alert_by_stat = dict(
(s, _FindLatestAlert(test, s)) for s in alerted_stats)
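  # Kick off all of the row fetches before yielding any of them, so the
  # queries for the different statistics run concurrently.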
results = {}
for s in alerted_stats:
results[s] = _FetchRowsByStat(test.key, s, latest_alert_by_stat[s],
max_num_rows)
for s in results.keys():
results[s] = yield results[s]
raise ndb.Return(results)
@ndb.tasklet
def _FetchRowsByStat(test_key, stat, last_alert_future, max_num_rows):
  # Only the 'avg' statistic is stored in the Row's value property, so it can
  # use a cheap projection query; other statistics need the full Row entity.
if stat == 'avg':
query = graph_data.Row.query(projection=['revision', 'timestamp', 'value'])
else:
query = graph_data.Row.query()
query = query.filter(
graph_data.Row.parent_test == utils.OldStyleTestKey(test_key))
# The query is ordered in descending order by revision because we want
# to get the newest points.
if last_alert_future:
last_alert = yield last_alert_future
if last_alert:
query = query.filter(graph_data.Row.revision > last_alert.end_revision)
query = query.order(-graph_data.Row.revision)
# However, we want to analyze them in ascending order.
rows = yield query.fetch_async(limit=max_num_rows)
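  # Map each statistic to the Row property that stores it:
  # 'avg' -> value, 'std' -> error, anything else -> the d_<stat> property.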
vals = []
  for r in reversed(rows):
if stat == 'avg':
vals.append((r.revision, r, r.value))
elif stat == 'std':
vals.append((r.revision, r, r.error))
else:
vals.append((r.revision, r, getattr(r, 'd_%s' % stat)))
raise ndb.Return(vals)
def _FilterAnomaliesFoundInRef(change_points, ref_change_points, test):
change_points_filtered = []
test_path = utils.TestPath(test)
for c in change_points:
# Log information about what anomaly got filtered and what did not.
if not _IsAnomalyInRef(c, ref_change_points):
      logging.info('Keeping anomaly for test %s at revision %s', test_path,
                   c.x_value)
change_points_filtered.append(c)
else:
      logging.info('Filtering out anomaly for test %s at revision %s',
                   test_path, c.x_value)
return change_points_filtered
@ndb.tasklet
def _CorrespondingRefTest(test_key):
"""Returns the TestMetadata for the corresponding ref build trace, or None."""
test_path = utils.TestPath(test_key)
possible_ref_test_paths = [test_path + '_ref', test_path + '/ref']
for path in possible_ref_test_paths:
ref_test = yield utils.TestKey(path).get_async()
if ref_test:
raise ndb.Return(ref_test)
raise ndb.Return(None)
def _IsAnomalyInRef(change_point, ref_change_points):
"""Checks if anomalies are detected in both ref and non ref build.
Args:
change_point: A find_change_points.ChangePoint object to check.
ref_change_points: List of find_change_points.ChangePoint objects
found for a ref build series.
Returns:
True if there is a match found among the ref build series change points.
"""
for ref_change_point in ref_change_points:
if change_point.x_value == ref_change_point.x_value:
return True
return False
def _GetImmediatelyPreviousRevisionNumber(later_revision, rows):
"""Gets the revision number of the Row immediately before the given one.
Args:
later_revision: A revision number.
    rows: List of (revision, Row, value) tuples in ascending order by
      revision.
Returns:
The revision number just before the given one.
"""
for (revision, _, _) in reversed(rows):
if revision < later_revision:
return revision
assert False, 'No matching revision found in |rows|.'
def _GetRefBuildKeyForTest(test):
"""TestMetadata key of the reference build for the given test, if one exists.
Args:
test: the TestMetadata entity to get the ref build for.
Returns:
A TestMetadata key if found, or None if not.
"""
potential_path = '%s/ref' % test.test_path
potential_test = utils.TestKey(potential_path).get()
if potential_test:
return potential_test.key
potential_path = '%s_ref' % test.test_path
potential_test = utils.TestKey(potential_path).get()
if potential_test:
return potential_test.key
return None
def _GetDisplayRange(old_end, rows):
"""Get the revision range using a_display_rev, if applicable.
Args:
old_end: the x_value from the change_point
rows: List of Row entities in asscending order by revision.
Returns:
A end_rev, start_rev tuple with the correct revision.
"""
start_rev = end_rev = 0
for (revision, row, _) in reversed(rows):
if revision == old_end and hasattr(row, 'r_commit_pos'):
end_rev = row.r_commit_pos
elif revision < old_end and hasattr(row, 'r_commit_pos'):
start_rev = row.r_commit_pos + 1
break
if not end_rev or not start_rev:
end_rev = start_rev = None
return start_rev, end_rev
@ndb.tasklet
def _MakeAnomalyEntity(change_point, test, stat, rows, config):
"""Creates an Anomaly entity.
Args:
change_point: A find_change_points.ChangePoint object.
test: The TestMetadata entity that the anomalies were found on.
stat: The TestMetadata stat that the anomaly was found on.
    rows: List of (revision, Row, value) tuples that the anomaly was found on.
config: A dict representing the anomaly detection configuration
parameters used to produce this anomaly.
Returns:
An Anomaly entity, which is not yet put in the datastore.
"""
end_rev = change_point.extended_end
start_rev = _GetImmediatelyPreviousRevisionNumber(
change_point.extended_start, rows) + 1
  logging.debug('Change point extended range: %s to %s',
                change_point.extended_start, change_point.extended_end)
display_start = display_end = None
if test.master_name == 'ClankInternal':
display_start, display_end = _GetDisplayRange(change_point.x_value, rows)
median_before = change_point.median_before
median_after = change_point.median_after
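  # The suite-level key is the first three components of the test path, e.g.
  # a (hypothetical) test path 'ChromiumPerf/linux-perf/speedometer2/Total'
  # yields the suite key 'ChromiumPerf/linux-perf/speedometer2'.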
suite_key = test.key.id().split('/')[:3]
suite_key = '/'.join(suite_key)
suite_key = utils.TestKey(suite_key)
queried_diagnostics = yield (
histogram.SparseDiagnostic.GetMostRecentDataByNamesAsync(
suite_key,
set([
reserved_infos.BUG_COMPONENTS.name, reserved_infos.OWNERS.name,
reserved_infos.INFO_BLURB.name,
reserved_infos.ALERT_GROUPING.name,
])))
bug_components = queried_diagnostics.get(reserved_infos.BUG_COMPONENTS.name,
{}).get('values')
if bug_components:
bug_components = bug_components[0]
# TODO(902796): Remove this typecheck.
if isinstance(bug_components, list):
bug_components = bug_components[0]
ownership_information = {
'emails':
queried_diagnostics.get(reserved_infos.OWNERS.name, {}).get('values'),
'component':
bug_components,
      # Info blurbs should be a single string, so we only take the first
      # element of the list of values.
'info_blurb':
queried_diagnostics.get(reserved_infos.INFO_BLURB.name,
{}).get('values', [None])[0],
}
alert_grouping = queried_diagnostics.get(reserved_infos.ALERT_GROUPING.name,
{}).get('values', [])
# Compute additional anomaly metadata.
def MinMax(iterable):
min_ = max_ = None
for val in iterable:
if min_ is None:
min_ = max_ = val
else:
min_ = min(min_, val)
max_ = max(max_, val)
return min_, max_
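  # For example, MinMax([3, 1, 2]) == (1, 3) and MinMax([]) == (None, None).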
earliest_input_timestamp, latest_input_timestamp = MinMax(
r.timestamp for unused_rev, r, unused_val in rows)
new_anomaly = anomaly.Anomaly(
start_revision=start_rev,
end_revision=end_rev,
median_before_anomaly=median_before,
median_after_anomaly=median_after,
segment_size_before=change_point.size_before,
segment_size_after=change_point.size_after,
window_end_revision=change_point.window_end,
std_dev_before_anomaly=change_point.std_dev_before,
t_statistic=change_point.t_statistic,
degrees_of_freedom=change_point.degrees_of_freedom,
p_value=change_point.p_value,
is_improvement=_IsImprovement(test, median_before, median_after),
ref_test=_GetRefBuildKeyForTest(test),
test=test.key,
statistic=stat,
internal_only=test.internal_only,
units=test.units,
display_start=display_start,
display_end=display_end,
ownership=ownership_information,
alert_grouping=alert_grouping,
earliest_input_timestamp=earliest_input_timestamp,
latest_input_timestamp=latest_input_timestamp,
anomaly_config=config)
raise ndb.Return(new_anomaly)
def FindChangePointsForTest(rows, config_dict):
"""Gets the anomaly data from the anomaly detection module.
Args:
    rows: List of (revision, Row, value) tuples to find anomalies for, in
      ascending order by revision.
config_dict: Anomaly threshold parameters as a dictionary.
Returns:
A list of find_change_points.ChangePoint objects.
"""
data_series = [(revision, value) for (revision, _, value) in rows]
return find_change_points.FindChangePoints(data_series, **config_dict)
def _IsImprovement(test, median_before, median_after):
"""Returns whether the alert is an improvement for the given test.
Args:
test: TestMetadata to get the improvement direction for.
median_before: The median of the segment immediately before the anomaly.
median_after: The median of the segment immediately after the anomaly.
Returns:
    True if the anomaly is an improvement, otherwise False.
"""
if (median_before < median_after
and test.improvement_direction == anomaly.UP):
return True
if (median_before >= median_after
and test.improvement_direction == anomaly.DOWN):
return True
return False