blob: 5d9fada06bea389fd0747adbd45a19d45f6d06e5 [file] [log] [blame]
// Copyright 2023 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package swarming
import (
"context"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/data/rand/mathrand"
"go.chromium.org/luci/common/logging"
swarmingv2 "go.chromium.org/luci/swarming/proto/api_v2"
)
// WaitMode selects the blocking behavior of the Get* functions.
type WaitMode int

// Supported wait modes.
const (
	// NoWait fetches the current task status once, without waiting for the
	// task to complete.
	NoWait WaitMode = iota
	// WaitAny blocks until at least one task completes and then reports the
	// current state of all tasks. At least one of them will be completed, but
	// possibly more (if several tasks complete at the same time).
	WaitAny
	// WaitAll blocks until every task completes.
	WaitAll
)
// GetOne fetches the status of a single task, optionally blocking until the
// task reaches a final state.
//
// With `mode` set to NoWait the first successfully fetched status is returned
// as is, so it may be non-final (PENDING or RUNNING). With WaitAny or WaitAll
// the function keeps polling until the task is final (successfully or not), so
// a nil error always comes with a final status in these modes.
//
// Transient RPC errors are retried (in all modes) until the context expires.
// The function gives up when the context expires or when an RPC fails with a
// fatal error code. It then returns a non-nil error: either the context error
// or the gRPC error. If the task state was fetched at least once before the
// failure, the most recent state (necessarily PENDING or RUNNING) is returned
// together with the error; otherwise the returned result is nil.
//
// Panics if `taskID` is an empty string.
func GetOne(ctx context.Context, client Client, taskID string, fields *TaskResultFields, mode WaitMode) (*swarmingv2.TaskResultResponse, error) {
	if taskID == "" {
		panic("taskID should not be empty")
	}

	started := clock.Now(ctx)

	// The most recent successfully fetched state, nil until the first success.
	var latest *swarmingv2.TaskResultResponse

	for {
		fetched, rpcErr := client.TaskResult(ctx, taskID, fields)
		if rpcErr == nil {
			latest = fetched
			if mode == NoWait || final(fetched.State) {
				return fetched, nil
			}
		} else if ctxErr := ctx.Err(); ctxErr != nil {
			// Report the local context error rather than gRPC's
			// DEADLINE_EXCEEDED error.
			return latest, ctxErr
		} else if isFatalError(rpcErr) {
			// Only fatal errors (e.g. PERMISSION_DENIED) abort the loop.
			return latest, rpcErr
		}

		// Either the task is not final yet or the RPC failed transiently: back
		// off and poll again.
		if sleepErr := sleep(ctx, started, 1); sleepErr != nil {
			return latest, sleepErr
		}
	}
}
// GetMany queries status of all given tasks, optionally waiting for them to
// complete.
//
// This is a variant of GetOne which uses batch RPCs to reduce overhead. It
// calls the callback with result of each task as soon as it is available.
// See GetOne doc for the semantics of values passed to the callback.
//
// If `mode` is NoWait, will fetch the current state of all tasks and pass them
// to the callback. If `mode` is WaitAny, will block until at least one task
// completes, and then pass states of all tasks to the callback. If `mode` is
// WaitAll, will block until all tasks complete, calling the callback whenever
// it detects any new completions.
//
// A task whose result could not be fetched (a per-task error in the batch
// response) is reported to the callback right away with that error. Such a
// task does not count as "completed" for the purposes of WaitAny.
//
// If the context expires or a batch RPC fails with a fatal error, all tasks
// that have not been reported yet are passed to the callback with that error
// and their last known result (which may be nil).
//
// The callback will be called exactly once per each distinct task ID in
// `taskIDs` (duplicate IDs are collapsed), in undefined order. The callback
// should not block. If it needs to do further processing, it should do it
// asynchronously.
//
// Retries on transient errors until the context expires.
//
// Panics if `taskIDs` is empty or any of given task IDs is an empty string.
func GetMany(ctx context.Context, client Client, taskIDs []string, fields *TaskResultFields, mode WaitMode, sink func(taskID string, res *swarmingv2.TaskResultResponse, err error)) {
	if len(taskIDs) == 0 {
		panic("need at least one task to wait for")
	}
	startTime := clock.Now(ctx)

	// Current state. Keys are task IDs, values are:
	//   * ResultOrErr{Result: nil, Err: nil} if haven't got any information yet.
	//   * ResultOrErr{Result: ..., Err: nil} if the task is not final yet.
	//   * ResultOrErr{Result: nil, Err: ...} if failed to fetch the task result.
	awaiting := make(map[string]ResultOrErr, len(taskIDs))
	for _, taskID := range taskIDs {
		// Reject empty IDs up front, as documented and as GetOne does, instead
		// of sending them to the backend.
		if taskID == "" {
			panic("taskID should not be empty")
		}
		awaiting[taskID] = ResultOrErr{}
	}

	// Sends all remaining results to the sink, optionally setting the error.
	// Leaves `awaiting` empty.
	flushRemaining := func(err error) {
		for taskID, res := range awaiting {
			switch {
			case err != nil:
				// Report the overall error together with the last known result.
				sink(taskID, res.Result, err)
			case res.Result == nil && res.Err == nil:
				// A nil `err` is only passed after a successful fetch, which
				// populates every entry.
				panic("should not be possible")
			default:
				sink(taskID, res.Result, res.Err)
			}
			delete(awaiting, taskID)
		}
	}

	awaitingIDs := make([]string, 0, len(taskIDs))
	for {
		// Collect task IDs we still waiting for. Reuse the slice across iterations.
		awaitingIDs = awaitingIDs[:0]
		for taskID := range awaiting {
			awaitingIDs = append(awaitingIDs, taskID)
		}

		// Ask the backend for status of all tasks.
		switch res, err := client.TaskResults(ctx, awaitingIDs, fields); {
		case err != nil && ctx.Err() != nil:
			// Prefer local context errors over gRPC's DEADLINE_EXCEEDED error.
			flushRemaining(ctx.Err())
			return
		case err != nil && isFatalError(err):
			// Abort only on fatal errors such as PERMISSION_DENIED.
			flushRemaining(err)
			return
		case err == nil:
			// Update last known task results and errors. The i-th response
			// entry corresponds to the i-th requested task ID.
			for i, taskID := range awaitingIDs {
				lastKnown := awaiting[taskID]
				switch fetched := res[i]; {
				case fetched.Result != nil:
					lastKnown.Result = fetched.Result
				case fetched.Err != nil:
					lastKnown.Err = fetched.Err
				default:
					panic("impossible empty result in TaskResults response")
				}
				awaiting[taskID] = lastKnown
			}
			// If not asked to wait at all, we are done.
			if mode == NoWait {
				flushRemaining(nil)
				return
			}
		}

		// Flush completed or erroneous tasks. Only tasks in a final state count
		// as completed; tasks reported with an error do not.
		completed := 0
		for taskID, res := range awaiting {
			done := res.Result != nil && final(res.Result.State)
			if res.Err != nil || done {
				sink(taskID, res.Result, res.Err)
				delete(awaiting, taskID)
				if done {
					completed++
				}
			}
		}

		switch {
		case len(awaiting) == 0:
			// If there's nothing to wait for, we are done.
			return
		case mode == WaitAny && completed != 0:
			// If were waiting for at least one task and got it, we are done.
			flushRemaining(nil)
			return
		default:
			// If still have tasks or had a transient error, sleep and retry.
			if err := sleep(ctx, startTime, len(awaiting)); err != nil {
				flushRemaining(err)
				return
			}
		}
	}
}
// isFatalError reports whether the gRPC status code of `err` is one the retry
// loops should give up on.
//
// OK, Internal, Unknown, Unavailable, DeadlineExceeded and Canceled are
// treated as non-fatal. Every other code is fatal.
func isFatalError(err error) bool {
	code := status.Code(err)
	retriable := code == codes.OK ||
		code == codes.Internal ||
		code == codes.Unknown ||
		code == codes.Unavailable ||
		code == codes.DeadlineExceeded ||
		code == codes.Canceled
	return !retriable
}
// final reports whether `state` is a final task state, i.e. anything other
// than PENDING or RUNNING.
func final(state swarmingv2.TaskState) bool {
	switch state {
	case swarmingv2.TaskState_PENDING, swarmingv2.TaskState_RUNNING:
		return false
	default:
		return true
	}
}
// sleep pauses between retry loop iterations.
//
// The pause grows with the time elapsed since `startTime` and is randomized.
// `taskCount` is used only in the debug log message. Returns the context's
// error as observed after the pause (nil if the context is still alive).
func sleep(ctx context.Context, startTime time.Time, taskCount int) error {
	const (
		baseDelay = time.Second
		maxDelay  = 15 * time.Second
	)

	// One second to begin with, plus one more second per each 30 seconds
	// already spent waiting, capped at the ceiling.
	pause := baseDelay + clock.Since(ctx, startTime)/30
	if pause > maxDelay {
		pause = maxDelay
	}

	// Add random jitter (up to a third of the pause) to avoid sending RPCs in
	// a lockstep.
	jitter := time.Duration(mathrand.Int63n(ctx, int64(pause/3)))
	pause += jitter

	logging.Debugf(ctx, "Sleeping %s, waiting for %d tasks", pause.Round(100*time.Millisecond), taskCount)

	// This will abort early if the context expires.
	clock.Sleep(ctx, pause)
	return ctx.Err()
}