// Copyright 2018 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package dsmapper
import (
// Need this to enqueue tasks inside Datastore transactions.
_ ""
// ID identifies a mapper registered in the controller.
//
// It is the key under which a Factory is stored in the Controller's registry
// (see RegisterFactory) and the value a job's config uses to name its mapper.
//
// It will be passed across processes, so all processes that execute mapper jobs
// should register the same mappers under the same IDs.
//
// The safest approach is to keep mapper IDs in the app unique, e.g. do NOT
// reuse them when adding new mappers or significantly changing existing ones.
type ID string
// Mapper applies some function to the given slice of entities, given by
// their keys.
//
// May be called multiple times for the same key (thus should be idempotent).
//
// Returning a transient error indicates that the processing of this batch of
// keys should be retried (even if some keys were processed successfully).
//
// Returning a fatal error causes the entire shard (and eventually the entire
// job) to be marked as failed. The processing of the failed shard stops right
// away, but other shards are kept running until completion (or their own
// failure).
//
// The function is called outside of any transactions, so it can start its own
// if needed.
type Mapper func(ctx context.Context, keys []*datastore.Key) error
// Factory knows how to construct instances of Mapper.
//
// Factory is supplied by the users of the library and registered in the
// controller via RegisterFactory call.
//
// It is used to get a mapper to process a set of pages within a shard. It takes
// a Job (including its Config and Params) and a shard index, so it can prepare
// the mapper for processing of this specific shard.
//
// Returning a transient error triggers an eventual retry. Returning a fatal
// error causes the shard (eventually the entire job) to be marked as failed.
type Factory func(ctx context.Context, j *Job, shardIdx int) (Mapper, error)
// Controller is responsible for starting, progressing and finishing mapping
// jobs.
//
// It should be treated as a global singleton object. Having more than one
// controller in the production application is a bad idea (they'll collide with
// each other since they use global datastore namespace). It's still useful
// to instantiate multiple controllers in unit tests.
//
// The exported fields are read once by Install; the unexported fields are
// populated by Install and RegisterFactory.
type Controller struct {
// MapperQueue is a name of the Cloud Tasks queue to use for mapping jobs.
//
// This queue will perform all "heavy" tasks. It should be configured
// appropriately to allow desired number of shards to run in parallel.
//
// For example, if the largest submitted job is expected to have 128 shards,
// max_concurrent_requests setting of the mapper queue should be at least 128,
// otherwise some shards will be stalled waiting for others to finish
// (defeating the purpose of having large number of shards).
//
// If empty, "default" is used.
MapperQueue string
// ControlQueue is a name of the Cloud Tasks queue to use for control signals.
//
// This queue is used very lightly when starting and stopping jobs (roughly
// 2*Shards tasks overall per job). The default queue.yaml settings for such a
// queue should be sufficient (unless you run a lot of different jobs at
// once).
//
// If empty, "default" is used.
ControlQueue string
// m guards mappers and disp.
m sync.RWMutex
// mappers is the registry of factories keyed by mapper ID. It is allocated
// lazily by RegisterFactory.
mappers map[ID]Factory
// disp is the dispatcher passed to Install; nil until Install is called.
disp *tq.Dispatcher
// Install registers task queue task handlers in the given task queue
// dispatcher.
// This must be done before Controller is used.
// There can be at most one Controller installed into an instance of TQ
// dispatcher. Installing more will cause panics.
// If you need multiple different controllers for some reason, create multiple
// tq.Dispatchers (with different base URLs, so they don't conflict with each
// other) and install them all into the router.
func (ctl *Controller) Install(disp *tq.Dispatcher) {
defer ctl.m.Unlock()
if ctl.disp != nil {
panic("mapper.Controller is already installed into a tq.Dispatcher")
ctl.disp = disp
controlQueue := ctl.ControlQueue
if controlQueue == "" {
controlQueue = "default"
mapperQueue := ctl.MapperQueue
if mapperQueue == "" {
mapperQueue = "default"
ID: "dsmapper-split-and-launch",
Prototype: &tasks.SplitAndLaunch{},
Kind: tq.Transactional,
Queue: controlQueue,
Handler: ctl.splitAndLaunchHandler,
Quiet: true,
ID: "dsmapper-fan-out-shards",
Prototype: &tasks.FanOutShards{},
Kind: tq.Transactional,
Queue: controlQueue,
Handler: ctl.fanOutShardsHandler,
Quiet: true,
ID: "dsmapper-process-shard",
Prototype: &tasks.ProcessShard{},
Kind: tq.FollowsContext,
Queue: mapperQueue,
Handler: ctl.processShardHandler,
Quiet: true,
ID: "dsmapper-request-job-state-update",
Prototype: &tasks.RequestJobStateUpdate{},
Kind: tq.Transactional,
Queue: controlQueue,
Handler: ctl.requestJobStateUpdateHandler,
Quiet: true,
ID: "dsmapper-update-job-state",
Prototype: &tasks.UpdateJobState{},
Kind: tq.NonTransactional,
Queue: controlQueue,
Handler: ctl.updateJobStateHandler,
Quiet: true,
// tq returns a dispatcher set in Install or panics if not set yet.
// Grabs the reader lock inside.
func (ctl *Controller) tq() *tq.Dispatcher {
defer ctl.m.RUnlock()
if ctl.disp == nil {
panic("mapper.Controller wasn't installed into tq.Dispatcher yet")
return ctl.disp
// RegisterFactory adds the given mapper factory to the internal registry.
// Intended to be used during init() time or early during the process
// initialization. Panics if a factory with such ID has already been registered.
// The mapper ID will be used internally to identify which mapper a job should
// be using. If a factory disappears while the job is running (e.g. if the
// service binary is updated and new binary doesn't have the mapper registered
// anymore), the job ends with a failure.
func (ctl *Controller) RegisterFactory(id ID, m Factory) {
defer ctl.m.Unlock()
if _, ok := ctl.mappers[id]; ok {
panic(fmt.Sprintf("mapper %q is already registered", id))
if ctl.mappers == nil {
ctl.mappers = make(map[ID]Factory, 1)
ctl.mappers[id] = m
// getFactory returns a registered mapper factory or an error.
// Grabs the reader lock inside. Can return only fatal errors.
func (ctl *Controller) getFactory(id ID) (Factory, error) {
defer ctl.m.RUnlock()
if m, ok := ctl.mappers[id]; ok {
return m, nil
return nil, errors.Reason("no mapper factory with ID %q registered", id).Err()
// initMapper instantiates a Mapper through a registered factory.
// May return fatal and transient errors.
func (ctl *Controller) initMapper(ctx context.Context, j *Job, shardIdx int) (Mapper, error) {
f, err := ctl.getFactory(j.Config.Mapper)
if err != nil {
return nil, errors.Annotate(err, "when initializing mapper").Err()
m, err := f(ctx, j, shardIdx)
if err != nil {
return nil, errors.Annotate(err, "error from mapper factory %q", j.Config.Mapper).Err()
return m, nil
// LaunchJob launches a new mapping job, returning its ID (that can be used to
// control it or query its status).
// Launches a datastore transaction inside.
func (ctl *Controller) LaunchJob(ctx context.Context, j *JobConfig) (JobID, error) {
disp := ctl.tq()
if err := j.Validate(); err != nil {
return 0, errors.Annotate(err, "bad job config").Err()
if _, err := ctl.getFactory(j.Mapper); err != nil {
return 0, errors.Annotate(err, "bad job config").Err()
// Prepare and store the job entity, generate its key. Launch a tq task that
// subdivides the key space and launches individual shards. We do it
// asynchronously since this can be potentially slow (for large number of
// shards).
var job Job
err := runTxn(ctx, func(ctx context.Context) error {
now := clock.Now(ctx).UTC()
job = Job{
Config: *j,
State: dsmapperpb.State_STARTING,
Created: now,
Updated: now,
if err := datastore.Put(ctx, &job); err != nil {
return errors.Annotate(err, "failed to store Job entity").Tag(transient.Tag).Err()
return disp.AddTask(ctx, &tq.Task{
Title: fmt.Sprintf("split:job-%d", job.ID),
Payload: &tasks.SplitAndLaunch{
JobId: int64(job.ID),
if err != nil {
return 0, err
return job.ID, nil
// GetJob fetches a previously launched job given its ID.
// Returns ErrNoSuchJob if not found. All other possible errors are transient
// and they are marked as such.
func (ctl *Controller) GetJob(ctx context.Context, id JobID) (*Job, error) {
// Even though we could have made getJob public, we want to force API users
// to use Controller as a single facade.
return getJob(ctx, id)
// AbortJob aborts a job and returns its most recent state.
// Silently does nothing if the job is finished or already aborted.
// Returns ErrNoSuchJob is there's no such job at all. All other possible errors
// are transient and they are marked as such.
func (ctl *Controller) AbortJob(ctx context.Context, id JobID) (job *Job, err error) {
err = runTxn(ctx, func(ctx context.Context) error {
var err error
switch job, err = getJob(ctx, id); {
case err != nil:
return err
case isFinalState(job.State) || job.State == dsmapperpb.State_ABORTING:
return nil // nothing to abort, already done
case job.State == dsmapperpb.State_STARTING:
// Shards haven't been launched yet. Kill the job right away.
job.State = dsmapperpb.State_ABORTED
case job.State == dsmapperpb.State_RUNNING:
// Running shards will discover that the job is aborting and will
// eventually move into ABORTED state (notifying the job about it). Once
// all shards report they are done, the job itself will switch into
// ABORTED state.
job.State = dsmapperpb.State_ABORTING
job.Updated = clock.Now(ctx).UTC()
return errors.Annotate(datastore.Put(ctx, job), "failed to store Job entity").Tag(transient.Tag).Err()
if err != nil {
job = nil // don't return bogus data in case txn failed to land
// Task queue task handlers.

// errJobAborted is used internally as shard failure status when the job is
// being aborted.
//
// It causes the shard to switch into ABORTED state instead of FAIL. It is a
// sentinel: finishShard recognizes it by comparing the error value directly.
var errJobAborted = errors.New("the job has been aborted")
// splitAndLaunchHandler splits the job into shards and enqueues tasks that
// process shards.
func (ctl *Controller) splitAndLaunchHandler(ctx context.Context, payload proto.Message) error {
msg := payload.(*tasks.SplitAndLaunch)
now := clock.Now(ctx).UTC()
// Fetch job details. Make sure it isn't canceled and isn't running already.
job, err := getJobInState(ctx, JobID(msg.JobId), dsmapperpb.State_STARTING)
if err != nil || job == nil {
return errors.Annotate(err, "in SplitAndLaunch").Err()
// Figure out key ranges for shards. There may be fewer shards than requested
// if there are too few entities.
dq := job.Config.Query.ToDatastoreQuery()
ranges, err := splitter.SplitIntoRanges(ctx, dq, splitter.Params{
Shards: job.Config.ShardCount,
Samples: 512, // should be enough for everyone...
if err != nil {
return errors.Annotate(err, "failed to split the query into shards").Tag(transient.Tag).Err()
// Create entities that hold shards state. Each one is in its own entity
// group, since the combined write rate to them is O(ShardCount), which can
// overcome limits of a single entity group.
shards := make([]*shard, len(ranges))
for idx, rng := range ranges {
shards[idx] = &shard{
JobID: job.ID,
Index: idx,
State: dsmapperpb.State_STARTING,
Range: rng,
ExpectedCount: -1,
Created: now,
Updated: now,
// Calculate number of entities in each shard to track shard processing
// progress. Note that this can be very slow if there are many entities.
if job.Config.TrackProgress {
logging.Infof(ctx, "Estimating the size of each shard...")
if err := fetchShardSizes(ctx, dq, shards); err != nil {
return errors.Annotate(err, "when estimating shard sizes").Err()
// We use auto-generated keys for shards to make sure crashed SplitAndLaunch
// task retries cleanly, even if the underlying key space we are mapping over
// changes between the retries (making a naive put using "<job-id>:<index>"
// key non-idempotent!).
logging.Infof(ctx, "Instantiating shards...")
if err := datastore.Put(ctx, shards); err != nil {
return errors.Annotate(err, "failed to store shards").Tag(transient.Tag).Err()
// Prepare shardList which is basically a manual fully consistent index for
// Job -> [Shard] relation. We can't use a regular index, since shards are all
// in different entity groups (see O(ShardCount) argument above).
// Log the resulting shards along the way.
shardsEnt := shardList{
Parent: datastore.KeyForObj(ctx, job),
Shards: make([]int64, len(shards)),
for idx, s := range shards {
shardsEnt.Shards[idx] = s.ID
l, r := "-inf", "+inf"
if s.Range.Start != nil {
l = s.Range.Start.String()
if s.Range.End != nil {
r = s.Range.End.String()
count := ""
if s.ExpectedCount != 0 {
count = fmt.Sprintf(" (%d entities)", s.ExpectedCount)
logging.Infof(ctx, "Shard #%d is %d: %s - %s%s", idx, s.ID, l, r, count)
// Transactionally associate shards with the job and launch the TQ task that
// kicks off the processing of each individual shard. We use an intermediary
// task for this since transactionally launching O(ShardCount) tasks hits TQ
// transaction limits.
// If SplitAndLaunch crashes before this transaction lands, there'll be some
// orphaned Shard entities, no big deal.
logging.Infof(ctx, "Updating the job and launching the fan out task...")
return runTxn(ctx, func(ctx context.Context) error {
job, err := getJobInState(ctx, JobID(msg.JobId), dsmapperpb.State_STARTING)
if err != nil || job == nil {
return errors.Annotate(err, "in SplitAndLaunch txn").Err()
job.State = dsmapperpb.State_RUNNING
job.Updated = now
if err := datastore.Put(ctx, job, &shardsEnt); err != nil {
return errors.Annotate(err,
"when storing Job %d and ShardList with %d shards", job.ID, len(shards),
return ctl.tq().AddTask(ctx, &tq.Task{
Title: fmt.Sprintf("fanout:job-%d", job.ID),
Payload: &tasks.FanOutShards{
JobId: int64(job.ID),
// fetchShardSizes makes a bunch of Count() queries to figure out size of each
// shard.
// Updates ExpectedCount in-place.
func fetchShardSizes(ctx context.Context, baseQ *datastore.Query, shards []*shard) error {
ctx, cancel := clock.WithTimeout(ctx, 10*time.Minute)
defer cancel()
err := parallel.WorkPool(32, func(tasks chan<- func() error) {
for _, sh := range shards {
sh := sh
tasks <- func() error {
n, err := datastore.CountBatch(ctx, 1024, sh.Range.Apply(baseQ))
if err == nil {
sh.ExpectedCount = n
return errors.Annotate(err, "for shard #%d", sh.Index).Err()
return transient.Tag.Apply(err)
// fanOutShardsHandler fetches a list of shards from the job and launches
// named ProcessShard tasks, one per shard.
func (ctl *Controller) fanOutShardsHandler(ctx context.Context, payload proto.Message) error {
msg := payload.(*tasks.FanOutShards)
// Make sure the job is still present. If it is aborted, we still need to
// launch the shards, so they notice they are being aborted. We could try
// to abort all shards right here and now, but it basically means implementing
// an alternative shard abort flow. Seems simpler just to let the regular flow
// to proceed.
job, err := getJobInState(ctx, JobID(msg.JobId), dsmapperpb.State_RUNNING, dsmapperpb.State_ABORTING)
if err != nil || job == nil {
return errors.Annotate(err, "in FanOutShards").Err()
// Grab the list of shards created in SplitAndLaunch. It must exist at this
// point, since the job is in Running state.
shardIDs, err := job.fetchShardIDs(ctx)
if err != nil {
return errors.Annotate(err, "in FanOutShards").Err()
// Enqueue a bunch of named ProcessShard tasks (one per shard) to actually
// launch shard processing. This is idempotent operation, so if FanOutShards
// crashes midway and later retried, nothing bad happens.
eg, ctx := errgroup.WithContext(ctx)
tq := ctl.tq()
for _, sid := range shardIDs {
task := makeProcessShardTask(job.ID, sid, 0, true)
eg.Go(func() error { return tq.AddTask(ctx, task) })
return eg.Wait()
// processShardHandler reads a bunch of entities (up to PageSize), and hands
// them to the mapper.
//
// After doing this in a loop for 1 min (or Config.TaskDuration, if positive),
// or after Config.PagesPerTask pages if that limit is set, it checkpoints the
// state and reenqueues itself to resume mapping in another instance of the
// task. This makes each processing TQ task relatively small, so it doesn't eat
// a lot of memory, or produces gigantic unreadable logs. It also makes TQ's
// "Pause queue" button more handy.
//
// The shard is finalized through finishShard when its range is exhausted, when
// the job is aborting, or when the factory or the mapper fail with a
// non-transient error. Transient errors are returned to TQ so the task is
// retried.
func (ctl *Controller) processShardHandler(ctx context.Context, payload proto.Message) error {
// The payload is expected to be *tasks.ProcessShard; the unchecked type
// assertion panics on anything else.
msg := payload.(*tasks.ProcessShard)
// Grab the shard. This returns (nil, nil) if this Task Queue task is stale
// (based on taskNum) and should be silently skipped.
sh, err := getActiveShard(ctx, msg.ShardId, msg.TaskNum)
if err != nil || sh == nil {
return errors.Annotate(err, "when fetching shard state").Err()
ctx = logging.SetField(ctx, "shardIdx", sh.Index)
// NOTE(review): the call that consumes this format string (and its single %s
// argument) is not visible in this view of the file — confirm against the
// full source.
"Resuming processing of the shard (launched %s ago)",
// Grab the job config, make sure the job is still active.
job, err := getJobInState(ctx, JobID(msg.JobId), dsmapperpb.State_RUNNING, dsmapperpb.State_ABORTING)
if err != nil || job == nil {
return errors.Annotate(err, "in ProcessShard").Err()
// If the job is being killed, kill the shard as well. This will eventually
// notify the job about shard's completion. Once all shards are done, the
// job will switch into ABORTED state.
if job.State == dsmapperpb.State_ABORTING {
return ctl.finishShard(ctx, sh.ID, 0, errJobAborted)
// Prepare the mapper by giving the factory job parameters.
mapper, err := ctl.initMapper(ctx, job, sh.Index)
switch {
case transient.Tag.In(err):
// Returned to TQ as-is (annotated), so the task gets retried.
return errors.Annotate(err, "transient error when instantiating a mapper").Err()
case err != nil:
// Kill the shard if the factory returns a fatal error.
return ctl.finishShard(ctx, sh.ID, 0, err)
baseQ := job.Config.Query.ToDatastoreQuery()
// Start from the checkpoint stored in the shard (nil if there is none yet).
lastKey := sh.ResumeFrom
// The keys buffer is allocated once and reused for every page.
keys := make([]*datastore.Key, 0, job.Config.PageSize)
shardDone := false // true when finished processing the shard
pageCount := 0 // how many pages processed successfully
itemCount := int64(0) // how many entities processed successfully
// A soft deadline when to checkpoint the progress and reenqueue the
// processing task. We never abort processing of a page midway (causes too
// many complications), so if the mapper is extremely slow, it may end up
// running longer than this deadline.
dur := time.Minute
if job.Config.TaskDuration > 0 {
dur = job.Config.TaskDuration
deadline := clock.Now(ctx).Add(dur)
// Optionally also put a limit on number of processed pages. Useful if the
// mapper is somehow leaking resources (not sure it is possible in Go, but
// it was definitely possible in Python).
pageCountLimit := math.MaxInt32
if job.Config.PagesPerTask > 0 {
pageCountLimit = job.Config.PagesPerTask
// NOTE(review): no statement that advances pageCount, or that leaves the loop
// once shardDone/err is set, is visible in this view of the file — confirm
// against the full source before relying on the loop conditions below.
for clock.Now(ctx).Before(deadline) && pageCount < pageCountLimit {
// Narrow the shard's range so it starts at the last processed key.
rng := sh.Range
if lastKey != nil {
rng.Start = lastKey
if rng.IsEmpty() {
shardDone = true
// Fetch next batch of keys. Return an error to the outer scope where it
// eventually will bubble up to TQ (so the task is retried with exponential
// backoff).
logging.Infof(ctx, "Fetching the next batch...")
q := rng.Apply(baseQ).Limit(int32(job.Config.PageSize)).KeysOnly(true)
keys = keys[:0]
if err = datastore.GetAll(ctx, q, &keys); err != nil {
err = errors.Annotate(err, "when querying for keys").Tag(transient.Tag).Err()
// No results within the range? Processing of the shard is complete!
if len(keys) == 0 {
shardDone = true
// Let the mapper do its thing. Remember where to resume from.
// NOTE(review): the call that consumes this format string (and its three
// arguments) is not visible in this view of the file — confirm against the
// full source.
"Processing %d entities: %s - %s",
if err = mapper(ctx, keys); err != nil {
err = errors.Annotate(err, "while mapping %d keys", len(keys)).Err()
// The page was mapped: the next page resumes from its last key.
lastKey = keys[len(keys)-1]
itemCount += int64(len(keys))
// Note: at this point we might try to checkpoint the progress, but we must
// be careful not to exceed 1 transaction per second limit. Considering we
// also MUST checkpoint the progress at the end of the task, it is a bit
// tricky to guarantee no two checkpoints are closer than 1 sec. We can do
// silly things like sleep 1 sec before the last checkpoint, but they
// provide no guarantees.
// So instead we store the progress after the deadline is up. If the task
// crashes midway, up to 1 min of work will be retried. No big deal.
// We are done with the shard when either processed all its range or failed
// with a fatal error. finishShard would take care of notifying the parent
// job about the shard's completion.
if shardDone || (err != nil && !transient.Tag.In(err)) {
return ctl.finishShard(ctx, sh.ID, itemCount, err)
if lastKey != nil {
logging.Infof(ctx, "The shard processing will resume from %s", lastKey)
} else {
logging.Infof(ctx, "The shard processing will resume from scratch")
// If the shard isn't done and we made no progress at all, then we hit
// a transient error. Ask TQ to retry.
if pageCount == 0 {
return err
// Otherwise need to checkpoint the progress and either to retry this task
// (on transient errors, to get an exponential backoff from TQ), or start
// a new task.
// NOTE(review): the callback's bool result presumably tells shardTxn whether
// to store the shard (finishShard names it `save`) — confirm against shardTxn.
txnErr := shardTxn(ctx, sh.ID, func(ctx context.Context, sh *shard) (bool, error) {
switch {
case sh.ProcessTaskNum != msg.TaskNum:
logging.Warningf(ctx, "Unexpected shard state: its ProcessTaskNum is %d != %d", sh.ProcessTaskNum, msg.TaskNum)
return false, nil // some other task is already running
case sh.ResumeFrom != nil && lastKey.Less(sh.ResumeFrom):
logging.Warningf(ctx, "Unexpected shard state: its ResumeFrom is %s >= %s", sh.ResumeFrom, lastKey)
return false, nil // someone already claimed to process further, let them proceed
// Record the checkpoint: where to resume and how much was processed.
sh.State = dsmapperpb.State_RUNNING
sh.ResumeFrom = lastKey
sh.ProcessedCount += itemCount
// If the processing failed, just store the progress, but do not start a
// new TQ task. Retry the current task instead (to get exponential backoff).
if err != nil {
return true, nil
// Otherwise launch a new task in the chain. This essentially "resets"
// the exponential backoff counter.
return true, ctl.tq().AddTask(ctx,
makeProcessShardTask(sh.JobID, sh.ID, sh.ProcessTaskNum, false))
// Combine the processing error (err) and the checkpoint error (txnErr): the
// checkpoint error wins when both are set, with the processing error folded
// into its annotation.
switch {
case err != nil && txnErr == nil:
return err
case err == nil && txnErr != nil:
return errors.Annotate(txnErr, "when storing shard progress").Err()
case err != nil && txnErr != nil:
return errors.Annotate(txnErr, "when storing shard progress after a transient error (%s)", err).Err()
default: // (nil, nil)
return nil
// finishShard marks the shard as finished (with status based on shardErr) and
// emits a task to update the parent job's status.
func (ctl *Controller) finishShard(ctx context.Context, shardID, processedCount int64, shardErr error) error {
err := shardTxn(ctx, shardID, func(ctx context.Context, sh *shard) (save bool, err error) {
runtime := clock.Now(ctx).Sub(sh.Created)
switch {
case shardErr == errJobAborted:
logging.Warningf(ctx, "The job has been aborted, aborting the shard after it has been running %s", runtime)
sh.State = dsmapperpb.State_ABORTED
sh.Error = errJobAborted.Error()
case shardErr != nil:
logging.Errorf(ctx, "The shard processing failed in %s with error: %s", runtime, shardErr)
sh.State = dsmapperpb.State_FAIL
sh.Error = shardErr.Error()
logging.Infof(ctx, "The shard processing finished successfully in %s", runtime)
sh.State = dsmapperpb.State_SUCCESS
sh.ProcessedCount += processedCount
return true, ctl.requestJobStateUpdate(ctx, sh.JobID, sh.ID)
return errors.Annotate(err, "when marking the shard as finished").Err()
// makeProcessShardTask creates a ProcessShard tq.Task.
// If 'named' is true, assigns it a name. Tasks are named based on their shard
// IDs and an index in the chain of ProcessShard tasks (task number), so that
// on retries we don't rekick already finished tasks.
func makeProcessShardTask(job JobID, shardID, taskNum int64, named bool) *tq.Task {
// Note: strictly speaking including job ID in the task name is redundant,
// since shardID is already globally unique, but it doesn't hurt. Useful for
// debugging and when looking at logs and pending TQ tasks.
t := &tq.Task{
Title: fmt.Sprintf("map:job-%d-shard-%d-task-%d", job, shardID, taskNum),
Payload: &tasks.ProcessShard{
JobId: int64(job),
ShardId: shardID,
TaskNum: taskNum,
if named {
t.DeduplicationKey = fmt.Sprintf("v1-%d-%d-%d", job, shardID, taskNum)
return t
// requestJobStateUpdate submits RequestJobStateUpdate task, which eventually
// causes updateJobStateHandler to execute.
func (ctl *Controller) requestJobStateUpdate(ctx context.Context, jobID JobID, shardID int64) error {
return ctl.tq().AddTask(ctx, &tq.Task{
Title: fmt.Sprintf("notify:job-%d-shard-%d", jobID, shardID),
Payload: &tasks.RequestJobStateUpdate{
JobId: int64(jobID),
ShardId: shardID,
// requestJobStateUpdateHandler is called whenever state of some shard changes.
// It forwards this notification to the job (specifically updateJobStateHandler)
// throttling the rate to ~0.5 QPS to avoid overwhelming job's entity group with
// high write rate.
func (ctl *Controller) requestJobStateUpdateHandler(ctx context.Context, payload proto.Message) error {
msg := payload.(*tasks.RequestJobStateUpdate)
// Throttle to once per 2 sec (and make sure it is always in the future). We
// rely here on a pretty good (< .5s maximum skew) clock sync on servers.
eta := clock.Now(ctx).Unix()
eta = (eta/2 + 1) * 2
dedupKey := fmt.Sprintf("update-job-state-v1:%d:%d", msg.JobId, eta)
err := ctl.tq().AddTask(ctx, &tq.Task{
DeduplicationKey: dedupKey,
Title: fmt.Sprintf("update:job-%d", msg.JobId),
ETA: time.Unix(eta, 0),
Payload: &tasks.UpdateJobState{JobId: msg.JobId},
return errors.Annotate(err, "when adding UpdateJobState task").Err()
// updateJobStateHandler is called some time later after one or more shards have
// changed state.
// It calculates overall job state based on the state of its shards.
func (ctl *Controller) updateJobStateHandler(ctx context.Context, payload proto.Message) error {
msg := payload.(*tasks.UpdateJobState)
// Get the job and all its shards in their most recent state.
job, err := getJobInState(ctx, JobID(msg.JobId), dsmapperpb.State_RUNNING, dsmapperpb.State_ABORTING)
if err != nil || job == nil {
return errors.Annotate(err, "in UpdateJobState").Err()
shards, err := job.fetchShards(ctx)
if err != nil {
return errors.Annotate(err, "failed to fetch shards").Err()
// Switch the job into a final state only when all shards are done running.
perState := make(map[dsmapperpb.State]int, len(dsmapperpb.State_name))
finished := 0
for _, sh := range shards {
logging.Infof(ctx, "Shard #%d (%d) is in state %s", sh.Index, sh.ID, sh.State)
if isFinalState(sh.State) {
if finished != len(shards) {
return nil
jobState := dsmapperpb.State_SUCCESS
switch {
case perState[dsmapperpb.State_ABORTED] != 0:
jobState = dsmapperpb.State_ABORTED
case perState[dsmapperpb.State_FAIL] != 0:
jobState = dsmapperpb.State_FAIL
return runTxn(ctx, func(ctx context.Context) error {
job, err := getJobInState(ctx, JobID(msg.JobId), dsmapperpb.State_RUNNING, dsmapperpb.State_ABORTING)
if err != nil || job == nil {
return errors.Annotate(err, "in UpdateJobState txn").Err()
// Make sure an aborting job ends up in aborted state, even if all its
// shards manged to finish. It looks weird when an ABORTING job moves
// into e.g. SUCCESS state.
if job.State == dsmapperpb.State_ABORTING {
job.State = dsmapperpb.State_ABORTED
} else {
job.State = jobState
job.Updated = clock.Now(ctx).UTC()
runtime := job.Updated.Sub(job.Created)
switch job.State {
case dsmapperpb.State_SUCCESS:
logging.Infof(ctx, "The job finished successfully in %s", runtime)
case dsmapperpb.State_FAIL:
logging.Errorf(ctx, "The job finished with %d shards failing in %s", perState[dsmapperpb.State_FAIL], runtime)
for _, sh := range shards {
if sh.State == dsmapperpb.State_FAIL {
logging.Errorf(ctx, "Shard #%d (%d) error - %s", sh.Index, sh.ID, sh.Error)
case dsmapperpb.State_ABORTED:
logging.Warningf(ctx, "The job has been aborted after %s: %d shards succeeded, %d shards failed, %d shards aborted",
runtime, perState[dsmapperpb.State_SUCCESS], perState[dsmapperpb.State_FAIL], perState[dsmapperpb.State_ABORTED])
return transient.Tag.Apply(datastore.Put(ctx, job))