blob: d64810672c520802ed35a012ca3694e6a38f8a68 [file] [log] [blame]
// Copyright 2017 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package buildbucket
import (
"context"
"time"
"google.golang.org/api/googleapi"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/retry"
"go.chromium.org/luci/common/retry/transient"
)
// Fetch fetches builds matching the search criteria.
// It stops when all builds are found or when context is cancelled.
// The order of returned builds is from most-recently-created to least-recently-created.
//
// c.MaxBuilds value is used as a result page size, defaults to 100.
// limit, if >0, specifies the maximum number of builds to return.
//
// If ret is nil, retries transient errors with exponential back-off.
// Logs errors on retries.
//
// Returns nil only if the search results are exhausted.
// May return context.Canceled.
func (c *SearchCall) Fetch(limit int, ret retry.Factory) ([]*LegacyApiCommonBuildMessage, string, error) {
// Default page size to 100 because we are fetching everything.
maxBuildsKey := "max_builds"
origMaxBuilds := c.urlParams_.Get(maxBuildsKey)
if origMaxBuilds == "" {
c.MaxBuilds(100)
defer c.urlParams_.Set(maxBuildsKey, origMaxBuilds)
}
ch := make(chan *LegacyApiCommonBuildMessage)
var err error
var cursor string
go func() {
defer close(ch)
cursor, err = c.Run(ch, limit, ret)
}()
var builds []*LegacyApiCommonBuildMessage
for b := range ch {
builds = append(builds, b)
}
return builds, cursor, err
}
// Run is like Fetch, but sends results to a channel and the default page size
// is defined by the server (10 as of Sep 2017).
//
// Run blocks on sending.
func (c *SearchCall) Run(builds chan<- *LegacyApiCommonBuildMessage, limit int, ret retry.Factory) (cursor string, err error) {
if ret == nil {
ret = transient.Only(retry.Default)
}
// We will be mutating c.
// Guarantee that it will remain the same by the time function exits.
origCtx := c.ctx_
const (
cursorKey = "start_cursor"
fieldsKey = "fields"
)
origCursor := c.urlParams_.Get(cursorKey)
origFields := c.urlParams_.Get(fieldsKey)
defer func() {
// Use the low-level API to be consistent with reads.
c.ctx_ = origCtx
c.urlParams_.Set(cursorKey, origCursor)
c.urlParams_.Set(fieldsKey, origFields)
}()
// ensure "next_cursor" is included
c.urlParams_.Set(fieldsKey, googleapi.CombineFields([]googleapi.Field{
googleapi.Field(origFields),
"next_cursor",
}))
// Make a non-nil context used by default in this function.
ctx := origCtx
if ctx == nil {
// won't happen on AppEngine in practice.
ctx = context.Background()
}
sent := 0
outer:
for {
var res *LegacyApiSearchResponseMessage
err = retry.Retry(ctx, ret,
func() error {
var err error
// Set a timeout for this particular RPC.
var cancel context.CancelFunc
c.ctx_, cancel = context.WithTimeout(ctx, time.Minute)
defer cancel()
res, err = c.Do()
c.ctx_ = origCtx // for code clarity only
switch apiErr, _ := err.(*googleapi.Error); {
case apiErr != nil && apiErr.Code >= 500:
return transient.Tag.Apply(err)
case err == context.DeadlineExceeded && ctx.Err() == nil:
return transient.Tag.Apply(err) // request-level timeout
case err != nil:
return err
case res.Error != nil:
return errors.New(res.Error.Message)
default:
return nil
}
},
func(err error, wait time.Duration) {
logging.WithError(err).Warningf(ctx, "RPC error while searching builds; will retry in %s", wait)
})
if err != nil {
return
}
cursor = res.NextCursor
for _, b := range res.Builds {
select {
case <-ctx.Done():
err = ctx.Err()
return
case builds <- b:
sent++
if sent == limit {
break outer
}
}
}
if len(res.Builds) == 0 || res.NextCursor == "" {
break
}
c.urlParams_.Set(cursorKey, res.NextCursor)
}
return
}