blob: e3923a3331319028bbc848034d6d19de97fe8d95 [file] [log] [blame]
# Copyright 2018 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from google.appengine.ext import ndb
from common import monitoring
from dto.int_range import IntRange
from dto.step_metadata import StepMetadata
from gae_libs.pipelines import GeneratorPipeline
from gae_libs.pipelines import pipeline
from libs import analysis_status
from libs import time_util
from libs.list_of_basestring import ListOfBasestring
from libs.structured_object import StructuredObject
from model import entity_util
from model import result_status
from model.base_build_model import BaseBuildModel
from pipelines.delay_pipeline import DelayPipeline
from pipelines.flake_failure.analyze_recent_flakiness_pipeline import (
AnalyzeRecentFlakinessInput)
from pipelines.flake_failure.analyze_recent_flakiness_pipeline import (
AnalyzeRecentFlakinessPipeline)
from pipelines.flake_failure.create_and_submit_revert_pipeline import (
CreateAndSubmitRevertInput)
from pipelines.flake_failure.create_and_submit_revert_pipeline import (
CreateAndSubmitRevertPipeline)
from pipelines.flake_failure.determine_approximate_pass_rate_pipeline import (
DetermineApproximatePassRateInput)
from pipelines.flake_failure.determine_approximate_pass_rate_pipeline import (
DetermineApproximatePassRatePipeline)
from pipelines.flake_failure.get_isolate_sha_pipeline import (
GetIsolateShaForCommitPositionParameters)
from pipelines.flake_failure.get_isolate_sha_pipeline import (
GetIsolateShaForCommitPositionPipeline)
from pipelines.flake_failure.next_commit_position_pipeline import (
NextCommitPositionInput)
from pipelines.flake_failure.next_commit_position_pipeline import (
NextCommitPositionOutput)
from pipelines.flake_failure.next_commit_position_pipeline import (
NextCommitPositionPipeline)
from pipelines.flake_failure.notify_culprit_pipeline import NotifyCulpritInput
from pipelines.flake_failure.notify_culprit_pipeline import (
NotifyCulpritPipeline)
from pipelines.flake_failure.update_flake_analysis_data_points_pipeline import (
UpdateFlakeAnalysisDataPointsInput)
from pipelines.flake_failure.update_flake_analysis_data_points_pipeline import (
UpdateFlakeAnalysisDataPointsPipeline)
from pipelines.report_event_pipeline import ReportAnalysisEventPipeline
from pipelines.report_event_pipeline import ReportEventInput
from services.flake_failure import confidence_score_util
from services.actions import flake_analysis_actions
from services.flake_failure import flake_analysis_util
class _PerformAutoActionsInput(StructuredObject):
  """Input object for _PerformAutoActionsPipeline."""
  # The urlsafe key to the MasterFlakeAnalysis whose culprit to take
  # auto actions (revert, culprit notification) on.
  analysis_urlsafe_key = basestring
class _PerformAutoActionsPipeline(GeneratorPipeline):
  """Temporary pipeline to execute auto actions.
  TODO(crbug.com/893787): All auto-action pipelines should be refactored into
  the auto action layer, which already accounts for bug merging by culprit.
  Eventually auto revert, notifying culprit, etc. should all live inside there
  as well, and their outer pipelines deprecated, including this one.
  """
  input_type = _PerformAutoActionsInput

  def RunImpl(self, parameters):
    """Runs bug updates and, when still flaky, revert/notify sub-pipelines."""
    urlsafe_key = parameters.analysis_urlsafe_key
    analysis = entity_util.GetEntityFromUrlsafeKey(urlsafe_key)
    assert analysis, (
        'Cannot retrieve analysis entry from datastore {}'.format(urlsafe_key))

    # Determine if flakiness is still persistent.
    flakiness_persists = flake_analysis_util.FlakyAtMostRecentlyAnalyzedCommit(
        analysis)

    # Bug updates/merges happen regardless of whether the flake is still
    # reproducible at tip-of-tree.
    # TODO(crbug.com/905754): Call auto actions as an async taskqueue task.
    flake_analysis_actions.OnCulpritIdentified(urlsafe_key)

    if not flakiness_persists:  # pragma: no cover.
      analysis.LogInfo(
          'No further actions taken due to latest commit being stable.')
      return

    # Data needed for reverts.
    build_key = BaseBuildModel.CreateBuildKey(analysis.original_master_name,
                                              analysis.original_builder_name,
                                              analysis.original_build_number)

    with pipeline.InOrder():
      # Revert culprit if applicable.
      yield CreateAndSubmitRevertPipeline(
          self.CreateInputObjectInstance(
              CreateAndSubmitRevertInput,
              analysis_urlsafe_key=analysis.key.urlsafe(),
              build_key=build_key))

      # Update culprit code review.
      yield NotifyCulpritPipeline(
          self.CreateInputObjectInstance(
              NotifyCulpritInput, analysis_urlsafe_key=urlsafe_key))
