package engine
import (
// invQuery abstracts a query that fetches invocations in order of their IDs,
// smallest to largest.
// Think of it as a pointer to the head of the query, that can be advanced on
// demand.
type invQuery interface {
// peek returns the invocation the query currently points to.
// It is fetched when 'advance' is called. A first call to 'peek' may also
// initiate a fetch (to grab the first ever item).
// Returns nil if there's no more invocations to fetch. Returns an error if
// the fetch failed.
peek() (*Invocation, error)
// advance fetches the next invocation to be returned by 'peek'.
// Returns an error if this operation fails. Reaching the end of the results
// is not an error. If this happened, next 'peek' returns nil, and keeps
// returning nil forever.
advance() error
// mergeInvQueries merges results of multiple queries together.
// It picks smallest IDs first. In presence of duplicates, it favors queries
// that are listed in 'qs' earlier.
// Places the results into 'out' slice, returning the extended slice at the end.
// Returns (..., true, nil) in case results of all queries has been exhausted,
// and (..., false, nil) if it hit the limit on number of results.
func mergeInvQueries(qs []invQuery, limit int, out []*Invocation) ([]*Invocation, bool, error) {
maxLen := len(out) + limit
for {
// Find the smallest invocation from heads of all queries. Do it even if
// already reached the limit, to check whether there are more items left.
var smallest *Invocation
for _, q := range qs {
inv, err := q.peek()
if err != nil {
return nil, false, err
if inv == nil {
continue // exhausted results of this query
if smallest == nil || inv.ID < smallest.ID {
smallest = inv
switch {
case smallest == nil:
return out, true, nil // exhausted results of all queries
case len(out) == maxLen:
return out, false, nil // actually filled up to the limit
out = append(out, smallest)
// There may be duplicates in the queries, so need to pop the consumed
// invocation from all queries.
for _, q := range qs {
for {
inv, err := q.peek()
if err != nil {
return nil, false, err
if inv == nil || inv.ID > smallest.ID {
break // found something larger at the head
if err := q.advance(); err != nil {
return nil, false, err
// List based queries.
// invListQuery implements invQuery on top of a sorted list of Invocations.
// The list is assumed to be sorted by IDs in smallest-to-largest order. This is
// also the order in which invocations will be returned.
type invListQuery struct {
invs []*Invocation
cur int
func (q *invListQuery) peek() (*Invocation, error) {
if q.cur == len(q.invs) {
return nil, nil
return q.invs[q.cur], nil
func (q *invListQuery) advance() error {
if q.cur < len(q.invs) {
return nil
// activeInvQuery returns invQuery that emits active invocations, as fetched
// from the job.ActiveInvocations field.
// Smallest IDs are returned first. IDs smaller than or equal to lastScanned are
// skipped (this is used for pagination).
func activeInvQuery(c context.Context, j *Job, lastScanned int64) *invListQuery {
var invs []*Invocation
for _, id := range j.ActiveInvocations {
if id > lastScanned {
invs = append(invs, &Invocation{ID: id})
sort.Slice(invs, func(l, r int) bool { return invs[l].ID < invs[r].ID })
return &invListQuery{invs: invs}
// recentInvQuery returns invQuery that emits recently finished invocations, as
// fetched from the job.FinishedInvocationsRaw field.
// Smallest IDs are returned first. IDs smaller than or equal to lastScanned are
// skipped (this is used for pagination).
func recentInvQuery(c context.Context, j *Job, lastScanned int64) *invListQuery {
finished, err := filteredFinishedInvs(
j.FinishedInvocationsRaw, clock.Now(c).Add(-FinishedInvocationsHorizon))
if err != nil {
logging.WithError(err).Errorf(c, "Failed to unmarshal FinishedInvocationsRaw, skipping")
return &invListQuery{}
var invs []*Invocation
for _, inv := range finished {
if inv.InvocationId > lastScanned {
invs = append(invs, &Invocation{ID: inv.InvocationId})
sort.Slice(invs, func(l, r int) bool { return invs[l].ID < invs[r].ID })
return &invListQuery{invs: invs}
// Datastore based queries.
// invDatastoreIter is a wrapper over datastore query that makes it look more
// like an iterator.
// Intended usage:
// it.start(...)
// defer it.stop()
// for len(results) != pageSize {
// switch item, err :=; {
// case err != nil:
// return nil, err // RPC error
// case item == nil:
// ...
// return results, nil // fetched all available results
// default:
// results = append(results, item)
// }
// }
// return results // fetched the full page
type invDatastoreIter struct {
results chan *Invocation // receives results of the query
done chan struct{} // closed when 'stop' is called
err error // error status of the query, synchronized via 'results'
stopped bool // true if 'stop' was called
// start initiates the query.
// The iterator is initially positioned before the first item, so that a call
// to 'next' will return the first item.
func (it *invDatastoreIter) start(c context.Context, query *datastore.Query) {
it.results = make(chan *Invocation)
it.done = make(chan struct{})
go func() {
defer close(it.results)
err := datastore.Run(c, query, func(obj *Invocation, cb datastore.CursorCB) error {
select {
case it.results <- obj:
return nil
case <-it.done:
return datastore.Stop
// Let 'next' and 'stop' know about the error. They look here if they
// receive 'nil' from the results channel (which happens if it is closed).
it.err = err
// next fetches the next query item if there's one.
// Returns (nil, nil) if all items has been successfully fetched. If the query
// failed, returns (nil, err).
func (it *invDatastoreIter) next() (*Invocation, error) {
switch {
case it.results == nil:
panic("'next' is called before 'start'")
case it.stopped:
panic("'next' is called after 'stop'")
if inv, ok := <-it.results; ok {
return inv, nil
return nil, it.err // 'it.err' is valid only after the channel is closed
// stop finishes the query, killing the internal goroutine.
// Once 'stop' is called, calls to 'next' are forbidden. It is OK to call
// 'stop' again though (it will return exact same value).
func (it *invDatastoreIter) stop() error {
if !it.stopped {
it.stopped = true
close(it.done) // signal the inner loop to wake up and exit
for range it.results { // wait for the results channel to close
return it.err
// invDatastoreQuery implements invQuery on top of a datastore iterator.
// The datastore query results are expected to be sorted by IDs in
// smallest-to-largest order. This is also the order in which invocations will
// be returned.
type invDatastoreQuery struct {
iter invDatastoreIter // iterator positioned before the next result
head *Invocation // value to return in peek() or nil if haven't fetched yet
err error // non-nil if the last fetch failed
done bool // true if fetched everything we could
func (q *invDatastoreQuery) peek() (*Invocation, error) {
if q.done || q.err != nil {
return nil, q.err // in a final non-advancable state
if q.head == nil {
q.advance() // need to fetch the first item ever
return q.head, q.err
func (q *invDatastoreQuery) advance() error {
if q.done || q.err != nil {
return q.err // in a final non-advancable state
q.head, q.err =
q.done = q.head == nil && q.err == nil
return q.err
func (q *invDatastoreQuery) close() {
// finishedInvQuery returns invQuery that emits historical finished invocations,
// of the given job.
// Smallest IDs are returned first. IDs smaller than or equal to lastScanned are
// skipped (this is used for pagination).
func finishedInvQuery(c context.Context, job *Job, lastScanned int64) *invDatastoreQuery {
q := datastore.NewQuery("Invocation")
q = q.Eq("IndexedJobID", job.JobID)
if lastScanned > 0 {
q = q.Gt("__key__", datastore.KeyForObj(c, &Invocation{ID: lastScanned}))
q = q.Order("__key__")
out := &invDatastoreQuery{}
out.iter.start(c, q)
return out
// Cursor helpers.
// decodeInvCursor deserializes a base64-encoded cursor.
func decodeInvCursor(cursor string, cur *internal.InvocationsCursor) error {
if cursor == "" {
*cur = internal.InvocationsCursor{}
return nil
blob, err := base64.RawURLEncoding.DecodeString(cursor)
if err != nil {
return errors.Annotate(err, "failed to base64 decode the cursor").Err()
if err = proto.Unmarshal(blob, cur); err != nil {
return errors.Annotate(err, "failed to unmarshal the cursor").Err()
return nil
// encodeInvCursor serializes the cursor to base64-encoded string.
func encodeInvCursor(cur *internal.InvocationsCursor) (string, error) {
if cur.LastScanned == 0 {
return "", nil
blob, err := proto.Marshal(cur)
if err != nil {
return "", err // must never actually happen
return base64.RawURLEncoding.EncodeToString(blob), nil
// High level functions used by Engine.
// invsPage contains information about a page returned by fetchInvsPage.
type invsPage struct {
count int // number of invocations in the page
final bool // true if this is the final page
lastScanned int64 // ID of the last scanned invocation if 'final' is false
// fetchInvsPage fetches (perhaps incomplete or empty) page of invocations,
// by merging results from multiple queries into the given 'out' slice.
// It is called (perhaps multiple times) by public ListInvocations to construct
// a full page of results out of smaller incomplete pages.
// Returns the extended 'out' slice (that now contains fetched items) and
// information about the fetched page.
func fetchInvsPage(c context.Context, qs []invQuery, opts ListInvocationsOpts, out []*Invocation) ([]*Invocation, invsPage, error) {
prevSize := len(out)
out, final, err := mergeInvQueries(qs, opts.PageSize, out)
if err != nil {
return nil, invsPage{}, errors.Annotate(err, "failed to query invocations").Tag(transient.Tag).Err()
// Nothing new at all? We are done.
if len(out) == prevSize {
return out, invsPage{final: true}, nil
// Otherwise remember the last ID we looked at to resume our query from it. It
// is important to grab the ID before the filtering, otherwise we may end up
// stuck in an infinite loop that fetches an empty page (with all items
// filtered out) over and over again, not advancing the query.
lastScanned := out[len(out)-1].ID
// Inflate and filter (in-place) shallow entities resulted from queries over
// IDs list. Note that this may reduce the returned page size, in
// a pathological case to 0. 'ListInvocations' will compensate for that by
// calling 'fetchInvsPage' again to fetch more stuff until the full page is
// fetched.
filtered, err := fillShallowInvs(c, out[prevSize:], opts)
if err != nil {
return nil, invsPage{}, err
// 'filtered' points to a subslice of 'out' (located at the end), that has
// all filtered items now. Truncate 'out' to get rid of garbage left after
// the filtering.
out = out[:prevSize+len(filtered)]
return out, invsPage{len(out) - prevSize, final, lastScanned}, nil
// fillShallowInvs detects entities that do not have bodies fetched yet, fetches
// them, and filters them based on ActiveOnly/FinishedOnly filter defined by
// opts.
// This is needed for results of queries that use IDs inlined in the Job entity.
// We detect such shallow entities by missing Status value, which is guaranteed
// to be set for all Invocation entities.
// Filtering is required since the state of the entities fetched here may be
// more up-to-date than the state used by queries. In particular, active
// invocations may not be active anymore.
// Filters the given slice in-place and returns the filtered slice that shares
// the same underlying array.
func fillShallowInvs(c context.Context, invs []*Invocation, opts ListInvocationsOpts) ([]*Invocation, error) {
var shallow []*Invocation
for _, inv := range invs {
if inv.Status == "" {
shallow = append(shallow, inv)
if len(shallow) == 0 {
return invs, nil
if err := datastore.Get(c, shallow); err != nil {
return nil, errors.Annotate(err, "failed to fetch invocations").Tag(transient.Tag).Err()
filtered := invs[:0]
for _, inv := range invs {
if opts.ActiveOnly && inv.Status.Final() {
if opts.FinishedOnly && !inv.Status.Final() {
filtered = append(filtered, inv)
return filtered, nil