# Copyright 2019 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import argparse
import filecmp
import json
import logging
from logging import handlers
import os
import posixpath
import shutil
import subprocess
from core.external_modules import pandas as pd
from core.external_modules import numpy as np
from core import gsutil
from py_utils import tempfile_ext
PINBOARD_DIR = os.path.abspath(os.path.dirname(__file__))
TOOLS_PERF_DIR = os.path.normpath(os.path.join(PINBOARD_DIR, '..', '..'))
CACHED_DATA_DIR = os.path.join(TOOLS_PERF_DIR, '_cached_data', 'pinboard')
PINPOINT_CLI = os.path.join(TOOLS_PERF_DIR, 'pinpoint_cli')
JOB_CONFIGS_PATH = os.path.join(PINBOARD_DIR, 'job_configs.json')
JOBS_STATE_FILE = 'jobs_state.json'
DATASET_PKL_FILE = 'dataset.pkl'
DATASET_CSV_FILE = 'dataset.csv'
CLOUD_STORAGE_DIR = 'gs://chrome-health-tvdata/pinboard'
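# Both the jobs state (JOBS_STATE_FILE) and the aggregated datasets
# (DATASET_PKL_FILE, DATASET_CSV_FILE) are mirrored to this location by
# UploadToCloudStorage below.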
TZ = 'America/Los_Angeles' # MTV-time.
# Only these are exported and uploaded to the Cloud Storage dataset.
MEASUREMENTS = set([
# V8 metrics.
'JavaScript:duration',
'Optimize-Background:duration',
'Optimize:duration',
'RunsPerMinute',
'Total-Main-Thread:duration',
'Total:duration',
'V8-Only-Main-Thread:duration',
'V8-Only:duration',
'memory:chrome:renderer_processes:reported_by_chrome:v8:effective_size',
'total:500ms_window:renderer_eqt:v8',
# Startup metrics.
'experimental_content_start_time',
'experimental_navigation_start_time',
'first_contentful_paint_time',
'messageloop_start_time',
'navigation_commit_time',
])
# Compute averages over a fixed set of active stories. These may need to be
# periodically updated.
ACTIVE_STORIES = set([
'browse:chrome:newtab',
'browse:chrome:omnibox',
'browse:media:facebook_photos',
'browse:media:googleplaystore:2019',
'browse:media:imgur',
'browse:media:youtube',
'browse:news:cricbuzz',
'browse:news:toi',
'browse:shopping:amazon',
'browse:shopping:lazada',
'browse:social:facebook',
'browse:social:instagram',
'browse:tools:maps',
'load:media:facebook_photos',
'load:media:youtube:2018',
'load:news:irctc',
'load:news:wikipedia:2018',
'intent:coldish:bbc',
'Speedometer2',
])
def StartPinpointJobs(state, date):
"""Start new pinpoint jobs for the last commit on the given date."""
revision, timestamp = GetLastCommitOfDate(date)
if any(item['revision'] == revision for item in state):
logging.info('No new jobs to start.')
return
# Add a new item to the state with info about jobs for this revision.
logging.info('Starting jobs for %s (%s):', timestamp[:10], revision)
item = {'revision': revision, 'timestamp': timestamp, 'jobs': []}
configs = LoadJsonFile(JOB_CONFIGS_PATH)
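# Each config is a plain dict of pinpoint job parameters that is passed
# verbatim to `pinpoint_cli start-job`; the only field this script fills in
# is 'base_git_hash', set to the revision under test.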
for config in configs:
config['base_git_hash'] = revision
with tempfile_ext.NamedTemporaryFile() as tmp:
json.dump(config, tmp)
tmp.close()
output = subprocess.check_output(
['vpython', PINPOINT_CLI, 'start-job', tmp.name],
universal_newlines=True).strip()
logging.info(output)
assert 'https://pinpoint' in output
item['jobs'].append({'id': output.split('/')[-1], 'status': 'queued'})
state.append(item)
state.sort(key=lambda p: p['timestamp']) # Keep items sorted by date.
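# For illustration, an item of the jobs state built above has the shape
# (all values below are hypothetical):
#
#   {
#     'revision': 'c0ffee12...',                 # last commit of that day.
#     'timestamp': '2019-08-21T16:40:00-07:00',
#     'jobs': [
#       {'id': '14a8b3d2e40000', 'status': 'queued'},  # later updated to
#       {'id': '14a8b3d2e48000', 'status': 'queued'},  # 'completed'/'failed'.
#     ],
#   }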
def IsJobFinished(job):
return job['status'] in ['completed', 'failed']
def CollectPinpointResults(state):
"""Check the status of pinpoint jobs and collect their results."""
# First iterate over all running jobs, and update their status.
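# The status command is expected to print one '<job id>: <status>' line per
# job, which is what the parsing below relies on; 'completed' and 'failed'
# are treated as final states (see IsJobFinished).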
for item in state:
active = [job['id'] for job in item['jobs'] if not IsJobFinished(job)]
if not active:
continue
cmd = ['vpython', PINPOINT_CLI, 'status']
cmd.extend(active)
output = subprocess.check_output(cmd, universal_newlines=True)
updates = dict(line.split(': ', 1) for line in output.splitlines())
logging.info('Got job updates: %s.', updates)
for job in item['jobs']:
if job['id'] in updates:
job['status'] = updates[job['id']]
# Now iterate over all completed jobs, and download their results if needed.
for item in state:
if _SkipProcessing(item): # Skip if not ready or all failed.
continue
output_file = RevisionResultsFile(item)
if not os.path.exists(output_file):
cmd = ['vpython', PINPOINT_CLI, 'get-csv', '--output', output_file, '--']
job_ids = [j['id'] for j in item['jobs'] if j['status'] == 'completed']
logging.info('Getting csv data for commit: %s.', item['revision'])
subprocess.check_output(cmd + job_ids)
def LoadJobsState():
"""Load the latest recorded state of pinpoint jobs."""
local_path = CachedFilePath(JOBS_STATE_FILE)
if os.path.exists(local_path) or DownloadFromCloudStorage(local_path):
return LoadJsonFile(local_path)
else:
logging.info('No jobs state found. Creating empty state.')
return []
def UpdateJobsState(state):
"""Write back the updated state of pinpoint jobs.
If there were any changes to the state, i.e. new jobs were created or
existing ones completed, both the local cached copy and the backup in cloud
storage are updated.
"""
local_path = CachedFilePath(JOBS_STATE_FILE)
with tempfile_ext.NamedTemporaryFile() as tmp:
json.dump(state, tmp, sort_keys=True, indent=2, separators=(',', ': '))
tmp.close()
if not os.path.exists(local_path) or not filecmp.cmp(tmp.name, local_path):
shutil.copyfile(tmp.name, local_path)
UploadToCloudStorage(local_path)
def GetCachedDataset():
"""Load the latest dataset with cached data."""
local_path = CachedFilePath(DATASET_PKL_FILE)
if os.path.exists(local_path) or DownloadFromCloudStorage(local_path):
return pd.read_pickle(local_path)
else:
return None
def UpdateCachedDataset(df):
"""Write back the dataset with cached data."""
local_path = CachedFilePath(DATASET_PKL_FILE)
df.to_pickle(local_path)
UploadToCloudStorage(local_path)
def GetItemsToUpdate(state):
"""Select jobs with new data to download and cached data for existing jobs.
This also filters out old revisions to keep only recent (6 months) data.
Returns:
new_items: A list of job items from which to get data.
cached_df: A DataFrame with existing cached data, may be None.
"""
from_date = str(TimeAgo(months=6).date())
new_items = [item for item in state if item['timestamp'] > from_date]
df = GetCachedDataset()
if df is not None:
recent_revisions = set(item['revision'] for item in new_items)
df = df[df['revision'].isin(recent_revisions)]
known_revisions = set(df['revision'])
new_items = [
item for item in new_items if item['revision'] not in known_revisions]
return new_items, df
def AggregateAndUploadResults(new_items, cached_df=None):
"""Aggregate results collected and upload them to cloud storage."""
dfs = []
if cached_df is not None:
dfs.append(cached_df)
found_new = False
for item in new_items:
if _SkipProcessing(item): # Jobs are not ready, or all have failed.
continue
if not found_new:
logging.info('Processing data from new results:')
found_new = True
logging.info('- %s (%s)', item['timestamp'][:10], item['revision'])
dfs.append(GetRevisionResults(item))
if not found_new:
logging.info('No new data found.')
return
# Otherwise update our cache and upload.
df = pd.concat(dfs, ignore_index=True)
UpdateCachedDataset(df)
# Drop revisions with no results and mark the latest result of each metric,
# both with and without the patch, as a 'reference'. This allows building
# score cards in Data Studio dashboards that compare the most recent results.
df = df[df['count'] > 0].copy()
latest_result = df.groupby(
['label', 'benchmark', 'name'])['timestamp'].transform('max')
df['reference'] = df['timestamp'] == latest_result
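# At this point the uploaded dataset.csv carries the columns: revision,
# timestamp, label, benchmark, name, mean, count (from GetRevisionResults)
# plus the 'reference' flag computed above.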
dataset_file = CachedFilePath(DATASET_CSV_FILE)
df.to_csv(dataset_file, index=False)
UploadToCloudStorage(dataset_file)
logging.info('Total %s rows of data uploaded.', len(df.index))
def GetRevisionResults(item):
"""Aggregate the results from jobs that ran on a particular revision."""
# First load pinpoint csv results into a DataFrame. The dtype arg is needed
# to ensure that job_id's are always read as strings (even if some of them
# look like large numbers).
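# The columns relied upon below are (at least): job_id, change, benchmark,
# name, story, unit and mean, as written by the `pinpoint_cli get-csv` step
# in CollectPinpointResults.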
df = pd.read_csv(RevisionResultsFile(item), dtype={'job_id': str})
assert df['change'].str.contains(item['revision']).all(), (
'Not all results match the expected git revision')
# Filter out and keep only the measurements and stories that we want.
df = df[df['name'].isin(MEASUREMENTS)]
df = df[df['story'].isin(ACTIVE_STORIES)]
if not df.empty:
# Aggregate over the results of individual stories.
df = df.groupby(['change', 'name', 'benchmark', 'unit'])['mean'].agg(
['mean', 'count']).reset_index()
else:
# Otherwise build a single row with an "empty" aggregate for this revision.
# This is needed so we can remember in the cache that this revision has
# been processed.
df = pd.DataFrame(index=[0])
df['change'] = item['revision']
df['name'] = '(missing)'
df['benchmark'] = '(missing)'
df['unit'] = ''
df['mean'] = np.nan
df['count'] = 0
# Convert time units from milliseconds to seconds. This is what Data Studio
# dashboards expect.
is_ms_unit = df['unit'].str.startswith('ms_')
df.loc[is_ms_unit, 'mean'] = df['mean'] / 1000
# Distinguish jobs that ran with/without the tested patch.
df['label'] = df['change'].str.contains(r'\+').map(
{False: 'without_patch', True: 'with_patch'})
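# For illustration (hypothetical values): a change string like 'c0ffee12'
# would map to 'without_patch', while one containing a '+' to denote the
# applied patch, e.g. 'c0ffee12 + my-patch', maps to 'with_patch'.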
# Add timestamp and revision information. We snap the date to noon and make
# it naive (i.e. no timezone), so the dashboard doesn't get confused with
# dates close to the end of day.
date = item['timestamp'].split('T')[0] + 'T12:00:00'
df['timestamp'] = pd.Timestamp(date)
df['revision'] = item['revision']
# Fake the timestamp of jobs without the patch to appear as if they ran a
# year ago; this makes it easier to visualize and compare timeseries from
# runs with/without the patch in Data Studio dashboards.
df.loc[df['label'] == 'without_patch', 'timestamp'] = (
df['timestamp'] - pd.DateOffset(years=1))
return df[['revision', 'timestamp', 'label',
'benchmark', 'name', 'mean', 'count']]
def _SkipProcessing(item):
"""Return True if not all jobs have finished or all have failed."""
return (not all(IsJobFinished(job) for job in item['jobs']) or
all(job['status'] == 'failed' for job in item['jobs']))
def GetLastCommitOfDate(date):
""""Find the the lastest commit that landed on a given date."""
# Make sure our local git repo has up to date info on origin/master.
logging.info('Fetching latest origin/master data.')
subprocess.check_output(
['git', 'fetch', 'origin', 'master'], cwd=TOOLS_PERF_DIR,
stderr=subprocess.STDOUT)
# Snap the date to the end of the day.
cutoff_date = date.replace(hour=12).ceil('D')
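# E.g. a date of 2019-08-21 (at any hour, MTV time) gives a cutoff_date of
# 2019-08-22T00:00:00-07:00; replacing the hour with 12 first guarantees that
# ceil('D') always rounds up to the next midnight, even for a midnight input.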
logging.info('Finding latest commit before %s.', cutoff_date)
if not FindCommit(after_date=cutoff_date):
# We expect there to be some commits after the 'cutoff_date', otherwise
# there isn't yet a *last* commit before that date.
raise ValueError("Given date appears to be in the future. There isn't yet "
'a last commit before %s.' % cutoff_date)
return FindCommit(before_date=cutoff_date)
def FindCommit(before_date=None, after_date=None):
"""Find latest commit with optional before/after date constraints."""
cmd = ['git', 'log', '--max-count', '1', '--format=format:%H:%ct']
if before_date is not None:
cmd.extend(['--before', before_date.isoformat()])
if after_date is not None:
cmd.extend(['--after', after_date.isoformat()])
cmd.append('origin/master')
line = subprocess.check_output(cmd, cwd=TOOLS_PERF_DIR).strip()
if line:
revision, commit_time = line.split(':')
commit_time = pd.Timestamp(
int(commit_time), unit='s', tz=TZ).isoformat()
return revision, commit_time
else:
return None
def RevisionResultsFile(item):
"""Get a filepath where to cache results of jobs for a single revision."""
return CachedFilePath('job_results', item['revision'] + '.csv')
def CachedFilePath(arg, *args):
"""Get the path to a file stored in local cache."""
return os.path.join(CACHED_DATA_DIR, arg, *args)
def UploadToCloudStorage(filepath):
"""Copy the given file to cloud storage."""
gsutil.Copy(
filepath, posixpath.join(CLOUD_STORAGE_DIR, os.path.basename(filepath)))
def DownloadFromCloudStorage(filepath):
"""Get the given file from cloud storage."""
try:
gsutil.Copy(
posixpath.join(CLOUD_STORAGE_DIR, os.path.basename(filepath)), filepath)
logging.info('Downloaded copy of %s from cloud storage.', filepath)
return True
except subprocess.CalledProcessError:
logging.info('Failed to download copy of %s from cloud storage.', filepath)
return False
def LoadJsonFile(filename):
with open(filename) as f:
return json.load(f)
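# TimeAgo(days=1), for instance, is this time yesterday, and TimeAgo(months=6)
# is six months ago (both relative to now in MTV time).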
def TimeAgo(**kwargs):
return pd.Timestamp.now(TZ) - pd.DateOffset(**kwargs)
def SetUpLogging(level):
"""Set up logging to log both to stderr and a file."""
logger = logging.getLogger()
logger.setLevel(level)
formatter = logging.Formatter(
'(%(levelname)s) %(asctime)s [%(module)s] %(message)s')
h1 = logging.StreamHandler()
h1.setFormatter(formatter)
logger.addHandler(h1)
h2 = handlers.TimedRotatingFileHandler(
filename=CachedFilePath('pinboard.log'), when='W0', backupCount=5)
h2.setFormatter(formatter)
logger.addHandler(h2)
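# Example invocations, assuming the module is run directly as the pinboard
# script (the actual entry point name may differ):
#
#   vpython pinboard.py auto
#   vpython pinboard.py start --date 2019-08-21
#   vpython pinboard.py collect upload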
def Main():
SetUpLogging(level=logging.INFO)
actions = ('start', 'collect', 'upload')
parser = argparse.ArgumentParser()
parser.add_argument(
'actions', metavar='ACTION', nargs='+', choices=actions + ('auto',),
help=("select action to perform: 'start' pinpoint jobs, 'collect' job "
"results, 'upload' aggregated data, or 'auto' to do all in "
"sequence."))
parser.add_argument(
'--date', type=lambda s: pd.Timestamp(s, tz=TZ), default=TimeAgo(days=1),
help=('Run jobs for the last commit that landed on the given date (assuming '
'MTV time). Defaults to the last commit that landed yesterday.'))
args = parser.parse_args()
if 'auto' in args.actions:
logging.info('=== auto run for %s ===', args.date)
args.actions = actions
cached_results_dir = CachedFilePath('job_results')
if not os.path.isdir(cached_results_dir):
os.makedirs(cached_results_dir)
state = LoadJobsState()
try:
if 'start' in args.actions:
StartPinpointJobs(state, args.date)
new_items, cached_df = GetItemsToUpdate(state)
if 'collect' in args.actions:
CollectPinpointResults(new_items)
finally:
UpdateJobsState(state)
if 'upload' in args.actions:
AggregateAndUploadResults(new_items, cached_df)