blob: 74794c5b5c1760cfafddc9f855d52adf3d004fab [file]
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package archiver
import (
"errors"
"fmt"
"io"
"os"
"sync"
"time"
"golang.org/x/net/context"
"github.com/luci/luci-go/client/internal/common"
"github.com/luci/luci-go/client/internal/progress"
"github.com/luci/luci-go/common/api/isolate/isolateservice/v1"
"github.com/luci/luci-go/common/data/text/units"
"github.com/luci/luci-go/common/isolated"
"github.com/luci/luci-go/common/isolatedclient"
"github.com/luci/luci-go/common/logging"
"github.com/luci/luci-go/common/runtime/tracer"
)
// Identifiers of the progress groups and of the sections inside each group.
//
// The numbering lines up with headers: a Group value selects one of its rows
// and a Section value selects a column within that row.
const (
// Items accepted by push().
groupFound progress.Group = 0
groupFoundFound progress.Section = 0
// Hashing: "todo" is bumped by stage 1, "done" by stage 2.
groupHash progress.Group = 1
groupHashDone progress.Section = 0
groupHashDoneSize progress.Section = 1
groupHashTodo progress.Section = 2
// Server lookups: "todo" is bumped by stage 2, "done" by doContains().
groupLookup progress.Group = 2
groupLookupDone progress.Section = 0
groupLookupTodo progress.Section = 1
// Uploads: "todo" is bumped by doContains(), "done" by doUpload().
groupUpload progress.Group = 3
groupUploadDone progress.Section = 0
groupUploadDoneSize progress.Section = 1
groupUploadTodo progress.Section = 2
groupUploadTodoSize progress.Section = 3
)
// headers describes the progress columns, one inner slice per progress group
// (found, hash, lookup, upload). It is handed to progress.New() by New().
//
// Columns named "size" are rendered through units.SizeToString.
var headers = [][]progress.Column{
{{Name: "found"}},
{
{Name: "hashed"},
{Name: "size", Formatter: units.SizeToString},
{Name: "to hash"},
},
{{Name: "looked up"}, {Name: "to lookup"}},
{
{Name: "uploaded"},
{Name: "size", Formatter: units.SizeToString},
{Name: "to upload"},
{Name: "size", Formatter: units.SizeToString},
},
}
// UploadStat is the statistic for a single upload.
//
// Instances are created by doUpload() using a positional literal, so the field
// order matters.
type UploadStat struct {
Duration time.Duration // Wall time measured around the Push() call.
Size units.Size // Size of the item, as computed when it was hashed.
Name string // DisplayName of the item.
}
// Stats is statistics from the Archiver.
//
// Hits gets one entry per item the server reported as already present; Pushed
// gets one entry per item that went through doUpload().
type Stats struct {
Hits []units.Size // Bytes; each item is immutable.
Pushed []*UploadStat // Misses; each item is immutable.
}
// TotalHits returns the number of cache hits on the server, i.e. the number of
// entries in s.Hits.
func (s *Stats) TotalHits() int {
return len(s.Hits)
}
// TotalBytesHits returns the number of bytes that did not need to be uploaded
// because the server already had them; it is the sum of all entries in
// s.Hits.
func (s *Stats) TotalBytesHits() units.Size {
	var total units.Size
	for idx := range s.Hits {
		total += s.Hits[idx]
	}
	return total
}
// TotalMisses returns the number of cache misses on the server, i.e. the
// number of entries in s.Pushed.
func (s *Stats) TotalMisses() int {
return len(s.Pushed)
}
// TotalBytesPushed returns the sum of the sizes of every entry in s.Pushed.
func (s *Stats) TotalBytesPushed() units.Size {
	var total units.Size
	for idx := range s.Pushed {
		total += s.Pushed[idx].Size
	}
	return total
}
// deepCopy returns a snapshot of s whose slices do not share their backing
// arrays with s.
//
// Only the slices are duplicated; the items themselves are immutable and are
// therefore shared. Copying the slices matters because the Archiver keeps
// appending to its own Stats: handing out the same backing array would let a
// caller's append() on the snapshot write into memory the Archiver also
// appends into.
//
// nil slices stay nil so that an empty Stats compares equal to its copy.
func (s *Stats) deepCopy() *Stats {
	out := &Stats{}
	if s.Hits != nil {
		out.Hits = make([]units.Size, len(s.Hits))
		copy(out.Hits, s.Hits)
	}
	if s.Pushed != nil {
		out.Pushed = make([]*UploadStat, len(s.Pushed))
		copy(out.Pushed, s.Pushed)
	}
	return out
}
// New returns a thread-safe Archiver instance.
//
// If not nil, out will contain tty-oriented progress information.
//
// ctx will be used for logging.
//
// The four pipeline stages are started as goroutines before New returns; call
// Close() to wait for them.
func New(ctx context.Context, c *isolatedclient.Client, out io.Writer) *Archiver {
	// TODO(maruel): Cache hashes and server cache presence.
	a := &Archiver{
		ctx:                   ctx,
		c:                     c,
		canceler:              common.NewCanceler(),
		progress:              progress.New(headers, out),
		maxConcurrentHash:     5,
		maxConcurrentContains: 64,
		maxConcurrentUpload:   8,
		containsBatchingDelay: 100 * time.Millisecond,
		containsBatchSize:     50,
		stage1DedupeChan:      make(chan *Item),
		stage2HashChan:        make(chan *Item),
		stage3LookupChan:      make(chan *Item),
		stage4UploadChan:      make(chan *Item),
	}
	tracer.NewPID(a, "archiver")

	// TODO(todd): Create on-disk cache in a new stage inserted between stages 1
	// and 2. This should be a separate interface with its own implementation
	// that 'archiver' keeps a reference to as member.
	stages := []func(){
		a.stage1DedupeLoop,
		a.stage2HashLoop,
		a.stage3LookupLoop,
		a.stage4UploadLoop,
	}
	a.wg.Add(len(stages))
	for _, stage := range stages {
		go func(run func()) {
			defer a.wg.Done()
			run()
		}(stage)
	}

	// Push a nil item to make sure stage1DedupeLoop() is up and running.
	// Otherwise this could lead to a race condition if Close() is called too
	// quickly.
	a.stage1DedupeChan <- nil
	return a
}
// Item is an item to process.
//
// It is carried over from pipeline stage to stage to do processing on it.
//
// Items are created through newItem(), which arms wgHashed so that
// WaitForHashed() blocks until hashing completed or failed.
type Item struct {
// Immutable.
DisplayName string // Name to use to qualify this item
wgHashed sync.WaitGroup // Released once .digestItem.Digest is set
priority int64 // Lower values - earlier hashing and uploading.
path string // Set when the source is a file on disk.
a *Archiver // Owning Archiver; reset to nil by done().
// Mutable.
lock sync.Mutex
err error // Item specific error
digestItem isolateservice.HandlersEndpointsV1Digest // Mutated by hashLoop(), used by doContains()
linked []*Item // Deduplicated item.
// Mutable but not accessible externally.
source isolatedclient.Source // Source of data
state *isolatedclient.PushState // Server-side push state for cache miss
}
// newItem allocates an Item owned by a and accounts for it in the
// "itemsProcessing" tracer counter.
//
// path is empty when the data does not come from a file on disk. The returned
// item has its wgHashed armed with one pending unit.
func newItem(a *Archiver, displayName, path string, source isolatedclient.Source, priority int64) *Item {
	tracer.CounterAdd(a, "itemsProcessing", 1)
	item := &Item{
		DisplayName: displayName,
		priority:    priority,
		path:        path,
		a:           a,
		source:      source,
	}
	item.wgHashed.Add(1)
	return item
}
// done is called when the Item processing is done.
//
// It can be because there was an error, it was linked to another item, it was
// present on the server or finally it was uploaded successfully.
//
// It decrements the "itemsProcessing" tracer counter and drops the reference
// to the owning Archiver. It always returns nil.
func (i *Item) done() error {
	owner := i.a
	i.a = nil
	tracer.CounterAdd(owner, "itemsProcessing", -1)
	return nil
}
// WaitForHashed blocks until the item's wgHashed is released, which happens
// once the item hash is known or hashing was abandoned.
func (i *Item) WaitForHashed() {
i.wgHashed.Wait()
}
// Error returns the error recorded for this item, or nil if there is none.
func (i *Item) Error() error {
	i.lock.Lock()
	err := i.err
	i.lock.Unlock()
	return err
}
// Digest returns the item's digest once it was calculated, and an empty value
// before that.
func (i *Item) Digest() isolated.HexDigest {
	i.lock.Lock()
	digest := i.digestItem.Digest
	i.lock.Unlock()
	return isolated.HexDigest(digest)
}
// isFile reports whether the item was created with a path, i.e. whether its
// source is a file on disk.
func (i *Item) isFile() bool {
	return i.path != ""
}
// SetErr forcibly sets an item as failed. Normally not used by callers.
//
// Only the first error sticks; it is also copied to every item currently
// linked to this one. Passing a nil error panics.
func (i *Item) SetErr(err error) {
	if err == nil {
		panic("internal error")
	}
	i.lock.Lock()
	defer i.lock.Unlock()
	if i.err != nil {
		// An earlier error wins.
		return
	}
	i.err = err
	for idx := range i.linked {
		dup := i.linked[idx]
		dup.lock.Lock()
		dup.err = err
		dup.lock.Unlock()
	}
}
// calcDigest opens the item's source, hashes its content and stores the
// resulting digest on the item and on every item linked to it.
//
// The item's own wgHashed is always released when calcDigest returns. On
// success the linked items are released too. On failure (the source cannot be
// opened or read) the error is recorded on the item and on its linked items,
// and the linked items are released as well so that WaitForHashed() never
// hangs on a deduplicated item whose original failed to hash.
//
// The returned error is a descriptive wrapper; the raw error is what gets
// stored on the items.
func (i *Item) calcDigest() error {
	defer i.wgHashed.Done()
	src, err := i.source()
	if err != nil {
		i.failHashing(err)
		return fmt.Errorf("source(%s) failed: %s\n", i.DisplayName, err)
	}
	defer src.Close()
	h := isolated.GetHash()
	size, err := io.Copy(h, src)
	if err != nil {
		i.failHashing(err)
		return fmt.Errorf("read(%s) failed: %s\n", i.DisplayName, err)
	}
	d := isolateservice.HandlersEndpointsV1Digest{Digest: string(isolated.Sum(h)), IsIsolated: true, Size: size}
	i.lock.Lock()
	defer i.lock.Unlock()
	i.digestItem = d
	for _, child := range i.linked {
		child.lock.Lock()
		child.digestItem = d
		child.lock.Unlock()
		child.wgHashed.Done()
	}
	return nil
}

