blob: 2ce78a7da401c35fa4b85e10cf072f121bdfd356 [file] [log] [blame]
# Copyright 2018 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
from dto.flake_swarming_task_output import FlakeSwarmingTaskOutput
from dto.flakiness import Flakiness
from gae_libs.pipelines import GeneratorPipeline
from gae_libs.pipelines import pipeline
from gae_libs.pipelines import SynchronousPipeline
from libs.list_of_basestring import ListOfBasestring
from libs.structured_object import StructuredObject
from pipelines.flake_failure.get_isolate_sha_pipeline import (
GetIsolateShaOutput)
from pipelines.flake_failure.run_flake_swarming_task_pipeline import (
RunFlakeSwarmingTaskInput)
from pipelines.flake_failure.run_flake_swarming_task_pipeline import (
RunFlakeSwarmingTaskPipeline)
from services.flake_failure import flakiness_util
from services.flake_failure import pass_rate_util
from services.flake_failure import run_swarming_util
class AggregateFlakinessInput(StructuredObject):
# The Flakiness object to update.
flakiness_thus_far = Flakiness
# The flake swarming task output with which to update flakiness with.
incoming_swarming_task_output = FlakeSwarmingTaskOutput
class AggregateFlakinessPipeline(SynchronousPipeline):
"""Aggregates new swarming task results into Flakiness."""
input_type = AggregateFlakinessInput
output_type = Flakiness
def RunImpl(self, parameters):
"""Aggregates new swarming task results into Flakines."""
return flakiness_util.UpdateFlakiness(
parameters.flakiness_thus_far, parameters.incoming_swarming_task_output)
class DetermineApproximatePassRateInput(StructuredObject):
# The name of the master used to trigger swarming tasks.
master_name = basestring
# The name of the builder used to trigger swarming tasks.
builder_name = basestring
# A reference build number to trigger swarming tasks.
reference_build_number = int
# The name of the step containing the test to analyze.
step_name = basestring
# The name of the test to analyze.
test_name = basestring
# The commit position being analyzed.
commit_position = int
# The current pass rate information about the test being examined thus far.
flakiness_thus_far = Flakiness
# The isolate sha and other fields corresponding to the commit_position.
get_isolate_sha_output = GetIsolateShaOutput
# The output of the last swarming task that was run to update the current
# flakiness thus far.
previous_swarming_task_output = FlakeSwarmingTaskOutput
# The revision corresponding to the commit position.
revision = basestring
class DetermineApproximatePassRatePipeline(GeneratorPipeline):
"""Determines the true pass rate of a test at a commit position."""
input_type = DetermineApproximatePassRateInput
def RunImpl(self, parameters):
"""Pipeline to find the true pass rate of a test at a commit position."""
master_name = parameters.master_name
builder_name = parameters.builder_name
reference_build_number = parameters.reference_build_number
step_name = parameters.step_name
test_name = parameters.test_name
commit_position = parameters.commit_position
get_isolate_sha_output = parameters.get_isolate_sha_output
build_url = get_isolate_sha_output.build_url
try_job_url = get_isolate_sha_output.try_job_url
flakiness_thus_far = parameters.flakiness_thus_far
previous_swarming_task_output = parameters.previous_swarming_task_output
# Extract pass rate and iterations already-completed up to this point.
if previous_swarming_task_output:
assert flakiness_thus_far, (
'Previous swarming task output not captured properly')
error = previous_swarming_task_output.error
pass_rate_at_commit_position = flakiness_thus_far.pass_rate
previous_pass_count = previous_swarming_task_output.pass_count
previous_iterations = previous_swarming_task_output.iterations
previous_pass_rate = (
float(previous_pass_count / previous_iterations)
if previous_iterations else None)
else:
error = None
pass_rate_at_commit_position = None
previous_iterations = 0
previous_pass_count = 0
previous_pass_rate = None
# Create a fresh Flakiness instance to aggregate swarming rerun data.
flakiness_thus_far = Flakiness(
build_number=get_isolate_sha_output.build_number,
build_url=build_url,
commit_position=commit_position,
total_test_run_seconds=0,
error=None,
failed_swarming_task_attempts=0,
iterations=0,
pass_rate=None,
revision=parameters.revision,
task_ids=ListOfBasestring.FromSerializable([]),
try_job_url=try_job_url)
# Bail out if there were too many errors.
if (error and
flakiness_util.MaximumSwarmingTaskRetriesReached(flakiness_thus_far)):
logging.error(
'Swarming task ended in error after %d failed attempts. Giving '
'up' % flakiness_thus_far.failed_swarming_task_attempts)
flakiness_thus_far.error = error
yield AggregateFlakinessPipeline(
self.CreateInputObjectInstance(
AggregateFlakinessInput,
flakiness_thus_far=flakiness_thus_far,
incoming_swarming_task_output=None))
return
# Move on if the maximum number of iterations has been reached or exceeded.
if flakiness_util.MaximumIterationsReached(flakiness_thus_far):
logging.info(
'Max iterations reached for commit_position %d' % commit_position)
yield AggregateFlakinessPipeline(
self.CreateInputObjectInstance(
AggregateFlakinessInput,
flakiness_thus_far=flakiness_thus_far,
incoming_swarming_task_output=None))
return
# Move on if the test doesn't exist.
if pass_rate_util.TestDoesNotExist(pass_rate_at_commit_position):
logging.info('No test found at commit position %d' % commit_position)
yield AggregateFlakinessPipeline(
self.CreateInputObjectInstance(
AggregateFlakinessInput,
flakiness_thus_far=flakiness_thus_far,
incoming_swarming_task_output=None))
return
# Move on if there is sufficient information about the pass rate.
if pass_rate_util.HasSufficientInformation(
pass_rate_at_commit_position, flakiness_thus_far.iterations,
previous_pass_rate, previous_iterations):
logging.info(
'There is sufficient information for commit position %d with pass '
'rate %s after %d iterations' % (commit_position,
pass_rate_at_commit_position,
flakiness_thus_far.iterations))
yield AggregateFlakinessPipeline(
self.CreateInputObjectInstance(
AggregateFlakinessInput,
flakiness_thus_far=flakiness_thus_far,
incoming_swarming_task_output=None))
return
# Another swarming task is needed. Determine parameters for it to run.
iterations_for_task, time_for_task_seconds = (
run_swarming_util.CalculateRunParametersForSwarmingTask(
flakiness_thus_far, error))
# Run swarming task, update data points with results, and recurse.
with pipeline.InOrder():
swarming_task_output = yield RunFlakeSwarmingTaskPipeline(
self.CreateInputObjectInstance(
RunFlakeSwarmingTaskInput,
master_name=master_name,
builder_name=builder_name,
reference_build_number=reference_build_number,
step_name=step_name,
test_name=test_name,
commit_position=commit_position,
isolate_sha=get_isolate_sha_output.isolate_sha,
iterations=iterations_for_task,
timeout_seconds=time_for_task_seconds))
aggregated_flakiness = yield AggregateFlakinessPipeline(
self.CreateInputObjectInstance(
AggregateFlakinessInput,
flakiness_thus_far=flakiness_thus_far,
incoming_swarming_task_output=swarming_task_output))
yield DetermineApproximatePassRatePipelineWrapper(
self.CreateInputObjectInstance(
DetermineApproximatePassRateInput,
builder_name=parameters.builder_name,
commit_position=commit_position,
flakiness_thus_far=aggregated_flakiness,
get_isolate_sha_output=get_isolate_sha_output,
master_name=parameters.master_name,
previous_swarming_task_output=swarming_task_output,
reference_build_number=parameters.reference_build_number,
revision=parameters.revision,
step_name=parameters.step_name,
test_name=parameters.test_name))
class DetermineApproximatePassRatePipelineWrapper(GeneratorPipeline):
"""A wrapper for DetermineApproximatePassRatePipeline for testability only.
Because DetermineApproximatePassRatePipeline is recursive, in unit tests it
is not possible to mock only the recursive call to validate its input
independently of the original call.
"""
input_type = DetermineApproximatePassRateInput
def RunImpl(self, parameters):
yield DetermineApproximatePassRatePipeline(parameters)