blob: fbe481ab6568fe2e55565d3eaa32582f5406f25e [file] [log] [blame]
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Queries CIDB for innocent CLs in failed CQ builds"""
from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
import collections
import datetime
from chromite.lib import constants
from chromite.lib import cros_logging as logging
from google.cloud import datastore # pylint: disable=E0611,import-error
import pytz
from exonerator import gerrit_urls
from exonerator import checkpointlib
# datetime.datetime.min can't be used because %Y requires year >= 1900
_DATETIME_MIN = pytz.utc.localize(datetime.datetime(1900, 1, 2))

# Datastore kind of the per-build marker entities written once a build has been
# processed (keyed by build id).
CQ_PROCESSED_KEY = 'CQProcessed'

# Datastore kind, and also the key name, of the single entity which stores the
# timestamp of the newest build message processed so far.
_LAST_BUILD_KEY = 'LastBuild'

# How far before the LastBuild timestamp we still search for finalize messages.
# The LastBuild timestamp only narrows the search; the CQProcessed markers are
# what prevent a build from being handled twice.
_LAST_ANNOTATION_GRACE_PERIOD = datetime.timedelta(days=2)

# A CL is not exonerated for a build if one of these actions was recorded for
# it in a newer build.
_IGNORE_CL_WITH_NEWER_ACTIONS = frozenset([
    constants.CL_ACTION_KICKED_OUT,
    constants.CL_ACTION_SUBMITTED,
    # TODO: we can't use this optimization because slave builds will be
    # considered "newer" builds which the CL was picked up in.
    # constants.CL_ACTION_PICKED_UP,
])

# Pairs a change with the id of the build it is a candidate for exoneration
# from. The typename matches the bound name so that instances can be pickled
# and their repr points at the right class.
ChangeWithBuild = collections.namedtuple('ChangeWithBuild', 'change build')
def NewInnocentCLs(conn, limit=None, checkpoint=True):
  """Generates batches of innocent CL candidates found since the last run.

  The finalize messages are consumed lazily, one per yielded batch. When
  checkpointing is enabled, advancing this generator is what records progress,
  so materializing the results into a list will checkpoint every build right
  away, before the caller has acted on any of them.

  Args:
    conn: The CIDBConnection to use.
    limit: Maximum number of finalize messages to process. Defaults to None.
    checkpoint: Whether to save progress after every build message processed.

  Yields:
    Lists of ChangeWithBuild objects, one list per Finalize message. Each
    entry is a CL that was kicked out of the build with that build_id, was not
    annotated with BAD_CL in the annotator, and has no newer action of a type
    in _IGNORE_CL_WITH_NEWER_ACTIONS.
  """
  for message in _FetchNewFinalizeMessages(
      conn, limit=limit, checkpoint=checkpoint):
    build_id = message['build_id']
    # A CL kicked out of this build may have been kicked out (and blamed)
    # again by a later build. Only keep CLs for which this build holds the
    # latest relevant action, so that a CL blamed later is not exonerated.
    candidates = [
        ChangeWithBuild(cl, build_id)
        for cl in _InnocentCLsFromAnnotatedBuild(conn, build_id)
        if not _ExistsNewerActionForCL(
            conn, cl, build_id, _IGNORE_CL_WITH_NEWER_ACTIONS)
    ]
    logging.info('Found candidate CLs to exonerate from build %d: %s',
                 build_id, candidates)
    yield candidates
def _FetchNewFinalizeMessages(conn, limit=None, checkpoint=True):
  """Fetches finalize messages that haven't been processed.

  With checkpointing enabled, the returned sequence checkpoints a message once
  the consumer asks for the next one. Specifically,

    x = next(fetch)
    # x is not checkpointed yet
    y = next(fetch)  # This checkpoints that x is done

  Args:
    conn: The CIDBConnection to use.
    limit: Maximum number of finalize messages to process. Defaults to None,
      meaning no limit.
    checkpoint: Whether to save progress after every build message processed.

  Returns:
    An iterable of build messages, oldest first, which are newer than the
    LastBuild entity's timestamp minus _LAST_ANNOTATION_GRACE_PERIOD. When
    |checkpoint| is false this is a plain list holding at most |limit|
    messages; otherwise it is a checkpointlib.CheckpointSequence which is
    given a predicate for builds that already have a CQProcessed marker.
  """
  last_build_processed = _GetLastBuildEntity()
  # On the very first run the entity holds _DATETIME_MIN; subtracting the
  # grace period from it would produce a date in 1899, below the floor that
  # _DATETIME_MIN exists to enforce. Never search earlier than that floor.
  # NOTE(review): assumes the stored timestamp is timezone-aware, as
  # _UpdateLastBuildProcessed already requires for its own comparison.
  time_constraint = max(
      _DATETIME_MIN,
      last_build_processed['timestamp'] - _LAST_ANNOTATION_GRACE_PERIOD)
  messages = conn.GetBuildMessagesNewerThan(
      timestamp=time_constraint,
      message_type=constants.MESSAGE_TYPE_ANNOTATIONS_FINALIZED)
  messages.sort(key=lambda m: m['timestamp'])

  if not checkpoint:
    # Honour |limit| here too; slicing with None keeps every message.
    return messages[:limit]

  ds = datastore.Client()

  def _AlreadyProcessed(build_message):
    """Whether a CQProcessed marker already exists for the message's build."""
    build_id = build_message['build_id']
    entity = ds.get(key=ds.key(CQ_PROCESSED_KEY, build_id))
    return entity is not None

  def _Save(build_message):
    """Records |build_message| as processed."""
    _Checkpoint(ds, build_message, last_build_processed)

  return checkpointlib.CheckpointSequence(
      _AlreadyProcessed, _Save, messages, limit=limit)
