blob: bacef6e49f03d74eb61df5bf0b5cf37a7a34e491 [file] [log] [blame]
#!/usr/bin/python3
# Copyright 2010 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Parses and displays the contents of one or more autoserv result directories.
This script parses the contents of one or more autoserv results folders and
generates test reports.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import datetime
import glob
import json
import logging
import operator
import optparse
import os
import re
import six
from six.moves import range
import sys
import common
from autotest_lib.utils import terminal
# True when sys.stdout reports that it is attached to a terminal. Used as the
# default value of the --color command-line option.
_STDOUT_IS_TTY = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty()
def _test_name_from_dir(test_dir):
    """Return the test name encoded in a results directory path.

    The last path component is expected to look like `results-<n>-<name>`;
    everything after the second dash is returned as the name. Trailing
    slashes are ignored so that `.../results-1-foo/` yields `foo` instead of
    an empty string.

    Examples:
    /tmp/test_that_latest/something/else/results-n-testname
        returns `testname`
    /tmp/TTL/something/results-n-test-name-here
        returns `test-name-here`

    @param test_dir: path of a results directory.

    @return The test name, or '' when the last path component has fewer than
            three dash-separated parts.
    """
    # Drop trailing separators first; otherwise the last component is ''.
    last_component = test_dir.rstrip('/').split('/')[-1]
    return '-'.join(last_component.split('-')[2:])
def generate_json_report(topDir):
    """Build the simple JSON report for every results dir below topDir.

    Each directory returned by find_dirs() has its status log read with
    get_status() and parsed by load_json() into a shared report dict.

    @param topDir: directory that is searched for results directories.

    @return dict with a single 'tests' key holding the list of parsed tests.
    """
    report = {'tests': []}
    for result_dir in find_dirs(topDir):
        status_text = get_status(result_dir)
        load_json(status_text, report, result_dir)
    return report
def load_json(raw_stat, raw_json, testdir):
    """Parse a status log and append one entry per START/END pair.

    The parser is deliberately simple:
      1.) Find the START line of a test.
      2.) Remember the most recent line indented one level deeper than the
          START line; it is used as the source of the error message.
      3.) Find the END line whose indent matches the START line.
      4.) Save all of that as one result in raw_json['tests'].
    Nested START lines and any other lines are ignored.

    @param raw_stat: full text of the status log.
    @param raw_json: dict with a 'tests' list; results are appended to it.
    @param testdir: results directory the status log was read from.

    @raises Exception: if a line indented one level deeper than the last
            START line is seen while no test is open.
    """
    indentLevel = -1
    missingResult = False
    lastStartTime = None
    firstFound = False
    contentLine = None
    for line in raw_stat.splitlines():
        fields = get_fields(line)
        # A non-formatted line won't have fields, thus don't parse it.
        if not fields:
            continue
        if fields[0] == 'INFO' or fields[1] == '----':
            continue
        line_indent = indent(line)
        if fields[0] == 'START':
            if missingResult:
                # Nested starts.... skipping for now....
                continue
            missingResult = True
            lastStartTime = fields[2]
            indentLevel = line_indent
            # Forget the content of the previous test so a test without any
            # content line does not inherit a stale error message.
            contentLine = None
        elif line_indent == indentLevel + 1:
            if missingResult is not True:
                raise Exception('Content found out of context.')
            contentLine = line
        elif line_indent == indentLevel and isEndLine(fields[0]):
            missingResult = False
            verdict = parseResult(fields[0])
            _translate_to_dict(raw_json, fields, verdict, contentLine,
                               lastStartTime, testdir, firstFound)
            firstFound = True
        # Anything else is spammy log output inside of a task/test and is
        # skipped to keep the parser simple.
def isEndLine(field):
    """Tell whether a status field marks the end of a test.

    @param field: the status field of a status-log line.

    @return True if the text 'END' occurs anywhere in field, else False.
    """
    return 'END' in field
def _raw_time(timestamp):
    """Strip the 'timestamp=' label: timestamp=1670062718 --> 1670062718"""
    pieces = timestamp.split('timestamp=')
    return ''.join(pieces)
def _translate_to_dict(res, fields, verdict, contentLine,
                       lastStartTime, testdir, firstFound):
    """Append one parsed test result to res['tests'].

    @param res: dict with a 'tests' list that receives the new entry.
    @param fields: (status, name, time) tuple of the END line.
    @param verdict: verdict string for the test.
    @param contentLine: status line the error message is taken from, or None.
    @param lastStartTime: time field of the matching START line.
    @param testdir: results directory the status log belongs to.
    @param firstFound: False only for the first result of a status log.
    """
    # The first result of a log carries the full test name, but autotest,
    # tast and tagged tests are inconsistent about it, so that name cannot be
    # trusted; the directory name is always correct and is used instead.
    name = fields[1] if firstFound else _test_name_from_dir(testdir)
    entry = {
        'verdict': verdict,
        'testname': name,
        'errmsg': errFromContent(contentLine),
        'resultspath': testdir,
        'starttime': _raw_time(lastStartTime),
        'endtime': _raw_time(fields[2]),
    }
    res['tests'].append(entry)
def errFromContent(contentLine):
    """Return the text after the last tab of contentLine.

    @param contentLine: a status-log line, or None/'' when there is none.

    @return The last tab-separated field, or "" for a missing line.
    """
    if not contentLine:
        return ""
    return contentLine.rsplit('\t', 1)[-1]
def parseResult(status_raw):
    """Map a raw status field to a verdict string.

    The tags are checked in a fixed priority order and the first one found
    anywhere in status_raw wins.

    @param status_raw: status text, e.g. 'END GOOD'.

    @return One of 'Fail', 'Error', 'Abort', 'Warn', 'Not Run' or 'Pass';
            'Error' when no known tag is present.
    """
    verdict_by_tag = (
        ('FAIL', 'Fail'),
        ('ERROR', 'Error'),
        ('ABORT', 'Abort'),
        ('WARN', 'Warn'),
        ('TEST_NA', 'Not Run'),
        ('GOOD', 'Pass'),
    )
    for tag, verdict in verdict_by_tag:
        if tag in status_raw:
            return verdict
    return 'Error'
