| // Copyright 2016 The LUCI Authors. All rights reserved. |
| // Use of this source code is governed under the Apache License, Version 2.0 |
| // that can be found in the LICENSE file. |
| |
| package main |
| |
| import ( |
| "log" |
| "os" |
| "time" |
| |
| "golang.org/x/net/context" |
| "google.golang.org/api/support/bundler" |
| |
| service "github.com/luci/luci-go/common/api/isolate/isolateservice/v1" |
| "github.com/luci/luci-go/common/isolatedclient" |
| ) |
| |
| // isolateService is an internal interface to allow mocking of the |
| // isolatedclient.Client. |
| type isolateService interface { |
| Contains(context.Context, []*service.HandlersEndpointsV1Digest) ([]*isolatedclient.PushState, error) |
| Push(context.Context, *isolatedclient.PushState, isolatedclient.Source) error |
| } |
| |
| // CheckerCallback is the callback used by Checker to indicate whether a file is |
| // present on the isolate server. If the item not present, the callback will be |
| // include the PushState necessary to upload it. Otherwise, the PushState will |
| // be nil. |
| type CheckerCallback func(*Item, *isolatedclient.PushState) |
| |
| type checkerItem struct { |
| item *Item |
| isolated bool |
| callback CheckerCallback |
| } |
| |
| // Checker uses the isolatedclient.Client to check whether items are available |
| // on the server. |
| // Checker methods are safe to call concurrently. |
| type Checker struct { |
| ctx context.Context |
| svc isolateService |
| bundler *bundler.Bundler |
| err error |
| |
| Hit, Miss CountBytes |
| } |
| |
// CountBytes aggregates a count of files and the number of bytes in them.
type CountBytes struct {
	// Count is the number of files recorded so far.
	Count int

	// Bytes is the sum of the sizes of the recorded files.
	Bytes int64
}

// addFile records one more file of the given size in the tally.
func (cb *CountBytes) addFile(size int64) {
	cb.Count, cb.Bytes = cb.Count+1, cb.Bytes+size
}
| |
| // NewChecker creates a NewChecker with the given isolated client. |
| // The provided context is used to make all requests to the isolate server. |
| func NewChecker(ctx context.Context, client *isolatedclient.Client) *Checker { |
| return newChecker(ctx, client) |
| } |
| |
| func newChecker(ctx context.Context, svc isolateService) *Checker { |
| c := &Checker{ |
| svc: svc, |
| ctx: ctx, |
| } |
| c.bundler = bundler.NewBundler(checkerItem{}, func(bundle interface{}) { |
| items := bundle.([]checkerItem) |
| if c.err != nil { |
| for _, item := range items { |
| // Drop any more incoming items. |
| log.Printf("WARNING dropped %q from Checker", item.item.Path) |
| } |
| return |
| } |
| c.err = c.check(items) |
| }) |
| c.bundler.DelayThreshold = 50 * time.Millisecond |
| c.bundler.BundleCountThreshold = 50 |
| return c |
| } |
| |
| // AddItem adds the given item to the checker for testing, and invokes the provided |
| // callback asynchronously. The isolated param indicates whether the given item |
| // represents a JSON isolated manifest (as opposed to a regular file). |
| // In the case of an error, the callback may never be invoked. |
| func (c *Checker) AddItem(item *Item, isolated bool, callback CheckerCallback) { |
| if err := c.bundler.Add(checkerItem{item, isolated, callback}, 0); err != nil { |
| // An error is only returned if the size is too big, but we always use |
| // zero size so no error is possible. |
| panic(err) |
| } |
| } |
| |
| // Close shuts down the checker, blocking until all pending items have been |
| // checked with the server. Close returns the first error encountered during |
| // the checking process, if any. |
| // After Close has returned, Checker is guaranteed to no longer invoke any |
| // previously-provided callback. |
| func (c *Checker) Close() error { |
| c.bundler.Flush() |
| // After Close has returned, we know there are no outstanding running |
| // checks. |
| return c.err |
| } |
| |
| // check is invoked from the bundler's handler. As such, it is only ever run |
| // one invocation at a time. |
| func (c *Checker) check(items []checkerItem) error { |
| var digests []*service.HandlersEndpointsV1Digest |
| for _, item := range items { |
| digests = append(digests, &service.HandlersEndpointsV1Digest{ |
| Digest: string(item.item.Digest), |
| Size: item.item.Size, |
| IsIsolated: item.isolated, |
| }) |
| } |
| out, err := c.svc.Contains(c.ctx, digests) |
| if err != nil { |
| // TODO(djd): propogate this more cleanly. At the moment, dropping |
| // callbacks may cause the TAR archiver to hang. |
| log.Printf("ERROR: isolate Contains call failed: %v", err) |
| os.Exit(infraFailExit) |
| return err |
| } |
| for i, item := range items { |
| if size := item.item.Size; out[i] == nil { |
| c.Hit.addFile(size) |
| } else { |
| c.Miss.addFile(size) |
| } |
| |
| item.callback(item.item, out[i]) |
| } |
| return nil |
| } |