| #!/usr/bin/python3 |
| # Copyright 2010 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| |
| """Parses and displays the contents of one or more autoserv result directories. |
| |
| This script parses the contents of one or more autoserv results folders and |
| generates test reports. |
| """ |
| |
| from __future__ import absolute_import |
| from __future__ import division |
| from __future__ import print_function |
| |
| import datetime |
| import glob |
| import json |
| import logging |
| import operator |
| import optparse |
| import os |
| import re |
| import six |
| from six.moves import range |
| import sys |
| |
| import common |
| from autotest_lib.utils import terminal |
| |
| |
| _STDOUT_IS_TTY = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty() |
| |
| |
| def _test_name_from_dir(test_dir): |
| """Return the name from the test_dir. |
| |
| Examples: |
| /tmp/test_that_latest/something/else/results-n-testname |
| returns `testname` |
| |
| /tmp/TTL/something/results-n-test-name-here |
| returns `test-name-here` |
| |
| """ |
| test_name = test_dir.split('/')[-1] |
| return '-'.join(test_name.split('-')[2:]) |
| |
| |
| def generate_json_report(topDir): |
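|     """Build a simple json-style report from status.log files under topDir. |
| |
|     A sketch of the returned structure (the field values are illustrative): |
| |
|         {'tests': [{'verdict': 'Pass', |
|                     'testname': 'dummy_Pass', |
|                     'errmsg': '', |
|                     'resultspath': '/path/to/results-1-dummy_Pass', |
|                     'starttime': '1670062718', |
|                     'endtime': '1670062720'}]} |
|     """ |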
| status_dirs = find_dirs(topDir) |
| raw_json = {'tests': []} |
|     for testdir in status_dirs: |
|         raw_stat = get_status(testdir) |
|         # get_status() returns None if status.log has gone missing; skip it. |
|         if not raw_stat: |
|             continue |
|         load_json(raw_stat, raw_json, testdir) |
| return raw_json |
| |
| |
| def load_json(raw_stat, raw_json, testdir): |
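|     """Parse one raw status.log blob and append its results to raw_json. |
| |
|     The input is the tab-indented, tab-separated status.log format. A |
|     simplified, illustrative END line (tabs shown as '<tab>'): |
| |
|         <tab>END GOOD<tab>subdir<tab>testname<tab>timestamp=1670062720<tab>localtime=... |
| |
|     @param raw_stat: contents of one status.log file, as a string. |
|     @param raw_json: dict with a 'tests' list to which parsed entries are |
|             appended. |
|     @param testdir: the results directory the status.log was read from. |
|     """ |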
| indentLevel = -1 |
| missingResult = False |
| lastStartTime = None |
| firstFound = False |
| contentLine = None |
| for line in raw_stat.splitlines(): |
| fields = get_fields(line) |
| # A non-formatted line won't have fields, thus don't parse it. |
| if not fields: |
| continue |
| if fields[0] == 'INFO' or fields[1] == '----': |
| continue |
| if fields[0] == 'START': |
| if missingResult: |
| # Nested starts.... skipping for now.... |
| continue |
| missingResult = True |
| lastStartTime = fields[2] |
| indentLevel = indent(line) |
| elif indent(line) == indentLevel + 1: |
| if missingResult is not True: |
| raise Exception('Content found out of context.') |
| contentLine = line |
| elif indent(line) == indentLevel and isEndLine(fields[0]): |
| missingResult = False |
| verdict = (parseResult(fields[0])) |
| _translate_to_dict(raw_json, fields, verdict, contentLine, |
| lastStartTime, testdir, firstFound) |
| firstFound = True |
| else: |
| # This is the case where we are dealing with lots of spammy logs |
| # inside of a task/test. |
| # We want to keep the parser as simple as possible: |
| # 1.) Find start of a test |
| # 2.) Find the first indented line prior to the close line. |
| # 3.) Find the closing line which matches the indent. |
| # 4.) Save all that info as a result. |
| continue |
| |
| |
| def isEndLine(field): |
|     return 'END' in field |
| |
| |
| def _raw_time(timestamp): |
| """timestamp=1670062718 --> 1670062718""" |
| return timestamp.replace('timestamp=', '') |
| |
| |
| def _translate_to_dict(res, fields, verdict, contentLine, |
| lastStartTime, testdir, firstFound): |
| """Return the full_status, testname, and err to a json dict.""" |
| if not firstFound: |
|         # The first result found carries the full test name. Because of |
|         # inconsistencies between autotest tests, tast tests, and tagged |
|         # tests, that name can't be trusted, so take it from the directory |
|         # name, which is always correct. Test names found after the first |
|         # one work as intended. |
| name = _test_name_from_dir(testdir) |
| else: |
| name = fields[1] |
| res['tests'].append( |
| {'verdict': verdict, |
| 'testname': name, |
| 'errmsg': errFromContent(contentLine), |
| 'resultspath': testdir, |
| 'starttime': _raw_time(lastStartTime), |
| 'endtime': _raw_time(fields[2]) |
| }) |
| |
| |
| def errFromContent(contentLine): |
| return contentLine.split('\t')[-1] if contentLine else "" |
| |
| |
| def parseResult(status_raw): |
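|     """Map a raw status field (e.g. 'END GOOD') to a verdict string.""" |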
| status = 'Error' |
|     if re.search(r'FAIL', status_raw): |
|         status = 'Fail' |
|     elif re.search(r'ERROR', status_raw): |
|         status = 'Error' |
|     elif re.search(r'ABORT', status_raw): |
|         status = 'Abort' |
|     elif re.search(r'WARN', status_raw): |
|         status = 'Warn' |
|     elif re.search(r'TEST_NA', status_raw): |
|         status = 'Not Run' |
|     elif re.search(r'GOOD', status_raw): |
|         status = 'Pass' |
| return status |
| |
| |
| def get_fields(line): |
| indents = indent(line) |
| items = line.split('\t') |
|     # A "real" status line will have at least 6 tab-separated fields. |
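|     # For an illustrative line such as |
|     #   "\tEND GOOD\tsubdir\ttestname\ttimestamp=...\tlocaltime=..." |
|     # this returns ('END GOOD', 'testname', 'timestamp=...'). |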
| if len(items) < 6: |
| return None |
| status = items[0 + indents] |
| name = items[2 + indents] |
| taskTime = items[3 + indents] |
| return status, name, taskTime |
| |
| |
| def indent(line): |
| return (len(line) - len(line.lstrip('\t'))) |
| |
| |
| def get_status(testdir): |
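|     """Return the contents of testdir's status.log, or None if missing.""" |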
| status_file = os.path.join(testdir, 'status.log') |
| if not os.path.isfile(status_file): |
| return |
| return open(status_file, 'r').read() |
| |
| |
| def find_dirs(topDir): |
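|     """Return 'results-*' dirs under topDir that contain a status.log.""" |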
| f = [] |
| for (dirpath, dirnames, filenames) in os.walk(topDir): |
| for d in dirnames: |
| if 'results-' not in d: |
| continue |
| fp = os.path.join(dirpath, d) |
| if 'status.log' in os.listdir(fp): |
| f.append(fp) |
| return f |
| |
| |
| def Die(message_format, *args, **kwargs): |
| """Log a message and kill the current process. |
| |
| @param message_format: string for logging.error. |
| |
| """ |
| logging.error(message_format, *args, **kwargs) |
| sys.exit(1) |
| |
| |
| class CrashWaiver: |
| """Represents a crash that we want to ignore for now.""" |
| def __init__(self, signals, deadline, url, person): |
| self.signals = signals |
| self.deadline = datetime.datetime.strptime(deadline, '%Y-%b-%d') |
| self.issue_url = url |
| self.suppressor = person |
| |
| # List of crashes which are okay to ignore. This list should almost always be |
| # empty. If you add an entry, include the bug URL and your name, something like |
| # 'crashy':CrashWaiver( |
| # ['sig 11'], '2011-Aug-18', 'http://crosbug/123456', 'developer'), |
| |
| _CRASH_ALLOWLIST = { |
| } |
| |
| |
| class ResultCollector(object): |
| """Collects status and performance data from an autoserv results dir.""" |
| |
| def __init__(self, collect_perf=True, collect_attr=False, |
| collect_info=False, escape_error=False, |
| allow_chrome_crashes=False): |
| """Initialize ResultsCollector class. |
| |
| @param collect_perf: Should perf keyvals be collected? |
| @param collect_attr: Should attr keyvals be collected? |
| @param collect_info: Should info keyvals be collected? |
| @param escape_error: Escape error message text for tools. |
| @param allow_chrome_crashes: Treat Chrome crashes as non-fatal. |
| |
| """ |
| self._collect_perf = collect_perf |
| self._collect_attr = collect_attr |
| self._collect_info = collect_info |
| self._escape_error = escape_error |
| self._allow_chrome_crashes = allow_chrome_crashes |
| |
| def _CollectPerf(self, testdir): |
| """Parses keyval file under testdir and return the perf keyval pairs. |
| |
| @param testdir: autoserv test result directory path. |
| |
| @return dict of perf keyval pairs. |
| |
| """ |
| if not self._collect_perf: |
| return {} |
| return self._CollectKeyval(testdir, 'perf') |
| |
| def _CollectAttr(self, testdir): |
| """Parses keyval file under testdir and return the attr keyval pairs. |
| |
| @param testdir: autoserv test result directory path. |
| |
| @return dict of attr keyval pairs. |
| |
| """ |
| if not self._collect_attr: |
| return {} |
| return self._CollectKeyval(testdir, 'attr') |
| |
| def _CollectKeyval(self, testdir, keyword): |
| """Parses keyval file under testdir. |
| |
| If testdir contains a result folder, process the keyval file and return |
| a dictionary of perf keyval pairs. |
| |
| @param testdir: The autoserv test result directory. |
| @param keyword: The keyword of keyval, either 'perf' or 'attr'. |
| |
|         @return If the perf option is disabled or there's no keyval file |
| under testdir, returns an empty dictionary. Otherwise, returns |
| a dictionary of parsed keyvals. Duplicate keys are uniquified |
| by their instance number. |
| |
| """ |
| keyval = {} |
| keyval_file = os.path.join(testdir, 'results', 'keyval') |
| if not os.path.isfile(keyval_file): |
| return keyval |
| |
| instances = {} |
| |
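|         # Keyval lines have roughly this shape (key and value illustrative, |
|         # here for the 'perf' keyword): |
|         #   page_load_time{perf}=1.23 |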
| for line in open(keyval_file): |
| match = re.search(r'^(.+){%s}=(.+)$' % keyword, line) |
| if match: |
| key = match.group(1) |
| val = match.group(2) |
| |
| # If the same key name was generated multiple times, uniquify |
| # all instances other than the first one by adding the instance |
| # count to the key name. |
| key_inst = key |
| instance = instances.get(key, 0) |
| if instance: |
| key_inst = '%s{%d}' % (key, instance) |
| instances[key] = instance + 1 |
| |
| keyval[key_inst] = val |
| |
| return keyval |
| |
| def _CollectCrashes(self, status_raw): |
| """Parses status_raw file for crashes. |
| |
| Saves crash details if crashes are discovered. If an allowlist is |
| present, only records allowed crashes. |
| |
| @param status_raw: The contents of the status.log or status file from |
| the test. |
| |
| @return a list of crash entries to be reported. |
| |
| """ |
| crashes = [] |
|         regex = re.compile( |
|             r'Received crash notification for ([-\w]+).+ (sig \d+)') |
| chrome_regex = re.compile(r'^supplied_[cC]hrome|^chrome$') |
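|         # A matching log line looks roughly like this (illustrative): |
|         #   Received crash notification for chrome ... sig 11 |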
| for match in regex.finditer(status_raw): |
| w = _CRASH_ALLOWLIST.get(match.group(1)) |
| if (self._allow_chrome_crashes and |
| chrome_regex.match(match.group(1))): |
| print('@@@STEP_WARNINGS@@@') |
| print('%s crashed with %s' % (match.group(1), match.group(2))) |
| elif (w is not None and match.group(2) in w.signals and |
| w.deadline > datetime.datetime.now()): |
| print('Ignoring crash in %s for waiver that expires %s' % ( |
| match.group(1), w.deadline.strftime('%Y-%b-%d'))) |
| else: |
| crashes.append('%s %s' % match.groups()) |
| return crashes |
| |
| def _CollectInfo(self, testdir, custom_info): |
| """Parses *_info files under testdir/sysinfo/var/log. |
| |
| If the sysinfo/var/log/*info files exist, save information that shows |
| hw, ec and bios version info. |
| |
|         This collection of extra info is disabled by default (this function is |
| a no-op). It is enabled only if the --info command-line option is |
| explicitly supplied. Normal job parsing does not supply this option. |
| |
| @param testdir: The autoserv test result directory. |
| @param custom_info: Dictionary to collect detailed ec/bios info. |
| |
| @return a dictionary of info that was discovered. |
| |
| """ |
| if not self._collect_info: |
| return {} |
| info = custom_info |
| |
| sysinfo_dir = os.path.join(testdir, 'sysinfo', 'var', 'log') |
| for info_file, info_keys in six.iteritems( |
| {'ec_info.txt': ['fw_version'], |
| 'bios_info.txt': ['fwid', 'hwid']}): |
| info_file_path = os.path.join(sysinfo_dir, info_file) |
| if not os.path.isfile(info_file_path): |
| continue |
| # Some example raw text that might be matched include: |
| # |
| # fw_version | snow_v1.1.332-cf20b3e |
| # fwid = Google_Snow.2711.0.2012_08_06_1139 # Active firmware ID |
| # hwid = DAISY TEST A-A 9382 # Hardware ID |
| info_regex = re.compile(r'^(%s)\s*[|=]\s*(.*)' % |
| '|'.join(info_keys)) |
| with open(info_file_path, 'r') as f: |
| for line in f: |
| line = line.strip() |
| line = line.split('#')[0] |
| match = info_regex.match(line) |
| if match: |
| info[match.group(1)] = str(match.group(2)).strip() |
| return info |
| |
| def _CollectEndTimes(self, status_raw, status_re='', is_end=True): |
| """Helper to match and collect timestamp and localtime. |
| |
|         Prefers to locate the timestamp and localtime on an |
|         'END GOOD test_name...' line. However, aborted tests occasionally |
|         fail to produce this line, in which case the timestamps are scraped |
|         from the 'START test_name...' line. |
| |
| @param status_raw: multi-line text to search. |
| @param status_re: status regex to seek (e.g. GOOD|FAIL) |
| @param is_end: if True, search for 'END' otherwise 'START'. |
| |
| @return Tuple of timestamp, localtime retrieved from the test status |
| log. |
| |
| """ |
| timestamp = '' |
| localtime = '' |
| |
| localtime_re = r'\w+\s+\w+\s+[:\w]+' |
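|         # With is_end=True the filter below matches lines shaped like |
|         # (illustrative): |
|         #   END GOOD  testname ... timestamp=1347324321 localtime=Sep 10 17:45:21 |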
| match_filter = ( |
| r'^\s*%s\s+(?:%s).*timestamp=(\d*).*localtime=(%s).*$' % ( |
| 'END' if is_end else 'START', status_re, localtime_re)) |
| matches = re.findall(match_filter, status_raw, re.MULTILINE) |
| if matches: |
| # There may be multiple lines with timestamp/localtime info. |
| # The last one found is selected because it will reflect the end |
| # time. |
| for i in range(len(matches)): |
| timestamp_, localtime_ = matches[-(i+1)] |
| if not timestamp or timestamp_ > timestamp: |
| timestamp = timestamp_ |
| localtime = localtime_ |
| return timestamp, localtime |
| |
| def _CheckExperimental(self, testdir): |
| """Parses keyval file and return the value of `experimental`. |
| |
| @param testdir: The result directory that has the keyval file. |
| |
| @return The value of 'experimental', which is a boolean value indicating |
| whether it is an experimental test or not. |
| |
| """ |
| keyval_file = os.path.join(testdir, 'keyval') |
| if not os.path.isfile(keyval_file): |
| return False |
| |
|         with open(keyval_file) as f: |
|             for line in f: |
|                 match = re.match(r'experimental=(.+)', line) |
|                 if match: |
|                     return match.group(1) == 'True' |
|         return False |
| |
| def _get_failure_msg_from_status(self, status_raw): |
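|         """Scrape the first failure line out of status_raw. |
| |
|         @param status_raw: the status log contents, as a string. |
| |
|         @return 'TYPE: reason' built from the first ABORT/ERROR/FAIL/WARN/ |
|                 TEST_NA line found, or 'Reason Unknown' if there is none. |
|         """ |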
| reason_tags = 'ABORT|ERROR|FAIL|WARN|TEST_NA' |
| match = re.search(r'^\t+(%s)\t(.+)' % (reason_tags), |
| status_raw, re.MULTILINE) |
| |
| error_msg = 'Reason Unknown' |
| if match: |
| failure_type = match.group(1) |
| reason = match.group(2).split('\t')[4] |
| if self._escape_error: |
| reason = re.escape(reason) |
| error_msg = ': '.join([failure_type, reason]) |
| |
| return error_msg |
| |
| def _get_full_status(self, status_raw): |
| """Collect the full status of a test, and err msg if any. |
| |
| This will grab the full status, rather than just pass/fail. |
| Additionally, if there is an err msg, it will be scraped as well. |
| |
| @param status_raw: the status log, as a string. |
| |
| @return The full status, and the err msg, if any. |
| |
| """ |
| status = 'Error' |
|         if re.search(r'FAIL', status_raw): |
|             status = 'Fail' |
|         elif re.search(r'ERROR', status_raw): |
|             status = 'Error' |
|         elif re.search(r'ABORT', status_raw): |
|             status = 'Abort' |
|         elif re.search(r'WARN', status_raw): |
|             status = 'Warn' |
|         elif re.search(r'TEST_NA', status_raw): |
|             status = 'Not Run' |
| elif re.search(r'GOOD.+completed successfully', status_raw): |
| status = 'Pass' |
| return status, None |
| |
| return status, self._get_failure_msg_from_status(status_raw) |
| |
| def _CollectResult(self, testdir, results, is_experimental=False): |
| """Collects results stored under testdir into a dictionary. |
| |
| The presence/location of status files (status.log, status and |
| job_report.html) varies depending on whether the job is a simple |
| client test, simple server test, old-style suite or new-style |
| suite. For example: |
| -In some cases a single job_report.html may exist but many times |
| multiple instances are produced in a result tree. |
| -Most tests will produce a status.log but client tests invoked |
| by a server test will only emit a status file. |
| |
| The two common criteria that seem to define the presence of a |
| valid test result are: |
|         1. Existence of a 'status.log' or 'status' file. Note that if both a |
|            'status.log' and 'status' file exist for a test, the 'status' file |
|            is always a subset of the 'status.log' file contents. |
| 2. Presence of a 'debug' directory. |
| |
| In some cases multiple 'status.log' files will exist where the parent |
| 'status.log' contains the contents of multiple subdirectory 'status.log' |
| files. Parent and subdirectory 'status.log' files are always expected |
| to agree on the outcome of a given test. |
| |
| The test results discovered from the 'status*' files are included |
| in the result dictionary. The test directory name and a test directory |
| timestamp/localtime are saved to be used as sort keys for the results. |
| |
| The value of 'is_experimental' is included in the result dictionary. |
| |
| @param testdir: The autoserv test result directory. |
| @param results: A list to which a populated test-result-dictionary will |
| be appended if a status file is found. |
| @param is_experimental: A boolean value indicating whether the result |
| directory is for an experimental test. |
| |
| """ |
| status_file = os.path.join(testdir, 'status.log') |
| top_level = True |
| |
| if not os.path.isfile(status_file): |
| status_file = os.path.join(testdir, 'status') |
| top_level = False |
| if not os.path.isfile(status_file): |
| return |
| |
| # Status is True if GOOD, else False for all others. |
| status = False |
| error_msg = '' |
| status_raw = open(status_file, 'r').read() |
| failure_tags = 'ABORT|ERROR|FAIL' |
| warning_tag = 'WARN|TEST_NA' |
| failure = re.search(r'%s' % failure_tags, status_raw) |
| warning = re.search(r'%s' % warning_tag, status_raw) and not failure |
| good = (re.search(r'GOOD.+completed successfully', status_raw) and |
| not (failure or warning)) |
| |
| # We'd like warnings to allow the tests to pass, but still gather info. |
| if good or warning: |
| status = True |
| |
| if not good: |
| error_msg = self._get_failure_msg_from_status(status_raw) |
| |
| # Grab the timestamp - can be used for sorting the test runs. |
| # Grab the localtime - may be printed to enable line filtering by date. |
| # Designed to match a line like this: |
| # END GOOD testname ... timestamp=1347324321 localtime=Sep 10 17:45:21 |
| status_re = r'GOOD|%s|%s' % (failure_tags, warning_tag) |
| endtimestamp, endlocaltime = self._CollectEndTimes(status_raw, |
| status_re) |
| starttimestamp, startlocaltime = self._CollectEndTimes(status_raw, |
| is_end=False) |
| # Hung tests will occasionally skip printing the END line so grab |
| # a default timestamp from the START line in those cases. |
| if not endtimestamp: |
| endtimestamp, endlocaltime = starttimestamp, startlocaltime |
| |
| full_status = False |
| for r in results: |
| # Already logged results for this test. |
| if r['testdir'] in testdir: |
| full_status, err = None, None |
| break |
| |
| if full_status is not None: |
| full_status, err = self._get_full_status(status_raw) |
| |
| results.append({ |
| 'testdir': testdir, |
| 'crashes': self._CollectCrashes(status_raw), |
| 'status': status, |
| 'error_msg': error_msg, |
| 'localtime': endlocaltime, |
| 'timestamp': endtimestamp, |
| 'perf': self._CollectPerf(testdir), |
| 'attr': self._CollectAttr(testdir), |
| 'info': self._CollectInfo(testdir, {'localtime': endlocaltime, |
| 'timestamp': endtimestamp}), |
| 'experimental': is_experimental, |
| 'full_status': full_status, |
| 'full_err': err, |
| 'startlocaltime': startlocaltime, |
| 'starttimestamp': starttimestamp |
| }) |
| |
|     def RecursivelyCollectResults(self, |
|                                   resdir, |
|                                   parent_experimental_tag=False, |
|                                   results=None): |
| """Recursively collect results into a list of dictionaries. |
| |
| Only recurses into directories that possess a 'debug' subdirectory |
| because anything else is not considered a 'test' directory. |
| |
| The value of 'experimental' in keyval file is used to determine whether |
| the result is for an experimental test. If it is, all its sub |
| directories are considered to be experimental tests too. |
| |
| @param resdir: results/test directory to parse results from and recurse |
| into. |
|         @param parent_experimental_tag: A boolean value, used to keep track of |
|                 whether its parent directory is for an experimental test. |
|         @param results: Accumulator list of result dictionaries; omitted on |
|                 the initial call and passed down through the recursion. |
| |
| @return List of dictionaries of results. |
| |
| """ |
|         if results is None: |
|             results = [] |
|         is_experimental = (parent_experimental_tag or |
|                            self._CheckExperimental(resdir)) |
| self._CollectResult(resdir, results, is_experimental) |
| for testdir in glob.glob(os.path.join(resdir, '*')): |
| # Remove false positives that are missing a debug dir. |
| if not os.path.exists(os.path.join(testdir, 'debug')): |
| continue |
| |
| self.RecursivelyCollectResults(testdir, is_experimental, results) |
| return results |
| |
| |
| class ReportGenerator(object): |
| """Collects and displays data from autoserv results directories. |
| |
| This class collects status and performance data from one or more autoserv |
| result directories and generates test reports. |
| """ |
| |
| _KEYVAL_INDENT = 2 |
| _STATUS_STRINGS = {'hr': {'pass': '[ PASSED ]', 'fail': '[ FAILED ]'}, |
| 'csv': {'pass': 'PASS', 'fail': 'FAIL'}} |
| |
| def __init__(self, options, args): |
| self._options = options |
| self._args = args |
| self._color = terminal.Color(options.color) |
| self._results = [] |
| |
| def _CollectAllResults(self): |
| """Parses results into the self._results list. |
| |
| Builds a list (self._results) where each entry is a dictionary of |
| result data from one test (which may contain other tests). Each |
| dictionary will contain values such as: test folder, status, localtime, |
| crashes, error_msg, perf keyvals [optional], info [optional]. |
| |
| """ |
| collector = ResultCollector( |
| collect_perf=self._options.perf, |
| collect_attr=self._options.attr, |
| collect_info=self._options.info, |
| escape_error=self._options.escape_error, |
| allow_chrome_crashes=self._options.allow_chrome_crashes) |
| |
| for resdir in self._args: |
| if not os.path.isdir(resdir): |
| Die('%r does not exist', resdir) |
| self._results.extend(collector.RecursivelyCollectResults(resdir)) |
| |
| if not self._results: |
| Die('no test directories found') |
| |
| def _GenStatusString(self, status): |
| """Given a bool indicating success or failure, return the right string. |
| |
|         Also takes --csv into account and returns old-style strings if it is set. |
| |
| @param status: True or False, indicating success or failure. |
| |
|         @return The appropriate string for printing. |
| |
| """ |
| success = 'pass' if status else 'fail' |
| if self._options.csv: |
| return self._STATUS_STRINGS['csv'][success] |
| return self._STATUS_STRINGS['hr'][success] |
| |
| def _Indent(self, msg): |
| """Given a message, indents it appropriately. |
| |
| @param msg: string to indent. |
| @return indented version of msg. |
| |
| """ |
| return ' ' * self._KEYVAL_INDENT + msg |
| |
| def _GetTestColumnWidth(self): |
| """Returns the test column width based on the test data. |
| |
|         The test results are aligned by discovering the longest test |
|         directory name or perf key stored in the list of result dictionaries. |
| |
| @return The width for the test column. |
| |
| """ |
| width = 0 |
| for result in self._results: |
| width = max(width, len(result['testdir'])) |
| perf = result.get('perf') |
| if perf: |
| perf_key_width = len(max(perf, key=len)) |
| width = max(width, perf_key_width + self._KEYVAL_INDENT) |
| return width |
| |
| def _PrintDashLine(self, width): |
| """Prints a line of dashes as a separator in output. |
| |
| @param width: an integer. |
| """ |
| if not self._options.csv: |
| print(''.ljust(width + |
| len(self._STATUS_STRINGS['hr']['pass']), '-')) |
| |
| def _PrintEntries(self, entries): |
| """Prints a list of strings, delimited based on --csv flag. |
| |
| @param entries: a list of strings, entities to output. |
| |
| """ |
| delimiter = ',' if self._options.csv else ' ' |
| print(delimiter.join(entries)) |
| |
| def _PrintErrors(self, test, error_msg): |
| """Prints an indented error message, unless the --csv flag is set. |
| |
| @param test: the name of a test with which to prefix the line. |
| @param error_msg: a message to print. None is allowed, but ignored. |
| |
| """ |
| if not self._options.csv and error_msg: |
| self._PrintEntries([test, self._Indent(error_msg)]) |
| |
| def _PrintErrorLogs(self, test, test_string): |
| """Prints the error log for |test| if --debug is set. |
| |
| @param test: the name of a test suitable for embedding in a path |
| @param test_string: the name of a test with which to prefix the line. |
| |
| """ |
| if self._options.print_debug: |
| debug_file_regex = os.path.join( |
| 'results.', test, 'debug', |
| '%s*.ERROR' % os.path.basename(test)) |
| for path in glob.glob(debug_file_regex): |
| try: |
| with open(path) as fh: |
| for line in fh: |
| # Ensure line is not just WS. |
| if len(line.lstrip()) <= 0: |
| continue |
| self._PrintEntries( |
| [test_string, self._Indent(line.rstrip())]) |
| except IOError: |
| print('Could not open %s' % path) |
| |
| def _PrintResultDictKeyVals(self, test_entry, result_dict): |
| """Formatted print a dict of keyvals like 'perf' or 'info'. |
| |
| This function emits each keyval on a single line for uncompressed |
| review. The 'perf' dictionary contains performance keyvals while the |
| 'info' dictionary contains ec info, bios info and some test timestamps. |
| |
| @param test_entry: The unique name of the test (dir) - matches other |
| test output. |
| @param result_dict: A dict of keyvals to be presented. |
| |
| """ |
| if not result_dict: |
| return |
| dict_keys = list(result_dict.keys()) |
| dict_keys.sort() |
| width = self._GetTestColumnWidth() |
| for dict_key in dict_keys: |
| if self._options.csv: |
| key_entry = dict_key |
| else: |
| key_entry = dict_key.ljust(width - self._KEYVAL_INDENT) |
| key_entry = key_entry.rjust(width) |
| value_entry = self._color.Color( |
| self._color.BOLD, result_dict[dict_key]) |
| self._PrintEntries([test_entry, key_entry, value_entry]) |
| |
| def _GetSortedTests(self): |
| """Sort the test result dicts in preparation for results printing. |
| |
|         By default sorts the result dictionaries by their test names. |
| However, when running long suites, it is useful to see if an early test |
| has wedged the system and caused the remaining tests to abort/fail. The |
| datetime-based chronological sorting allows this view. |
| |
| Uses the --sort-chron command line option to control. |
| |
| """ |
| if self._options.sort_chron: |
| # Need to reverse sort the test dirs to ensure the suite folder |
| # shows at the bottom. Because the suite folder shares its datetime |
| # with the last test it shows second-to-last without the reverse |
| # sort first. |
| tests = sorted(self._results, key=operator.itemgetter('testdir'), |
| reverse=True) |
| tests = sorted(tests, key=operator.itemgetter('timestamp')) |
| else: |
| tests = sorted(self._results, key=operator.itemgetter('testdir')) |
| return tests |
| |
| # TODO(zamorzaev): reuse this method in _GetResultsForHTMLReport to avoid |
| # code copying. |
| def _GetDedupedResults(self): |
| """Aggregate results from multiple retries of the same test.""" |
| deduped_results = {} |
| for test in self._GetSortedTests(): |
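|             # A 'testdir' path looks like (illustrative): |
|             #   /tmp/test_that_latest/results-1-dummy_Pass |
|             # which the regex below splits into |
|             #   ('/tmp/test_that_latest/', '1', 'dummy_Pass'). |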
| test_details_matched = re.search(r'(.*)results-(\d[0-9]*)-(.*)', |
| test['testdir']) |
| if not test_details_matched: |
| continue |
| |
| log_dir, test_number, test_name = test_details_matched.groups() |
| if (test_name in deduped_results and |
| deduped_results[test_name].get('status')): |
|                 # Already have a successful (re)try. |
| continue |
| |
| deduped_results[test_name] = test |
| return list(deduped_results.values()) |
| |
| def _GetResultsForHTMLReport(self): |
| """Return cleaned results for HTML report.!""" |
| import copy |
| tests = copy.deepcopy(self._GetSortedTests()) |
| pass_tag = "Pass" |
| fail_tag = "Fail" |
| na_tag = "NA" |
| count = 0 |
| html_results = {} |
| for test_status in tests: |
| individual_tc_results = {} |
| test_details_matched = re.search(r'(.*)results-(\d[0-9]*)-(.*)', |
| test_status['testdir']) |
| if not test_details_matched: |
| continue |
| log_dir = test_details_matched.group(1) |
| test_number = test_details_matched.group(2) |
| test_name = test_details_matched.group(3) |
| if '/' in test_name: |
| test_name = test_name.split('/')[0] |
| if test_status['error_msg'] is None: |
| test_status['error_msg'] = '' |
| if test_name not in html_results: |
| count = count + 1 |
|                 # Arrange the result fields for this test's report row. |
| individual_tc_results['status'] = test_status['status'] |
| individual_tc_results['error_msg'] = test_status['error_msg'] |
| individual_tc_results['s_no'] = count |
| individual_tc_results['crashes'] = test_status['crashes'] |
| |
|                 # Add <b> and </b> tags so the attempt stands out in the report. |
| individual_tc_results['attempts'] = \ |
| '<b>test_result_number: %s - %s</b> : %s' % ( |
| test_number, log_dir, test_status['error_msg']) |
| html_results[test_name] = individual_tc_results |
|             else: |
|                 # The test was seen already (a retry), so update the existing |
|                 # row instead of creating a second HTML row. If the existing |
|                 # status is False, it needs to be updated. |
| if html_results[test_name]['status'] is False: |
| html_results[test_name]['status'] = test_status['status'] |
| html_results[test_name]['error_msg'] = test_status[ |
| 'error_msg'] |
| html_results[test_name]['crashes'] = \ |
| html_results[test_name]['crashes'] + test_status[ |
| 'crashes'] |
| html_results[test_name]['attempts'] = \ |
| html_results[test_name]['attempts'] + \ |
| '</br><b>test_result_number : %s - %s</b> : %s' % ( |
| test_number, log_dir, test_status['error_msg']) |
| |
|         # Re-key the dictionary by s_no so the rows can be emitted in order |
|         # at the end. |
| sorted_html_results = {} |
| for key in html_results.keys(): |
| sorted_html_results[str(html_results[key]['s_no'])] = \ |
| html_results[key] |
| sorted_html_results[str(html_results[key]['s_no'])]['test'] = key |
| |
|         # Map the test case status: True -> Pass, False -> Fail, and |
|         # True with a non-empty error message -> NA. |
| for key in sorted_html_results.keys(): |
| if sorted_html_results[key]['status']: |
| if sorted_html_results[key]['error_msg'] != '': |
| sorted_html_results[key]['status'] = na_tag |
| else: |
| sorted_html_results[key]['status'] = pass_tag |
| else: |
| sorted_html_results[key]['status'] = fail_tag |
| |
| return sorted_html_results |
| |
| def GenerateReportHTML(self): |
| """Generate clean HTMl report for the results.""" |
| |
| results = self._GetResultsForHTMLReport() |
| html_table_header = """ <th>S.No</th> |
| <th>Test</th> |
| <th>Status</th> |
| <th>Error Message</th> |
| <th>Crashes</th> |
| <th>Attempts</th> |
| """ |
| passed_tests = len([key for key in results.keys() if results[key][ |
| 'status'].lower() == 'pass']) |
| failed_tests = len([key for key in results.keys() if results[key][ |
| 'status'].lower() == 'fail']) |
| na_tests = len([key for key in results.keys() if results[key][ |
| 'status'].lower() == 'na']) |
| total_tests = passed_tests + failed_tests + na_tests |
| |
| # Sort the keys |
| ordered_keys = sorted([int(key) for key in results.keys()]) |
| html_table_body = '' |
| for key in ordered_keys: |
| key = str(key) |
| if results[key]['status'].lower() == 'pass': |
| color = 'LimeGreen' |
| elif results[key]['status'].lower() == 'na': |
| color = 'yellow' |
| else: |
| color = 'red' |
| html_table_body = html_table_body + """<tr> |
| <td>%s</td> |
| <td>%s</td> |
| <td |
| style="background-color:%s;"> |
| %s</td> |
| <td>%s</td> |
| <td>%s</td> |
| <td>%s</td></tr>""" % \ |
| (key, results[key]['test'], |
| color, |
| results[key]['status'], |
| results[key]['error_msg'], |
| results[key]['crashes'], |
| results[key]['attempts']) |
| html_page = """ |
| <!DOCTYPE html> |
| <html lang="en"> |
| <head> |
| <title>Automation Results</title> |
| <meta charset="utf-8"> |
| <meta name="viewport" content="width=device-width,initial-scale=1"> |
| <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.7/css/bootstrap.min.css"> |
| <script src="https://ajax.googleapis.com/ajax/libs/jquery/3.3.1/jquery.min.js"></script> |
| <script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.7/js/bootstrap.min.js"></script> |
| </head> |
| <body> |
| <div class="container"> |
| <h2>Automation Report</h2> |
| <table class="table table-bordered" border="1"> |
| <thead> |
| <tr style="background-color:LightSkyBlue;"> |
| \n%s |
| </tr> |
| </thead> |
| <tbody> |
| \n%s |
| </tbody> |
| </table> |
| <div class="row"> |
| <div class="col-sm-4">Passed: <b>%d</b></div> |
| <div class="col-sm-4">Failed: <b>%d</b></div> |
| <div class="col-sm-4">NA: <b>%d</b></div> |
| </div> |
| <div class="row"> |
| <div class="col-sm-4">Total: <b>%d</b></div> |
| </div> |
| </div> |
| </body> |
| </html> |
| |
| """ % (html_table_header, html_table_body, passed_tests, |
| failed_tests, na_tests, total_tests) |
| with open(os.path.join(self._options.html_report_dir, |
| "test_report.html"), 'w') as html_file: |
| html_file.write(html_page) |
| |
| def _GenerateReportText(self): |
| """Prints a result report to stdout. |
| |
| Prints a result table to stdout. Each row of the table contains the |
| test result directory and the test result (PASS, FAIL). If the perf |
| option is enabled, each test entry is followed by perf keyval entries |
| from the test results. |
| |
| """ |
| tests = self._GetSortedTests() |
| width = self._GetTestColumnWidth() |
| |
| crashes = {} |
| tests_pass = 0 |
| self._PrintDashLine(width) |
| |
| for result in tests: |
| testdir = result['testdir'] |
| test_entry = testdir if self._options.csv else testdir.ljust(width) |
| |
| status_entry = self._GenStatusString(result['status']) |
| if result['status']: |
| color = self._color.GREEN |
| # Change the color of 'PASSED' if the test run wasn't completely |
| # ok, so it's more obvious it isn't a pure pass. |
| if 'WARN' in result['error_msg']: |
| color = self._color.YELLOW |
| elif 'TEST_NA' in result['error_msg']: |
| color = self._color.MAGENTA |
| tests_pass += 1 |
| else: |
| color = self._color.RED |
| |
| test_entries = [test_entry, self._color.Color(color, status_entry)] |
| |
| info = result.get('info', {}) |
| info.update(result.get('attr', {})) |
| if self._options.csv and (self._options.info or self._options.attr): |
| if info: |
| test_entries.extend(['%s=%s' % (k, info[k]) |
| for k in sorted(info.keys())]) |
| if not result['status'] and result['error_msg']: |
| test_entries.append('reason="%s"' % result['error_msg']) |
| |
| self._PrintEntries(test_entries) |
| self._PrintErrors(test_entry, result['error_msg']) |
| |
| # Print out error log for failed tests. |
| if not result['status']: |
| self._PrintErrorLogs(testdir, test_entry) |
| |
| # Emit the perf keyvals entries. There will be no entries if the |
| # --no-perf option is specified. |
| self._PrintResultDictKeyVals(test_entry, result['perf']) |
| |
|             # Record any crashes detected during this test. |
|             if result['crashes']: |
|                 for crash in result['crashes']: |
|                     if crash not in crashes: |
|                         crashes[crash] = set() |
| crashes[crash].add(testdir) |
| |
| # Emit extra test metadata info on separate lines if not --csv. |
| if not self._options.csv: |
| self._PrintResultDictKeyVals(test_entry, info) |
| |
| self._PrintDashLine(width) |
| |
| if not self._options.csv: |
| total_tests = len(tests) |
| percent_pass = 100 * tests_pass / total_tests |
| pass_str = '%d/%d (%d%%)' % (tests_pass, total_tests, percent_pass) |
| print('Total PASS: ' + |
| self._color.Color(self._color.BOLD, pass_str)) |
| |
| if self._options.crash_detection: |
| print('') |
| if crashes: |
| print(self._color.Color(self._color.RED, |
| 'Crashes detected during testing:')) |
| self._PrintDashLine(width) |
| |
| for crash_name, crashed_tests in sorted(six.iteritems(crashes)): |
| print(self._color.Color(self._color.RED, crash_name)) |
| for crashed_test in crashed_tests: |
| print(self._Indent(crashed_test)) |
| |
| self._PrintDashLine(width) |
| print(('Total unique crashes: ' + |
| self._color.Color(self._color.BOLD, str(len(crashes))))) |
| |
| # Sometimes the builders exit before these buffers are flushed. |
| sys.stderr.flush() |
| sys.stdout.flush() |
| |
| def _translate_to_dict(self): |
| """Return the full_status, testname, and err to a json dict.""" |
| res = {'tests': []} |
| for test_info in self._results: |
| if test_info['full_status'] is None: |
| continue |
| res['tests'].append( |
| {'verdict': test_info['full_status'], |
| 'testname': _test_name_from_dir(test_info['testdir']), |
| 'errmsg': test_info['full_err'], |
| 'resultspath': test_info['testdir'], |
| 'starttime': test_info['starttimestamp'], |
| 'endtime': test_info['timestamp'] |
| }) |
| return res |
| |
| def _write_simple_json(self): |
| """Write the translated json results to results.json.""" |
| if not self._options.html_report_dir: |
| return |
| json_results = generate_json_report(self._options.html_report_dir) |
| with open(os.path.join(self._options.html_report_dir, |
| "results.json"), 'w') as wf: |
| json.dump(json_results, wf) |
| |
| def Run(self): |
| """Runs report generation.""" |
| self._CollectAllResults() |
| self._write_simple_json() |
| if not self._options.just_status_code: |
| self._GenerateReportText() |
| if self._options.html: |
| print("\nLogging the data into test_report.html file.") |
| try: |
| self.GenerateReportHTML() |
| except Exception as e: |
| print("Failed to generate HTML report %s" % str(e)) |
| for d in self._GetDedupedResults(): |
| if d['experimental'] and self._options.ignore_experimental_tests: |
| continue |
| if not d['status'] or ( |
| self._options.crash_detection and d['crashes']): |
|                 # In CFT mode a test failure (without an autotest crash) |
|                 # should not make this process exit(1). |
| if not self._options.is_cft: |
| sys.exit(1) |
| |
| |
| def main(): |
| usage = 'Usage: %prog [options] result-directories...' |
| parser = optparse.OptionParser(usage=usage) |
| parser.add_option('--color', dest='color', action='store_true', |
| default=_STDOUT_IS_TTY, |
| help='Use color for text reports [default if TTY stdout]' |
| ) |
| parser.add_option('--no-color', dest='color', action='store_false', |
| help='Don\'t use color for text reports') |
| parser.add_option('--no-crash-detection', dest='crash_detection', |
| action='store_false', default=True, |
| help='Don\'t report crashes or error out when detected') |
| parser.add_option('--csv', dest='csv', action='store_true', |
| help='Output test result in CSV format. ' |
| 'Implies --no-debug --no-crash-detection.') |
| parser.add_option('--html', dest='html', action='store_true', |
|                       help='Generate an HTML report file. ' |
| 'Implies --no-debug --no-crash-detection.') |
| parser.add_option('--html-report-dir', dest='html_report_dir', |
| action='store', default=None, help='Path to generate ' |
| 'html report') |
| parser.add_option('--info', dest='info', action='store_true', |
| default=False, |
| help='Include info keyvals in the report') |
| parser.add_option('--escape-error', dest='escape_error', |
| action='store_true', default=False, |
| help='Escape error message text for tools.') |
| parser.add_option('--perf', dest='perf', action='store_true', |
| default=True, |
| help='Include perf keyvals in the report [default]') |
| parser.add_option('--attr', dest='attr', action='store_true', |
| default=False, |
| help='Include attr keyvals in the report') |
| parser.add_option('--no-perf', dest='perf', action='store_false', |
| help='Don\'t include perf keyvals in the report') |
| parser.add_option('--sort-chron', dest='sort_chron', action='store_true', |
| default=False, |
| help='Sort results by datetime instead of by test name.') |
| parser.add_option('--no-debug', dest='print_debug', action='store_false', |
| default=True, |
| help='Don\'t print out logs when tests fail.') |
| parser.add_option('--allow_chrome_crashes', |
| dest='allow_chrome_crashes', |
| action='store_true', default=False, |
| help='Treat Chrome crashes as non-fatal.') |
| parser.add_option('--ignore_experimental_tests', |
| dest='ignore_experimental_tests', |
| action='store_true', default=False, |
| help='If set, experimental test results will not ' |
| 'influence the exit code.') |
| parser.add_option('--just_status_code', |
| dest='just_status_code', |
| action='store_true', default=False, |
| help='Skip generating a report, just return status code.') |
| parser.add_option('--cft', |
| dest='is_cft', |
| action='store_true', default=False, |
| help='If set: will not return 1 on test failure') |
| |
| |
| (options, args) = parser.parse_args() |
| |
| if not args: |
| parser.print_help() |
| Die('no result directories provided') |
| |
| if options.csv and (options.print_debug or options.crash_detection): |
|         logging.warning('Forcing --no-debug --no-crash-detection') |
| options.print_debug = False |
| options.crash_detection = False |
| |
| report_options = ['color', 'csv', 'info', 'escape_error', 'perf', 'attr', |
| 'sort_chron', 'print_debug', 'html', 'html_report_dir'] |
| if options.just_status_code and any( |
| getattr(options, opt) for opt in report_options): |
|         logging.warning( |
|                 'Passed --just_status_code and incompatible options %s' % |
|                 ' '.join(opt for opt in report_options if getattr(options, opt))) |
| |
| generator = ReportGenerator(options, args) |
| generator.Run() |
| |
| |
| if __name__ == '__main__': |
| main() |