blob: f9772409d6200e02fcd8c3a00ee47ce5adfe2b13 [file] [log] [blame]
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package chunkstream
import (
"bytes"
"errors"
"io"
)
// View is static read-only snapshot of the contents of the Buffer, presented
// as a contiguous stream of bytes.
//
// View implements the io.Reader and io.ByteReader interfaces. It also offers a
// series of utility functions optimized for the chunks.
type View struct {
// cur is the first node of the view.
cur *chunkNode
// cidx is the byte offset within cur of the current byte.
cidx int
// size is the size of thew view. Accesses beyond this size will fail.
size int64
// consumed is a count of the number of bytes in the view that have been
// consumed via Skip().
consumed int64
// b is the Buffer from which this View's snapshot was taken.
b *Buffer
}
var _ interface {
io.Reader
io.ByteReader
} = (*View)(nil)
func (r *View) Read(b []byte) (int, error) {
total := int64(0)
err := error(nil)
for len(b) > 0 {
chunk := r.chunkBytes()
if len(chunk) == 0 {
err = io.EOF
break
}
amount := copy(b, chunk)
total += int64(amount)
b = b[amount:]
r.Skip(int64(amount))
}
if r.Remaining() == 0 {
err = io.EOF
}
return int(total), err
}
// ReadByte implements io.ByteReader, reading a single byte from the buffer.
func (r *View) ReadByte() (byte, error) {
chunk := r.chunkBytes()
if len(chunk) == 0 {
return 0, io.EOF
}
r.Skip(1)
return chunk[0], nil
}
// Remaining returns the number of bytes remaining in the Reader view.
func (r *View) Remaining() int64 {
return r.size
}
// Consumed returns the number of bytes that have been skipped via Skip or
// higher-level calls.
func (r *View) Consumed() int64 {
return r.consumed
}
// Skip advances the View forwards a fixed number of bytes.
func (r *View) Skip(count int64) {
for count > 0 {
if r.cur == nil {
panic(errors.New("cannot skip past end buffer"))
}
amount := r.chunkRemaining()
if count < int64(amount) {
amount = int(count)
r.cidx += amount
} else {
// Finished consuming this chunk, move on to the next.
r.cur = r.cur.next
r.cidx = 0
}
count -= int64(amount)
r.consumed += int64(amount)
r.size -= int64(amount)
}
}
// Index scans the View for the specified needle bytes. If they are
// found, their index in the View is returned. Otherwise, Index returns
// -1.
//
// The View is not modified during the search.
func (r *View) Index(needle []byte) int64 {
if r.Remaining() == 0 {
return -1
}
if len(needle) == 0 {
return 0
}
rc := r.Clone()
if !rc.indexDestructive(needle) {
return -1
}
return rc.consumed - r.consumed
}
// indexDestructive implements Index by actively mutating the View.
//
// It returns true if the needle was found, and false if not. The view will be
// mutated regardless.
func (r *View) indexDestructive(needle []byte) bool {
tbuf := make([]byte, 2*len(needle))
idx := int64(0)
for {
data := r.chunkBytes()
if len(data) == 0 {
return false
}
// Scan the current chunk for needle. Note that if the current chunk is too
// small to hold needle, this is a no-op.
if idx = int64(bytes.Index(data, needle)); idx >= 0 {
r.Skip(idx)
return true
}
if len(data) > len(needle) {
// The needle is definitely not in this space.
r.Skip(int64(len(data) - len(needle)))
}
// needle isn't in the current chunk; however, it may begin at the end of
// the current chunk and complete in future chunks.
//
// We will scan a space twice the size of the needle, as otherwise, this
// would end up scanning for one possibility, incrementing by one, and
// repeating via 'for' loop iterations.
//
// Afterwards, we advance only the size of the needle, as we don't want to
// preclude the needle starting after our last scan range.
//
// For example, to find needle "NDL":
//
// AAAAND|L|AAAA
// |------|^- [NDLAAA], 0
//
// AAAAN|D|NDL|AAAA
// |------| [ANDNDL], 3
//
// AAAA|A|A|NDL
// |-------| [AAAAND], -1, consume 3 => A|NDL|
//
//
// Note that we perform the read with a cloned View so we don't
// actually consume this data.
pr := r.Clone()
amt, _ := pr.Read(tbuf)
if amt < len(needle) {
// All remaining buffers cannot hold the needle.
return false
}
if idx = int64(bytes.Index(tbuf[:amt], needle)); idx >= 0 {
r.Skip(idx)
return true
}
r.Skip(int64(len(needle)))
}
}
// Clone returns a copy of the View view.
//
// The clone is bound to the same underlying Buffer as the source.
func (r *View) Clone() *View {
return r.CloneLimit(r.size)
}
// CloneLimit returns a copy of the View view, optionally truncating it.
//
// The clone is bound to the same underlying Buffer as the source.
func (r *View) CloneLimit(limit int64) *View {
c := *r
if c.size > limit {
c.size = limit
}
return &c
}
func (r *View) chunkRemaining() int {
if r.cur == nil {
return 0
}
result := r.cur.length() - r.cidx
if int64(result) > r.size {
result = int(r.size)
}
return result
}
func (r *View) chunkBytes() []byte {
if r.cur == nil {
return nil
}
data := r.cur.Bytes()[r.cidx:]
remaining := r.Remaining()
if int64(len(data)) > remaining {
data = data[:remaining]
}
return data
}