package engine
import (
api ""
// pokeCron instantiates a cron state machine and calls the callback to advance
// its state.
// Should be part of a Job transaction. If the callback succeeds, job.State
// is updated with the new state and all emitted actions are dispatched to the
// task queue. The job entity should eventually be saved as part of this
// transaction.
// Returns fatal errors if something is not right with the job definition or
// state. Returns transient errors on task queue failures.
func pokeCron(c context.Context, job *Job, disp *tq.Dispatcher, cb func(m *cron.Machine) error) error {
sched, err := job.ParseSchedule()
if err != nil {
return errors.Annotate(err, "bad schedule %q", job.EffectiveSchedule()).Err()
now := clock.Now(c).UTC()
rnd := mathrand.Get(c)
machine := &cron.Machine{
Now: now,
Schedule: sched,
Nonce: func() int64 { return rnd.Int63() + 1 },
State: job.Cron,
if err := cb(machine); err != nil {
return errors.Annotate(err, "callback error").Err()
tasks := []*tq.Task{}
for _, action := range machine.Actions {
switch a := action.(type) {
case cron.TickLaterAction:
delay := a.When.Sub(now)
// Some very infrequent schedules may have next tick weeks in the future.
// However, Task Queue limits tasks to at most 30 days in the future.
// Thus, waiting may be done via a chain of several TQ tasks. To allow
// cron.Machine.OnTimerTick to distinguish such chains from accidentally
// too early execution of a specific TQ task, ensure intermediate TQ tasks
// are at least 1 hour before actual tick.
maxDelay := 15 * 24 * time.Hour // conservative 15 days.
fudge := 1 * time.Hour
if strings.HasSuffix(info.AppID(c), "-dev") {
// Use lower numbers on -dev to exercise this codepath frequently.
maxDelay = 2 * time.Minute
fudge = 1 * time.Minute
if delay > maxDelay+fudge {
logging.Infof(c, "Scheduling intermediary tick %d after %s instead of intended %s", a.TickNonce, maxDelay, delay)
delay = maxDelay
} else {
logging.Infof(c, "Scheduling tick %d after %s", a.TickNonce, delay)
tasks = append(tasks, &tq.Task{
Payload: &internal.CronTickTask{
JobId: job.JobID,
TickNonce: a.TickNonce,
Delay: delay,
case cron.StartInvocationAction:
trigger := cronTrigger(a, now)
logging.Infof(c, "Emitting cron trigger %s", trigger.Id)
tasks = append(tasks, &tq.Task{
Payload: &internal.EnqueueTriggersTask{
JobId: job.JobID,
Triggers: []*internal.Trigger{trigger},
return errors.Reason("unknown action %T emitted by the cron machine", action).Err()
if err := disp.AddTask(c, tasks...); err != nil {
return errors.Annotate(err, "failed to enqueue emitted actions").Err()
job.Cron = machine.State
return nil
// cronTrigger generates a trigger struct from an invocation request generated
// by the cron state machine.
//
// The trigger ID is derived only from a.Generation, so the same generation
// always produces the same ID. The creation timestamp is taken from now, and
// the payload is a cron trigger carrying the same generation number.
func cronTrigger(a cron.StartInvocationAction, now time.Time) *internal.Trigger {
return &internal.Trigger{
// ID has the form "cron:v1:<generation>".
Id: fmt.Sprintf("cron:v1:%d", a.Generation),
// Converts the supplied time into a protobuf timestamp.
Created: timestamppb.New(now),
Payload: &internal.Trigger_Cron{
Cron: &api.CronTrigger{
Generation: a.Generation,