// Copyright 2018 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package policy
import (
. ""
func TestLogarithmicBatching(t *testing.T) {
Convey("With simulator", t, func(c C) {
invocationDuration := time.Hour // may be modified in tests below.
s := Simulator{
OnRequest: func(s *Simulator, r task.Request) time.Duration {
return invocationDuration
OnDebugLog: func(format string, args ...any) {
c.Printf(format+"\n", args...)
const noDelay = time.Duration(0)
lastAddedTrigger := 0
addTriggers := func(delay time.Duration, n int) {
ts := make([]internal.Trigger, n)
for i := range ts {
ts[i] = internal.NoopTrigger(
fmt.Sprintf("t-%03d", lastAddedTrigger),
fmt.Sprintf("data-%03d", lastAddedTrigger),
s.AddTrigger(delay, ts...)
var err error
Convey("Logarithm base must be at least 1.0001", func() {
s.Policy, err = LogarithmicBatchingPolicy(2, 1000, 1.0)
So(err, ShouldNotBeNil)
Convey("Logarithmic batching works", func() {
// A policy that allows 1 concurrent invocation with effectively unlimited
// batch size and logarithm base of 2.
const maxBatchSize = 5
s.Policy, err = LogarithmicBatchingPolicy(1, maxBatchSize, 2.0)
So(err, ShouldBeNil)
Convey("at least 1 trigger", func() {
// Add exactly one trigger; log(2,1) == 0.
addTriggers(noDelay, 1)
So(s.Invocations, ShouldHaveLength, 1)
So(s.Last().Request.TriggerIDs(), ShouldResemble, []string{"t-001"})
So(s.Last().Request.StringProperty("noop_trigger_data"), ShouldEqual, "data-001")
So(s.PendingTriggers, ShouldHaveLength, 0)
So(s.DiscardedTriggers, ShouldHaveLength, 0)
Convey("rounds down number of consumed triggers", func() {
addTriggers(noDelay, 3)
So(s.Invocations, ShouldHaveLength, 1)
// log(2,3) = 1.584
So(s.Last().Request.TriggerIDs(), ShouldResemble, []string{"t-001"})
So(s.Last().Request.StringProperty("noop_trigger_data"), ShouldEqual, "data-001")
So(s.PendingTriggers, ShouldHaveLength, 2)
So(s.DiscardedTriggers, ShouldHaveLength, 0)
Convey("respects maxBatchSize", func() {
N := 1 << (maxBatchSize + 2)
addTriggers(noDelay, N)
So(s.Invocations, ShouldHaveLength, 1)
So(s.Last().Request.TriggerIDs(), ShouldHaveLength, maxBatchSize)
So(s.Last().Request.StringProperty("noop_trigger_data"), ShouldEqual, fmt.Sprintf("data-%03d", maxBatchSize))
So(s.PendingTriggers, ShouldHaveLength, N-maxBatchSize)
So(s.DiscardedTriggers, ShouldHaveLength, 0)
Convey("Many triggers", func() {
// Add 5 triggers.
addTriggers(noDelay, 5)
So(s.Invocations, ShouldHaveLength, 1)
So(s.Last().Request.TriggerIDs(), ShouldResemble, []string{"t-001", "t-002"})
So(s.Last().Request.StringProperty("noop_trigger_data"), ShouldEqual, "data-002")
So(s.PendingTriggers, ShouldHaveLength, 3)
// Add a few triggers while the invocation is running.
addTriggers(invocationDuration/4, 2)
addTriggers(invocationDuration/4, 2)
addTriggers(invocationDuration/4, 2)
addTriggers(invocationDuration/4, 2)
// Invocation is finsihed now, we have 11 = (3 old + 4*2 new triggers).
So(s.Invocations, ShouldHaveLength, 2) // new invocation created.
// log(2,11) = 3.459
So(s.Last().Request.TriggerIDs(), ShouldResemble, []string{"t-003", "t-004", "t-005"})
So(s.DiscardedTriggers, ShouldHaveLength, 0)
Convey("Long simulation", func() {
// Run this with: `go test -run TestLogarithmicBatching -v`
// TODO(tandrii): maybe make it like a Go benchmark?
// Parameters.
const (
veryVerbose = false
maxBatchSize = 50
maxConcurrentBuilds = 2
logBase = 1.185
buildDuration = 43 * time.Minute
simulDuration = time.Hour * 24 * 5
percentiles := []int{0, 25, 50, 75, 100}
// Value of 10 below means a commit lands every 10 minutes.
MinutesBetweenCommitsByHourOfDay := []int{
20, 30, 30, 30, 30, 30, // midnight .. 6am
20, 10, 6, 3, 1, 1, // 6am..noon
1, 1, 2, 3, 4, 5, // noon .. 6pm
6, 10, 12, 15, 20, 20, // 6pm .. midnight
// Setup.
invocationDuration = buildDuration
s.Policy, err = LogarithmicBatchingPolicy(maxConcurrentBuilds, maxBatchSize, logBase)
So(err, ShouldBeNil)
s.Epoch = testclock.TestRecentTimeUTC.Truncate(24 * time.Hour)
// Simulate.
var pendingSizes []int
var commits []time.Time
for {
delay := time.Minute * time.Duration(MinutesBetweenCommitsByHourOfDay[s.Now.Hour()])
if delay <= 0 {
panic("wrong minutesOfCommitDelayByHourOfDay")
addTriggers(delay, 1)
commits = append(commits, s.Now)
pendingSizes = append(pendingSizes, len(s.PendingTriggers))
elapsed := s.Now.Sub(s.Epoch)
if veryVerbose {
s.OnDebugLog("%70s [%12s] [%s] remaining %d", "", elapsed, s.Now, len(s.PendingTriggers))
if elapsed > simulDuration {
// There should never be any discarded triggers with this policy.
So(s.DiscardedTriggers, ShouldHaveLength, 0)
// Analyze.
var oldestCommitAgeMinutes []int
triggerSizes := make([]int, len(s.Invocations))
commistProcesssed := 0
for i, inv := range s.Invocations {
l := len(inv.Request.IncomingTriggers)
oldestCreatedAt := commits[commistProcesssed]
oldestAge := s.Epoch.Add(inv.Created).Sub(oldestCreatedAt)
commistProcesssed += l
triggerSizes[i] = l
oldestCommitAgeMinutes = append(oldestCommitAgeMinutes, int(oldestAge/time.Minute))
separatorLine := strings.Repeat("=", 80)
_, _ = Printf("\n\n%s\nReport\n%s\n", separatorLine, separatorLine)
_, _ = Printf(" * logBase %.5f\n", logBase)
_, _ = Printf(" * maxBatchSize %d\n", maxBatchSize)
_, _ = Printf(" * maxConcurrentBuilds %d\n", maxConcurrentBuilds)
_, _ = Printf(" * build duration %s\n", buildDuration)
_, _ = Printf(" * simulation duration %s\n", simulDuration)
_, _ = Println()
_, _ = Printf("Simulated %d commits, %d builds, %d triggers remaining\n", lastAddedTrigger, len(s.Invocations), len(s.PendingTriggers))
_, _ = Println()
_, _ = Println(fmtPercentiles(" number of per-build commits", "%3d", triggerSizes, percentiles...))
_, _ = Println(fmtPercentiles(" number of pending commits", "%3d", pendingSizes, percentiles...))
_, _ = Println(fmtPercentiles("oldest pending commit (minutes)", "%3d", oldestCommitAgeMinutes, percentiles...))
_, _ = Printf("%s\n", separatorLine)
func fmtPercentiles(prefix, valFormat string, values []int, percentiles string {
var sb strings.Builder
l := len(values)
for _, p := range percentiles {
switch {
case p < 0 || p > 100:
panic(fmt.Errorf("invalid percentile %d, must be in 0..100", p))
case p == 0:
sb.WriteString(" min ")
case p == 100:
sb.WriteString(" max ")
_, _ = fmt.Fprintf(&sb, " p%02d ", p)
idx := l * p / 100
if idx >= l {
idx = l - 1
_, _ = fmt.Fprintf(&sb, valFormat, values[idx])
return strings.TrimRight(sb.String(), ",")