#!/usr/bin/env python
# Copyright 2015 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
"""Calculate statistics about tasks.
Saves the data fetched from the server into a json file to enable reprocessing
the data without having to always fetch from the server.
"""
import datetime
import json
import logging
import optparse
import os
import subprocess
import sys
import urllib

CLIENT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(
    __file__.decode(sys.getfilesystemencoding()))))

_EPOCH = datetime.datetime.utcfromtimestamp(0)

# Type of bucket to use when categorizing tasks:
# - MAJOR_OS: group by OS family only, e.g. 'Windows' for 'Windows-10'.
# - MAJOR_OS_ASAN: like MAJOR_OS, with separate buckets for ASan tasks.
# - MINOR_OS: group by the exact OS version, e.g. 'Windows-10'.
# - MINOR_OS_GPU: like MINOR_OS, adding the GPU type when one is specified.
MAJOR_OS, MAJOR_OS_ASAN, MINOR_OS, MINOR_OS_GPU = range(4)


def do_bucket(items, bucket_type):
  """Categorizes the tasks based on one of the bucket types defined above."""
  out = {}
  for task in items:
    if 'heartbeat:1' in task['tags']:
      # Skip heartbeats.
      continue
    is_asan = 'asan:1' in task['tags']
    os_tag = None
    gpu_tag = None
    for t in task['tags']:
      if t.startswith('os:'):
        os_tag = t[3:]
        if os_tag == 'Linux':
          # GPU tests still specify Linux.
          # TODO(maruel): Fix the recipe.
          os_tag = 'Ubuntu'
      elif t.startswith('gpu:'):
        gpu_tag = t[4:]
    if bucket_type in (MAJOR_OS, MAJOR_OS_ASAN):
      if os_tag:
        os_tag = os_tag.split('-')[0]
    tag = os_tag or ''
    if bucket_type == MINOR_OS_GPU and gpu_tag and gpu_tag != 'none':
      tag += ' gpu:' + gpu_tag
    if bucket_type == MAJOR_OS_ASAN and is_asan:
      tag += ' ASan'
    out.setdefault(tag, []).append(task)
    # Also create global buckets for ASan.
    if bucket_type == MAJOR_OS_ASAN:
      tag = '(any OS) ASan' if is_asan else '(any OS) Not ASan'
      out.setdefault(tag, []).append(task)
  return out
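
# A hypothetical illustration of the output shape (not used by the tool):
# with MAJOR_OS, a task tagged 'os:Windows-10' lands in the 'Windows' bucket,
# while with MINOR_OS it stays under 'Windows-10':
#   do_bucket([{'tags': ['os:Windows-10']}], MAJOR_OS)
#   -> {'Windows': [{'tags': ['os:Windows-10']}]}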


def seconds_to_timedelta(seconds):
  """Converts seconds to a datetime.timedelta, stripping sub-second precision.

  This is for presentation, where sub-second values in summaries are not
  useful.
  """
  return datetime.timedelta(seconds=round(seconds))
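
# For example, seconds_to_timedelta(3661.4) == datetime.timedelta(seconds=3661),
# which prints as '1:01:01'.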


def parse_time_option(value):
  """Converts a command line time option into a datetime.datetime.

  Accepts either seconds since epoch or one of several human readable formats.
  Returns None if the value is not specified.
  """
  if not value:
    return None
  try:
    return _EPOCH + datetime.timedelta(seconds=int(value))
  except ValueError:
    pass
  for fmt in (
      '%Y-%m-%d',
      '%Y-%m-%d %H:%M',
      '%Y-%m-%dT%H:%M',
      '%Y-%m-%d %H:%M:%S',
      '%Y-%m-%dT%H:%M:%S',
      '%Y-%m-%d %H:%M:%S.%f',
      '%Y-%m-%dT%H:%M:%S.%f'):
    try:
      return datetime.datetime.strptime(value, fmt)
    except ValueError:
      pass
  raise ValueError('Failed to parse %s' % value)
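
# Both of these forms parse to the same datetime, 2015-01-01 00:00:00 UTC:
#   parse_time_option('1420070400')
#   parse_time_option('2015-01-01T00:00:00')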


def parse_time(value):
  """Converts serialized time from the API to datetime.datetime."""
  for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'):
    try:
      return datetime.datetime.strptime(value, fmt)
    except ValueError:
      pass
  raise ValueError('Failed to parse %s' % value)


def average(items):
  if not items:
    return 0.
  # float() guards against integer division under Python 2.
  return sum(items) / float(len(items))


def median(items):
  return percentile(items, 50)


def percentile(items, percent):
  """Returns the percentile using the NIST method.

  |items| must be sorted in ascending order.
  """
  if not items:
    return 0.
  # Rank is 1-based; a fractional part means interpolating between the two
  # closest values.
  rank = percent * .01 * (len(items) + 1)
  rank_int = int(rank)
  rest = rank - rank_int
  if rank_int < 1:
    return items[0]
  if rank_int >= len(items):
    return items[-1]
  return items[rank_int-1] + rest * (items[rank_int] - items[rank_int-1])
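
# A quick sanity check of the interpolation, e.g.:
#   percentile([1., 2., 3., 4.], 25) -> 1.25
#   percentile([1., 2., 3., 4.], 50) -> 2.5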


def sp(dividend, divisor):
  """Returns the percentage for dividend/divisor, safely."""
  if not divisor:
    return 0.
  return 100. * float(dividend) / float(divisor)


def fetch_data(options):
  """Fetches TaskResultSummary as JSON from options.swarming and writes it to
  options.json.
  """
  if not options.start:
    # Defaults to 25 hours ago.
    options.start = datetime.datetime.utcnow() - datetime.timedelta(
        seconds=25*60*60)
  else:
    options.start = parse_time_option(options.start)
  if not options.end:
    options.end = options.start + datetime.timedelta(days=1)
  else:
    options.end = parse_time_option(options.end)
  url = 'tasks/list?' + urllib.urlencode(
      {
        'start': int((options.start - _EPOCH).total_seconds()),
        'end': int((options.end - _EPOCH).total_seconds()),
      })
  cmd = [
    sys.executable, os.path.join(CLIENT_DIR, 'swarming.py'),
    'query',
    '-S', options.swarming,
    '--json', options.json,
    '--limit', '0',
    '--progress',
    url,
  ]
  if options.verbose:
    cmd.extend(('--verbose', '--verbose', '--verbose'))
  logging.info('%s', ' '.join(cmd))
  subprocess.check_call(cmd)
  print('')
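
# The constructed command looks roughly like this (server name illustrative;
# the query string order depends on urllib.urlencode):
#   python swarming.py query -S chromium-swarm.appspot.com --json tasks.json \
#       --limit 0 --progress 'tasks/list?start=1420070400&end=1420156800'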


def stats(tasks, show_cost):
  """Calculates and prints statistics about the tasks, given as a list of
  JSON-encoded TaskResultSummary dicts.
  """
  # Split tasks into 3 buckets:
  # - 'rn' means ran, not idempotent.
  # - 'ri' means ran, idempotent.
  # - 'dd' means deduplicated.
  rn = [
    i for i in tasks
    if not i.get('deduped_from') and not i.get('properties_hash')
  ]
  ri = [
    i for i in tasks if not i.get('deduped_from') and i.get('properties_hash')
  ]
  dd = [i for i in tasks if i.get('deduped_from')]

  # Noteworthy results.
  failures = [i for i in tasks if i.get('failure')]
  internal_failures = [i for i in tasks if i.get('internal_failure')]
  two_tries = [
    i for i in tasks if i.get('try_number') == '2' and not i.get('deduped_from')
  ]
  # TODO(maruel): 'state'

  # Summations.
  duration_rn = sum(i.get('duration', 0.) for i in rn)
  duration_ri = sum(i.get('duration', 0.) for i in ri)
  duration_dd = sum(i.get('duration', 0.) for i in dd)
  duration_total = duration_rn + duration_ri + duration_dd
  cost_rn = sum(sum(i.get('costs_usd') or [0.]) for i in rn)
  cost_ri = sum(sum(i.get('costs_usd') or [0.]) for i in ri)
  cost_dd = sum(i.get('cost_saved_usd', 0.) for i in dd)
  cost_total = cost_rn + cost_ri + cost_dd
  pendings = sorted(
      (parse_time(i['started_ts']) - parse_time(i['created_ts'])).total_seconds()
      for i in tasks if i.get('started_ts') and not i.get('deduped_from'))
  pending_total = datetime.timedelta(seconds=round(sum(pendings), 2))
  pending_avg = datetime.timedelta(seconds=round(average(pendings), 2))
  pending_med = datetime.timedelta(seconds=round(median(pendings), 2))
  pending_p99 = datetime.timedelta(seconds=round(percentile(pendings, 99), 2))

  # Calculate percentages to understand the relative load.
  percent_rn_nb_total = sp(len(rn), len(tasks))
  percent_ri_nb_total = sp(len(ri), len(tasks))
  percent_dd_nb_total = sp(len(dd), len(tasks))
  percent_dd_nb_rel = sp(len(dd), len(ri) + len(dd))
  percent_rn_duration_total = sp(duration_rn, duration_total)
  percent_ri_duration_total = sp(duration_ri, duration_total)
  percent_dd_duration_total = sp(duration_dd, duration_total)
  percent_dd_duration_rel = sp(duration_dd, duration_dd + duration_ri)
  percent_rn_cost_total = sp(cost_rn, cost_total)
  percent_ri_cost_total = sp(cost_ri, cost_total)
  percent_dd_cost_total = sp(cost_dd, cost_total)
  percent_dd_cost_rel = sp(cost_dd, cost_dd + cost_ri)
  reliability = 100. - sp(len(internal_failures), len(tasks))
  percent_failures = sp(len(failures), len(tasks))
  percent_two_tries = sp(len(two_tries), len(tasks))

  # Print results as a table.
  if rn:
    cost = ' %7.2f$ (%5.1f%%)' % (cost_rn, percent_rn_cost_total)
    print(
        ' %6d (%5.1f%%) %18s (%5.1f%%)%s '
        'Real tasks executed, not idempotent' % (
          len(rn), percent_rn_nb_total,
          seconds_to_timedelta(duration_rn), percent_rn_duration_total,
          cost if show_cost else ''))
  if ri:
    cost = ' %7.2f$ (%5.1f%%)' % (cost_ri, percent_ri_cost_total)
    print(
        ' %6d (%5.1f%%) %18s (%5.1f%%)%s '
        'Real tasks executed, idempotent' % (
          len(ri), percent_ri_nb_total,
          seconds_to_timedelta(duration_ri), percent_ri_duration_total,
          cost if show_cost else ''))
  if ri and rn:
    cost = ' %7.2f$ ' % (cost_rn + cost_ri)
    print(
        ' %6d %18s %s '
        'Real tasks executed, all types' % (
          len(rn) + len(ri),
          seconds_to_timedelta(duration_rn + duration_ri),
          cost if show_cost else ''))
  if dd:
    cost = ' %7.2f$*(%5.1f%%)' % (cost_dd, percent_dd_cost_total)
    print(
        ' %6d*(%5.1f%%) %18s*(%5.1f%%)%s *Wasn\'t run, '
        'previous results reused' % (
          len(dd), percent_dd_nb_total,
          seconds_to_timedelta(duration_dd), percent_dd_duration_total,
          cost if show_cost else ''))
    cost = ' (%5.1f%%)' % percent_dd_cost_rel
    print(
        ' (%5.1f%%) (%5.1f%%)%s '
        ' (relative to idempotent tasks only)' % (
          percent_dd_nb_rel, percent_dd_duration_rel,
          cost if show_cost else ''))
  if int(bool(rn)) + int(bool(ri)) + int(bool(dd)) > 1:
    cost = ' %7.2f$' % cost_total
    print(
        ' %6d %18s%s '
        'Total tasks' % (
          len(tasks), seconds_to_timedelta(duration_total),
          cost if show_cost else ''))

  print(
      ' Reliability: %7g%% Internal errors: %-4d' % (
        reliability, len(internal_failures)))
  print(
      ' Task failures: %-4d (%5.3f%%)' % (
        len(failures), percent_failures))
  print(
      ' Retried: %-4d (%5.3f%%) (Upgraded an internal failure '
      'to a successful task)' %
      (len(two_tries), percent_two_tries))
  print(
      ' Pending Total: %13s Avg: %7s Median: %7s P99%%: %7s' % (
        pending_total, pending_avg, pending_med, pending_p99))
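
# For reference, a minimal hypothetical TaskResultSummary that stats() can
# consume; the absence of 'deduped_from' and 'properties_hash' classifies it
# as 'ran, not idempotent':
#   {
#     'created_ts': '2015-01-01T00:00:00.000000',
#     'started_ts': '2015-01-01T00:00:12.000000',
#     'duration': 42.,
#     'costs_usd': [0.02],
#     'tags': ['os:Windows-10', 'user:joe@example.com'],
#   }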


def present_task_types(items, bucket_type, show_cost):
  cost = ' Usage Cost $USD' if show_cost else ''
  print(' Nb of Tasks Total Duration%s' % cost)
  buckets = do_bucket(items, bucket_type)
  for index, (bucket, tasks) in enumerate(sorted(buckets.iteritems())):
    if index:
      print('')
    print('%s:' % (bucket or '<None>'))
    stats(tasks, show_cost)
  if buckets:
    print('')
    print('Global:')
    stats(items, show_cost)


def present_users(items):
  users = {}
  for task in items:
    user = ''
    for tag in task['tags']:
      if tag.startswith('user:'):
        if tag[5:]:
          user = tag[5:]
          break
      if tag == 'purpose:CI':
        user = 'CI'
        break
      if tag == 'heartbeat:1':
        user = 'heartbeat'
        break
    if user:
      users.setdefault(user, 0)
      users[user] += 1
  if not users:
    # max() below would raise on an empty dict.
    return
  maxlen = max(len(i) for i in users)
  maxusers = 100
  for index, (name, tasks) in enumerate(
      sorted(users.iteritems(), key=lambda x: -x[1])):
    if index == maxusers:
      break
    print('%3d %-*s: %d' % (index + 1, maxlen, name, tasks))


def main():
  parser = optparse.OptionParser(description=sys.modules['__main__'].__doc__)
  parser.add_option(
      '-S', '--swarming',
      metavar='URL', default=os.environ.get('SWARMING_SERVER', ''),
      help='Swarming server to use')
  parser.add_option(
      '--start', help='Starting date in UTC; defaults to 25 hours ago')
  parser.add_option(
      '--end', help='End date in UTC; defaults to --start+1 day')
  parser.add_option(
      '--no-cost', action='store_false', dest='cost', default=True,
      help='Strip $ from display')
  parser.add_option(
      '--users', action='store_true', help='Display top users instead')
  parser.add_option(
      '--json', default='tasks.json',
      help='File containing raw data; default: %default')
  parser.add_option('-v', '--verbose', action='count', default=0)

  group = optparse.OptionGroup(parser, 'Grouping')
  group.add_option(
      '--major-os', action='store_const',
      dest='bucket', const=MAJOR_OS, default=MAJOR_OS,
      help='Classify by OS type, independent of OS version (default)')
  group.add_option(
      '--minor-os', action='store_const',
      dest='bucket', const=MINOR_OS,
      help='Classify by minor OS version')
  group.add_option(
      '--gpu', action='store_const',
      dest='bucket', const=MINOR_OS_GPU,
      help='Classify by minor OS version and GPU type when requested')
  group.add_option(
      '--asan', action='store_const',
      dest='bucket', const=MAJOR_OS_ASAN,
      help='Classify by major OS version and ASan')
  parser.add_option_group(group)
  options, args = parser.parse_args()
  if args:
    parser.error('Unsupported argument %s' % args)
  logging.basicConfig(level=logging.DEBUG if options.verbose else logging.ERROR)

  if options.swarming:
    fetch_data(options)
  elif not os.path.isfile(options.json):
    parser.error('--swarming is required.')

  with open(options.json, 'rb') as f:
    items = json.load(f)['items']
  # Tasks are returned most recent first, so the oldest one is at the end.
  first = items[-1]
  last = items[0]
  print(
      'From %s to %s (%s)' % (
        first['created_ts'].split('.')[0],
        last['created_ts'].split('.')[0],
        parse_time(last['created_ts']) - parse_time(first['created_ts'])
      ))
  print('')
  if options.users:
    present_users(items)
  else:
    present_task_types(items, options.bucket, options.cost)
  return 0


if __name__ == '__main__':
  sys.exit(main())