blob: 660e0a7d93e7c89a9123fae07fef1118a442a32a [file] [log] [blame]
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pubsubprotocol
import (
"bytes"
"compress/zlib"
"errors"
"fmt"
"io"
"google.golang.org/protobuf/proto"
"go.chromium.org/luci/common/data/recordio"
"go.chromium.org/luci/logdog/api/logpb"
"go.chromium.org/luci/logdog/common/types"
)
const (
// DefaultCompressThreshold is the byte size threshold for compressing message
// data. Messages whose byte count is less than or equal to this threshold
// will not be compressed.
//
// This is the value used by Akamai for its compression threshold:
// "The reasons 860 bytes is the minimum size for compression is twofold:
// (1) The overhead of compressing an object under 860 bytes outweighs
// performance gain. (2) Objects under 860 bytes can be transmitted via a
// single packet anyway, so there isn't a compelling reason to compress them."
DefaultCompressThreshold = 860
)
// protoBase is the base type of protocol reader/writer objects.
type protoBase struct {
// maxSize is the maximum Butler protocol data size. By default, it is
// types.MaxButlerLogBundleSize. However, it can be overridden for testing
// here.
maxSize int64
}
func (p *protoBase) getMaxSize() int64 {
if p.maxSize == 0 {
return types.MaxButlerLogBundleSize
}
return p.maxSize
}
// Reader is a protocol reader instance.
type Reader struct {
protoBase
// Metadata is the unpacked ButlerMetadata. It is populated when the
// metadata has been read.
Metadata *logpb.ButlerMetadata
// Bundle is the unpacked ButlerLogBundle. It is populated when the
// protocol data has been read and the Metadata indicates a ButlerLogBundle
// type.
Bundle *logpb.ButlerLogBundle
}
// ReadMetadata reads the metadata header frame.
func (r *Reader) readMetadata(fr recordio.Reader) error {
data, err := fr.ReadFrameAll()
if err != nil {
return err
}
md := logpb.ButlerMetadata{}
if err := proto.Unmarshal(data, &md); err != nil {
return fmt.Errorf("butlerproto: failed to unmarshal Metadata frame: %s", err)
}
r.Metadata = &md
return nil
}
func (r *Reader) readData(fr recordio.Reader) ([]byte, error) {
var br io.Reader
size, br, err := fr.ReadFrame()
if err != nil {
return nil, fmt.Errorf("failed to read bundle frame: %s", err)
}
// Read the frame through a zlib reader.
switch r.Metadata.Compression {
case logpb.ButlerMetadata_NONE:
break
case logpb.ButlerMetadata_ZLIB:
br, err = zlib.NewReader(br)
if err != nil {
return nil, fmt.Errorf("failed to initialize zlib reader: %s", err)
}
default:
return nil, fmt.Errorf("unknown compression type: %v", r.Metadata.Compression)
}
// Wrap our reader in a limitErrorReader so we don't pull data beyond our
// soft maximum.
br = &limitErrorReader{
Reader: br,
limit: r.getMaxSize(),
}
buf := bytes.Buffer{}
buf.Grow(int(size))
_, err = buf.ReadFrom(br)
if err != nil {
return nil, fmt.Errorf("butlerproto: failed to buffer bundle frame: %s", err)
}
return buf.Bytes(), nil
}
func (r *Reader) Read(ir io.Reader) error {
fr := recordio.NewReader(ir, r.getMaxSize())
// Ensure that we have our Metadata.
if err := r.readMetadata(fr); err != nil {
return err
}
switch r.Metadata.Type {
case logpb.ButlerMetadata_ButlerLogBundle:
data, err := r.readData(fr)
if err != nil {
return fmt.Errorf("butlerproto: failed to read Bundle data: %s", err)
}
if r.Metadata.ProtoVersion == logpb.Version {
bundle := logpb.ButlerLogBundle{}
if err := proto.Unmarshal(data, &bundle); err != nil {
return fmt.Errorf("butlerproto: failed to unmarshal Bundle frame: %s", err)
}
r.Bundle = &bundle
}
return nil
default:
return fmt.Errorf("butlerproto: unknown data type: %s", r.Metadata.Type)
}
}
// limitErrorReader is similar to io.LimitReader, except that it returns
// a custom error instead of io.EOF.
//
// This is important, as it allows us to distinguish between the end of
// the compressed reader's data and a limit being hit.
type limitErrorReader struct {
io.Reader // underlying reader
limit int64 // max bytes remaining
}
func (r *limitErrorReader) Read(p []byte) (int, error) {
if r.limit <= 0 {
return 0, errors.New("limit exceeded")
}
if int64(len(p)) > r.limit {
p = p[0:r.limit]
}
n, err := r.Reader.Read(p)
r.limit -= int64(n)
return n, err
}
// Writer writes Butler messages that the Reader can read.
type Writer struct {
protoBase
// ProtoVersion is the protocol version string to use. If empty, the current
// ProtoVersion will be used.
ProtoVersion string
// Compress, if true, allows the Writer to choose to compress data when
// applicable.
Compress bool
// CompressThreshold is the minimum size that data must be in order to
CompressThreshold int
compressBuf bytes.Buffer
compressWriter *zlib.Writer
}
func (w *Writer) writeData(fw recordio.Writer, t logpb.ButlerMetadata_ContentType, data []byte) error {
if int64(len(data)) > w.getMaxSize() {
return fmt.Errorf("butlerproto: serialized size exceeds soft cap (%d > %d)", len(data), w.getMaxSize())
}
pv := w.ProtoVersion
if pv == "" {
pv = logpb.Version
}
md := logpb.ButlerMetadata{
Type: t,
ProtoVersion: pv,
}
// If we're configured to compress and the data is below our threshold,
// compress.
if w.Compress && len(data) >= w.CompressThreshold {
w.compressBuf.Reset()
if w.compressWriter == nil {
w.compressWriter = zlib.NewWriter(&w.compressBuf)
} else {
w.compressWriter.Reset(&w.compressBuf)
}
if _, err := w.compressWriter.Write(data); err != nil {
return err
}
if err := w.compressWriter.Close(); err != nil {
return err
}
compressed := true
if compressed {
md.Compression = logpb.ButlerMetadata_ZLIB
}
data = w.compressBuf.Bytes()
}
// Write metadata frame.
mdData, err := proto.Marshal(&md)
if err != nil {
return fmt.Errorf("butlerproto: failed to marshal Metadata: %s", err)
}
_, err = fw.Write(mdData)
if err != nil {
return fmt.Errorf("butlerproto: failed to write Metadata frame: %s", err)
}
if err := fw.Flush(); err != nil {
return fmt.Errorf("butlerproto: failed to flush Metadata frame: %s", err)
}
// Write data frame.
_, err = fw.Write(data)
if err != nil {
return fmt.Errorf("butlerproto: failed to write data frame: %s", err)
}
if err := fw.Flush(); err != nil {
return fmt.Errorf("butlerproto: failed to flush data frame: %s", err)
}
return nil
}
// WriteWith writes a ButlerLogBundle to the supplied Writer.
func (w *Writer) Write(iw io.Writer, b *logpb.ButlerLogBundle) error {
return w.WriteWith(recordio.NewWriter(iw), b)
}
// WriteWith writes a ButlerLogBundle to the supplied recordio.Writer.
func (w *Writer) WriteWith(fw recordio.Writer, b *logpb.ButlerLogBundle) error {
data, err := proto.Marshal(b)
if err != nil {
entries := b.GetEntries()
// TODO(tandrii, hinoka): leave just error after crbug.com/859995 is fixed.
if len(entries) > 100 {
return fmt.Errorf("butlerproto: failed to marshal Bundle of len %d with first 100 entries %s: %s",
len(entries), entries[:100], err)
}
return fmt.Errorf("butlerproto: failed to marshal Bundle %s: %s", entries, err)
}
return w.writeData(fw, logpb.ButlerMetadata_ButlerLogBundle, data)
}