# blob: 18d83a48373a38adb88896dc51a60f6231932978 [file]
# Copyright 2019 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import functools
import uuid
import pytest
from google.protobuf import empty_pb2
from google.protobuf import json_format
from chromeperf.engine import combinators
from chromeperf.engine import evaluator
from chromeperf.engine import event as event_module
from chromeperf.engine import predicates
from chromeperf.pinpoint import test_runner_payload_pb2
from chromeperf.pinpoint import change_pb2
from chromeperf.pinpoint.evaluators import isolate_finder
from chromeperf.pinpoint.evaluators import test_runner
from chromeperf.pinpoint.models import change as change_module
from chromeperf.pinpoint.models import task as task_module
from . import test_utils # pylint: disable=relative-beyond-top-level
from . import bisection_test_util # pylint: disable=relative-beyond-top-level
# Dimensions handed to `test_runner.TaskOptions` by the `populated_job`
# fixture below.
DIMENSIONS = [
    test_runner.TaskOptions.Dimension(key=key, value=value)
    for key, value in (
        ('pool', 'Chrome-perf-pinpoint'),
        ('key', 'value'),
    )
]
@pytest.fixture
def swarming_tasks_new(mocker):
    """Patch `swarming.Tasks.New` and return the resulting mock."""
    patched_new = mocker.patch('chromeperf.services.swarming.Tasks.New')
    return patched_new
@pytest.fixture
def swarming_task_result(mocker):
    """Patch `swarming.Task.Result` and return the resulting mock."""
    patched_result = mocker.patch('chromeperf.services.swarming.Task.Result')
    return patched_result
@pytest.fixture
def swarming_task_stdout(mocker):
    """Patch `swarming.Task.Stdout` and return the resulting mock."""
    patched_stdout = mocker.patch('chromeperf.services.swarming.Task.Stdout')
    return patched_stdout
@pytest.fixture
def populated_job(datastore_client, request):
    """Return a mock job whose task graph holds a 10-attempt test-runner graph.

    The graph is built by `test_runner.create_graph` for a single 'chromium'
    commit and stored through `task_module.populate_task_graph`.

    A test may choose the commit with `@pytest.mark.git_hash('<hash>')`. The
    marker used to be read and then ignored, leaving the hash hard-coded.
    Without a marker the hash stays 'aaaaaaa', which is what the task ids
    asserted in this module ('run_test_chromium@aaaaaaa_<n>') are based on.
    NOTE(review): a test that sets the marker presumably has to derive its
    expected task ids from the same hash -- confirm against `create_graph`.
    """
    job = test_utils.MockJob(datastore_client.key('Job', str(uuid.uuid4())))
    marker = request.node.get_closest_marker('git_hash')
    git_hash = 'aaaaaaa' if marker is None else marker.args[0]
    task_module.populate_task_graph(
        datastore_client, job,
        test_runner.create_graph(
            test_runner.TaskOptions(
                build_options=isolate_finder.TaskOptions(
                    builder='Some Builder',
                    target='telemetry_perf_tests',
                    bucket='luci.bucket',
                    change=change_module.reconstitute_change(
                        datastore_client,
                        {
                            'commits': [{
                                'repository': 'chromium',
                                'git_hash': git_hash,
                            }]
                        },
                    )),
                swarming_server='some_server',
                dimensions=DIMENSIONS,
                extra_args=[],
                attempts=10,
            )))
    return job
