blob: 91b0fb6695a56f1f50f9a95f6bdfe73c8325833b [file] [log] [blame]
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""This module is to provide Findit service APIs through Cloud Endpoints:
Current APIs include:
1. Analysis of compile/test failures in Chromium waterfalls.
Analyzes failures and detects suspected CLs.
2. Analysis of flakes on Commit Queue.
"""
from collections import defaultdict
import json
import logging
import pickle
import endpoints
from google.appengine.api import taskqueue
from protorpc import messages
from protorpc import remote
import gae_ts_mon
from common import appengine_util
from common import constants
from common.waterfall import failure_type
from gae_libs.http import auth_util
from libs import time_util
from model import analysis_approach_type
from model import analysis_status
from model.flake.flake_analysis_request import FlakeAnalysisRequest
from model.suspected_cl_confidence import SuspectedCLConfidence
from model.wf_analysis import WfAnalysis
from model.wf_suspected_cl import WfSuspectedCL
from model.wf_swarming_task import WfSwarmingTask
from model.wf_try_job import WfTryJob
from waterfall import buildbot
from waterfall import build_util
from waterfall import suspected_cl_util
from waterfall import waterfall_config
from waterfall.flake import flake_analysis_service
# This is used by the underlying ProtoRpc when creating names for the ProtoRPC
# messages below. This package name will show up as a prefix to the message
# class names in the discovery doc and client libraries.
package = 'FindIt'
# These subclasses of Message are basically definitions of Protocol RPC
# messages. https://cloud.google.com/appengine/docs/python/tools/protorpc/
class _BuildFailure(messages.Message):
master_url = messages.StringField(1, required=True)
builder_name = messages.StringField(2, required=True)
build_number = messages.IntegerField(3, variant=messages.Variant.INT32,
required=True)
# All failed steps of the build reported by the client.
failed_steps = messages.StringField(4, repeated=True, required=False)
class _BuildFailureCollection(messages.Message):
"""Represents a request from a client, eg. builder_alerts."""
builds = messages.MessageField(_BuildFailure, 1, repeated=True)
class _AnalysisApproach(messages.Enum):
HEURISTIC = analysis_approach_type.HEURISTIC
TRY_JOB = analysis_approach_type.TRY_JOB
class _SuspectedCL(messages.Message):
repo_name = messages.StringField(1, required=True)
revision = messages.StringField(2, required=True)
commit_position = messages.IntegerField(3, variant=messages.Variant.INT32)
confidence = messages.IntegerField(4, variant=messages.Variant.INT32)
analysis_approach = messages.EnumField(_AnalysisApproach, 5)
revert_cl_url = messages.StringField(6)
class _TryJobStatus(messages.Enum):
# Try job is pending or running. Can expect result from try job.
RUNNING = 1
# There is no try job, try job completed or try job finished with error.
# Result from try job is ready or no need to continue waiting for it.
FINISHED = 2
class _BuildFailureAnalysisResult(messages.Message):
master_url = messages.StringField(1, required=True)
builder_name = messages.StringField(2, required=True)
build_number = messages.IntegerField(3, variant=messages.Variant.INT32,
required=True)
step_name = messages.StringField(4, required=True)
is_sub_test = messages.BooleanField(5, variant=messages.Variant.BOOL,
required=True)
test_name = messages.StringField(6)
first_known_failed_build_number = messages.IntegerField(
7, variant=messages.Variant.INT32)
suspected_cls = messages.MessageField(_SuspectedCL, 8, repeated=True)
analysis_approach = messages.EnumField(_AnalysisApproach, 9)
try_job_status = messages.EnumField(_TryJobStatus, 10)
is_flaky_test = messages.BooleanField(11, variant=messages.Variant.BOOL)
# Indicates if Findit has any kind of findings: found the culprit or
# confirmed the test is flaky.
has_findings = messages.BooleanField(12, variant=messages.Variant.BOOL)
# If analysis is finished.
is_finished = messages.BooleanField(13, variant=messages.Variant.BOOL)
class _BuildFailureAnalysisResultCollection(messages.Message):
"""Represents a response to the client, eg. builder_alerts."""
results = messages.MessageField(_BuildFailureAnalysisResult, 1, repeated=True)
class _BuildStep(messages.Message):
master_name = messages.StringField(1, required=True)
builder_name = messages.StringField(2, required=True)
build_number = messages.IntegerField(
3, variant=messages.Variant.INT32, required=True)
step_name = messages.StringField(4, required=True)
class _Flake(messages.Message):
name = messages.StringField(1, required=True)
is_step = messages.BooleanField(2, required=False, default=False)
bug_id = messages.IntegerField(
3, variant=messages.Variant.INT32, required=True)
build_steps = messages.MessageField(_BuildStep, 4, repeated=True)
class _Build(messages.Message):
master_name = messages.StringField(1, required=True)
builder_name = messages.StringField(2, required=True)
build_number = messages.IntegerField(
3, variant=messages.Variant.INT32, required=True)
class _FlakeAnalysis(messages.Message):
queued = messages.BooleanField(1, required=True)
def _AsyncProcessFailureAnalysisRequests(builds):
"""Pushes a task on the backend to process requests of failure analysis."""
target = appengine_util.GetTargetNameForModule(constants.WATERFALL_BACKEND)
payload = json.dumps({'builds': builds})
taskqueue.add(
url=constants.WATERFALL_PROCESS_FAILURE_ANALYSIS_REQUESTS_URL,
payload=payload, target=target,
queue_name=constants.WATERFALL_FAILURE_ANALYSIS_REQUEST_QUEUE)
def _AsyncProcessFlakeReport(flake_analysis_request, user_email, is_admin):
"""Pushes a task on the backend to process the flake report."""
target = appengine_util.GetTargetNameForModule(constants.WATERFALL_BACKEND)
payload = pickle.dumps((flake_analysis_request, user_email, is_admin))
taskqueue.add(
url=constants.WATERFALL_PROCESS_FLAKE_ANALYSIS_REQUEST_URL,
payload=payload, target=target,
queue_name=constants.WATERFALL_FLAKE_ANALYSIS_REQUEST_QUEUE)
# Create a Cloud Endpoints API.
# https://cloud.google.com/appengine/docs/python/endpoints/create_api
@endpoints.api(name='findit', version='v1', description='FindIt API')
class FindItApi(remote.Service):
"""FindIt API v1."""
def _GetAdditionalInformationForCL(
self, repo_name, revision, confidences, build, reference_build_key):
"""Gets additional information for a cl.
Currently additional information contains:
confidence of the result;
approaches that found this cl: HEURISTIC, TRY_JOB or both;
revert_cl_url if the cl has been reverted by Findit.
"""
additional_info = {}
cl = WfSuspectedCL.Get(repo_name, revision)
if not cl:
return additional_info
master_name = buildbot.GetMasterNameFromUrl(build.master_url)
builder_name = build.builder_name
current_build = build.build_number
# If the CL is found by a try job, only the first failure will be recorded.
# So we might need to go to the first failure to get CL information.
build_info = cl.GetBuildInfo(master_name, builder_name, current_build)
first_build_info = None if not reference_build_key else cl.GetBuildInfo(
*build_util.GetBuildInfoFromId(reference_build_key))
additional_info['confidence'], additional_info['cl_approach'] = (
suspected_cl_util.GetSuspectedCLConfidenceScoreAndApproach(
confidences, build_info, first_build_info))
# Gets the revert_cl_url for the CL if there is one.
if cl.revert_cl_url:
additional_info['revert_cl_url'] = cl.revert_cl_url
return additional_info
def _GenerateBuildFailureAnalysisResult(
self, build, step_name, suspected_cls_in_result=None, first_failure=None,
test_name=None, analysis_approach=_AnalysisApproach.HEURISTIC,
confidences=None, try_job_status=None, is_flaky_test=False,
reference_build_key=None, has_findings=True, is_finished=True):
suspected_cls_in_result = suspected_cls_in_result or []
suspected_cls = []
for suspected_cl in suspected_cls_in_result:
repo_name = suspected_cl['repo_name']
revision = suspected_cl['revision']
commit_position = suspected_cl['commit_position']
additional_info = self._GetAdditionalInformationForCL(
repo_name, revision, confidences, build, reference_build_key)
if additional_info.get('cl_approach'):
cl_approach = (
_AnalysisApproach.HEURISTIC if
additional_info['cl_approach'] == analysis_approach_type.HEURISTIC
else _AnalysisApproach.TRY_JOB)
else:
cl_approach = analysis_approach
suspected_cls.append(_SuspectedCL(
repo_name=repo_name, revision=revision,
commit_position=commit_position,
confidence=additional_info.get('confidence'),
analysis_approach=cl_approach,
revert_cl_url=additional_info.get('revert_cl_url')))
return _BuildFailureAnalysisResult(
master_url=build.master_url,
builder_name=build.builder_name,
build_number=build.build_number,
step_name=step_name,
is_sub_test=test_name is not None,
test_name=test_name,
first_known_failed_build_number=first_failure,
suspected_cls=suspected_cls,
analysis_approach=analysis_approach,
try_job_status=try_job_status,
is_flaky_test=is_flaky_test,
has_findings=has_findings,
is_finished=is_finished)
def _GetStatusAndCulpritFromTryJob(
self, try_job, swarming_task, build_failure_type, step_name,
test_name=None):
"""Returns the culprit found by try-job for the given step or test."""
if swarming_task and swarming_task.status in (
analysis_status.PENDING, analysis_status.RUNNING):
return _TryJobStatus.RUNNING, None
if not try_job or try_job.failed:
return _TryJobStatus.FINISHED, None
if not try_job.completed:
return _TryJobStatus.RUNNING, None
if build_failure_type == failure_type.COMPILE:
if not try_job.compile_results: # pragma: no cover.
return _TryJobStatus.FINISHED, None
return (
_TryJobStatus.FINISHED,
try_job.compile_results[-1].get('culprit', {}).get(step_name))
if not try_job.test_results: # pragma: no cover.
return _TryJobStatus.FINISHED, None
if test_name is None:
step_info = try_job.test_results[-1].get('culprit', {}).get(step_name)
if not step_info or step_info.get('tests'): # pragma: no cover.
# TODO(chanli): For some steps like checkperms/sizes/etc, the culprit
# finding try-job might have test-level results.
return _TryJobStatus.FINISHED, None
return _TryJobStatus.FINISHED, step_info
ref_name = (swarming_task.parameters.get('ref_name') if swarming_task and
swarming_task.parameters else None)
return (
_TryJobStatus.FINISHED, try_job.test_results[-1].get('culprit', {}).get(
ref_name or step_name, {}).get('tests', {}).get(test_name))
def _CheckIsFlaky(self, swarming_task, test_name):
"""Checks if the test is flaky."""
if not swarming_task or not swarming_task.classified_tests:
return False
return test_name in swarming_task.classified_tests.get('flaky_tests', [])
def _PopulateResult(
self, results, build, step_name, build_failure_type=None,
heuristic_result=None, confidences=None, reference_build_key=None,
swarming_task=None, try_job=None, test_name=None, has_findings=True,
is_finished=True):
"""Appends an analysis result for the given step or test.
Try-job results are always given priority over heuristic results.
"""
if not has_findings or not is_finished:
results.append(self._GenerateBuildFailureAnalysisResult(
build, step_name, has_findings=has_findings, is_finished=is_finished))
return
# Default to heuristic analysis.
suspected_cls = heuristic_result['suspected_cls']
analysis_approach = _AnalysisApproach.HEURISTIC
# Check if the test is flaky.
is_flaky_test = self._CheckIsFlaky(swarming_task, test_name)
if is_flaky_test:
suspected_cls = []
try_job_status = _TryJobStatus.FINISHED # There will be no try job.
else:
# Check analysis result from try-job.
try_job_status, culprit = self._GetStatusAndCulpritFromTryJob(
try_job, swarming_task, build_failure_type, step_name,
test_name=test_name)
if culprit:
suspected_cls = [culprit]
analysis_approach = _AnalysisApproach.TRY_JOB
if not is_flaky_test and not suspected_cls:
# No findings for the test.
has_findings = False
if try_job_status == _TryJobStatus.RUNNING:
is_finished = False
results.append(self._GenerateBuildFailureAnalysisResult(
build, step_name, suspected_cls, heuristic_result['first_failure'],
test_name, analysis_approach, confidences, try_job_status,
is_flaky_test, reference_build_key, has_findings, is_finished))
def _GetAllSwarmingTasks(self, failure_result_map):
"""Returns all swarming tasks related to one build.
Args:
A dict to map each step/test with the key to the build when it failed the
first time.
{
'step1': 'm/b/1',
'step2': {
'test1': 'm/b/1',
'test2': 'm/b/2'
}
}
Returns:
A dict of swarming tasks like below:
{
'step1': {
'm/b/1': WfSwarmingTask(
key=Key('WfBuild', 'm/b/1', 'WfSwarmingTask', 'step1'),...)
},
...
}
"""
if not failure_result_map:
return {}
swarming_tasks = defaultdict(dict)
for step_name, step_map in failure_result_map.iteritems():
if isinstance(step_map, basestring):
swarming_tasks[step_name][step_map] = (
WfSwarmingTask.Get(
*build_util.GetBuildInfoFromId(step_map), step_name=step_name))
else:
for task_key in step_map.values():
if not swarming_tasks[step_name].get(task_key):
swarming_tasks[step_name][task_key] = (
WfSwarmingTask.Get(*build_util.GetBuildInfoFromId(task_key),
step_name=step_name))
return swarming_tasks
def _GetAllTryJobs(self, failure_result_map):
"""Returns all try jobs related to one build.
Args:
A dict to map each step/test with the key to the build when it failed the
first time.
{
'step1': 'm/b/1',
'step2': {
'test1': 'm/b/1',
'test2': 'm/b/2'
}
}
Returns:
A dict of try jobs like below:
{
'm/b/1': WfTryJob(
key=Key('WfBuild', 'm/b/1'),...)
...
}
"""
if not failure_result_map:
return {}
try_jobs = {}
for step_map in failure_result_map.values():
if isinstance(step_map, basestring):
try_jobs[step_map] = WfTryJob.Get(*step_map.split('/'))
else:
for task_key in step_map.values():
if not try_jobs.get(task_key):
try_jobs[task_key] = WfTryJob.Get(*task_key.split('/'))
return try_jobs
def _GetSwarmingTaskAndTryJobForFailure(
self, step_name, test_name, failure_result_map, swarming_tasks, try_jobs):
"""Gets swarming task and try job for the specific step/test."""
if not failure_result_map:
return None, None, None
if test_name:
try_job_key = failure_result_map.get(step_name, {}).get(test_name)
else:
try_job_key = failure_result_map.get(step_name)
# Gets the swarming task for the test.
swarming_task = swarming_tasks.get(step_name, {}).get(try_job_key)
# Get the try job for the step/test.
try_job = try_jobs.get(try_job_key)
return try_job_key, swarming_task, try_job
def _GenerateResultsForBuild(
self, build, heuristic_analysis, results, confidences):
# Checks has_findings and is_finished for heuristic analysis.
has_findings = bool(heuristic_analysis.result
and not heuristic_analysis.failed)
is_finished = heuristic_analysis.completed
if not has_findings:
# No result.
for step_name in build.failed_steps:
self._PopulateResult(
results, build, step_name,
has_findings=has_findings, is_finished=is_finished)
return
swarming_tasks = self._GetAllSwarmingTasks(
heuristic_analysis.failure_result_map)
try_jobs = self._GetAllTryJobs(heuristic_analysis.failure_result_map)
steps_with_result = [
f.get('step_name') for f in heuristic_analysis.result['failures']]
steps_without_result = [
step_name for step_name in build.failed_steps if
step_name not in steps_with_result]
for step_name in steps_without_result:
has_findings = False # No findings for the step.
self._PopulateResult(
results, build, step_name,
has_findings=has_findings, is_finished=is_finished)
for failure in heuristic_analysis.result['failures']:
step_name = failure.get('step_name')
if failure.get('tests'): # Test-level analysis.
for test in failure['tests']:
test_name = test['test_name']
reference_build_key, swarming_task, try_job = (
self._GetSwarmingTaskAndTryJobForFailure(
step_name, test_name, heuristic_analysis.failure_result_map,
swarming_tasks, try_jobs))
self._PopulateResult(
results, build, step_name, heuristic_analysis.failure_type, test,
confidences, reference_build_key, swarming_task,
try_job, test_name=test_name)
else:
reference_build_key, swarming_task, try_job = (
self._GetSwarmingTaskAndTryJobForFailure(
step_name, None, heuristic_analysis.failure_result_map,
swarming_tasks, try_jobs))
self._PopulateResult(
results, build, step_name, heuristic_analysis.failure_type, failure,
confidences, reference_build_key, swarming_task, try_job)
@gae_ts_mon.instrument_endpoint()
@endpoints.method(
_BuildFailureCollection, _BuildFailureAnalysisResultCollection,
path='buildfailure', name='buildfailure')
def AnalyzeBuildFailures(self, request):
"""Returns analysis results for the given build failures in the request.
Analysis of build failures will be triggered automatically on demand.
Args:
request (_BuildFailureCollection): A list of build failures.
Returns:
_BuildFailureAnalysisResultCollection
A list of analysis results for the given build failures.
"""
results = []
supported_builds = []
confidences = SuspectedCLConfidence.Get()
for build in request.builds:
master_name = buildbot.GetMasterNameFromUrl(build.master_url)
if not (master_name and waterfall_config.MasterIsSupported(master_name)):
logging.info('%s/%s/%s is not supported',
build.master_url, build.builder_name, build.build_number)
continue
supported_builds.append({
'master_name': master_name,
'builder_name': build.builder_name,
'build_number': build.build_number,
'failed_steps': build.failed_steps,
})
# If the build failure was already analyzed and a new analysis is
# scheduled to analyze new failed steps, the returned WfAnalysis will
# still have the result from last completed analysis.
# If there is no analysis yet, no result is returned.
heuristic_analysis = WfAnalysis.Get(
master_name, build.builder_name, build.build_number)
if not heuristic_analysis:
continue
self._GenerateResultsForBuild(
build, heuristic_analysis, results, confidences)
logging.info('%d build failure(s), while %d are supported',
len(request.builds), len(supported_builds))
try:
_AsyncProcessFailureAnalysisRequests(supported_builds)
except Exception: # pragma: no cover.
# If we fail to post a task to the task queue, we ignore and wait for next
# request.
logging.exception('Failed to add analysis request to task queue: %s',
repr(supported_builds))
return _BuildFailureAnalysisResultCollection(results=results)
@gae_ts_mon.instrument_endpoint()
@endpoints.method(_Flake, _FlakeAnalysis, path='flake', name='flake')
def AnalyzeFlake(self, request):
"""Analyze a flake on Commit Queue. Currently only supports flaky tests."""
user_email = auth_util.GetUserEmail()
is_admin = auth_util.IsCurrentUserAdmin()
if not flake_analysis_service.IsAuthorizedUser(user_email, is_admin):
raise endpoints.UnauthorizedException(
'No permission to run a new analysis! User is %s' % user_email)
def CreateFlakeAnalysisRequest(flake):
analysis_request = FlakeAnalysisRequest.Create(
flake.name, flake.is_step, flake.bug_id)
for step in flake.build_steps:
analysis_request.AddBuildStep(step.master_name, step.builder_name,
step.build_number, step.step_name,
time_util.GetUTCNow())
return analysis_request
flake_analysis_request = CreateFlakeAnalysisRequest(request)
logging.info('Flake report: %s', flake_analysis_request)
try:
_AsyncProcessFlakeReport(flake_analysis_request, user_email, is_admin)
queued = True
except Exception:
# Ignore the report when fail to queue it for async processing.
queued = False
logging.exception('Failed to queue flake report for async processing')
return _FlakeAnalysis(queued=queued)