blob: 4d1961706729ce7af4942707ef87ac94202a7abd [file] [log] [blame]
// Copyright 2020 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package invocations
import (
"context"
"cloud.google.com/go/spanner"
"google.golang.org/grpc/codes"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/trace"
"go.chromium.org/luci/grpc/appstatus"
"go.chromium.org/luci/server/span"
"go.chromium.org/luci/resultdb/internal/spanutil"
"go.chromium.org/luci/resultdb/pbutil"
pb "go.chromium.org/luci/resultdb/proto/v1"
)
// ReadColumns reads the specified columns from an invocation Spanner row.
// If the invocation does not exist, the returned error is annotated with
// NotFound GRPC code.
// For ptrMap see ReadRow comment in span/util.go.
func ReadColumns(ctx context.Context, id ID, ptrMap map[string]interface{}) error {
if id == "" {
return errors.Reason("id is unspecified").Err()
}
err := spanutil.ReadRow(ctx, "Invocations", id.Key(), ptrMap)
switch {
case spanner.ErrCode(err) == codes.NotFound:
return appstatus.Attachf(err, codes.NotFound, "%s not found", id.Name())
case err != nil:
return errors.Annotate(err, "failed to fetch %s", id.Name()).Err()
default:
return nil
}
}
func readMulti(ctx context.Context, ids IDSet, f func(id ID, inv *pb.Invocation) error) error {
if len(ids) == 0 {
return nil
}
st := spanner.NewStatement(`
SELECT
i.InvocationId,
i.State,
i.CreatedBy,
i.CreateTime,
i.FinalizeTime,
i.Deadline,
i.Tags,
i.BigQueryExports,
ARRAY(SELECT IncludedInvocationId FROM IncludedInvocations incl WHERE incl.InvocationID = i.InvocationId),
i.ProducerResource,
i.Realm,
i.HistoryTime,
FROM Invocations i
WHERE i.InvocationID IN UNNEST(@invIDs)
`)
st.Params = spanutil.ToSpannerMap(map[string]interface{}{
"invIDs": ids,
})
var b spanutil.Buffer
return spanutil.Query(ctx, st, func(row *spanner.Row) error {
var id ID
included := IDSet{}
var bqExports [][]byte
inv := &pb.Invocation{}
var createdBy spanner.NullString
var producerResource spanner.NullString
var realm spanner.NullString
var historyTime *timestamppb.Timestamp
err := b.FromSpanner(row, &id,
&inv.State,
&createdBy,
&inv.CreateTime,
&inv.FinalizeTime,
&inv.Deadline,
&inv.Tags,
&bqExports,
&included,
&producerResource,
&realm,
&historyTime)
if err != nil {
return err
}
inv.Name = pbutil.InvocationName(string(id))
inv.IncludedInvocations = included.Names()
inv.CreatedBy = createdBy.StringVal
inv.ProducerResource = producerResource.StringVal
inv.Realm = realm.StringVal
if len(bqExports) > 0 {
inv.BigqueryExports = make([]*pb.BigQueryExport, len(bqExports))
for i, buf := range bqExports {
inv.BigqueryExports[i] = &pb.BigQueryExport{}
if err := proto.Unmarshal(buf, inv.BigqueryExports[i]); err != nil {
return errors.Annotate(err, "%s: failed to unmarshal BigQuery export", inv.Name).Err()
}
}
}
if historyTime != nil {
inv.HistoryOptions = &pb.HistoryOptions{
UseInvocationTimestamp: true,
}
}
return f(id, inv)
})
}
// Read reads one invocation from Spanner.
// If the invocation does not exist, the returned error is annotated with
// NotFound GRPC code.
func Read(ctx context.Context, id ID) (*pb.Invocation, error) {
var ret *pb.Invocation
err := readMulti(ctx, NewIDSet(id), func(id ID, inv *pb.Invocation) error {
ret = inv
return nil
})
switch {
case err != nil:
return nil, err
case ret == nil:
return nil, appstatus.Errorf(codes.NotFound, "%s not found", id.Name())
default:
return ret, nil
}
}
// ReadBatch reads multiple invocations from Spanner.
// If any of them are not found, returns an error.
func ReadBatch(ctx context.Context, ids IDSet) (map[ID]*pb.Invocation, error) {
ret := make(map[ID]*pb.Invocation, len(ids))
err := readMulti(ctx, ids, func(id ID, inv *pb.Invocation) error {
if _, ok := ret[id]; ok {
panic("query is incorrect; it returned duplicated invocation IDs")
}
ret[id] = inv
return nil
})
if err != nil {
return nil, err
}
for id := range ids {
if _, ok := ret[id]; !ok {
return nil, appstatus.Errorf(codes.NotFound, "%s not found", id.Name())
}
}
return ret, nil
}
// ReadState returns the invocation's state.
func ReadState(ctx context.Context, id ID) (pb.Invocation_State, error) {
var state pb.Invocation_State
err := ReadColumns(ctx, id, map[string]interface{}{"State": &state})
return state, err
}
// ReadStateBatch reads the states of multiple invocations.
func ReadStateBatch(ctx context.Context, ids IDSet) (map[ID]pb.Invocation_State, error) {
ret := make(map[ID]pb.Invocation_State)
err := span.Read(ctx, "Invocations", ids.Keys(), []string{"InvocationID", "State"}).Do(func(r *spanner.Row) error {
var id ID
var s pb.Invocation_State
if err := spanutil.FromSpanner(r, &id, &s); err != nil {
return errors.Annotate(err, "failed to fetch %s", ids).Err()
}
ret[id] = s
return nil
})
if err != nil {
return nil, err
}
return ret, nil
}
// ReadRealm returns the invocation's realm.
func ReadRealm(ctx context.Context, id ID) (string, error) {
var realm string
err := ReadColumns(ctx, id, map[string]interface{}{"Realm": &realm})
return realm, err
}
// ReadRealms returns the invocations' realms.
// Makes a single RPC.
func ReadRealms(ctx context.Context, ids IDSet) (realms map[ID]string, err error) {
ctx, ts := trace.StartSpan(ctx, "resultdb.invocations.ReadRealms")
ts.Attribute("cr.dev/count", len(ids))
defer func() { ts.End(err) }()
realms = make(map[ID]string, len(ids))
var b spanutil.Buffer
err = span.Read(ctx, "Invocations", ids.Keys(), []string{"InvocationId", "Realm"}).Do(func(row *spanner.Row) error {
var id ID
var realm spanner.NullString
if err := b.FromSpanner(row, &id, &realm); err != nil {
return err
}
realms[id] = realm.StringVal
return nil
})
if err != nil {
return nil, err
}
// Return a NotFound error if ret is missing a requested invocation.
for id := range ids {
if _, ok := realms[id]; !ok {
return nil, appstatus.Errorf(codes.NotFound, "%s not found", id.Name())
}
}
return realms, nil
}