# Copyright 2018 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
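
"""Schema and helpers for the 'timeseries' table of dashboard data points."""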

import collections

from cli_tools.soundwave import pandas_sqlite
from core.external_modules import pandas


TABLE_NAME = 'timeseries'

COLUMN_TYPES = (
# Index columns.
('test_suite', str), # benchmark name ('loading.mobile')
('measurement', str), # metric name ('timeToFirstContentfulPaint')
('bot', str), # master/builder name ('ChromiumPerf.android-nexus5')
('test_case', str), # story name ('Wikipedia')
('point_id', 'int64'), # monotonically increasing id for time series axis
# Other columns.
('value', 'float64'), # value recorded for test_path at given point_id
('timestamp', 'datetime64[ns]'), # when the value got stored on dashboard
('commit_pos', 'int64'), # chromium commit position
('chromium_rev', str), # git hash of chromium revision
('clank_rev', str), # git hash of clank revision
('trace_url', str), # URL to a sample trace.
('units', str), # unit of measurement (e.g. 'ms', 'bytes')
('improvement_direction', str), # good direction ('up', 'down', 'unknown')
)

COLUMNS = tuple(c for c, _ in COLUMN_TYPES)
INDEX = COLUMNS[:5]

# Copied from https://goo.gl/DzGYpW.
_CODE_TO_IMPROVEMENT_DIRECTION = {
0: 'up',
1: 'down',
}

TEST_PATH_PARTS = (
    'master', 'builder', 'test_suite', 'measurement', 'test_case')

# Query template to find all data points of a given test_path (i.e. fixed
# test_suite, measurement, bot, and test_case values).
_QUERY_TIME_SERIES = (
'SELECT * FROM %s WHERE %s'
% (TABLE_NAME, ' AND '.join('%s=?' % c for c in INDEX[:-1])))
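# The query renders to:
#   SELECT * FROM timeseries
#   WHERE test_suite=? AND measurement=? AND bot=? AND test_case=?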

# Required columns to request from the /timeseries2 API.
_TIMESERIES2_COLS = [
'revision',
'revisions',
'avg',
'timestamp',
'annotations']


class Key(collections.namedtuple('Key', INDEX[:-1])):
"""Uniquely identifies a single timeseries."""

  @classmethod
def FromDict(cls, *args, **kwargs):
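    """Create a Key from a mapping and/or keyword arguments.

    The test_case field is optional and defaults to an empty string.
    """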
kwargs = dict(*args, **kwargs)
kwargs.setdefault('test_case', '') # test_case is optional.
return cls(**kwargs)

  def AsDict(self):
    return dict(zip(self._fields, self))

  def AsApiParams(self):
"""Return a dict with params for a /timeseries2 API request."""
params = self.AsDict()
if not params['test_case']:
del params['test_case'] # test_case is optional.
params['columns'] = ','.join(_TIMESERIES2_COLS)
return params


def DataFrame(rows=None):
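  """Create an empty or pre-filled DataFrame with the timeseries schema."""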
return pandas_sqlite.DataFrame(COLUMN_TYPES, index=INDEX, rows=rows)


def _ParseIntValue(value, on_error=-1):
# Try to parse as int and, in case of error, return a pre-defined value.
try:
return int(value)
  except (ValueError, TypeError):
return on_error


def _ParseConfigFromTestPath(test_path):
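  """Split a test_path (or Key) into a dict of column name to value.

  For a test_path string, the 'master' and 'builder' parts are merged into
  a single 'bot' value, and a missing test_case defaults to ''.
  """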
if isinstance(test_path, Key):
return test_path.AsDict()
values = test_path.split('/', len(TEST_PATH_PARTS) - 1)
if len(values) < len(TEST_PATH_PARTS):
values.append('') # Possibly missing test_case.
if len(values) != len(TEST_PATH_PARTS):
raise ValueError(test_path)
config = dict(zip(TEST_PATH_PARTS, values))
config['bot'] = '%s/%s' % (config.pop('master'), config.pop('builder'))
return config


def DataFrameFromJson(test_path, data):
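  """Convert a JSON response from the dashboard into a DataFrame.

  Data for a Key is parsed with the v2 format; a plain test_path string
  falls back to the legacy v1 format.
  """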
if isinstance(test_path, Key):
return _DataFrameFromJsonV2(test_path, data)
else:
# TODO(crbug.com/907121): Remove when we can switch entirely to v2.
return _DataFrameFromJsonV1(test_path, data)


def _DataFrameFromJsonV2(ts_key, data):
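  """Build a timeseries DataFrame from a /timeseries2 API response."""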
rows = []
for point in data['data']:
point = dict(zip(_TIMESERIES2_COLS, point))
rows.append(ts_key + (
point['revision'], # point_id
point['avg'], # value
point['timestamp'], # timestamp
_ParseIntValue(point['revisions']['r_commit_pos']), # commit_pos
point['revisions'].get('r_chromium'), # chromium_rev
point['revisions'].get('r_clank'), # clank_rev
point['annotations'].get('a_tracing_uri'), # trace_url
data['units'], # units
data['improvement_direction'], # improvement_direction
))
return DataFrame(rows)


def _DataFrameFromJsonV1(test_path, data):
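  """Build a timeseries DataFrame from a legacy (v1) API response."""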
assert test_path == data['test_path']
config = _ParseConfigFromTestPath(data['test_path'])
config['improvement_direction'] = _CODE_TO_IMPROVEMENT_DIRECTION.get(
data['improvement_direction'], 'unknown')
timeseries = data['timeseries']
  # The first element of the timeseries list is a header with column names.
header = timeseries[0]
rows = []
  # The remaining elements hold the values for each row.
for values in timeseries[1:]:
row = config.copy()
row.update(zip(header, values))
row['point_id'] = row['revision']
row['commit_pos'] = _ParseIntValue(row['r_commit_pos'])
row['chromium_rev'] = row.get('r_chromium')
row['clank_rev'] = row.get('r_clank', None)
rows.append(tuple(row.get(k) for k in COLUMNS))
return DataFrame(rows)


def GetTimeSeries(con, test_path, extra_cond=None):
  """Get the records for all data points on the given test_path.

  Args:
    con: An open database connection, as accepted by pandas.read_sql.
    test_path: A test_path string, or a Key identifying the timeseries.
    extra_cond: An optional SQL clause appended to the query, e.g. an
        ORDER BY or LIMIT.

  Returns:
    A pandas.DataFrame with all records found.
  """
config = _ParseConfigFromTestPath(test_path)
params = tuple(config[c] for c in INDEX[:-1])
query = _QUERY_TIME_SERIES
if extra_cond is not None:
query = ' '.join([query, extra_cond])
return pandas.read_sql(query, con, params=params, parse_dates=['timestamp'])


def GetMostRecentPoint(con, test_path):
  """Find the record for the most recent data point on the given test_path.

  Returns:
    A pandas.Series with the record if found, or None otherwise.
  """
df = GetTimeSeries(con, test_path, 'ORDER BY timestamp DESC LIMIT 1')
return df.iloc[0] if not df.empty else None