| #!/usr/bin/env python |
| # Copyright (c) 2011 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Implements a simple "negative compile" test for C++ on linux. |
| |
| Sometimes a C++ API needs to ensure that various usages cannot compile. To |
| enable unittesting of these assertions, we use this python script to |
| invoke gcc on a source file and assert that compilation fails. |
| |
| For more info, see: |
| http://dev.chromium.org/developers/testing/no-compile-tests |
| """ |
| |
| import ast |
| import locale |
| import os |
| import re |
| import select |
| import shlex |
| import subprocess |
| import sys |
| import time |
| |
| |
| # Matches lines that start with #if and have the substring TEST in the |
| # conditional. Also extracts the comment. This allows us to search for |
| # lines like the following: |
| # |
| # #ifdef NCTEST_NAME_OF_TEST // [r'expected output'] |
| # #if defined(NCTEST_NAME_OF_TEST) // [r'expected output'] |
| # #if NCTEST_NAME_OF_TEST // [r'expected output'] |
| # #elif NCTEST_NAME_OF_TEST // [r'expected output'] |
| # #elif DISABLED_NCTEST_NAME_OF_TEST // [r'expected output'] |
| # |
| # inside the unittest file. |
| NCTEST_CONFIG_RE = re.compile(r'^#(?:el)?if.*\s+(\S*NCTEST\S*)\s*(//.*)?') |
| |
| |
| # Matches and removes the defined() preprocesor predicate. This is useful |
| # for test cases that use the preprocessor if-statement form: |
| # |
| # #if defined(NCTEST_NAME_OF_TEST) |
| # |
| # Should be used to post-process the results found by NCTEST_CONFIG_RE. |
| STRIP_DEFINED_RE = re.compile(r'defined\((.*)\)') |
| |
| |
| # Used to grab the expectation from comment at the end of an #ifdef. See |
| # NCTEST_CONFIG_RE's comment for examples of what the format should look like. |
| # |
| # The extracted substring should be a python array of regular expressions. |
| EXTRACT_EXPECTATION_RE = re.compile(r'//\s*(\[.*\])') |
| |
| |
| # The header for the result file so that it can be compiled. |
| RESULT_FILE_HEADER = """ |
| // This file is generated by the no compile test from: |
| // %s |
| |
| #include "base/logging.h" |
| #include "testing/gtest/include/gtest/gtest.h" |
| |
| """ |
| |
| |
| # The GUnit test function to output on a successful test completion. |
| SUCCESS_GUNIT_TEMPLATE = """ |
| TEST(%s, %s) { |
| LOG(INFO) << "Took %f secs. Started at %f, ended at %f"; |
| } |
| """ |
| |
| # The GUnit test function to output for a disabled test. |
| DISABLED_GUNIT_TEMPLATE = """ |
| TEST(%s, %s) { } |
| """ |
| |
| |
| # Timeout constants. |
| NCTEST_TERMINATE_TIMEOUT_SEC = 60 |
| NCTEST_KILL_TIMEOUT_SEC = NCTEST_TERMINATE_TIMEOUT_SEC + 2 |
| BUSY_LOOP_MAX_TIME_SEC = NCTEST_KILL_TIMEOUT_SEC * 2 |
| |
| |
| def ValidateInput(parallelism, sourcefile_path, cflags, resultfile_path): |
| """Make sure the arguments being passed in are sane.""" |
| assert parallelism >= 1 |
| assert type(sourcefile_path) is str |
| assert type(cflags) is str |
| assert type(resultfile_path) is str |
| |
| |
| def ParseExpectation(expectation_string): |
| """Extracts expectation definition from the trailing comment on the ifdef. |
| |
| See the comment on NCTEST_CONFIG_RE for examples of the format we are parsing. |
| |
| Args: |
| expectation_string: A string like "// [r'some_regex']" |
| |
| Returns: |
| A list of compiled regular expressions indicating all possible valid |
| compiler outputs. If the list is empty, all outputs are considered valid. |
| """ |
| assert expectation_string is not None |
| |
| match = EXTRACT_EXPECTATION_RE.match(expectation_string) |
| assert match |
| |
| raw_expectation = ast.literal_eval(match.group(1)) |
| assert type(raw_expectation) is list |
| |
| expectation = [] |
| for regex_str in raw_expectation: |
| assert type(regex_str) is str |
| expectation.append(re.compile(regex_str)) |
| return expectation |
| |
| |
| def ExtractTestConfigs(sourcefile_path): |
| """Parses the soruce file for test configurations. |
| |
| Each no-compile test in the file is separated by an ifdef macro. We scan |
| the source file with the NCTEST_CONFIG_RE to find all ifdefs that look like |
| they demark one no-compile test and try to extract the test configuration |
| from that. |
| |
| Args: |
| sourcefile_path: The path to the source file. |
| |
| Returns: |
| A list of test configurations. Each test configuration is a dictionary of |
| the form: |
| |
| { name: 'NCTEST_NAME' |
| suite_name: 'SOURCE_FILE_NAME' |
| expectations: [re.Pattern, re.Pattern] } |
| |
| The |suite_name| is used to generate a pretty gtest output on successful |
| completion of the no compile test. |
| |
| The compiled regexps in |expectations| define the valid outputs of the |
| compiler. If any one of the listed patterns matches either the stderr or |
| stdout from the compilation, and the compilation failed, then the test is |
| considered to have succeeded. If the list is empty, than we ignore the |
| compiler output and just check for failed compilation. If |expectations| |
| is actually None, then this specifies a compiler sanity check test, which |
| should expect a SUCCESSFUL compilation. |
| """ |
| sourcefile = open(sourcefile_path, 'r') |
| |
| # Convert filename from underscores to CamelCase. |
| words = os.path.splitext(os.path.basename(sourcefile_path))[0].split('_') |
| words = [w.capitalize() for w in words] |
| suite_name = 'NoCompile' + ''.join(words) |
| |
| # Start with at least the compiler sanity test. You need to always have one |
| # sanity test to show that compiler flags and configuration are not just |
| # wrong. Otherwise, having a misconfigured compiler, or an error in the |
| # shared portions of the .nc file would cause all tests to erroneously pass. |
| test_configs = [{'name': 'NCTEST_SANITY', |
| 'suite_name': suite_name, |
| 'expectations': None}] |
| |
| for line in sourcefile: |
| match_result = NCTEST_CONFIG_RE.match(line) |
| if not match_result: |
| continue |
| |
| groups = match_result.groups() |
| |
| # Grab the name and remove the defined() predicate if there is one. |
| name = groups[0] |
| strip_result = STRIP_DEFINED_RE.match(name) |
| if strip_result: |
| name = strip_result.group(1) |
| |
| # Read expectations if there are any. |
| test_configs.append({'name': name, |
| 'suite_name': suite_name, |
| 'expectations': ParseExpectation(groups[1])}) |
| sourcefile.close() |
| return test_configs |
| |
| |
| def StartTest(sourcefile_path, cflags, config): |
| """Start one negative compile test. |
| |
| Args: |
| sourcefile_path: The path to the source file. |
| cflags: A string with all the CFLAGS to give to gcc. This string will be |
| split by shelex so be careful with escaping. |
| config: A dictionary describing the test. See ExtractTestConfigs |
| for a description of the config format. |
| |
| Returns: |
| A dictionary containing all the information about the started test. The |
| fields in the dictionary are as follows: |
| { 'proc': A subprocess object representing the compiler run. |
| 'cmdline': The exectued command line. |
| 'name': The name of the test. |
| 'suite_name': The suite name to use when generating the gunit test |
| result. |
| 'terminate_timeout': The timestamp in seconds since the epoch after |
| which the test should be terminated. |
| 'kill_timeout': The timestamp in seconds since the epoch after which |
| the test should be given a hard kill signal. |
| 'started_at': A timestamp in seconds since the epoch for when this test |
| was started. |
| 'aborted_at': A timestamp in seconds since the epoch for when this test |
| was aborted. If the test completed successfully, |
| this value is 0. |
| 'finished_at': A timestamp in seconds since the epoch for when this |
| test was successfully complete. If the test is aborted, |
| or running, this value is 0. |
| 'expectations': A dictionary with the test expectations. See |
| ParseExpectation() for the structure. |
| } |
| """ |
| # TODO(ajwong): Get the compiler from gyp. |
| cmdline = ['g++'] |
| cmdline.extend(shlex.split(cflags)) |
| name = config['name'] |
| expectations = config['expectations'] |
| if expectations is not None: |
| cmdline.append('-D%s' % name) |
| cmdline.extend(['-o', '/dev/null', '-c', '-x', 'c++', sourcefile_path]) |
| |
| process = subprocess.Popen(cmdline, stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE) |
| now = time.time() |
| return {'proc': process, |
| 'cmdline': ' '.join(cmdline), |
| 'name': name, |
| 'suite_name': config['suite_name'], |
| 'terminate_timeout': now + NCTEST_TERMINATE_TIMEOUT_SEC, |
| 'kill_timeout': now + NCTEST_KILL_TIMEOUT_SEC, |
| 'started_at': now, |
| 'aborted_at': 0, |
| 'finished_at': 0, |
| 'expectations': expectations} |
| |
| |
| def PassTest(resultfile, test): |
| """Logs the result of a test started by StartTest(), or a disabled test |
| configuration. |
| |
| Args: |
| resultfile: File object for .cc file that results are written to. |
| test: An instance of the dictionary returned by StartTest(), a |
| configuration from ExtractTestConfigs(). |
| """ |
| # The 'started_at' key is only added if a test has been started. |
| if 'started_at' in test: |
| resultfile.write(SUCCESS_GUNIT_TEMPLATE % ( |
| test['suite_name'], test['name'], |
| test['finished_at'] - test['started_at'], |
| test['started_at'], test['finished_at'])) |
| else: |
| resultfile.write(DISABLED_GUNIT_TEMPLATE % ( |
| test['suite_name'], test['name'])) |
| |
| |
| def FailTest(resultfile, test, error, stdout=None, stderr=None): |
| """Logs the result of a test started by StartTest() |
| |
| Args: |
| resultfile: File object for .cc file that results are written to. |
| test: An instance of the dictionary returned by StartTest() |
| error: The printable reason for the failure. |
| stdout: The test's output to stdout. |
| stderr: The test's output to stderr. |
| """ |
| resultfile.write('#error "%s Failed: %s"\n' % (test['name'], error)) |
| resultfile.write('#error "compile line: %s"\n' % test['cmdline']) |
| if stdout and len(stdout) != 0: |
| resultfile.write('#error "%s stdout:"\n' % test['name']) |
| for line in stdout.split('\n'): |
| resultfile.write('#error " %s:"\n' % line) |
| |
| if stderr and len(stderr) != 0: |
| resultfile.write('#error "%s stderr:"\n' % test['name']) |
| for line in stderr.split('\n'): |
| resultfile.write('#error " %s"\n' % line) |
| resultfile.write('\n') |
| |
| |
| def WriteStats(resultfile, suite_name, timings): |
| """Logs the peformance timings for each stage of the script into a fake test. |
| |
| Args: |
| resultfile: File object for .cc file that results are written to. |
| suite_name: The name of the GUnit suite this test belongs to. |
| timings: Dictionary with timestamps for each stage of the script run. |
| """ |
| stats_template = ("Started %f, Ended %f, Total %fs, Extract %fs, " |
| "Compile %fs, Process %fs") |
| total_secs = timings['results_processed'] - timings['started'] |
| extract_secs = timings['extract_done'] - timings['started'] |
| compile_secs = timings['compile_done'] - timings['extract_done'] |
| process_secs = timings['results_processed'] - timings['compile_done'] |
| resultfile.write('TEST(%s, Stats) { LOG(INFO) << "%s"; }\n' % ( |
| suite_name, stats_template % ( |
| timings['started'], timings['results_processed'], total_secs, |
| extract_secs, compile_secs, process_secs))) |
| |
| |
| def ProcessTestResult(resultfile, test): |
| """Interprets and logs the result of a test started by StartTest() |
| |
| Args: |
| resultfile: File object for .cc file that results are written to. |
| test: The dictionary from StartTest() to process. |
| """ |
| # Snap a copy of stdout and stderr into the test dictionary immediately |
| # cause we can only call this once on the Popen object, and lots of stuff |
| # below will want access to it. |
| proc = test['proc'] |
| (stdout, stderr) = proc.communicate() |
| |
| if test['aborted_at'] != 0: |
| FailTest(resultfile, test, "Compile timed out. Started %f ended %f." % |
| (test['started_at'], test['aborted_at'])) |
| return |
| |
| if test['expectations'] is None: |
| # This signals a compiler sanity check test. Fail iff compilation failed. |
| if proc.poll() == 0: |
| PassTest(resultfile, test) |
| return |
| else: |
| FailTest(resultfile, test, 'Sanity compile failed. Is compiler borked?', |
| stdout, stderr) |
| return |
| elif proc.poll() == 0: |
| # Handle failure due to successful compile. |
| FailTest(resultfile, test, |
| 'Unexpected successful compilation.', |
| stdout, stderr) |
| return |
| else: |
| # Check the output has the right expectations. If there are no |
| # expectations, then we just consider the output "matched" by default. |
| if len(test['expectations']) == 0: |
| PassTest(resultfile, test) |
| return |
| |
| # Otherwise test against all expectations. |
| for regexp in test['expectations']: |
| if (regexp.search(stdout) is not None or |
| regexp.search(stderr) is not None): |
| PassTest(resultfile, test) |
| return |
| expectation_str = ', '.join( |
| ["r'%s'" % regexp.pattern for regexp in test['expectations']]) |
| FailTest(resultfile, test, |
| 'Expectations [%s] did not match output.' % expectation_str, |
| stdout, stderr) |
| return |
| |
| |
| def CompleteAtLeastOneTest(resultfile, executing_tests): |
| """Blocks until at least one task is removed from executing_tests. |
| |
| This function removes completed tests from executing_tests, logging failures |
| and output. If no tests can be removed, it will enter a poll-loop until one |
| test finishes or times out. On a timeout, this function is responsible for |
| terminating the process in the appropriate fashion. |
| |
| Args: |
| executing_tests: A dict mapping a string containing the test name to the |
| test dict return from StartTest(). |
| |
| Returns: |
| A list of tests that have finished. |
| """ |
| finished_tests = [] |
| busy_loop_timeout = time.time() + BUSY_LOOP_MAX_TIME_SEC |
| while len(finished_tests) == 0: |
| # If we don't make progress for too long, assume the code is just dead. |
| assert busy_loop_timeout > time.time() |
| |
| # Select on the output pipes. |
| read_set = [] |
| for test in executing_tests.values(): |
| read_set.extend([test['proc'].stderr, test['proc'].stdout]) |
| result = select.select(read_set, [], read_set, NCTEST_TERMINATE_TIMEOUT_SEC) |
| |
| # Now attempt to process results. |
| now = time.time() |
| for test in executing_tests.values(): |
| proc = test['proc'] |
| if proc.poll() is not None: |
| test['finished_at'] = now |
| finished_tests.append(test) |
| elif test['terminate_timeout'] < now: |
| proc.terminate() |
| test['aborted_at'] = now |
| elif test['kill_timeout'] < now: |
| proc.kill() |
| test['aborted_at'] = now |
| |
| for test in finished_tests: |
| del executing_tests[test['name']] |
| return finished_tests |
| |
| |
| def main(): |
| if len(sys.argv) != 5: |
| print ('Usage: %s <parallelism> <sourcefile> <cflags> <resultfile>' % |
| sys.argv[0]) |
| sys.exit(1) |
| |
| # Force us into the "C" locale so the compiler doesn't localize its output. |
| # In particular, this stops gcc from using smart quotes when in english UTF-8 |
| # locales. This makes the expectation writing much easier. |
| os.environ['LC_ALL'] = 'C' |
| |
| parallelism = int(sys.argv[1]) |
| sourcefile_path = sys.argv[2] |
| cflags = sys.argv[3] |
| resultfile_path = sys.argv[4] |
| |
| timings = {'started': time.time()} |
| |
| ValidateInput(parallelism, sourcefile_path, cflags, resultfile_path) |
| |
| test_configs = ExtractTestConfigs(sourcefile_path) |
| timings['extract_done'] = time.time() |
| |
| resultfile = open(resultfile_path, 'w') |
| resultfile.write(RESULT_FILE_HEADER % sourcefile_path) |
| |
| # Run the no-compile tests, but ensure we do not run more than |parallelism| |
| # tests at once. |
| timings['header_written'] = time.time() |
| executing_tests = {} |
| finished_tests = [] |
| for config in test_configs: |
| # CompleteAtLeastOneTest blocks until at least one test finishes. Thus, this |
| # acts as a semaphore. We cannot use threads + a real semaphore because |
| # subprocess forks, which can cause all sorts of hilarity with threads. |
| if len(executing_tests) >= parallelism: |
| finished_tests.extend(CompleteAtLeastOneTest(resultfile, executing_tests)) |
| |
| if config['name'].startswith('DISABLED_'): |
| PassTest(resultfile, config) |
| else: |
| test = StartTest(sourcefile_path, cflags, config) |
| assert test['name'] not in executing_tests |
| executing_tests[test['name']] = test |
| |
| # If there are no more test to start, we still need to drain the running |
| # ones. |
| while len(executing_tests) > 0: |
| finished_tests.extend(CompleteAtLeastOneTest(resultfile, executing_tests)) |
| timings['compile_done'] = time.time() |
| |
| for test in finished_tests: |
| ProcessTestResult(resultfile, test) |
| timings['results_processed'] = time.time() |
| |
| # We always know at least a sanity test was run. |
| WriteStats(resultfile, finished_tests[0]['suite_name'], timings) |
| |
| resultfile.close() |
| |
| |
| if __name__ == '__main__': |
| main() |