| // Copyright 2017 The LUCI Authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package engine |
| |
| import ( |
| "context" |
| "encoding/json" |
| "fmt" |
| "sort" |
| "strings" |
| "testing" |
| "time" |
| |
| "google.golang.org/protobuf/proto" |
| "google.golang.org/protobuf/runtime/protoiface" |
| |
| "google.golang.org/api/pubsub/v1" |
| "google.golang.org/protobuf/types/known/timestamppb" |
| |
| "go.chromium.org/luci/gae/filter/featureBreaker" |
| "go.chromium.org/luci/gae/service/datastore" |
| "go.chromium.org/luci/gae/service/taskqueue" |
| |
| "go.chromium.org/luci/appengine/tq" |
| "go.chromium.org/luci/appengine/tq/tqtesting" |
| "go.chromium.org/luci/auth/identity" |
| "go.chromium.org/luci/common/clock" |
| "go.chromium.org/luci/common/clock/testclock" |
| "go.chromium.org/luci/common/errors" |
| "go.chromium.org/luci/common/retry/transient" |
| "go.chromium.org/luci/server/auth" |
| "go.chromium.org/luci/server/auth/authtest" |
| |
| api "go.chromium.org/luci/scheduler/api/scheduler/v1" |
| "go.chromium.org/luci/scheduler/appengine/catalog" |
| "go.chromium.org/luci/scheduler/appengine/engine/cron" |
| "go.chromium.org/luci/scheduler/appengine/internal" |
| "go.chromium.org/luci/scheduler/appengine/messages" |
| "go.chromium.org/luci/scheduler/appengine/schedule" |
| "go.chromium.org/luci/scheduler/appengine/task" |
| "go.chromium.org/luci/scheduler/appengine/task/noop" |
| |
| . "github.com/smartystreets/goconvey/convey" |
| |
| . "go.chromium.org/luci/common/testing/assertions" |
| ) |
| |
| func TestGetAllProjects(t *testing.T) { |
| t.Parallel() |
| |
| Convey("works", t, func() { |
| c := newTestContext(epoch) |
| e, _ := newTestEngine() |
| |
| // Empty. |
| projects, err := e.GetAllProjects(c) |
| So(err, ShouldBeNil) |
| So(len(projects), ShouldEqual, 0) |
| |
| // Non empty. |
| So(datastore.Put(c, |
| &Job{JobID: "abc/1", ProjectID: "abc", Enabled: true}, |
| &Job{JobID: "abc/2", ProjectID: "abc", Enabled: true}, |
| &Job{JobID: "def/1", ProjectID: "def", Enabled: true}, |
| &Job{JobID: "xyz/1", ProjectID: "xyz", Enabled: false}, |
| ), ShouldBeNil) |
| datastore.GetTestable(c).CatchupIndexes() |
| projects, err = e.GetAllProjects(c) |
| So(err, ShouldBeNil) |
| So(projects, ShouldResemble, []string{"abc", "def"}) |
| }) |
| } |
| |
| func TestUpdateProjectJobs(t *testing.T) { |
| t.Parallel() |
| |
| Convey("with test context", t, func() { |
| c := newTestContext(epoch) |
| e, _ := newTestEngine() |
| |
| tq := tqtesting.GetTestable(c, e.cfg.Dispatcher) |
| tq.CreateQueues() |
| |
| jobDef := catalog.Definition{ |
| JobID: "proj/1", |
| RealmID: "proj:testing", |
| Revision: "rev1", |
| Schedule: "*/5 * * * * * *", |
| Task: []uint8{1, 2, 3}, // note: this is actually gibberish, but we don't care here |
| TriggeringPolicy: []uint8{4, 5, 6}, // same |
| } |
| |
| Convey("noop", func() { |
| So(e.UpdateProjectJobs(c, "proj", nil), ShouldBeNil) |
| So(allJobs(c), ShouldResemble, []Job{}) |
| }) |
| |
| Convey("adding a job", func() { |
| // Adding a job that ticks each 5 sec. |
| So(e.UpdateProjectJobs(c, "proj", []catalog.Definition{jobDef}), ShouldBeNil) |
| |
| // Added. |
| So(allJobs(c), ShouldResemble, []Job{ |
| { |
| JobID: "proj/1", |
| ProjectID: "proj", |
| RealmID: "proj:testing", |
| Revision: "rev1", |
| Enabled: true, |
| Schedule: "*/5 * * * * * *", |
| Cron: cron.State{ |
| Enabled: true, |
| Generation: 2, |
| LastRewind: epoch, |
| LastTick: cron.TickLaterAction{ |
| When: epoch.Add(5 * time.Second), |
| TickNonce: 6278013164014963328, |
| }, |
| }, |
| Task: jobDef.Task, |
| TriggeringPolicyRaw: jobDef.TriggeringPolicy, |
| }, |
| }) |
| |
| // The first tick is scheduled. |
| So(tq.GetScheduledTasks().Payloads(), ShouldResembleProto, []protoiface.MessageV1{ |
| &internal.CronTickTask{JobId: "proj/1", TickNonce: 6278013164014963328}, |
| }) |
| |
| // Again, should be noop. |
| So(e.UpdateProjectJobs(c, "proj", []catalog.Definition{jobDef}), ShouldBeNil) |
| So(len(tq.GetScheduledTasks()), ShouldEqual, 1) // no new tasks |
| }) |
| |
| Convey("adding a very infrequent job", func() { |
| jobDef.Schedule = "59 23 31 12 *" // every Dec 31st. |
| nextTick := time.Date(epoch.Year(), time.December, 31, 23, 59, 00, 0, time.UTC) |
| So(nextTick.Sub(epoch), ShouldBeGreaterThan, 30*24*time.Hour) |
| |
| So(e.UpdateProjectJobs(c, "proj", []catalog.Definition{jobDef}), ShouldBeNil) |
| // Added. |
| So(allJobs(c), ShouldResemble, []Job{ |
| { |
| JobID: "proj/1", |
| ProjectID: "proj", |
| RealmID: "proj:testing", |
| Revision: "rev1", |
| Enabled: true, |
| Schedule: "59 23 31 12 *", |
| Cron: cron.State{ |
| Enabled: true, |
| Generation: 2, |
| LastRewind: epoch, |
| LastTick: cron.TickLaterAction{ |
| When: nextTick, |
| TickNonce: 6278013164014963328, |
| }, |
| }, |
| Task: jobDef.Task, |
| TriggeringPolicyRaw: jobDef.TriggeringPolicy, |
| }, |
| }) |
| |
| // The first tick is scheduled with ETA substantially before actual next |
| // tick. |
| So(tq.GetScheduledTasks().Payloads(), ShouldResembleProto, []protoiface.MessageV1{ |
| &internal.CronTickTask{JobId: "proj/1", TickNonce: 6278013164014963328}, |
| }) |
| So(tq.GetScheduledTasks()[0].Task.Delay, ShouldBeLessThan, 30*24*time.Hour) |
| }) |
| |
| Convey("adding a job with txn retry", func() { |
| // Simulate the transaction retry. |
| datastore.GetTestable(c).SetTransactionRetryCount(2) |
| // Add a job. |
| So(e.UpdateProjectJobs(c, "proj", []catalog.Definition{jobDef}), ShouldBeNil) |
| // Added only one task, even though we had 2 retries. |
| So(len(tq.GetScheduledTasks()), ShouldEqual, 1) |
| }) |
| |
| Convey("adding a job with txn collision", func() { |
| // Simulate the transaction refusing to land even after many tries. |
| datastore.GetTestable(c).SetTransactionRetryCount(15) |
| // Attempt to add a job. |
| err := e.UpdateProjectJobs(c, "proj", []catalog.Definition{jobDef}) |
| // Failed transiently, nothing in the datastore or in TQ. |
| So(transient.Tag.In(err), ShouldBeTrue) |
| So(allJobs(c), ShouldResemble, []Job{}) |
| So(len(tq.GetScheduledTasks()), ShouldEqual, 0) |
| }) |
| |
| Convey("updating job's schedule", func() { |
| // Adding a job that ticks every 5 sec. Make sure its tick is scheduled. |
| So(e.UpdateProjectJobs(c, "proj", []catalog.Definition{jobDef}), ShouldBeNil) |
| So(tq.GetScheduledTasks().Payloads(), ShouldResembleProto, []protoiface.MessageV1{ |
| &internal.CronTickTask{JobId: "proj/1", TickNonce: 6278013164014963328}, |
| }) |
| |
| // Changing it to tick every 30 sec. |
| newDef := jobDef |
| newDef.Schedule = "*/30 * * * * * *" |
| So(e.UpdateProjectJobs(c, "proj", []catalog.Definition{newDef}), ShouldBeNil) |
| |
| // The job is updated now. |
| So(allJobs(c), ShouldResemble, []Job{ |
| { |
| JobID: "proj/1", |
| ProjectID: "proj", |
| RealmID: "proj:testing", |
| Revision: "rev1", |
| Enabled: true, |
| Schedule: "*/30 * * * * * *", |
| Cron: cron.State{ |
| Enabled: true, |
| Generation: 3, |
| LastRewind: epoch, |
| LastTick: cron.TickLaterAction{ |
| When: epoch.Add(30 * time.Second), // new tick time |
| TickNonce: 2673062197574995716, |
| }, |
| }, |
| Task: jobDef.Task, |
| TriggeringPolicyRaw: jobDef.TriggeringPolicy, |
| }, |
| }) |
| |
| // The new tick is scheduled now too. |
| So(tq.GetScheduledTasks().Payloads(), ShouldResembleProto, []protoiface.MessageV1{ |
| &internal.CronTickTask{JobId: "proj/1", TickNonce: 6278013164014963328}, |
| &internal.CronTickTask{JobId: "proj/1", TickNonce: 2673062197574995716}, |
| }) |
| }) |
| |
| Convey("updating job's triggering policy", func() { |
| // Adding a job that is triggered externally (doesn't tick). Schedules no |
| // tasks. |
| job := jobDef |
| job.Schedule = "triggered" |
| So(e.UpdateProjectJobs(c, "proj", []catalog.Definition{job}), ShouldBeNil) |
| So(tq.GetScheduledTasks().Payloads(), ShouldHaveLength, 0) |
| |
| // Update its triggering policy. It should emit a triage to evaluate |
| // the state of the job using this new policy. |
| job.TriggeringPolicy = []uint8{1, 1, 1, 1} |
| So(e.UpdateProjectJobs(c, "proj", []catalog.Definition{job}), ShouldBeNil) |
| |
| // The job is updated now. |
| So(allJobs(c), ShouldResemble, []Job{ |
| { |
| JobID: "proj/1", |
| ProjectID: "proj", |
| RealmID: "proj:testing", |
| Revision: "rev1", |
| Enabled: true, |
| Schedule: "triggered", |
| Cron: cron.State{ |
| Enabled: true, |
| Generation: 2, |
| LastRewind: epoch, |
| LastTick: cron.TickLaterAction{ |
| When: schedule.DistantFuture, |
| TickNonce: 6278013164014963328, |
| }, |
| }, |
| Task: jobDef.Task, |
| TriggeringPolicyRaw: job.TriggeringPolicy, |
| }, |
| }) |
| |
| // Kicked the triage indeed. |
| So(tq.GetScheduledTasks().Payloads(), ShouldResembleProto, []protoiface.MessageV1{ |
| &internal.KickTriageTask{JobId: "proj/1"}, |
| }) |
| }) |
| |
| Convey("removing a job", func() { |
| // Adding a job first. |
| So(e.UpdateProjectJobs(c, "proj", []catalog.Definition{jobDef}), ShouldBeNil) |
| datastore.GetTestable(c).CatchupIndexes() |
| |
| // And now removing it. |
| So(e.UpdateProjectJobs(c, "proj", nil), ShouldBeNil) |
| |
| // Switched to disabled state. |
| So(allJobs(c), ShouldResemble, []Job{ |
| { |
| JobID: "proj/1", |
| ProjectID: "proj", |
| RealmID: "proj:testing", |
| Revision: "rev1", |
| Enabled: false, |
| Schedule: "*/5 * * * * * *", |
| Cron: cron.State{ |
| Enabled: false, |
| Generation: 3, |
| }, |
| Task: jobDef.Task, |
| TriggeringPolicyRaw: jobDef.TriggeringPolicy, |
| }, |
| }) |
| }) |
| }) |
| } |
| |
| func TestGenerateInvocationID(t *testing.T) { |
| t.Parallel() |
| |
| Convey("generateInvocationID does not collide", t, func() { |
| c := newTestContext(epoch) |
| |
| // Bunch of ids generated at the exact same moment in time do not collide. |
| ids := map[int64]struct{}{} |
| for i := 0; i < 20; i++ { |
| id, err := generateInvocationID(c) |
| So(err, ShouldBeNil) |
| ids[id] = struct{}{} |
| } |
| So(len(ids), ShouldEqual, 20) |
| }) |
| |
| Convey("generateInvocationID gen IDs with most recent first", t, func() { |
| c := newTestContext(epoch) |
| |
| older, err := generateInvocationID(c) |
| So(err, ShouldBeNil) |
| |
| clock.Get(c).(testclock.TestClock).Add(5 * time.Second) |
| |
| newer, err := generateInvocationID(c) |
| So(err, ShouldBeNil) |
| |
| So(newer, ShouldBeLessThan, older) |
| }) |
| } |
| |
| func TestQueries(t *testing.T) { |
| t.Parallel() |
| |
| Convey("with mock data", t, func() { |
| c := newTestContext(epoch) |
| e, _ := newTestEngine() |
| |
| db := authtest.NewFakeDB( |
| authtest.MockMembership("user:admin@example.com", adminGroup), |
| |
| authtest.MockPermission("user:one@example.com", "abc:one", PermJobsGet), |
| authtest.MockPermission("user:some@example.com", "abc:some", PermJobsGet), |
| |
| authtest.MockPermission("anonymous:anonymous", "abc:public", PermJobsGet), |
| authtest.MockPermission("user:one@example.com", "abc:public", PermJobsGet), |
| authtest.MockPermission("user:some@example.com", "abc:public", PermJobsGet), |
| authtest.MockPermission("user:admin@example.com", "abc:public", PermJobsGet), |
| |
| authtest.MockPermission( |
| "user:some@example.com", |
| "abc:secret", |
| PermJobsGet, |
| authtest.RestrictAttribute("scheduler.job.name", "restricted"), |
| ), |
| ) |
| |
| ctxAnon := auth.WithState(c, &authtest.FakeState{ |
| Identity: "anonymous:anonymous", |
| FakeDB: db, |
| }) |
| ctxOne := auth.WithState(c, &authtest.FakeState{ |
| Identity: "user:one@example.com", |
| FakeDB: db, |
| }) |
| ctxSome := auth.WithState(c, &authtest.FakeState{ |
| Identity: "user:some@example.com", |
| FakeDB: db, |
| }) |
| ctxAdmin := auth.WithState(c, &authtest.FakeState{ |
| Identity: "user:admin@example.com", |
| FakeDB: db, |
| }) |
| |
| job1 := "abc/1" |
| job2 := "abc/2" |
| job3 := "abc/3" |
| |
| So(datastore.Put(c, |
| &Job{JobID: job1, ProjectID: "abc", Enabled: true, RealmID: "abc:one"}, |
| &Job{JobID: job2, ProjectID: "abc", Enabled: true, RealmID: "abc:some"}, |
| &Job{JobID: job3, ProjectID: "abc", Enabled: true, RealmID: "abc:public"}, |
| &Job{JobID: "def/1", ProjectID: "def", Enabled: true, RealmID: "abc:public"}, |
| &Job{JobID: "def/2", ProjectID: "def", Enabled: false, RealmID: "abc:public"}, |
| &Job{JobID: "secret/admin-only", ProjectID: "secret", Enabled: true, RealmID: "abc:secret"}, |
| &Job{JobID: "secret/restricted", ProjectID: "secret", Enabled: true, RealmID: "abc:secret"}, |
| ), ShouldBeNil) |
| |
| // Mocked invocations, all in finished state (IndexedJobID set). |
| So(datastore.Put(c, |
| &Invocation{ID: 1, JobID: job1, IndexedJobID: job1}, |
| &Invocation{ID: 2, JobID: job1, IndexedJobID: job1}, |
| &Invocation{ID: 3, JobID: job1, IndexedJobID: job1}, |
| &Invocation{ID: 4, JobID: job2, IndexedJobID: job2}, |
| &Invocation{ID: 5, JobID: job2, IndexedJobID: job2}, |
| &Invocation{ID: 6, JobID: job2, IndexedJobID: job2}, |
| &Invocation{ID: 7, JobID: job3, IndexedJobID: job3}, |
| ), ShouldBeNil) |
| |
| datastore.GetTestable(c).CatchupIndexes() |
| |
| Convey("GetAllProjects ignores ACLs and CurrentIdentity", func() { |
| test := func(ctx context.Context) { |
| r, err := e.GetAllProjects(c) |
| So(err, ShouldBeNil) |
| So(r, ShouldResemble, []string{"abc", "def", "secret"}) |
| } |
| test(c) |
| test(ctxAnon) |
| test(ctxAdmin) |
| }) |
| |
| Convey("GetVisibleJobs works", func() { |
| get := func(ctx context.Context) []string { |
| jobs, err := e.GetVisibleJobs(ctx) |
| So(err, ShouldBeNil) |
| return sortedJobIds(jobs) |
| } |
| |
| Convey("Anonymous users see only public jobs", func() { |
| // Only 3 jobs with default ACLs granting read access to everyone, but |
| // def/2 is disabled and so shouldn't be returned. |
| So(get(ctxAnon), ShouldResemble, []string{"abc/3", "def/1"}) |
| }) |
| Convey("Owners can see their own jobs + public jobs", func() { |
| // abc/1 is owned by one@example.com. |
| So(get(ctxOne), ShouldResemble, []string{"abc/1", "abc/3", "def/1"}) |
| }) |
| Convey("Explicit readers", func() { |
| So(get(ctxSome), ShouldResemble, []string{ |
| "abc/2", |
| "abc/3", |
| "def/1", |
| "secret/restricted", // via the conditional permission |
| }) |
| }) |
| Convey("Admins have implicit read access to all jobs", func() { |
| So(get(ctxAdmin), ShouldResemble, []string{ |
| "abc/1", |
| "abc/2", |
| "abc/3", |
| "def/1", |
| "secret/admin-only", |
| "secret/restricted", |
| }) |
| }) |
| }) |
| |
| Convey("GetProjectJobsRA works", func() { |
| get := func(ctx context.Context, project string) []string { |
| jobs, err := e.GetVisibleProjectJobs(ctx, project) |
| So(err, ShouldBeNil) |
| return sortedJobIds(jobs) |
| } |
| Convey("Anonymous can still see public jobs", func() { |
| So(get(ctxAnon, "def"), ShouldResemble, []string{"def/1"}) |
| }) |
| Convey("Admin have implicit read access to all jobs", func() { |
| So(get(ctxAdmin, "abc"), ShouldResemble, []string{"abc/1", "abc/2", "abc/3"}) |
| }) |
| Convey("Owners can still see their jobs", func() { |
| So(get(ctxOne, "abc"), ShouldResemble, []string{"abc/1", "abc/3"}) |
| }) |
| Convey("Readers can see their jobs", func() { |
| So(get(ctxSome, "abc"), ShouldResemble, []string{"abc/2", "abc/3"}) |
| }) |
| }) |
| |
| Convey("GetVisibleJob works", func() { |
| _, err := e.GetVisibleJob(ctxAdmin, "missing/job") |
| So(err, ShouldEqual, ErrNoSuchJob) |
| |
| _, err = e.GetVisibleJob(ctxAnon, "abc/1") // no "scheduler.jobs.get" permission. |
| So(err, ShouldEqual, ErrNoSuchJob) |
| |
| _, err = e.GetVisibleJob(ctxAnon, "def/2") // not enabled, hence not visible. |
| So(err, ShouldEqual, ErrNoSuchJob) |
| |
| job, err := e.GetVisibleJob(ctxAnon, "def/1") // OK. |
| So(job, ShouldNotBeNil) |
| So(err, ShouldBeNil) |
| }) |
| |
| Convey("ListInvocations works", func() { |
| job, err := e.GetVisibleJob(ctxOne, "abc/1") |
| So(err, ShouldBeNil) |
| |
| Convey("With paging", func() { |
| invs, cursor, err := e.ListInvocations(ctxOne, job, ListInvocationsOpts{ |
| PageSize: 2, |
| }) |
| So(err, ShouldBeNil) |
| So(len(invs), ShouldEqual, 2) |
| So(invs[0].ID, ShouldEqual, 1) |
| So(invs[1].ID, ShouldEqual, 2) |
| So(cursor, ShouldNotEqual, "") |
| |
| invs, cursor, err = e.ListInvocations(ctxOne, job, ListInvocationsOpts{ |
| PageSize: 2, |
| Cursor: cursor, |
| }) |
| So(err, ShouldBeNil) |
| So(len(invs), ShouldEqual, 1) |
| So(invs[0].ID, ShouldEqual, 3) |
| So(cursor, ShouldEqual, "") |
| }) |
| }) |
| |
| Convey("GetInvocation works", func() { |
| job, err := e.GetVisibleJob(ctxOne, "abc/1") |
| So(err, ShouldBeNil) |
| |
| Convey("NoSuchInvocation", func() { |
| _, err = e.GetInvocation(ctxOne, job, 666) // Missing invocation. |
| So(err, ShouldResemble, ErrNoSuchInvocation) |
| }) |
| |
| Convey("Existing invocation", func() { |
| inv, err := e.GetInvocation(ctxOne, job, 1) |
| So(inv, ShouldNotBeNil) |
| So(err, ShouldBeNil) |
| }) |
| }) |
| }) |
| } |
| |
| func TestPrepareTopic(t *testing.T) { |
| t.Parallel() |
| |
| Convey("PrepareTopic works", t, func(ctx C) { |
| c := newTestContext(epoch) |
| |
| e, _ := newTestEngine() |
| |
| pubSubCalls := 0 |
| e.configureTopic = func(c context.Context, topic, sub, pushURL, publisher string) error { |
| pubSubCalls++ |
| ctx.So(topic, ShouldEqual, fmt.Sprintf("projects/%s/topics/dev-scheduler.noop.some~publisher.com", fakeAppID)) |
| ctx.So(sub, ShouldEqual, fmt.Sprintf("projects/%s/subscriptions/dev-scheduler.noop.some~publisher.com", fakeAppID)) |
| ctx.So(pushURL, ShouldEqual, "") // pull on dev server |
| ctx.So(publisher, ShouldEqual, "some@publisher.com") |
| return nil |
| } |
| |
| ctl := &taskController{ |
| ctx: c, |
| eng: e, |
| manager: &noop.TaskManager{}, |
| saved: Invocation{ID: 123456}, |
| } |
| ctl.populateState() |
| |
| // Once. |
| topic, token, err := ctl.PrepareTopic(c, "some@publisher.com") |
| So(err, ShouldBeNil) |
| So(topic, ShouldEqual, fmt.Sprintf("projects/%s/topics/dev-scheduler.noop.some~publisher.com", fakeAppID)) |
| So(token, ShouldNotEqual, "") |
| So(pubSubCalls, ShouldEqual, 1) |
| |
| // Again. 'configureTopic' should not be called anymore. |
| _, _, err = ctl.PrepareTopic(c, "some@publisher.com") |
| So(err, ShouldBeNil) |
| So(pubSubCalls, ShouldEqual, 1) |
| }) |
| } |
| |
| func TestProcessPubSubPush(t *testing.T) { |
| t.Parallel() |
| |
| Convey("with mock invocation", t, func() { |
| c := newTestContext(epoch) |
| e, mgr := newTestEngine() |
| |
| tc := clock.Get(c).(testclock.TestClock) |
| tc.SetTimerCallback(func(d time.Duration, t clock.Timer) { |
| tc.Add(d) |
| }) |
| |
| So(datastore.Put(c, &Job{ |
| JobID: "abc/1", |
| ProjectID: "abc", |
| Enabled: true, |
| }), ShouldBeNil) |
| |
| task, err := proto.Marshal(&messages.TaskDefWrapper{ |
| Noop: &messages.NoopTask{}, |
| }) |
| So(err, ShouldBeNil) |
| |
| inv := Invocation{ |
| ID: 1, |
| JobID: "abc/1", |
| Task: task, |
| } |
| So(datastore.Put(c, &inv), ShouldBeNil) |
| |
| // Skip talking to PubSub for real. |
| e.configureTopic = func(c context.Context, topic, sub, pushURL, publisher string) error { |
| return nil |
| } |
| |
| ctl, err := controllerForInvocation(c, e, &inv) |
| So(err, ShouldBeNil) |
| |
| // Grab the working auth token. |
| _, token, err := ctl.PrepareTopic(c, "some@publisher.com") |
| So(err, ShouldBeNil) |
| So(token, ShouldNotEqual, "") |
| |
| prepMessage := func(body, token string) []byte { |
| msg := struct { |
| Message pubsub.PubsubMessage `json:"message"` |
| }{ |
| Message: pubsub.PubsubMessage{ |
| Attributes: map[string]string{"auth_token": token}, |
| Data: body, |
| }, |
| } |
| blob, _ := json.Marshal(&msg) |
| return blob |
| } |
| |
| // Prep url.Values the same way PrepareTopic does. |
| urlValues := e.pushSubscriptionURLValues(ctl.manager, "some@publisher.com") |
| |
| // Extract the token from attributes where we put it below. |
| mgr.examineNotification = func(_ context.Context, msg *pubsub.PubsubMessage) string { |
| return msg.Attributes["auth_token"] |
| } |
| |
| Convey("ProcessPubSubPush works and retries tq.Retry errors", func() { |
| calls := 0 |
| mgr.handleNotification = func(ctx context.Context, msg *pubsub.PubsubMessage) error { |
| So(msg.Data, ShouldEqual, "blah") |
| calls++ |
| if calls == 1 { |
| return errors.New("should be retried", tq.Retry) |
| } |
| return nil |
| } |
| So(e.ProcessPubSubPush(c, prepMessage("blah", token), urlValues), ShouldBeNil) |
| So(calls, ShouldEqual, 2) // executed the retry |
| }) |
| |
| Convey("ProcessPubSubPush handles bad token", func() { |
| err := e.ProcessPubSubPush(c, prepMessage("blah", token+"blah"), urlValues) |
| So(err, ShouldErrLike, "bad token") |
| So(transient.Tag.In(err), ShouldBeFalse) |
| }) |
| |
| Convey("ProcessPubSubPush handles missing invocation", func() { |
| datastore.Delete(c, datastore.KeyForObj(c, &inv)) |
| |
| err := e.ProcessPubSubPush(c, prepMessage("blah", token), urlValues) |
| So(err, ShouldErrLike, "doesn't exist") |
| So(transient.Tag.In(err), ShouldBeFalse) |
| }) |
| |
| Convey("ProcessPubSubPush handles unknown task manager", func() { |
| // Pass `nil` instead of urlValue, so that the engine can't figure out |
| // what task manager to use. |
| err := e.ProcessPubSubPush(c, prepMessage("blah", token), nil) |
| So(err, ShouldErrLike, "unknown task manager") |
| So(transient.Tag.In(err), ShouldBeFalse) |
| }) |
| |
| Convey("ProcessPubSubPush can't find the auth token", func() { |
| mgr.examineNotification = func(context.Context, *pubsub.PubsubMessage) string { |
| return "" |
| } |
| err := e.ProcessPubSubPush(c, prepMessage("blah", token), urlValues) |
| So(err, ShouldErrLike, "failed to extract") |
| So(transient.Tag.In(err), ShouldBeFalse) |
| }) |
| }) |
| } |
| |
| func TestTrimDebugLog(t *testing.T) { |
| t.Parallel() |
| |
| ctx := clock.Set(context.Background(), testclock.New(epoch)) |
| junk := strings.Repeat("a", 1000) |
| |
| genLines := func(start, end int) string { |
| inv := Invocation{} |
| for i := start; i < end; i++ { |
| inv.debugLog(ctx, "Line %d - %s", i, junk) |
| } |
| return inv.DebugLog |
| } |
| |
| Convey("small log is not trimmed", t, func() { |
| inv := Invocation{ |
| DebugLog: genLines(0, 100), |
| } |
| inv.trimDebugLog() |
| So(inv.DebugLog, ShouldEqual, genLines(0, 100)) |
| }) |
| |
| Convey("huge log is trimmed", t, func() { |
| inv := Invocation{ |
| DebugLog: genLines(0, 500), |
| } |
| inv.trimDebugLog() |
| So(inv.DebugLog, ShouldEqual, |
| genLines(0, 94)+"--- the log has been cut here ---\n"+genLines(400, 500)) |
| }) |
| |
| Convey("writing lines to huge log and trimming", t, func() { |
| inv := Invocation{ |
| DebugLog: genLines(0, 500), |
| } |
| inv.trimDebugLog() |
| for i := 0; i < 10; i++ { |
| inv.debugLog(ctx, "Line %d - %s", i, junk) |
| inv.trimDebugLog() |
| } |
| // Still single cut only. New 10 lines are at the end. |
| So(inv.DebugLog, ShouldEqual, |
| genLines(0, 94)+"--- the log has been cut here ---\n"+genLines(410, 500)+genLines(0, 10)) |
| }) |
| |
| Convey("one huge line", t, func() { |
| inv := Invocation{ |
| DebugLog: strings.Repeat("z", 300000), |
| } |
| inv.trimDebugLog() |
| const msg = "\n--- the log has been cut here ---\n" |
| So(inv.DebugLog, ShouldEqual, strings.Repeat("z", debugLogSizeLimit-len(msg))+msg) |
| }) |
| } |
| |
| func TestEnqueueInvocations(t *testing.T) { |
| t.Parallel() |
| |
| Convey("Works", t, func() { |
| c := newTestContext(epoch) |
| e, _ := newTestEngine() |
| |
| tq := tqtesting.GetTestable(c, e.cfg.Dispatcher) |
| tq.CreateQueues() |
| |
| job := Job{JobID: "project/job"} |
| So(datastore.Put(c, &job), ShouldBeNil) |
| |
| var invs []*Invocation |
| err := runTxn(c, func(c context.Context) error { |
| var err error |
| invs, err = e.enqueueInvocations(c, &job, []task.Request{ |
| {TriggeredBy: "user:a@example.com"}, |
| {TriggeredBy: "user:b@example.com"}, |
| }) |
| datastore.Put(c, &job) |
| return err |
| }) |
| So(err, ShouldBeNil) |
| |
| // The order of new invocations is undefined (including IDs assigned to |
| // them), so convert them to map and clear IDs. |
| invsByTrigger := map[identity.Identity]Invocation{} |
| invIDs := map[int64]bool{} |
| for _, inv := range invs { |
| invIDs[inv.ID] = true |
| cpy := *inv |
| cpy.ID = 0 |
| invsByTrigger[inv.TriggeredBy] = cpy |
| } |
| So(invsByTrigger, ShouldResemble, map[identity.Identity]Invocation{ |
| "user:a@example.com": { |
| JobID: "project/job", |
| Started: epoch, |
| TriggeredBy: "user:a@example.com", |
| Status: task.StatusStarting, |
| DebugLog: "[22:42:00.000] New invocation is queued and will start shortly\n" + |
| "[22:42:00.000] Triggered by user:a@example.com\n", |
| }, |
| "user:b@example.com": { |
| JobID: "project/job", |
| Started: epoch, |
| TriggeredBy: "user:b@example.com", |
| Status: task.StatusStarting, |
| DebugLog: "[22:42:00.000] New invocation is queued and will start shortly\n" + |
| "[22:42:00.000] Triggered by user:b@example.com\n", |
| }, |
| }) |
| |
| // Both invocations are in ActiveInvocations list of the job. |
| So(len(job.ActiveInvocations), ShouldEqual, 2) |
| for _, invID := range job.ActiveInvocations { |
| So(invIDs[invID], ShouldBeTrue) |
| } |
| |
| // And we've emitted the launch task. |
| tasks := tq.GetScheduledTasks() |
| So(tasks[0].Payload, ShouldHaveSameTypeAs, &internal.LaunchInvocationsBatchTask{}) |
| batch := tasks[0].Payload.(*internal.LaunchInvocationsBatchTask) |
| So(len(batch.Tasks), ShouldEqual, 2) |
| for _, subtask := range batch.Tasks { |
| So(subtask.JobId, ShouldEqual, "project/job") |
| So(invIDs[subtask.InvId], ShouldBeTrue) |
| } |
| }) |
| } |
| |
| func TestTriageTaskDedup(t *testing.T) { |
| t.Parallel() |
| |
| Convey("with fake env", t, func() { |
| c := newTestContext(epoch) |
| e, _ := newTestEngine() |
| |
| tq := tqtesting.GetTestable(c, e.cfg.Dispatcher) |
| tq.CreateQueues() |
| |
| Convey("single task", func() { |
| So(e.kickTriageNow(c, "fake/job"), ShouldBeNil) |
| |
| tasks := tq.GetScheduledTasks() |
| So(len(tasks), ShouldEqual, 1) |
| So(tasks[0].Task.ETA.Equal(epoch.Add(2*time.Second)), ShouldBeTrue) |
| So(tasks[0].Payload, ShouldResembleProto, &internal.TriageJobStateTask{JobId: "fake/job"}) |
| }) |
| |
| Convey("a bunch of tasks, deduplicated by hitting memcache", func() { |
| So(e.kickTriageNow(c, "fake/job"), ShouldBeNil) |
| |
| clock.Get(c).(testclock.TestClock).Add(time.Second) |
| So(e.kickTriageNow(c, "fake/job"), ShouldBeNil) |
| |
| clock.Get(c).(testclock.TestClock).Add(900 * time.Millisecond) |
| So(e.kickTriageNow(c, "fake/job"), ShouldBeNil) |
| |
| tasks := tq.GetScheduledTasks() |
| So(len(tasks), ShouldEqual, 1) |
| So(tasks[0].Task.ETA.Equal(epoch.Add(2*time.Second)), ShouldBeTrue) |
| So(tasks[0].Payload, ShouldResembleProto, &internal.TriageJobStateTask{JobId: "fake/job"}) |
| }) |
| |
| Convey("a bunch of tasks, deduplicated by hitting task queue", func() { |
| c, fb := featureBreaker.FilterMC(c, fmt.Errorf("omg, memcache error")) |
| fb.BreakFeatures(nil, "GetMulti", "SetMulti") |
| |
| So(e.kickTriageNow(c, "fake/job"), ShouldBeNil) |
| |
| clock.Get(c).(testclock.TestClock).Add(time.Second) |
| So(e.kickTriageNow(c, "fake/job"), ShouldBeNil) |
| |
| clock.Get(c).(testclock.TestClock).Add(900 * time.Millisecond) |
| So(e.kickTriageNow(c, "fake/job"), ShouldBeNil) |
| |
| tasks := tq.GetScheduledTasks() |
| So(len(tasks), ShouldEqual, 1) |
| So(tasks[0].Task.ETA.Equal(epoch.Add(2*time.Second)), ShouldBeTrue) |
| So(tasks[0].Payload, ShouldResembleProto, &internal.TriageJobStateTask{JobId: "fake/job"}) |
| }) |
| }) |
| } |
| |
| func TestLaunchInvocationTask(t *testing.T) { |
| t.Parallel() |
| |
| Convey("with fake env", t, func() { |
| c := newTestContext(epoch) |
| e, mgr := newTestEngine() |
| |
| tq := tqtesting.GetTestable(c, e.cfg.Dispatcher) |
| tq.CreateQueues() |
| |
| // Add the job. |
| So(e.UpdateProjectJobs(c, "project", []catalog.Definition{ |
| { |
| JobID: "project/job", |
| RealmID: "project:testing", |
| Revision: "rev1", |
| Schedule: "triggered", |
| Task: noopTaskBytes(), |
| }, |
| }), ShouldBeNil) |
| |
| // Prepare Invocation in Starting state. |
| job := Job{JobID: "project/job"} |
| So(datastore.Get(c, &job), ShouldBeNil) |
| inv, err := e.allocateInvocation(c, &job, task.Request{ |
| IncomingTriggers: []*internal.Trigger{{Id: "a"}}, |
| }) |
| So(err, ShouldBeNil) |
| |
| callLaunchInvocation := func(c context.Context, execCount int64) error { |
| return tq.ExecuteTask(c, tqtesting.Task{ |
| Task: &taskqueue.Task{}, |
| Payload: &internal.LaunchInvocationTask{ |
| JobId: job.JobID, |
| InvId: inv.ID, |
| }, |
| }, &taskqueue.RequestHeaders{TaskExecutionCount: execCount}) |
| } |
| |
| fetchInvocation := func(c context.Context) *Invocation { |
| toFetch := Invocation{ID: inv.ID} |
| So(datastore.Get(c, &toFetch), ShouldBeNil) |
| return &toFetch |
| } |
| |
| Convey("happy path", func() { |
| mgr.launchTask = func(ctx context.Context, ctl task.Controller) error { |
| So(ctl.InvocationID(), ShouldEqual, inv.ID) |
| So(ctl.RealmID(), ShouldEqual, "project:testing") |
| ctl.DebugLog("Succeeded!") |
| ctl.State().Status = task.StatusSucceeded |
| return nil |
| } |
| So(callLaunchInvocation(c, 0), ShouldBeNil) |
| |
| updated := fetchInvocation(c) |
| triggers, err := updated.IncomingTriggers() |
| updated.IncomingTriggersRaw = nil |
| |
| So(err, ShouldBeNil) |
| So(triggers, ShouldResemble, []*internal.Trigger{{Id: "a"}}) |
| So(updated, ShouldResemble, &Invocation{ |
| ID: inv.ID, |
| JobID: "project/job", |
| IndexedJobID: "project/job", |
| RealmID: "project:testing", |
| Started: epoch, |
| Finished: epoch, |
| Revision: job.Revision, |
| Task: job.Task, |
| Status: task.StatusSucceeded, |
| MutationsCount: 2, |
| DebugLog: "[22:42:00.000] New invocation is queued and will start shortly\n" + |
| "[22:42:00.000] Starting the invocation (attempt 1)\n" + |
| "[22:42:00.000] Succeeded!\n" + |
| "[22:42:00.000] Invocation finished in 0s with status SUCCEEDED\n", |
| }) |
| }) |
| |
| Convey("already aborted", func() { |
| inv.Status = task.StatusAborted |
| So(datastore.Put(c, inv), ShouldBeNil) |
| mgr.launchTask = func(ctx context.Context, ctl task.Controller) error { |
| return fmt.Errorf("must not be called") |
| } |
| So(callLaunchInvocation(c, 0), ShouldBeNil) |
| So(fetchInvocation(c).Status, ShouldEqual, task.StatusAborted) |
| }) |
| |
| Convey("successful retry", func() { |
| // Attempt #1. |
| mgr.launchTask = func(ctx context.Context, ctl task.Controller) error { |
| return transient.Tag.Apply(fmt.Errorf("oops, failed to start")) |
| } |
| So(callLaunchInvocation(c, 0), ShouldEqual, errRetryingLaunch) |
| So(fetchInvocation(c).Status, ShouldEqual, task.StatusRetrying) |
| |
| // Attempt #2. |
| mgr.launchTask = func(ctx context.Context, ctl task.Controller) error { |
| ctl.DebugLog("Succeeded!") |
| ctl.State().Status = task.StatusSucceeded |
| return nil |
| } |
| So(callLaunchInvocation(c, 1), ShouldBeNil) |
| |
| updated := fetchInvocation(c) |
| So(updated.Status, ShouldEqual, task.StatusSucceeded) |
| So(updated.RetryCount, ShouldEqual, 1) |
| So(updated.DebugLog, ShouldEqual, "[22:42:00.000] New invocation is queued and will start shortly\n"+ |
| "[22:42:00.000] Starting the invocation (attempt 1)\n"+ |
| "[22:42:00.000] The invocation will be retried\n"+ |
| "[22:42:00.000] Starting the invocation (attempt 2)\n"+ |
| "[22:42:00.000] Succeeded!\n"+ |
| "[22:42:00.000] Invocation finished in 0s with status SUCCEEDED\n") |
| }) |
| |
| Convey("failed retry", func() { |
| // Attempt #1. |
| mgr.launchTask = func(ctx context.Context, ctl task.Controller) error { |
| return transient.Tag.Apply(fmt.Errorf("oops, failed to start")) |
| } |
| So(callLaunchInvocation(c, 0), ShouldEqual, errRetryingLaunch) |
| So(fetchInvocation(c).Status, ShouldEqual, task.StatusRetrying) |
| |
| // Attempt #2. |
| mgr.launchTask = func(ctx context.Context, ctl task.Controller) error { |
| return fmt.Errorf("boom, fatal shot") |
| } |
| So(callLaunchInvocation(c, 1), ShouldBeNil) // didn't ask for a retry |
| |
| updated := fetchInvocation(c) |
| So(updated.Status, ShouldEqual, task.StatusFailed) |
| So(updated.RetryCount, ShouldEqual, 1) |
| So(updated.DebugLog, ShouldEqual, "[22:42:00.000] New invocation is queued and will start shortly\n"+ |
| "[22:42:00.000] Starting the invocation (attempt 1)\n"+ |
| "[22:42:00.000] The invocation will be retried\n"+ |
| "[22:42:00.000] Starting the invocation (attempt 2)\n"+ |
| "[22:42:00.000] Invocation finished in 0s with status FAILED\n") |
| }) |
| }) |
| } |
| |
| func TestAbortJob(t *testing.T) { |
| t.Parallel() |
| |
| Convey("with fake env", t, func() { |
| const jobID = "project/job" |
| const realmID = "project:testing" |
| const expectedInvID int64 = 9200093523825193008 |
| |
| c := newTestContext(epoch) |
| e, mgr := newTestEngine() |
| |
| tq := tqtesting.GetTestable(c, e.cfg.Dispatcher) |
| tq.CreateQueues() |
| |
| So(e.UpdateProjectJobs(c, "project", []catalog.Definition{ |
| { |
| JobID: jobID, |
| RealmID: realmID, |
| Revision: "rev1", |
| Schedule: "triggered", |
| Task: noopTaskBytes(), |
| }, |
| }), ShouldBeNil) |
| |
| // Launch a new invocation. |
| job, err := e.getJob(c, jobID) |
| So(err, ShouldBeNil) |
| inv := forceInvocation(c, e, jobID) |
| invID := inv.ID |
| So(invID, ShouldEqual, expectedInvID) |
| |
| Convey("inv aborted before it starts", func() { |
| // Kill it right away before it had a chance to start. |
| So(e.AbortJob(mockOwnerCtx(c, realmID), job), ShouldBeNil) |
| |
| // It is dead right away. |
| inv, err := e.getInvocation(c, jobID, invID) |
| So(err, ShouldBeNil) |
| So(inv, ShouldResemble, &Invocation{ |
| ID: expectedInvID, |
| JobID: jobID, |
| RealmID: realmID, |
| IndexedJobID: jobID, |
| Started: epoch, |
| Finished: epoch, |
| Revision: "rev1", |
| Task: noopTaskBytes(), |
| Status: task.StatusAborted, |
| MutationsCount: 1, |
| DebugLog: "[22:42:00.000] New invocation is queued and will start shortly\n" + |
| "[22:42:00.000] Invocation is manually aborted by user:owner@example.com\n" + |
| "[22:42:00.000] Invocation finished in 0s with status ABORTED\n", |
| }) |
| |
| // Unpause the task queue to confirm the new invocation doesn't actually |
| // start. |
| mgr.launchTask = func(ctx context.Context, ctl task.Controller) error { |
| panic("must not be called") |
| } |
| tasks, _, err := tq.RunSimulation(c, nil) |
| So(err, ShouldBeNil) |
| |
| // The sequence of tasks we've just performed. |
| So(tasks.Payloads(), ShouldResembleProto, []protoiface.MessageV1{ |
| // The delayed triage directly from AbortJob. |
| &internal.KickTriageTask{JobId: jobID}, |
| // The invocation finalization from AbortInvocation. |
| &internal.InvocationFinishedTask{JobId: jobID, InvId: expectedInvID}, |
| |
| // Noop launch. We can't undo the already posted tasks. Note that the |
| // actual launch didn't happen, as checked by mgr.launchTask. |
| &internal.LaunchInvocationsBatchTask{ |
| Tasks: []*internal.LaunchInvocationTask{{JobId: jobID, InvId: expectedInvID}}, |
| }, |
| &internal.LaunchInvocationTask{JobId: jobID, InvId: expectedInvID}, |
| |
| // The triage from KickTriageTask and from InvocationFinishedTask |
| // finally arrives. |
| &internal.TriageJobStateTask{JobId: jobID}, |
| }) |
| |
| // The job state is updated (the invocation is no longer active). |
| job, err = e.getJob(c, jobID) |
| So(err, ShouldBeNil) |
| So(job.ActiveInvocations, ShouldBeNil) |
| |
| // The invocation is now in the list of finished invocations. |
| datastore.GetTestable(c).CatchupIndexes() |
| invs, _, _ := e.ListInvocations(mockOwnerCtx(c, realmID), job, ListInvocationsOpts{ |
| PageSize: 100, |
| FinishedOnly: true, |
| }) |
| So(invs, ShouldResemble, []*Invocation{inv}) |
| }) |
| |
| Convey("inv aborted while it is running", func() { |
| // Let the invocation start and set a timer. Abort the simulation before |
| // the timer ticks. |
| mgr.launchTask = func(ctx context.Context, ctl task.Controller) error { |
| ctl.State().Status = task.StatusRunning |
| ctl.AddTimer(ctx, time.Minute, "1 min", nil) |
| return nil |
| } |
| tasks, _, err := tq.RunSimulation(c, &tqtesting.SimulationParams{ |
| ShouldStopBefore: func(t tqtesting.Task) bool { |
| _, ok := t.Payload.(*internal.TimerTask) |
| return ok |
| }, |
| }) |
| So(err, ShouldBeNil) |
| |
| // The sequence of tasks we've just performed. |
| So(tasks.Payloads(), ShouldResembleProto, []protoiface.MessageV1{ |
| &internal.LaunchInvocationsBatchTask{ |
| Tasks: []*internal.LaunchInvocationTask{{JobId: jobID, InvId: expectedInvID}}, |
| }, |
| &internal.LaunchInvocationTask{ |
| JobId: jobID, InvId: expectedInvID, |
| }, |
| }) |
| |
| // At this point the timer tick is scheduled to happen 1 min from now, but |
| // we abort the job. |
| mgr.abortTask = func(ctx context.Context, ctl task.Controller) error { |
| ctl.DebugLog("Really aborted!") |
| return nil |
| } |
| So(e.AbortJob(mockOwnerCtx(c, realmID), job), ShouldBeNil) |
| |
| // It is dead right away. |
| inv, err := e.getInvocation(c, jobID, invID) |
| So(inv.Status, ShouldEqual, task.StatusAborted) |
| |
| // And AbortTask callback was really called. |
| So(inv.DebugLog, ShouldContainSubstring, "Really aborted!") |
| |
| // Run all processes to completion. |
| mgr.handleTimer = func(ctx context.Context, ctl task.Controller, name string, payload []byte) error { |
| panic("must not be called") |
| } |
| tasks, _, err = tq.RunSimulation(c, nil) |
| So(err, ShouldBeNil) |
| |
| // The sequence of tasks we've just performed. |
| So(tasks.Payloads(), ShouldResembleProto, []protoiface.MessageV1{ |
| // The delayed triage directly from AbortJob. |
| &internal.KickTriageTask{JobId: jobID}, |
| // The invocation finalization from AbortInvocation. |
| &internal.InvocationFinishedTask{JobId: jobID, InvId: expectedInvID}, |
| |
| // The triage from KickTriageTask and from InvocationFinishedTask |
| // finally arrives. |
| &internal.TriageJobStateTask{JobId: jobID}, |
| |
| // And delayed TimerTask arrives and gets skipped, as confirmed by |
| // mgr.handleTimer. |
| &internal.TimerTask{ |
| JobId: jobID, |
| InvId: expectedInvID, |
| Timer: &internal.Timer{ |
| Id: "project/job:9200093523825193008:1:0", |
| Created: timestamppb.New(epoch.Add(time.Second)), |
| Eta: timestamppb.New(epoch.Add(time.Minute + time.Second)), |
| Title: "1 min", |
| }, |
| }, |
| }) |
| }) |
| }) |
| } |
| |
| func TestEmitTriggers(t *testing.T) { |
| t.Parallel() |
| |
| Convey("with fake env", t, func() { |
| const testingJob = "project/job" |
| const realmID = "project:testing" |
| |
| c := newTestContext(epoch) |
| e, mgr := newTestEngine() |
| |
| tq := tqtesting.GetTestable(c, e.cfg.Dispatcher) |
| tq.CreateQueues() |
| |
| So(e.UpdateProjectJobs(c, "project", []catalog.Definition{ |
| { |
| JobID: testingJob, |
| RealmID: realmID, |
| Revision: "rev1", |
| Schedule: "triggered", |
| Task: noopTaskBytes(), |
| }, |
| }), ShouldBeNil) |
| |
| Convey("happy path", func() { |
| var incomingTriggers []*internal.Trigger |
| mgr.launchTask = func(ctx context.Context, ctl task.Controller) error { |
| incomingTriggers = ctl.Request().IncomingTriggers |
| ctl.State().Status = task.StatusSucceeded |
| return nil |
| } |
| |
| job, err := e.getJob(c, testingJob) |
| So(err, ShouldBeNil) |
| |
| // Simulate EmitTriggers call from an owner. |
| emittedTriggers := []*internal.Trigger{ |
| { |
| Id: "t1", |
| OrderInBatch: 1, |
| Payload: &internal.Trigger_Buildbucket{ |
| Buildbucket: &api.BuildbucketTrigger{ |
| Tags: []string{"a:b"}, |
| }, |
| }, |
| }, |
| { |
| Id: "t2", |
| OrderInBatch: 2, |
| }, |
| } |
| err = e.EmitTriggers(mockOwnerCtx(c, realmID), map[*Job][]*internal.Trigger{job: emittedTriggers}) |
| So(err, ShouldBeNil) |
| |
| // Run TQ until all activities stop. |
| tasks, _, err := tq.RunSimulation(c, nil) |
| So(err, ShouldBeNil) |
| |
| // We expect a triage invoked by EmitTrigger, and one full invocation. |
| expect := expectedTasks{JobID: testingJob, Epoch: epoch} |
| expect.triage() |
| expect.invocationSequence(9200093521727759856) |
| expect.triage() |
| So(tasks.Payloads(), ShouldResembleProto, expect.Tasks) |
| |
| // The task manager received all triggers (though maybe out of order, it |
| // is not defined). |
| sort.Slice(incomingTriggers, func(i, j int) bool { |
| return incomingTriggers[i].Id < incomingTriggers[j].Id |
| }) |
| So(incomingTriggers, ShouldResembleProto, emittedTriggers) |
| }) |
| |
| Convey("no scheduler.jobs.trigger permission", func() { |
| job, err := e.getJob(c, testingJob) |
| So(err, ShouldBeNil) |
| |
| err = e.EmitTriggers(mockReaderCtx(c, realmID), map[*Job][]*internal.Trigger{ |
| job: { |
| {Id: "t1"}, |
| }, |
| }) |
| So(err, ShouldEqual, ErrNoPermission) |
| }) |
| |
| Convey("paused job ignores triggers", func() { |
| job, err := e.getJob(c, testingJob) |
| So(err, ShouldBeNil) |
| So(e.setJobPausedFlag(c, job, true, "", ""), ShouldBeNil) |
| |
| // The pause emits a triage, get over it now. |
| tasks, _, err := tq.RunSimulation(c, nil) |
| So(err, ShouldBeNil) |
| expect := expectedTasks{JobID: testingJob, Epoch: epoch} |
| expect.kickTriage() |
| expect.triage() |
| So(tasks.Payloads(), ShouldResembleProto, expect.Tasks) |
| |
| // Make the RPC, which succeeds. |
| err = e.EmitTriggers(mockOwnerCtx(c, realmID), map[*Job][]*internal.Trigger{ |
| job: { |
| {Id: "t1"}, |
| }, |
| }) |
| So(err, ShouldBeNil) |
| |
| // But nothing really happens. |
| tasks, _, err = tq.RunSimulation(c, nil) |
| So(err, ShouldBeNil) |
| So(tasks.Payloads(), ShouldHaveLength, 0) |
| }) |
| }) |
| } |
| |
| func TestOneJobTriggersAnother(t *testing.T) { |
| t.Parallel() |
| |
| Convey("with fake env", t, func() { |
| c := newTestContext(epoch) |
| e, mgr := newTestEngine() |
| |
| tq := tqtesting.GetTestable(c, e.cfg.Dispatcher) |
| tq.CreateQueues() |
| |
| triggeringJob := "project/triggering-job" |
| triggeredJob := "project/triggered-job" |
| |
| So(e.UpdateProjectJobs(c, "project", []catalog.Definition{ |
| { |
| JobID: triggeringJob, |
| RealmID: "project:testing", |
| TriggeredJobIDs: []string{triggeredJob}, |
| Revision: "rev1", |
| Schedule: "triggered", |
| Task: noopTaskBytes(), |
| }, |
| { |
| JobID: triggeredJob, |
| RealmID: "project:testing", |
| Revision: "rev1", |
| Schedule: "triggered", |
| Task: noopTaskBytes(), |
| }, |
| }), ShouldBeNil) |
| |
| Convey("happy path", func() { |
| const triggeringInvID int64 = 9200093523824911856 |
| const triggeredInvID int64 = 9200093521728457040 |
| |
| // Force launch the triggering job. |
| forceInvocation(c, e, triggeringJob) |
| |
| // Eventually it runs the task which emits a bunch of triggers, which |
| // causes the triggered job triage. We stop right before it and examine |
| // what we see. |
| mgr.launchTask = func(ctx context.Context, ctl task.Controller) error { |
| ctl.EmitTrigger(ctx, &internal.Trigger{Id: "t1"}) |
| So(ctl.Save(ctx), ShouldBeNil) |
| ctl.EmitTrigger(ctx, &internal.Trigger{Id: "t2"}) |
| ctl.State().Status = task.StatusSucceeded |
| return nil |
| } |
| tasks, _, err := tq.RunSimulation(c, &tqtesting.SimulationParams{ |
| ShouldStopBefore: func(t tqtesting.Task) bool { |
| _, ok := t.Payload.(*internal.TriageJobStateTask) |
| return ok |
| }, |
| }) |
| So(err, ShouldBeNil) |
| |
| // How these triggers are seen from outside the task. |
| expectedTrigger1 := &internal.Trigger{ |
| Id: "t1", |
| JobId: triggeringJob, |
| InvocationId: triggeringInvID, |
| Created: timestamppb.New(epoch.Add(1 * time.Second)), |
| } |
| expectedTrigger2 := &internal.Trigger{ |
| Id: "t2", |
| JobId: triggeringJob, |
| InvocationId: triggeringInvID, |
| Created: timestamppb.New(epoch.Add(1 * time.Second)), |
| OrderInBatch: 1, // second call to EmitTrigger done by the invocation |
| } |
| |
| // All the tasks we've just executed. |
| So(tasks.Payloads(), ShouldResembleProto, []protoiface.MessageV1{ |
| // Triggering job begins execution. |
| &internal.LaunchInvocationsBatchTask{ |
| Tasks: []*internal.LaunchInvocationTask{{JobId: triggeringJob, InvId: triggeringInvID}}, |
| }, |
| &internal.LaunchInvocationTask{ |
| JobId: triggeringJob, InvId: triggeringInvID, |
| }, |
| |
| // It emits a trigger in the middle. |
| &internal.FanOutTriggersTask{ |
| JobIds: []string{triggeredJob}, |
| Triggers: []*internal.Trigger{expectedTrigger1}, |
| }, |
| &internal.EnqueueTriggersTask{ |
| JobId: triggeredJob, |
| Triggers: []*internal.Trigger{expectedTrigger1}, |
| }, |
| |
| // Triggering job finishes execution, emitting another trigger. |
| &internal.InvocationFinishedTask{ |
| JobId: triggeringJob, |
| InvId: triggeringInvID, |
| Triggers: &internal.FanOutTriggersTask{ |
| JobIds: []string{triggeredJob}, |
| Triggers: []*internal.Trigger{expectedTrigger2}, |
| }, |
| }, |
| &internal.EnqueueTriggersTask{ |
| JobId: triggeredJob, |
| Triggers: []*internal.Trigger{expectedTrigger2}, |
| }, |
| }) |
| |
| // Triggering invocation has finished (with triggers recorded). |
| triggeringInv, err := e.getInvocation(c, triggeringJob, triggeringInvID) |
| So(err, ShouldBeNil) |
| So(triggeringInv.Status, ShouldEqual, task.StatusSucceeded) |
| outgoing, err := triggeringInv.OutgoingTriggers() |
| So(err, ShouldBeNil) |
| So(outgoing, ShouldResembleProto, []*internal.Trigger{expectedTrigger1, expectedTrigger2}) |
| |
| // At this point triggered job's triage is about to start. Before it does, |
| // verify emitted trigger (sitting in the pending triggers set) is |
| // discoverable through ListTriggers. |
| tj, _ := e.getJob(c, triggeredJob) |
| triggers, err := e.ListTriggers(c, tj) |
| So(err, ShouldBeNil) |
| So(triggers, ShouldResembleProto, []*internal.Trigger{expectedTrigger1, expectedTrigger2}) |
| |
| // Resume the simulation to do the triages and start the triggered |
| // invocation. |
| var seen []*internal.Trigger |
| mgr.launchTask = func(ctx context.Context, ctl task.Controller) error { |
| seen = ctl.Request().IncomingTriggers |
| ctl.State().Status = task.StatusSucceeded |
| return nil |
| } |
| tasks, _, err = tq.RunSimulation(c, nil) |
| So(err, ShouldBeNil) |
| |
| // All the tasks we've just executed. |
| So(tasks.Payloads(), ShouldResembleProto, []protoiface.MessageV1{ |
| // Triggered job is getting triaged (because pending triggers). |
| &internal.TriageJobStateTask{ |
| JobId: triggeredJob, |
| }, |
| // Triggering job is getting triaged (because it has just finished). |
| &internal.TriageJobStateTask{ |
| JobId: triggeringJob, |
| }, |
| // The triggered job begins execution. |
| &internal.LaunchInvocationsBatchTask{ |
| Tasks: []*internal.LaunchInvocationTask{{JobId: triggeredJob, InvId: triggeredInvID}}, |
| }, |
| &internal.LaunchInvocationTask{ |
| JobId: triggeredJob, InvId: triggeredInvID, |
| }, |
| // ...and finishes. Note that the triage doesn't launch new invocation. |
| &internal.InvocationFinishedTask{ |
| JobId: triggeredJob, InvId: triggeredInvID, |
| }, |
| &internal.TriageJobStateTask{JobId: triggeredJob}, |
| }) |
| |
| // Verify LaunchTask callback saw the triggers. |
| So(seen, ShouldResembleProto, []*internal.Trigger{expectedTrigger1, expectedTrigger2}) |
| |
| // And they are recoded in IncomingTriggers set. |
| triggeredInv, err := e.getInvocation(c, triggeredJob, triggeredInvID) |
| So(err, ShouldBeNil) |
| So(triggeredInv.Status, ShouldEqual, task.StatusSucceeded) |
| incoming, err := triggeredInv.IncomingTriggers() |
| So(err, ShouldBeNil) |
| So(incoming, ShouldResembleProto, []*internal.Trigger{expectedTrigger1, expectedTrigger2}) |
| }) |
| }) |
| } |
| |
| func TestInvocationTimers(t *testing.T) { |
| t.Parallel() |
| |
| Convey("with fake env", t, func() { |
| c := newTestContext(epoch) |
| e, mgr := newTestEngine() |
| |
| tq := tqtesting.GetTestable(c, e.cfg.Dispatcher) |
| tq.CreateQueues() |
| |
| const testJobID = "project/job" |
| So(e.UpdateProjectJobs(c, "project", []catalog.Definition{ |
| { |
| JobID: testJobID, |
| RealmID: "project:testing", |
| Revision: "rev1", |
| Schedule: "triggered", |
| Task: noopTaskBytes(), |
| }, |
| }), ShouldBeNil) |
| |
| Convey("happy path", func() { |
| const testInvID int64 = 9200093523825193008 |
| |
| // Force launch the job. |
| forceInvocation(c, e, testJobID) |
| |
| // See handelTimer. Name of the timer => time since epoch. |
| callTimes := map[string]time.Duration{} |
| |
| // Eventually it runs the task which emits a bunch of timers and then |
| // some more, and then stops. |
| mgr.launchTask = func(ctx context.Context, ctl task.Controller) error { |
| ctl.AddTimer(ctx, time.Minute, "1 min", []byte{1}) |
| ctl.AddTimer(ctx, 2*time.Minute, "2 min", []byte{2}) |
| ctl.State().Status = task.StatusRunning |
| return nil |
| } |
| mgr.handleTimer = func(ctx context.Context, ctl task.Controller, name string, payload []byte) error { |
| callTimes[name] = clock.Now(ctx).Sub(epoch) |
| switch name { |
| case "1 min": // ignore |
| case "2 min": |
| // Call us again later. |
| ctl.AddTimer(ctx, time.Minute, "stop", []byte{3}) |
| case "stop": |
| ctl.AddTimer(ctx, time.Minute, "ignored-timer", nil) |
| ctl.State().Status = task.StatusSucceeded |
| } |
| return nil |
| } |
| tasks, _, err := tq.RunSimulation(c, nil) |
| So(err, ShouldBeNil) |
| |
| timerMsg := func(idSuffix string, created, eta time.Duration, title string, payload []byte) *internal.Timer { |
| return &internal.Timer{ |
| Id: fmt.Sprintf("%s:%d:%s", testJobID, testInvID, idSuffix), |
| Created: timestamppb.New(epoch.Add(created)), |
| Eta: timestamppb.New(epoch.Add(eta)), |
| Title: title, |
| Payload: payload, |
| } |
| } |
| |
| // Individual timers emitted by the test. Note that 1 extra sec comes from |
| // the delay added by kickLaunchInvocationsBatchTask. |
| timer1 := timerMsg("1:0", time.Second, time.Second+time.Minute, "1 min", []byte{1}) |
| timer2 := timerMsg("1:1", time.Second, time.Second+2*time.Minute, "2 min", []byte{2}) |
| timer3 := timerMsg("3:0", time.Second+2*time.Minute, time.Second+3*time.Minute, "stop", []byte{3}) |
| |
| // All 'handleTimer' ticks happened at expected moments in time. |
| So(callTimes, ShouldResemble, map[string]time.Duration{ |
| "1 min": time.Second + time.Minute, |
| "2 min": time.Second + 2*time.Minute, |
| "stop": time.Second + 3*time.Minute, |
| }) |
| |
| // All the tasks we've just executed. |
| So(tasks.Payloads(), ShouldResembleProto, []protoiface.MessageV1{ |
| // Triggering job begins execution. |
| &internal.LaunchInvocationsBatchTask{ |
| Tasks: []*internal.LaunchInvocationTask{{JobId: testJobID, InvId: testInvID}}, |
| }, |
| &internal.LaunchInvocationTask{ |
| JobId: testJobID, InvId: testInvID, |
| }, |
| |
| // Request to schedule a bunch of timers. |
| &internal.ScheduleTimersTask{ |
| JobId: testJobID, |
| InvId: testInvID, |
| Timers: []*internal.Timer{timer1, timer2}, |
| }, |
| |
| // Actual individual timers. |
| &internal.TimerTask{ |
| JobId: testJobID, |
| InvId: testInvID, |
| Timer: timer1, |
| }, |
| &internal.TimerTask{ |
| JobId: testJobID, |
| InvId: testInvID, |
| Timer: timer2, |
| }, |
| |
| // One more, scheduled from handleTimer. |
| &internal.TimerTask{ |
| JobId: testJobID, |
| InvId: testInvID, |
| Timer: timer3, |
| }, |
| |
| // End of the invocation. |
| &internal.InvocationFinishedTask{ |
| JobId: testJobID, InvId: testInvID, |
| }, |
| &internal.TriageJobStateTask{JobId: testJobID}, |
| }) |
| }) |
| }) |
| } |
| |
| func TestCron(t *testing.T) { |
| t.Parallel() |
| |
| Convey("with fake env", t, func() { |
| const testJobID = "project/job" |
| |
| c := newTestContext(epoch) |
| e, mgr := newTestEngine() |
| |
| updateJob := func(schedule string) { |
| So(e.UpdateProjectJobs(c, "project", []catalog.Definition{ |
| { |
| JobID: testJobID, |
| RealmID: "project:testing", |
| Revision: "rev1", |
| Schedule: schedule, |
| Task: noopTaskBytes(), |
| }, |
| }), ShouldBeNil) |
| datastore.GetTestable(c).CatchupIndexes() |
| } |
| |
| mgr.launchTask = func(ctx context.Context, ctl task.Controller) error { |
| ctl.State().Status = task.StatusSucceeded |
| return nil |
| } |
| |
| tq := tqtesting.GetTestable(c, e.cfg.Dispatcher) |
| tq.CreateQueues() |
| |
| Convey("relative schedule", func() { |
| updateJob("with 10s interval") |
| |
| Convey("happy path", func() { |
| // Let the TQ spin for two full cycles. |
| tasks, _, err := tq.RunSimulation(c, &tqtesting.SimulationParams{ |
| Deadline: epoch.Add(30 * time.Second), |
| }) |
| So(err, ShouldBeNil) |
| |
| // Collect the list of TQ tasks we expect to be executed. |
| expect := expectedTasks{JobID: testJobID, Epoch: epoch} |
| // 10 sec after the job is created, a tick comes and emits a trigger. |
| expect.cronTickSequence(6278013164014963328, 3, 10*time.Second) |
| // It causes an invocation. |
| expect.invocationSequence(9200093511241999856) |
| expect.triage() |
| // 10 sec after it finishes, new tick comes (4 extra seconds are from |
| // 2 sec delays induced by 2 triages). |
| expect.cronTickSequence(928953616732700780, 6, 24*time.Second) |
| // It causes an invocation. |
| expect.invocationSequence(9200093496562633040) |
| expect.triage() |
| // ... and so on |
| |
| So(tasks.Payloads(), ShouldResembleProto, expect.Tasks) |
| }) |
| |
| Convey("schedule changes", func() { |
| // Let the TQ spin until it hits the task execution. |
| tasks, _, err := tq.RunSimulation(c, &tqtesting.SimulationParams{ |
| ShouldStopBefore: func(t tqtesting.Task) bool { |
| _, ok := t.Payload.(*internal.LaunchInvocationTask) |
| return ok |
| }, |
| }) |
| So(err, ShouldBeNil) |
| |
| // At this point the job's schedule changes. |
| updateJob("with 30s interval") |
| |
| // We let the TQ spin some more. |
| moreTasks, _, err := tq.RunSimulation(c, &tqtesting.SimulationParams{ |
| Deadline: epoch.Add(50 * time.Second), |
| }) |
| So(err, ShouldBeNil) |
| |
| // Here's what we expect to execute. |
| expect := expectedTasks{JobID: testJobID, Epoch: epoch} |
| // 10 sec after the job is created, a tick comes and emits a trigger. |
| expect.cronTickSequence(6278013164014963328, 3, 10*time.Second) |
| // It causes an invocation. We changed the schedule to 30s after that. |
| expect.invocationSequence(9200093511241999856) |
| expect.triage() |
| // 30 sec after it finishes, new tick comes (4 extra seconds are from |
| // 2 sec delays induced by 2 triages). |
| expect.cronTickSequence(928953616732700780, 6, 44*time.Second) |
| // It causes an invocation. |
| expect.invocationSequence(9200093475591113040) |
| expect.triage() |
| |
| // Got it? |
| So(append(tasks, moreTasks...).Payloads(), ShouldResembleProto, expect.Tasks) |
| }) |
| |
| Convey("pause/unpause", func() { |
| // Let the TQ spin until it hits the end of task execution. |
| tasks, _, err := tq.RunSimulation(c, &tqtesting.SimulationParams{ |
| ShouldStopAfter: func(t tqtesting.Task) bool { |
| _, ok := t.Payload.(*internal.InvocationFinishedTask) |
| return ok |
| }, |
| }) |
| So(err, ShouldBeNil) |
| |
| // At this point we pause the job. |
| j, err := e.getJob(c, testJobID) |
| So(err, ShouldBeNil) |
| So(e.setJobPausedFlag(c, j, true, "user:someone@example.com", "pause reason"), ShouldBeNil) |
| |
| // The information about the pause was recorded in the entity. |
| job, err := e.getJob(c, testJobID) |
| So(err, ShouldBeNil) |
| So(job.Paused, ShouldBeTrue) |
| So(job.PausedOrResumedWhen, ShouldEqual, clock.Now(c).UTC()) |
| So(job.PausedOrResumedBy, ShouldEqual, identity.Identity("user:someone@example.com")) |
| So(job.PausedOrResumedReason, ShouldEqual, "pause reason") |
| |
| // We let the TQ spin some more. |
| moreTasks, _, err := tq.RunSimulation(c, &tqtesting.SimulationParams{ |
| Deadline: epoch.Add(time.Hour), |
| }) |
| So(err, ShouldBeNil) |
| |
| // Here's what we expect to execute. |
| expect := expectedTasks{JobID: testJobID, Epoch: epoch} |
| // 10 sec after the job is created, a tick comes and emits a trigger. |
| expect.cronTickSequence(6278013164014963328, 3, 10*time.Second) |
| // It causes an invocation. We pause the job after that. |
| expect.invocationSequence(9200093511241999856) |
| // The pause kicks the triage, which later executes. |
| expect.kickTriage() |
| expect.triage() |
| // and nothing else happens ... |
| |
| // Got it? |
| So(append(tasks, moreTasks...).Payloads(), ShouldResembleProto, expect.Tasks) |
| |
| // Some time later we unpause the job, it starts again immediately. |
| clock.Get(c).(testclock.TestClock).Set(epoch.Add(time.Hour)) |
| So(e.setJobPausedFlag(c, j, false, "user:someone@example.com", "resume reason"), ShouldBeNil) |
| |
| tasks, _, err = tq.RunSimulation(c, &tqtesting.SimulationParams{ |
| Deadline: epoch.Add(time.Hour + 10*time.Second), |
| }) |
| So(err, ShouldBeNil) |
| |
| // Did it? |
| expect.clear() |
| expect.cronTickSequence(325298467681248558, 7, time.Hour) |
| expect.invocationSequence(9200089746854060480) |
| expect.triage() |
| So(tasks.Payloads(), ShouldResembleProto, expect.Tasks) |
| }) |
| |
| Convey("disabling", func() { |
| // Let the TQ spin until it hits the end of task execution. |
| tasks, _, err := tq.RunSimulation(c, &tqtesting.SimulationParams{ |
| ShouldStopAfter: func(t tqtesting.Task) bool { |
| _, ok := t.Payload.(*internal.InvocationFinishedTask) |
| return ok |
| }, |
| }) |
| So(err, ShouldBeNil) |
| |
| // At this point we disable the job. |
| So(e.UpdateProjectJobs(c, "project", nil), ShouldBeNil) |
| |
| // We let the TQ spin some more... |
| moreTasks, _, err := tq.RunSimulation(c, &tqtesting.SimulationParams{ |
| Deadline: epoch.Add(time.Hour), |
| }) |
| So(err, ShouldBeNil) |
| |
| // Here's what we expect to execute. |
| expect := expectedTasks{JobID: testJobID, Epoch: epoch} |
| // 10 sec after the job is created, a tick comes and emits a trigger. |
| expect.cronTickSequence(6278013164014963328, 3, 10*time.Second) |
| // It causes an invocation. We disable the job after that. |
| expect.invocationSequence(9200093511241999856) |
| // Disabling kicks the triage, which later executes. |
| expect.kickTriage() |
| expect.triage() |
| // and nothing else happens ... |
| |
| // Got it? |
| So(append(tasks, moreTasks...).Payloads(), ShouldResembleProto, expect.Tasks) |
| }) |
| }) |
| |
| Convey("absolute schedule", func() { |
| updateJob("5,10 * * * * * *") // on 5th and 10th sec |
| |
| Convey("happy path", func() { |
| // Let the TQ spin for two full cycles. |
| tasks, _, err := tq.RunSimulation(c, &tqtesting.SimulationParams{ |
| Deadline: epoch.Add(14 * time.Second), |
| }) |
| So(err, ShouldBeNil) |
| |
| // Collect the list of TQ tasks we expect to be executed. |
| expect := expectedTasks{JobID: testJobID, Epoch: epoch} |
| // 5 sec after the job is created, a tick comes and emits a trigger. |
| expect.cronTickSequence(6278013164014963328, 3, 5*time.Second) |
| // It causes an invocation. |
| expect.invocationSequence(9200093517533908688) |
| expect.triage() |
| // Next tick comes right on schedule, 10 sec after the start. |
| expect.cronTickSequence(2673062197574995716, 6, 10*time.Second) |
| // It causes an invocation. |
| expect.invocationSequence(9200093511241900480) |
| expect.triage() |
| // ... and so on |
| |
| So(tasks.Payloads(), ShouldResembleProto, expect.Tasks) |
| }) |
| |
| Convey("overrun", func() { |
| // Currently cron just keeps submitting triggers that ends up in the |
| // pending triggers queue if there's some invocation currently running. |
| // Once the invocation finishes, the next one start right away (just |
| // like with any other kind of trigger). Overruns are not recorded. |
| |
| // Simulate "long" task with timers. |
| mgr.launchTask = func(ctx context.Context, ctl task.Controller) error { |
| ctl.AddTimer(ctx, 6*time.Second, "6 sec", nil) |
| ctl.State().Status = task.StatusRunning |
| return nil |
| } |
| mgr.handleTimer = func(ctx context.Context, ctl task.Controller, name string, payload []byte) error { |
| ctl.State().Status = task.StatusSucceeded |
| return nil |
| } |
| |
| // Let the TQ spin for two full cycles. |
| tasks, _, err := tq.RunSimulation(c, &tqtesting.SimulationParams{ |
| Deadline: epoch.Add(15 * time.Second), |
| }) |
| So(err, ShouldBeNil) |
| |
| // Collect the list of TQ tasks we expect to be executed. |
| expect := expectedTasks{JobID: testJobID, Epoch: epoch} |
| // 5 sec after the job is created, a tick comes and emits a trigger. |
| expect.cronTickSequence(6278013164014963328, 3, 5*time.Second) |
| // It causes an invocation to start (and keep running). |
| expect.invocationStart(9200093517533908688) |
| // The next tick comes right on the schedule, but it doesn't start an |
| // invocation yet, just submits a trigger. |
| expect.cronTickSequence(2673062197574995716, 6, 10*time.Second) |
| // A scheduled invocation timer arrives and finishes the invocation. |
| expect.invocationTimer(9200093517533908688, 1, "6 sec", 7*time.Second, 13*time.Second) |
| expect.invocationEnd(9200093517533908688) |
| expect.triage() |
| // The triage detects pending cron trigger and launches a new invocation |
| // right away. |
| expect.invocationStart(9200093509144748480) |
| |
| So(tasks.Payloads(), ShouldResembleProto, expect.Tasks) |
| }) |
| }) |
| }) |
| } |
| |
| func noopTaskBytes() []byte { |
| buf, _ := proto.Marshal(&messages.TaskDefWrapper{Noop: &messages.NoopTask{}}) |
| return buf |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| |
| func forceInvocation(c context.Context, e *engineImpl, jobID string) (inv *Invocation) { |
| err := e.jobTxn(c, jobID, func(c context.Context, job *Job, isNew bool) (err error) { |
| invs, err := e.enqueueInvocations(c, job, []task.Request{ |
| {}, |
| }) |
| if err != nil { |
| return err |
| } |
| inv = invs[0] |
| return nil |
| }) |
| if err != nil { |
| panic(err) |
| } |
| return |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| |
| type expectedTasks struct { |
| JobID string |
| Epoch time.Time |
| Tasks []protoiface.MessageV1 |
| } |
| |
| func (e *expectedTasks) clear() { |
| e.Tasks = nil |
| } |
| |
| func (e *expectedTasks) triage() { |
| e.Tasks = append(e.Tasks, &internal.TriageJobStateTask{JobId: e.JobID}) |
| } |
| |
| func (e *expectedTasks) kickTriage() { |
| e.Tasks = append(e.Tasks, &internal.KickTriageTask{JobId: e.JobID}) |
| } |
| |
| func (e *expectedTasks) cronTickSequence(nonce, gen int64, when time.Duration) { |
| e.Tasks = append(e.Tasks, |
| &internal.CronTickTask{JobId: e.JobID, TickNonce: nonce}, |
| &internal.EnqueueTriggersTask{ |
| JobId: e.JobID, |
| Triggers: []*internal.Trigger{ |
| { |
| Id: fmt.Sprintf("cron:v1:%d", gen), |
| Created: timestamppb.New(e.Epoch.Add(when)), |
| Payload: &internal.Trigger_Cron{ |
| Cron: &api.CronTrigger{Generation: gen}, |
| }, |
| }, |
| }, |
| }, |
| ) |
| e.triage() |
| } |
| |
| func (e *expectedTasks) invocationSequence(invID int64) { |
| e.invocationStart(invID) |
| e.invocationEnd(invID) |
| } |
| |
| func (e *expectedTasks) invocationStart(invID int64) { |
| e.Tasks = append(e.Tasks, |
| &internal.LaunchInvocationsBatchTask{ |
| Tasks: []*internal.LaunchInvocationTask{ |
| {JobId: e.JobID, InvId: invID}, |
| }, |
| }, |
| &internal.LaunchInvocationTask{JobId: e.JobID, InvId: invID}, |
| ) |
| } |
| |
| func (e *expectedTasks) invocationEnd(invID int64) { |
| e.Tasks = append(e.Tasks, &internal.InvocationFinishedTask{JobId: e.JobID, InvId: invID}) |
| } |
| |
| func (e *expectedTasks) invocationTimer(invID, seq int64, title string, created, eta time.Duration) { |
| e.Tasks = append(e.Tasks, &internal.TimerTask{ |
| JobId: e.JobID, |
| InvId: invID, |
| Timer: &internal.Timer{ |
| Id: fmt.Sprintf("%s:%d:%d:0", e.JobID, invID, seq), |
| Title: title, |
| Created: timestamppb.New(e.Epoch.Add(created)), |
| Eta: timestamppb.New(e.Epoch.Add(eta)), |
| }, |
| }) |
| } |