blob: 201511123d72898c1cf2702463e544c6c2e35f8b [file] [log] [blame]
// Copyright 2024 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
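// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.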
// See the License for the specific language governing permissions and
// limitations under the License.
package dsutils
import (
cloudds ""
func init() {
// ReadOptions tunes how GetAllKeysWithHexPrefix reads keys from datastore.
type ReadOptions struct {
	// HexPrefixLength is the minimum number of leading hex characters that
	// every key of the queried kind is guaranteed to have. The prefix is what
	// the read query is divided on, so the beam runner can run the read with
	// more parallelism when it decides that is needed.
	HexPrefixLength int

	// OutputBatchSize is the number of datastore keys placed in each output
	// batch.
	OutputBatchSize int

	// MinEstimatedCount is the floor applied to the estimated number of
	// entities of the kind in a namespace. It is used when the datastore stats
	// are missing or too small, e.g. when the kind was only just added to the
	// namespace and the stats have not been updated yet.
	MinEstimatedCount int64

	// InitialSplitSize is the size used to split the query DoFn on start up.
	InitialSplitSize int64
}
// GetAllKeysWithHexPrefix queries all the keys from datastore for the given
// kind in the given namespaces (in a PCollection<string>) and returns
// PCollection<KeyBatch>.
func GetAllKeysWithHexPrefix(
s beam.Scope,
cloudProject string,
namespaces beam.PCollection,
kind string,
opts ReadOptions,
) beam.PCollection {
if opts.OutputBatchSize < 1 {
opts.OutputBatchSize = 1
s = s.Scope(fmt.Sprintf("datastore.GetAllKeysWithHexPrefix.%s.%s", cloudProject, kind))
namespaces = beam.Reshuffle(s, namespaces)
namespacesWithCount := beam.ParDo(s, &getEstimatedCountFn{
CloudProject: cloudProject,
Kind: kind,
MinEstimatedCount: opts.MinEstimatedCount,
}, namespaces)
return beam.ParDo(s, &getAllKeysWithHexPrefixFn{
CloudProject: cloudProject,
Kind: kind,
HexPrefixLength: opts.HexPrefixLength,
OutputBatchSize: opts.OutputBatchSize,
InitialSplitSize: opts.InitialSplitSize,
}, namespacesWithCount)
// getEstimatedCountFn is a DoFn that turns a namespace into a NamespaceCount
// by looking up the estimated number of `Kind` entities in that namespace.
type getEstimatedCountFn struct {
	// CloudProject is the cloud project the datastore client is created for.
	CloudProject string
	// Kind is the datastore kind whose entity count is estimated.
	Kind string
	// MinEstimatedCount is the floor applied to the estimated count.
	MinEstimatedCount int64

	// withDatastoreEnv installs the datastore environment into a context.
	// Setup fills it in when it is nil.
	withDatastoreEnv func(context.Context) context.Context
}
// Setup implements beam DoFn protocol.
//
// It prepares fn.withDatastoreEnv, the hook ProcessElement calls to install
// the datastore environment into its context. A hook that is already set is
// left untouched; otherwise a cloud datastore client for fn.CloudProject is
// created and captured by a new hook. An annotated error is returned when the
// client cannot be constructed.
func (fn *getEstimatedCountFn) Setup(ctx context.Context) error {
// Only build a real client when no hook was provided beforehand
// (presumably a seam for tests — confirm against callers).
if fn.withDatastoreEnv == nil {
// NOTE(review): the endpoint literal is empty in this view — confirm the
// intended endpoint.
client, err := cloudds.NewClient(ctx, fn.CloudProject, option.WithEndpoint(""))
if err != nil {
return errors.Annotate(err, "failed to construct cloud datastore client").Err()
// The hook closes over `client`, so every invocation reuses the same client.
fn.withDatastoreEnv = func(ctx context.Context) context.Context {
// NOTE(review): the remainder of this expression is not visible here —
// confirm how the config is applied to ctx.
return (&cloud.ConfigLite{
ProjectID: fn.CloudProject,
DS: client,
return nil
// NamespaceCount pairs a datastore namespace with the estimated number of
// entities of the queried kind in it.
type NamespaceCount struct {
	// Namespace is the datastore namespace.
	Namespace string
	// EstimatedCount is the estimated number of entities in Namespace.
	EstimatedCount int64
}
type kindStat struct {
Key *datastore.Key `gae:"$key"`
Count int64 `gae:"count,noindex"`
Extra datastore.PropertyMap `gae:",extra"`
// ProcessElement implements beam DoFn protocol.
func (fn *getEstimatedCountFn) ProcessElement(
ctx context.Context,
namespace string,
emit func(NamespaceCount),
) error {
ctx = fn.withDatastoreEnv(ctx)
ctx, err := info.Namespace(ctx, namespace)
if err != nil {
return errors.Annotate(err, "failed to apply namespace: %s", namespace).Err()
stat := kindStat{
Key: datastore.MakeKey(ctx, "__Stat_Ns_Kind__", fn.Kind),
Count: 0,
err = datastore.Get(ctx, &stat)
if err != nil {
if !errors.Is(err, datastore.ErrNoSuchEntity) {
return errors.Annotate(err, "failed to query stats for kind `%s` in namespace `%s`", fn.Kind, namespace).Err()
if stat.Count < fn.MinEstimatedCount {
log.Warnf(ctx, "Datastore: there are only `%d` `%s` recorded in namespace `%s`. The minimum size `%d` will be used.",
stat.Count, fn.Kind, namespace, fn.MinEstimatedCount)
stat.Count = fn.MinEstimatedCount
} else {
log.Infof(ctx, "Datastore: there are `%d` `%s` in namespace `%s`.", stat.Count, fn.Kind, namespace)
emit(NamespaceCount{Namespace: namespace, EstimatedCount: stat.Count})
return nil
type getAllKeysWithHexPrefixFn struct {
CloudProject string
Kind string
HexPrefixLength int
OutputBatchSize int
InitialSplitSize int64
withDatastoreEnv func(context.Context) context.Context
emittedKeys beam.Counter
emittedBatches beam.Counter
// Setup implements beam DoFn protocol.
//
// It prepares fn.withDatastoreEnv, the hook ProcessElement calls to install
// the datastore environment into its context, and creates the two counters
// used to report emitted keys and batches. A hook that is already set is left
// untouched; otherwise a cloud datastore client for fn.CloudProject is created
// and captured by a new hook. An annotated error is returned when the client
// cannot be constructed.
func (fn *getAllKeysWithHexPrefixFn) Setup(ctx context.Context) error {
// Only build a real client when no hook was provided beforehand
// (presumably a seam for tests — confirm against callers).
if fn.withDatastoreEnv == nil {
// NOTE(review): the endpoint literal is empty in this view — confirm the
// intended endpoint.
client, err := cloudds.NewClient(ctx, fn.CloudProject, option.WithEndpoint(""))
if err != nil {
return errors.Annotate(err, "failed to construct cloud datastore client").Err()
// The hook closes over `client`, so every invocation reuses the same client.
fn.withDatastoreEnv = func(ctx context.Context) context.Context {
// NOTE(review): the remainder of this expression is not visible here —
// confirm how the config is applied to ctx.
return (&cloud.ConfigLite{
ProjectID: fn.CloudProject,
DS: client,
// Both counters share a metrics namespace derived from the cloud project and
// the kind.
namespace := fmt.Sprintf("datastore.get-all-keys-with-hex-prefix.%s.%s", fn.CloudProject, fn.Kind)
fn.emittedKeys = beam.NewCounter(namespace, "emitted-keys")
fn.emittedBatches = beam.NewCounter(namespace, "emitted-batches")
return nil
// CreateInitialRestriction implements beam DoFn protocol.
func (fn *getAllKeysWithHexPrefixFn) CreateInitialRestriction(ctx context.Context, nc NamespaceCount) hexPrefixRestriction {
return hexPrefixRestriction{
HexPrefixLength: fn.HexPrefixLength,
StartIsExclusive: false,
Start: "",
EndIsUnbounded: true,
EndIsExclusive: false,
End: "",
// CreateTracker implements beam DoFn protocol.
func (fn *getAllKeysWithHexPrefixFn) CreateTracker(ctx context.Context, restriction hexPrefixRestriction) *sdf.LockRTracker {
return sdf.NewLockRTracker(newHexPrefixRestrictionTracker(restriction))
// SplitRestriction implements beam DoFn protocol.
func (fn *getAllKeysWithHexPrefixFn) SplitRestriction(ctx context.Context, nc NamespaceCount, restriction hexPrefixRestriction) (splits []hexPrefixRestriction, err error) {
initialSplitCount := nc.EstimatedCount / fn.InitialSplitSize
if initialSplitCount < 1 {
initialSplitCount = 1
weights := make([]int64, 0, initialSplitCount)
for i := 0; i < int(initialSplitCount); i++ {
weights = append(weights, 1)
return restriction.Split(weights)
// RestrictionSize implements beam DoFn protocol.
func (fn *getAllKeysWithHexPrefixFn) RestrictionSize(ctx context.Context, nc NamespaceCount, restriction hexPrefixRestriction) float64 {
return restriction.Ratio() * float64(nc.EstimatedCount)
type KeyBatch struct {
Namespace string
Keys []*datastore.Key
// ProcessElement implements beam DoFn protocol.
func (fn *getAllKeysWithHexPrefixFn) ProcessElement(
ctx context.Context,
rt *sdf.LockRTracker,
nc NamespaceCount,
emit func(KeyBatch),
) (sdf.ProcessContinuation, error) {
ctx = fn.withDatastoreEnv(ctx)
ctx, err := info.Namespace(ctx, nc.Namespace)
if err != nil {
return sdf.StopProcessing(), errors.Annotate(err, "failed to apply namespace: %s", nc.Namespace).Err()
restriction := rt.GetRestriction().(hexPrefixRestriction)
log.Infof(ctx, "Datastore: processing Namespace `%s` Range %s", nc.Namespace, restriction.RangeString())
q := datastore.NewQuery(fn.Kind).KeysOnly(true)
// If start == "", its practically unbounded. We don't need to apply the
// filter. And we cannot apply an empty key anyway otherwise datastore will
// report an error.
if restriction.Start != "" {
startKey := datastore.MakeKey(ctx, fn.Kind, restriction.Start)
if restriction.StartIsExclusive {
q = q.Gt("__key__", startKey)
} else {
q = q.Gte("__key__", startKey)
if !restriction.EndIsUnbounded {
// Key token cannot be empty otherwise datastore will report an error. When
// end is bounded to "", nothing can be smaller than it. Short-circuit it.
if restriction.End == "" {
return sdf.StopProcessing(), nil
endKey := datastore.MakeKey(ctx, fn.Kind, restriction.End)
if restriction.EndIsExclusive {
q = q.Lt("__key__", endKey)
} else {
q = q.Lte("__key__", endKey)
claimedKeys := make([]*datastore.Key, 0, fn.OutputBatchSize)
emitClaimedKeys := func() {
if len(claimedKeys) == 0 {
// We cannot batch keys in the same namespace in a later stage without
// using a GBK (GroupByKey) or something similar. We want to avoid GBK
// because
// 1. GBK prevents stage fusion, which leads to unnecessary IO between
// stages.
// 2. GBK can lead to OOM when certain keys are very large.
// 3. In batch mode, GBK stops the next stage from executing until all
// elements are collected.
// Therefore, we need to emit batches instead of individual keys here.
emit(KeyBatch{Namespace: nc.Namespace, Keys: claimedKeys})
fn.emittedKeys.Inc(ctx, int64(len(claimedKeys)))
fn.emittedBatches.Inc(ctx, 1)
claimedKeys = make([]*datastore.Key, 0, fn.OutputBatchSize)
// We already claimed these keys from the restriction tracker. Always emit the
// final batch of claimed keys, even when there was an error.
defer emitClaimedKeys()
lastClaimed := ""
claimedCount := 0
err = datastore.Run(ctx, q, func(key *datastore.Key) error {
claim := key.StringID()
if !rt.TryClaim(HexPosClaim{Value: claim}) {
return datastore.Stop
claimedCount += 1
lastClaimed = claim
claimedKeys = append(claimedKeys, key)
if len(claimedKeys) < fn.OutputBatchSize {
return nil
return nil
if err != nil {
// Log the error and try again in 10 mins.
err = errors.Annotate(err, "failed to run bounded query Namespace `%s` Range: `%s`, Claimed: %d, Last Claimed: `%s`",
nc.Namespace, restriction.RangeString(), claimedCount, lastClaimed).Err()
log.Errorf(ctx, "%v", err)
// This will trigger a self-checkpointing split so we don't need to retry
// the entire key range.
// Returning the error directly will cause the entire restriction to be
// retried up to 4 times (runner dependent).
return sdf.ResumeProcessingIn(10 * time.Minute), nil
rt.TryClaim(HexPosClaim{End: true})
// The restriction might have been split. Log the actual restriction we
// completed.
finalRestriction := rt.GetRestriction().(hexPrefixRestriction)
log.Infof(ctx, "Datastore: finished processing Namespace `%s` Range %s (was %s), claimed %d keys",
nc.Namespace, finalRestriction.RangeString(), restriction.RangeString(), claimedCount)
return sdf.StopProcessing(), nil