blob: c07e28c24f924e16bfe9345d76446b0fb548f70b [file] [log] [blame]
// Copyright 2021 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bqlog
import (
codepb ""
var (
	// Roughly a limit on a size of a single AppendRows message. RegisterSink
	// also raises any smaller nonzero Sink.MaxLiveSizeBytes up to this value.
	batchSizeMaxBytes = 5 * 1024 * 1024

	// How long to wait by default before sending an incomplete batch.
	defaultBatchAgeMax = 10 * time.Second

	// How many bytes to buffer by default before starting dropping excesses.
	defaultMaxLiveSizeBytes = 50 * 1024 * 1024
)
var (
metricSentCounter = metric.NewCounter(
"Count of log entries successfully sent",
field.String("table"), // full table ID "project.dataset.table"
metricDroppedCounter = metric.NewCounter(
"Count of log entries dropped for various reasons",
field.String("table"), // full table ID "project.dataset.table"
field.String("reason"), // reason of why it was dropped if known
metricErrorsCounter = metric.NewCounter(
"Count of encountered RPC errors",
field.String("table"), // full table ID "project.dataset.table"
field.String("code"), // canonical gRPC code (as string) if known
// Bundler buffers logs in memory before sending them to BigQuery.
type Bundler struct {
CloudProject string // the cloud project with the dataset, required
Dataset string // the BQ dataset with log tables, required
m sync.RWMutex
bufs map[protoreflect.FullName]*logBuffer
ctx context.Context
cancel context.CancelFunc
running bool
draining bool
// Sink describes where and how to log proto messages of the given type.
type Sink struct {
Prototype proto.Message // used only for its type descriptor, required
Table string // the BQ table name within the bundler's dataset, required
BatchAgeMax time.Duration // for how long to buffer messages (or 0 for some default)
MaxLiveSizeBytes int // approximate limit on memory buffer size (or 0 for some default)
// RegisterSink tells the bundler where and how to log messages of some
// concrete proto type.
// There can currently be only one sink per proto message type. Must be called
// before the bundler is running. Can be called during the init() time.
func (b *Bundler) RegisterSink(sink Sink) {
if sink.BatchAgeMax == 0 {
sink.BatchAgeMax = defaultBatchAgeMax
if sink.MaxLiveSizeBytes == 0 {
sink.MaxLiveSizeBytes = defaultMaxLiveSizeBytes
} else if sink.MaxLiveSizeBytes < batchSizeMaxBytes {
sink.MaxLiveSizeBytes = batchSizeMaxBytes
if sink.Table == "" {
panic("missing required field Table")
defer b.m.Unlock()
if b.running {
panic("the bundler is already running")
typ := proto.MessageName(sink.Prototype)
if b.bufs[typ] != nil {
panic(fmt.Sprintf("message type %q was already registered with RegisterSink", typ))
if b.bufs == nil {
b.bufs = make(map[protoreflect.FullName]*logBuffer, 1)
b.bufs[typ] = &logBuffer{
decl: sink,
desc: protodesc.ToDescriptorProto(sink.Prototype.ProtoReflect().Descriptor()),
// Log asynchronously logs the given message to a BQ table associated with
// the message type via a prior RegisterSink call.
// This is a best effort operation (and thus returns no error).
// Messages are dropped when:
// - Writes to BigQuery are failing with a fatal error:
// - The table doesn't exist.
// - The table has an incompatible schema.
// - The server account has no permission to write to the table.
// - Etc.
// - The server crashes before it manages to flush buffered logs.
// - The internal flush buffer is full (per MaxLiveSizeBytes).
// In case of transient write errors messages may be duplicated.
// Panics if `m` was not registered via RegisterSink or if the bundler is not
// running yet.
func (b *Bundler) Log(ctx context.Context, m proto.Message) {
typ := proto.MessageName(m)
blob, err := proto.Marshal(m)
defer b.m.RUnlock()
if !b.running {
panic("the bundler is not running yet")
buf := b.bufs[typ]
if buf == nil {
panic(fmt.Sprintf("message type %q was not registered with RegisterSink", typ))
switch {
case err != nil:
recordDrop(ctx, buf.tableID, 1, err, "MARSHAL_ERROR")
case b.draining:
recordDrop(ctx, buf.tableID, 1, errors.New("draining already"), "DRAINING")
// Note: we explicitly do not select on ctx.Done() since often the context
// is already expired (e.g. when logging from handlers that have reached the
// deadline). Sending to the channel here should be fast, since buffer's
// FullBehavior is set to DropOldestBatch, i.e. it never really blocks.
buf.disp.C <- blob
// Start launches the bundler internal goroutines.
// Canceling the context with cease all bundler activities. To gracefully
// shutdown the bundler (e.g. by flushing all pending logs) use Shutdown.
func (b *Bundler) Start(ctx context.Context, w BigQueryWriter) {
if b.CloudProject == "" {
panic("missing required field CloudProject")
if b.Dataset == "" {
panic("missing required field Dataset")
defer b.m.Unlock()
if b.running {
panic("the bundler is already running")
b.running = true
b.ctx, b.cancel = context.WithCancel(ctx)
for _, buf := range b.bufs {
tableID := fmt.Sprintf("%s.%s.%s", b.CloudProject, b.Dataset, buf.decl.Table)
bufCtx := loggingFields(b.ctx, tableID)
buf.start(bufCtx, tableID, &logSender{
ctx: bufCtx,
w: w,
tableID: tableID,
desc: buf.desc,
dest: fmt.Sprintf("projects/%s/datasets/%s/tables/%s/_default", b.CloudProject, b.Dataset, buf.decl.Table),
// Shutdown flushes pending logs and closes streaming RPCs.
// Does nothing if the bundler wasn't running. Gives up waiting for all data to
// be flushed (and drops it) after 15s timeout or when `ctx` is canceled.
func (b *Bundler) Shutdown(ctx context.Context) {
var drained []chan struct{}
if b.running && !b.draining {
b.draining = true
for _, buf := range b.bufs {
drained = append(drained, buf.drain(loggingFields(ctx, buf.tableID)))
// Totally shutdown everything after some deadline by canceling the root
// bundler context. It should cause all dispatcher.Channels to give up on
// any retries they might be doing.
cancel := b.cancel
go func() {
<-clock.After(ctx, 15*time.Second)
for _, ch := range drained {
func loggingFields(ctx context.Context, tableID string) context.Context {
return logging.SetFields(ctx, logging.Fields{
"activity": "luci.bqlog",
"table": tableID,
func recordSent(ctx context.Context, tableID string, count int) {
metricSentCounter.Add(ctx, int64(count), tableID)
func recordDrop(ctx context.Context, tableID string, count int, err error, reason string) {
ctx = loggingFields(ctx, tableID)
if err != nil {
logging.Errorf(ctx, "Dropped %d row(s): %s: %s", count, reason, err)
} else {
logging.Errorf(ctx, "Dropped %d row(s): %s", count, reason)
metricDroppedCounter.Add(ctx, int64(count), tableID, reason)
func recordErr(ctx context.Context, tableID string, count int, err error) {
ctx = loggingFields(ctx, tableID)
if transient.Tag.In(err) {
logging.Warningf(ctx, "Transient error when sending %d row(s): %s", count, err)
} else {
logging.Errorf(ctx, "Fatal error when sending %d row(s): %s", count, err)
codeStr := "UNKNOWN"
if code := grpcutil.Code(err); code != codes.Unknown {
if codeStr = codepb.Code_name[int32(code)]; codeStr == "" {
codeStr = fmt.Sprintf("CODE_%d", code)
} else if errors.Contains(err, io.EOF) {
codeStr = "EOF"
metricErrorsCounter.Add(ctx, 1, tableID, codeStr)
type logBuffer struct {
decl Sink
desc *descriptorpb.DescriptorProto
tableID string
sender *logSender
disp dispatcher.Channel[[]byte]
func (b *logBuffer) start(ctx context.Context, tableID string, sender *logSender) {
b.tableID = tableID
b.sender = sender
opts := dispatcher.Options[[]byte]{
ItemSizeFunc: func(itm []byte) int { return len(itm) },
DropFn: func(data *buffer.Batch[[]byte], flush bool) {
if data != nil {
recordDrop(ctx, b.tableID, len(data.Data), nil, "DISPATCHER")
ErrorFn: func(data *buffer.Batch[[]byte], err error) (retry bool) {
recordErr(ctx, b.tableID, len(data.Data), err)
return transient.Tag.In(err)
Buffer: buffer.Options{
MaxLeases: 1, // there can be only one outstanding write per an RPC stream
BatchItemsMax: -1, // cut batches based on size
BatchSizeMax: batchSizeMaxBytes,
BatchAgeMax: b.decl.BatchAgeMax,
FullBehavior: &buffer.DropOldestBatch{
MaxLiveSize: b.decl.MaxLiveSizeBytes,
var err error
b.disp, err = dispatcher.NewChannel[[]byte](ctx, &opts, sender.send)
if err != nil {
panic(fmt.Sprintf("failed to start the dispatcher: %s", err)) // should not be happening
func (b *logBuffer) drain(ctx context.Context) chan struct{} {
logging.Debugf(ctx, "Draining...")
drained := make(chan struct{})
go func() {
// Wait until the dispatcher channel is drained into the gRPC sender.
select {
case <-ctx.Done():
case <-b.disp.DrainC:
// Wait until the pending gRPC data is flushed.
logging.Debugf(ctx, "Drained")
return drained
type logSender struct {
ctx context.Context
w BigQueryWriter
tableID string
desc *descriptorpb.DescriptorProto
dest string
m sync.Mutex
stream storagepb.BigQueryWrite_AppendRowsClient
func (s *logSender) send(data *buffer.Batch[[]byte]) (rerr error) {
// There allowed only one concurrent Send or CloseSend per a gRPC stream.
defer s.m.Unlock()
// Open the gRPC stream if have none yet.
opened := false
if == nil {
stream, err := s.w.AppendRows(s.ctx)
if err != nil {
return grpcutil.WrapIfTransient(err)
opened = true = stream
// Prepare the request.
protoData := &storagepb.AppendRowsRequest_ProtoData{
Rows: &storagepb.ProtoRows{
SerializedRows: make([][]byte, len(data.Data)),
for i, row := range data.Data {
protoData.Rows.SerializedRows[i] = row.Item
// WriterSchema field is necessary only in the first request.
if opened {
protoData.WriterSchema = &storagepb.ProtoSchema{
ProtoDescriptor: s.desc,
err :={
WriteStream: s.dest,
Rows: &storagepb.AppendRowsRequest_ProtoRows{
ProtoRows: protoData,
// If the stream was aborted, properly shut it all down so we can try again
// later with a new stream.
if err != nil { = nil
return errors.Annotate(err, "failed to send data to BQ").Tag(transient.Tag).Err()
// Otherwise try to read the acknowledgment from the server. We need to wait
// for it to make sure it is OK to "forget" about the `data` batch.
resp, err :=
// If there's a gRPC-level error, it means the connection is broken and should
// be abandoned.
if err != nil { = nil
if err == io.EOF {
return errors.Annotate(err, "server unexpected closed the connection").Tag(transient.Tag).Err()
return grpcutil.WrapIfTransient(err)
// If the overall connection is fine, but the latest push specifically was
// rejected, the error is in `resp`.
if sts := resp.GetError(); sts != nil {
return grpcutil.WrapIfTransient(status.ErrorProto(sts))
recordSent(s.ctx, s.tableID, len(data.Data))
return nil
func (s *logSender) stop() {
defer s.m.Unlock()
if != nil { // wait for ACK = nil