def get_fields(line):
    """Extract the status, name and time fields of a status-log line.

    @param line: one line of a status log; leading tabs are indentation.

    @return Tuple (status, name, time) taken from the 1st, 3rd and 4th
            tab-separated fields after the indentation, or None if the line
            has too few fields to be a status line.
    """
    # Number of leading tabs, i.e. the nesting depth of the line.
    indents = len(line) - len(line.lstrip('\t'))
    items = line.split('\t')
    # A "real" status line will at min have 6 fields. The leading empty
    # items produced by the indentation count towards that total, so also
    # make sure the fields read below exist; otherwise a deeply indented
    # short line would raise IndexError.
    if len(items) < 6 or len(items) < indents + 4:
        return None
    status = items[indents]
    name = items[indents + 2]
    taskTime = items[indents + 3]
    return status, name, taskTime
def indent(line):
    """Count the tab characters at the start of line.

    @param line: a line of text.

    @return Number of leading tabs.
    """
    depth = 0
    for char in line:
        if char != '\t':
            break
        depth += 1
    return depth
def get_status(testdir):
    """Read the status.log file of a results directory.

    @param testdir: results directory expected to contain 'status.log'.

    @return The file contents as a string, or None if testdir has no
            regular file named 'status.log'.
    """
    status_file = os.path.join(testdir, 'status.log')
    if not os.path.isfile(status_file):
        return None
    # Use a context manager so the handle is closed right after reading.
    with open(status_file, 'r') as f:
        return f.read()
def find_dirs(topDir):
    """Find results directories that contain a status.log entry.

    Walks topDir and keeps every sub-directory whose name contains
    'results-' and whose listing includes 'status.log'.

    @param topDir: directory to search recursively.

    @return List of matching directory paths, in os.walk() order.
    """
    found = []
    for root, subdirs, _ in os.walk(topDir):
        candidates = (os.path.join(root, name)
                      for name in subdirs if 'results-' in name)
        found.extend(path for path in candidates
                     if 'status.log' in os.listdir(path))
    return found
def Die(message_format, *args, **kwargs):
    """Report an error through logging and terminate with exit status 1.

    @param message_format: format string handed to logging.error.
    @param args: positional arguments forwarded to logging.error.
    @param kwargs: keyword arguments forwarded to logging.error.
    """
    logging.error(message_format, *args, **kwargs)
    raise SystemExit(1)
class CrashWaiver:
    """A temporary exemption for a crash that should be ignored for now."""

    def __init__(self, signals, deadline, url, person):
        """Record the details of the waiver.

        @param signals: collection of signal strings, e.g. ['sig 11'].
        @param deadline: expiry date formatted like '2011-Aug-18'.
        @param url: URL of the issue tracking the crash.
        @param person: name of whoever added the waiver.
        """
        self.signals = signals
        self.issue_url = url
        self.suppressor = person
        self.deadline = datetime.datetime.strptime(deadline, '%Y-%b-%d')
# Crashes which are okay to ignore, keyed by the crashing name as it appears
# in the status log's crash notification. This list should almost always be
# empty. If you add an entry, include the bug URL and your name, something
# like
#     'crashy':CrashWaiver(
#         ['sig 11'], '2011-Aug-18', 'http://crosbug/123456', 'developer'),
_CRASH_ALLOWLIST = {
}
class ResultCollector(object):
    """Collects status and performance data from an autoserv results dir."""

    def __init__(self, collect_perf=True, collect_attr=False,
                 collect_info=False, escape_error=False,
                 allow_chrome_crashes=False):
        """Store the collection options.

        @param collect_perf: Should perf keyvals be collected?
        @param collect_attr: Should attr keyvals be collected?
        @param collect_info: Should info keyvals be collected?
        @param escape_error: Escape error message text for tools.
        @param allow_chrome_crashes: Treat Chrome crashes as non-fatal.
        """
        # What to gather from each results directory.
        self._collect_info = collect_info
        self._collect_attr = collect_attr
        self._collect_perf = collect_perf
        # How failures and crashes are reported.
        self._allow_chrome_crashes = allow_chrome_crashes
        self._escape_error = escape_error
def _CollectPerf(self, testdir):
    """Return the perf keyvals found under testdir.

    @param testdir: autoserv test result directory path.

    @return dict of perf keyval pairs; empty when perf collection is off.
    """
    return self._CollectKeyval(testdir, 'perf') if self._collect_perf else {}
def _CollectAttr(self, testdir):
    """Return the attr keyvals found under testdir.

    @param testdir: autoserv test result directory path.

    @return dict of attr keyval pairs; empty when attr collection is off.
    """
    return self._CollectKeyval(testdir, 'attr') if self._collect_attr else {}
def _CollectKeyval(self, testdir, keyword):
    """Parses the keyval file under testdir.

    Reads testdir/results/keyval and keeps the lines of the form
    `<key>{<keyword>}=<value>`.

    @param testdir: The autoserv test result directory.
    @param keyword: The keyword of keyval, either 'perf' or 'attr'. It is
                    inserted into a regular expression as-is.

    @return An empty dictionary if there is no keyval file under testdir.
            Otherwise a dictionary of the parsed keyvals. Duplicate keys
            are uniquified by their instance number: the first occurrence
            keeps the plain key, later ones become `key{1}`, `key{2}`, ...
    """
    keyval = {}
    keyval_file = os.path.join(testdir, 'results', 'keyval')
    if not os.path.isfile(keyval_file):
        return keyval
    # Build the pattern once rather than for every line.
    line_re = re.compile(r'^(.+){%s}=(.+)$' % keyword)
    instances = {}
    # The context manager closes the file even if parsing raises.
    with open(keyval_file) as f:
        for line in f:
            match = line_re.search(line)
            if not match:
                continue
            key, val = match.groups()
            # If the same key name was generated multiple times, uniquify
            # all instances other than the first one by adding the
            # instance count to the key name.
            instance = instances.get(key, 0)
            key_inst = '%s{%d}' % (key, instance) if instance else key
            instances[key] = instance + 1
            keyval[key_inst] = val
    return keyval
