# Copyright 2018 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import json
import logging
import sqlite3

from core import cli_utils
from core.external_modules import pandas
from core.services import dashboard_service

from cli_tools.soundwave import pandas_sqlite
from cli_tools.soundwave import studies
from cli_tools.soundwave import tables
from cli_tools.soundwave import worker_pool


def _FetchBugsWorker(args):
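  """Pool worker that fetches data for individual bugs.

  Opens its own connection to the database and registers a Process function
  on the worker_pool module; the pool calls it with each bug_id, fetching the
  bug from the dashboard and storing it in the 'bugs' table.
  """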
con = sqlite3.connect(args.database_file, timeout=10)
def Process(bug_id):
bugs = tables.bugs.DataFrameFromJson([dashboard_service.Bugs(bug_id)])
pandas_sqlite.InsertOrReplaceRecords(con, 'bugs', bugs)
  worker_pool.Process = Process


def FetchAlertsData(args):
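  """Fetch alerts matching the command line args, together with their bugs.

  Alerts for the given benchmark (and, optionally, sheriff) are stored in the
  'alerts' table; the bugs they reference are then fetched with a worker pool,
  optionally skipping bugs already present in the database.
  """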
params = {
'test_suite': args.benchmark,
'min_timestamp': cli_utils.DaysAgoToTimestamp(args.days)
}
if args.sheriff != 'all':
params['sheriff'] = args.sheriff
with tables.DbSession(args.database_file) as con:
# Get alerts.
num_alerts = 0
bug_ids = set()
    # TODO: This loop may be slow when fetching thousands of alerts; it needs
    # a better progress indicator.
for data in dashboard_service.IterAlerts(**params):
alerts = tables.alerts.DataFrameFromJson(data)
pandas_sqlite.InsertOrReplaceRecords(con, 'alerts', alerts)
num_alerts += len(alerts)
bug_ids.update(alerts['bug_id'].unique())
print '%d alerts found!' % num_alerts
# Get set of bugs associated with those alerts.
bug_ids.discard(0) # A bug_id of 0 means untriaged.
print '%d bugs found!' % len(bug_ids)
# Filter out bugs already in cache.
if args.use_cache:
known_bugs = set(
b for b in bug_ids if tables.bugs.Get(con, b) is not None)
if known_bugs:
print '(skipping %d bugs already in the database)' % len(known_bugs)
bug_ids.difference_update(known_bugs)
# Use worker pool to fetch bug data.
total_seconds = worker_pool.Run(
'Fetching data of %d bugs: ' % len(bug_ids),
_FetchBugsWorker, args, bug_ids)
    print '[%.1f bugs per second]' % (len(bug_ids) / total_seconds)


def _IterStaleTestPaths(con, test_paths):
  """Iterate over test_paths, yielding only those with stale or absent data.

  A test_path is considered stale if the most recent data point we have for
  it in the db is more than a day old.
  """
a_day_ago = pandas.Timestamp.utcnow() - pandas.Timedelta(days=1)
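  # Drop the timezone info so the value can be compared against the naive
  # timestamps stored in the db.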
a_day_ago = a_day_ago.tz_convert(tz=None)
for test_path in test_paths:
latest = tables.timeseries.GetMostRecentPoint(con, test_path)
if latest is None or latest['timestamp'] < a_day_ago:
      yield test_path


def _FetchTimeseriesWorker(args):
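  """Pool worker that fetches data for individual timeseries.

  Opens its own connection to the database and registers a Process function
  on the worker_pool module; the pool calls it with each test_path, fetching
  the timeseries from the dashboard and storing it in the 'timeseries' table.
  """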
con = sqlite3.connect(args.database_file, timeout=10)
min_timestamp = cli_utils.DaysAgoToTimestamp(args.days)
def Process(test_path):
try:
if isinstance(test_path, tables.timeseries.Key):
params = test_path.AsApiParams()
params['min_timestamp'] = min_timestamp
data = dashboard_service.Timeseries2(**params)
else:
data = dashboard_service.Timeseries(test_path, days=args.days)
except KeyError:
logging.info('Timeseries not found: %s', test_path)
return
timeseries = tables.timeseries.DataFrameFromJson(test_path, data)
pandas_sqlite.InsertOrReplaceRecords(con, 'timeseries', timeseries)
  worker_pool.Process = Process


def _ReadTimeseriesFromFile(filename):
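  """Read a JSON file and return a list of timeseries Keys."""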
with open(filename, 'r') as f:
data = json.load(f)
  return [tables.timeseries.Key.FromDict(ts) for ts in data]


def FetchTimeseriesData(args):
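  """Fetch timeseries data for the test paths selected by the command line.

  Test paths may come from a benchmark, an input file, or a study; data for
  each of them is fetched with a worker pool and stored in the 'timeseries'
  table, and may optionally be post-processed and written out to a csv file.
  """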
def _MatchesAllFilters(test_path):
return all(f in test_path for f in args.filters)
with tables.DbSession(args.database_file) as con:
# Get test_paths.
if args.benchmark is not None:
test_paths = dashboard_service.ListTestPaths(
args.benchmark, sheriff=args.sheriff)
elif args.input_file is not None:
test_paths = _ReadTimeseriesFromFile(args.input_file)
elif args.study is not None:
test_paths = list(args.study.IterTestPaths())
else:
raise ValueError('No source for test paths specified')
    # Apply --filter arguments to test_paths.
if args.filters:
test_paths = filter(_MatchesAllFilters, test_paths)
num_found = len(test_paths)
print '%d test paths found!' % num_found
# Filter out test_paths already in cache.
if args.use_cache:
test_paths = list(_IterStaleTestPaths(con, test_paths))
num_skipped = num_found - len(test_paths)
if num_skipped:
print '(skipping %d test paths already in the database)' % num_skipped
# Use worker pool to fetch test path data.
total_seconds = worker_pool.Run(
'Fetching data of %d timeseries: ' % len(test_paths),
_FetchTimeseriesWorker, args, test_paths)
print '[%.1f test paths per second]' % (len(test_paths) / total_seconds)
if args.output_csv is not None:
print
print 'Post-processing data for study ...'
dfs = []
with tables.DbSession(args.database_file) as con:
for test_path in test_paths:
df = tables.timeseries.GetTimeSeries(con, test_path)
dfs.append(df)
df = studies.PostProcess(pandas.concat(dfs, ignore_index=True))
with cli_utils.OpenWrite(args.output_csv) as f:
df.to_csv(f, index=False)
print 'Wrote timeseries data to:', args.output_csv