blob: 23e314401ce8af9d537dae25ba441cb56df8ef1e [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datastore
import (
"context"
"fmt"
"strconv"
"strings"
"time"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/gae/filter/txndefer"
ds "go.chromium.org/luci/gae/service/datastore"
"go.chromium.org/luci/server/tq/internal/reminder"
)
type dsDB struct{}
func (dsDB) Kind() string {
return "datastore"
}
func (dsDB) Defer(ctx context.Context, cb func(context.Context)) {
txndefer.Defer(ctx, cb)
}
const reminderKind = "tq.Reminder"
type dsReminder struct {
_kind string `gae:"$kind,tq.Reminder"`
ID string `gae:"$id"` // "{Reminder.ID}_{Reminder.FreshUntil}".
Payload []byte `gae:",noindex"`
}
func (d *dsReminder) fromReminder(r *reminder.Reminder) *dsReminder {
d.ID = fmt.Sprintf("%s_%d", r.ID, r.FreshUntil.UnixNano())
d.Payload = r.RawPayload
return d
}
func (d dsReminder) toReminder(r *reminder.Reminder) *reminder.Reminder {
parts := strings.Split(d.ID, "_")
if len(parts) != 2 {
panic(errors.Reason("malformed dsReminder ID %q", d.ID).Err())
}
ns, err := strconv.ParseInt(parts[1], 10, 64)
if err != nil {
panic(errors.Reason("malformed dsReminder ID %q: %s", d.ID, err).Err())
}
if r == nil {
r = &reminder.Reminder{}
}
r.ID = parts[0]
r.FreshUntil = time.Unix(0, ns).UTC()
r.RawPayload = d.Payload
return r
}
// SaveReminder persists reminder in a transaction context.
func (dsDB) SaveReminder(ctx context.Context, r *reminder.Reminder) error {
v := dsReminder{}
if err := ds.Put(ctx, v.fromReminder(r)); err != nil {
return errors.Annotate(err, "failed to persist to datastore").Tag(transient.Tag).Err()
}
return nil
}
// DeleteReminder deletes reminder in a non-tranasction context.
func (dsDB) DeleteReminder(ctx context.Context, r *reminder.Reminder) error {
v := dsReminder{}
if err := ds.Delete(ctx, v.fromReminder(r)); err != nil {
return errors.Annotate(err, "failed to delete the Reminder %s", r.ID).Tag(transient.Tag).Err()
}
return nil
}
// FetchRemindersMeta fetches Reminders with Ids in [low..high) range.
//
// Payload of Reminders should not be fetched.
// Both fresh & stale reminders should be fetched.
// The reminders should be returned in order of ascending Id.
//
// In case of error, partial result of fetched Reminders so far should be
// returned alongside the error. The caller will later call this method again
// to fetch the remaining of Reminders in range of [<lastReturned.ID+1> .. high).
func (dsDB) FetchRemindersMeta(ctx context.Context, low string, high string, limit int) (items []*reminder.Reminder, err error) {
q := ds.NewQuery(reminderKind).Order("__key__")
q = q.Gte("__key__", ds.NewKey(ctx, reminderKind, low, 0, nil))
q = q.Lt("__key__", ds.NewKey(ctx, reminderKind, high, 0, nil))
q = q.Limit(int32(limit)).KeysOnly(true)
err = ds.Run(ctx, q, func(k *ds.Key) {
items = append(items, dsReminder{ID: k.StringID()}.toReminder(nil))
})
if err != nil && err != context.DeadlineExceeded {
err = errors.Annotate(err, "failed to fetch Reminder keys").Tag(transient.Tag).Err()
}
return
}
// FetchReminderRawPayloads fetches payloads of a batch of Reminders.
//
// The Reminder objects are re-used in the returned batch.
// If any Reminder is no longer found, it is silently omitted in the returned
// batch.
// In case of any other error, partial result of fetched Reminders so far
// should be returned alongside the error.
func (dsDB) FetchReminderRawPayloads(ctx context.Context, batch []*reminder.Reminder) ([]*reminder.Reminder, error) {
vs := make([]*dsReminder, len(batch))
for i, r := range batch {
vs[i] = (&dsReminder{}).fromReminder(r)
}
err := ds.Get(ctx, vs)
merr, ok := err.(errors.MultiError)
if err != nil && !ok {
return nil, errors.Annotate(err, "failed to fetch Reminders").Tag(transient.Tag).Err()
}
res := make([]*reminder.Reminder, 0, len(batch))
// Copy reminders with loaded payloads to result and move to the left errors
// in MultiError which are not expected.
ei := 0
for i, v := range vs {
switch {
case merr == nil || merr[i] == nil:
res = append(res, v.toReminder(batch[i]))
case merr[i] != ds.ErrNoSuchEntity:
merr[ei] = merr[i]
ei++
}
}
if ei == 0 {
return res, nil
}
return res, merr[:ei]
}