def _CollectCrashes(self, status_raw):
    """Parses the status log text for crash notifications.

    Every 'Received crash notification for <name> ... sig <n>' match is
    handled as follows:
      - If Chrome crashes are allowed and <name> is a Chrome name, a
        warning is printed and the crash is not recorded.
      - If _CRASH_ALLOWLIST has an unexpired waiver for <name> that lists
        the signal, a notice is printed and the crash is not recorded.
      - Otherwise the crash is recorded as '<name> sig <n>'.

    @param status_raw: The contents of the status.log or status file from
                       the test.

    @return a list of crash entries to be reported.
    """
    crashes = []
    # Raw string: '\w' and '\d' are invalid escapes in a normal literal.
    regex = re.compile(
        r'Received crash notification for ([-\w]+).+ (sig \d+)')
    chrome_regex = re.compile(r'^supplied_[cC]hrome|^chrome$')
    for match in regex.finditer(status_raw):
        name, signal = match.groups()
        w = _CRASH_ALLOWLIST.get(name)
        if self._allow_chrome_crashes and chrome_regex.match(name):
            print('@@@STEP_WARNINGS@@@')
            print('%s crashed with %s' % (name, signal))
        elif (w is not None and signal in w.signals and
              w.deadline > datetime.datetime.now()):
            print('Ignoring crash in %s for waiver that expires %s' % (
                name, w.deadline.strftime('%Y-%b-%d')))
        else:
            crashes.append('%s %s' % (name, signal))
    return crashes
def _CollectInfo(self, testdir, custom_info):
    """Parses *_info files under testdir/sysinfo/var/log.

    Looks for ec_info.txt (fw_version) and bios_info.txt (fwid, hwid) and
    stores every value found in custom_info.

    This collection of extra info is disabled by default (this function is
    a no-op). It is enabled only if the --info command-line option is
    explicitly supplied. Normal job parsing does not supply this option.

    @param testdir: The autoserv test result directory.
    @param custom_info: Dictionary to collect detailed ec/bios info; it is
                        updated in place.

    @return {} when info collection is disabled, otherwise custom_info
            itself with the discovered values added.
    """
    if not self._collect_info:
        return {}
    keys_by_file = {'ec_info.txt': ['fw_version'],
                    'bios_info.txt': ['fwid', 'hwid']}
    log_dir = os.path.join(testdir, 'sysinfo', 'var', 'log')
    for file_name, wanted in keys_by_file.items():
        path = os.path.join(log_dir, file_name)
        if not os.path.isfile(path):
            continue
        # Some example raw text that might be matched include:
        #
        # fw_version | snow_v1.1.332-cf20b3e
        # fwid = Google_Snow.2711.0.2012_08_06_1139 # Active firmware ID
        # hwid = DAISY TEST A-A 9382 # Hardware ID
        pattern = re.compile(r'^(%s)\s*[|=]\s*(.*)' % '|'.join(wanted))
        with open(path, 'r') as f:
            for raw_line in f:
                # Trim the line, then drop any trailing '#' comment.
                found = pattern.match(raw_line.strip().split('#')[0])
                if found:
                    custom_info[found.group(1)] = str(found.group(2)).strip()
    return custom_info
def _CollectEndTimes(self, status_raw, status_re='', is_end=True):
    """Helper to match and collect timestamp and localtime.

    Preferred to locate timestamp and localtime with an
    'END GOOD test_name...' line. However, aborted tests occasionally fail
    to produce this line and then need to scrape timestamps from the 'START
    test_name...' line.

    @param status_raw: multi-line text to search.
    @param status_re: status regex to seek (e.g. GOOD|FAIL)
    @param is_end: if True, search for 'END' otherwise 'START'.

    @return Tuple of timestamp, localtime retrieved from the test status
            log; both are '' when no line matches.
    """
    localtime_re = r'\w+\s+\w+\s+[:\w]+'
    match_filter = (
        r'^\s*%s\s+(?:%s).*timestamp=(\d*).*localtime=(%s).*$' % (
            'END' if is_end else 'START', status_re, localtime_re))
    timestamp = ''
    localtime = ''
    # There may be multiple lines with timestamp/localtime info. Scan from
    # the last line backwards and keep the latest timestamp; on ties the
    # line nearest the end of the log wins. Timestamps are compared as
    # numbers, since comparing the digit strings would rank '999' above
    # '1000'.
    for timestamp_, localtime_ in reversed(
            re.findall(match_filter, status_raw, re.MULTILINE)):
        if not timestamp or (timestamp_ and
                             int(timestamp_) > int(timestamp)):
            timestamp = timestamp_
            localtime = localtime_
    return timestamp, localtime
def _CheckExperimental(self, testdir):
    """Parses the keyval file and returns the value of `experimental`.

    @param testdir: The result directory that has the keyval file.

    @return True only if the first line starting with 'experimental='
            has the value 'True'; False if that value differs, if no such
            line exists or if there is no keyval file.
    """
    keyval_path = os.path.join(testdir, 'keyval')
    if not os.path.isfile(keyval_path):
        return False
    with open(keyval_path) as keyval:
        for entry in keyval:
            found = re.match(r'experimental=(.+)', entry)
            if found:
                return found.group(1) == 'True'
    return False
