blob: 7f42378e9cce421bbc23654540654e8576209490 [file] [log] [blame]
// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package main
import (
"context"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/sync/parallel"
ds "go.chromium.org/luci/gae/service/datastore"
"go.chromium.org/luci/grpc/grpcutil"
admin "infra/tricium/api/admin/v1"
tricium "infra/tricium/api/v1"
"infra/tricium/appengine/common/config"
"infra/tricium/appengine/common/track"
)
// trackerServer represents the Tricium pRPC Tracker service.
type trackerServer struct{}
// WorkflowLaunched tracks the launch of a workflow.
func (*trackerServer) WorkflowLaunched(c context.Context, req *admin.WorkflowLaunchedRequest) (res *admin.WorkflowLaunchedResponse, err error) {
defer func() {
err = grpcutil.GRPCifyAndLogErr(c, err)
}()
logging.Fields{
"runID": req.RunId,
}.Infof(c, "Received workflow launched request.")
if req.RunId == 0 {
return nil, errors.Reason("missing run ID").Tag(grpcutil.InvalidArgumentTag).Err()
}
if err := workflowLaunched(c, req, config.WorkflowCache); err != nil {
return nil, errors.Annotate(err, "failed to track workflow launched").Tag(grpcutil.InternalTag).Err()
}
return &admin.WorkflowLaunchedResponse{}, nil
}
func workflowLaunched(c context.Context, req *admin.WorkflowLaunchedRequest, wp config.WorkflowCacheAPI) error {
wf, err := wp.GetWorkflow(c, req.RunId)
if err != nil {
return errors.Annotate(err, "failed to read workflow config").Err()
}
// Prepare function and worker invocation tracking entries to store.
fw, functions := extractFunctionWorkerStructure(c, wf)
// In most cases, when a workflow is launched, we update the state of
// the workflow and the analyze request to RUNNING.
newState := tricium.State_RUNNING
// However, if there are no functions (rare edge case), then the workflow
// is trivially considered "done" and is marked as such immediately.
if len(fw) == 0 && len(functions) == 0 {
if err != nil {
return errors.Annotate(err, "failed to read workflow config").Err()
}
logging.Warningf(c, "No functions found in workflow, nothing to do")
newState = tricium.State_SUCCESS
}
logging.Debugf(c, "Extracted function/worker entries for tracking: %#v", fw)
requestKey := ds.NewKey(c, "AnalyzeRequest", "", req.RunId, nil)
if err := ds.RunInTransaction(c, func(c context.Context) (err error) {
// Store the root of the workflow.
workflowRun := &track.WorkflowRun{
ID: 1,
Parent: requestKey,
BuildbucketServerHost: wf.BuildbucketServerHost,
Functions: functions,
}
if err := ds.Put(c, workflowRun); err != nil {
return errors.Reason("failed to store WorkflowRun entity (run ID: %d): %v", req.RunId, err).Err()
}
runKey := ds.KeyForObj(c, workflowRun)
return parallel.FanOutIn(func(taskC chan<- func() error) {
// Update AnalyzeRequestResult.
taskC <- func() error {
r := &track.AnalyzeRequestResult{
ID: 1,
Parent: requestKey,
State: newState,
}
if err := ds.Put(c, r); err != nil {
return errors.Annotate(err, "failed to mark request as launched").Err()
}
return nil
}
// Update WorkflowRunResult.
taskC <- func() error {
r := &track.WorkflowRunResult{
ID: 1,
Parent: runKey,
State: newState,
}
if err := ds.Put(c, r); err != nil {
return errors.Annotate(err, "failed to mark workflow as launched").Err()
}
return nil
}
// Store Function and WorkerRun entities for tracking.
taskC <- func() error {
entities := make([]interface{}, 0, len(fw))
for _, v := range fw {
v.Function.Parent = runKey
functionKey := ds.KeyForObj(c, v.Function)
entities = append(entities, []interface{}{
v.Function,
&track.FunctionRunResult{
ID: 1,
Parent: functionKey,
Name: v.Function.ID,
State: tricium.State_PENDING,
},
}...)
for _, worker := range v.Workers {
worker.Parent = functionKey
entities = append(entities, worker)
workerKey := ds.KeyForObj(c, worker)
entities = append(entities, []interface{}{
worker,
&track.WorkerRunResult{
ID: 1,
Name: worker.ID,
Function: v.Function.ID,
Parent: workerKey,
State: tricium.State_PENDING,
},
}...)
}
}
if err := ds.Put(c, entities); err != nil {
return errors.Annotate(err, "failed to store function and worker entities").Err()
}
return nil
}
})
}, nil); err != nil {
return err
}
return nil
}
type functionRunWorkers struct {
Function *track.FunctionRun
Workers []*track.WorkerRun
}
// extractFunctionWorkerStructure extracts a map of function names to
// functionRunWorkers structures from a workflow config.
func extractFunctionWorkerStructure(
c context.Context, wf *admin.Workflow) (map[string]*functionRunWorkers, []string) {
m := map[string]*functionRunWorkers{}
var functions []string
for _, w := range wf.Workers {
function, _, err := track.ExtractFunctionPlatform(w.Name)
if err != nil {
logging.WithError(err).Errorf(c, "Failed to extract function name")
}
a, ok := m[function]
if !ok {
a = &functionRunWorkers{Function: &track.FunctionRun{ID: function}}
m[function] = a
}
workerRun := &track.WorkerRun{ID: w.Name, Platform: w.ProvidesForPlatform}
for _, n := range w.Next {
workerRun.Next = append(workerRun.Next, n)
}
a.Workers = append(a.Workers, workerRun)
a.Function.Workers = append(a.Function.Workers, w.Name)
functions = append(functions, function)
logging.Debugf(c, "Found function %q with %d workers", function, len(a.Workers))
}
return m, functions
}