def test_TestRunner_EvaluateToCompletion(
        datastore_client,
        swarming_task_result,
        swarming_tasks_new,
        populated_job,
):
    """Drive the run_test tasks from 'initiate' through 'update' to completed.

    An 'initiate' event should schedule one Swarming task per attempt and
    leave the run_test tasks 'ongoing'. An 'update' event per task, with the
    mocked Swarming result reporting COMPLETED, should then mark each one
    'completed' and record the output isolate.
    """
    # Each call builds a fresh loader for the job's task graph.
    load_graph = functools.partial(task_module.task_graph_loader,
                                   datastore_client, populated_job)
    run_test_ids = [
        f'run_test_chromium@aaaaaaa_{attempt}' for attempt in range(10)
    ]

    swarming_tasks_new.return_value = {'task_id': 'task id'}
    found_isolate_evaluator = combinators.FilteringEvaluator(
        predicate=predicates.TaskTypeEq('find_isolate'),
        delegate=combinators.SequenceEvaluator(evaluators=(
            bisection_test_util.FakeFoundIsolate(
                datastore_client,
                populated_job,
            ),
            combinators.TaskPayloadLiftingEvaluator(),
        )))
    test_evaluator = combinators.SequenceEvaluator(evaluators=(
        found_isolate_evaluator,
        test_runner.Evaluator(populated_job, datastore_client),
    ))

    initiate_event = event_module.build_event(type='initiate',
                                              target_task=None,
                                              payload=empty_pb2.Empty())
    context = evaluator.evaluate_graph(initiate_event, test_evaluator,
                                       load_graph())
    observed_states = {task.state for task in context.values()}
    assert observed_states == {'ongoing', 'completed'}

    # Ensure that we've found all the 'test_runner' tasks.
    select_context = evaluator.evaluate_graph(
        event_module.select_event(),
        combinators.Selector(task_type='run_test'),
        load_graph(),
    )

    # Check that the output in the payload is populated as we expect.
    for task_id, task_context in select_context.items():
        runner_payload = test_runner_payload_pb2.TestRunnerPayload()
        assert task_context.payload.Unpack(runner_payload)
        assert task_id == f'run_test_chromium@aaaaaaa_{runner_payload.index}'
        assert task_context.state == 'ongoing'
        # FIXME: Check more pertinent information here.
        assert json_format.MessageToDict(runner_payload.output) != {}

    # Ensure that we've actually made the calls to the Swarming service.
    swarming_tasks_new.assert_called()
    assert swarming_tasks_new.call_count == 10

    # Then we propagate an event for each of the test_runner tasks in the
    # graph.
    swarming_task_result.return_value = {
        'bot_id': 'bot id',
        'exit_code': 0,
        'failure': False,
        'outputs_ref': {
            'isolatedserver': 'output isolate server',
            'isolated': 'output isolate hash',
        },
        'state': 'COMPLETED',
    }
    for task_id in run_test_ids:
        update_event = event_module.build_event(payload=empty_pb2.Empty(),
                                                type='update',
                                                target_task=task_id)
        context = evaluator.evaluate_graph(update_event, test_evaluator,
                                           load_graph())
        assert task_id in context
        assert context[task_id].state == 'completed'

    # Ensure that we've polled the status of each of the tasks, and that
    # we've marked the tasks completed.
    select_context = evaluator.evaluate_graph(
        event_module.select_event(),
        combinators.Selector(task_type='run_test'),
        load_graph(),
    )
    for task_id in run_test_ids:
        task_context = select_context[task_id]
        runner_payload = test_runner_payload_pb2.TestRunnerPayload()
        assert task_context.payload.Unpack(runner_payload)
        assert task_id == f'run_test_chromium@aaaaaaa_{runner_payload.index}'
        assert task_context.state == 'completed'
        # FIXME: Check more pertinent information here.
        assert json_format.MessageToDict(runner_payload.output) != {}
        task_output = runner_payload.output.task_output
        assert task_output.isolate_server == 'output isolate server'
        assert task_output.isolate_hash == 'output isolate hash'

    # Ensure that we've actually made the calls to the Swarming service.
    swarming_task_result.assert_called()
    assert swarming_task_result.call_count == 10
