| // Copyright 2019 The LUCI Authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| // Package backend implements the core logic of the Arquebus service. |
| package backend |
| |
| import ( |
| "context" |
| "net/http" |
| "time" |
| |
| "github.com/golang/protobuf/proto" |
| "go.chromium.org/luci/appengine/tq" |
| "go.chromium.org/luci/common/clock" |
| "go.chromium.org/luci/common/logging" |
| "go.chromium.org/luci/gae/service/datastore" |
| "go.chromium.org/luci/grpc/prpc" |
| "go.chromium.org/luci/server/auth" |
| "go.chromium.org/luci/server/router" |
| |
| "infra/appengine/arquebus/app/backend/model" |
| "infra/appengine/arquebus/app/config" |
| "infra/appengine/arquebus/app/util" |
| "infra/appengine/rotang/proto/rotangapi" |
| rotationproxy "infra/appengine/rotation-proxy/proto" |
| monorail "infra/monorailv2/api/api_proto" |
| ) |
| |
| var ( |
| // maxIssueUpdatesExecutionTime is the maximum duration that a single Task |
| // run can spend on issue searches and updates. |
| maxIssueUpdatesExecutionTime = time.Second * 40 |
| |
| // nRetriesForSavingTaskEntity is the maximum number of transaction |
| // attempts made to save the Task entity in endTaskRun(). |
| // |
| // After searchAndUpdateIssues(), the status of the Task entity is set |
| // to one of Failed, Succeeded, or Aborted. Even if saving the Task |
| // entity to datastore fails, Arquebus returns nil to TaskQueue to |
| // prevent the TQ task from being retried and performing another |
| // searchAndUpdateIssues(); hence the extra save attempts here. |
| nRetriesForSavingTaskEntity = 8 |
| ) |
| |
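| // The addresses of these string variables (not the strings themselves) are |
| // used as context keys in the setters/getters below, which keeps the keys |
| // unique across packages. |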
| var ctxKeyMonorailClient = "monorail client" |
| var ctxKeyRotaNGClient = "rotang client" |
| var ctxKeyRotationProxyClient = "rotation-proxy client" |
| |
| func setMonorailClient(c context.Context, mc monorail.IssuesClient) context.Context { |
| return context.WithValue(c, &ctxKeyMonorailClient, mc) |
| } |
| |
| func setRotaNGClient(c context.Context, rc rotangapi.OncallInfoClient) context.Context { |
| return context.WithValue(c, &ctxKeyRotaNGClient, rc) |
| } |
| |
| func setRotationProxyClient(c context.Context, rc rotationproxy.RotationProxyServiceClient) context.Context { |
| return context.WithValue(c, &ctxKeyRotationProxyClient, rc) |
| } |
| |
| func getMonorailClient(c context.Context) monorail.IssuesClient { |
| return c.Value(&ctxKeyMonorailClient).(monorail.IssuesClient) |
| } |
| |
| func getRotaNGClient(c context.Context) rotangapi.OncallInfoClient { |
| return c.Value(&ctxKeyRotaNGClient).(rotangapi.OncallInfoClient) |
| } |
| |
| func getRotationProxyClient(c context.Context) rotationproxy.RotationProxyServiceClient { |
| return c.Value(&ctxKeyRotationProxyClient).(rotationproxy.RotationProxyServiceClient) |
| } |
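| |
| // The setters above exist mainly so that unit tests can inject fakes in |
| // place of the real pRPC clients. A minimal sketch of such a setup, |
| // assuming a testing context and fake implementations of the generated |
| // client interfaces (fakeIssuesClient, fakeOncallInfoClient and |
| // fakeRotationProxyClient are hypothetical names for test doubles that |
| // would live in a _test.go file): |
| // |
| // c := gaetesting.TestingContext() |
| // c = setMonorailClient(c, &fakeIssuesClient{}) |
| // c = setRotaNGClient(c, &fakeOncallInfoClient{}) |
| // c = setRotationProxyClient(c, &fakeRotationProxyClient{}) |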
| |
| func createMonorailClient(c context.Context) (monorail.IssuesClient, error) { |
| transport, err := auth.GetRPCTransport(c, auth.AsSelf) |
| if err != nil { |
| return nil, err |
| } |
| return monorail.NewIssuesPRPCClient( |
| &prpc.Client{ |
| C: &http.Client{Transport: transport}, |
| Host: config.Get(c).MonorailHostname, |
| }, |
| ), nil |
| } |
| |
| func createRotaNGClient(c context.Context) (rotangapi.OncallInfoClient, error) { |
| transport, err := auth.GetRPCTransport(c, auth.AsSelf) |
| if err != nil { |
| return nil, err |
| } |
| return rotangapi.NewOncallInfoPRPCClient( |
| &prpc.Client{ |
| C: &http.Client{Transport: transport}, |
| Host: config.Get(c).RotangHostname, |
| }, |
| ), nil |
| } |
| |
| func createRotationProxyClient(c context.Context) (rotationproxy.RotationProxyServiceClient, error) { |
| transport, err := auth.GetRPCTransport(c, auth.AsSelf) |
| if err != nil { |
| return nil, err |
| } |
| return rotationproxy.NewRotationProxyServicePRPCClient( |
| &prpc.Client{ |
| C: &http.Client{Transport: transport}, |
| Host: config.Get(c).RotationProxyHostname, |
| }, |
| ), nil |
| } |
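| |
| // The three constructors above share one pattern: an AsSelf RPC transport |
| // wrapped in a pRPC client bound to a configured host. The helper below is |
| // a sketch of that shared pattern only; newPRPCClient is a hypothetical |
| // name and is intentionally not referenced by the constructors. |
| func newPRPCClient(c context.Context, host string) (*prpc.Client, error) { |
| transport, err := auth.GetRPCTransport(c, auth.AsSelf) |
| if err != nil { |
| return nil, err |
| } |
| return &prpc.Client{ |
| C: &http.Client{Transport: transport}, |
| Host: host, |
| }, nil |
| } |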
| |
| // InstallHandlers registers the TaskQueue task handlers and installs their routes into the given router. |
| func InstallHandlers(r *router.Router, dispatcher *tq.Dispatcher, m router.MiddlewareChain) { |
| registerTaskHandlers(dispatcher) |
| |
| // Install the dispatcher and RPC clients into the context so that |
| // they can be accessed via the context and overridden in unit tests. |
| m = m.Extend(func(rc *router.Context, next router.Handler) { |
| rc.Context = util.SetDispatcher(rc.Context, dispatcher) |
| |
| monorailClient, err := createMonorailClient(rc.Context) |
| if err != nil { |
| util.ErrStatus( |
| rc, http.StatusInternalServerError, |
| "failed to create an RPC channel for Monorail: %s", err, |
| ) |
| return |
| } |
| rc.Context = setMonorailClient(rc.Context, monorailClient) |
| |
| rotaNGClient, err := createRotaNGClient(rc.Context) |
| if err != nil { |
| util.ErrStatus( |
| rc, http.StatusInternalServerError, |
| "failed to create an RPC channel for RotaNG: %s", err, |
| ) |
| return |
| } |
| rc.Context = setRotaNGClient(rc.Context, rotaNGClient) |
| |
| rotationProxyClient, err := createRotationProxyClient(rc.Context) |
| if err != nil { |
| util.ErrStatus( |
| rc, http.StatusInternalServerError, |
| "failed to create an RPC channel for Rotation Proxy: %s", err, |
| ) |
| return |
| } |
| rc.Context = setRotationProxyClient(rc.Context, rotationProxyClient) |
| next(rc) |
| }) |
| dispatcher.InstallRoutes(r, m) |
| } |
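| |
| // A hedged sketch of how this is expected to be wired up from the app's |
| // route setup (the actual entry point and base middleware chain live |
| // outside this package and may differ): |
| // |
| // r := router.New() |
| // dispatcher := &tq.Dispatcher{} |
| // InstallHandlers(r, dispatcher, base) // base: the app's middleware chain |
| // http.Handle("/", r) |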
| |
| func registerTaskHandlers(dispatcher *tq.Dispatcher) { |
| dispatcher.RegisterTask( |
| &ScheduleAssignerTask{}, scheduleAssignerTaskHandler, |
| "schedule-assigners", nil, |
| ) |
| dispatcher.RegisterTask( |
| &RunAssignerTask{}, runAssignerTaskHandler, |
| "run-assigners", nil, |
| ) |
| } |
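| |
| // ScheduleAssignerTask payloads are expected to be enqueued elsewhere in |
| // the app (e.g. from a cron handler), roughly as sketched below for a |
| // single assigner ID aid (a hedged example; the real cron logic is not |
| // in this file): |
| // |
| // err := util.GetDispatcher(c).AddTask(c, &tq.Task{ |
| // Payload: &ScheduleAssignerTask{AssignerId: aid}, |
| // }) |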
| |
| // GetAllAssigners returns all assigners. |
| func GetAllAssigners(c context.Context) ([]*model.Assigner, error) { |
| return model.GetAllAssigners(c) |
| } |
| |
| // GetAssigner returns the Assigner matching with a given ID. |
| func GetAssigner(c context.Context, aid string) (*model.Assigner, error) { |
| return model.GetAssigner(c, aid) |
| } |
| |
| // UpdateAssigners updates all the Assigner entities, based on the configs, |
| // and removes the assigners whose configs no longer exist. |
| func UpdateAssigners(c context.Context, cfgs []*config.Assigner, rev string) error { |
| return model.UpdateAssigners(c, cfgs, rev) |
| } |
| |
| // GetAssignerWithTasks returns the Assigner matching the given ID, along |
| // with up to |limit| of its Task entities in descending ExpectedStart order. |
| // |
| // If includeNoopSuccess is true, the return includes the Task entities |
| // that were completed successfully without issue updates. |
| func GetAssignerWithTasks(c context.Context, assignerID string, limit int32, includeNoopSuccess bool) (assigner *model.Assigner, tasks []*model.Task, err error) { |
| if assigner, err = model.GetAssigner(c, assignerID); err == nil { |
| tasks, err = model.GetTasks(c, assigner, limit, includeNoopSuccess) |
| } |
| return |
| } |
| |
| // GetTask returns the task entity matching with the assigner and task IDs. |
| func GetTask(c context.Context, assignerID string, taskID int64) (*model.Assigner, *model.Task, error) { |
| return model.GetTask(c, assignerID, taskID) |
| } |
| |
| ////////////////////////////////////////////////////////////////////////////// |
| // |
| // TaskQueue handlers |
| |
| // scheduleAssignerTaskHandler ensures that a given assigner has at least |
| // one task scheduled. |
| func scheduleAssignerTaskHandler(c context.Context, tqTask proto.Message) error { |
| msg := tqTask.(*ScheduleAssignerTask) |
| return datastore.RunInTransaction(c, func(c context.Context) error { |
| tasks, err := model.EnsureScheduledTasks(c, msg.AssignerId) |
| if err != nil { |
| return err |
| } |
| return scheduleRuns(c, msg.AssignerId, tasks) |
| }, &datastore.TransactionOptions{}) |
| } |
| |
| func scheduleRuns(c context.Context, assignerID string, tasks []*model.Task) error { |
| tqTasks := make([]*tq.Task, len(tasks)) |
| for i, task := range tasks { |
| tqTasks[i] = &tq.Task{ |
| Payload: &RunAssignerTask{ |
| AssignerId: assignerID, |
| TaskId: task.ID, |
| }, |
| ETA: task.ExpectedStart, |
| } |
| } |
| return util.GetDispatcher(c).AddTask(c, tqTasks...) |
| } |
| |
| // startTaskRun updates the task status, based on the current status of |
| // the assigner and task. |
| func startTaskRun(c context.Context, assignerID string, taskID int64) (assigner *model.Assigner, task *model.Task, err error) { |
| err = datastore.RunInTransaction(c, func(c context.Context) error { |
| assigner, task, err = model.GetTask(c, assignerID, taskID) |
| if err != nil { |
| return err |
| } |
| if task.Status != model.TaskStatus_Scheduled { |
| logging.Warningf(c, ""+ |
| `the status is not "scheduled", but %q. It's likely `+ |
| `that this Task run has already been processed by `+ |
| `another worker`, |
| task.Status, |
| ) |
| // return nil for assigner and task so that this TQ work will end |
| // immediately without processing the Assigner Task. |
| assigner = nil |
| task = nil |
| return nil |
| } |
| |
| now := clock.Now(c).UTC() |
| task.Started = now |
| nextSch := task.ExpectedStart.Add(assigner.Interval) |
| |
| // Check for a drained assigner and stale tasks. |
| switch { |
| case assigner.IsDrained: |
| task.Status = model.TaskStatus_Cancelled |
| task.WriteLog(c, "the assigner has been drained; cancelling") |
| task.Ended = now |
| |
| case !assigner.HasMostRecentFormat(): |
| // Don't risk running the task with a stale Assigner. Better to wait until |
| // it is updated. |
| task.Status = model.TaskStatus_Cancelled |
| task.WriteLog(c, "Skipping the task, its assigner config has stale format.") |
| task.Ended = now |
| |
| case nextSch.Before(now.Add(maxIssueUpdatesExecutionTime)): |
| // Either the task is stale, or the time remaining before the next |
| // schedule is shorter than the maximum issue update execution time. |
| task.Status = model.TaskStatus_Cancelled |
| task.WriteLog(c, ""+ |
| "stale task or the remaining time before the next schedule "+ |
| "is too short; there should be at least %s left; cancelling", |
| maxIssueUpdatesExecutionTime) |
| task.Ended = now |
| |
| default: |
| task.Status = model.TaskStatus_Running |
| } |
| |
| if err := datastore.Put(c, task); err != nil { |
| return err |
| } |
| return nil |
| }, &datastore.TransactionOptions{}) |
| return |
| } |
| |
| // endTaskRun sets the final task status, based on the result of |
| // searchAndUpdateIssues(), and persists the Task entity. |
| func endTaskRun(c context.Context, task *model.Task, nIssuesUpdated int32, issueUpdateError error) error { |
| switch { |
| case issueUpdateError == context.DeadlineExceeded: |
| task.Status = model.TaskStatus_Aborted |
| case issueUpdateError != nil: |
| task.Status = model.TaskStatus_Failed |
| default: |
| // TODO(crbug/967525): replace Task.WasNoopSuccess with |
| // Task.nIssuesUpdated. |
| task.WasNoopSuccess = nIssuesUpdated == 0 |
| task.Status = model.TaskStatus_Succeeded |
| } |
| task.Ended = clock.Now(c).UTC() |
| |
| return datastore.RunInTransaction(c, func(c context.Context) error { |
| return datastore.Put(c, task) |
| }, &datastore.TransactionOptions{Attempts: nRetriesForSavingTaskEntity}) |
| } |
| |
| // runAssignerTaskHandler runs an Assigner task, based on the Task entity. |
| func runAssignerTaskHandler(c context.Context, tqTask proto.Message) error { |
| msg := tqTask.(*RunAssignerTask) |
| |
| assigner, task, err := startTaskRun(c, msg.AssignerId, msg.TaskId) |
| if err != nil { |
| // if startTaskRun() fails to update the Task entity with a new status, |
| // return the error to trigger a TQ retry. |
| return err |
| } else if task == nil || task.Status != model.TaskStatus_Running { |
| return nil |
| } |
| |
| // At this moment, the assigner might have been drained. However, this |
| // task run should continue, as draining an assigner doesn't cancel |
| // a running task. |
| timedCtx, cancel := context.WithTimeout(c, maxIssueUpdatesExecutionTime) |
| defer cancel() |
| nIssuesUpdated, issueUpdateErr := searchAndUpdateIssues( |
| timedCtx, assigner, task, |
| ) |
| // if the error was due to the context timeout, override issueUpdateErr |
| // with it so that endTaskRun() can recognize the timeout error. |
| if issueUpdateErr != nil && timedCtx.Err() == context.DeadlineExceeded { |
| issueUpdateErr = context.DeadlineExceeded |
| } |
| |
| if err := endTaskRun(c, task, nIssuesUpdated, issueUpdateErr); err != nil { |
| // If saving the Task entity fails, log and ignore the error. Returning |
| // it would cause the TQ task to be retried, and searchAndUpdateIssues() |
| // would be performed again. |
| issueUpdateResult := "successful searchAndUpdateIssues()." |
| if issueUpdateErr != nil { |
| issueUpdateResult = "un" + issueUpdateResult |
| } |
| logging.Errorf( |
| c, "Failed to update Task entity after %s; %s", |
| issueUpdateResult, err, |
| ) |
| } |
| return nil |
| } |