blob: eaf62c295b3a0e630ac89daef000def26493c54c [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package reminder holds Reminder to avoid circular dependencies.
package reminder
import (
"time"
taskspb "cloud.google.com/go/cloudtasks/apiv2/cloudtaskspb"
"cloud.google.com/go/pubsub/apiv1/pubsubpb"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/server/tq/internal/tqpb"
)
// FreshUntilPrecision is precision of Reminder.FreshUntil, to which it is
// always truncated.
const FreshUntilPrecision = time.Millisecond
// Reminder reminds to enqueue a task.
//
// It is persisted transactionally with some other user logic to the database.
// Later, a task is actually scheduled and a reminder can be deleted
// non-transactionally.
//
// Its payload is represented either by a raw byte buffer (when the reminder
// is stored and loaded), or by a more complex Go value (when the reminder
// is manipulated by Dispatcher and Submitter). The Go value representation
// is described by Payload struct and it can be "attached" to the reminder
// via AttachReminder() or deserialized from the raw byte buffer via Payload().
type Reminder struct {
// ID identifies a reminder.
//
// ID values are always in hex-encoded and are well distributed in keyspace.
ID string
// FreshUntil is the expected time by which the happy path should complete.
//
// If the sweeper encounters a Reminder before this time, the sweeper ignores
// it to allow the happy path to complete.
//
// Truncated to FreshUntilPrecision.
FreshUntil time.Time
// RawPayload is a proto-serialized tqpb.Payload.
//
// It is what is actually stored in the database.
RawPayload []byte
// payload, if non-nil, is attached (or deserialized) payload.
payload *Payload
}
// AttachPayload attaches the given payload to this reminder.
//
// It mutates `p` with reminder's ID, which should already be populated.
//
// Panics if `r` has a payload attached already.
func (r *Reminder) AttachPayload(p *Payload) error {
if r.payload != nil {
panic("the reminder has a payload attached already")
}
p.injectReminderID(r.ID)
msg := &tqpb.Payload{
TaskClass: p.TaskClass,
Created: timestamppb.New(p.Created),
}
switch {
case p.CreateTaskRequest != nil:
blob, err := proto.Marshal(p.CreateTaskRequest)
if err != nil {
return errors.Annotate(err, "failed to marshal CreateTaskRequest").Err()
}
msg.Payload = &tqpb.Payload_CreateTaskRequest{CreateTaskRequest: blob}
case p.PublishRequest != nil:
blob, err := proto.Marshal(p.PublishRequest)
if err != nil {
return errors.Annotate(err, "failed to marshal PublishRequest").Err()
}
msg.Payload = &tqpb.Payload_PublishRequest{PublishRequest: blob}
default:
panic("malformed payload")
}
raw, err := proto.Marshal(msg)
if err != nil {
return errors.Annotate(err, "failed to marshal Payload").Err()
}
r.RawPayload = raw
r.payload = p
return nil
}
// DropPayload returns a copy of the reminder without attached payload.
func (r *Reminder) DropPayload() *Reminder {
return &Reminder{
ID: r.ID,
FreshUntil: r.FreshUntil,
RawPayload: r.RawPayload,
}
}
// MustHavePayload returns an attached payload or panics if `r` doesn't have
// a payload attached.
//
// Does not attempt to deserialize RawPayload.
func (r *Reminder) MustHavePayload() *Payload {
if r.payload == nil {
panic("the reminder doesn't have a payload attached")
}
return r.payload
}
// Payload returns an attached payload, perhaps deserializing it first.
func (r *Reminder) Payload() (*Payload, error) {
if r.payload != nil {
return r.payload, nil
}
var msg tqpb.Payload
if err := proto.Unmarshal(r.RawPayload, &msg); err != nil {
return nil, errors.Annotate(err, "failed to unmarshal Payload").Err()
}
p := &Payload{
TaskClass: msg.TaskClass,
Created: msg.Created.AsTime(),
}
switch blob := msg.Payload.(type) {
case *tqpb.Payload_CreateTaskRequest:
req := &taskspb.CreateTaskRequest{}
if err := proto.Unmarshal(blob.CreateTaskRequest, req); err != nil {
return nil, errors.Annotate(err, "failed to unmarshal CreateTaskRequest").Err()
}
p.CreateTaskRequest = req
case *tqpb.Payload_PublishRequest:
req := &pubsubpb.PublishRequest{}
if err := proto.Unmarshal(blob.PublishRequest, req); err != nil {
return nil, errors.Annotate(err, "failed to unmarshal PublishRequest").Err()
}
p.PublishRequest = req
default:
return nil, errors.New("unrecognized task payload kind")
}
r.payload = p
return p, nil
}