def _get_failure_msg_from_status(self, status_raw):
    """Build a '<TYPE>: <reason>' message from the first failure line.

    The first tab-indented line whose status is one of ABORT, ERROR, FAIL,
    WARN or TEST_NA is used; the reason is the fifth tab-separated field
    after the status.

    @param status_raw: the status log, as a string.

    @return The failure message, or 'Reason Unknown' when no such line
            exists or the line is too short to carry a reason.
    """
    unknown = 'Reason Unknown'
    reason_tags = 'ABORT|ERROR|FAIL|WARN|TEST_NA'
    match = re.search(r'^\t+(%s)\t(.+)' % (reason_tags),
                      status_raw, re.MULTILINE)
    if not match:
        return unknown
    failure_type = match.group(1)
    details = match.group(2).split('\t')
    # A truncated line has no reason field; report it as unknown instead
    # of raising IndexError.
    if len(details) < 5:
        return unknown
    reason = details[4]
    if self._escape_error:
        reason = re.escape(reason)
    return ': '.join([failure_type, reason])
def _get_full_status(self, status_raw):
    """Collect the full status of a test, and err msg if any.

    This reports the full status, rather than just pass/fail. The failure
    tags are checked in priority order before a pass is considered.

    @param status_raw: the status log, as a string.

    @return Tuple (status, err). err is None for 'Pass'; otherwise it is
            the message from _get_failure_msg_from_status. A log with no
            recognizable tag is reported as 'Error'.
    """
    failure_labels = (
        ('FAIL', 'Fail'),
        ('ERROR', 'Error'),
        ('ABORT', 'Abort'),
        ('WARN', 'Warn'),
        ('TEST_NA', 'Not Run'),
    )
    for tag, label in failure_labels:
        if tag in status_raw:
            return label, self._get_failure_msg_from_status(status_raw)
    if re.search(r'GOOD.+completed successfully', status_raw):
        return 'Pass', None
    return 'Error', self._get_failure_msg_from_status(status_raw)
def _CollectResult(self, testdir, results, is_experimental=False):
    """Collects results stored under testdir into a dictionary.

    The presence/location of status files (status.log, status and
    job_report.html) varies depending on whether the job is a simple
    client test, simple server test, old-style suite or new-style
    suite. For example:
    -In some cases a single job_report.html may exist but many times
     multiple instances are produced in a result tree.
    -Most tests will produce a status.log but client tests invoked
     by a server test will only emit a status file.

    The two common criteria that seem to define the presence of a
    valid test result are:
    1. Existence of a 'status.log' or 'status' file. Note that if both a
       'status.log' and 'status' file exist for a test, the 'status' file
       is always a subset of the 'status.log' file contents.
    2. Presence of a 'debug' directory.

    In some cases multiple 'status.log' files will exist where the parent
    'status.log' contains the contents of multiple subdirectory
    'status.log' files. Parent and subdirectory 'status.log' files are
    always expected to agree on the outcome of a given test.

    The test results discovered from the 'status*' files are included
    in the result dictionary. The test directory name and a test directory
    timestamp/localtime are saved to be used as sort keys for the results.

    The value of 'is_experimental' is included in the result dictionary.

    @param testdir: The autoserv test result directory.
    @param results: A list to which a populated test-result-dictionary will
                    be appended if a status file is found.
    @param is_experimental: A boolean value indicating whether the result
                            directory is for an experimental test.
    """
    # Prefer status.log and fall back to the plain status file.
    status_file = os.path.join(testdir, 'status.log')
    if not os.path.isfile(status_file):
        status_file = os.path.join(testdir, 'status')
        if not os.path.isfile(status_file):
            return

    # Close the handle as soon as the log has been read.
    with open(status_file, 'r') as f:
        status_raw = f.read()

    # Status is True if GOOD, else False for all others.
    status = False
    error_msg = ''
    failure_tags = 'ABORT|ERROR|FAIL'
    warning_tag = 'WARN|TEST_NA'
    failure = re.search(r'%s' % failure_tags, status_raw)
    warning = re.search(r'%s' % warning_tag, status_raw) and not failure
    good = (re.search(r'GOOD.+completed successfully', status_raw) and
            not (failure or warning))

    # We'd like warnings to allow the tests to pass, but still gather info.
    if good or warning:
        status = True
    if not good:
        error_msg = self._get_failure_msg_from_status(status_raw)

    # Grab the timestamp - can be used for sorting the test runs.
    # Grab the localtime - may be printed to enable line filtering by date.
    # Designed to match a line like this:
    #   END GOOD testname ... timestamp=1347324321 localtime=Sep 10 17:45:21
    status_re = r'GOOD|%s|%s' % (failure_tags, warning_tag)
    endtimestamp, endlocaltime = self._CollectEndTimes(status_raw,
                                                       status_re)
    starttimestamp, startlocaltime = self._CollectEndTimes(status_raw,
                                                           is_end=False)
    # Hung tests will occasionally skip printing the END line so grab
    # a default timestamp from the START line in those cases.
    if not endtimestamp:
        endtimestamp, endlocaltime = starttimestamp, startlocaltime

    # The full status is only reported once per test: if an already
    # collected result's directory is contained in this path, leave the
    # full status empty for this entry.
    already_logged = any(r['testdir'] in testdir for r in results)
    if already_logged:
        full_status, err = None, None
    else:
        full_status, err = self._get_full_status(status_raw)

    results.append({
        'testdir': testdir,
        'crashes': self._CollectCrashes(status_raw),
        'status': status,
        'error_msg': error_msg,
        'localtime': endlocaltime,
        'timestamp': endtimestamp,
        'perf': self._CollectPerf(testdir),
        'attr': self._CollectAttr(testdir),
        'info': self._CollectInfo(testdir, {'localtime': endlocaltime,
                                            'timestamp': endtimestamp}),
        'experimental': is_experimental,
        'full_status': full_status,
        'full_err': err,
        'startlocaltime': startlocaltime,
        'starttimestamp': starttimestamp
    })