class AnalyzeFlakeInput(StructuredObject):
  """Input object for AnalyzeFlakePipeline and RecursiveAnalyzeFlakePipeline."""
  # The urlsafe key to the MasterFlakeAnalysis in progress.
  analysis_urlsafe_key = basestring
  # Information on the exact commit position to analyze.
  analyze_commit_position_parameters = NextCommitPositionOutput
  # The lower/upper bound commit positions not to exceed.
  commit_position_range = IntRange
  # Dimensions of the bot that will be used to trigger try jobs.
  dimensions = ListOfBasestring
  # A flag indicating this pipeline was triggered by a human request.
  manually_triggered = bool
  # Whether this is an admin-triggered rerun of an existing analysis. Not to be
  # confused with manually_triggered, which can be a manual request via Findit's
  # homepage.
  rerun = bool
  # The number of times bots have been checked for availability.
  retries = int
  # Information about the test used to find bots that will run swarming tasks.
  step_metadata = StepMetadata
class AnalyzeFlakePipeline(GeneratorPipeline):
  """The main driving pipeline for flake analysis.

  Analyzes one commit position per invocation, then recurses (via
  RecursiveAnalyzeFlakePipeline) on the next commit position until none
  remains, at which point a culprit is reported if one was identified.
  """
  input_type = AnalyzeFlakeInput

  def OnFinalized(self, parameters):
    """Marks the analysis completed/errored when the root pipeline finishes.

    Args:
      parameters (AnalyzeFlakeInput): The input this pipeline was started with.
    """
    if not self.IsRootPipeline():
      # AnalyzeFlakePipeline is recursive. Only the root pipeline should update.
      return

    analysis_urlsafe_key = parameters.analysis_urlsafe_key
    analysis = ndb.Key(urlsafe=analysis_urlsafe_key).get()
    assert analysis, 'Cannot retrieve analysis entry from datastore'

    # Get the analysis' already-detected error, if any.
    error = analysis.error

    if self.was_aborted:
      error = analysis.GetError()  # Capture any undetected error.
      monitoring.aborted_pipelines.increment({'type': 'flake'})

    status = analysis_status.ERROR if error else analysis_status.COMPLETED
    analysis.Update(error=error, end_time=time_util.GetUTCNow(), status=status)

    # TODO(crbug.com/847644): If error is set, report to ts_mon.

    # Monitor completion of pipeline.
    monitoring.completed_pipelines.increment({'type': 'flake'})

  def RunImpl(self, parameters):
    """Analyzes the next commit position, or wraps up when there is none.

    Args:
      parameters (AnalyzeFlakeInput): Carries the analysis key, the commit
          position to analyze (or culprit when done), bounds, bot dimensions,
          and rerun/retry bookkeeping.
    """
    analysis_urlsafe_key = parameters.analysis_urlsafe_key
    analysis = ndb.Key(urlsafe=analysis_urlsafe_key).get()
    assert analysis, 'Cannot retrieve analysis entry from datastore'

    if analysis.request_time:
      # Track time-from-request-to-start for monitoring.
      monitoring.pipeline_times.increment_by(
          int((time_util.GetUTCNow() - analysis.request_time).total_seconds()),
          {'type': 'flake'})

    commit_position_parameters = parameters.analyze_commit_position_parameters
    commit_position_to_analyze = (
        commit_position_parameters.next_commit_id.commit_position
        if commit_position_parameters.next_commit_id else None)

    if commit_position_to_analyze is None:
      # No further commit position to analyze. The analysis is completed.
      culprit_commit_position = (
          commit_position_parameters.culprit_commit_id.commit_position
          if commit_position_parameters.culprit_commit_id else None)

      if culprit_commit_position is None:
        # Finished with no culprit identified.
        analysis.LogInfo('Analysis completed with no findings')
        analysis.Update(result_status=result_status.NOT_FOUND_UNTRIAGED)
        if not parameters.rerun:  # pragma: no branch
          # Don't double report for reruns.
          yield ReportAnalysisEventPipeline(
              self.CreateInputObjectInstance(
                  ReportEventInput, analysis_urlsafe_key=analysis_urlsafe_key))
        return

      # Create a FlakeCulprit.
      culprit_revision = commit_position_parameters.culprit_commit_id.revision
      assert culprit_revision, 'No revision for commit {}'.format(
          culprit_commit_position)
      culprit = flake_analysis_util.UpdateCulprit(
          analysis_urlsafe_key, culprit_revision, culprit_commit_position)
      confidence_score = confidence_score_util.CalculateCulpritConfidenceScore(
          analysis, culprit_commit_position)

      # Associate FlakeCulprit with the analysis.
      analysis.Update(
          confidence_in_culprit=confidence_score,
          culprit_urlsafe_key=culprit.key.urlsafe(),
          result_status=result_status.FOUND_UNTRIAGED)

      with pipeline.InOrder():
        if flake_analysis_util.ShouldTakeAutoAction(
            analysis, parameters.rerun):  # pragma: no branch
          # Check recent flakiness.
          yield AnalyzeRecentFlakinessPipeline(
              self.CreateInputObjectInstance(
                  AnalyzeRecentFlakinessInput,
                  analysis_urlsafe_key=analysis_urlsafe_key))

          # Perform auto actions after checking recent flakiness.
          yield _PerformAutoActionsPipeline(
              self.CreateInputObjectInstance(
                  _PerformAutoActionsInput,
                  analysis_urlsafe_key=analysis_urlsafe_key))

        if not parameters.rerun:  # pragma: no branch
          # Report events to BQ.
          yield ReportAnalysisEventPipeline(
              self.CreateInputObjectInstance(
                  ReportEventInput, analysis_urlsafe_key=analysis_urlsafe_key))
      return

    revision_to_analyze = commit_position_parameters.next_commit_id.revision
    assert revision_to_analyze, 'No revision for commit {}'.format(
        commit_position_to_analyze)

    # Check for bot availability. If this is a user rerun or the maximum retries
    # have been reached, continue regardless of bot availability.
    if flake_analysis_util.CanStartAnalysisImmediately(
        parameters.step_metadata, parameters.retries,
        parameters.manually_triggered):

      # Set analysis status to RUNNING if not already.
      analysis.InitializeRunning()

      analysis.LogInfo(
          'Analyzing commit position {}'.format(commit_position_to_analyze))

      with pipeline.InOrder():
        # Determine isolate sha to run swarming tasks on.
        upper_bound_build_number = analysis.GetLowestUpperBoundBuildNumber(
            commit_position_to_analyze)
        get_sha_output = yield GetIsolateShaForCommitPositionPipeline(
            self.CreateInputObjectInstance(
                GetIsolateShaForCommitPositionParameters,
                analysis_urlsafe_key=analysis_urlsafe_key,
                commit_position=commit_position_to_analyze,
                dimensions=parameters.dimensions,
                step_metadata=parameters.step_metadata,
                revision=revision_to_analyze,
                upper_bound_build_number=upper_bound_build_number))

        # Determine approximate pass rate at the commit position/isolate sha.
        flakiness = yield DetermineApproximatePassRatePipeline(
            self.CreateInputObjectInstance(
                DetermineApproximatePassRateInput,
                builder_name=analysis.builder_name,
                commit_position=commit_position_to_analyze,
                flakiness_thus_far=None,
                get_isolate_sha_output=get_sha_output,
                master_name=analysis.master_name,
                previous_swarming_task_output=None,
                reference_build_number=analysis.build_number,
                revision=revision_to_analyze,
                step_name=analysis.step_name,
                test_name=analysis.test_name))

        # Record the measured flakiness as a new data point on the analysis.
        yield UpdateFlakeAnalysisDataPointsPipeline(
            self.CreateInputObjectInstance(
                UpdateFlakeAnalysisDataPointsInput,
                analysis_urlsafe_key=analysis_urlsafe_key,
                flakiness=flakiness))

        # Determine the next commit position to analyze.
        next_commit_position_output = yield NextCommitPositionPipeline(
            self.CreateInputObjectInstance(
                NextCommitPositionInput,
                analysis_urlsafe_key=analysis_urlsafe_key,
                commit_position_range=parameters.commit_position_range,
                step_metadata=parameters.step_metadata))

        # Recurse on the new commit position. retries is reset to 0 since a
        # bot was successfully acquired for this round.
        yield RecursiveAnalyzeFlakePipeline(
            self.CreateInputObjectInstance(
                AnalyzeFlakeInput,
                analysis_urlsafe_key=analysis_urlsafe_key,
                analyze_commit_position_parameters=next_commit_position_output,
                commit_position_range=parameters.commit_position_range,
                dimensions=parameters.dimensions,
                manually_triggered=parameters.manually_triggered,
                rerun=parameters.rerun,
                retries=0,
                step_metadata=parameters.step_metadata))
    else:
      # Can't start the analysis just yet, reschedule.
      parameters.retries += 1
      delay_seconds = flake_analysis_util.CalculateDelaySecondsBetweenRetries(
          analysis, parameters.retries, parameters.manually_triggered)
      delay = yield DelayPipeline(delay_seconds)
      with pipeline.After(delay):
        # Retry with the incremented retries count after the delay elapses.
        yield RecursiveAnalyzeFlakePipeline(parameters)
class RecursiveAnalyzeFlakePipeline(GeneratorPipeline):
  """Pass-through wrapper around AnalyzeFlakePipeline, for testability only.

  AnalyzeFlakePipeline spawns itself through this wrapper: since it is
  recursive, unit tests could not otherwise mock only the recursive call to
  validate its input independently of the original, outermost call.
  """
  input_type = AnalyzeFlakeInput

  def RunImpl(self, parameters):
    """Delegates to AnalyzeFlakePipeline with the input unchanged."""
    yield AnalyzeFlakePipeline(parameters)