blob: ec860c75175db66a1c4c4f222bcf139f0945a69f [file] [log] [blame]
// Copyright 2017 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dsset
import (
"context"
"fmt"
"math/rand"
"sync"
"testing"
"time"
"go.chromium.org/luci/gae/filter/txndefer"
"go.chromium.org/luci/gae/impl/memory"
"go.chromium.org/luci/gae/service/datastore"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/clock/testclock"
"go.chromium.org/luci/common/data/rand/mathrand"
"go.chromium.org/luci/common/data/stringset"
. "github.com/smartystreets/goconvey/convey"
)
func testingContext() context.Context {
c := txndefer.FilterRDS(memory.Use(context.Background()))
datastore.GetTestable(c).AutoIndex(true)
datastore.GetTestable(c).Consistent(true)
c = clock.Set(c, testclock.New(time.Unix(1442270520, 0).UTC()))
c = mathrand.Set(c, rand.New(rand.NewSource(1000)))
return c
}
// pop pops a bunch of items from the set and returns items that were popped.
func pop(c context.Context, s *Set, listing *Listing, ids []string) (popped []string, err error) {
op, err := s.BeginPop(c, listing)
if err != nil {
return nil, err
}
for _, id := range ids {
if op.Pop(id) {
popped = append(popped, id)
}
}
if err = FinishPop(c, op); err != nil {
return nil, err
}
return popped, nil
}
func TestSet(t *testing.T) {
t.Parallel()
Convey("item one lifecycle", t, func() {
c := testingContext()
set := Set{
Parent: datastore.NewKey(c, "Parent", "parent", 0, nil),
TombstonesDelay: time.Minute,
}
const limit = 10
// Add one item.
So(set.Add(c, []Item{{ID: "abc"}}), ShouldBeNil)
// The item is returned by the listing.
listing, err := set.List(c, limit)
So(err, ShouldBeNil)
So(listing.Items, ShouldResemble, []Item{{ID: "abc"}})
So(listing.Garbage, ShouldBeNil)
// Pop it!
err = datastore.RunInTransaction(c, func(c context.Context) error {
popped, err := pop(c, &set, listing, []string{"abc"})
So(err, ShouldBeNil)
So(popped, ShouldResemble, []string{"abc"})
return nil
}, nil)
So(err, ShouldBeNil)
// The listing no longer returns it.
listing, err = set.List(c, limit)
So(err, ShouldBeNil)
So(listing.Items, ShouldBeNil)
// The listing no longer returns the item, and there's no tombstones to
// cleanup.
listing, err = set.List(c, limit)
So(err, ShouldBeNil)
So(listing.Items, ShouldBeNil)
So(listing.Garbage, ShouldBeNil)
// Attempt to add it back (should be ignored).
So(set.Add(c, []Item{{ID: "abc"}}), ShouldBeNil)
// The listing still doesn't return it, but we now have a tombstone to
// cleanup (again).
listing, err = set.List(c, limit)
So(err, ShouldBeNil)
So(listing.Items, ShouldBeNil)
So(len(listing.Garbage), ShouldEqual, 1)
So(listing.Garbage[0].old, ShouldBeFalse)
So(listing.Garbage[0].storage, ShouldNotBeNil)
// Popping it again doesn't work either.
err = datastore.RunInTransaction(c, func(c context.Context) error {
popped, err := pop(c, &set, listing, []string{"abc"})
So(err, ShouldBeNil)
So(popped, ShouldBeNil)
return nil
}, nil)
So(err, ShouldBeNil)
// Cleaning up the storage, again. This should make List stop returning
// the tombstone (since it has no storage items associated with it and it's
// not ready to be evicted yet).
So(CleanupGarbage(c, listing.Garbage), ShouldBeNil)
listing, err = set.List(c, limit)
So(err, ShouldBeNil)
So(listing.Items, ShouldBeNil)
So(listing.Garbage, ShouldBeNil)
// Time passes, tombstone expires.
clock.Get(c).(testclock.TestClock).Add(2 * time.Minute)
// Listing now returns expired tombstone.
listing, err = set.List(c, limit)
So(err, ShouldBeNil)
So(listing.Items, ShouldBeNil)
So(len(listing.Garbage), ShouldEqual, 1)
So(listing.Garbage[0].storage, ShouldBeNil) // cleaned already
// Cleanup storage keys.
So(CleanupGarbage(c, listing.Garbage), ShouldBeNil)
// Cleanup the tombstones themselves.
err = datastore.RunInTransaction(c, func(c context.Context) error {
popped, err := pop(c, &set, listing, nil)
So(err, ShouldBeNil)
So(popped, ShouldBeNil)
return nil
}, nil)
So(err, ShouldBeNil)
// No tombstones returned any longer.
listing, err = set.List(c, limit)
So(err, ShouldBeNil)
So(listing.Items, ShouldBeNil)
So(listing.Garbage, ShouldBeNil)
// And the item can be added back now, since no trace of it is left.
So(set.Add(c, []Item{{ID: "abc"}}), ShouldBeNil)
// Yep, it is there.
listing, err = set.List(c, limit)
So(err, ShouldBeNil)
So(listing.Items, ShouldResemble, []Item{{ID: "abc"}})
So(listing.Garbage, ShouldBeNil)
})
Convey("List obeys limit", t, func() {
c := testingContext()
set := Set{
Parent: datastore.MakeKey(c, "Parent", "parent"),
TombstonesDelay: time.Minute,
}
So(set.Add(c, []Item{{ID: "abc"}}), ShouldBeNil)
So(set.Add(c, []Item{{ID: "def"}}), ShouldBeNil)
So(set.Add(c, []Item{{ID: "ghi"}}), ShouldBeNil)
l, err := set.List(c, 2)
So(err, ShouldBeNil)
So(l.Items, ShouldHaveLength, 2)
})
Convey("delete items non-transactionally", t, func() {
c := testingContext()
set := Set{
Parent: datastore.MakeKey(c, "Parent", "parent"),
TombstonesDelay: time.Minute,
}
// Add 3 items.
So(set.Add(c, []Item{{ID: "abc"}}), ShouldBeNil)
So(set.Add(c, []Item{{ID: "def"}}), ShouldBeNil)
So(set.Add(c, []Item{{ID: "ghi"}}), ShouldBeNil)
l, err := set.List(c, 10)
So(err, ShouldBeNil)
So(l.Items, ShouldHaveLength, 3)
// Delete 2 items before transacting.
i := 0
err = set.Delete(c, func() string {
switch i = i + 1; i {
case 1:
return "def"
case 2:
return "abc"
default:
return ""
}
})
So(err, ShouldBeNil)
l2, err := set.List(c, 10)
So(err, ShouldBeNil)
So(l2.Items, ShouldResemble, []Item{{ID: "ghi"}})
})
}
func TestStress(t *testing.T) {
t.Parallel()
Convey("stress", t, func() {
// Add 1000 items in parallel from N goroutines, and (also in parallel),
// run N instances of "List and pop all", collecting the result in single
// list. There should be no duplicates in the final list!
c := testingContext()
set := Set{
Parent: datastore.MakeKey(c, "Parent", "parent"),
TombstonesDelay: time.Minute,
}
producers := 3
consumers := 5
items := 100
wakeups := make(chan string)
lock := sync.Mutex{}
var consumed []string
for i := 0; i < producers; i++ {
go func() {
for j := 0; j < items; j++ {
set.Add(c, []Item{{ID: fmt.Sprintf("%d", j)}})
// Wake up 3 consumers, so they "fight".
wakeups <- "wake"
wakeups <- "wake"
wakeups <- "wake"
}
for i := 0; i < consumers; i++ {
wakeups <- "done"
}
}()
}
consume := func() {
listing, err := set.List(c, 100000)
if err != nil || len(listing.Items) == 0 {
return
}
keys := make([]string, len(listing.Items))
for i, itm := range listing.Items {
keys[i] = itm.ID
}
// Try to pop all.
var popped []string
err = datastore.RunInTransaction(c, func(c context.Context) error {
var err error
popped, err = pop(c, &set, listing, keys)
return err
}, nil)
// Consider items consumed only if transaction has landed.
if err == nil && len(popped) != 0 {
lock.Lock()
consumed = append(consumed, popped...)
lock.Unlock()
}
}
wg := sync.WaitGroup{}
wg.Add(consumers)
for i := 0; i < consumers; i++ {
go func() {
defer wg.Done()
done := false
for !done {
done = (<-wakeups) == "done"
consume()
}
}()
}
wg.Wait() // this waits for completion of the entire pipeline
// Make sure 'consumed' is the initially produced set.
dedup := stringset.New(len(consumed))
for _, itm := range consumed {
dedup.Add(itm)
}
So(dedup.Len(), ShouldEqual, len(consumed)) // no dups
So(len(consumed), ShouldEqual, items) // all are accounted for
})
}