def RecursivelyCollectResults(self,
                              resdir,
                              parent_experimental_tag=False,
                              results=None):
    """Recursively collect results into a list of dictionaries.

    Only recurses into directories that possess a 'debug' subdirectory
    because anything else is not considered a 'test' directory.

    The value of 'experimental' in keyval file is used to determine whether
    the result is for an experimental test. If it is, all its sub
    directories are considered to be experimental tests too.

    @param resdir: results/test directory to parse results from and recurse
                   into.
    @param parent_experimental_tag: A boolean value, used to keep track of
                                    whether its parent directory is for an
                                    experimental test.
    @param results: List that collected result dictionaries are appended
                    to. When omitted, a new list is created for this call.

    @return List of dictionaries of results.
    """
    # A literal [] default would be created once and shared by every call,
    # so results of earlier top-level calls would leak into later ones.
    if results is None:
        results = []
    is_experimental = (parent_experimental_tag or
                       self._CheckExperimental(resdir))
    self._CollectResult(resdir, results, is_experimental)
    for testdir in glob.glob(os.path.join(resdir, '*')):
        # Remove false positives that are missing a debug dir.
        if not os.path.exists(os.path.join(testdir, 'debug')):
            continue
        self.RecursivelyCollectResults(testdir, is_experimental, results)
    return results
class ReportGenerator(object):
    """Collects and displays data from autoserv results directories.

    Gathers status and performance data from one or more autoserv result
    directories and turns it into test reports.
    """

    # Number of spaces used to indent keyval lines.
    _KEYVAL_INDENT = 2
    # Status labels for human-readable ('hr') and --csv output.
    _STATUS_STRINGS = {
        'hr': {'pass': '[ PASSED ]', 'fail': '[ FAILED ]'},
        'csv': {'pass': 'PASS', 'fail': 'FAIL'},
    }

    def __init__(self, options, args):
        """Remember the parsed command line.

        @param options: parsed command-line options.
        @param args: positional arguments, i.e. the result directories.
        """
        self._results = []
        self._args = args
        self._options = options
        self._color = terminal.Color(options.color)
def _CollectAllResults(self):
    """Parses results into the self._results list.

    Builds a list (self._results) where each entry is a dictionary of
    result data from one test (which may contain other tests). Each
    dictionary will contain values such as: test folder, status, localtime,
    crashes, error_msg, perf keyvals [optional], info [optional].

    Exits through Die() if an argument is not a directory or if no
    results were found at all.
    """
    collector = ResultCollector(
        collect_perf=self._options.perf,
        collect_attr=self._options.attr,
        collect_info=self._options.info,
        escape_error=self._options.escape_error,
        allow_chrome_crashes=self._options.allow_chrome_crashes)
    for resdir in self._args:
        if not os.path.isdir(resdir):
            Die('%r does not exist', resdir)
        # Hand every directory its own list. Relying on the collector's
        # default list would make the second and later directories return
        # the earlier directories' results again, duplicating them here.
        self._results.extend(
            collector.RecursivelyCollectResults(resdir, results=[]))
    if not self._results:
        Die('no test directories found')
def _GenStatusString(self, status):
    """Pick the status label for a test outcome.

    Takes --csv into account: the old-style strings are returned if it is
    set, the human-readable ones otherwise.

    @param status: True or False, indicating success or failure.

    @return The appropriate string for printing.
    """
    flavor = 'csv' if self._options.csv else 'hr'
    outcome = 'pass' if status else 'fail'
    return self._STATUS_STRINGS[flavor][outcome]
def _Indent(self, msg):
    """Prefix msg with _KEYVAL_INDENT spaces.

    @param msg: string to indent.

    @return indented version of msg.
    """
    padding = ' ' * self._KEYVAL_INDENT
    return padding + msg
def _GetTestColumnWidth(self):
    """Returns the test column width based on the test data.

    The column must fit the longest test directory name and the longest
    perf key (plus the keyval indent) found in the collected results.

    @return The width for the test column; 0 when there are no results.
    """
    candidates = [0]
    for entry in self._results:
        candidates.append(len(entry['testdir']))
        perf = entry.get('perf')
        if perf:
            longest_key = max(len(key) for key in perf)
            candidates.append(longest_key + self._KEYVAL_INDENT)
    return max(candidates)
def _PrintDashLine(self, width):
    """Prints a line of dashes as a separator in output.

    Nothing is printed in --csv mode. The line is as long as the test
    column plus the human-readable 'pass' label.

    @param width: an integer, the width of the test column.
    """
    if self._options.csv:
        return
    label_width = len(self._STATUS_STRINGS['hr']['pass'])
    print('-' * (width + label_width))
def _PrintEntries(self, entries):
    """Prints a list of strings on one line.

    The entries are separated by ',' when --csv is set and by a single
    space otherwise.

    @param entries: a list of strings, entities to output.
    """
    if self._options.csv:
        separator = ','
    else:
        separator = ' '
    print(separator.join(entries))
def _PrintErrors(self, test, error_msg):
    """Prints an indented error message, unless the --csv flag is set.

    @param test: the name of a test with which to prefix the line.
    @param error_msg: a message to print. None (or an empty message) is
                      allowed, but ignored.
    """
    if self._options.csv or not error_msg:
        return
    self._PrintEntries([test, self._Indent(error_msg)])
def _PrintErrorLogs(self, test, test_string):
    """Prints the error log for |test| if --debug is set.

    Every non-blank line of each matching '*.ERROR' file is printed,
    indented and prefixed with test_string. Files that cannot be opened
    are reported and skipped.

    @param test: the name of a test suitable for embedding in a path
    @param test_string: the name of a test with which to prefix the line.
    """
    if not self._options.print_debug:
        return
    # This is a glob pattern, not a regular expression.
    error_log_glob = os.path.join(
        'results.', test, 'debug',
        '%s*.ERROR' % os.path.basename(test))
    for log_path in glob.glob(error_log_glob):
        try:
            with open(log_path) as log:
                for text in log:
                    # Ensure line is not just WS.
                    if not text.lstrip():
                        continue
                    self._PrintEntries(
                        [test_string, self._Indent(text.rstrip())])
        except IOError:
            print('Could not open %s' % log_path)
