| // Copyright 2018 The LUCI Authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package bq |
| |
| import ( |
| "bytes" |
| "context" |
| "fmt" |
| "math" |
| "reflect" |
| "sort" |
| "strconv" |
| "strings" |
| "sync" |
| "time" |
| |
| "cloud.google.com/go/bigquery" |
| "github.com/golang/protobuf/jsonpb" |
| "github.com/golang/protobuf/proto" |
| "google.golang.org/protobuf/types/known/durationpb" |
| "google.golang.org/protobuf/types/known/structpb" |
| "google.golang.org/protobuf/types/known/timestamppb" |
| |
| "go.chromium.org/luci/common/errors" |
| "go.chromium.org/luci/common/logging" |
| "go.chromium.org/luci/common/sync/parallel" |
| "go.chromium.org/luci/common/tsmon/field" |
| "go.chromium.org/luci/common/tsmon/metric" |
| ) |
| |
// ID is the global InsertIDGenerator.
| var ID InsertIDGenerator |
| |
| // fieldInfo is metadata of a proto field. |
| // Retrieve field infos using getFieldInfos. |
| // |
// For oneof, one oneof declaration is mapped to one fieldInfo,
// as opposed to one fieldInfo per oneof member.
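//
// For illustration only, given a hypothetical message
//
//	message Task {
//	  oneof result {
//	    string error = 1;
//	    int64 exit_code = 2;
//	  }
//	}
//
// the generated Go struct holds the oneof in a single interface-typed
// Result field with wrapper types *Task_Error and *Task_ExitCode; that
// one declaration yields one fieldInfo whose oneOfFields maps each
// wrapper type to the metadata of its single value field.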
| type fieldInfo struct { |
| *proto.Properties |
| structIndex []int |
| // oneOfFields maps a oneof struct type to its metadata. |
| // Initialized only for oneof declaration fields. |
| oneOfFields map[reflect.Type]oneOfFieldInfo |
| } |
| |
| type oneOfFieldInfo struct { |
| *proto.Properties |
| valueFieldIndex []int // index of the field within a oneof struct |
| } |
| |
// bqFields caches field metadata per proto struct type; guarded by
// bqFieldsLock.
var bqFields = map[reflect.Type][]fieldInfo{}
var bqFieldsLock = sync.RWMutex{}
| |
| var protoMessageType = reflect.TypeOf((*proto.Message)(nil)).Elem() |
| |
// insertLimit is the hard cap on the number of rows sent in a single
// insert request.
const insertLimit = 10000

// batchDefault is the batch size used when Uploader.BatchSize is unset.
const batchDefault = 500
| |
// Uploader contains the necessary state for streaming data to BigQuery.
| type Uploader struct { |
| *bigquery.Uploader |
	// Uploader is bound to a specific table. DatasetID and TableID are
| // provided for reference. |
| DatasetID string |
| TableID string |
| // UploadsMetricName is a string used to create a tsmon Counter metric |
| // for event upload attempts via Put, e.g. |
| // "/chrome/infra/commit_queue/events/count". If unset, no metric will |
| // be created. |
| UploadsMetricName string |
| // uploads is the Counter metric described by UploadsMetricName. It |
| // contains a field "status" set to either "success" or "failure." |
| uploads metric.Counter |
| initMetricOnce sync.Once |
| // BatchSize is the max number of rows to send to BigQuery at a time. |
| // The default is 500. |
| BatchSize int |
| } |
| |
// Row implements bigquery.ValueSaver.
| type Row struct { |
| proto.Message // embedded |
| |
| // InsertID is unique per insert operation to handle deduplication. |
| InsertID string |
| } |
| |
| // Save is used by bigquery.Uploader.Put when inserting values into a table. |
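//
// A minimal sketch of what Save produces, assuming a hypothetical
// generated message pb.Event with a single int64 field "id":
//
//	r := &Row{Message: &pb.Event{Id: 1}, InsertID: "x1"}
//	m, id, err := r.Save()
//	// m == map[string]bigquery.Value{"id": int64(1)}, id == "x1", err == nil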
| func (r *Row) Save() (map[string]bigquery.Value, string, error) { |
| m, err := mapFromMessage(r.Message, nil) |
| return m, r.InsertID, err |
| } |
| |
| // mapFromMessage returns a {BQ Field name: BQ value} map. |
| // path is a slice of Go field names leading to m. |
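//
// For illustration, a hypothetical message declared as
//
//	message Event {
//	  int64 id = 1;
//	  repeated string tags = 2;
//	}
//
// with id=1 and tags=["a", "b"] maps to
//
//	map[string]bigquery.Value{"id": int64(1), "tags": []interface{}{"a", "b"}}
//
// NULL values and empty repeated fields are omitted.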
| func mapFromMessage(m proto.Message, path []string) (map[string]bigquery.Value, error) { |
| sPtr := reflect.ValueOf(m) |
| switch { |
| case sPtr.Kind() != reflect.Ptr: |
| return nil, fmt.Errorf("type %T implementing proto.Message is not a pointer", m) |
| case sPtr.IsNil(): |
| return nil, nil |
| } |
| |
| s := sPtr.Elem() |
| if s.Kind() != reflect.Struct { |
| return nil, fmt.Errorf("type %T implementing proto.Message is not a pointer to a struct", m) |
| } |
| |
| t := s.Type() |
| infos, err := getFieldInfos(t) |
| if err != nil { |
| return nil, errors.Annotate(err, "could not populate bqFields for type %v", t).Err() |
| } |
| path = append(path, "") |
| |
| var row map[string]bigquery.Value // keep it nil unless there are values |
| for _, fi := range infos { |
| var bqField string |
| var bqValue interface{} |
| path[len(path)-1] = fi.Name |
| |
| switch { |
| case len(fi.oneOfFields) != 0: |
| val := s.FieldByIndex(fi.structIndex) |
| if val.IsNil() { |
| continue |
| } |
| structPtr := val.Elem() |
| oof := fi.oneOfFields[structPtr.Type()] |
| bqField = oof.OrigName |
| rawValue := structPtr.Elem().FieldByIndex(oof.valueFieldIndex).Interface() |
| if bqValue, err = getValue(rawValue, path, oof.Properties); err != nil { |
| return nil, errors.Annotate(err, "%s", fi.OrigName).Err() |
| } else if bqValue == nil { |
| // Omit NULL values. |
| continue |
| } |
| |
| case fi.Repeated: |
| f := s.FieldByIndex(fi.structIndex) |
			// Initialize the elements slice only if there are any.
| n := f.Len() |
| if n == 0 { |
| // omit a repeated field with no elements. |
| continue |
| } |
| |
| elems := make([]interface{}, n) |
| vPath := append(path, "") |
| switch f.Kind() { |
| case reflect.Slice: |
| for i := 0; i < len(elems); i++ { |
| vPath[len(vPath)-1] = strconv.Itoa(i) |
| elems[i], err = getValue(f.Index(i).Interface(), vPath, fi.Properties) |
| if err != nil { |
| return nil, errors.Annotate(err, "%s[%d]", fi.OrigName, i).Err() |
| } |
| } |
| |
| case reflect.Map: |
| if f.Type().Key().Kind() != reflect.String { |
| return nil, fmt.Errorf("map key must be a string") |
| } |
| |
| keys := f.MapKeys() |
| sort.Slice(keys, func(i, j int) bool { |
| return keys[i].String() < keys[j].String() |
| }) |
| |
| for i, k := range keys { |
| kStr := k.String() |
| vPath[len(vPath)-1] = kStr |
| elemValue, err := getValue(f.MapIndex(k).Interface(), vPath, fi.Properties) |
| if err != nil { |
| return nil, errors.Annotate(err, "%s[%s]", fi.OrigName, kStr).Err() |
| } |
| elems[i] = map[string]bigquery.Value{ |
| "key": kStr, |
| "value": elemValue, |
| } |
| } |
| |
| default: |
| return nil, fmt.Errorf("kind %s not supported as a repeated field", f.Kind()) |
| } |
| bqField = fi.OrigName |
| bqValue = elems |
| |
| default: |
| bqField = fi.OrigName |
| if bqValue, err = getValue(s.FieldByIndex(fi.structIndex).Interface(), path, fi.Properties); err != nil { |
| return nil, errors.Annotate(err, "%s", fi.OrigName).Err() |
| } else if bqValue == nil { |
| // Omit NULL values. |
| continue |
| } |
| } |
| |
| if row == nil { |
| row = map[string]bigquery.Value{} |
| } |
| row[bqField] = bigquery.Value(bqValue) |
| } |
| return row, nil |
| } |
| |
| // getFieldInfos returns field metadata for a given proto go type. |
| // Caches results. |
| func getFieldInfos(t reflect.Type) ([]fieldInfo, error) { |
| bqFieldsLock.RLock() |
| f := bqFields[t] |
| bqFieldsLock.RUnlock() |
| if f != nil { |
| return f, nil |
| } |
| |
| bqFieldsLock.Lock() |
| defer bqFieldsLock.Unlock() |
| return getFieldInfosLocked(t) |
| } |
| |
// getFieldInfosLocked is like getFieldInfos, but assumes bqFieldsLock is
// held for writing. It recurses into nested message types.
func getFieldInfosLocked(t reflect.Type) ([]fieldInfo, error) {
| if f := bqFields[t]; f != nil { |
| return f, nil |
| } |
| |
| structProp := proto.GetProperties(t) |
| |
| oneOfs := map[int]map[reflect.Type]oneOfFieldInfo{} |
| for _, of := range structProp.OneofTypes { |
| f, ok := of.Type.Elem().FieldByName(of.Prop.Name) |
| if !ok { |
| return nil, fmt.Errorf("field %q not found in %q", of.Prop.Name, of.Type) |
| } |
| |
| typeMap := oneOfs[of.Field] |
| if typeMap == nil { |
| typeMap = map[reflect.Type]oneOfFieldInfo{} |
| oneOfs[of.Field] = typeMap |
| } |
| typeMap[of.Type] = oneOfFieldInfo{ |
| Properties: of.Prop, |
| valueFieldIndex: f.Index, |
| } |
| } |
| |
| fields := make([]fieldInfo, 0, len(structProp.Prop)) |
| for _, p := range structProp.Prop { |
| if strings.HasPrefix(p.Name, "XXX_") { |
| continue |
| } |
| |
| f, ok := t.FieldByName(p.Name) |
| if !ok { |
| return nil, fmt.Errorf("field %q not found in %q", p.Name, t) |
| } |
| |
| ft := f.Type |
| if ft.Kind() == reflect.Slice { |
| ft = ft.Elem() |
| } |
| if ft.Implements(protoMessageType) && ft.Kind() == reflect.Ptr { |
| if st := ft.Elem(); st.Kind() == reflect.Struct { |
| // Note: this will crash with a stack overflow if the protobuf |
| // message is recursive, but bqschemaupdater should catch that |
| // earlier. |
| subfields, err := getFieldInfosLocked(st) |
| if err != nil { |
| return nil, err |
| } |
| if len(subfields) == 0 { |
| // Skip RECORD fields with no sub-fields. |
| continue |
| } |
| } |
| } |
| fields = append(fields, fieldInfo{ |
| Properties: p, |
| structIndex: f.Index, |
| oneOfFields: oneOfs[f.Index[0]], |
| }) |
| } |
| |
| bqFields[t] = fields |
| return fields, nil |
| } |
| |
// getValue converts a proto field value to a BigQuery value.
// Enums are converted to their string names, durations to float64
// seconds, timestamps to time.Time and structpb.Structs to JSONPB
// strings; nested messages are converted via mapFromMessage.
// A nil message yields nil, which the caller omits as NULL.
func getValue(value interface{}, path []string, prop *proto.Properties) (interface{}, error) {
| if prop.Enum != "" { |
| stringer, ok := value.(fmt.Stringer) |
| if !ok { |
| return nil, fmt.Errorf("could not convert enum value to string") |
| } |
| return stringer.String(), nil |
| } else if dpb, ok := value.(*durationpb.Duration); ok { |
| if dpb == nil { |
| return nil, nil |
| } |
| if err := dpb.CheckValid(); err != nil { |
| return nil, fmt.Errorf("tried to write an invalid duration for [%+v] for field %q", dpb, strings.Join(path, ".")) |
| } |
| value := dpb.AsDuration() |
| // Convert to FLOAT64. |
| return value.Seconds(), nil |
| } else if tspb, ok := value.(*timestamppb.Timestamp); ok { |
| if tspb == nil { |
| return nil, nil |
| } |
| if err := tspb.CheckValid(); err != nil { |
| return nil, fmt.Errorf("tried to write an invalid timestamp for [%+v] for field %q", tspb, strings.Join(path, ".")) |
| } |
| value := tspb.AsTime() |
| return value, nil |
| } else if s, ok := value.(*structpb.Struct); ok { |
| if s == nil { |
| return nil, nil |
| } |
| // Structs are persisted as JSONPB strings. |
| // See also https://bit.ly/chromium-bq-struct |
| var buf bytes.Buffer |
| if err := (&jsonpb.Marshaler{}).Marshal(&buf, s); err != nil { |
| return nil, err |
| } |
| return buf.String(), nil |
| } else if nested, ok := value.(proto.Message); ok { |
| if nested == nil { |
| return nil, nil |
| } |
| m, err := mapFromMessage(nested, path) |
| if m == nil { |
			// A nil map stored in an interface{} is not a nil interface,
			// so return an explicit nil.
| return nil, err |
| } |
| return m, err |
| } else { |
| return value, nil |
| } |
| } |
| |
| // NewUploader constructs a new Uploader struct. |
| // |
| // DatasetID and TableID are provided to the BigQuery client to |
| // gain access to a particular table. |
| // |
| // You may want to change the default configuration of the bigquery.Uploader. |
| // Check the documentation for more details. |
| // |
| // Set UploadsMetricName on the resulting Uploader to use the default counter |
| // metric. |
| // |
| // Set BatchSize to set a custom batch size. |
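//
// A minimal usage sketch; the project, dataset, table and message type
// are placeholders:
//
//	client, err := bigquery.NewClient(ctx, "my-project")
//	if err != nil {
//		return err
//	}
//	up := NewUploader(ctx, client, "my_dataset", "my_table")
//	up.UploadsMetricName = "/my/app/events/count"
//	err = up.Put(ctx, &pb.MyEvent{})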
| func NewUploader(ctx context.Context, c *bigquery.Client, datasetID, tableID string) *Uploader { |
| return &Uploader{ |
| DatasetID: datasetID, |
| TableID: tableID, |
| Uploader: c.Dataset(datasetID).Table(tableID).Uploader(), |
| } |
| } |
| |
// batchSize returns the effective batch size: BatchSize clamped to
// insertLimit, or batchDefault if BatchSize is unset.
func (u *Uploader) batchSize() int {
| switch { |
| case u.BatchSize > insertLimit: |
| return insertLimit |
| case u.BatchSize <= 0: |
| return batchDefault |
| default: |
| return u.BatchSize |
| } |
| } |
| |
// getCounter lazily initializes the uploads counter and returns it.
// It returns nil if UploadsMetricName is unset.
func (u *Uploader) getCounter() metric.Counter {
| u.initMetricOnce.Do(func() { |
| if u.UploadsMetricName != "" { |
| desc := "Upload attempts; status is 'success' or 'failure'" |
| field := field.String("status") |
| u.uploads = metric.NewCounter(u.UploadsMetricName, desc, nil, field) |
| } |
| }) |
| return u.uploads |
| } |
| |
// updateUploads adds count upload attempts with the given status to the
// uploads counter, if the metric is configured.
func (u *Uploader) updateUploads(ctx context.Context, count int64, status string) {
| if uploads := u.getCounter(); uploads != nil && count != 0 { |
| uploads.Add(ctx, count, status) |
| } |
| } |
| |
| // Put uploads one or more rows to the BigQuery service. Put takes care of |
| // adding InsertIDs, used by BigQuery to deduplicate rows. |
| // |
// If any rows do not match one of the expected types, Put will not attempt to
| // upload any rows and returns an InvalidTypeError. |
| // |
| // Put returns a PutMultiError if one or more rows failed to be uploaded. |
| // The PutMultiError contains a RowInsertionError for each failed row. |
| // |
| // Put will retry on temporary errors. If the error persists, the call will |
| // run indefinitely. Because of this, if ctx does not have a timeout, Put will |
| // add one. |
| // |
| // See bigquery documentation and source code for detailed information on how |
| // struct values are mapped to rows. |
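//
// A sketch of inspecting partial failures from a single batch:
//
//	if merr, ok := err.(bigquery.PutMultiError); ok {
//		for _, rie := range merr {
//			// rie is a bigquery.RowInsertionError for one failed row.
//			_ = rie
//		}
//	}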
| func (u *Uploader) Put(ctx context.Context, messages ...proto.Message) error { |
| if _, ok := ctx.Deadline(); !ok { |
| var cancel context.CancelFunc |
| ctx, cancel = context.WithTimeout(ctx, time.Minute) |
| defer cancel() |
| } |
| rows := make([]*Row, len(messages)) |
| for i, m := range messages { |
| rows[i] = &Row{ |
| Message: m, |
| InsertID: ID.Generate(), |
| } |
| } |
| |
| return parallel.WorkPool(16, func(workC chan<- func() error) { |
| for _, rowSet := range batch(rows, u.batchSize()) { |
| rowSet := rowSet |
| workC <- func() error { |
| var failed int |
| err := u.Uploader.Put(ctx, rowSet) |
| if err != nil { |
| logging.WithError(err).Errorf(ctx, "eventupload: Uploader.Put failed") |
| if merr, ok := err.(bigquery.PutMultiError); ok { |
| if failed = len(merr); failed > len(rowSet) { |
| logging.Errorf(ctx, "eventupload: %v failures trying to insert %v rows", failed, len(rowSet)) |
| } |
| } else { |
| failed = len(rowSet) |
| } |
| u.updateUploads(ctx, int64(failed), "failure") |
| } |
| succeeded := len(rowSet) - failed |
| u.updateUploads(ctx, int64(succeeded), "success") |
| return err |
| } |
| } |
| }) |
| } |
| |
// batch splits rows into consecutive subslices of at most batchSize rows
// each; for example, 5 rows with batchSize 2 yield batches of 2, 2 and 1.
func batch(rows []*Row, batchSize int) [][]*Row {
	rowSetsLen := int(math.Ceil(float64(len(rows)) / float64(batchSize)))
| rowSets := make([][]*Row, 0, rowSetsLen) |
| for len(rows) > 0 { |
| batch := rows |
| if len(batch) > batchSize { |
| batch = batch[:batchSize] |
| } |
| rowSets = append(rowSets, batch) |
| rows = rows[len(batch):] |
| } |
| return rowSets |
| } |