blob: 181fc9684fecac4b34114f5a1361b8258a929223 [file] [log] [blame]
// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package main
import (
"context"
"github.com/golang/protobuf/proto"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
ds "go.chromium.org/luci/gae/service/datastore"
tq "go.chromium.org/luci/gae/service/taskqueue"
"go.chromium.org/luci/grpc/grpcutil"
admin "infra/tricium/api/admin/v1"
"infra/tricium/appengine/common"
"infra/tricium/appengine/common/config"
)
// LauncherServer represents the Tricium pRPC Launcher server.
type launcherServer struct{}
// Launch processes one launch request.
//
// Specifically, this is responsible for generating the workflow, and enqueuing
// subsequent requests to trigger initial workers and update datastore.
func (r *launcherServer) Launch(c context.Context, req *admin.LaunchRequest) (res *admin.LaunchResponse, err error) {
defer func() {
err = grpcutil.GRPCifyAndLogErr(c, err)
}()
logging.Fields{
"runID": req.RunId,
}.Infof(c, "Request received.")
if err := validateLaunchRequest(req); err != nil {
return nil, errors.Annotate(err, "invalid request").
Tag(grpcutil.InvalidArgumentTag).Err()
}
if err := launch(c, req, config.LuciConfigServer, common.PubsubServer); err != nil {
return nil, err
}
return &admin.LaunchResponse{}, nil
}
// validateLaunchRequest returns an error if the request is invalid.
func validateLaunchRequest(req *admin.LaunchRequest) error {
if req.RunId == 0 {
return errors.New("missing run ID")
}
if req.Project == "" {
return errors.New("missing project")
}
if req.GitUrl == "" {
return errors.New("missing git URL")
}
if req.GitRef == "" {
return errors.New("missing git ref")
}
if len(req.Files) == 0 {
return errors.New("missing files to analyze")
}
return nil
}
func launch(c context.Context, req *admin.LaunchRequest, cp config.ProviderAPI, pubsub common.PubSubAPI) error {
// Guard checking if there is already a stored workflow for the run ID
// in the request; if so stop here.
w := &config.Workflow{ID: req.RunId}
if err := ds.Get(c, w); err != ds.ErrNoSuchEntity {
logging.Infof(c, "Launch request for already-launched workflow")
return nil
}
// Generate workflow and convert to string.
sc, err := cp.GetServiceConfig(c)
if err != nil {
return errors.Annotate(err, "failed to get service config").Err()
}
pc, err := cp.GetProjectConfig(c, req.Project)
if err != nil {
return errors.Annotate(err, "failed to get project config").Err()
}
logging.Fields{
"runID": req.RunId,
"project": req.Project,
}.Infof(c, "About to generate workflow.")
wf, err := config.Generate(sc, pc, req.Files, req.GitRef, req.GitUrl)
if err != nil {
// Generate may fail if there are non-recipe-based analyzers.
// To help ease the transition, just log a warning and abort.
logging.Fields{
"project": req.Project,
}.Warningf(c, "Workflow generation failed, %s.", err)
return nil
}
// If there is nothing to run, abort and log a warning.
if len(wf.Functions) == 0 {
logging.Fields{
"project": req.Project,
}.Warningf(c, "Workflow had no functions to run.")
return nil
}
// Set up pubsub for worker completion notification.
err = pubsub.Setup(c)
if err != nil {
return errors.Annotate(err, "failed to setup pubsub for workflow").Err()
}
wfb, err := proto.Marshal(wf)
if err != nil {
return errors.Annotate(err, "failed to marshal workflow proto").Err()
}
// Prepare workflow config entry to store.
wfConfig := &config.Workflow{
ID: req.RunId,
SerializedWorkflow: wfb,
}
// Prepare workflow launched request.
b, err := proto.Marshal(&admin.WorkflowLaunchedRequest{RunId: req.RunId})
if err != nil {
return errors.Annotate(err, "failed to marshal trigger request proto").Err()
}
wfTask := tq.NewPOSTTask("/tracker/internal/workflow-launched", nil)
wfTask.Payload = b
// Prepare trigger requests for root workers.
wTasks := []*tq.Task{}
for _, worker := range wf.RootWorkers() {
b, err := proto.Marshal(&admin.TriggerRequest{
RunId: req.RunId,
Worker: worker,
})
if err != nil {
return errors.Annotate(err, "failed to encode driver request").Err()
}
t := tq.NewPOSTTask("/driver/internal/trigger", nil)
t.Payload = b
wTasks = append(wTasks, t)
}
return ds.RunInTransaction(c, func(c context.Context) (err error) {
// Store workflow config.
if err = ds.Put(c, wfConfig); err != nil {
return errors.Annotate(err, "failed to store workflow").Err()
}
// Run the below two operations in parallel.
done := make(chan error)
defer func() {
if err2 := <-done; err2 != nil {
err = err2
}
}()
go func() {
// Mark workflow as launched. Processing of this request needs the
// stored workflow config.
if err := tq.Add(c, common.TrackerQueue, wfTask); err != nil {
done <- errors.Annotate(err, "failed to enqueue workflow launched track request").Err()
}
done <- nil
}()
// Trigger root workers. Processing of this request needs the stored
// workflow config.
if err := tq.Add(c, common.DriverQueue, wTasks...); err != nil {
return errors.Annotate(err, "failed to enqueue trigger request(s) for root worker(s)").Err()
}
return nil
}, nil)
}