blob: f935e56d41741719d7423f50038ea3303fad765f [file] [log] [blame]
// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package main
import (
"context"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/sync/parallel"
ds "go.chromium.org/luci/gae/service/datastore"
"go.chromium.org/luci/grpc/grpcutil"
admin "infra/tricium/api/admin/v1"
tricium "infra/tricium/api/v1"
"infra/tricium/appengine/common/track"
)
// WorkerLaunched tracks the launch of a worker.
func (*trackerServer) WorkerLaunched(c context.Context, req *admin.WorkerLaunchedRequest) (res *admin.WorkerLaunchedResponse, err error) {
defer func() {
err = grpcutil.GRPCifyAndLogErr(c, err)
}()
if req.RunId == 0 {
return nil, errors.New("missing run ID", grpcutil.InvalidArgumentTag)
}
if req.Worker == "" {
return nil, errors.New("missing worker", grpcutil.InvalidArgumentTag)
}
if req.BuildbucketBuildId == 0 {
return nil, errors.New("missing buildbucket ID", grpcutil.InvalidArgumentTag)
}
if err := workerLaunched(c, req); err != nil {
return nil, errors.Annotate(err, "failed to track worker launched").
Tag(grpcutil.InternalTag).Err()
}
return &admin.WorkerLaunchedResponse{}, nil
}
func workerLaunched(c context.Context, req *admin.WorkerLaunchedRequest) error {
logging.Fields{
"runID": req.RunId,
"worker": req.Worker,
"buildID": req.BuildbucketBuildId,
}.Infof(c, "Request received.")
// Compute needed keys.
requestKey := ds.NewKey(c, "AnalyzeRequest", "", req.RunId, nil)
runKey := ds.NewKey(c, "WorkflowRun", "", 1, requestKey)
name, _, err := track.ExtractFunctionPlatform(req.Worker)
if err != nil {
return errors.Annotate(err, "failed to extract function name").Err()
}
functionRunKey := ds.NewKey(c, "FunctionRun", name, 0, runKey)
workerKey := ds.NewKey(c, "WorkerRun", req.Worker, 0, functionRunKey)
if err := ds.RunInTransaction(c, func(c context.Context) (err error) {
return parallel.FanOutIn(func(taskC chan<- func() error) {
// Update worker state to launched.
taskC <- func() error {
wr := &track.WorkerRunResult{ID: 1, Parent: workerKey}
if err := ds.Get(c, wr); err != nil {
return errors.Annotate(err, "failed to get WorkerRunResult").Err()
}
if wr.State == tricium.State_PENDING {
wr.State = tricium.State_RUNNING
wr.BuildbucketBuildID = req.BuildbucketBuildId
if err := ds.Put(c, wr); err != nil {
return errors.Annotate(err, "failed to update WorkerRunResult").Err()
}
} else {
logging.Fields{
"runID": req.RunId,
"worker": req.Worker,
}.Warningf(c, "Worker not in PENDING state when launched.")
}
return nil
}
// Update function state to launched if necessary.
taskC <- func() error {
fr := &track.FunctionRunResult{ID: 1, Parent: functionRunKey}
if err := ds.Get(c, fr); err != nil {
return errors.Annotate(err, "failed to get FunctionRunResult").Err()
}
if fr.State == tricium.State_PENDING {
fr.State = tricium.State_RUNNING
if err := ds.Put(c, fr); err != nil {
return errors.Annotate(err, "failed to update FunctionRunResult to launched").Err()
}
}
return nil
}
})
}, nil); err != nil {
return nil
}
// Notify reporter.
request := &track.AnalyzeRequest{ID: req.RunId}
if err := ds.Get(c, request); err != nil {
return errors.Annotate(err, "failed to get AnalyzeRequest entity (run ID: %d)", req.RunId).Err()
}
return nil
}