Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

groot/internal/rcompress: reuse buffers when possible #1023

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 113 additions & 9 deletions groot/internal/rcompress/rcompress.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"errors"
"fmt"
"io"
"sync"

"github.com/klauspost/compress/flate"
"github.com/klauspost/compress/zlib"
Expand Down Expand Up @@ -206,17 +207,17 @@
hdr[0] = 'Z'
hdr[1] = 'L'
hdr[2] = 8 // zlib deflated
w, err := zlib.NewWriterLevel(buf, lvl)
w, err := zlibGetWriterLevel(buf, lvl)
if err != nil {
return 0, fmt.Errorf("rcompress: could not create ZLIB compressor: %w", err)
}

_, err = w.Write(src)
if err != nil {
_ = w.Close()
_ = zlibPutWriterLevel(w, lvl)
return 0, fmt.Errorf("rcompress: could not write ZLIB compressed bytes: %w", err)
}
err = w.Close()
err = zlibPutWriterLevel(w, lvl)
switch {
case err == nil:
// ok.
Expand Down Expand Up @@ -268,7 +269,7 @@

const chksum = 8
var room = int(float64(srcsz) * 2e-4) // lz4 needs some extra scratch space
dst := make([]byte, HeaderSize+chksum+len(src)+room)
dst := lz4GetBuffer(HeaderSize + chksum + len(src) + room)
wrk := dst[HeaderSize:]
var n int
switch {
Expand All @@ -283,18 +284,21 @@
n, err = lz4.CompressBlock(src, wrk[chksum:], ht)
}
if err != nil {
lz4PutBuffer(dst)
return 0, fmt.Errorf("rcompress: could not compress with LZ4: %w", err)
}

if n == 0 {
// not compressible.
lz4PutBuffer(dst)
return len(src), errNoCompression
}

wrk = wrk[:n+chksum]
binary.BigEndian.PutUint64(wrk[:chksum], xxHash64.Checksum(wrk[chksum:], 0))
dstsz = int32(n + chksum)
n = copy(buf.p, wrk)
lz4PutBuffer(dst)
buf.c += n

case ZSTD:
Expand Down Expand Up @@ -371,21 +375,21 @@
lr := &io.LimitedReader{R: src, N: srcsz}
switch kindOf(hdr) {
case ZLIB:
rc, err := zlib.NewReader(lr)
rc, err := zlibGetReader(lr)
if err != nil {
return fmt.Errorf("rcompress: could not create ZLIB reader: %w", err)
}
defer rc.Close()

_, err = io.ReadFull(rc, dst[beg:end])
zlibPutReader(rc)
if err != nil {
return fmt.Errorf("rcompress: could not decompress ZLIB buffer: %w", err)
}

case LZ4:
src := make([]byte, srcsz)
src := lz4GetBuffer(int(srcsz))
_, err = io.ReadFull(lr, src)
if err != nil {
lz4PutBuffer(src)
return fmt.Errorf("rcompress: could not read LZ4 block: %w", err)
}
const chksum = 8
Expand All @@ -396,7 +400,9 @@
case srcsz > tgtsz:
// no compression
copy(dst[beg:end], src[chksum:])
lz4PutBuffer(src)
default:
lz4PutBuffer(src)
return fmt.Errorf("rcompress: could not decompress LZ4 block: %w", err)
}
}
Expand All @@ -419,11 +425,12 @@
}

case ZSTD:
rc, err := zstd.NewReader(lr)
rc, err := zstdGetReader(lr)
if err != nil {
return fmt.Errorf("rcompress: could not create ZSTD reader: %w", err)
}
_, err = io.ReadFull(rc, dst[beg:end])
zstdPutReader(rc)
if err != nil {
return fmt.Errorf("rcompress: could not decompress ZSTD block: %w", err)
}
Expand Down Expand Up @@ -457,3 +464,100 @@
var (
_ io.Writer = (*wbuff)(nil)
)