// failHashing records err on the item and on every item linked so far, and
// releases the linked items' wgHashed.
//
// Everything happens under i.lock so that it cannot interleave with link(),
// which releases by itself any child linked once i.err is set; releasing such
// a child a second time would drive its WaitGroup counter negative.
func (i *Item) failHashing(err error) {
	i.lock.Lock()
	defer i.lock.Unlock()
	if i.err != nil {
		// The item was already marked as failed; keep the first error and do
		// not touch the children, since link() may already have released some.
		return
	}
	i.err = err
	for _, child := range i.linked {
		child.lock.Lock()
		child.err = err
		child.lock.Unlock()
		child.wgHashed.Done()
	}
}
// link attaches child as a duplicate of i.
//
// child must be a file item that was not processed in any way yet; anything
// else is a programming error and panics. If i already has a digest or an
// error, they are copied to child right away and child's wgHashed is
// released; otherwise child stays pending.
func (i *Item) link(child *Item) {
	child.lock.Lock()
	defer child.lock.Unlock()
	pristine := child.isFile() && child.state == nil && child.linked == nil && child.err == nil
	if !pristine {
		panic("internal error")
	}
	i.lock.Lock()
	defer i.lock.Unlock()
	i.linked = append(i.linked, child)
	hashed := i.digestItem.Digest != ""
	if hashed {
		child.digestItem = i.digestItem
	}
	if hashed || i.err != nil {
		// The parent already reached a final hashing state; propagate it now.
		child.err = i.err
		child.wgHashed.Done()
	}
}
// Archiver is a high level interface to an isolatedclient.Client.
//
// Uses a 4 stages pipeline, each doing work concurrently:
// - Deduplicating similar requests or known server hot cache hits.
// - Hashing files.
// - Batched cache hit lookups on the server.
// - Uploading cache misses.
//
// Instances are created with New() and must be terminated with Close().
type Archiver struct {
// Immutable.
ctx context.Context // Used for logging.
c *isolatedclient.Client
maxConcurrentHash int // Stage 2; Disk I/O bound.
maxConcurrentContains int // Stage 3; Server overload due to parallelism (DDoS).
maxConcurrentUpload int // Stage 4; Network I/O bound.
containsBatchingDelay time.Duration // Used by stage 3
containsBatchSize int // Used by stage 3
closeLock sync.Mutex // Guards stage1DedupeChan against concurrent push() and Close().
stage1DedupeChan chan *Item // Set to nil by Close().
stage2HashChan chan *Item
stage3LookupChan chan *Item
stage4UploadChan chan *Item
wg sync.WaitGroup // Tracks the stage goroutines started by New().
canceler common.Canceler
progress progress.Progress
// Mutable.
statsLock sync.Mutex // Guards stats.
stats Stats
}
// Close waits for all pending files to be done. If an error occurred during
// processing, it is returned.
//
// Calling Close a second time returns an error without waiting.
func (a *Archiver) Close() error {
	// The channel is closed with closeLock held so that an asynchronous call
	// to push() won't crash.
	a.closeLock.Lock()
	alreadyClosed := a.stage1DedupeChan == nil
	if !alreadyClosed {
		close(a.stage1DedupeChan)
		a.stage1DedupeChan = nil
	}
	a.closeLock.Unlock()
	if alreadyClosed {
		return errors.New("was already closed")
	}

	a.wg.Wait()
	_ = a.progress.Close()
	_ = a.canceler.Close()
	reason := a.CancelationReason()
	tracer.Instant(a, "done", tracer.Global, nil)
	return reason
}
// Cancel implements common.Canceler.
//
// It emits a "cancel" tracer event carrying reason, then forwards reason to
// the underlying canceler.
func (a *Archiver) Cancel(reason error) {
	args := tracer.Args{"reason": reason}
	tracer.Instant(a, "cancel", tracer.Thread, args)
	a.canceler.Cancel(reason)
}
// CancelationReason implements common.Canceler by delegating to the
// Archiver's underlying canceler.
func (a *Archiver) CancelationReason() error {
return a.canceler.CancelationReason()
}
// Channel implements common.Canceler by delegating to the Archiver's
// underlying canceler.
func (a *Archiver) Channel() <-chan error {
return a.canceler.Channel()
}
// Push schedules item upload to the isolate server. Smaller priority value
// means earlier processing.
//
// It returns nil if the Archiver was already closed.
func (a *Archiver) Push(displayName string, source isolatedclient.Source, priority int64) *Item {
	item := newItem(a, displayName, "", source, priority)
	return a.push(item)
}
// PushFile schedules file upload to the isolate server. Smaller priority
// value means earlier processing.
//
// The file is opened each time the item's source is invoked, not when
// PushFile is called. It returns nil if the Archiver was already closed.
func (a *Archiver) PushFile(displayName, path string, priority int64) *Item {
	opener := func() (io.ReadCloser, error) {
		return os.Open(path)
	}
	item := newItem(a, displayName, path, opener, priority)
	return a.push(item)
}
// Stats returns a copy of the statistics, taken while holding statsLock.
func (a *Archiver) Stats() *Stats {
	a.statsLock.Lock()
	snapshot := a.stats.deepCopy()
	a.statsLock.Unlock()
	return snapshot
}
// push hands item to stage 1 of the pipeline.
//
// It returns item once it was enqueued. If the Archiver was already closed,
// the item is marked as done and nil is returned.
func (a *Archiver) push(item *Item) *Item {
	if !a.pushLocked(item) {
		item.done()
		return nil
	}
	tracer.Instant(a, "itemAdded", tracer.Thread, tracer.Args{"item": item.DisplayName})
	tracer.CounterAdd(a, "itemsAdded", 1)
	a.progress.Update(groupFound, groupFoundFound, 1)
	return item
}
// pushLocked sends item to stage 1 while holding closeLock and reports
// whether it was sent; it returns false when the Archiver was closed.
//
// The close(a.stage1DedupeChan) call in Close() always occurs with the same
// lock held, so the send can never hit a closed channel.
func (a *Archiver) pushLocked(item *Item) bool {
	a.closeLock.Lock()
	defer a.closeLock.Unlock()
	if ch := a.stage1DedupeChan; ch != nil {
		// stage1DedupeLoop must never block and must be as fast as it can
		// because this send is done while holding a.closeLock.
		ch <- item
		return true
	}
	// Archiver was closed.
	return false
}
// stage1DedupeLoop is the first pipeline stage.
//
// It receives items from stage1DedupeChan, links file items whose path was
// already seen to the first item with that path, and forwards the remaining
// ones to stage2HashChan. Items waiting to be forwarded are kept in a local
// slice so that receiving never has to wait on stage 2. stage2HashChan is
// closed when the loop returns.
func (a *Archiver) stage1DedupeLoop() {
// Keep a local copy of the channel; Close() sets the struct member to nil.
c := a.stage1DedupeChan
defer close(a.stage2HashChan)
// First item seen for each path.
seen := map[string]*Item{}
// Create our own goroutine-local channel buffer, which doesn't need to be
// synchronized (unlike channels).
buildUp := []*Item{}
for {
// Pull or push an item, depending on whether there is build up.
var item *Item
ok := true
if len(buildUp) == 0 {
item, ok = <-c
} else {
select {
case item, ok = <-c:
case a.stage2HashChan <- buildUp[0]:
// Pop first item from buildUp.
buildUp = buildUp[1:]
a.progress.Update(groupHash, groupHashTodo, 1)
}
}
if !ok {
// The input channel was closed by Close().
break
}
if item == nil {
// Either the wake-up nil pushed by New(), or an iteration that forwarded
// an item to stage 2 instead of receiving one.
continue
}
// This loop must never block and must be as fast as it can as it is
// functionally equivalent to running with a.closeLock held.
if err := a.CancelationReason(); err != nil {
// Canceled: fail the item right away and release anyone waiting on its
// hash.
item.SetErr(err)
item.done()
item.wgHashed.Done()
continue
}
// TODO(maruel): Resolve symlinks for further deduplication? Depends on the
// use case, not sure the trade off is worth.
if item.isFile() {
if previous, ok := seen[item.path]; ok {
previous.link(item)
// TODO(maruel): Semantically weird.
item.done()
continue
}
}
buildUp = append(buildUp, item)
// NOTE(review): non-file items have an empty path and therefore all land
// under the "" key; it is never looked up since lookups are guarded by
// isFile() above.
seen[item.path] = item
}
// Take care of the build up after the channel closed.
for _, item := range buildUp {
if err := a.CancelationReason(); err != nil {
item.SetErr(err)
item.wgHashed.Done()
item.done()
} else {
// The Archiver is being closed, this has to happen synchronously.
a.stage2HashChan <- item
a.progress.Update(groupHash, groupHashTodo, 1)
}
}
}
// stage2HashLoop is the second pipeline stage.
//
// Every item received from stage2HashChan is scheduled on a priority pool
// limited to maxConcurrentHash concurrent jobs. Hashed items are forwarded to
// stage3LookupChan; a hashing failure cancels the whole Archiver. When the
// loop returns, the pool is waited on and then stage3LookupChan is closed.
func (a *Archiver) stage2HashLoop() {
defer close(a.stage3LookupChan)
pool := common.NewGoroutinePriorityPool(a.maxConcurrentHash, a.canceler)
defer func() {
_ = pool.Wait()
}()
for file := range a.stage2HashChan {
// This loop will implicitly buffer when stage1 is too fast by creating a
// lot of hung goroutines in pool. This permits reducing the contention on
// a.closeLock.
// TODO(tandrii): Implement backpressure in GoroutinePool, e.g. when it
// exceeds 20k or something similar.
item := file
pool.Schedule(item.priority, func() {
// calcDigest releases wgHashed even on failure, so it must not be
// released again here.
// NOTE(review): this code also relies on calcDigest recording the error
// on the item for every failure path - confirm.
end := tracer.Span(a, "hash", tracer.Args{"name": item.DisplayName})
if err := item.calcDigest(); err != nil {
end(tracer.Args{"err": err})
a.Cancel(err)
item.done()
return
}
end(tracer.Args{"size": float64(item.digestItem.Size)})
tracer.CounterAdd(a, "bytesHashed", float64(item.digestItem.Size))
a.progress.Update(groupHash, groupHashDone, 1)
a.progress.Update(groupHash, groupHashDoneSize, item.digestItem.Size)
a.progress.Update(groupLookup, groupLookupTodo, 1)
a.stage3LookupChan <- item
}, func() {
// Second callback handed to the pool; presumably invoked instead of the
// job when the pool was canceled - TODO confirm against common. The item
// was not hashed, so it is failed and its waiters are released here.
item.SetErr(a.CancelationReason())
item.wgHashed.Done()
item.done()
})
}
}
// stage3LookupLoop is the third pipeline stage.
//
// Items received from stage3LookupChan are accumulated into batches that are
// looked up on the server through doContains(), on a pool limited to
// maxConcurrentContains concurrent jobs. A batch is sent as soon as it holds
// containsBatchSize items, or containsBatchingDelay after its first item was
// received, whichever comes first. When the loop returns, the pool is waited
// on and then stage4UploadChan is closed.
func (a *Archiver) stage3LookupLoop() {
	defer close(a.stage4UploadChan)
	pool := common.NewGoroutinePool(a.maxConcurrentContains, a.canceler)
	defer func() {
		_ = pool.Wait()
	}()

	// never is a channel that is never signaled; it stands for "no timer
	// armed".
	never := make(<-chan time.Time)
	timer := never
	pending := []*Item{}

	// flush schedules the lookup of the pending batch and starts a new one.
	flush := func() {
		batch := pending
		pool.Schedule(func() {
			a.doContains(batch)
		}, nil)
		pending = []*Item{}
		timer = never
	}

	for open := true; open; {
		select {
		case <-timer:
			flush()
		case item, ok := <-a.stage3LookupChan:
			if !ok {
				open = false
				continue
			}
			pending = append(pending, item)
			switch {
			case len(pending) == a.containsBatchSize:
				flush()
			case timer == never:
				timer = time.After(a.containsBatchingDelay)
			}
		}
	}
	// Send whatever was left when the input channel closed.
	if len(pending) != 0 {
		flush()
	}
}
// stage4UploadLoop is the fourth and last pipeline stage.
//
// Every item received from stage4UploadChan is uploaded through doUpload() on
// a priority pool limited to maxConcurrentUpload concurrent jobs. The pool is
// waited on when the loop returns.
func (a *Archiver) stage4UploadLoop() {
	pool := common.NewGoroutinePriorityPool(a.maxConcurrentUpload, a.canceler)
	defer func() {
		_ = pool.Wait()
	}()
	for pending := range a.stage4UploadChan {
		// Give the closure its own variable for each iteration.
		toUpload := pending
		upload := func() {
			a.doUpload(toUpload)
		}
		pool.Schedule(toUpload.priority, upload, nil)
	}
}
// emptyBackgroundContext is a placeholder context for usage with the isolated
// client. A better implementation would use a real context to allow the
// isolate client calls to be canceled. This could be done by wiring up
// Archiver.canceler to a fake context which is passed to the isolated client,
// or it could be done by actually implementing real contexts in Archiver (and
// deprecating the use of canceler).
var emptyBackgroundContext = context.Background()
// doContains is called by stage 3.
//
// It asks the server which of items it already has. Items present on the
// server are recorded as hits in the statistics and marked as done; the other
// ones get their push state attached and are forwarded to stage4UploadChan.
//
// If the lookup fails, the Archiver is canceled and every item of the batch is
// marked as failed and done, since none of them goes any further in the
// pipeline.
func (a *Archiver) doContains(items []*Item) {
	tmp := make([]*isolateservice.HandlersEndpointsV1Digest, len(items))
	// No need to lock each item at that point, no mutation occurs on
	// Item.digestItem after stage 2.
	for i, item := range items {
		tmp[i] = &item.digestItem
	}
	states, err := a.c.Contains(emptyBackgroundContext, tmp)
	if err != nil {
		err = fmt.Errorf("contains(%d) failed: %s", len(items), err)
		a.Cancel(err)
		for _, item := range items {
			item.SetErr(err)
			// The item leaves the pipeline here; balance the accounting done
			// by newItem(), like every other terminal path does.
			item.done()
		}
		return
	}
	a.progress.Update(groupLookup, groupLookupDone, int64(len(items)))
	for index, state := range states {
		size := items[index].digestItem.Size
		if state == nil {
			// A nil state is treated as a cache hit: nothing to upload.
			a.statsLock.Lock()
			a.stats.Hits = append(a.stats.Hits, units.Size(size))
			a.statsLock.Unlock()
			items[index].done()
		} else {
			items[index].state = state
			a.progress.Update(groupUpload, groupUploadTodo, 1)
			a.progress.Update(groupUpload, groupUploadTodoSize, items[index].digestItem.Size)
			a.stage4UploadChan <- items[index]
		}
	}
	logging.Infof(a.ctx, "Looked up %d items\n", len(items))
}
// doUpload is called by stage 4.
//
// It pushes the item content to the server. On failure the Archiver is
// canceled and the error is recorded on the item; on success the upload
// progress is updated. In both cases the item is marked as done and an
// UploadStat is appended to the statistics.
//
// NOTE(review): a failed push is still recorded in Stats.Pushed and logged as
// "Uploaded", and the error message uses item.path, which is empty for items
// that are not files - confirm this is intended.
func (a *Archiver) doUpload(item *Item) {
	begin := time.Now()
	pushErr := a.c.Push(emptyBackgroundContext, item.state, item.source)
	if pushErr == nil {
		a.progress.Update(groupUpload, groupUploadDone, 1)
		a.progress.Update(groupUpload, groupUploadDoneSize, item.digestItem.Size)
	} else {
		wrapped := fmt.Errorf("push(%s) failed: %s\n", item.path, pushErr)
		a.Cancel(wrapped)
		item.SetErr(wrapped)
	}
	item.done()

	uploaded := units.Size(item.digestItem.Size)
	stat := &UploadStat{
		Duration: time.Since(begin),
		Size:     uploaded,
		Name:     item.DisplayName,
	}
	a.statsLock.Lock()
	a.stats.Pushed = append(a.stats.Pushed, stat)
	a.statsLock.Unlock()
	logging.Infof(a.ctx, "Uploaded %7s: %s\n", uploaded, item.DisplayName)
}