blob: 1b665a92ff8df1c2ac1d1e69cb511c2396b18602 [file] [log] [blame]
// Copyright 2020 Mostyn Bramley-Moore.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package syncpool provides a non-leaky sync.Pool for
// github.com/klauspost/compress/zstd's Encoder and Decoder types,
// using wrappers (EncoderWrapper and DecoderWrapper).
package syncpool
import (
"io"
"runtime"
"sync"
"github.com/klauspost/compress/zstd"
)
// DecoderWrapper is a wrapper that embeds a *zstd.Decoder, and is safe for
// use in a sync.Pool.
type DecoderWrapper struct {
// *zstd.Decoder is not safe for use in a sync.Pool directly, since it
// leaks data and goroutines. Finalizers on the *zstd.Decoder don't help
// because the aforementioned goroutines reference the *zstd.Decoder and
// prevent it from being garbage collected (so the finalizers don't run).
//
// We can work around this by storing this wrapper with an embedded
// *zstd.Decoder in the sync.Pool, and using a finalizer on the wrapper
// to Close the *zstd.Decoder.
*zstd.Decoder
// Instead of Closing a *zstd.Decoder, we can Reset it and return it
// to this pool.
pool *sync.Pool
}
// Close does not close the embedded *zstd.Decoder (once closed, they cannot
// be reused), but instead resets it and places this *DecoderWrapper back in
// the pool.
func (w *DecoderWrapper) Close() {
err := w.Decoder.Reset(nil)
if err == nil {
w.pool.Put(w)
}
}
// IOReadCloser returns an io.ReadCloser that will return this *DecoderWrapper
// to the pool when it is closed.
func (w *DecoderWrapper) IOReadCloser() io.ReadCloser {
return &decoderReadCloser{
dw: w,
Reader: w.Decoder.IOReadCloser(),
}
}
type decoderReadCloser struct {
dw *DecoderWrapper
io.Reader
}
// Close does not close the underlying *zstd.Decoder, but instead returns
// it to the pool it came from.
func (w *decoderReadCloser) Close() error {
w.dw.Close() // Returns the DecoderWrapper to the pool.
return nil
}
// NewDecoderPool returns a sync.Pool that provides DecoderWrapper
// objects, which embed *zstd.Decoders. You probably want to include
// zstd.WithDecoderConcurrency(1) in the list of options.
func NewDecoderPool(options ...zstd.DOption) *sync.Pool {
p := &sync.Pool{}
p.New = func() interface{} {
d, _ := zstd.NewReader(nil, options...)
dw := &DecoderWrapper{
Decoder: d,
pool: p,
}
runtime.SetFinalizer(dw, func(dw *DecoderWrapper) {
dw.Decoder.Close()
})
return dw
}
return p
}
// DecoderPoolWrapper is a convenience wrapper for sync.Pool which only
// accepts and returns *DecoderWrapper's.
type DecoderPoolWrapper struct {
pool *sync.Pool
}
// Get returns a *DecoderWrapper that has been Reset to use r.
func (d *DecoderPoolWrapper) Get(r io.Reader) *DecoderWrapper {
dw := d.pool.Get().(*DecoderWrapper)
err := dw.Reset(r)
if err != nil {
// Decoder.Reset only returns a non-nil error if Close has been
// called. But DecoderWrapper.Close() intentionally doesn't call
// Decoder.Close(), so this can only happen if a *DecoderWrapper
// is type-asserted back to a *Decoder.
panic(err)
}
return dw
}
// Put returns a *DecoderWrapper to the pool.
func (d *DecoderPoolWrapper) Put(w *DecoderWrapper) {
err := w.Reset(nil)
if err == nil {
d.pool.Put(w)
}
}
// NewDecoderPoolWapper returns a *DecoderPoolWrapper that provides
// *DecoderWrapper objects that do not need to be type asserted.
// As with NewDecoderPool, you probably want to include
// zstd.WithDecoderConcurrency(1) in the list of options.
func NewDecoderPoolWrapper(options ...zstd.DOption) *DecoderPoolWrapper {
return &DecoderPoolWrapper{pool: NewDecoderPool(options...)}
}