| // Copyright 2015 The LUCI Authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| // Package task defines interface between Scheduler engine and implementations |
| // of particular tasks (such as URL fetch tasks, Swarming tasks, DM tasks, etc). |
| // |
| // Its subpackages contain concrete realizations of Manager interface. |
| package task |
| |
| import ( |
| "context" |
| "net/http" |
| "time" |
| |
| "github.com/golang/protobuf/proto" |
| "google.golang.org/api/pubsub/v1" |
| "google.golang.org/protobuf/types/known/structpb" |
| |
| "go.chromium.org/luci/auth/identity" |
| "go.chromium.org/luci/config/validation" |
| "go.chromium.org/luci/server/auth" |
| |
| "go.chromium.org/luci/scheduler/appengine/internal" |
| ) |
| |
// Status describes the state of a single job invocation.
type Status string

// Possible values of Status.
const (
	// StatusStarting means the task is about to start.
	StatusStarting Status = "STARTING"
	// StatusRetrying means the task was starting, but the launch failed in some
	// transient way. In this case the start attempt is retried a number of
	// times, until the task eventually moves into either StatusRunning or one
	// of the final states. StatusRetrying can only be entered from
	// StatusStarting: a running task can only succeed or fail.
	StatusRetrying Status = "RETRYING"
	// StatusRunning means the task has started and is running now.
	StatusRunning Status = "RUNNING"
	// StatusSucceeded means the task finished successfully.
	StatusSucceeded Status = "SUCCEEDED"
	// StatusFailed means the task finished with an error or failed to start.
	StatusFailed Status = "FAILED"
	// StatusOverrun means the task should have been started, but the previous
	// one is still running.
	StatusOverrun Status = "OVERRUN"
	// StatusAborted means the task was forcefully aborted (manually or due to
	// a hard deadline).
	StatusAborted Status = "ABORTED"
)

// Initial reports whether s is StatusStarting or StatusRetrying.
//
// An invocation stays in one of these statuses until LaunchTask (or, perhaps,
// a retry of it) is finished with the invocation.
func (s Status) Initial() bool {
	switch s {
	case StatusStarting, StatusRetrying:
		return true
	}
	return false
}

