blob: 5be6e7ba5f6a91c3751cd74d41163fd697caef04 [file] [log] [blame]
// Copyright 2017 The LUCI Authors.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package bundleServicesClient
import (
s ""
// The maximum, AppEngine request size, minus 1MB for overhead.
const maxBundleSize = gae.MaxRequestSize - (1024 * 1024) // 1MB
// Client is a LogDog Coordinator Services endpoint client that intercepts
// calls that can be batched and buffers them, sending them with the Batch
// RPC instead of their independent individual RPCs.
// The Context and CallOption set for the first intercepted call will be used
// when making the batch call; all other CallOption sets will be ignored.
// Bundling parameters can be controlled by modifying the Bundler prior to
// invoking it.
type Client struct {
// ServicesClient is the Coordinator Services endpoint Client that is being
// wrapped.
// Starting from the time that the first message is added to a bundle, once
// this delay has passed, handle the bundle.
// The default is bundler.DefaultDelayThreshold.
DelayThreshold time.Duration
// Once a bundle has this many items, handle the bundle. Since only one
// item at a time is added to a bundle, no bundle will exceed this
// threshold, so it also serves as a limit.
// The default is bundler.DefaultBundleCountThreshold.
BundleCountThreshold int
initBundlerOnce sync.Once
bundler *bundler.Bundler
// outstanding is used to track outstanding RPCs. On Flush, the Client will
// block pending completion of all outstanding RPCs.
outstanding sync.WaitGroup
// RegisterStream implements ServicesClient.
func (c *Client) RegisterStream(ctx context.Context, in *s.RegisterStreamRequest, opts ...grpc.CallOption) (
*s.RegisterStreamResponse, error) {
resp, err := c.bundleRPC(ctx, opts, &s.BatchRequest_Entry{
Value: &s.BatchRequest_Entry_RegisterStream{RegisterStream: in},
if err != nil {
return nil, err
return resp.GetRegisterStream(), nil
// LoadStream implements ServicesClient.
func (c *Client) LoadStream(ctx context.Context, in *s.LoadStreamRequest, opts ...grpc.CallOption) (
*s.LoadStreamResponse, error) {
resp, err := c.bundleRPC(ctx, opts, &s.BatchRequest_Entry{
Value: &s.BatchRequest_Entry_LoadStream{LoadStream: in},
if err != nil {
return nil, err
return resp.GetLoadStream(), nil
// TerminateStream implements ServicesClient.
func (c *Client) TerminateStream(ctx context.Context, in *s.TerminateStreamRequest, opts ...grpc.CallOption) (
*empty.Empty, error) {
_, err := c.bundleRPC(ctx, opts, &s.BatchRequest_Entry{
Value: &s.BatchRequest_Entry_TerminateStream{TerminateStream: in},
if err != nil {
return nil, err
return &empty.Empty{}, nil
// ArchiveStream implements ServicesClient.
func (c *Client) ArchiveStream(ctx context.Context, in *s.ArchiveStreamRequest, opts ...grpc.CallOption) (
*empty.Empty, error) {
_, err := c.bundleRPC(ctx, opts, &s.BatchRequest_Entry{
Value: &s.BatchRequest_Entry_ArchiveStream{ArchiveStream: in},
if err != nil {
return nil, err
return &empty.Empty{}, nil
// Flush flushes the Bundler. It should be called when terminating to ensure
// that buffered client requests have been completed.
func (c *Client) Flush() {
func (c *Client) initBundler() {
c.initBundlerOnce.Do(func() {
c.bundler = bundler.NewBundler(&batchEntry{}, c.bundlerHandler)
c.bundler.DelayThreshold = c.DelayThreshold
c.bundler.BundleCountThreshold = c.BundleCountThreshold
c.bundler.BundleByteThreshold = maxBundleSize // Hard-coded.
// bundleRPC adds req to the underlying Bundler, blocks until it completes, and
// returns its response.
func (c *Client) bundleRPC(ctx context.Context, opts []grpc.CallOption, req *s.BatchRequest_Entry) (*s.BatchResponse_Entry, error) {
be := &batchEntry{
req: req,
ctx: ctx,
opts: opts,
complete: make(chan *s.BatchResponse_Entry, 1),
if err := c.addEntry(be); err != nil {
return nil, err
resp := <-be.complete
if e := resp.GetErr(); e != nil {
return nil, e.ToError()
return resp, nil
func (c *Client) addEntry(be *batchEntry) error {
return c.bundler.Add(be, proto.Size(be.req))
// bundleHandler is called when a bundle threshold has been met.
// This is a bundler.Bundler handler function. "iface" is []*batchEntry{}, a
// slice of the prototype passed into NewBundler.
// Note that "iface" is owned by this handler; the Bundler allocates a new
// slice after each bundle dispatch. Therefore, retention and mutation are safe.
func (c *Client) bundlerHandler(iface interface{}) {
entries := iface.([]*batchEntry)
if len(entries) == 0 {
ctx, opts := entries[0].ctx, entries[0].opts
go func() {
defer c.outstanding.Done()
c.sendBundle(ctx, entries, opts...)
func (c *Client) sendBundle(ctx context.Context, entries []*batchEntry, opts ...grpc.CallOption) {
req := s.BatchRequest{
Req: make([]*s.BatchRequest_Entry, len(entries)),
for i, ent := range entries {
req.Req[i] = ent.req
resp, err := c.ServicesClient.Batch(ctx, &req, opts...)
// Supply a response to each blocking request. Note that "complete" is a
// buffered channel, so this will not block.
if err != nil {
logging.WithError(err).Errorf(ctx, "Failed to send RPC bundle.")
// Error case: generate an error response from "err".
for _, ent := range entries {
e := s.MakeError(err)
ent.complete <- &s.BatchResponse_Entry{
Value: &s.BatchResponse_Entry_Err{Err: e},
// We don't have a solution for a case where the Coordinator couldn't provide
// a single response. We would infinitely continue retrying our initial
// request set.
// This shouldn't happen, but if it does, make it visible.
if len(resp.Resp) == 0 {
panic(errors.New("batch response had zero entries"))
// Pair each response with its request.
count := 0
for _, r := range resp.Resp {
// Handle error conditions.
switch {
case r.Index < 0, int(r.Index) >= len(entries):
logging.Warningf(ctx, "Response included invalid index %d (%d entries).", r.Index, len(entries))
case entries[r.Index] == nil:
logging.Warningf(ctx, "Response included duplicate entry for index %d.", r.Index)
entries[r.Index].complete <- r
entries[r.Index] = nil
// Fast path: if our count equals the number of entries, then we've processed
// them all.
if count == len(entries) {
// Figure out which entries we didn't process and resubmit.
count = 0
for _, be := range entries {
if be == nil {
// Already processed.
if err := c.addEntry(be); err != nil {
// This was already added successfully, so it can't fail here.
panic(errors.Annotate(err, "failed to re-add entry").Err())
logging.Debugf(ctx, "Resubmitting %d unprocessed entr[y|ies].", count)
type batchEntry struct {
req *s.BatchRequest_Entry
ctx context.Context
opts []grpc.CallOption
complete chan *s.BatchResponse_Entry