blob: 12f3fccfc60fa5a4c4ce6190b613836a95ca18ca [file] [log] [blame]
// Copyright 2021 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
// Package bq provides functions for preparing a row to send to BigQuery upon
// completion of a Run.
package bq
import (
cvbqpb ""
cvbq ""
func SendRun(ctx context.Context, id common.RunID) error {
a, err := makeAttempt(ctx, id)
if err != nil {
return errors.Annotate(err, "failed to make Attempt").Err()
// During the migration period when CQDaemon does most checks and triggers
// builds, CV can't populate all of the fields of Attempt without the
// information from CQDaemon; so for finished Attempts reported by
// CQDaemon, we can fill in the remaining fields.
switch cqda, err := fetchCQDAttempt(ctx, id); {
case err != nil:
return err
case cqda != nil:
a = reconcileAttempts(a, cqda)
// TODO(crbug/1173168): Change the destination table name after
// CQDaemon stops sending rows; and sent to "commit-queue" project.
// When we first start sending rows, we want to send to a separate table
// name.
return cvbq.SendRow(ctx, "raw", "attempts_cv", string(id), a)
func makeAttempt(ctx context.Context, id common.RunID) (*cvbqpb.Attempt, error) {
r := run.Run{ID: id}
switch err := datastore.Get(ctx, &r); {
case err == datastore.ErrNoSuchEntity:
return nil, errors.Reason("Run not found").Err()
case err != nil:
return nil, errors.Annotate(err, "failed to fetch Run").Tag(transient.Tag).Err()
if !run.IsEnded(r.Status) {
panic("Run status must be final before sending to BQ.")
// Load CLs and convert them to GerritChanges including submit status.
runCLs, err := run.LoadRunCLs(ctx, id, r.CLs)
if err != nil {
return nil, err
submittedSet := make(map[int64]struct{}, len(r.Submission.GetSubmittedCls()))
for _, clid := range r.Submission.GetSubmittedCls() {
submittedSet[clid] = struct{}{}
gerritChanges := make([]*cvbqpb.GerritChange, len(runCLs))
for i, cl := range runCLs {
gerritChanges[i] = toGerritChange(cl, submittedSet, r.Mode)
// TODO(crbug/1173168, crbug/1105669): We want to change the BQ
// schema so that StartTime is processing start time and CreateTime is
// trigger time.
a := &cvbqpb.Attempt{
Key: r.ID.AttemptKey(),
LuciProject: r.ID.LUCIProject(),
ConfigGroup: r.ConfigGroupID.Name(),
ClGroupKey: computeCLGroupKey(runCLs, false),
EquivalentClGroupKey: computeCLGroupKey(runCLs, true),
// Run.CreateTime is trigger time, which corresponds to what CQD sends for
// StartTime.
StartTime: timestamppb.New(r.CreateTime),
EndTime: timestamppb.New(r.EndTime),
GerritChanges: gerritChanges,
// Builds, Substatus and HasCustomRequirement are not known to CV yet
// during the migration state, so they should be filled in with Attempt
// from CQD if possible.
Builds: nil,
Status: attemptStatus(ctx, &r),
// TODO(crbug/1114686): Add a new FAILED_SUBMIT substatus, which
// should be used in the case that some CLs failed to submit after
// passing checks. (In this case, for backwards compatibility, we
// will set status = SUCCESS, substatus = FAILED_SUBMIT.)
Substatus: cvbqpb.AttemptSubstatus_NO_SUBSTATUS,
return a, nil
// toGerritChange creates a GerritChange for the given RunCL.
// This includes the submit status of the CL.
func toGerritChange(cl *run.RunCL, submitted map[int64]struct{}, mode run.Mode) *cvbqpb.GerritChange {
detail := cl.Detail
ci := detail.GetGerrit().GetInfo()
gc := &cvbqpb.GerritChange{
Host: detail.GetGerrit().Host,
Project: ci.Project,
Change: ci.Number,
Patchset: int64(detail.Patchset),
EarliestEquivalentPatchset: int64(detail.MinEquivalentPatchset),
TriggerTime: cl.Trigger.Time,
Mode: mode.BQAttemptMode(),
SubmitStatus: cvbqpb.GerritChange_PENDING,
if mode == run.FullRun {
// Mark the CL submit status as success if it appears in the submitted CLs
// list, and failure if it does not.
if _, ok := submitted[int64(cl.ID)]; ok {
gc.SubmitStatus = cvbqpb.GerritChange_SUCCESS
} else {
gc.SubmitStatus = cvbqpb.GerritChange_FAILURE
return gc
// fetchCQDAttempt fetches an Attempt from CQDaemon if available.
// Returns nil if no Attempt is available.
func fetchCQDAttempt(ctx context.Context, id common.RunID) (*cvbqpb.Attempt, error) {
v := migration.VerifiedCQDRun{ID: id}
switch err := datastore.Get(ctx, &v); {
case err == datastore.ErrNoSuchEntity:
// A Run may end without a VerifiedCQDRun stored if the Run is canceled.
logging.Debugf(ctx, "no VerifiedCQDRun found for Run %q", id)
case err != nil:
return nil, errors.Annotate(err, "failed to fetch VerifiedCQDRun").Tag(transient.Tag).Err()
return v.Payload.GetRun().GetAttempt(), nil
// reconcileAttempts merges the CV Attempt and CQDaemon Attempt.
// Modifies and returns the CV Attempt.
// Once CV does the relevant work (keeping track of builds, reading the CL
// description footers, and performing checks) these will no longer have to be
// filled in with the CQDaemon Attempt values.
func reconcileAttempts(a, cqda *cvbqpb.Attempt) *cvbqpb.Attempt {
// The list of Builds will be known to CV after it starts triggering
// and tracking builds; until then CQD is the source of truth.
a.Builds = cqda.Builds
// Substatus generally indicates a failure reason, which is
// known once one of the checks fails. CQDaemon may specify
// a substatus in the case of abort (substatus: MANUAL_CANCEL)
// or failure (FAILED_TRYJOBS etc.).
if a.Status == cvbqpb.AttemptStatus_ABORTED || a.Status == cvbqpb.AttemptStatus_FAILURE {
a.Status = cqda.Status
a.Substatus = cqda.Substatus
a.Status = cqda.Status
a.Substatus = cqda.Substatus
// The HasCustomRequirement is determined by CL description footers.
a.HasCustomRequirement = cqda.HasCustomRequirement
return a
// attemptStatus converts a Run status to Attempt status.
func attemptStatus(ctx context.Context, r *run.Run) cvbqpb.AttemptStatus {
switch r.Status {
case run.Status_SUCCEEDED:
return cvbqpb.AttemptStatus_SUCCESS
case run.Status_FAILED:
// In the case that the checks passed but not all CLs were submitted
// successfully, the Attempt will still have status set to SUCCESS for
// backwards compatibility. Note that r.Submission is expected to be
// set only if a submission is attempted, meaning all checks passed.
if r.Submission != nil && len(r.Submission.Cls) != len(r.Submission.SubmittedCls) {
return cvbqpb.AttemptStatus_SUCCESS
return cvbqpb.AttemptStatus_FAILURE
case run.Status_CANCELLED:
return cvbqpb.AttemptStatus_ABORTED
logging.Errorf(ctx, "Unexpected attempt status %q", r.Status)
return cvbqpb.AttemptStatus_ATTEMPT_STATUS_UNSPECIFIED
// computeCLGroupKey constructs keys for ClGroupKey and the related
// EquivalentClGroupKey.
// These are meant to be opaque keys unique to particular set of CLs and
// patchsets for the purpose of grouping together runs for the same sets of
// patchsets. if isEquivalent is true, then the "min equivalent patchset" is
// used instead of the latest patchset, so that trivial patchsets such as minor
// rebases and CL description updates don't change the key.
func computeCLGroupKey(cls []*run.RunCL, isEquivalent bool) string {
// First sort by in a deterministic way by (host, number, patchset).
sort.Slice(cls, func(i, j int) bool {
// ExternalID includes host and change number but not patchset.
if cls[i].ExternalID != cls[j].ExternalID {
return cls[i].ExternalID < cls[j].ExternalID
if isEquivalent {
return cls[i].Detail.GetMinEquivalentPatchset() < cls[j].Detail.GetMinEquivalentPatchset()
} else {
return cls[i].Detail.GetPatchset() < cls[j].Detail.GetPatchset()
h := sha256.New()
// CL group keys are meant to be opaque keys. We'd like to avoid people
// depending on CL group key and equivalent CL group key sometimes being
// equal. We can do this by adding a salt to the hash.
if isEquivalent {
separator := []byte{0}
for i, cl := range cls {
if i > 0 {
h.Write([]byte(strconv.FormatInt(cl.Detail.GetGerrit().GetInfo().GetNumber(), 10)))
if isEquivalent {
h.Write([]byte(strconv.FormatInt(int64(cl.Detail.GetMinEquivalentPatchset()), 10)))
} else {
h.Write([]byte(strconv.FormatInt(int64(cl.Detail.GetPatchset()), 10)))
return hex.EncodeToString(h.Sum(nil)[:8])