def _PrintResultDictKeyVals(self, test_entry, result_dict):
    """Formatted print a dict of keyvals like 'perf' or 'info'.

    Each keyval is emitted on its own line, sorted by key. Outside of
    --csv mode the key is indented and padded to the test column width;
    the value is always rendered in bold.

    @param test_entry: The unique name of the test (dir) - matches other
                       test output.
    @param result_dict: A dict of keyvals to be presented.
    """
    if not result_dict:
        return
    width = self._GetTestColumnWidth()
    for name in sorted(result_dict.keys()):
        if self._options.csv:
            shown_key = name
        else:
            padded = name.ljust(width - self._KEYVAL_INDENT)
            shown_key = padded.rjust(width)
        shown_value = self._color.Color(self._color.BOLD, result_dict[name])
        self._PrintEntries([test_entry, shown_key, shown_value])
def _GetSortedTests(self):
    """Sort the test result dicts in preparation for results printing.

    By default the result dictionaries are sorted by their test directory.
    However, when running long suites, it is useful to see if an early test
    has wedged the system and caused the remaining tests to abort/fail. The
    timestamp-based chronological sorting allows this view.

    Uses the --sort-chron command line option to control.

    @return A new, sorted list of the result dictionaries.
    """
    by_testdir = operator.itemgetter('testdir')
    if not self._options.sort_chron:
        return sorted(self._results, key=by_testdir)
    # Need to reverse sort the test dirs to ensure the suite folder
    # shows at the bottom. Because the suite folder shares its datetime
    # with the last test it shows second-to-last without the reverse
    # sort first.
    reversed_by_dir = sorted(self._results, key=by_testdir, reverse=True)
    return sorted(reversed_by_dir, key=operator.itemgetter('timestamp'))
# TODO(zamorzaev): reuse this method in _GetResultsForHTMLReport to avoid
# code copying.
def _GetDedupedResults(self):
    """Aggregate results from multiple retries of the same test.

    Results whose directory does not look like `...results-<n>-<name>` are
    dropped. For every name, later results replace earlier ones until a
    result with a truthy status has been seen; that one is kept.

    @return List with one result dictionary per test name.
    """
    dir_pattern = re.compile(r'(.*)results-(\d[0-9]*)-(.*)')
    kept = {}
    for candidate in self._GetSortedTests():
        found = dir_pattern.search(candidate['testdir'])
        if not found:
            continue
        name = found.group(3)
        if name in kept and kept[name].get('status'):
            # Already have a successful (re)try.
            continue
        kept[name] = candidate
    return list(kept.values())
def _GetResultsForHTMLReport(self):
    """Return cleaned results for the HTML report.

    Works on a deep copy of the sorted results. Retries of the same test
    are merged into one row, and rows are numbered in the order the test
    names are first seen.

    @return dict mapping the row number (as a string) to a row dict with
            the keys 'status' ('Pass', 'Fail' or 'NA'), 'error_msg',
            's_no', 'crashes', 'attempts' and 'test'.
    """
    import copy
    pass_tag = "Pass"
    fail_tag = "Fail"
    na_tag = "NA"
    dir_pattern = re.compile(r'(.*)results-(\d[0-9]*)-(.*)')

    rows_by_name = {}
    serial = 0
    for entry in copy.deepcopy(self._GetSortedTests()):
        found = dir_pattern.search(entry['testdir'])
        if not found:
            continue
        log_dir, test_number, test_name = found.groups()
        # Only the part before the first '/' names the test.
        test_name = test_name.split('/')[0]
        if entry['error_msg'] is None:
            entry['error_msg'] = ''

        if test_name not in rows_by_name:
            serial += 1
            # Add <b> and </b> tag for the good format in the report.
            rows_by_name[test_name] = {
                'status': entry['status'],
                'error_msg': entry['error_msg'],
                's_no': serial,
                'crashes': entry['crashes'],
                'attempts': '<b>test_result_number: %s - %s</b> : %s' % (
                    test_number, log_dir, entry['error_msg']),
            }
            continue

        # The test was seen before: extend its row instead of creating a
        # second one. Status and message are only replaced while the
        # stored status is still False.
        row = rows_by_name[test_name]
        if row['status'] is False:
            row['status'] = entry['status']
            row['error_msg'] = entry['error_msg']
        row['crashes'] = row['crashes'] + entry['crashes']
        row['attempts'] = row['attempts'] + \
            '</br><b>test_result_number : %s - %s</b> : %s' % (
                test_number, log_dir, entry['error_msg'])

    # Re-key the rows by s_no so that the data can be ordered at the end.
    numbered = {}
    for test_name, row in rows_by_name.items():
        row['test'] = test_name
        numbered[str(row['s_no'])] = row

    # Map the status: False->Fail, True->Pass, and True together with an
    # error message -> NA.
    for row in numbered.values():
        if not row['status']:
            row['status'] = fail_tag
        elif row['error_msg'] != '':
            row['status'] = na_tag
        else:
            row['status'] = pass_tag
    return numbered