// Final reports whether s is one of the final statuses: StatusSucceeded,
// StatusFailed, StatusOverrun or StatusAborted.
func (s Status) Final() bool {
	return s == StatusSucceeded ||
		s == StatusFailed ||
		s == StatusOverrun ||
		s == StatusAborted
}
| |
// Traits is a set of properties that influence how the scheduler engine
// manages tasks handled by a particular Manager.
type Traits struct {
	// Multistage is true if the Manager drives all its invocations through the
	// Starting -> Running -> Finished chain of states (instead of just
	// Starting -> Finished).
	//
	// This is the case for "heavy" tasks that can run for an undetermined
	// amount of time (e.g. Swarming and Buildbucket tasks). By switching an
	// invocation to the Running state, the Manager acknowledges that it takes
	// responsibility for eventually moving the invocation to the Finished state
	// (perhaps in response to a PubSub notification or a timer tick). In other
	// words, once an invocation is in the Running state, the scheduler engine
	// no longer automatically keeps track of its health: that is the
	// responsibility of the Manager now.
	//
	// For smaller tasks (ones that finish in seconds, e.g. gitiles poller
	// tasks) it is simpler and more efficient to do everything in LaunchTask
	// and then move the invocation to the Finished state. By doing so, the
	// Manager avoids implementing health checks and piggybacks on LaunchTask
	// retries automatically performed by the scheduler engine.
	//
	// Currently this trait only influences the UI: invocations with
	// Multistage == false never show up as "Starting" there (they are
	// displayed as "Running" instead, since that makes more sense from the
	// end-user perspective).
	Multistage bool
}
| |
| // Manager knows how to work with a particular kind of tasks (e.g URL fetch |
| // tasks, Swarming tasks, etc): how to deserialize, validate and execute them. |
| // |
| // Manager uses Controller to talk back to the scheduler engine. |
| type Manager interface { |
| // Name returns task manager name. It identifies the corresponding kind |
| // of tasks and used in various resource names (e.g. PubSub topic names). |
| Name() string |
| |
| // ProtoMessageType returns a pointer to protobuf message struct that |
| // describes config for the task kind, e.g. &UrlFetchTask{}. Will be used |
| // only for its type signature. |
| ProtoMessageType() proto.Message |
| |
| // Traits returns properties that influence how the scheduler engine manages |
| // tasks handled by this Manager. |
| // |
| // See Traits struct for more details. |
| Traits() Traits |
| |
| // ValidateProtoMessage verifies task definition proto message makes sense. |
| // msg must have same underlying type as ProtoMessageType() return value. |
| // |
| // realmID is a full realm name (as "<project>:<realm>") of the job whose |
| // definition is being validated. It is never empty, but may be a @legacy |
| // realm. |
| // |
| // Errors are returned via validation.Context. |
| ValidateProtoMessage(c *validation.Context, msg proto.Message, realmID string) |
| |
| // LaunchTask starts (or starts and finishes in one go) the task. |
| // |
| // Manager's responsibilities: |
| // * To move the task to some state other than StatusStarting |
| // (by changing ctl.State().Status). If at some point the task has moved |
| // to StatusRunning, the manager MUST setup some way to track the task's |
| // progress to eventually move it to some final state. It can be a status |
| // check via a timer (see `AddTimer` below), or a PubSub callback (see |
| // `PrepareTopic` below). |
| // * Be idempotent, if possible, using ctl.InvocationID() as an operation |
| // key. |
| // * Not to use supplied controller outside of LaunchTask call. |
| // * Not to use supplied controller concurrently without synchronization. |
| // |
| // If `LaunchTask` crashes before returning or returns a transient error, it |
| // will be called again later, receiving exact same ctl.InvocationID(). |
| // |
| // TaskManager may optionally use ctl.Save() to checkpoint progress and save |
| // debug log. ctl.Save() is also implicitly called by the engine when |
| // `LaunchTask` returns. |
| LaunchTask(c context.Context, ctl Controller) error |
| |
| // AbortTask is called to opportunistically abort launched task. |
| // |
| // It is called right before the job is forcefully switched to a failed state. |
| // The engine does not wait for the task runner to acknowledge this action. |
| // |
| // AbortTask must be idempotent since it may be called multiple times in case |
| // of errors. |
| AbortTask(c context.Context, ctl Controller) error |
| |
| // ExamineNotification is called to extract the auth token from the incoming |
| // PubSub message. |
| // |
| // It should return an empty string if the message is unrecognized/malformed |
| // or there's no auth token in it. Note that the PubSub message here is not |
| // yet validated and can be a total garbage (or even be malicious). |
| // |
| // See PrepareTopic for more info. |
| ExamineNotification(c context.Context, msg *pubsub.PubsubMessage) string |
| |
| // HandleNotification is called whenever engine receives a PubSub message sent |
| // to a topic created with Controller.PrepareTopic. Expect duplicated and |
| // out-of-order messages here. HandleNotification must be idempotent. |
| // |
| // Returns transient error to trigger a redeliver of the message, no error to |
| // to acknowledge the message and fatal error to move the invocation to failed |
| // state. |
| // |
| // Any modifications made to the invocation state will be saved regardless of |
| // the return value (to save the debug log). |
| HandleNotification(c context.Context, ctl Controller, msg *pubsub.PubsubMessage) error |
| |
| // HandleTimer is called to process timers set up by Controller.AddTimer. |
| // |
| // Expect duplicated or delayed events here. HandleTimer must be idempotent. |
| // |
| // Returns a transient error to trigger a redelivery of the event (the |
| // invocation state won't be saved in this case), no error to acknowledge the |
| // event and a fatal error to move the invocation to failed state. |
| HandleTimer(c context.Context, ctl Controller, name string, payload []byte) error |
| |
| // GetDebugState returns debug info about the state persisted by the manager. |
| GetDebugState(c context.Context, ctl ControllerReadOnly) (*internal.DebugManagerState, error) |
| } |
| |
| // Controller is passed to LaunchTask by the scheduler engine. It gives Manager |
| // control over one job invocation. Manager must not use it outside of |
| // LaunchTask. Controller implementation is generally not thread safe (but it's |
| // fine to use it from multiple goroutines if access is protected by a lock). |
| // |
| // All methods that accept context.Context expect contexts derived from ones |
| // passed to 'Manager' methods. A derived context can be used to set custom |
| // deadlines for some potentially expensive methods like 'PrepareTopic'. |
| type Controller interface { |
| ControllerReadOnly |
| |
| // State returns a mutable portion of task invocation state. |
| // |
| // TaskManager can modify it in-place and then call Controller.Save to persist |
| // the changes. The state will also be saved by the engine automatically if |
| // Manager doesn't call Save. |
| State() *State |
| |
| // AddTimer sets up a new delayed call to Manager.HandleTimer. |
| // |
| // Timers are active as long as the invocation is not in one of the final |
| // states. There is no way to cancel a timer (ignore HandleTimer call |
| // instead). |
| // |
| // 'title' will be visible in logs, it should convey a purpose for this timer. |
| // It doesn't have to be unique. |
| // |
| // 'payload' is any byte blob carried verbatim to Manager.HandleTimer. |
| // |
| // All timers are actually enabled in Save(), in the same transaction that |
| // updates the job state. |
| AddTimer(c context.Context, delay time.Duration, title string, payload []byte) |
| |
| // PrepareTopic create PubSub topic for notifications related to the task and |
| // adds given publisher to its ACL. |
| // |
| // It returns full name of the topic and a token that will be used to |
| // authenticate the PubSub message and bind it to the task the Controller is |
| // operating on now. Topic name and its configuration are controlled by the |
| // Engine. The publisher to the topic must put the token somewhere inside |
| // the message. The engine will ask the task manager to extract the token |
| // from the message via ExamineNotification, then it will validate the token |
| // and eventually call HandleNotification. |
| // |
| // 'publisher' can be a service account email, or an URL to some luci service. |
| // If URL is given, its /auth/api/v1/server/info endpoint will be used to |
| // grab a corresponding service account name. All service that use luci auth |
| // component expose this endpoint. |
| PrepareTopic(c context.Context, publisher string) (topic string, token string, err error) |
| |
| // EmitTrigger delivers a given trigger to all jobs which are triggered by |
| // current one. |
| EmitTrigger(ctx context.Context, trigger *internal.Trigger) |
| |
| // Save updates the state of the task in the persistent store. |
| // |
| // It also schedules all pending timer ticks added via AddTimer. |
| // |
| // Will be called by the engine after it launches the task. May also be called |
| // by the Manager itself, even multiple times (e.g. once to notify that the |
| // task has started, a second time to notify it has finished). |
| // |
| // Returns error if it couldn't save the invocation state. It is fine to |
| // ignore it. The engine will attempt to Save the invocation at the end anyway |
| // and it will properly handle the error if it happens again. |
| Save(c context.Context) error |
| } |
| |
| // ControllerReadOnly is a subset of Controller interface with methods that do |
| // not mutate the job's state. |
| type ControllerReadOnly interface { |
| // JobID returns full job ID the controller is operating on. |
| JobID() string |
| |
| // InvocationID returns unique identifier of this particular invocation. |
| InvocationID() int64 |
| |
| // RealmID returns the full realm ID of the job ("<project>:<realm>"). |
| RealmID() string |
| |
| // Request contains parameters of the invocation supplied when it was created. |
| Request() Request |
| |
| // Task is proto message with task definition. |
| // |
| // It is guaranteed to have same underlying type as manager.ProtoMessageType() |
| // return value. |
| Task() proto.Message |
| |
| // DebugLog appends a line to the free form text log of the task. |
| DebugLog(format string, args ...interface{}) |
| |
| // GetClient returns http.Client that is configured to use job's service |
| // account credentials to talk to other services. |
| GetClient(c context.Context, opts ...auth.RPCOption) (*http.Client, error) |
| } |
| |
| // State is mutable portion of the task invocation state. |
| // |
| // It can be mutated by TaskManager directly. |
| type State struct { |
| Status Status // overall status of the invocation, see the enum |
| TaskData []byte // storage for TaskManager-specific task data |
| ViewURL string // URL to human readable task page, shows in UI |
| } |
| |
| // Request contains parameters of the invocation supplied when it was created. |
| // |
| // They are calculated from the pending triggers when the invocation is |
| // initiated. |
| type Request struct { |
| // TriggeredBy contains ID of an end user that triggered this invocation (e.g |
| // through UI or API) or an empty string if it was triggered by the engine or |
| // it is a result of a multiple different triggers. |
| // |
| // Mostly FYI. |
| TriggeredBy identity.Identity |
| |
| // IncomingTriggers is a list of all triggers consumed by this invocation. |
| // |
| // Already sorted by time they were emitted (oldest first). |
| IncomingTriggers []*internal.Trigger |
| |
| // Properties are arbitrary key-value pairs derived from the triggers by the |
| // triggering policy function and interpreted by the triggered task manager. |
| Properties *structpb.Struct |
| |
| // Tags are arbitrary "<key>:<value>" pairs derived from the triggers by the |
| // triggering policy function. |
| // |
| // Primarily used for indexing and correlation of jobs/invocations with |
| // each other (including across different services). Task managers can pass |
| // them down the stack. |
| Tags []string |
| |
| // DebugLog is optional multi-line string to put in the invocation debug log |
| // when it starts. |
| // |
| // It is used to report debug information (produced by the engine triggering |
| // guts) to the invocation debug log (visible via UI). |
| // |
| // This field is used internally by the engine. Task managers will never see |
| // it set. |
| DebugLog string |
| } |
| |
| // LastTrigger is the most recent trigger from IncomingTriggers list or nil. |
| func (r *Request) LastTrigger() *internal.Trigger { |
| if t := r.IncomingTriggers; len(t) > 0 { |
| return t[len(t)-1] |
| } |
| return nil |
| } |
| |
| // TriggerIDs extracts list of IDs from IncomingTriggers. |
| // |
| // This is useful in tests for asserts. |
| func (r *Request) TriggerIDs() []string { |
| ids := make([]string, len(r.IncomingTriggers)) |
| for i, t := range r.IncomingTriggers { |
| ids[i] = t.Id |
| } |
| return ids |
| } |
| |
| // StringProperty returns a value of string property or "" if no such property |
| // or it has a different type. |
| // |
| // This is useful in tests for asserts. |
| func (r *Request) StringProperty(k string) string { |
| if r.Properties == nil { |
| return "" |
| } |
| prop := r.Properties.Fields[k] |
| if prop == nil { |
| return "" |
| } |
| return prop.GetStringValue() |
| } |