| // Copyright 2021 The LUCI Authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package bqlog |
| |
| import ( |
| "context" |
| "fmt" |
| "io" |
| "sync" |
| "time" |
| |
| storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2" |
| codepb "google.golang.org/genproto/googleapis/rpc/code" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/status" |
| "google.golang.org/protobuf/proto" |
| "google.golang.org/protobuf/reflect/protodesc" |
| "google.golang.org/protobuf/reflect/protoreflect" |
| "google.golang.org/protobuf/types/descriptorpb" |
| |
| "go.chromium.org/luci/common/clock" |
| "go.chromium.org/luci/common/errors" |
| "go.chromium.org/luci/common/logging" |
| "go.chromium.org/luci/common/retry/transient" |
| "go.chromium.org/luci/common/sync/dispatcher" |
| "go.chromium.org/luci/common/sync/dispatcher/buffer" |
| "go.chromium.org/luci/common/tsmon/field" |
| "go.chromium.org/luci/common/tsmon/metric" |
| "go.chromium.org/luci/grpc/grpcutil" |
| ) |
| |
| var ( |
| 	// Roughly the limit on the size of a single AppendRows message. |
| batchSizeMaxBytes = 5 * 1024 * 1024 |
| // How long to wait by default before sending an incomplete batch. |
| defaultBatchAgeMax = 10 * time.Second |
| 	// How many bytes to buffer by default before starting to drop excess entries. |
| defaultMaxLiveSizeBytes = 50 * 1024 * 1024 |
| ) |
| |
| var ( |
| metricSentCounter = metric.NewCounter( |
| "bqlog/sent", |
| "Count of log entries successfully sent", |
| nil, |
| field.String("table"), // full table ID "project.dataset.table" |
| ) |
| metricDroppedCounter = metric.NewCounter( |
| "bqlog/dropped", |
| "Count of log entries dropped for various reasons", |
| nil, |
| field.String("table"), // full table ID "project.dataset.table" |
| 		field.String("reason"), // reason why it was dropped, if known |
| ) |
| metricErrorsCounter = metric.NewCounter( |
| "bqlog/errors", |
| "Count of encountered RPC errors", |
| nil, |
| field.String("table"), // full table ID "project.dataset.table" |
| field.String("code"), // canonical gRPC code (as string) if known |
| ) |
| ) |
| |
| // Bundler buffers logs in memory before sending them to BigQuery. |
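| // |
| // A rough usage sketch (the proto type logpb.Entry and the writer value |
| // writeClient below are placeholders, not part of this package): |
| // |
| //	var bundler = bqlog.Bundler{ |
| //		CloudProject: "example-project", |
| //		Dataset:      "logs", |
| //	} |
| // |
| //	func init() { |
| //		bundler.RegisterSink(bqlog.Sink{ |
| //			Prototype: &logpb.Entry{}, |
| //			Table:     "entries", |
| //		}) |
| //	} |
| // |
| //	func main() { |
| //		// ... obtain ctx and a BigQueryWriter implementation ... |
| //		bundler.Start(ctx, writeClient) |
| //		defer bundler.Shutdown(ctx) |
| // |
| //		bundler.Log(ctx, &logpb.Entry{}) |
| //	} |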
| type Bundler struct { |
| CloudProject string // the cloud project with the dataset, required |
| Dataset string // the BQ dataset with log tables, required |
| |
| m sync.RWMutex |
| bufs map[protoreflect.FullName]*logBuffer |
| ctx context.Context |
| cancel context.CancelFunc |
| running bool |
| draining bool |
| } |
| |
| // Sink describes where and how to log proto messages of the given type. |
| type Sink struct { |
| Prototype proto.Message // used only for its type descriptor, required |
| Table string // the BQ table name within the bundler's dataset, required |
| 	BatchAgeMax time.Duration // for how long to buffer messages (0 means use defaultBatchAgeMax) |
| 	MaxLiveSizeBytes int // approximate limit on memory buffer size (0 means use defaultMaxLiveSizeBytes) |
| } |
| |
| // RegisterSink tells the bundler where and how to log messages of some |
| // concrete proto type. |
| // |
| // There can currently be only one sink per proto message type. Must be called |
| // before the bundler is running. Can be called during init() time. |
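| // |
| // For example, with the batching knobs set explicitly (the values and the |
| // logpb.Entry type are illustrative only): |
| // |
| //	b.RegisterSink(bqlog.Sink{ |
| //		Prototype:        &logpb.Entry{}, |
| //		Table:            "entries", |
| //		BatchAgeMax:      30 * time.Second, |
| //		MaxLiveSizeBytes: 10 * 1024 * 1024, |
| //	}) |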
| func (b *Bundler) RegisterSink(sink Sink) { |
| if sink.BatchAgeMax == 0 { |
| sink.BatchAgeMax = defaultBatchAgeMax |
| } |
| if sink.MaxLiveSizeBytes == 0 { |
| sink.MaxLiveSizeBytes = defaultMaxLiveSizeBytes |
| } else if sink.MaxLiveSizeBytes < batchSizeMaxBytes { |
| sink.MaxLiveSizeBytes = batchSizeMaxBytes |
| } |
| |
| if sink.Table == "" { |
| panic("missing required field Table") |
| } |
| |
| b.m.Lock() |
| defer b.m.Unlock() |
| |
| if b.running { |
| panic("the bundler is already running") |
| } |
| |
| typ := proto.MessageName(sink.Prototype) |
| if b.bufs[typ] != nil { |
| panic(fmt.Sprintf("message type %q was already registered with RegisterSink", typ)) |
| } |
| if b.bufs == nil { |
| b.bufs = make(map[protoreflect.FullName]*logBuffer, 1) |
| } |
| b.bufs[typ] = &logBuffer{ |
| decl: sink, |
| desc: protodesc.ToDescriptorProto(sink.Prototype.ProtoReflect().Descriptor()), |
| } |
| } |
| |
| // Log asynchronously logs the given message to a BQ table associated with |
| // the message type via a prior RegisterSink call. |
| // |
| // This is a best-effort operation (and thus returns no error). |
| // |
| // Messages are dropped when: |
| // * Writes to BigQuery are failing with a fatal error: |
| // - The table doesn't exist. |
| // - The table has an incompatible schema. |
| // - The server account has no permission to write to the table. |
| // - Etc. |
| // * The server crashes before it manages to flush buffered logs. |
| // * The internal flush buffer is full (per MaxLiveSizeBytes). |
| // |
| // In case of transient write errors, messages may be duplicated. |
| // |
| // Panics if `m` was not registered via RegisterSink or if the bundler is not |
| // running yet. |
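| // |
| // A typical fire-and-forget call from a request handler (logpb.Entry and its |
| // field are placeholders): |
| // |
| //	bundler.Log(ctx, &logpb.Entry{ |
| //		Status: "OK", |
| //	}) |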
| func (b *Bundler) Log(ctx context.Context, m proto.Message) { |
| typ := proto.MessageName(m) |
| blob, err := proto.Marshal(m) |
| |
| b.m.RLock() |
| defer b.m.RUnlock() |
| |
| if !b.running { |
| panic("the bundler is not running yet") |
| } |
| buf := b.bufs[typ] |
| if buf == nil { |
| panic(fmt.Sprintf("message type %q was not registered with RegisterSink", typ)) |
| } |
| |
| switch { |
| case err != nil: |
| recordDrop(ctx, buf.tableID, 1, err, "MARSHAL_ERROR") |
| case b.draining: |
| 		recordDrop(ctx, buf.tableID, 1, errors.New("already draining"), "DRAINING") |
| default: |
| select { |
| case buf.disp.C <- blob: |
| case <-ctx.Done(): |
| recordDrop(ctx, buf.tableID, 1, ctx.Err(), "CONTEXT_DEADLINE") |
| } |
| } |
| } |
| |
| // Start launches the bundler's internal goroutines. |
| // |
| // Canceling the context will cease all bundler activities. To gracefully shut |
| // down the bundler (e.g. flushing all pending logs first), use Shutdown. |
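| // |
| // A sketch of a typical call site (this assumes `conn` is a gRPC connection |
| // to the BigQuery Storage API and that the generated storagepb stub is an |
| // acceptable BigQueryWriter here; adjust to however the writer is actually |
| // constructed in your deployment): |
| // |
| //	b.Start(ctx, storagepb.NewBigQueryWriteClient(conn)) |
| //	defer b.Shutdown(ctx) |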
| func (b *Bundler) Start(ctx context.Context, w BigQueryWriter) { |
| if b.CloudProject == "" { |
| panic("missing required field CloudProject") |
| } |
| if b.Dataset == "" { |
| panic("missing required field Dataset") |
| } |
| |
| b.m.Lock() |
| defer b.m.Unlock() |
| |
| if b.running { |
| panic("the bundler is already running") |
| } |
| b.running = true |
| b.ctx, b.cancel = context.WithCancel(ctx) |
| |
| for _, buf := range b.bufs { |
| tableID := fmt.Sprintf("%s.%s.%s", b.CloudProject, b.Dataset, buf.decl.Table) |
| bufCtx := loggingFields(b.ctx, tableID) |
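| 		// Rows are appended via the table's `_default` write stream: appends are |
| 		// committed as they arrive (at-least-once semantics), so no explicit |
| 		// stream creation or finalization is needed. |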
| buf.start(bufCtx, tableID, &logSender{ |
| ctx: bufCtx, |
| w: w, |
| tableID: tableID, |
| desc: buf.desc, |
| dest: fmt.Sprintf("projects/%s/datasets/%s/tables/%s/_default", b.CloudProject, b.Dataset, buf.decl.Table), |
| }) |
| } |
| } |
| |
| // Shutdown flushes pending logs and closes streaming RPCs. |
| // |
| // Does nothing if the bundler wasn't running. Gives up waiting for all data to |
| // be flushed (and drops it) after a 15s timeout or when `ctx` is canceled. |
| func (b *Bundler) Shutdown(ctx context.Context) { |
| var drained []chan struct{} |
| |
| b.m.Lock() |
| if b.running && !b.draining { |
| b.draining = true |
| for _, buf := range b.bufs { |
| drained = append(drained, buf.drain(loggingFields(ctx, buf.tableID))) |
| } |
| 		// Totally shut down everything after some deadline by canceling the root |
| // bundler context. It should cause all dispatcher.Channels to give up on |
| // any retries they might be doing. |
| cancel := b.cancel |
| go func() { |
| <-clock.After(ctx, 15*time.Second) |
| cancel() |
| }() |
| } |
| b.m.Unlock() |
| |
| for _, ch := range drained { |
| <-ch |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| |
| func loggingFields(ctx context.Context, tableID string) context.Context { |
| return logging.SetFields(ctx, logging.Fields{ |
| "activity": "luci.bqlog", |
| "table": tableID, |
| }) |
| } |
| |
| func recordSent(ctx context.Context, tableID string, count int) { |
| metricSentCounter.Add(ctx, int64(count), tableID) |
| } |
| |
| func recordDrop(ctx context.Context, tableID string, count int, err error, reason string) { |
| ctx = loggingFields(ctx, tableID) |
| if err != nil { |
| logging.Errorf(ctx, "Dropped %d row(s): %s: %s", count, reason, err) |
| } else { |
| logging.Errorf(ctx, "Dropped %d row(s): %s", count, reason) |
| } |
| metricDroppedCounter.Add(ctx, int64(count), tableID, reason) |
| } |
| |
| func recordErr(ctx context.Context, tableID string, count int, err error) { |
| ctx = loggingFields(ctx, tableID) |
| if transient.Tag.In(err) { |
| logging.Warningf(ctx, "Transient error when sending %d row(s): %s", count, err) |
| } else { |
| logging.Errorf(ctx, "Fatal error when sending %d row(s): %s", count, err) |
| } |
| codeStr := "UNKNOWN" |
| if code := grpcutil.Code(err); code != codes.Unknown { |
| if codeStr = codepb.Code_name[int32(code)]; codeStr == "" { |
| codeStr = fmt.Sprintf("CODE_%d", code) |
| } |
| } else if errors.Contains(err, io.EOF) { |
| codeStr = "EOF" |
| } |
| metricErrorsCounter.Add(ctx, 1, tableID, codeStr) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| |
| type logBuffer struct { |
| decl Sink |
| desc *descriptorpb.DescriptorProto |
| tableID string |
| sender *logSender |
| disp dispatcher.Channel |
| } |
| |
| func (b *logBuffer) start(ctx context.Context, tableID string, sender *logSender) { |
| b.tableID = tableID |
| b.sender = sender |
| |
| opts := dispatcher.Options{ |
| ItemSizeFunc: func(itm interface{}) int { return len(itm.([]byte)) }, |
| DropFn: func(data *buffer.Batch, flush bool) { |
| if data != nil { |
| recordDrop(ctx, b.tableID, len(data.Data), nil, "DISPATCHER") |
| } |
| }, |
| ErrorFn: func(data *buffer.Batch, err error) (retry bool) { |
| recordErr(ctx, b.tableID, len(data.Data), err) |
| return transient.Tag.In(err) |
| }, |
| Buffer: buffer.Options{ |
| 			MaxLeases: 1, // there can be only one outstanding write per RPC stream |
| BatchItemsMax: -1, // cut batches based on size |
| BatchSizeMax: batchSizeMaxBytes, |
| BatchAgeMax: b.decl.BatchAgeMax, |
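| 			// Once the buffered data exceeds MaxLiveSize, the oldest batches are |
| 			// dropped (this shows up via DropFn above with reason "DISPATCHER"). |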
| FullBehavior: &buffer.DropOldestBatch{ |
| MaxLiveSize: b.decl.MaxLiveSizeBytes, |
| }, |
| }, |
| } |
| |
| var err error |
| b.disp, err = dispatcher.NewChannel(ctx, &opts, sender.send) |
| if err != nil { |
| panic(fmt.Sprintf("failed to start the dispatcher: %s", err)) // should not be happening |
| } |
| } |
| |
| func (b *logBuffer) drain(ctx context.Context) chan struct{} { |
| logging.Debugf(ctx, "Draining...") |
| |
| drained := make(chan struct{}) |
| |
| b.disp.Close() |
| go func() { |
| // Wait until the dispatcher channel is drained into the gRPC sender. |
| select { |
| case <-ctx.Done(): |
| case <-b.disp.DrainC: |
| } |
| |
| // Wait until the pending gRPC data is flushed. |
| b.sender.stop() |
| |
| logging.Debugf(ctx, "Drained") |
| close(drained) |
| }() |
| |
| return drained |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| |
| type logSender struct { |
| ctx context.Context |
| w BigQueryWriter |
| tableID string |
| desc *descriptorpb.DescriptorProto |
| dest string |
| |
| m sync.Mutex |
| stream storagepb.BigQueryWrite_AppendRowsClient |
| } |
| |
| func (s *logSender) send(data *buffer.Batch) (rerr error) { |
| 	// Only one concurrent Send or CloseSend is allowed per gRPC stream. |
| s.m.Lock() |
| defer s.m.Unlock() |
| |
| 	// Open the gRPC stream if we don't have one yet. |
| opened := false |
| if s.stream == nil { |
| stream, err := s.w.AppendRows(s.ctx) |
| if err != nil { |
| return grpcutil.WrapIfTransient(err) |
| } |
| opened = true |
| s.stream = stream |
| } |
| |
| // Prepare the request. |
| protoData := &storagepb.AppendRowsRequest_ProtoData{ |
| Rows: &storagepb.ProtoRows{ |
| SerializedRows: make([][]byte, len(data.Data)), |
| }, |
| } |
| for i, row := range data.Data { |
| protoData.Rows.SerializedRows[i] = row.Item.([]byte) |
| } |
| |
| 	// The WriterSchema field is needed only in the first request on a stream. |
| if opened { |
| protoData.WriterSchema = &storagepb.ProtoSchema{ |
| ProtoDescriptor: s.desc, |
| } |
| } |
| |
| err := s.stream.Send(&storagepb.AppendRowsRequest{ |
| WriteStream: s.dest, |
| Rows: &storagepb.AppendRowsRequest_ProtoRows{ |
| ProtoRows: protoData, |
| }, |
| }) |
| |
| // If the stream was aborted, properly shut it all down so we can try again |
| // later with a new stream. |
| if err != nil { |
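| 		// Best-effort cleanup: close our side and observe the final server status |
| 		// so the stream's resources are released before a retry opens a new one. |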
| s.stream.CloseSend() |
| s.stream.Recv() |
| s.stream = nil |
| return errors.Annotate(err, "failed to send data to BQ").Tag(transient.Tag).Err() |
| } |
| |
| // Otherwise try to read the acknowledgment from the server. We need to wait |
| // for it to make sure it is OK to "forget" about the `data` batch. |
| resp, err := s.stream.Recv() |
| |
| // If there's a gRPC-level error, it means the connection is broken and should |
| // be abandoned. |
| if err != nil { |
| s.stream = nil |
| if err == io.EOF { |
| 			return errors.Annotate(err, "server unexpectedly closed the connection").Tag(transient.Tag).Err() |
| } |
| return grpcutil.WrapIfTransient(err) |
| } |
| |
| // If the overall connection is fine, but the latest push specifically was |
| // rejected, the error is in `resp`. |
| if sts := resp.GetError(); sts != nil { |
| return grpcutil.WrapIfTransient(status.ErrorProto(sts)) |
| } |
| |
| recordSent(s.ctx, s.tableID, len(data.Data)) |
| return nil |
| } |
| |
| func (s *logSender) stop() { |
| s.m.Lock() |
| defer s.m.Unlock() |
| if s.stream != nil { |
| s.stream.CloseSend() |
| s.stream.Recv() // wait for ACK |
| s.stream = nil |
| } |
| } |