// Copyright 2016 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datastorecache
import (
"context"
"fmt"
"net/http"
"strings"
"sync/atomic"
"time"
"go.chromium.org/luci/appengine/memlock"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/errors"
log "go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/sync/parallel"
"go.chromium.org/luci/server/router"
"go.chromium.org/gae/service/datastore"
"go.chromium.org/gae/service/info"
"github.com/julienschmidt/httprouter"
)
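// errHTTPHandler adapts an error-returning handler function into a
// router.Handler. On error it responds with HTTP 500, logs the error along
// with its rendered stack, and writes the stack to the response body.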
func errHTTPHandler(fn func(ctx context.Context, req *http.Request, params httprouter.Params) error) router.Handler {
return func(ctx *router.Context) {
err := fn(ctx.Context, ctx.Request, ctx.Params)
if err == nil {
// Handler returned no error, everything is good.
return
}
// Handler returned an error, dump it to output.
ctx.Writer.WriteHeader(http.StatusInternalServerError)
// Log all of our stack lines individually, so we don't overflow the
// maximum log message size with a full stack.
stk := errors.RenderStack(err)
log.WithError(err).Errorf(ctx.Context, "Handler returned error.")
for _, line := range stk {
log.Errorf(ctx.Context, ">> %s", line)
}
dumpErr := func() error {
for _, line := range stk {
if _, err := ctx.Writer.Write([]byte(line + "\n")); err != nil {
return err
}
}
return nil
}()
if dumpErr != nil {
log.WithError(dumpErr).Errorf(ctx.Context, "Failed to dump error stack.")
}
}
}
// manager is initialized to perform the management cron task.
type manager struct {
cache *Cache
// queryBatchSize is the size of the query batch to run. This must be >0.
queryBatchSize int32
}
// installCronRoute installs a handler for this manager's cron task into the
// supplied Router at the specified path.
//
// It is recommended to assert in the middleware that this endpoint is only
// accessible from a cron task.
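//
// A minimal usage sketch (the *Cache "c", Router "r", and middleware chain
// "mw" are assumed to exist elsewhere; the path and batch size are arbitrary):
//
//	m := manager{cache: c, queryBatchSize: 500}
//	m.installCronRoute("/internal/cron/manage-datastore-cache", r, mw)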
func (m *manager) installCronRoute(path string, r *router.Router, base router.MiddlewareChain) {
r.GET(path, base, errHTTPHandler(m.handleManageCronGET))
}
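// handleManageCronGET is the cron endpoint handler. It instantiates the
// cache's Handler (if a HandlerFunc is configured) and runs a single
// maintenance shard.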
func (m *manager) handleManageCronGET(ctx context.Context, req *http.Request, params httprouter.Params) error {
var h Handler
if hf := m.cache.HandlerFunc; hf != nil {
h = hf(ctx)
}
// NOTE: All manager runs currently have exactly one shard, #0.
const shardID = 0
shard := managerShard{
manager: m,
h: h,
now: clock.Now(ctx).UTC(),
st: managerShardStats{
Shard: shardID + 1, // +1 because 0 is an invalid ID.
},
clientID: strings.Join([]string{
"datastore_cache_manager",
info.RequestID(ctx),
}, "\x00"),
shard: shardID,
shardKey: fmt.Sprintf("datastore_cache_manager_shard_%d", shardID),
}
return shard.run(ctx)
}
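// managerShard holds the state for a single maintenance shard run. It
// performs one refresh/prune pass over the cache's entries under a memlock
// keyed on shardKey.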
type managerShard struct {
*manager
// h is this manager's Handler. Note that this can be nil, in which case
// cache entries are never refreshed and will eventually be pruned.
h Handler
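// clientID identifies this manager run when acquiring the memlock.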
clientID string
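// now is the wall-clock time captured at the start of the run; refresh and
// prune thresholds are computed relative to it.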
now time.Time
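// st accumulates per-shard stats, written to datastore when the run
// completes.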
st managerShardStats
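// shard is the zero-based index of the shard being processed.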
shard int
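// shardKey is the memlock key guarding this shard.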
shardKey string
// entries is the number of observed cache entries.
entries int32
// errors is the number of errors encountered. If this is non-zero, our
// handler will return an error.
errors int32
}
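// observeEntry atomically increments the count of observed cache entries.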
func (ms *managerShard) observeEntry() { atomic.AddInt32(&ms.entries, 1) }
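// observeErrors atomically adds c to the shard's error count.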
func (ms *managerShard) observeErrors(c int32) { atomic.AddInt32(&ms.errors, c) }
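// run executes a single maintenance pass for this shard: it acquires the
// memlock on shardKey, runs the maintenance loop, and writes the shard's
// stats to datastore when finished.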
func (ms *managerShard) run(ctx context.Context) error {
// Enter our cache namespace. We'll leave it when calling Handler functions.
ctx = ms.cache.withNamespace(ctx)
ctx = log.SetField(ctx, "shard", ms.shard)
// Validate shard configuration.
switch {
case ms.queryBatchSize <= 0:
return errors.Reason("invalid query batch size %d", ms.queryBatchSize).Err()
}
// Take out a memlock on our cache shard.
return memlock.TryWithLock(ctx, ms.shardKey, ms.clientID, func(ctx context.Context) (rv error) {
// Output our stats on completion.
defer func() {
ms.st.LastEntryCount = int(ms.entries)
if rv == nil {
ms.st.LastSuccessfulRun = ms.now
}
if err := datastore.Put(ctx, &ms.st); err != nil {
log.WithError(err).Errorf(ctx, "Failed to Put() stats on completion.")
}
}()
if err := ms.runLocked(ctx); err != nil {
return errors.Annotate(err, "running maintenance loop").Err()
}
// If we observed errors during processing, note this.
if ms.errors > 0 {
return errors.Reason("%d error(s) encountered during processing", ms.errors).Err()
}
return nil
})
}
// runLocked runs the main maintenance loop.
//
// As the run is executed, stats can be collected in ms.st. These will be output
// to datastore on completion.
func (ms *managerShard) runLocked(ctx context.Context) error {
workers := ms.cache.Parallel
if workers <= 0 {
workers = 1
}
// NOTE: This does not currently restrict itself to the current shard. That
// could be done using a "__key__" inequality filter to bound the query.
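// For example (a sketch only; loKey and hiKey are hypothetical per-shard
// boundary keys, not computed in this file):
//
//	q = q.Gte("__key__", loKey).Lt("__key__", hiKey)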
prototype := entry{
CacheName: ms.cache.Name,
}
q := datastore.NewQuery(prototype.kind())
var (
totalEntries = 0
totalRefreshed = 0
totalPruned = 0
totalErrors = int32(0)
entries = make([]*entry, 0, ms.queryBatchSize)
putBuf = make([]*entry, ms.queryBatchSize)
deleteBuf = make([]*entry, ms.queryBatchSize)
)
// Calculate our pruning threshold. If an entry's "LastAccessed" is <= this
// threshold, it is a candidate for pruning.
var pruneThreshold time.Time
if pi := ms.cache.pruneInterval(); pi > 0 {
pruneThreshold = ms.now.Add(-pi)
}
// handleEntries refreshes the accumulated entries. It is used as a callback
// in between query batches, as well as a finalizer after the queries
// complete.
//
// handleEntries is, itself, not goroutine-safe.
//
// It will refresh in parallel, adding entries to "putBuf" or "deleteBuf" as
// appropriate. At the end of its operation, all deferred datastore
// operations will execute, and the accumulated entries list will be purged
// for next round.
handleEntries := func(ctx context.Context) error {
if len(entries) == 0 {
return nil
}
// Use atomic-friendly int32 values to index the put/delete buffers. This
// will let our parallel goroutines safely add entries with very low
// overhead.
var putIdx, deleteIdx int32
putEntry := func(e *entry) { putBuf[atomic.AddInt32(&putIdx, 1)-1] = e }
deleteEntry := func(e *entry) { deleteBuf[atomic.AddInt32(&deleteIdx, 1)-1] = e }
// Process each entry in parallel.
//
// Each task will return a nil error, so the error result does not need to
// be observed.
_ = parallel.WorkPool(workers, func(taskC chan<- func() error) {
for _, e := range entries {
e := e
taskC <- func() error {
// Is this entry a candidate for pruning?
if !(pruneThreshold.IsZero() || e.LastAccessed.After(pruneThreshold)) {
log.Fields{
"key": e.keyHash(),
"lastRefresh": e.LastRefreshed,
"lastAccessed": e.LastAccessed,
"pruneThreshold": pruneThreshold,
}.Infof(ctx, "Pruning expired cache entry.")
deleteEntry(e)
return nil
}
// Is this cache entry a candidate for refresh?
if ms.h != nil {
refreshInterval := ms.h.RefreshInterval(e.Key)
if refreshInterval > 0 && !e.LastRefreshed.After(ms.now.Add(-refreshInterval)) {
// Call our Handler's Refresh function. We leave our cache namespace
// first.
switch value, delta, err := doRefresh(ctx, ms.h, e, ""); err {
case nil:
// Refresh successful! Update the entry.
//
// Even if the data hasn't changed, the LastRefreshed time has, and
// the cost of the "Put" is the same either way.
e.LastRefreshed = ms.now
e.LastRefreshDelta = int64(delta)
e.loadValue(value)
putEntry(e)
return nil
case ErrDeleteCacheEntry:
log.Fields{
"key": e.keyHash(),
}.Debugf(ctx, "Refresh requested entry deletion.")
deleteEntry(e)
return nil
default:
log.Fields{
log.ErrorKey: err,
"key": e.keyHash(),
"lastRefresh": e.LastRefreshed,
}.Errorf(ctx, "Failed to refresh cache entry.")
atomic.AddInt32(&totalErrors, 1)
}
}
}
return nil
}
}
})
// Clear our entries buffer for next round.
entries = entries[:0]
// Flush our put/delete buffers in parallel. Failures here are datastore
// failures; they are logged, counted into totalErrors, and surfaced as a
// single error after the run completes.
_ = parallel.FanOutIn(func(taskC chan<- func() error) {
if putIdx > 0 {
taskC <- func() error {
if err := datastore.Put(ctx, putBuf[:putIdx]); err != nil {
log.Fields{
log.ErrorKey: err,
"size": putIdx,
}.Errorf(ctx, "Failed to Put batch.")
atomic.AddInt32(&totalErrors, 1)
} else {
totalRefreshed += int(putIdx)
}
return nil
}
}
if deleteIdx > 0 {
taskC <- func() error {
if err := datastore.Delete(ctx, deleteBuf[:deleteIdx]); err != nil {
log.Fields{
log.ErrorKey: err,
"size": deleteIdx,
}.Errorf(ctx, "Failed to Delete batch.")
atomic.AddInt32(&totalErrors, 1)
} else {
totalPruned += int(deleteIdx)
}
return nil
}
}
})
return nil
}
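// Query all of this cache's entries, accumulating them and handling each
// full batch as it fills.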
err := datastore.RunBatch(ctx, ms.queryBatchSize, q, func(e *entry) error {
totalEntries++
ms.observeEntry()
entries = append(entries, e)
if len(entries) >= int(ms.queryBatchSize) {
// Hit the end of a query batch. Process entries (the returned error is
// always nil).
_ = handleEntries(ctx)
}
return nil
})
if err != nil {
return errors.Annotate(err, "failed to run entry query").Err()
}
// Flush any outstanding entries (ignore error, will always be nil).
_ = handleEntries(ctx)
if totalErrors > 0 {
ms.observeErrors(totalErrors)
}
log.Fields{
"entries": totalEntries,
"errors": totalErrors,
"refreshed": totalRefreshed,
"pruned": totalPruned,
}.Infof(ctx, "Successfully updated cache entries.")
return nil
}