var (
lz4BufferPool sync.Pool
zlibReaderPool sync.Pool
zstdReaderPool sync.Pool
zlibWriterPools sync.Map // map[lvl]*pool
zstdWriterPools sync.Map // map[lvl]*pool

Check failure on line 473 in groot/internal/rcompress/rcompress.go

View workflow job for this annotation

GitHub Actions / Build (1.22.x, macos-latest)

var zstdWriterPools is unused (U1000)
)

func lz4GetBuffer(size int) []byte {
var b []byte
if bi := lz4BufferPool.Get(); bi != nil {
b = bi.([]byte)
}
if cap(b) >= size {
return b[:size]
}
return make([]byte, size)
}

func lz4PutBuffer(b []byte) {
lz4BufferPool.Put(b)

Check failure on line 488 in groot/internal/rcompress/rcompress.go

View workflow job for this annotation

GitHub Actions / Build (1.22.x, macos-latest)

argument should be pointer-like to avoid allocations (SA6002)
}

func zlibGetReader(r io.Reader) (io.ReadCloser, error) {
if ri := zlibReaderPool.Get(); ri != nil {
ri.(zlib.Resetter).Reset(r, nil)
return ri.(io.ReadCloser), nil
}
return zlib.NewReader(r)
}

func zlibPutReader(r io.ReadCloser) error {
// Note that zlib readers should be closed (but not reset)
err := r.Close()
zlibReaderPool.Put(r)
return err
}

func zstdGetReader(r io.Reader) (*zstd.Decoder, error) {
if ri := zstdReaderPool.Get(); ri != nil {
rd := ri.(*zstd.Decoder)
rd.Reset(r)
return rd, nil
}
return zstd.NewReader(r)
}

func zstdPutReader(r *zstd.Decoder) {
// Note that zstd decoders should be reset (but not closed)
r.Reset(nil)
zstdReaderPool.Put(r)
}

func zlibGetWriterLevel(w io.Writer, lvl int) (*zlib.Writer, error) {
if pi, ok := zlibWriterPools.Load(lvl); ok {
if wi := pi.(*sync.Pool).Get(); wi != nil {
z := wi.(*zlib.Writer)
z.Reset(w)
return z, nil
}
}
return zlib.NewWriterLevel(w, lvl)
}

func zlibPutWriterLevel(w *zlib.Writer, lvl int) error {
err := w.Close()
if pi, ok := zlibWriterPools.Load(lvl); ok {
pi.(*sync.Pool).Put(w)
} else {
pi, _ = zlibWriterPools.LoadOrStore(lvl, new(sync.Pool))
pi.(*sync.Pool).Put(w)
}
return err
}

func zstdGetWriterLevel(w io.Writer, lvl int) (*zstd.Encoder, error) {

Check failure on line 543 in groot/internal/rcompress/rcompress.go

View workflow job for this annotation

GitHub Actions / Build (1.22.x, macos-latest)

func zstdGetWriterLevel is unused (U1000)
if pi, ok := zstdWriterPools.Load(lvl); ok {
if wi := pi.(*sync.Pool).Get(); wi != nil {
z := wi.(*zstd.Encoder)
z.Reset(w)
return z, nil
}
}
return zstd.NewWriter(w, zstd.WithEncoderLevel(zstd.EncoderLevel(lvl)))
}

func zstdPutWriterLevel(w *zstd.Encoder, lvl int) error {

Check failure on line 554 in groot/internal/rcompress/rcompress.go

View workflow job for this annotation

GitHub Actions / Build (1.22.x, macos-latest)

func zstdPutWriterLevel is unused (U1000)
err := w.Close()
if pi, ok := zstdWriterPools.Load(lvl); ok {
pi.(*sync.Pool).Put(w)
} else {
pi, _ = zstdWriterPools.LoadOrStore(lvl, new(sync.Pool))
pi.(*sync.Pool).Put(w)
}
return err
}
Loading