def GenerateReportHTML(self):
    """Generate a clean HTML report for the results.

    Renders one table row per entry of _GetResultsForHTMLReport(), ordered
    by row number, followed by pass/fail/NA totals, and writes the page to
    'test_report.html' inside the --html_report_dir directory.
    """
    results = self._GetResultsForHTMLReport()
    html_table_header = """ <th>S.No</th>
<th>Test</th>
<th>Status</th>
<th>Error Message</th>
<th>Crashes</th>
<th>Attempts</th>
"""
    # Tally the rows per (lower-cased) status.
    tally = {'pass': 0, 'fail': 0, 'na': 0}
    for row in results.values():
        verdict = row['status'].lower()
        if verdict in tally:
            tally[verdict] += 1
    passed_tests = tally['pass']
    failed_tests = tally['fail']
    na_tests = tally['na']
    total_tests = passed_tests + failed_tests + na_tests

    row_template = """<tr>
<td>%s</td>
<td>%s</td>
<td
style="background-color:%s;">
%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td></tr>"""
    # Any status other than pass/na is shown in red.
    color_by_status = {'pass': 'LimeGreen', 'na': 'yellow'}
    rendered_rows = []
    # The keys are row numbers stored as strings; order them numerically.
    for number in sorted(int(key) for key in results.keys()):
        key = str(number)
        row = results[key]
        color = color_by_status.get(row['status'].lower(), 'red')
        rendered_rows.append(row_template % (
            key, row['test'],
            color,
            row['status'],
            row['error_msg'],
            row['crashes'],
            row['attempts']))
    html_table_body = ''.join(rendered_rows)

    html_page = """
<!DOCTYPE html>
<html lang="en">
<head>
<title>Automation Results</title>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width,initial-scale=1">
<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.7/css/bootstrap.min.css">
<script src="https://ajax.googleapis.com/ajax/libs/jquery/3.3.1/jquery.min.js"></script>
<script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.7/js/bootstrap.min.js"></script>
</head>
<body>
<div class="container">
<h2>Automation Report</h2>
<table class="table table-bordered" border="1">
<thead>
<tr style="background-color:LightSkyBlue;">
\n%s
</tr>
</thead>
<tbody>
\n%s
</tbody>
</table>
<div class="row">
<div class="col-sm-4">Passed: <b>%d</b></div>
<div class="col-sm-4">Failed: <b>%d</b></div>
<div class="col-sm-4">NA: <b>%d</b></div>
</div>
<div class="row">
<div class="col-sm-4">Total: <b>%d</b></div>
</div>
</div>
</body>
</html>
""" % (html_table_header, html_table_body, passed_tests,
       failed_tests, na_tests, total_tests)
    report_path = os.path.join(self._options.html_report_dir,
                               "test_report.html")
    with open(report_path, 'w') as html_file:
        html_file.write(html_page)
def _GenerateReportText(self):
    """Prints a result report to stdout.

    Prints a result table to stdout. Each row of the table contains the
    test result directory and the test result (PASS, FAIL). If the perf
    option is enabled, each test entry is followed by perf keyval entries
    from the test results.

    After the table, a pass total is printed unless --csv is set, and a
    crash summary is printed when crash detection is enabled.
    """
    tests = self._GetSortedTests()
    width = self._GetTestColumnWidth()
    # Maps each crash string to the set of test dirs it was seen in.
    crashes = {}
    tests_pass = 0
    self._PrintDashLine(width)
    for result in tests:
        testdir = result['testdir']
        # Pad the name to the column width for human-readable output only.
        test_entry = testdir if self._options.csv else testdir.ljust(width)
        status_entry = self._GenStatusString(result['status'])
        if result['status']:
            color = self._color.GREEN
            # Change the color of 'PASSED' if the test run wasn't completely
            # ok, so it's more obvious it isn't a pure pass.
            if 'WARN' in result['error_msg']:
                color = self._color.YELLOW
            elif 'TEST_NA' in result['error_msg']:
                color = self._color.MAGENTA
            tests_pass += 1
        else:
            color = self._color.RED
        test_entries = [test_entry, self._color.Color(color, status_entry)]
        # Note: this merges the attr keyvals into the result's own 'info'
        # dict when the result has one.
        info = result.get('info', {})
        info.update(result.get('attr', {}))
        # In csv mode the info/attr keyvals and the failure reason are
        # appended to the status line itself.
        if self._options.csv and (self._options.info or self._options.attr):
            if info:
                test_entries.extend(['%s=%s' % (k, info[k])
                                     for k in sorted(info.keys())])
            if not result['status'] and result['error_msg']:
                test_entries.append('reason="%s"' % result['error_msg'])
        self._PrintEntries(test_entries)
        self._PrintErrors(test_entry, result['error_msg'])
        # Print out error log for failed tests.
        if not result['status']:
            self._PrintErrorLogs(testdir, test_entry)
        # Emit the perf keyvals entries. There will be no entries if the
        # --no-perf option is specified.
        self._PrintResultDictKeyVals(test_entry, result['perf'])
        # Determine that there was a crash during this test.
        if result['crashes']:
            for crash in result['crashes']:
                if not crash in crashes:
                    crashes[crash] = set([])
                crashes[crash].add(testdir)
        # Emit extra test metadata info on separate lines if not --csv.
        if not self._options.csv:
            self._PrintResultDictKeyVals(test_entry, info)
    self._PrintDashLine(width)
    if not self._options.csv:
        total_tests = len(tests)
        # NOTE(review): divides by the number of tests, so this raises
        # ZeroDivisionError if it is ever reached with no results.
        percent_pass = 100 * tests_pass / total_tests
        pass_str = '%d/%d (%d%%)' % (tests_pass, total_tests, percent_pass)
        print('Total PASS: ' +
              self._color.Color(self._color.BOLD, pass_str))
    if self._options.crash_detection:
        print('')
        if crashes:
            print(self._color.Color(self._color.RED,
                                    'Crashes detected during testing:'))
            self._PrintDashLine(width)
            # Crash names are printed in sorted order; the tests of one
            # crash come from a set, so their order is not defined.
            for crash_name, crashed_tests in sorted(six.iteritems(crashes)):
                print(self._color.Color(self._color.RED, crash_name))
                for crashed_test in crashed_tests:
                    print(self._Indent(crashed_test))
            self._PrintDashLine(width)
            print(('Total unique crashes: ' +
                   self._color.Color(self._color.BOLD, str(len(crashes)))))
    # Sometimes the builders exit before these buffers are flushed.
    sys.stderr.flush()
    sys.stdout.flush()
def _translate_to_dict(self):
    """Summarize the collected results as a json-serializable dict.

    Entries of self._results whose 'full_status' is None are left out.

    Returns:
        A dict {'tests': [...]} holding, for every remaining result, its
        verdict, test name (derived from the test directory), error
        message, results path, start time and end time.
    """
    def _as_entry(info):
        # One output record per result; keys match the results.json schema
        # consumers expect.
        return {
            'verdict': info['full_status'],
            'testname': _test_name_from_dir(info['testdir']),
            'errmsg': info['full_err'],
            'resultspath': info['testdir'],
            'starttime': info['starttimestamp'],
            'endtime': info['timestamp'],
        }

    return {
        'tests': [_as_entry(info) for info in self._results
                  if info['full_status'] is not None],
    }
