| // Copyright 2023 The LUCI Authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| // Package lucianalysis contains methods to query test failures maintained in BigQuery. |
| package lucianalysis |
| |
| import ( |
| "bytes" |
| "context" |
| "fmt" |
| "net/http" |
| "strings" |
| "text/template" |
| |
| "cloud.google.com/go/bigquery" |
| "go.chromium.org/luci/bisection/model" |
| configpb "go.chromium.org/luci/bisection/proto/config" |
| pb "go.chromium.org/luci/bisection/proto/v1" |
| tpb "go.chromium.org/luci/bisection/task/proto" |
| "go.chromium.org/luci/bisection/util" |
| "go.chromium.org/luci/common/errors" |
| "go.chromium.org/luci/common/logging" |
| rdbpbutil "go.chromium.org/luci/resultdb/pbutil" |
| "go.chromium.org/luci/server/auth" |
| "google.golang.org/api/iterator" |
| "google.golang.org/api/option" |
| ) |
| |
| var internalDatasetID = "internal" |
| |
| var readFailureTemplate = template.Must(template.New("").Parse( |
| ` |
| {{define "basic" -}} |
| WITH |
| segments_with_failure_rate AS ( |
| SELECT |
| *, |
| ( segments[0].counts.unexpected_results / segments[0].counts.total_results) AS current_failure_rate, |
| ( segments[1].counts.unexpected_results / segments[1].counts.total_results) AS previous_failure_rate, |
| segments[0].start_position AS nominal_upper, |
| segments[1].end_position AS nominal_lower, |
| STRING(variant.builder) AS builder |
| FROM test_variant_segments_unexpected_realtime |
| WHERE project = @project AND ARRAY_LENGTH(segments) > 1 |
| ), |
| builder_regression_groups AS ( |
| SELECT |
| ref_hash AS RefHash, |
| ANY_VALUE(ref) AS Ref, |
| nominal_lower AS RegressionStartPosition, |
| nominal_upper AS RegressionEndPosition, |
| ARRAY_AGG(STRUCT( |
| test_id AS TestId, |
| variant_hash AS VariantHash, |
| variant AS Variant, |
| previous_failure_rate as StartPositionUnexpectedResultRate, |
| current_failure_rate as EndPositionUnexpectedResultRate |
| ) ORDER BY test_id, variant_hash) AS TestVariants, |
| ANY_VALUE(segments[0].start_hour) AS StartHour, |
| ANY_VALUE(segments[0].end_hour) AS EndHour |
| FROM segments_with_failure_rate |
| WHERE |
| current_failure_rate = 1 |
| -- The passing tail is allowed to be slightly non-deterministic, with failure rate less than 0.5%. |
| AND previous_failure_rate < 0.005 |
| AND segments[0].counts.unexpected_passed_results = 0 |
| AND segments[0].start_hour >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY) |
| -- We only consider test failures with non-skipped result in the last 24 hour. |
| AND segments[0].end_hour >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR) |
| GROUP BY ref_hash, builder, nominal_lower, nominal_upper |
| ), |
| builder_regression_groups_with_latest_build AS ( |
| SELECT |
| v.buildbucket_build.builder.bucket, |
| v.buildbucket_build.builder.builder, |
| ANY_VALUE(g) AS regression_group, |
| ANY_VALUE(v.buildbucket_build.id HAVING MAX v.partition_time) AS build_id, |
| ANY_VALUE(REGEXP_EXTRACT(v.results[0].parent.id, r'^task-{{.SwarmingProject}}.appspot.com-([0-9a-f]+)$') HAVING MAX v.partition_time) AS swarming_run_id, |
| ANY_VALUE(COALESCE(b2.infra.swarming.task_dimensions, b2.infra.backend.task_dimensions, b.infra.swarming.task_dimensions, b.infra.backend.task_dimensions) HAVING MAX v.partition_time) AS task_dimensions, |
| ANY_VALUE(JSON_VALUE_ARRAY(b.input.properties, "$.sheriff_rotations") HAVING MAX v.partition_time) AS SheriffRotations, |
| ANY_VALUE(JSON_VALUE(b.input.properties, "$.builder_group") HAVING MAX v.partition_time) AS BuilderGroup, |
| FROM builder_regression_groups g |
| -- Join with test_verdict table to get the build id of the lastest build for a test variant. |
| LEFT JOIN test_verdicts v |
| ON g.testVariants[0].TestId = v.test_id |
| AND g.testVariants[0].VariantHash = v.variant_hash |
| AND g.RefHash = v.source_ref_hash |
| -- Join with buildbucket builds table to get the buildbucket related information for tests. |
| LEFT JOIN (select * from {{.BBTableName}} where create_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 3 DAY)) b |
| ON v.buildbucket_build.id = b.id |
| -- JOIN with buildbucket builds table again to get task dimensions of parent builds. |
| LEFT JOIN (select * from {{.BBTableName}} where create_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 3 DAY)) b2 |
| ON JSON_VALUE(b.input.properties, "$.parent_build_id") = CAST(b2.id AS string) |
| -- Filter by test_verdict.partition_time to only return test failures that have test verdict recently. |
| -- 3 days is chosen as we expect tests run at least once every 3 days if they are not disabled. |
| -- If this is found to be too restricted, we can increase it later. |
| WHERE v.partition_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 3 DAY) AND v.project = @project |
| GROUP BY v.buildbucket_build.builder.bucket, v.buildbucket_build.builder.builder, g.testVariants[0].TestId, g.testVariants[0].VariantHash, g.RefHash |
| ) |
| {{- if .ExcludedPools}} |
| {{- template "withExcludedPools" .}} |
| {{- else}} |
| {{- template "withoutExcludedPools" .}} |
| {{- end -}} |
| ORDER BY regression_group.RegressionEndPosition DESC |
| LIMIT 5000 |
| {{- end}} |
| |
| {{- define "withoutExcludedPools"}} |
| SELECT regression_group.*, |
| bucket, |
| builder, |
| -- use empty array instead of null so we can read into []NullString. |
| IFNULL(SheriffRotations, []) as SheriffRotations |
| FROM builder_regression_groups_with_latest_build |
| WHERE {{.DimensionExcludeFilter}} AND (bucket NOT IN UNNEST(@excludedBuckets)) |
| -- We need to compare ARRAY_LENGTH with null because of unexpected Bigquery behaviour b/138262091. |
| AND ((BuilderGroup IN UNNEST(@allowedBuilderGroups)) OR ARRAY_LENGTH(@allowedBuilderGroups) = 0 OR ARRAY_LENGTH(@allowedBuilderGroups) IS NULL) |
| AND (BuilderGroup NOT IN UNNEST(@excludedBuilderGroups)) |
| {{end}} |
| |
| {{define "withExcludedPools"}} |
| SELECT regression_group.*, |
| bucket, |
| builder, |
| -- use empty array instead of null so we can read into []NullString. |
| IFNULL(SheriffRotations, []) as SheriffRotations |
| FROM builder_regression_groups_with_latest_build g |
| LEFT JOIN {{.SwarmingProject}}.swarming.task_results_run s |
| ON g.swarming_run_id = s.run_id |
| WHERE s.end_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 3 DAY) |
| AND {{.DimensionExcludeFilter}} AND (bucket NOT IN UNNEST(@excludedBuckets)) |
| AND (s.bot.pools[0] NOT IN UNNEST(@excludedPools)) |
| -- We need to compare ARRAY_LENGTH with null because of unexpected Bigquery behaviour b/138262091. |
| AND ((BuilderGroup IN UNNEST(@allowedBuilderGroups)) OR ARRAY_LENGTH(@allowedBuilderGroups) = 0 OR ARRAY_LENGTH(@allowedBuilderGroups) IS NULL) |
| AND (BuilderGroup NOT IN UNNEST(@excludedBuilderGroups)) |
| {{end}} |
| `)) |
| |
| // NewClient creates a new client for reading test failures from LUCI Analysis. |
| // Close() MUST be called after you have finished using this client. |
| // GCP project where the query operations are billed to, either luci-bisection or luci-bisection-dev. |
| // luciAnalysisProject is the function that returns the gcp project that contains the BigQuery table we want to query. |
| func NewClient(ctx context.Context, gcpProject string, luciAnalysisProjectFunc func(luciProject string) string) (*Client, error) { |
| if gcpProject == "" { |
| return nil, errors.New("GCP Project must be specified") |
| } |
| if luciAnalysisProjectFunc == nil { |
| return nil, errors.New("LUCI Analysis Project function must be specified") |
| } |
| tr, err := auth.GetRPCTransport(ctx, auth.AsSelf, auth.WithScopes(bigquery.Scope)) |
| if err != nil { |
| return nil, err |
| } |
| client, err := bigquery.NewClient(ctx, gcpProject, option.WithHTTPClient(&http.Client{ |
| Transport: tr, |
| })) |
| if err != nil { |
| return nil, err |
| } |
| return &Client{ |
| client: client, |
| luciAnalysisProjectFunc: luciAnalysisProjectFunc, |
| }, nil |
| } |
| |
| // Client may be used to read LUCI Analysis test failures. |
| type Client struct { |
| client *bigquery.Client |
| // luciAnalysisProjectFunc is a function that return LUCI Analysis project |
| // given a LUCI Project. |
| luciAnalysisProjectFunc func(luciProject string) string |
| } |
| |
| // Close releases any resources held by the client. |
| func (c *Client) Close() error { |
| return c.client.Close() |
| } |
| |
| // BuilderRegressionGroup contains a list of test variants |
| // which use the same builder and have the same regression range. |
| type BuilderRegressionGroup struct { |
| Bucket bigquery.NullString |
| Builder bigquery.NullString |
| RefHash bigquery.NullString |
| Ref *Ref |
| RegressionStartPosition bigquery.NullInt64 |
| RegressionEndPosition bigquery.NullInt64 |
| TestVariants []*TestVariant |
| StartHour bigquery.NullTimestamp |
| EndHour bigquery.NullTimestamp |
| SheriffRotations []bigquery.NullString |
| } |
| |
| type Ref struct { |
| Gitiles *Gitiles |
| } |
| type Gitiles struct { |
| Host bigquery.NullString |
| Project bigquery.NullString |
| Ref bigquery.NullString |
| } |
| |
| type TestVariant struct { |
| TestID bigquery.NullString |
| VariantHash bigquery.NullString |
| Variant bigquery.NullJSON |
| StartPositionUnexpectedResultRate float64 |
| EndPositionUnexpectedResultRate float64 |
| } |
| |
| func (c *Client) ReadTestFailures(ctx context.Context, task *tpb.TestFailureDetectionTask, filter *configpb.FailureIngestionFilter) ([]*BuilderRegressionGroup, error) { |
| dimensionExcludeFilter := "(TRUE)" |
| if len(task.DimensionExcludes) > 0 { |
| dimensionExcludeFilter = "(NOT (SELECT LOGICAL_OR((SELECT count(*) > 0 FROM UNNEST(task_dimensions) WHERE KEY = kv.key and value = kv.value)) FROM UNNEST(@dimensionExcludes) kv))" |
| } |
| |
| queryStm, err := generateTestFailuresQuery(task, dimensionExcludeFilter, filter.ExcludedTestPools) |
| if err != nil { |
| return nil, errors.Annotate(err, "generate test failures query").Err() |
| } |
| q := c.client.Query(queryStm) |
| q.DefaultDatasetID = internalDatasetID |
| q.DefaultProjectID = c.luciAnalysisProjectFunc(task.Project) |
| q.Parameters = []bigquery.QueryParameter{ |
| {Name: "project", Value: task.Project}, |
| {Name: "dimensionExcludes", Value: task.DimensionExcludes}, |
| {Name: "excludedBuckets", Value: filter.GetExcludedBuckets()}, |
| {Name: "excludedPools", Value: filter.GetExcludedTestPools()}, |
| {Name: "allowedBuilderGroups", Value: filter.GetAllowedBuilderGroups()}, |
| {Name: "excludedBuilderGroups", Value: filter.GetExcludedBuilderGroups()}, |
| } |
| job, err := q.Run(ctx) |
| if err != nil { |
| return nil, errors.Annotate(err, "querying test failures").Err() |
| } |
| it, err := job.Read(ctx) |
| if err != nil { |
| return nil, err |
| } |
| groups := []*BuilderRegressionGroup{} |
| for { |
| row := &BuilderRegressionGroup{} |
| err := it.Next(row) |
| if err == iterator.Done { |
| break |
| } |
| if err != nil { |
| return nil, errors.Annotate(err, "obtain next test failure group row").Err() |
| } |
| groups = append(groups, row) |
| } |
| return groups, nil |
| } |
| |
| func generateTestFailuresQuery(task *tpb.TestFailureDetectionTask, dimensionExcludeFilter string, excludedPools []string) (string, error) { |
| bbTableName, err := buildBucketBuildTableName(task.Project) |
| if err != nil { |
| return "", errors.Annotate(err, "buildBucketBuildTableName").Err() |
| } |
| |
| swarmingProject := "" |
| switch task.Project { |
| case "chromium": |
| swarmingProject = "chromium-swarm" |
| case "chrome": |
| swarmingProject = "chrome-swarming" |
| default: |
| return "", errors.Reason("couldn't get swarming project for project %s", task.Project).Err() |
| } |
| |
| var b bytes.Buffer |
| err = readFailureTemplate.ExecuteTemplate(&b, "basic", map[string]any{ |
| "SwarmingProject": swarmingProject, |
| "DimensionExcludeFilter": dimensionExcludeFilter, |
| "BBTableName": bbTableName, |
| "ExcludedPools": excludedPools, |
| }) |
| if err != nil { |
| return "", errors.Annotate(err, "execute template").Err() |
| } |
| return b.String(), nil |
| } |
| |
| const BuildBucketProject = "cr-buildbucket" |
| |
| // This returns a qualified BigQuary table name of the builds table |
| // in BuildBucket for a LUCI project. |
| // The table name is checked against SQL-Injection. |
| // Thus, it can be injected into a SQL query. |
| func buildBucketBuildTableName(luciProject string) (string, error) { |
| // Revalidate project as safeguard against SQL-Injection. |
| if err := util.ValidateProject(luciProject); err != nil { |
| return "", err |
| } |
| return fmt.Sprintf("%s.%s.builds", BuildBucketProject, luciProject), nil |
| } |
| |
| type BuildInfo struct { |
| BuildID int64 |
| StartCommitHash string |
| EndCommitHash string |
| } |
| |
| func (c *Client) ReadBuildInfo(ctx context.Context, tf *model.TestFailure) (BuildInfo, error) { |
| q := c.client.Query(` |
| SELECT |
| ANY_VALUE(buildbucket_build.id) AS BuildID, |
| ANY_VALUE(sources.gitiles_commit.commit_hash) AS CommitHash, |
| sources.gitiles_commit.position AS Position |
| FROM test_verdicts |
| WHERE project = @project |
| AND test_id = @testID |
| AND variant_hash = @variantHash |
| AND source_ref_hash = @refHash |
| AND buildbucket_build.builder.bucket = @bucket |
| AND buildbucket_build.builder.builder = @builder |
| AND sources.gitiles_commit.position in (@startPosition, @endPosition) |
| AND partition_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY) |
| GROUP BY sources.gitiles_commit.position |
| ORDER BY sources.gitiles_commit.position DESC |
| `) |
| q.DefaultDatasetID = internalDatasetID |
| q.DefaultProjectID = c.luciAnalysisProjectFunc(tf.Project) |
| q.Parameters = []bigquery.QueryParameter{ |
| {Name: "project", Value: tf.Project}, |
| {Name: "testID", Value: tf.TestID}, |
| {Name: "variantHash", Value: tf.VariantHash}, |
| {Name: "refHash", Value: tf.RefHash}, |
| {Name: "bucket", Value: tf.Bucket}, |
| {Name: "builder", Value: tf.Builder}, |
| {Name: "startPosition", Value: tf.RegressionStartPosition}, |
| {Name: "endPosition", Value: tf.RegressionEndPosition}, |
| } |
| job, err := q.Run(ctx) |
| if err != nil { |
| return BuildInfo{}, errors.Annotate(err, "querying test_verdicts").Err() |
| } |
| it, err := job.Read(ctx) |
| if err != nil { |
| return BuildInfo{}, err |
| } |
| rowVals := map[string]bigquery.Value{} |
| // First row is for regression end position. |
| err = it.Next(&rowVals) |
| if err != nil { |
| return BuildInfo{}, errors.Annotate(err, "read build info row for regression end position").Err() |
| } |
| // Make sure the first row is for the end position. |
| if rowVals["Position"].(int64) != tf.RegressionEndPosition { |
| return BuildInfo{}, errors.New("position should equal to RegressionEndPosition. this suggests something wrong with the query.") |
| } |
| buildInfo := BuildInfo{ |
| BuildID: rowVals["BuildID"].(int64), |
| EndCommitHash: rowVals["CommitHash"].(string), |
| } |
| // Second row is for regression start position. |
| err = it.Next(&rowVals) |
| if err != nil { |
| return BuildInfo{}, errors.Annotate(err, "read build info row for regression start position").Err() |
| } |
| // Make sure the second row is for the start position. |
| if rowVals["Position"].(int64) != tf.RegressionStartPosition { |
| return BuildInfo{}, errors.New("position should equal to RegressionStartPosition. this suggests something wrong with the query.") |
| } |
| buildInfo.StartCommitHash = rowVals["CommitHash"].(string) |
| return buildInfo, nil |
| } |
| |
| type TestVerdictKey struct { |
| TestID string |
| VariantHash string |
| RefHash string |
| } |
| |
| type TestVerdictResultRow struct { |
| TestID bigquery.NullString |
| VariantHash bigquery.NullString |
| RefHash bigquery.NullString |
| TestName bigquery.NullString |
| Status bigquery.NullString |
| } |
| |
| type TestVerdictResult struct { |
| TestName string |
| Status pb.TestVerdictStatus |
| } |
| |
| // ReadLatestVerdict queries LUCI Analysis for latest verdict. |
| // It supports querying for multiple keys at a time to save time and resources. |
| // Returns a map of TestVerdictKey -> latest verdict. |
| func (c *Client) ReadLatestVerdict(ctx context.Context, project string, keys []TestVerdictKey) (map[TestVerdictKey]TestVerdictResult, error) { |
| if len(keys) == 0 { |
| return nil, errors.New("no key specified") |
| } |
| err := validateTestVerdictKeys(keys) |
| if err != nil { |
| return nil, errors.Annotate(err, "validate keys").Err() |
| } |
| clauses := make([]string, len(keys)) |
| for i, key := range keys { |
| clauses[i] = fmt.Sprintf("(test_id = %q AND variant_hash = %q AND source_ref_hash = %q)", key.TestID, key.VariantHash, key.RefHash) |
| } |
| whereClause := fmt.Sprintf("(%s)", strings.Join(clauses, " OR ")) |
| |
| // We expect a test to have result in the last 3 days. |
| // Set the partition time to 3 days to reduce the cost. |
| query := ` |
| SELECT |
| test_id AS TestID, |
| variant_hash AS VariantHash, |
| source_ref_hash AS RefHash, |
| ARRAY_AGG ( |
| ( SELECT value FROM UNNEST(tv.results[0].tags) WHERE KEY = "test_name") |
| ORDER BY tv.partition_time DESC |
| LIMIT 1 |
| )[OFFSET(0)] AS TestName, |
| ANY_VALUE(status HAVING MAX tv.partition_time) AS Status |
| FROM test_verdicts tv |
| WHERE project = @project AND ` + whereClause + ` |
| AND partition_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 3 DAY) |
| GROUP BY test_id, variant_hash, source_ref_hash |
| ` |
| logging.Infof(ctx, "Running query %s", query) |
| q := c.client.Query(query) |
| q.DefaultDatasetID = internalDatasetID |
| q.DefaultProjectID = c.luciAnalysisProjectFunc(project) |
| q.Parameters = []bigquery.QueryParameter{ |
| {Name: "project", Value: project}, |
| } |
| job, err := q.Run(ctx) |
| if err != nil { |
| return nil, errors.Annotate(err, "querying test name").Err() |
| } |
| it, err := job.Read(ctx) |
| if err != nil { |
| return nil, errors.Annotate(err, "read").Err() |
| } |
| results := map[TestVerdictKey]TestVerdictResult{} |
| for { |
| row := &TestVerdictResultRow{} |
| err := it.Next(row) |
| if err == iterator.Done { |
| break |
| } |
| if err != nil { |
| return nil, errors.Annotate(err, "obtain next row").Err() |
| } |
| key := TestVerdictKey{ |
| TestID: row.TestID.String(), |
| VariantHash: row.VariantHash.String(), |
| RefHash: row.RefHash.String(), |
| } |
| results[key] = TestVerdictResult{ |
| TestName: row.TestName.String(), |
| Status: pb.TestVerdictStatus(pb.TestVerdictStatus_value[row.Status.String()]), |
| } |
| } |
| return results, nil |
| } |
| |
| type CountRow struct { |
| Count bigquery.NullInt64 |
| } |
| |
| // TestIsUnexpectedConsistently queries LUCI Analysis to see if a test is |
| // still unexpected deterministically since a commit position. |
| // This is to be called before we take a culprit action, in case a test |
| // status has changed. |
| func (c *Client) TestIsUnexpectedConsistently(ctx context.Context, project string, key TestVerdictKey, sinceCommitPosition int64) (bool, error) { |
| err := validateTestVerdictKeys([]TestVerdictKey{key}) |
| if err != nil { |
| return false, errors.Annotate(err, "validate keys").Err() |
| } |
| // If there is a row with counts.total_non_skipped > counts.unexpected_non_skipped, |
| // It means there are some expected non skipped results. |
| query := ` |
| SELECT |
| COUNT(*) as count |
| FROM test_verdicts |
| WHERE project = @project AND test_id = @testID AND variant_hash = @variantHash AND source_ref_hash = @refHash |
| AND counts.total_non_skipped > counts.unexpected_non_skipped |
| AND sources.gitiles_commit.position > @sinceCommitPosition |
| AND partition_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 3 DAY) |
| ` |
| logging.Infof(ctx, "Running query %s", query) |
| q := c.client.Query(query) |
| q.DefaultDatasetID = internalDatasetID |
| q.DefaultProjectID = c.luciAnalysisProjectFunc(project) |
| q.Parameters = []bigquery.QueryParameter{ |
| {Name: "project", Value: project}, |
| {Name: "testID", Value: key.TestID}, |
| {Name: "variantHash", Value: key.VariantHash}, |
| {Name: "refHash", Value: key.RefHash}, |
| {Name: "sinceCommitPosition", Value: sinceCommitPosition}, |
| } |
| |
| job, err := q.Run(ctx) |
| if err != nil { |
| return false, errors.Annotate(err, "running query").Err() |
| } |
| it, err := job.Read(ctx) |
| if err != nil { |
| return false, errors.Annotate(err, "read").Err() |
| } |
| row := &CountRow{} |
| err = it.Next(row) |
| if err == iterator.Done { |
| return false, errors.New("cannot get count") |
| } |
| if err != nil { |
| return false, errors.Annotate(err, "obtain next row").Err() |
| } |
| return row.Count.Int64 == 0, nil |
| } |
| |
| type ChangepointResult struct { |
| TestID string |
| VariantHash string |
| RefHash string |
| Segments []*Segment |
| } |
| type Segment struct { |
| StartPosition bigquery.NullInt64 |
| EndPosition bigquery.NullInt64 |
| CountTotalResults bigquery.NullInt64 |
| CountUnexpectedResults bigquery.NullInt64 |
| } |
| |
| func (c *Client) ChangepointAnalysisForTestVariant(ctx context.Context, project string, keys []TestVerdictKey) (map[TestVerdictKey]*ChangepointResult, error) { |
| err := validateTestVerdictKeys(keys) |
| if err != nil { |
| return nil, errors.Annotate(err, "validate keys").Err() |
| } |
| clauses := make([]string, len(keys)) |
| for i, key := range keys { |
| clauses[i] = fmt.Sprintf("(test_id = %q AND variant_hash = %q AND ref_hash = %q)", key.TestID, key.VariantHash, key.RefHash) |
| } |
| whereClause := fmt.Sprintf("(%s)", strings.Join(clauses, " OR ")) |
| query := ` |
| SELECT |
| test_id as TestID, |
| variant_hash as VariantHash, |
| ref_hash as RefHash, |
| (SELECT |
| ARRAY_AGG(STRUCT( |
| s.start_position as StartPosition, |
| s.end_position as EndPosition, |
| s.counts.total_results as CountTotalResults, |
| s.counts.unexpected_results as CountUnexpectedResults)) |
| FROM UNNEST(segments) s |
| ) AS Segments |
| FROM test_variant_segments_unexpected_realtime |
| WHERE project = @project AND ` + whereClause |
| logging.Infof(ctx, "Running query %s", query) |
| q := c.client.Query(query) |
| q.DefaultDatasetID = internalDatasetID |
| q.DefaultProjectID = c.luciAnalysisProjectFunc(project) |
| q.Parameters = []bigquery.QueryParameter{ |
| {Name: "project", Value: project}, |
| } |
| |
| job, err := q.Run(ctx) |
| if err != nil { |
| return nil, errors.Annotate(err, "running query").Err() |
| } |
| it, err := job.Read(ctx) |
| if err != nil { |
| return nil, errors.Annotate(err, "read").Err() |
| } |
| results := map[TestVerdictKey]*ChangepointResult{} |
| for { |
| row := &ChangepointResult{} |
| err := it.Next(row) |
| if err == iterator.Done { |
| break |
| } |
| if err != nil { |
| return nil, errors.Annotate(err, "obtain next changepoint row").Err() |
| } |
| key := TestVerdictKey{ |
| TestID: row.TestID, |
| VariantHash: row.VariantHash, |
| RefHash: row.RefHash, |
| } |
| results[key] = row |
| } |
| return results, nil |
| } |
| |
| func validateTestVerdictKeys(keys []TestVerdictKey) error { |
| for _, key := range keys { |
| if err := rdbpbutil.ValidateTestID(key.TestID); err != nil { |
| return err |
| } |
| if err := util.ValidateVariantHash(key.VariantHash); err != nil { |
| return err |
| } |
| if err := util.ValidateRefHash(key.RefHash); err != nil { |
| return err |
| } |
| } |
| return nil |
| } |