blob: 2b8a799b066ab0a2948df83ba38663050ede1845 [file] [log] [blame]
// Copyright 2022 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package graph contains methods to explore reachable invocations.
package graph
import (
"google.golang.org/protobuf/proto"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/resultdb/internal/invocations"
internalpb "go.chromium.org/luci/resultdb/internal/proto"
"go.chromium.org/luci/resultdb/internal/spanutil"
pb "go.chromium.org/luci/resultdb/proto/v1"
)
// ReachableInvocation contains summary information about a reachable
// invocation.
type ReachableInvocation struct {
// HasTestResults stores whether the invocation has any test results.
HasTestResults bool
// HasTestResults stores whether the invocation has any test exonerations.
HasTestExonerations bool
// The realm of the invocation.
Realm string
// The source associated with the invocation, which can be looked up in
// ReachableInvocations.Sources.
// If no sources could be resolved, this is EmptySourceHash.
SourceHash SourceHash
}
// ReachableInvocations is a set of reachable invocations,
// including summary information about each invocation.
// The set includes the root invocation(s) from which reachables were
// explored.
type ReachableInvocations struct {
// The set of reachable invocations, including the root
// invocation from which reachability was explored.
Invocations map[invocations.ID]ReachableInvocation
// The distinct code sources in the reachable invocation graph.
// Stored here rather than on the invocations themselves to
// simplify deduplicating sources objects as many will be the
// same between invocations.
Sources map[SourceHash]*pb.Sources
}
func NewReachableInvocations() ReachableInvocations {
return ReachableInvocations{
Invocations: make(map[invocations.ID]ReachableInvocation),
Sources: make(map[SourceHash]*pb.Sources),
}
}
// Union adds other reachable invocations.
func (r *ReachableInvocations) Union(other ReachableInvocations) {
for id, invocation := range other.Invocations {
r.Invocations[id] = invocation
}
for id, sources := range other.Sources {
r.Sources[id] = sources
}
}
// Batches splits s into batches.
// The batches are sorted by RowID(), such that interval (minRowID, maxRowID)
// of each batch does not overlap with any other batch.
//
// The size of batch is hardcoded 50, because that's the maximum parallelism
// we get from Cloud Spanner.
func (r ReachableInvocations) Batches() []ReachableInvocations {
return r.batches(50)
}
// IDSet returns the set of invocation IDs included in the list of
// reachable invocations.
func (r ReachableInvocations) IDSet() (invocations.IDSet, error) {
// Yes, this is an artificial limit. With 20,000 invocations you are already likely
// to run into problems if you try to process all of these in one go (e.g. in a
// Spanner query). If you want more, use the batched call and handle a batch at a time.
if len(r.Invocations) > MaxNodes {
return nil, errors.Reason("more than %d invocations match", MaxNodes).Tag(TooManyTag).Err()
}
return r.idSetNoLimit(), nil
}
// IDSet returns the set of invocation IDs included in the list of
// reachable invocations. This internal only version has no limit and
// is only for use where the limit will be checked in other ways.
func (r ReachableInvocations) idSetNoLimit() invocations.IDSet {
result := make(invocations.IDSet, len(r.Invocations))
for id := range r.Invocations {
result[id] = struct{}{}
}
return result
}
// WithTestResultsIDSet returns the set of invocation IDs
// that contain test results.
func (r ReachableInvocations) WithTestResultsIDSet() (invocations.IDSet, error) {
result := make(invocations.IDSet, len(r.Invocations))
for id, inv := range r.Invocations {
if inv.HasTestResults {
result[id] = struct{}{}
}
}
// Yes, this is an artificial limit. With 20,000 invocations you are already likely
// to run into problems if you try to process all of these in one go (e.g. in a
// Spanner query). If you want more, use the batched call and handle a batch at a time.
if len(result) > MaxNodes {
return nil, errors.Reason("more than %d invocations match", MaxNodes).Tag(TooManyTag).Err()
}
return result, nil
}
// WithExonerationsIDSet returns the set of invocation IDs
// that contain test exonerations.
func (r ReachableInvocations) WithExonerationsIDSet() (invocations.IDSet, error) {
result := make(invocations.IDSet, len(r.Invocations))
for id, inv := range r.Invocations {
if inv.HasTestExonerations {
result[id] = struct{}{}
}
}
// Yes, this is an artificial limit. With 20,000 invocations you are already likely
// to run into problems if you try to process all of these in one go (e.g. in a
// Spanner query). If you want more, use the batched call and handle a batch at a time.
if len(result) > MaxNodes {
return nil, errors.Reason("more than %d invocations match", MaxNodes).Tag(TooManyTag).Err()
}
return result, nil
}
func (r ReachableInvocations) batches(size int) []ReachableInvocations {
ids := r.idSetNoLimit().SortByRowID()
batches := make([]ReachableInvocations, 0, 1+len(ids)/size)
for len(ids) > 0 {
batchSize := size
if batchSize > len(ids) {
batchSize = len(ids)
}
batch := NewReachableInvocations()
for _, id := range ids[:batchSize] {
inv := r.Invocations[id]
batch.Invocations[id] = inv
if inv.SourceHash != EmptySourceHash {
batch.Sources[inv.SourceHash] = r.Sources[inv.SourceHash]
}
}
batches = append(batches, batch)
ids = ids[batchSize:]
}
return batches
}
// marshal marshals the ReachableInvocations into a Redis value.
func (r ReachableInvocations) marshal() ([]byte, error) {
if len(r.Invocations) == 0 {
return nil, errors.Reason("reachable invocations is invalid; at minimum the root invocation itself should be included").Err()
}
indexBySourceHash := make(map[SourceHash]int)
distinctSources := make([]*pb.Sources, 0, len(r.Sources))
for id, source := range r.Sources {
distinctSources = append(distinctSources, source)
indexBySourceHash[id] = len(distinctSources) - 1
}
invocations := make([]*internalpb.ReachableInvocations_ReachableInvocation, 0, len(r.Invocations))
for id, inv := range r.Invocations {
proto := &internalpb.ReachableInvocations_ReachableInvocation{
InvocationId: string(id),
HasTestResults: inv.HasTestResults,
HasTestExonerations: inv.HasTestExonerations,
Realm: inv.Realm,
}
if inv.SourceHash != EmptySourceHash {
proto.SourceOffset = int64(indexBySourceHash[inv.SourceHash]) + 1
}
invocations = append(invocations, proto)
}
message := &internalpb.ReachableInvocations{
Invocations: invocations,
Sources: distinctSources,
}
result, err := proto.Marshal(message)
if err != nil {
return nil, err
}
return spanutil.Compress(result), nil
}
// unmarshalReachableInvocations unmarshals the ReachableInvocations from a Redis value.
func unmarshalReachableInvocations(value []byte) (ReachableInvocations, error) {
// Assume 2x growth on decompression (will be resized as needed)
decompressed := make([]byte, 0, len(value)*2)
decompressed, err := spanutil.Decompress(value, decompressed)
if err != nil {
return ReachableInvocations{}, err
}
message := &internalpb.ReachableInvocations{}
if err := proto.Unmarshal(decompressed, message); err != nil {
return ReachableInvocations{}, err
}
sourceHashByIndex := make([]SourceHash, len(message.Sources))
sources := make(map[SourceHash]*pb.Sources)
for i, source := range message.Sources {
hash := HashSources(source)
sources[hash] = source
sourceHashByIndex[i] = hash
}
invs := make(map[invocations.ID]ReachableInvocation, len(message.Invocations))
for _, entry := range message.Invocations {
inv := ReachableInvocation{
HasTestResults: entry.HasTestResults,
HasTestExonerations: entry.HasTestExonerations,
Realm: entry.Realm,
SourceHash: EmptySourceHash,
}
if entry.SourceOffset > 0 {
inv.SourceHash = sourceHashByIndex[entry.SourceOffset-1]
}
invs[invocations.ID(entry.InvocationId)] = inv
}
return ReachableInvocations{
Invocations: invs,
Sources: sources,
}, nil
}