From fb07627c051fcebb652e5b864ffa3243283df543 Mon Sep 17 00:00:00 2001 From: Blake Burghgrave Date: Sat, 16 Nov 2024 11:17:07 -0600 Subject: [PATCH 1/2] first attempt at reusing objects in rcompress with sync.Pool, readers only for now --- groot/internal/rcompress/rcompress.go | 51 ++++++++++++++++++++++++--- 1 file changed, 46 insertions(+), 5 deletions(-) diff --git a/groot/internal/rcompress/rcompress.go b/groot/internal/rcompress/rcompress.go index 22f5c798..060ef930 100644 --- a/groot/internal/rcompress/rcompress.go +++ b/groot/internal/rcompress/rcompress.go @@ -13,6 +13,7 @@ import ( "errors" "fmt" "io" + "sync" "github.com/klauspost/compress/flate" "github.com/klauspost/compress/zlib" @@ -371,21 +372,22 @@ func Decompress(dst []byte, src io.Reader) error { lr := &io.LimitedReader{R: src, N: srcsz} switch kindOf(hdr) { case ZLIB: - rc, err := zlib.NewReader(lr) + rc, err := zlibNewReader(lr) if err != nil { return fmt.Errorf("rcompress: could not create ZLIB reader: %w", err) } - defer rc.Close() - _, err = io.ReadFull(rc, dst[beg:end]) + rc.Close() + zlibReaderPool.Put(rc) if err != nil { return fmt.Errorf("rcompress: could not decompress ZLIB buffer: %w", err) } case LZ4: - src := make([]byte, srcsz) + src := lz4NewBuffer(srcsz) _, err = io.ReadFull(lr, src) if err != nil { + lz4BufferPool.Put(src) return fmt.Errorf("rcompress: could not read LZ4 block: %w", err) } const chksum = 8 @@ -396,7 +398,9 @@ func Decompress(dst []byte, src io.Reader) error { case srcsz > tgtsz: // no compression copy(dst[beg:end], src[chksum:]) + lz4BufferPool.Put(src) default: + lz4BufferPool.Put(src) return fmt.Errorf("rcompress: could not decompress LZ4 block: %w", err) } } @@ -419,11 +423,13 @@ func Decompress(dst []byte, src io.Reader) error { } case ZSTD: - rc, err := zstd.NewReader(lr) + rc, err := zstdNewReader(lr) if err != nil { return fmt.Errorf("rcompress: could not create ZSTD reader: %w", err) } _, err = io.ReadFull(rc, dst[beg:end]) + rc.Reset(nil) + zstdReaderPool.Put(rc) if err != nil { return fmt.Errorf("rcompress: could not decompress ZSTD block: %w", err) } @@ -457,3 +463,38 @@ func (w *wbuff) Write(p []byte) (int, error) { var ( _ io.Writer = (*wbuff)(nil) ) + +// TODO writers, need to index by options (e.g. compression level) +var ( + lz4BufferPool = sync.Pool{} + zlibReaderPool = sync.Pool{} + zstdReaderPool = sync.Pool{} +) + +func lz4NewBuffer(size int64) []byte { + var b []byte + if bi := lz4BufferPool.Get(); bi != nil { + b = bi.([]byte) + } + if int64(cap(b)) >= size { + return b[:size] + } + return make([]byte, size) +} + +func zlibNewReader(r io.Reader) (io.ReadCloser, error) { + if ri := zlibReaderPool.Get(); ri != nil { + ri.(zlib.Resetter).Reset(r, nil) + return ri.(io.ReadCloser), nil + } + return zlib.NewReader(r) +} + +func zstdNewReader(r io.Reader) (*zstd.Decoder, error) { + if ri := zstdReaderPool.Get(); ri != nil { + rd := ri.(*zstd.Decoder) + rd.Reset(r) + return rd, nil + } + return zstd.NewReader(r) +} From eb21700f2e818ffca515adf750939717471c7159 Mon Sep 17 00:00:00 2001 From: Blake Burghgrave Date: Sat, 16 Nov 2024 14:31:02 -0600 Subject: [PATCH 2/2] pool writers (indexed by compression level, since that can't be reset) --- groot/internal/rcompress/rcompress.go | 107 ++++++++++++++++++++------ 1 file changed, 85 insertions(+), 22 deletions(-) diff --git a/groot/internal/rcompress/rcompress.go b/groot/internal/rcompress/rcompress.go index 060ef930..04727fbd 100644 --- a/groot/internal/rcompress/rcompress.go +++ b/groot/internal/rcompress/rcompress.go @@ -207,17 +207,17 @@ func compressBlock(alg Kind, lvl int, tgt, src []byte) (int, error) { hdr[0] = 'Z' hdr[1] = 'L' hdr[2] = 8 // zlib deflated - w, err := zlib.NewWriterLevel(buf, lvl) + w, err := zlibGetWriterLevel(buf, lvl) if err != nil { return 0, fmt.Errorf("rcompress: could not create ZLIB compressor: %w", err) } _, err = w.Write(src) if err != nil { - _ = w.Close() + _ = zlibPutWriterLevel(w, lvl) return 0, fmt.Errorf("rcompress: could not write ZLIB compressed bytes: %w", err) } - err = w.Close() + err = zlibPutWriterLevel(w, lvl) switch { case err == nil: // ok. @@ -269,7 +269,7 @@ func compressBlock(alg Kind, lvl int, tgt, src []byte) (int, error) { const chksum = 8 var room = int(float64(srcsz) * 2e-4) // lz4 needs some extra scratch space - dst := make([]byte, HeaderSize+chksum+len(src)+room) + dst := lz4GetBuffer(HeaderSize + chksum + len(src) + room) wrk := dst[HeaderSize:] var n int switch { @@ -284,11 +284,13 @@ func compressBlock(alg Kind, lvl int, tgt, src []byte) (int, error) { n, err = lz4.CompressBlock(src, wrk[chksum:], ht) } if err != nil { + lz4PutBuffer(dst) return 0, fmt.Errorf("rcompress: could not compress with LZ4: %w", err) } if n == 0 { // not compressible. + lz4PutBuffer(dst) return len(src), errNoCompression } @@ -296,6 +298,7 @@ func compressBlock(alg Kind, lvl int, tgt, src []byte) (int, error) { binary.BigEndian.PutUint64(wrk[:chksum], xxHash64.Checksum(wrk[chksum:], 0)) dstsz = int32(n + chksum) n = copy(buf.p, wrk) + lz4PutBuffer(dst) buf.c += n case ZSTD: @@ -372,22 +375,21 @@ func Decompress(dst []byte, src io.Reader) error { lr := &io.LimitedReader{R: src, N: srcsz} switch kindOf(hdr) { case ZLIB: - rc, err := zlibNewReader(lr) + rc, err := zlibGetReader(lr) if err != nil { return fmt.Errorf("rcompress: could not create ZLIB reader: %w", err) } _, err = io.ReadFull(rc, dst[beg:end]) - rc.Close() - zlibReaderPool.Put(rc) + zlibPutReader(rc) if err != nil { return fmt.Errorf("rcompress: could not decompress ZLIB buffer: %w", err) } case LZ4: - src := lz4NewBuffer(srcsz) + src := lz4GetBuffer(int(srcsz)) _, err = io.ReadFull(lr, src) if err != nil { - lz4BufferPool.Put(src) + lz4PutBuffer(src) return fmt.Errorf("rcompress: could not read LZ4 block: %w", err) } const chksum = 8 @@ -398,9 +400,9 @@ func Decompress(dst []byte, src io.Reader) error { case srcsz > tgtsz: // no compression copy(dst[beg:end], src[chksum:]) - lz4BufferPool.Put(src) + lz4PutBuffer(src) default: - lz4BufferPool.Put(src) + lz4PutBuffer(src) return fmt.Errorf("rcompress: could not decompress LZ4 block: %w", err) } } @@ -423,13 +425,12 @@ func Decompress(dst []byte, src io.Reader) error { } case ZSTD: - rc, err := zstdNewReader(lr) + rc, err := zstdGetReader(lr) if err != nil { return fmt.Errorf("rcompress: could not create ZSTD reader: %w", err) } _, err = io.ReadFull(rc, dst[beg:end]) - rc.Reset(nil) - zstdReaderPool.Put(rc) + zstdPutReader(rc) if err != nil { return fmt.Errorf("rcompress: could not decompress ZSTD block: %w", err) } @@ -464,25 +465,30 @@ var ( _ io.Writer = (*wbuff)(nil) ) -// TODO writers, need to index by options (e.g. compression level) var ( - lz4BufferPool = sync.Pool{} - zlibReaderPool = sync.Pool{} - zstdReaderPool = sync.Pool{} + lz4BufferPool sync.Pool + zlibReaderPool sync.Pool + zstdReaderPool sync.Pool + zlibWriterPools sync.Map // map[lvl]*pool + zstdWriterPools sync.Map // map[lvl]*pool ) -func lz4NewBuffer(size int64) []byte { +func lz4GetBuffer(size int) []byte { var b []byte if bi := lz4BufferPool.Get(); bi != nil { b = bi.([]byte) } - if int64(cap(b)) >= size { + if cap(b) >= size { return b[:size] } return make([]byte, size) } -func zlibNewReader(r io.Reader) (io.ReadCloser, error) { +func lz4PutBuffer(b []byte) { + lz4BufferPool.Put(b) +} + +func zlibGetReader(r io.Reader) (io.ReadCloser, error) { if ri := zlibReaderPool.Get(); ri != nil { ri.(zlib.Resetter).Reset(r, nil) return ri.(io.ReadCloser), nil @@ -490,7 +496,14 @@ func zlibNewReader(r io.Reader) (io.ReadCloser, error) { return zlib.NewReader(r) } -func zstdNewReader(r io.Reader) (*zstd.Decoder, error) { +func zlibPutReader(r io.ReadCloser) error { + // Note that zlib readers should be closed (but not reset) + err := r.Close() + zlibReaderPool.Put(r) + return err +} + +func zstdGetReader(r io.Reader) (*zstd.Decoder, error) { if ri := zstdReaderPool.Get(); ri != nil { rd := ri.(*zstd.Decoder) rd.Reset(r) @@ -498,3 +511,53 @@ func zstdNewReader(r io.Reader) (*zstd.Decoder, error) { } return zstd.NewReader(r) } + +func zstdPutReader(r *zstd.Decoder) { + // Note that zstd decoders should be reset (but not closed) + r.Reset(nil) + zstdReaderPool.Put(r) +} + +func zlibGetWriterLevel(w io.Writer, lvl int) (*zlib.Writer, error) { + if pi, ok := zlibWriterPools.Load(lvl); ok { + if wi := pi.(*sync.Pool).Get(); wi != nil { + z := wi.(*zlib.Writer) + z.Reset(w) + return z, nil + } + } + return zlib.NewWriterLevel(w, lvl) +} + +func zlibPutWriterLevel(w *zlib.Writer, lvl int) error { + err := w.Close() + if pi, ok := zlibWriterPools.Load(lvl); ok { + pi.(*sync.Pool).Put(w) + } else { + pi, _ = zlibWriterPools.LoadOrStore(lvl, new(sync.Pool)) + pi.(*sync.Pool).Put(w) + } + return err +} + +func zstdGetWriterLevel(w io.Writer, lvl int) (*zstd.Encoder, error) { + if pi, ok := zstdWriterPools.Load(lvl); ok { + if wi := pi.(*sync.Pool).Get(); wi != nil { + z := wi.(*zstd.Encoder) + z.Reset(w) + return z, nil + } + } + return zstd.NewWriter(w, zstd.WithEncoderLevel(zstd.EncoderLevel(lvl))) +} + +func zstdPutWriterLevel(w *zstd.Encoder, lvl int) error { + err := w.Close() + if pi, ok := zstdWriterPools.Load(lvl); ok { + pi.(*sync.Pool).Put(w) + } else { + pi, _ = zstdWriterPools.LoadOrStore(lvl, new(sync.Pool)) + pi.(*sync.Pool).Put(w) + } + return err +}