def test_TestRunner_EvaluateFailedDependency(
        datastore_client,
        swarming_task_result,
        swarming_tasks_new,
        populated_job,
):
    """Check that run_test tasks fail at once when find_isolate has failed.

    The two Swarming fixtures are not referenced in the body; requesting them
    keeps the corresponding service calls patched for the duration of the
    test.
    """
    failed_isolate_evaluator = combinators.FilteringEvaluator(
        predicate=predicates.TaskTypeEq('find_isolate'),
        delegate=combinators.SequenceEvaluator(evaluators=(
            bisection_test_util.FakeFindIsolateFailed(
                datastore_client=datastore_client,
                job=populated_job,
            ),
            combinators.TaskPayloadLiftingEvaluator(),
        )))
    test_evaluator = combinators.SequenceEvaluator(evaluators=(
        failed_isolate_evaluator,
        test_runner.Evaluator(populated_job, datastore_client),
    ))

    # When we initiate the test_runner tasks, we should immediately see the
    # tasks failing because the dependency has a hard failure status.
    initiate_event = event_module.build_event(payload=empty_pb2.Empty(),
                                              type='initiate',
                                              target_task=None)
    graph_loader = task_module.task_graph_loader(datastore_client,
                                                 populated_job)
    context = evaluator.evaluate_graph(initiate_event, test_evaluator,
                                       graph_loader)

    expected_ids = (
        f'run_test_chromium@aaaaaaa_{attempt}' for attempt in range(10))
    for task_id in expected_ids:
        assert task_id in context
        failed_task = context[task_id]
        assert failed_task.state == 'failed'
        runner_payload = test_runner_payload_pb2.TestRunnerPayload()
        assert failed_task.payload.Unpack(runner_payload)
        assert len(runner_payload.errors) == 1
def test_TestRunner_EvaluatePendingDependency(datastore_client, populated_job):
    """Check that every task stays 'pending' when only an 'update' is sent."""
    # Ensure that tasks stay pending in the event of an update.
    update_event = event_module.build_event(
        payload=empty_pb2.Empty(),
        type='update',
        target_task=None,
    )
    runner_evaluator = test_runner.Evaluator(
        datastore_client=datastore_client,
        job=populated_job,
    )
    graph_loader = task_module.task_graph_loader(
        datastore_client=datastore_client,
        job=populated_job,
    )
    context = evaluator.evaluate_graph(update_event, runner_evaluator,
                                       graph_loader)

    # The isolate task first, then each of the ten run_test attempts.
    pending_ids = ['find_isolate_chromium@aaaaaaa']
    pending_ids.extend(
        f'run_test_chromium@aaaaaaa_{attempt}' for attempt in range(10))
    for task_id in pending_ids:
        assert task_id in context
        assert context[task_id].state == 'pending'
def test_TestRunner_EvaluateHandleFailures_Hard(
        datastore_client,
        swarming_task_stdout,
        swarming_task_result,
        swarming_tasks_new,
        populated_job,
):
    """Check that a Swarming task reporting a failure fails the run_test task.

    After initiating the graph, the mocked Swarming result reports
    `failure: True` with a non-zero exit code and the mocked stdout carries a
    Python traceback. Each run_test task is then updated and selected, and is
    expected to be 'failed' with exactly one recorded error.
    """
    swarming_tasks_new.return_value = {'task_id': 'task id'}
    test_evaluator = combinators.SequenceEvaluator(evaluators=(
        combinators.FilteringEvaluator(
            predicate=predicates.TaskTypeEq('find_isolate'),
            delegate=combinators.SequenceEvaluator(evaluators=(
                bisection_test_util.FakeFoundIsolate(
                    datastore_client=datastore_client,
                    job=populated_job,
                ),
                combinators.TaskPayloadLiftingEvaluator(),
            ))),
        test_runner.Evaluator(
            datastore_client=datastore_client,
            job=populated_job,
        ),
    ))
    context = evaluator.evaluate_graph(
        event_module.build_event(payload=empty_pb2.Empty(),
                                 type='initiate',
                                 target_task=None), test_evaluator,
        task_module.task_graph_loader(datastore_client=datastore_client,
                                      job=populated_job))
    assert context != {}

    # We set it up so that when we poll the swarming task, that we're going to
    # get an error status. We're expecting that hard failures are detected.
    swarming_task_stdout.return_value = {
        'output':
        """Traceback (most recent call last):
File "../../testing/scripts/run_performance_tests.py", line 282, in <module>
sys.exit(main())
File "../../testing/scripts/run_performance_tests.py", line 226, in main
benchmarks = args.benchmark_names.split(',')
AttributeError: 'Namespace' object has no attribute 'benchmark_names'"""
    }
    swarming_task_result.return_value = {
        'bot_id': 'bot id',
        'exit_code': 1,
        'failure': True,
        'outputs_ref': {
            'isolatedserver': 'output isolate server',
            'isolated': 'output isolate hash',
        },
        'state': 'COMPLETED',
    }
    for attempt in range(10):
        task_id = f'run_test_chromium@aaaaaaa_{attempt}'
        context = evaluator.evaluate_graph(
            event_module.build_event(payload=empty_pb2.Empty(),
                                     type='update',
                                     target_task=task_id),
            test_evaluator,
            task_module.task_graph_loader(
                datastore_client=datastore_client,
                job=populated_job,
            ),
        )
        assert context != {}
        assert task_id in context
        select_context = evaluator.evaluate_graph(
            event_module.build_event(payload=empty_pb2.Empty(),
                                     type='select',
                                     target_task=task_id),
            combinators.Selector(task_type='run_test'),
            task_module.task_graph_loader(
                datastore_client=datastore_client,
                job=populated_job,
            ))
        assert select_context != {}
        task_context = select_context[task_id]
        task_payload = test_runner_payload_pb2.TestRunnerPayload()
        # Assert the unpack result, as the other tests in this module do, so
        # a payload of the wrong type is reported here and not as a
        # misleading error-count mismatch below.
        assert task_context.payload.Unpack(task_payload)
        assert task_context.state == 'failed'
        assert len(task_payload.errors) == 1
