blob: 211b6b8504fa4256ff277cdef012b42e6f2bdb40 [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package tasks contains task queue implementations.
package tasks
import (
"context"
"encoding/json"
"fmt"
"time"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/server/tq"
"go.chromium.org/luci/buildbucket/appengine/internal/resultdb"
taskdefs "go.chromium.org/luci/buildbucket/appengine/tasks/defs"
pb "go.chromium.org/luci/buildbucket/proto"
// Enable datastore transactional tasks support.
_ "go.chromium.org/luci/server/tq/txn/datastore"
)
// rejectionHandler returns a tq.Handler which rejects the given task.
// Used by tasks which are handled in Python.
// TODO(crbug/1042991): Remove once all handlers are implemented in Go.
func rejectionHandler(tq string) tq.Handler {
return func(ctx context.Context, payload proto.Message) error {
logging.Errorf(ctx, "tried to handle %s: %q", tq, payload)
return errors.Reason("handler called").Err()
}
}
func init() {
tq.RegisterTaskClass(tq.TaskClass{
ID: "cancel-backend-task",
Kind: tq.FollowsContext,
Prototype: (*taskdefs.CancelBackendTask)(nil),
Queue: "cancel-backend-task",
Handler: func(ctx context.Context, payload proto.Message) error {
t := payload.(*taskdefs.CancelBackendTask)
return HandleCancelBackendTask(ctx, t.Project, t.Target, t.TaskId)
},
})
tq.RegisterTaskClass(tq.TaskClass{
ID: "cancel-swarming-task",
Custom: func(ctx context.Context, m proto.Message) (*tq.CustomPayload, error) {
task := m.(*taskdefs.CancelSwarmingTask)
body, err := json.Marshal(map[string]any{
"hostname": task.Hostname,
"task_id": task.TaskId,
"realm": task.Realm,
})
if err != nil {
return nil, errors.Annotate(err, "error marshaling payload").Err()
}
return &tq.CustomPayload{
Body: body,
Method: "POST",
RelativeURI: fmt.Sprintf("/internal/task/buildbucket/cancel_swarming_task/%s/%s", task.Hostname, task.TaskId),
}, nil
},
Handler: rejectionHandler("cancel-swarming-task"),
Kind: tq.FollowsContext,
Prototype: (*taskdefs.CancelSwarmingTask)(nil),
Queue: "backend-default",
})
tq.RegisterTaskClass(tq.TaskClass{
ID: "cancel-swarming-task-go",
Kind: tq.FollowsContext,
Prototype: (*taskdefs.CancelSwarmingTaskGo)(nil),
Queue: "backend-go-default",
Handler: func(ctx context.Context, payload proto.Message) error {
t := payload.(*taskdefs.CancelSwarmingTaskGo)
return HandleCancelSwarmingTask(ctx, t.Hostname, t.TaskId, t.Realm)
},
})
// TODO(crbug.com/1328646): Delete it after swarming-build-create migration is done.
tq.RegisterTaskClass(tq.TaskClass{
ID: "create-swarming-task",
Custom: func(ctx context.Context, m proto.Message) (*tq.CustomPayload, error) {
task := m.(*taskdefs.CreateSwarmingTask)
body, err := json.Marshal(map[string]any{
"generation": 0,
"id": task.BuildId,
})
if err != nil {
return nil, errors.Annotate(err, "error marshaling payload").Err()
}
return &tq.CustomPayload{
Body: body,
Method: "POST",
RelativeURI: fmt.Sprintf("/internal/task/swarming/sync-build/%d", task.BuildId),
}, nil
},
Handler: rejectionHandler("create-swarming-task"),
Kind: tq.Transactional,
Prototype: (*taskdefs.CreateSwarmingTask)(nil),
Queue: "swarming-build-create",
})
// TODO(crbug.com/1356766): remove it after bq-exporter runs in Go.
tq.RegisterTaskClass(tq.TaskClass{
ID: "export-bigquery",
Custom: func(ctx context.Context, m proto.Message) (*tq.CustomPayload, error) {
task := m.(*taskdefs.ExportBigQuery)
body, err := json.Marshal(map[string]any{
"id": task.BuildId,
})
if err != nil {
return nil, errors.Annotate(err, "error marshaling payload").Err()
}
return &tq.CustomPayload{
Body: body,
Method: "POST",
RelativeURI: fmt.Sprintf("/internal/task/bq/export/%d", task.BuildId),
}, nil
},
Handler: rejectionHandler("export-bigquery"),
Kind: tq.Transactional,
Prototype: (*taskdefs.ExportBigQuery)(nil),
Queue: "backend-default",
})
tq.RegisterTaskClass(tq.TaskClass{
ID: "finalize-resultdb",
Custom: func(ctx context.Context, m proto.Message) (*tq.CustomPayload, error) {
task := m.(*taskdefs.FinalizeResultDB)
body, err := json.Marshal(map[string]any{
"id": task.BuildId,
})
if err != nil {
return nil, errors.Annotate(err, "error marshaling payload").Err()
}
return &tq.CustomPayload{
Body: body,
Method: "POST",
RelativeURI: fmt.Sprintf("/internal/task/resultdb/finalize/%d", task.BuildId),
}, nil
},
Handler: rejectionHandler("finalize-resultdb"),
Kind: tq.Transactional,
Prototype: (*taskdefs.FinalizeResultDB)(nil),
Queue: "backend-default",
})
tq.RegisterTaskClass(tq.TaskClass{
ID: "finalize-resultdb-go",
Kind: tq.Transactional,
Prototype: (*taskdefs.FinalizeResultDBGo)(nil),
Queue: "finalize-resultdb-go",
Handler: func(ctx context.Context, payload proto.Message) error {
t := payload.(*taskdefs.FinalizeResultDBGo)
return resultdb.FinalizeInvocation(ctx, t.BuildId)
},
})
tq.RegisterTaskClass(tq.TaskClass{
ID: "notify-pubsub",
Custom: func(ctx context.Context, m proto.Message) (*tq.CustomPayload, error) {
task := m.(*taskdefs.NotifyPubSub)
mode := "global"
if task.Callback {
mode = "callback"
}
body, err := json.Marshal(map[string]any{
"id": task.BuildId,
"mode": mode,
})
if err != nil {
return nil, errors.Annotate(err, "error marshaling payload").Err()
}
return &tq.CustomPayload{
Body: body,
Method: "POST",
RelativeURI: fmt.Sprintf("/internal/task/buildbucket/notify/%d", task.BuildId),
}, nil
},
Handler: rejectionHandler("notify-pubsub"),
Kind: tq.Transactional,
Prototype: (*taskdefs.NotifyPubSub)(nil),
Queue: "backend-default",
})
tq.RegisterTaskClass(tq.TaskClass{
ID: "notify-pubsub-go",
Kind: tq.FollowsContext,
Prototype: (*taskdefs.NotifyPubSubGo)(nil),
Queue: "notify-pubsub-go",
Handler: func(ctx context.Context, payload proto.Message) error {
t := payload.(*taskdefs.NotifyPubSubGo)
return PublishBuildsV2Notification(ctx, t.BuildId, t.Topic, t.Callback)
},
})
tq.RegisterTaskClass(tq.TaskClass{
ID: "notify-pubsub-go-proxy",
Kind: tq.Transactional,
Prototype: (*taskdefs.NotifyPubSubGoProxy)(nil),
Queue: "notify-pubsub-go",
Handler: func(ctx context.Context, payload proto.Message) error {
t := payload.(*taskdefs.NotifyPubSubGoProxy)
return EnqueueNotifyPubSubGo(ctx, t.BuildId, t.Project)
},
})
tq.RegisterTaskClass(tq.TaskClass{
ID: "builds_v2",
Kind: tq.NonTransactional,
Prototype: (*pb.BuildsV2PubSub)(nil),
Topic: "builds_v2",
Custom: func(ctx context.Context, m proto.Message) (*tq.CustomPayload, error) {
t := m.(*pb.BuildsV2PubSub)
blob, err := (protojson.MarshalOptions{Indent: "\t"}).Marshal(m)
if err != nil {
logging.Errorf(ctx, "failed to marshal builds_v2 pubsub message body - %s", err)
return nil, err
}
return &tq.CustomPayload{
Body: blob,
Meta: generateBuildsV2Attributes(t.Build),
}, nil
},
})
tq.RegisterTaskClass(tq.TaskClass{
ID: "cancel-build",
Kind: tq.Transactional,
Prototype: (*taskdefs.CancelBuildTask)(nil),
Queue: "cancel-build",
Handler: func(ctx context.Context, payload proto.Message) error {
t := payload.(*taskdefs.CancelBuildTask)
_, err := Cancel(ctx, t.BuildId)
return err
},
})
tq.RegisterTaskClass(tq.TaskClass{
ID: "create-swarming-task-go",
Kind: tq.Transactional,
Prototype: (*taskdefs.CreateSwarmingBuildTask)(nil),
Queue: "swarming-build-create-go",
Handler: func(ctx context.Context, payload proto.Message) error {
t := payload.(*taskdefs.CreateSwarmingBuildTask)
return SyncBuild(ctx, t.GetBuildId(), 0)
},
})
tq.RegisterTaskClass(tq.TaskClass{
ID: "create-backend-task-go",
Kind: tq.Transactional,
Prototype: (*taskdefs.CreateBackendBuildTask)(nil),
Queue: "create-backend-task-go",
Handler: func(ctx context.Context, payload proto.Message) error {
t := payload.(*taskdefs.CreateBackendBuildTask)
return CreateBackendTask(ctx, t.GetBuildId(), t.GetRequestId())
},
})
tq.RegisterTaskClass(tq.TaskClass{
ID: "sync-swarming-task-go",
Kind: tq.NonTransactional,
Prototype: (*taskdefs.SyncSwarmingBuildTask)(nil),
Queue: "swarming-build-sync-go",
Handler: func(ctx context.Context, payload proto.Message) error {
t := payload.(*taskdefs.SyncSwarmingBuildTask)
return SyncBuild(ctx, t.GetBuildId(), t.GetGeneration())
},
})
tq.RegisterTaskClass(tq.TaskClass{
ID: "export-bigquery-go",
Kind: tq.Transactional,
Prototype: (*taskdefs.ExportBigQueryGo)(nil),
Queue: "export-bigquery-go",
Handler: func(ctx context.Context, payload proto.Message) error {
t := payload.(*taskdefs.ExportBigQueryGo)
return ExportBuild(ctx, t.BuildId)
},
})
tq.RegisterTaskClass(tq.TaskClass{
ID: "sync-builds-with-backend-tasks",
Kind: tq.NonTransactional,
Prototype: (*taskdefs.SyncBuildsWithBackendTasks)(nil),
Queue: "sync-builds-with-backend-tasks",
Handler: func(ctx context.Context, payload proto.Message) error {
t := payload.(*taskdefs.SyncBuildsWithBackendTasks)
return SyncBuildsWithBackendTasks(ctx, t.Backend, t.Project)
},
})
tq.RegisterTaskClass(tq.TaskClass{
ID: "check-build-liveness",
Kind: tq.FollowsContext,
Prototype: (*taskdefs.CheckBuildLiveness)(nil),
Queue: "check-build-liveness",
Handler: func(ctx context.Context, payload proto.Message) error {
t := payload.(*taskdefs.CheckBuildLiveness)
return CheckLiveness(ctx, t.BuildId, t.HeartbeatTimeout)
},
})
}
// CancelBackendTask enqueues a task queue task to cancel the given Backend
// task.
func CancelBackendTask(ctx context.Context, task *taskdefs.CancelBackendTask) error {
switch {
case task.Project == "":
return errors.Reason("project is required").Err()
case task.TaskId == "":
return errors.Reason("task_id is required").Err()
case task.Target == "":
return errors.Reason("task target is required").Err()
}
return tq.AddTask(ctx, &tq.Task{
Payload: task,
})
}
// CancelSwarmingTask enqueues a task queue task to cancel the given Swarming
// task.
func CancelSwarmingTask(ctx context.Context, task *taskdefs.CancelSwarmingTaskGo) error {
switch {
case task.GetHostname() == "":
return errors.Reason("hostname is required").Err()
case task.TaskId == "":
return errors.Reason("task_id is required").Err()
}
return tq.AddTask(ctx, &tq.Task{
Payload: task,
})
}
// CreateSwarmingTask enqueues a task queue task to create a Swarming task from
// the given build.
func CreateSwarmingTask(ctx context.Context, task *taskdefs.CreateSwarmingTask) error {
if task.GetBuildId() == 0 {
return errors.Reason("build_id is required").Err()
}
return tq.AddTask(ctx, &tq.Task{
Payload: task,
})
}
// CreateSwarmingBuildTask enqueues a Cloud Tasks task to create a Swarming task
// from the given build.
func CreateSwarmingBuildTask(ctx context.Context, task *taskdefs.CreateSwarmingBuildTask) error {
if task.GetBuildId() == 0 {
return errors.Reason("build_id is required").Err()
}
return tq.AddTask(ctx, &tq.Task{
Title: fmt.Sprintf("create-swarming-task-%d", task.BuildId),
Payload: task,
})
}
// CreateBackendBuildTask enqueues a Cloud Tasks task to create a backend task
// from the given build.
func CreateBackendBuildTask(ctx context.Context, task *taskdefs.CreateBackendBuildTask) error {
if task.GetBuildId() == 0 {
return errors.Reason("build_id is required").Err()
}
return tq.AddTask(ctx, &tq.Task{
Title: fmt.Sprintf("create-backend-task-%d", task.BuildId),
Payload: task,
})
}
// SyncSwarmingBuildTask enqueues a Cloud Tasks task to sync the Swarming task
// with the given build.
func SyncSwarmingBuildTask(ctx context.Context, task *taskdefs.SyncSwarmingBuildTask, delay time.Duration) error {
switch {
case task.GetBuildId() == 0:
return errors.Reason("build_id is required").Err()
case task.GetGeneration() == 0:
return errors.Reason("generation should be larger than 0").Err()
}
return tq.AddTask(ctx, &tq.Task{
Title: fmt.Sprintf("sync-swarming-task-%d", task.BuildId),
Payload: task,
Delay: delay,
DeduplicationKey: fmt.Sprintf("%d-%d", task.BuildId, task.Generation),
})
}
// ExportBigQuery enqueues a task queue task to export the given build to Bq.
// TODO(crbug.com/1356766): routeToGo will be removed once migration is done.
func ExportBigQuery(ctx context.Context, buildID int64, routeToGo bool) error {
if buildID <= 0 {
return errors.Reason("build_id is invalid").Err()
}
if routeToGo {
return tq.AddTask(ctx, &tq.Task{
Payload: &taskdefs.ExportBigQueryGo{BuildId: buildID},
})
}
return tq.AddTask(ctx, &tq.Task{
Payload: &taskdefs.ExportBigQuery{BuildId: buildID},
})
}
// FinalizeResultDB enqueues a task queue task to finalize the invocation for
// the given build in ResultDB.
func FinalizeResultDB(ctx context.Context, task *taskdefs.FinalizeResultDBGo) error {
if task.GetBuildId() == 0 {
return errors.Reason("build_id is required").Err()
}
return tq.AddTask(ctx, &tq.Task{
Payload: task,
})
}
// SyncWithBackend enqueues a task queue task to sync builds in one project
// running on a backend.
func SyncWithBackend(ctx context.Context, backend, project string) error {
switch {
case backend == "":
return errors.Reason("backend is required").Err()
case project == "":
return errors.Reason("project is required").Err()
}
return tq.AddTask(ctx, &tq.Task{
Payload: &taskdefs.SyncBuildsWithBackendTasks{
Backend: backend,
Project: project,
},
})
}
// CheckBuildLiveness enqueues a task queue task to check a build's liveness.
func CheckBuildLiveness(ctx context.Context, buildID int64, heartbeatTimeout uint32, delay time.Duration) error {
if buildID <= 0 {
return errors.Reason("build_id is invalid").Err()
}
return tq.AddTask(ctx, &tq.Task{
Payload: &taskdefs.CheckBuildLiveness{
BuildId: buildID,
HeartbeatTimeout: heartbeatTimeout,
},
Delay: delay,
})
}