// Copyright 2018 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package iotools
import (
"container/list"
"fmt"
"io"
"sync"
)
// block is a contiguous chunk of data from the original file.
type block struct {
offset int64
data []byte
last bool
}
// blocksLRU is an LRU cache of recently read blocks, keyed by their offset in the file.
type blocksLRU struct {
capacity int // how many blocks we want to cache
evicted func(block) // called for each evicted block
blocks map[int64]*list.Element
ll list.List // each element's Value is block{}
}
// init initializes LRU guts.
func (l *blocksLRU) init(capacity int) {
l.capacity = capacity
l.blocks = make(map[int64]*list.Element, capacity)
l.ll.Init()
}
// get returns (block, true) if it's in the cache or (block{}, false) otherwise.
func (l *blocksLRU) get(offset int64) (b block, ok bool) {
elem, ok := l.blocks[offset]
if !ok {
return block{}, false
}
l.ll.MoveToFront(elem)
return elem.Value.(block), true
}
// prepareForAdd removes the oldest item from the list if the list is at capacity.
//
// Does nothing if the list is not full yet.
func (l *blocksLRU) prepareForAdd() {
switch {
case l.ll.Len() > l.capacity:
panic("impossible")
case l.ll.Len() == l.capacity:
oldest := l.ll.Remove(l.ll.Back()).(block)
delete(l.blocks, oldest.offset)
l.evicted(oldest)
}
}
// add adds a block to the cache (perhaps evicting the oldest block).
//
// The caller must first verify (via get()) that a block with the same offset
// is not already in the cache. Panics if this precondition is violated.
func (l *blocksLRU) add(b block) {
if _, ok := l.blocks[b.offset]; ok {
panic(fmt.Sprintf("block with offset %d is already in the LRU", b.offset))
}
l.prepareForAdd()
l.blocks[b.offset] = l.ll.PushFront(b)
}
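
// A minimal sketch of how blocksLRU is meant to be driven (illustrative only,
// not part of the package API; the offsets and capacity below are arbitrary):
//
//	var lru blocksLRU
//	lru.init(2)
//	lru.evicted = func(b block) { /* reclaim b.data here */ }
//	lru.add(block{offset: 0, data: []byte("abcd")})
//	lru.add(block{offset: 4, data: []byte("efgh")})
//	lru.add(block{offset: 8, data: []byte("ijkl")}) // evicts the block at offset 0
//	if b, ok := lru.get(4); ok {
//		_ = b.data // the block at offset 4 is now the most recently used
//	}
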
type bufferingReaderAt struct {
l sync.Mutex
r io.ReaderAt
blockSize int
lru blocksLRU
// Available buffers of blockSize length. Note that the sync.Pool docs warn
// that it is too heavy for a case like this, so we roll our own mini pool.
pool [][]byte
}
// NewBufferingReaderAt returns an io.ReaderAt that reads data in blocks of
// configurable size and keeps an LRU cache of recently read blocks.
//
// It is great for cases when data is read sequentially from an io.ReaderAt
// (e.g. when extracting files using zip.Reader), since by setting a large
// block size we can effectively do lookahead reads.
//
// For example, zip.Reader reads data in 4096-byte chunks by default. By
// setting the block size to 512 KiB and the LRU size to 1 we reduce the number
// of read operations significantly (128x), in exchange for a modest amount of
// RAM.
//
// The reader is safe to use concurrently (just like any ReaderAt), but beware
// that the LRU is shared and all reads from the underlying reader happen under
// the lock, so multiple goroutines may end up slowing each other down.
func NewBufferingReaderAt(r io.ReaderAt, blockSize int, lruSize int) io.ReaderAt {
if blockSize < 1 {
panic(fmt.Sprintf("block size should be >= 1, not %d", blockSize))
}
if lruSize < 1 {
panic(fmt.Sprintf("lru size should be >= 1, not %d", lruSize))
}
reader := &bufferingReaderAt{
r: r,
blockSize: blockSize,
// We actually pool at most 1 buffer, since buffers are grabbed from the
// pool immediately after they are evicted from the LRU.
pool: make([][]byte, 0, 1),
}
reader.lru.init(lruSize)
reader.lru.evicted = func(b block) { reader.recycleBuf(b.data) }
return reader
}
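
// An illustrative sketch of the lookahead-read setup described above (not part
// of this file; it assumes 'f' is an *os.File of 'size' bytes and that the
// caller imports archive/zip):
//
//	buffered := NewBufferingReaderAt(f, 512*1024, 1)
//	zr, err := zip.NewReader(buffered, size)
//	if err != nil {
//		// handle the error
//	}
//	// zip.Reader's 4096-byte reads now mostly hit the cached 512 KiB block.
//	_ = zr
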
// grabBuf returns a byte slice of blockSize length, reusing a pooled buffer if available.
func (r *bufferingReaderAt) grabBuf() []byte {
if len(r.pool) != 0 {
b := r.pool[len(r.pool)-1]
r.pool = r.pool[:len(r.pool)-1]
return b
}
return make([]byte, r.blockSize)
}
// recycleBuf puts a no-longer-needed buffer back into the pool for reuse.
func (r *bufferingReaderAt) recycleBuf(b []byte) {
if cap(b) != r.blockSize {
panic("trying to return a buffer not initially requested via grabBuf")
}
if len(r.pool)+1 > cap(r.pool) {
panic("unexpected growth of byte buffer pool beyond capacity")
}
r.pool = append(r.pool, b[:cap(b)])
}
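
// Illustrative trace of how a single buffer cycles through the pool when the
// LRU size is 1 (not part of the original comments):
//
//	readBlock(first block):  pool is empty, grabBuf allocates buffer A
//	readBlock(second block): prepareForAdd evicts the first block, recycleBuf
//	                         puts its buffer A into the pool, then grabBuf
//	                         immediately takes A back out
//
// This is why the pool never holds more than one buffer.
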
// readBlock returns the block of the file (blockSize bytes long) at the given offset.
//
// Assumes the caller does not retain the returned buffer (just reads from it
// and forgets it right away, all under the lock).
//
// Returns one of:
//   - (full block, nil) on success
//   - (partial or full block, io.EOF) when reading the final block
//   - (partial block, err) on read errors
func (r *bufferingReaderAt) readBlock(offset int64) (data []byte, err error) {
// Have it cached already?
if b, ok := r.lru.get(offset); ok {
data = b.data
if b.last {
err = io.EOF
}
return
}
// Kick out the oldest block (if any) to move its buffer to the free buffers
// pool and then immediately grab this buffer.
r.lru.prepareForAdd()
data = r.grabBuf()
// Read the block from the underlying reader.
read, err := r.r.ReadAt(data, offset)
data = data[:read]
// ReadAt promises that it returns a nil error only if it read the full block.
// We rely on this later, so double-check it here.
if err == nil && read != r.blockSize {
panic(fmt.Sprintf("broken ReaderAt: should have read %d bytes, but read only %d", r.blockSize, read))
}
// Cache fully read blocks and the partially read last block, but skip blocks
// that were read partially due to unexpected errors.
if err == nil || err == io.EOF {
r.lru.add(block{
offset: offset,
data: data,
last: err == io.EOF,
})
} else {
// Caller promises not to retain 'data', so we can return it right away.
r.recycleBuf(data)
}
return data, err
}
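
// A worked example of readBlock's return values (illustrative numbers only):
// with blockSize = 4 over a 10-byte file, readBlock(0) and readBlock(4)
// typically return (4 bytes, nil), while readBlock(8) returns the 2 trailing
// bytes together with io.EOF, caching them as the 'last' block.
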
// ReadAt implements io.ReaderAt interface.
func (r *bufferingReaderAt) ReadAt(p []byte, offset int64) (read int, err error) {
if len(p) == 0 {
return r.r.ReadAt(p, offset)
}
r.l.Lock()
defer r.l.Unlock()
bs := int64(r.blockSize)
blockOff := int64((offset / bs) * bs) // block-aligned offset
// Sequentially read blocks that intersect with the requested segment.
for {
// err here may be EOF or some other error. We consume all data first and
// deal with errors later.
data, err := r.readBlock(blockOff)
// The first block may be read from the middle, since 'blockOff' may be less
// than 'offset'.
if offset > blockOff {
pos := offset - blockOff // position inside the block to read from
if pos < int64(len(data)) {
data = data[pos:] // grab the tail of the block
} else {
data = nil // we probably hit EOF before the requested offset
}
}
// 'copy' copies min of len(data) and whatever space is left in 'p', so this
// is always safe. The last block may be copied partially (if there's no
// space left in 'p').
read += copy(p[read:], data)
switch {
case read == len(p):
// We managed to read everything we wanted, ignore the last error, if any.
return read, nil
case err != nil:
return read, err
}
// The last read was successful. Per the ReaderAt contract (which we
// double-checked in readBlock) this means it read ALL the requested data (and
// we requested 'bs' bytes). So move on to the next block.
blockOff += bs
}
}
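
// A worked example of the loop above (illustrative numbers only): with
// blockSize = 4, ReadAt(p, 6) with len(p) = 6 starts at blockOff = 4. The
// first iteration reads the block [4, 8), skips pos = 2 bytes and copies
// bytes 6..7 into p; the second iteration reads [8, 12) and copies bytes
// 8..11, filling p, so the call returns (6, nil).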