blob: 9f242a929943af79a150ffdf9fd36e9a3d6d3031 [file] [log] [blame]
# Copyright 2018 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""
Presubmit checks for the validity of V8-side test specifications in pyl files.
For simplicity, we check all pyl files on any changes in this folder.
"""
import ast
import os
SUPPORTED_BUILDER_SPEC_KEYS = [
'swarming_dimensions',
'swarming_task_attrs',
'tests',
]
# This is not an exhaustive list. It only reflects what we currently use. If
# there's need to specify a different dimension, just add it here.
SUPPORTED_SWARMING_DIMENSIONS = [
'cores',
'cpu',
'os',
]
# This is not an exhaustive list. It only reflects what we currently use. If
# there's need to specify a different property, just add it here.
SUPPORTED_SWARMING_TASK_ATTRS = [
'expiration',
'hard_timeout',
'priority',
]
SUPPORTED_TEST_KEYS = [
'name',
'shards',
'suffix',
'swarming_dimensions',
'swarming_task_attrs',
'test_args',
'variant',
]
def check_keys(error_msg, src_dict, supported_keys):
errors = []
for key in src_dict.keys():
if key not in supported_keys:
errors += error_msg('Key "%s" must be one of %s' % (key, supported_keys))
return errors
def _check_properties(error_msg, src_dict, prop_name, supported_keys):
properties = src_dict.get(prop_name, {})
if not isinstance(properties, dict):
return error_msg('Value for %s must be a dict' % prop_name)
return check_keys(error_msg, properties, supported_keys)
def _check_int_range(error_msg, src_dict, prop_name, lower_bound=None,
upper_bound=None):
if prop_name not in src_dict:
# All properties are optional.
return []
try:
value = int(src_dict[prop_name])
except ValueError:
return error_msg('If specified, %s must be an int' % prop_name)
if lower_bound is not None and value < lower_bound:
return error_msg('If specified, %s must be >=%d' % (prop_name, lower_bound))
if upper_bound is not None and value > upper_bound:
return error_msg('If specified, %s must be <=%d' % (prop_name, upper_bound))
return []
def _check_swarming_task_attrs(error_msg, src_dict):
errors = []
task_attrs = src_dict.get('swarming_task_attrs', {})
errors += _check_int_range(
error_msg, task_attrs, 'priority', lower_bound=25, upper_bound=100)
errors += _check_int_range(
error_msg, task_attrs, 'expiration', lower_bound=1)
errors += _check_int_range(
error_msg, task_attrs, 'hard_timeout', lower_bound=1)
return errors
def _check_swarming_config(error_msg, src_dict):
errors = []
errors += _check_properties(
error_msg, src_dict, 'swarming_dimensions',
SUPPORTED_SWARMING_DIMENSIONS)
errors += _check_properties(
error_msg, src_dict, 'swarming_task_attrs',
SUPPORTED_SWARMING_TASK_ATTRS)
errors += _check_swarming_task_attrs(error_msg, src_dict)
return errors
def _check_test(error_msg, test):
if not isinstance(test, dict):
return error_msg('Each test must be specified with a dict')
errors = check_keys(error_msg, test, SUPPORTED_TEST_KEYS)
if not test.get('name'):
errors += error_msg('A test requires a name')
errors += _check_swarming_config(error_msg, test)
test_args = test.get('test_args', [])
if not isinstance(test_args, list):
errors += error_msg('If specified, test_args must be a list of arguments')
if not all(isinstance(x, basestring) for x in test_args):
errors += error_msg('If specified, all test_args must be strings')
# Limit shards to 10 to avoid erroneous resource exhaustion.
errors += _check_int_range(
error_msg, test, 'shards', lower_bound=1, upper_bound=10)
variant = test.get('variant', 'default')
if not variant or not isinstance(variant, basestring):
errors += error_msg('If specified, variant must be a non-empty string')
return errors
def _check_test_spec(file_path, raw_pyl):
def error_msg(msg):
return ['Error in %s:\n%s' % (file_path, msg)]
try:
# Eval python literal file.
full_test_spec = ast.literal_eval(raw_pyl)
except SyntaxError as e:
return error_msg('Pyl parsing failed with:\n%s' % e)
if not isinstance(full_test_spec, dict):
return error_msg('Test spec must be a dict')
errors = []
for buildername, builder_spec in full_test_spec.iteritems():
def error_msg(msg):
return ['Error in %s for builder %s:\n%s' % (file_path, buildername, msg)]
if not isinstance(buildername, basestring) or not buildername:
errors += error_msg('Buildername must be a non-empty string')
if not isinstance(builder_spec, dict) or not builder_spec:
errors += error_msg('Value must be a non-empty dict')
continue
errors += check_keys(error_msg, builder_spec, SUPPORTED_BUILDER_SPEC_KEYS)
errors += _check_swarming_config(error_msg, builder_spec)
for test in builder_spec.get('tests', []):
errors += _check_test(error_msg, test)
return errors
def CheckChangeOnCommit(input_api, output_api):
def file_filter(regexp):
return lambda f: input_api.FilterSourceFile(f, white_list=(regexp,))
# Calculate which files are affected.
if input_api.AffectedFiles(False, file_filter(r'.*PRESUBMIT\.py')):
# If PRESUBMIT.py itself was changed, check also the test spec.
affected_files = [
os.path.join(input_api.PresubmitLocalPath(), 'builders.pyl'),
]
else:
# Otherwise, check test spec only when changed.
affected_files = [
f.AbsoluteLocalPath()
for f in input_api.AffectedFiles(False, file_filter(r'.*builders\.pyl'))
]
errors = []
for file_path in affected_files:
with open(file_path) as f:
errors += _check_test_spec(file_path, f.read())
return [output_api.PresubmitError(r) for r in errors]