blob: f2037883b6839e2af4820c7d4500735c9419227a [file] [log] [blame]
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package internal
import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"
"go.chromium.org/luci/cipd/client/cipd/fs"
"go.chromium.org/luci/cipd/client/cipd/internal/messages"
"go.chromium.org/luci/cipd/client/cipd/pkg"
"go.chromium.org/luci/cipd/common"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/clock/testclock"
. "github.com/smartystreets/goconvey/convey"
)
// No need to create a lot of files in tests.
const testInstanceCacheMaxSize = 10
func TestInstanceCache(t *testing.T) {
t.Parallel()
Convey("InstanceCache", t, func() {
ctx, tc := testclock.UseTime(context.Background(), testclock.TestTimeLocal)
tempDir, err := ioutil.TempDir("", "instanceche_test")
So(err, ShouldBeNil)
defer os.RemoveAll(tempDir)
fs := fs.NewFileSystem(tempDir, "")
pin := func(i int) common.Pin {
pin := common.Pin{"pkg", fmt.Sprintf("%d", i)}
pin.InstanceID = strings.Repeat("a", 40-len(pin.InstanceID)) + pin.InstanceID
return pin
}
fakeData := func(p common.Pin) string {
return "data:" + p.InstanceID
}
var fetchM sync.Mutex
var fetchErr chan error
var fetchCalls int
fetcher := func(ctx context.Context, pin common.Pin, w io.WriteSeeker) error {
fetchM.Lock()
fetchCalls++
fetchErrCh := fetchErr
fetchM.Unlock()
if fetchErrCh != nil {
if err := <-fetchErrCh; err != nil {
return err
}
}
_, err := w.Write([]byte(fakeData(pin)))
return err
}
cache := &InstanceCache{
FS: fs,
Fetcher: fetcher,
maxSize: testInstanceCacheMaxSize,
}
cache.Launch(ctx)
defer cache.Close(ctx)
access := func(cache *InstanceCache, pin common.Pin) (created bool, src pkg.Source) {
fetchM.Lock()
before := fetchCalls
fetchM.Unlock()
cache.RequestInstances(ctx, []*InstanceRequest{
{Context: ctx, Pin: pin},
})
res := cache.WaitInstance()
So(res.Err, ShouldBeNil)
fetchM.Lock()
created = fetchCalls > before
fetchM.Unlock()
return created, res.Source
}
putNew := func(cache *InstanceCache, pin common.Pin) {
created, src := access(cache, pin)
So(created, ShouldBeTrue)
So(src.Close(ctx, false), ShouldBeNil)
}
readSrc := func(src pkg.Source) string {
buf, err := io.ReadAll(io.NewSectionReader(src, 0, src.Size()))
So(err, ShouldBeNil)
return string(buf)
}
testHas := func(cache *InstanceCache, pin common.Pin) {
created, src := access(cache, pin)
So(created, ShouldBeFalse)
So(readSrc(src), ShouldEqual, fakeData(pin))
So(src.Close(ctx, false), ShouldBeNil)
}
accessTime := func(cache *InstanceCache, pin common.Pin) (lastAccess time.Time, ok bool) {
cache.withState(ctx, clock.Now(ctx), func(s *messages.InstanceCache) (save bool) {
var entry *messages.InstanceCache_Entry
if entry, ok = s.Entries[pin.InstanceID]; ok {
lastAccess = entry.LastAccess.AsTime()
}
return false
})
return
}
countTempFiles := func() int {
tempDirFile, err := os.Open(tempDir)
So(err, ShouldBeNil)
defer tempDirFile.Close()
files, err := tempDirFile.Readdirnames(0)
So(err, ShouldBeNil)
return len(files)
}
Convey("Works in general", func() {
cache2 := &InstanceCache{FS: fs, Fetcher: fetcher}
cache2.Launch(ctx)
defer cache2.Close(ctx)
// Add new.
putNew(cache, pin(0))
// Check it can be seen even through another InstanceCache object.
testHas(cache, pin(0))
testHas(cache2, pin(0))
})
Convey("Temp cache removes files", func() {
cache := &InstanceCache{
FS: fs,
Tmp: true,
Fetcher: fetcher,
}
cache.Launch(ctx)
defer cache.Close(ctx)
So(countTempFiles(), ShouldEqual, 0)
cache.RequestInstances(ctx, []*InstanceRequest{
{Context: ctx, Pin: pin(0)},
})
res := cache.WaitInstance()
So(res.Err, ShouldBeNil)
So(countTempFiles(), ShouldEqual, 1)
So(res.Source.Close(ctx, false), ShouldBeNil)
So(countTempFiles(), ShouldEqual, 0)
})
Convey("Redownloads corrupted files", func() {
So(countTempFiles(), ShouldEqual, 0)
// Download the first time.
cache.RequestInstances(ctx, []*InstanceRequest{
{Context: ctx, Pin: pin(0)},
})
res := cache.WaitInstance()
So(res.Err, ShouldBeNil)
So(fetchCalls, ShouldEqual, 1)
So(res.Source.Close(ctx, false), ShouldBeNil)
// Stored in the cache (plus state.db file).
So(countTempFiles(), ShouldEqual, 2)
// The second call grabs it from the cache.
cache.RequestInstances(ctx, []*InstanceRequest{
{Context: ctx, Pin: pin(0)},
})
res = cache.WaitInstance()
So(res.Err, ShouldBeNil)
So(fetchCalls, ShouldEqual, 1)
// Close as corrupted. Should be removed from the cache.
So(res.Source.Close(ctx, true), ShouldBeNil)
// Only state.db file left.
So(countTempFiles(), ShouldEqual, 1)
// Download the second time.
cache.RequestInstances(ctx, []*InstanceRequest{
{Context: ctx, Pin: pin(0)},
})
res = cache.WaitInstance()
So(res.Err, ShouldBeNil)
So(fetchCalls, ShouldEqual, 2)
So(res.Source.Close(ctx, false), ShouldBeNil)
})
Convey("Concurrency", func() {
cache := &InstanceCache{
FS: fs,
Fetcher: fetcher,
}
defer cache.Close(ctx)
var reqs []*InstanceRequest
for i := 0; i < 100; i++ {
reqs = append(reqs, &InstanceRequest{
Context: ctx,
Pin: pin(i),
State: i,
})
}
Convey("Preserves the order when using single stream", func() {
cache.ParallelDownloads = 1
cache.Launch(ctx)
cache.RequestInstances(ctx, reqs)
for i := 0; i < len(reqs); i++ {
res := cache.WaitInstance()
So(res.Err, ShouldBeNil)
So(res.State.(int), ShouldEqual, i)
So(readSrc(res.Source), ShouldEqual, fakeData(pin(i)))
So(res.Source.Close(ctx, false), ShouldBeNil)
}
})
Convey("Doesn't deadlock", func() {
cache.ParallelDownloads = 4
cache.Launch(ctx)
seen := map[int]struct{}{}
cache.RequestInstances(ctx, reqs)
for i := 0; i < len(reqs); i++ {
res := cache.WaitInstance()
So(res.Err, ShouldBeNil)
So(res.Source.Close(ctx, false), ShouldBeNil)
seen[res.State.(int)] = struct{}{}
}
So(len(seen), ShouldEqual, len(reqs))
})
Convey("Handles errors", func() {
fetchErr = make(chan error, len(reqs))
cache.ParallelDownloads = 4
cache.Launch(ctx)
cache.RequestInstances(ctx, reqs)
// Make errCount fetches fail and rest succeed.
const errCount = 10
for i := 0; i < errCount; i++ {
fetchErr <- fmt.Errorf("boom %d", i)
}
for i := errCount; i < len(reqs); i++ {
fetchErr <- nil
}
errs := 0
for i := 0; i < len(reqs); i++ {
res := cache.WaitInstance()
if res.Source != nil {
So(res.Source.Close(ctx, false), ShouldBeNil)
}
if res.Err != nil {
errs++
}
}
So(errs, ShouldEqual, errCount)
})
})
Convey("GC respects MaxSize", func() {
// Add twice more the limit.
for i := 0; i < testInstanceCacheMaxSize*2; i++ {
putNew(cache, pin(i))
tc.Add(time.Second)
}
// Check the number of actual files.
So(countTempFiles(), ShouldEqual, testInstanceCacheMaxSize+1) // +1 for state.db
// Only last testInstanceCacheMaxSize instances are still in the cache.
for i := testInstanceCacheMaxSize; i < testInstanceCacheMaxSize*2; i++ {
testHas(cache, pin(i))
}
// The rest are missing and can be recreated.
for i := 0; i < testInstanceCacheMaxSize; i++ {
putNew(cache, pin(i))
}
})
Convey("GC respects MaxAge", func() {
cache.maxAge = 2500 * time.Millisecond
for i := 0; i < 8; i++ {
if i != 0 {
tc.Add(time.Second)
}
putNew(cache, pin(i))
}
// Age of last added item (i == 7) is 0 => age of i'th item is 7-i.
//
// Condition for survival: age < cache.maxAge, e.g. 7-i<2.5 => i >= 5.
//
// Thus we expect {5, 6, 7} to still be in the cache after the GC.
cache.GC(ctx)
testHas(cache, pin(5))
testHas(cache, pin(6))
testHas(cache, pin(7))
// The rest are missing and can be recreated.
for i := 0; i < 5; i++ {
putNew(cache, pin(i))
}
})
Convey("RequestInstancesDoesNotEvictEntriesToBeFetched", func() {
cache.maxAge = 2 * time.Second
for i := 0; i < 8; i++ {
putNew(cache, pin(i))
}
// At this point, 8 fetches have been done.
So(fetchCalls, ShouldEqual, 8)
tc.Add(3 * time.Second)
// All of the cache entries are now older than cache.maxAge,
// but we are fetching them again, so they should not be
// evicted.
var ir []*InstanceRequest
// A new download will trigger gc
ir = append(ir, &InstanceRequest{Context: ctx, Pin: pin(10)})
for i := 0; i < 8; i++ {
ir = append(ir, &InstanceRequest{Context: ctx, Pin: pin(i)})
}
cache.RequestInstances(ctx, ir)
for i := 0; i < 9; i++ {
res := cache.WaitInstance()
So(res.Err, ShouldBeNil)
So(res.Source.Close(ctx, false), ShouldBeNil)
}
// Only one new fetch should happen, for pin 10.
So(fetchCalls, ShouldEqual, 9)
})
Convey("Sync", func() {
stateDbPath := filepath.Join(tempDir, instanceCacheStateFilename)
const count = 10
testSync := func(causeResync func()) {
// Add instances.
for i := 0; i < count; i++ {
putNew(cache, pin(i))
}
causeResync()
// state.db must be restored.
for i := 0; i < count; i++ {
lastAccess, ok := accessTime(cache, pin(i))
So(ok, ShouldBeTrue)
So(lastAccess.UnixNano(), ShouldEqual, clock.Now(ctx).UnixNano())
}
_, ok := accessTime(cache, common.Pin{"nonexistent", "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"})
So(ok, ShouldBeFalse)
}
Convey("state.db disappeared", func() {
testSync(func() {
err := os.Remove(stateDbPath)
So(err, ShouldBeNil)
})
})
Convey("state.db corrupted", func() {
testSync(func() {
f, err := os.Create(stateDbPath)
So(err, ShouldBeNil)
f.WriteString("blah")
defer f.Close()
})
})
})
})
}