def test_TestRunner_EvaluateHandleFailures_Expired(
        datastore_client,
        swarming_task_stdout,
        swarming_task_result,
        swarming_tasks_new,
        populated_job,
):
    """Check that an EXPIRED Swarming task fails the run_test task.

    After initiating the graph, the mocked Swarming result reports only
    `state: EXPIRED`. Each run_test task is then updated and selected, and is
    expected to be 'failed' with exactly one recorded error.
    `swarming_task_stdout` is not referenced in the body; requesting it keeps
    the stdout call patched.
    """
    swarming_tasks_new.return_value = {'task_id': 'task id'}
    test_evaluator = combinators.SequenceEvaluator(evaluators=(
        combinators.FilteringEvaluator(
            predicate=predicates.TaskTypeEq('find_isolate'),
            delegate=combinators.SequenceEvaluator(evaluators=(
                bisection_test_util.FakeFoundIsolate(
                    datastore_client=datastore_client, job=populated_job),
                combinators.TaskPayloadLiftingEvaluator(),
            ))),
        test_runner.Evaluator(datastore_client=datastore_client,
                              job=populated_job),
    ))
    context = evaluator.evaluate_graph(
        event_module.build_event(payload=empty_pb2.Empty(),
                                 type='initiate',
                                 target_task=None), test_evaluator,
        task_module.task_graph_loader(datastore_client=datastore_client,
                                      job=populated_job))
    assert context != {}
    swarming_task_result.return_value = {
        'state': 'EXPIRED',
    }
    for attempt in range(10):
        task_id = f'run_test_chromium@aaaaaaa_{attempt}'
        context = evaluator.evaluate_graph(
            event_module.build_event(
                payload=empty_pb2.Empty(),
                type='update',
                target_task=task_id,
            ),
            test_evaluator,
            task_module.task_graph_loader(datastore_client=datastore_client,
                                          job=populated_job),
        )
        assert context != {}
        select_context = evaluator.evaluate_graph(
            event_module.select_event(target_task=task_id),
            combinators.Selector(task_type='run_test'),
            task_module.task_graph_loader(datastore_client=datastore_client,
                                          job=populated_job),
        )
        assert select_context != {}
        assert task_id in select_context
        # Read the task from the select result that was just checked for
        # membership, not from the update context, so the assertions below
        # cover what the selector returns (same as the hard-failure test).
        task_context = select_context[task_id]
        task_payload = test_runner_payload_pb2.TestRunnerPayload()
        assert task_context.payload.Unpack(task_payload)
        assert task_context.state == 'failed'
        assert len(task_payload.errors) == 1
@pytest.mark.skip('Deferring implementation pending design.')
def test_TestRunner_EvaluateHandleFailures_Retry():
    """Placeholder for the retry-handling test; skipped and has no body yet."""