def _Checkpoint(ds, build_message, last_build_processed):
  """Records that a build message has been fully consumed.

  Args:
    ds: The datastore client used for the writes.
    build_message: The build message that was just processed.
    last_build_processed: The LastBuild entity tracking the newest timestamp.
  """
  processed_build = build_message['build_id']
  logging.info('Processed build %s, checkpointing...', processed_build)
  _UpdateLastBuildProcessed(ds, build_message, last_build_processed)

  # The LastBuild timestamp only shrinks the search space (see
  # _LAST_ANNOTATION_GRACE_PERIOD); it is not a hard constraint. This
  # per-build marker is what guarantees a build is processed only once.
  marker_key = ds.key(CQ_PROCESSED_KEY, processed_build)
  ds.put(datastore.Entity(key=marker_key))
def _UpdateLastBuildProcessed(ds, build_message, last_build_processed):
  """Moves the LastBuild timestamp forward when the message is newer.

  Args:
    ds: The datastore client used to persist the entity.
    build_message: The build message whose (naive) timestamp is compared.
    last_build_processed: The LastBuild entity; updated in place and saved
      only when the message's timestamp is later than the stored one.
  """
  # The message timestamp is naive; interpret it as UTC before comparing.
  message_time = pytz.utc.localize(build_message['timestamp'])
  if message_time <= last_build_processed['timestamp']:
    return

  logging.info('updating checkpoint to %s', message_time)
  last_build_processed['timestamp'] = message_time
  ds.put(last_build_processed)
def _InnocentCLsFromAnnotatedBuild(conn, build_id):
  """Collects the CLs kicked out of a build that nobody blamed.

  Args:
    conn: A CIDBConnection
    build_id: The build_id in question.

  Returns:
    The set of CLs kicked out of the build whose change tuple does not appear
    in any BAD_CL annotation. Note: this may include CLs which didn't have
    their CQ+1 gerrit annotation removed.
  """
  blamed = set(_BadCLsForBuild(conn, build_id))
  kicked_out = _KickedOutCLs(conn, build_id)
  logging.debug('For build %s, CLs marked bad_cl were: %s', build_id, blamed)
  logging.debug('For build %s, CLs kicked out were: %s', build_id,
                kicked_out)

  innocent = set()
  for patch in kicked_out:
    if patch.GetChangeTuple() in blamed:
      continue
    innocent.add(patch)
  return innocent
def _BadCLsForBuild(conn, build_id):
  """Finds all annotated bad CLs for a build.

  Annotations whose blame URL cannot be parsed into a CL are skipped, with a
  warning, because the CL they meant to blame cannot be identified and may
  therefore end up being treated as innocent.

  Args:
    conn: A CIDBConnection
    build_id: The build_id in question.

  Yields:
    GerritChangeTuples which were annotated with BAD_CL.
  """
  annotations = conn.GetAnnotationsForBuilds([build_id]).get(build_id, [])
  for annotation in annotations:
    if annotation['failure_category'] != constants.FAILURE_CATEGORY_BAD_CL:
      continue

    blame_url = annotation['blame_url']
    # Keep the try body down to the parse itself so that only a parse failure
    # is handled here, not an exception raised while suspended at the yield.
    try:
      bad_cl = gerrit_urls.CLFromBlameURL(blame_url)
    except ValueError:
      logging.warning(
          'Build %s has a BAD_CL annotation whose blame URL %r could not be '
          'parsed as a CL; ignoring it.', build_id, blame_url)
      continue
    yield bad_cl
def _KickedOutCLs(conn, build_id):
  """Lists the patches that a given build kicked out.

  Args:
    conn: A CIDBConnection
    build_id: The build_id in question.

  Returns:
    A list of GerritPatchTuples, one per KICKED_OUT action recorded for the
    build, in the order the actions were returned.
  """
  kicked_out = []
  for action in conn.GetActionsForBuild(build_id):
    if action.action == constants.CL_ACTION_KICKED_OUT:
      kicked_out.append(action.patch)
  return kicked_out
def _GetLastBuildEntity():
  """Loads the entity tracking the last timestamp processed by this service.

  Returns:
    The stored LastBuild entity, or, when none exists yet, a new unsaved
    entity whose 'timestamp' is _DATETIME_MIN.
  """
  client = datastore.Client()
  key = client.key(_LAST_BUILD_KEY, _LAST_BUILD_KEY)

  stored = client.get(key=key)
  if stored is not None:
    return stored

  # Nothing has been processed yet: start from the earliest usable timestamp.
  fresh = datastore.Entity(key=key)
  fresh['timestamp'] = _DATETIME_MIN
  return fresh
def _ExistsNewerActionForCL(conn, change, build_id, action_types):
  """Whether there is a newer action for a given CL of a certain type.

  Args:
    conn: The CIDBConnection
    change: A GerritPatchTuple to find actions for.
    build_id: The threshold build_id; only actions recorded by builds with a
      strictly greater id count as newer.
    action_types: An iterable of CL action types we are interested in.

  Returns:
    A boolean indicating whether there are any new CLActions of the given
    |action_types|.
  """
  # Build the lookup set once, up front. This avoids rebuilding it for every
  # action and keeps the result correct if |action_types| is a one-shot
  # iterator.
  wanted_types = frozenset(action_types)

  # TODO(phobbs) this could be done with a batch query for all the candidate CLs
  actions = conn.GetActionsForChanges([change])
  newer_actions = [a for a in actions
                   if a.action in wanted_types and a.build_id > build_id]
  if newer_actions:
    # Only the first match is logged to keep the message short.
    logging.info('Skipped exonerating %s because there exists newer actions, '
                 'including %s',
                 change, newer_actions[:1])
  return bool(newer_actions)