def _write_simple_json(self):
    """Write a json summary into the html report directory, if one is set.

    Does nothing when the html_report_dir option is empty. Otherwise the
    report built by generate_json_report() for that directory is dumped to
    <html_report_dir>/results.json.
    """
    report_dir = self._options.html_report_dir
    if report_dir:
        # Build the report before opening the output file, so a failure
        # while parsing results does not leave an empty results.json behind.
        report = generate_json_report(report_dir)
        out_path = os.path.join(report_dir, "results.json")
        with open(out_path, 'w') as out_file:
            json.dump(report, out_file)
def Run(self):
    """Collect results, emit the requested reports, and set the exit code.

    The text report (and, with the html option, the HTML report) is skipped
    when just_status_code is set. Afterwards the process exits with status 1
    as soon as a deduped result is found that failed, or that has crashes
    while crash detection is on. Experimental results are not considered
    when ignore_experimental_tests is set, and no exit happens at all when
    the is_cft option is set.
    """
    self._CollectAllResults()
    self._write_simple_json()

    opts = self._options
    if not opts.just_status_code:
        self._GenerateReportText()
        if opts.html:
            print("\nLogging the data into test_report.html file.")
            # HTML generation is best effort: report the problem and carry
            # on so the exit code below is still computed.
            try:
                self.GenerateReportHTML()
            except Exception as err:
                print("Failed to generate HTML report %s" % str(err))

    for result in self._GetDedupedResults():
        if result['experimental'] and opts.ignore_experimental_tests:
            continue
        healthy = result['status'] and not (
                opts.crash_detection and result['crashes'])
        if healthy:
            continue
        # When a test fails, but autotest doesn't crash, do not exit(1)
        if not opts.is_cft:
            sys.exit(1)
def main():
    """Parse the command line and run the report generator.

    Builds the option parser, requires at least one result directory,
    forces --no-debug and --no-crash-detection when --csv is given, warns
    when report-related options are passed together with --just_status_code,
    and finally hands the options and directories to ReportGenerator.
    """
    usage = 'Usage: %prog [options] result-directories...'
    parser = optparse.OptionParser(usage=usage)
    parser.add_option('--color', dest='color', action='store_true',
                      default=_STDOUT_IS_TTY,
                      help='Use color for text reports [default if TTY stdout]'
                      )
    parser.add_option('--no-color', dest='color', action='store_false',
                      help='Don\'t use color for text reports')
    parser.add_option('--no-crash-detection', dest='crash_detection',
                      action='store_false', default=True,
                      help='Don\'t report crashes or error out when detected')
    parser.add_option('--csv', dest='csv', action='store_true',
                      help='Output test result in CSV format. '
                      'Implies --no-debug --no-crash-detection.')
    # NOTE(review): the help text below says --html implies --no-debug and
    # --no-crash-detection, but only --csv is enforced further down in this
    # function -- confirm which one is intended.
    parser.add_option('--html', dest='html', action='store_true',
                      help='To generate HTML File. '
                      'Implies --no-debug --no-crash-detection.')
    parser.add_option('--html-report-dir', dest='html_report_dir',
                      action='store', default=None, help='Path to generate '
                      'html report')
    parser.add_option('--info', dest='info', action='store_true',
                      default=False,
                      help='Include info keyvals in the report')
    parser.add_option('--escape-error', dest='escape_error',
                      action='store_true', default=False,
                      help='Escape error message text for tools.')
    parser.add_option('--perf', dest='perf', action='store_true',
                      default=True,
                      help='Include perf keyvals in the report [default]')
    parser.add_option('--attr', dest='attr', action='store_true',
                      default=False,
                      help='Include attr keyvals in the report')
    parser.add_option('--no-perf', dest='perf', action='store_false',
                      help='Don\'t include perf keyvals in the report')
    parser.add_option('--sort-chron', dest='sort_chron', action='store_true',
                      default=False,
                      help='Sort results by datetime instead of by test name.')
    parser.add_option('--no-debug', dest='print_debug', action='store_false',
                      default=True,
                      help='Don\'t print out logs when tests fail.')
    parser.add_option('--allow_chrome_crashes',
                      dest='allow_chrome_crashes',
                      action='store_true', default=False,
                      help='Treat Chrome crashes as non-fatal.')
    parser.add_option('--ignore_experimental_tests',
                      dest='ignore_experimental_tests',
                      action='store_true', default=False,
                      help='If set, experimental test results will not '
                      'influence the exit code.')
    parser.add_option('--just_status_code',
                      dest='just_status_code',
                      action='store_true', default=False,
                      help='Skip generating a report, just return status code.')
    parser.add_option('--cft',
                      dest='is_cft',
                      action='store_true', default=False,
                      help='If set: will not return 1 on test failure')

    (options, args) = parser.parse_args()
    if not args:
        parser.print_help()
        Die('no result directories provided')

    # Several report options default to a truthy value (perf, print_debug,
    # and color on a TTY), so testing plain truthiness would flag them on
    # every --just_status_code run. Compare against a snapshot of the
    # defaults instead, taken before the --csv normalization below so that
    # forced values are not mistaken for user input. A flag passed with the
    # same value as its default cannot be told apart from an omitted one.
    (default_options, _) = parser.parse_args([])
    report_options = ['color', 'csv', 'info', 'escape_error', 'perf', 'attr',
                      'sort_chron', 'print_debug', 'html', 'html_report_dir']
    passed_report_options = [
        opt for opt in report_options
        if getattr(options, opt) != getattr(default_options, opt)]

    if options.csv and (options.print_debug or options.crash_detection):
        Warning('Forcing --no-debug --no-crash-detection')
        options.print_debug = False
        options.crash_detection = False

    if options.just_status_code and passed_report_options:
        Warning('Passed --just_status_code and incompatible options %s' %
                ' '.join(passed_report_options))

    generator = ReportGenerator(options, args)
    generator.Run()
# Script entry point: run main() only when this file is executed directly.
if __name__ == '__main__':
    main()