// Copyright 2017 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package swarmingimpl
import (
"context"
"encoding/json"
"flag"
"fmt"
"io"
"os"
"path/filepath"
"regexp"
"slices"
"strings"
"time"
"github.com/maruel/subcommands"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"google.golang.org/protobuf/encoding/protojson"
"go.chromium.org/luci/client/cmd/swarming/swarmingimpl/base"
"go.chromium.org/luci/client/cmd/swarming/swarmingimpl/clipb"
"go.chromium.org/luci/client/cmd/swarming/swarmingimpl/output"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/data/stringset"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/swarming/client/swarming"
swarmingv2 "go.chromium.org/luci/swarming/proto/api_v2"
)
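// taskOutputOption implements flag.Value for the repeatable
// -task-output-stdout flag. It accumulates the requested destinations for
// tasks' console output: "console", "json", "dir" or "dir:<path>".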
type taskOutputOption []string
func (t taskOutputOption) includesJSON() bool {
return slices.Contains(t, "json")
}
func (t taskOutputOption) includesConsole() bool {
return slices.Contains(t, "console")
}
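// includesDir returns the directory requested via "dir:<path>" (or "" for a
// bare "dir") and whether any directory destination was requested at all.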
func (t taskOutputOption) includesDir() (path string, ok bool) {
for _, v := range t {
if v == "dir" {
return "", true
}
if path, ok := strings.CutPrefix(v, "dir:"); ok {
return path, true
}
}
return "", false
}
func (t taskOutputOption) String() string {
if len(t) == 0 {
return "none"
}
return strings.Join(t, ",")
}
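// Set implements flag.Value. It accepts "none", "console", "json", "dir",
// "dir:<path>" and the legacy alias "all" (which expands to "console" and
// "json"), ignoring duplicates and allowing at most one "dir" destination.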
func (t *taskOutputOption) Set(s string) error {
if slices.Contains(*t, s) {
return nil
}
switch {
case s == "none" || s == "":
// Nothing.
case s == "console":
*t = append(*t, s)
case s == "json":
*t = append(*t, s)
case s == "dir" || strings.HasPrefix(s, "dir:"):
if _, yes := t.includesDir(); yes {
return errors.Reason("cannot have more than one \"dir\" destination").Err()
}
*t = append(*t, s)
case s == "all":
_ = t.Set("console")
_ = t.Set("json")
default:
return errors.Reason("invalid task output option").Err()
}
return nil
}
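// taskIDRe matches Swarming task IDs, which are non-empty lowercase hex
// strings.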
var taskIDRe = regexp.MustCompile("^[a-f0-9]+$")
// taskResult is a consolidation of everything fetched by `collect` for a
// single Swarming task: its state, console output, isolated outputs and any
// error encountered along the way.
type taskResult struct {
// taskID is the ID of the swarming task for which these results were retrieved.
taskID string
// result is the raw result structure returned by a swarming RPC call.
// result may be nil if err is non-nil.
result *swarmingv2.TaskResultResponse
// output is the console output produced by the swarming task.
// output will only be populated if requested (nil otherwise).
output *textOutput
// outputs is a list of file outputs of a task, downloaded from the task's CAS output root.
// outputs will only be populated if requested.
outputs []string
// err is set if an operational error occurred while doing RPCs to gather the
// task result, which includes errors received from the server.
err error
// summaryLogged is true if we already logged the task summary.
summaryLogged bool
}
// SummaryLine returns a short summary of the task state for logs.
func (t *taskResult) SummaryLine() string {
if t.err != nil {
return fmt.Sprintf("%s: %s", t.taskID, t.err)
}
if t.result.State == swarmingv2.TaskState_COMPLETED {
return fmt.Sprintf("%s: COMPLETED, exit code %d", t.taskID, t.result.ExitCode)
}
return fmt.Sprintf("%s: %s", t.taskID, t.result.State)
}
// logSummary logs the task summary if it hasn't been logged before.
func (t *taskResult) logSummary(ctx context.Context) {
if !t.summaryLogged {
t.summaryLogged = true
if t.err != nil {
logging.Warningf(ctx, "%s", t.SummaryLine())
} else {
logging.Infof(ctx, "%s", t.SummaryLine())
}
}
}
// textOutput is a container for the fetched task's console output.
//
// Task console output can be huge (hundreds of megabytes). We store it in a
// file to avoid OOMs. When `-task-output-stdout dir` is set, this is the final
// output file returned to the caller. Otherwise it is some temporary file
// deleted after we are done with it.
//
// Assumes `fetch` is called before `dump` and no calls are happening
// concurrently.
type textOutput struct {
file *os.File // the backing file open in RW mode
temp bool // if true, delete the file after closing it
}
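// close closes the backing file, deleting it if it was a temporary file.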
func (t *textOutput) close() error {
var merr errors.MultiError
merr.MaybeAdd(t.file.Close())
if t.temp {
merr.MaybeAdd(os.Remove(t.file.Name()))
}
return merr.AsError()
}
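// fetch downloads the task's console output from Swarming into the backing
// file.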
func (t *textOutput) fetch(ctx context.Context, svc swarming.Client, taskID string) error {
_, err := svc.TaskOutput(ctx, taskID, t.file)
return err
}
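// dump copies the fetched console output from the start of the backing file
// to the given writer, returning the number of bytes written.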
func (t *textOutput) dump(out io.Writer) (n int64, err error) {
if _, err := t.file.Seek(0, io.SeekStart); err != nil {
return 0, err
}
return io.Copy(out, t.file)
}
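// dumpToUTF8 reads the entire console output into memory as a string,
// replacing invalid UTF-8 sequences with the Unicode replacement character.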
func (t *textOutput) dumpToUTF8() (string, error) {
var buf strings.Builder
if _, err := t.dump(&buf); err != nil {
return "", err
}
return strings.ToValidUTF8(buf.String(), "\uFFFD"), nil
}
// CmdCollect returns an object for the `collect` subcommand.
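//
// A typical invocation (placeholders for the server and the task IDs):
//
//	collect -S <server> -task-output-stdout console <task ID> [<task ID> ...]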
func CmdCollect(authFlags base.AuthFlags) *subcommands.Command {
return &subcommands.Command{
UsageLine: "collect -S <server> -requests-json <path> [<task ID> <task ID> ...]",
ShortDesc: "waits on a set of Swarming tasks",
LongDesc: `Waits on a set of Swarming tasks given either as task IDs or via a file produced by the "trigger" subcommand.
Behavior depends on the combination of the -eager and -wait flags:
* -wait is set and -eager is unset (default): wait for all tasks to complete
and report their results.
* -wait is set and -eager is set: wait for at least one task to complete and
then report the current state of all tasks. It will be a mix of pending
and completed tasks (with at least one, and possibly more, tasks completed).
* -wait is unset (regardless of whether -eager is set): report the current
state of all tasks, don't wait. It will be a mix of pending and completed
tasks with no other guarantees.
The JSON output will always have entries for all requested tasks. Each entry
contains the last known state of the task (in the "results" field) if it was
fetched at least once and, possibly, an error message (in the "error" field) if
there was an error fetching the state.
Note that the "error" field reports only local errors. If a task itself failed
remotely, but this outcome was successfully fetched, the "error" field will be
unset, and the task's failure will be communicated via the "results" object (in
particular its "state" field).
If -wait is set, the command will wait for at most the -timeout duration or
until SIGTERM. Upon hitting the timeout, the JSON entries of all still pending
or running tasks will contain the literal "rpc_timeout" value in their "error"
fields. Similarly, if waiting was aborted by SIGTERM, the "error" field will
contain the "rpc_canceled" value.
Flag -task-output-stdout controls where to dump the console log of completed
tasks. Can be specified multiple times to emit the log into multiple places.
Its possible values:
* "none" (default): don't fetch the console log at all.
* "console": dump the log to stdout.
* "json": dump the log into the JSON output (in "result.output" field).
* "dir:<path>": dump the log into <path>/<task-ID>.txt.
* "dir": dump the log into <output-dir>/<task-ID>.txt (see -output-dir flag).
* "all": a legacy alias for combination of "console" and "json".
Flag -output-dir controls where to store isolated outputs of completed tasks,
as well as tasks' console logs (when using the "-task-output-stdout dir" flag). If it
is unset (default), isolated outputs will not be fetched. Otherwise isolated
outputs of a completed task with ID <task-ID> will be downloaded to
<output-dir>/<task-ID> directory. If such a directory already exists, it will be
cleared first. If a task has no isolated outputs or it has not completed yet,
its output directory will be empty. The JSON output will contain a list of
downloaded files (relative to the task output directory) in "result.outputs"
field.
`,
CommandRun: func() subcommands.CommandRun {
return base.NewCommandRun(authFlags, &collectImpl{}, base.Features{
MinArgs: 0,
MaxArgs: base.Unlimited,
MeasureDuration: true,
UsesCAS: true,
OutputJSON: base.OutputJSON{
Enabled: true,
DeprecatedAliasFlag: "task-summary-json",
Usage: "A file to write a summary of task results as json.",
DefaultToStdout: false,
},
})
},
}
}
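// collectImpl implements the `collect` subcommand: it holds parsed flag
// values and the list of task IDs to wait on.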
type collectImpl struct {
taskIDs []string
wait bool
timeout time.Duration
taskSummaryPython bool
taskOutput taskOutputOption
outputDir string
textOutputDir string // where to store console output or "" for temp
eager bool
perf bool
jsonInput string
outputFetchConcurrency int
}
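// RegisterFlags registers the flags specific to the `collect` subcommand.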
func (cmd *collectImpl) RegisterFlags(fs *flag.FlagSet) {
fs.BoolVar(&cmd.wait, "wait", true, "If set, wait for tasks to complete. Otherwise just poll their current state.")
fs.DurationVar(&cmd.timeout, "timeout", 0, "Timeout to wait for tasks to complete when -wait is set. Set to 0 for no timeout.")
fs.BoolVar(&cmd.eager, "eager", false, "If set, stop waiting whenever any task finishes, do not wait for all of them to finish.")
// TODO(tikuta): Remove this flag once crbug.com/894045 is fixed.
fs.BoolVar(&cmd.taskSummaryPython, "task-summary-python", false, "Generate python client compatible task summary json.")
fs.BoolVar(&cmd.perf, "perf", false, "Include performance statistics.")
fs.Var(&cmd.taskOutput, "task-output-stdout", "Where to put each task's console output (combined stderr and stdout): none, json, console, dir[:<path>], all (a legacy alias for json+console). Can be specified multiple times.")
fs.StringVar(&cmd.outputDir, "output-dir", "", "Where to download outputs to.")
fs.StringVar(&cmd.jsonInput, "requests-json", "", "Load the task IDs from a .json file as saved by \"trigger -json-output\".")
fs.IntVar(&cmd.outputFetchConcurrency, "output-fetch-concurrency", 8, "Limits how many concurrent result fetches are allowed (to avoid OOMs). 0 is unlimited.")
}
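// ParseInputs validates the flag combination and assembles the list of task
// IDs to wait on from positional arguments and the -requests-json file.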
func (cmd *collectImpl) ParseInputs(args []string, env subcommands.Env) error {
if cmd.timeout < 0 {
return errors.Reason("negative timeout is not allowed").Err()
}
if !cmd.wait && cmd.timeout > 0 {
return errors.Reason("do not specify -timeout with -wait=false").Err()
}
// Figure out where to store files with tasks' stdout.
var ok bool
cmd.textOutputDir, ok = cmd.taskOutput.includesDir()
if ok {
if cmd.textOutputDir == "" {
cmd.textOutputDir = cmd.outputDir
}
if cmd.textOutputDir == "" {
return errors.Reason(
"cannot figure out where to store task console output: " +
"either specify the directory as `-task-output-stdout dir:<path>` " +
"(if only console output is required) or pass `-output-dir` " +
"(if both text and isolated output are required)",
).Err()
}
}
// Collect all task IDs to wait on.
cmd.taskIDs = args
if cmd.jsonInput != "" {
data, err := os.ReadFile(cmd.jsonInput)
if err != nil {
return errors.Annotate(err, "reading json input").Err()
}
var tasks clipb.SpawnTasksOutput
if err := protojson.Unmarshal(data, &tasks); err != nil {
return errors.Annotate(err, "unmarshalling json input").Err()
}
for _, task := range tasks.Tasks {
cmd.taskIDs = append(cmd.taskIDs, task.TaskId)
}
}
if len(cmd.taskIDs) == 0 {
return errors.Reason("must specify at least one task id, either directly or through -requests-json").Err()
}
// Verify they all look like Swarming task IDs.
//
// TODO(vadimsh): Extract and reuse in other subcommands.
for _, taskID := range cmd.taskIDs {
if !taskIDRe.MatchString(taskID) {
return errors.Reason("task ID %q must be hex ([a-f0-9])", taskID).Err()
}
}
// Verify there are no duplicates, since they may break map lookups downstream.
seen := stringset.New(len(cmd.taskIDs))
for _, taskID := range cmd.taskIDs {
if !seen.Add(taskID) {
return errors.Reason("task ID %s is given more than once", taskID).Err()
}
}
return nil
}
// fetchTaskResults updates `res` in-place with outputs of the task.
func (cmd *collectImpl) fetchTaskResults(ctx context.Context, svc swarming.Client, res *taskResult) {
// Prepare the output directory, even if the task failed or is still running.
// It will be empty in this case, signifying the task produced no outputs.
outputDir := ""
if cmd.outputDir != "" {
var outErr error
outputDir, outErr = prepareOutputDir(cmd.outputDir, res.taskID)
if outErr != nil && res.err == nil {
res.err = outErr
}
}
// If asked to fetch task's console output, prepare the storage file for it.
wantConsoleOut := len(cmd.taskOutput) != 0
if wantConsoleOut {
var outErr error
res.output, outErr = prepareTextOutput(cmd.textOutputDir, res.taskID)
if outErr != nil && res.err == nil {
res.err = outErr
}
}
// If we failed to fetch the task status (or create the output directory), don't
// even bother to fetch the results.
if res.err != nil {
res.err = normalizeCtxErr(res.err)
res.logSummary(ctx)
return
}
if res.result.State == swarmingv2.TaskState_PENDING || res.result.State == swarmingv2.TaskState_RUNNING {
res.logSummary(ctx)
return
}
eg, ectx := errgroup.WithContext(ctx)
// Fetch combined stderr/stdout (aka console) output if asked for it.
if wantConsoleOut {
eg.Go(func() error {
logging.Debugf(ectx, "%s: fetching console output", res.taskID)
if err := res.output.fetch(ectx, svc, res.taskID); err != nil {
return errors.Annotate(err, "fetching console output of %s", res.taskID).Err()
}
return nil
})
}
// Fetch isolated files if asked for them and the task has them.
wantIsolatedOut := outputDir != "" && res.result.CasOutputRoot != nil
if wantIsolatedOut {
eg.Go(func() error {
logging.Debugf(ectx, "%s: fetching isolated output", res.taskID)
output, err := svc.FilesFromCAS(ectx, outputDir, &swarmingv2.CASReference{
CasInstance: res.result.CasOutputRoot.CasInstance,
Digest: &swarmingv2.Digest{
Hash: res.result.CasOutputRoot.Digest.Hash,
SizeBytes: res.result.CasOutputRoot.Digest.SizeBytes,
},
})
if err != nil {
return errors.Annotate(err, "fetching isolated output of %s", res.taskID).Err()
}
res.outputs = output
return nil
})
}
if wantConsoleOut || wantIsolatedOut {
res.err = normalizeCtxErr(eg.Wait())
if res.err != nil && ctx.Err() != nil {
// When the root context expires, `res.err` may end up having all sorts of
// errors depending on what exactly was happening when the context
// expired. Use a cleaner context error in that case.
res.err = normalizeCtxErr(ctx.Err())
}
if res.err == nil {
logging.Debugf(ctx, "%s: finished fetching outputs", res.taskID)
}
}
res.logSummary(ctx)
}
// normalizeCtxErr replaces context errors with ones that serialize to
// documented values.
func normalizeCtxErr(err error) error {
switch {
case errors.Is(err, context.DeadlineExceeded):
return errors.New("rpc_timeout")
case errors.Is(err, context.Canceled):
return errors.New("rpc_canceled")
default:
return err
}
}
// prepareOutputDir creates the directory for storing isolated outputs.
func prepareOutputDir(outputDir, taskID string) (string, error) {
// This should never happen, but check anyway since we do not want to
// accidentally delete all of `outputDir`.
if taskID == "" {
panic("should never happen")
}
// Create a task-id-based subdirectory to house the outputs.
dir := filepath.Join(filepath.Clean(outputDir), taskID)
// The call can theoretically be retried. In this case the directory will
// already exist and may contain partial results. Take no chance and restart
// from scratch.
if err := os.RemoveAll(dir); err != nil {
return "", errors.Annotate(err, "failed to remove directory: %s", dir).Err()
}
if err := os.MkdirAll(dir, 0777); err != nil {
return "", errors.Annotate(err, "failed to create directory: %s", dir).Err()
}
return dir, nil
}
// prepareTextOutput creates a file for storing task's console output.
//
// If `outputDir` is empty, will use a temporary file.
func prepareTextOutput(outputDir, taskID string) (*textOutput, error) {
if outputDir == "" {
file, err := os.CreateTemp("", fmt.Sprintf("swarming_%s_*.txt", taskID))
if err != nil {
return nil, errors.Annotate(err, "failed to create a temp file for storing console output").Err()
}
return &textOutput{file: file, temp: true}, nil
}
if err := os.MkdirAll(outputDir, 0777); err != nil {
return nil, errors.Annotate(err, "failed to create directory: %s", outputDir).Err()
}
file, err := os.Create(filepath.Join(outputDir, taskID+".txt"))
if err != nil {
return nil, errors.Annotate(err, "failed to create a file for storing console output").Err()
}
return &textOutput{file: file}, nil
}
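// Execute waits for the tasks (if -wait is set), fetches their results and
// outputs in parallel, and emits the summary via the configured emitter.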
func (cmd *collectImpl) Execute(ctx context.Context, svc swarming.Client, sink *output.Sink, extra base.Extra) error {
// The context used for waiting for task completion.
var wctx context.Context
var wcancel context.CancelFunc
if cmd.timeout > 0 {
wctx, wcancel = clock.WithTimeout(ctx, cmd.timeout)
} else {
wctx, wcancel = context.WithCancel(ctx)
}
defer wcancel()
var mode swarming.WaitMode
switch {
case cmd.wait && cmd.eager:
mode = swarming.WaitAny
case cmd.wait && !cmd.eager:
mode = swarming.WaitAll
default:
mode = swarming.NoWait
}
fields := swarming.TaskResultFields{
WithPerf: cmd.perf,
}
// A limiter on the number of concurrent fetches to avoid OOMs.
acquireSlot := func() error { return nil }
releaseSlot := func() {}
if cmd.outputFetchConcurrency > 0 {
sem := semaphore.NewWeighted(int64(cmd.outputFetchConcurrency))
acquireSlot = func() error { return sem.Acquire(ctx, 1) }
releaseSlot = func() { sem.Release(1) }
}
// Collect statuses of all tasks and start fetching their results as soon
// as they are available, in parallel. Fetch results using the root `ctx`
// (to not be affected by -timeout, which is a *waiting* timeout). Call
// GetMany in a background goroutine in order to start reading from
// `resultsCh` below in parallel (to report results as soon as they are
// available).
resultsCh := make(chan taskResult)
go func() {
swarming.GetMany(wctx, svc, cmd.taskIDs, &fields, mode, func(taskID string, res *swarmingv2.TaskResultResponse, err error) {
go func() {
taskRes := taskResult{taskID: taskID, result: res, err: err}
if acqErr := acquireSlot(); acqErr != nil {
taskRes.err = normalizeCtxErr(acqErr)
} else {
cmd.fetchTaskResults(ctx, svc, &taskRes)
releaseSlot()
}
resultsCh <- taskRes
}()
})
}()
// TODO(crbug.com/894045): Get rid of taskSummaryPython mode.
var emitter summaryEmitter
switch {
case extra.OutputJSON == "":
emitter = noopSummaryEmitter{}
case cmd.taskSummaryPython:
emitter = &legacySummaryEmitter{
sink: sink,
populateStdout: cmd.taskOutput.includesJSON(),
taskIDs: cmd.taskIDs,
resultByID: make(map[string]*taskResult, len(cmd.taskIDs)),
}
default:
emitter = &defaultSummaryEmitter{
sink: sink,
populateStdout: cmd.taskOutput.includesJSON(),
}
}
// Accumulates all errors encountered while emitting the output.
var outputErrs errors.MultiError
// Wait for all fetchTaskResults(...) calls to complete. Emit their output
// as soon as it is available.
emitter.start(&outputErrs)
for i := 0; i < len(cmd.taskIDs); i++ {
res := <-resultsCh
res.logSummary(ctx) // might be context cancellation, log it
if cmd.taskOutput.includesConsole() {
fmt.Fprintln(extra.Stdout, res.SummaryLine())
if res.output != nil {
switch written, err := res.output.dump(extra.Stdout); {
case err != nil:
outputErrs.MaybeAdd(errors.Annotate(err, "emitting stdout of %q", res.taskID).Err())
case written != 0:
fmt.Fprintln(extra.Stdout)
}
}
}
emitter.emit(&res, &outputErrs)
}
emitter.finish(&outputErrs)
return outputErrs.AsError()
}
////////////////////////////////////////////////////////////////////////////////
// summaryEmitter knows how to write task result entries to the JSON output.
//
// Takes ownership of *taskResult passed to it. Writes all errors into the
// given MultiError.
type summaryEmitter interface {
start(merr *errors.MultiError)
emit(res *taskResult, merr *errors.MultiError)
finish(merr *errors.MultiError)
}
// noopSummaryEmitter just closes console outputs without reading them.
type noopSummaryEmitter struct{}
func (noopSummaryEmitter) start(merr *errors.MultiError) {}
func (noopSummaryEmitter) emit(res *taskResult, merr *errors.MultiError) {
if res.output != nil {
merr.MaybeAdd(errors.Annotate(res.output.close(), "closing console output of %q", res.taskID).Err())
}
}
func (noopSummaryEmitter) finish(merr *errors.MultiError) {}
// The non-legacy summary format is an unordered dict. We can write entries
// for it in any order. This allows us to "forget" them (and free memory
// allocated for task's stdout) as soon as possible.
type defaultSummaryEmitter struct {
sink *output.Sink
populateStdout bool
}
func (e *defaultSummaryEmitter) start(merr *errors.MultiError) {
merr.MaybeAdd(output.StartMap(e.sink))
}
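// emit writes one task's entry into the JSON output map and closes the
// task's console output file.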
func (e *defaultSummaryEmitter) emit(res *taskResult, merr *errors.MultiError) {
entry := &clipb.ResultSummaryEntry{
Results: res.result,
Outputs: res.outputs,
}
if res.err != nil {
entry.Error = res.err.Error()
if entry.Error == "" {
entry.Error = "unknown"
}
}
if e.populateStdout && res.result != nil && res.output != nil {
var err error
entry.Output, err = res.output.dumpToUTF8()
merr.MaybeAdd(errors.Annotate(err, "reading task output %q", res.taskID).Err())
}
if res.output != nil {
merr.MaybeAdd(errors.Annotate(res.output.close(), "closing console output of %q", res.taskID).Err())
}
err := output.MapEntry(e.sink, res.taskID, entry)
merr.MaybeAdd(errors.Annotate(err, "writing JSON output for task %q", res.taskID).Err())
}
func (e *defaultSummaryEmitter) finish(merr *errors.MultiError) {}
// The legacy Python summary is a list of proto messages ordered the same way
// as `cmd.taskIDs`. We get results from the channel in some arbitrary order.
// It means we'll generally have to buffer them all before we can write them.
type legacySummaryEmitter struct {
sink *output.Sink
populateStdout bool
taskIDs []string // task IDs in order we need to write them
resultByID map[string]*taskResult
}
func (e *legacySummaryEmitter) start(merr *errors.MultiError) {
// All output is emitted at once in finish.
}
func (e *legacySummaryEmitter) emit(res *taskResult, merr *errors.MultiError) {
e.resultByID[res.taskID] = res
}
func (e *legacySummaryEmitter) finish(merr *errors.MultiError) {
shards := make([]map[string]any, 0, len(e.taskIDs))
for _, taskID := range e.taskIDs {
result := e.resultByID[taskID]
if result.result == nil {
// This means there was an error fetching the task. Note that the Python
// results format has no way to communicate errors. We just write `null`
// into the corresponding slot to indicate the task is not ready.
shards = append(shards, nil)
continue
}
// Convert TaskResultResponse proto to a free-form map[string]any to inject
// `output` as an extra field not present in the original proto.
buf, err := (protojson.MarshalOptions{UseProtoNames: true}).Marshal(result.result)
if err != nil {
merr.MaybeAdd(errors.Annotate(err, "JSON serializing results of %q", taskID).Err())
shards = append(shards, nil) // indicates there was an error
continue
}
var jsonResult map[string]any
if err := json.Unmarshal(buf, &jsonResult); err != nil {
merr.MaybeAdd(errors.Annotate(err, "JSON deserializing results of %q", taskID).Err())
shards = append(shards, nil) // indicates there was an error
continue
}
// Load output as UTF-8 and put into the JSON struct.
jsonResult["output"] = ""
if e.populateStdout && result.output != nil {
jsonResult["output"], err = result.output.dumpToUTF8()
merr.MaybeAdd(errors.Annotate(err, "reading task output %q", taskID).Err())
}
// Report the completed task result.
shards = append(shards, jsonResult)
}
// Close all outputs, we don't need them anymore.
for _, res := range e.resultByID {
if res.output != nil {
merr.MaybeAdd(errors.Annotate(res.output.close(), "closing console output of %q", res.taskID).Err())
}
}
// Finally write the combined JSON summary for all shards.
err := output.JSON(e.sink, map[string]any{"shards": shards})
merr.MaybeAdd(errors.Annotate(err, "writing JSON summary").Err())
}