From 3cd3857b046c3d26494c6e9a34dbcee707413648 Mon Sep 17 00:00:00 2001 From: Andrew Gillis <11790789+gammazero@users.noreply.github.com> Date: Tue, 20 Aug 2024 17:19:02 -0700 Subject: [PATCH] refactor(chunker): reduce overall memory use (#649) --- CHANGELOG.md | 2 ++ chunker/benchmark_test.go | 36 +++++++++++++++++++++++ chunker/splitting.go | 48 +++++++++++++++++++++++------- chunker/splitting_test.go | 61 +++++++++++++++++++++++++++++++++++++-- 4 files changed, 135 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 25f652999..07527a74e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,8 @@ The following emojis are used to highlight certain changes: ### Changed +- `chunker` refactored to reduce overall memory use by reducing heap fragmentation [#649](https://github.com/ipfs/boxo/pull/649) + ### Removed ### Fixed diff --git a/chunker/benchmark_test.go b/chunker/benchmark_test.go index 5069b0653..9374da45e 100644 --- a/chunker/benchmark_test.go +++ b/chunker/benchmark_test.go @@ -57,3 +57,39 @@ func benchmarkChunkerSize(b *testing.B, ns newSplitter, size int) { } Res = Res + res } + +func benchmarkFilesAlloc(b *testing.B, ns newSplitter) { + const ( + chunkSize = 4096 + minDataSize = 20000 + maxDataSize = 60000 + fileCount = 10000 + ) + rng := rand.New(rand.NewSource(1)) + data := make([]byte, maxDataSize) + rng.Read(data) + + b.SetBytes(maxDataSize) + b.ReportAllocs() + b.ResetTimer() + + var res uint64 + + for i := 0; i < b.N; i++ { + for j := 0; j < fileCount; j++ { + fileSize := rng.Intn(maxDataSize-minDataSize) + minDataSize + r := ns(bytes.NewReader(data[:fileSize])) + for { + chunk, err := r.NextBytes() + if err != nil { + if err == io.EOF { + break + } + b.Fatal(err) + } + res = res + uint64(len(chunk)) + } + } + } + Res = Res + res +} diff --git a/chunker/splitting.go b/chunker/splitting.go index 64306943b..6340c0f50 100644 --- a/chunker/splitting.go +++ b/chunker/splitting.go @@ -5,7 +5,9 @@ package chunk import ( + 
"errors" "io" + "math/bits" logging "github.com/ipfs/go-log/v2" pool "github.com/libp2p/go-buffer-pool" @@ -13,6 +15,10 @@ import ( var log = logging.Logger("chunk") +// maxOverAllocBytes is the maximum unused space a chunk can have without being +// reallocated to a smaller size to fit the data. +const maxOverAllocBytes = 1024 + // A Splitter reads bytes from a Reader and creates "chunks" (byte slices) // that can be used to build DAG nodes. type Splitter interface { @@ -81,19 +87,41 @@ func (ss *sizeSplitterv2) NextBytes() ([]byte, error) { full := pool.Get(int(ss.size)) n, err := io.ReadFull(ss.r, full) - switch err { - case io.ErrUnexpectedEOF: - ss.err = io.EOF - small := make([]byte, n) - copy(small, full) - pool.Put(full) - return small, nil - case nil: - return full, nil - default: + if err != nil { + if errors.Is(err, io.ErrUnexpectedEOF) { + ss.err = io.EOF + return reallocChunk(full, n), nil + } pool.Put(full) return nil, err } + return full, nil +} + +func reallocChunk(full []byte, n int) []byte { + // Do not return an empty buffer. + if n == 0 { + pool.Put(full) + return nil + } + + // If chunk is close enough to fully used. + if cap(full)-n <= maxOverAllocBytes { + return full[:n] + } + + var small []byte + // If reallocating to the nearest power of two saves space without leaving + // too much unused space. + powTwoSize := 1 << bits.Len32(uint32(n-1)) + if powTwoSize-n <= maxOverAllocBytes { + small = make([]byte, n, powTwoSize) + } else { + small = make([]byte, n) + } + copy(small, full) + pool.Put(full) + return small } // Reader returns the io.Reader associated to this Splitter. 
diff --git a/chunker/splitting_test.go b/chunker/splitting_test.go index 23170ee37..d21faf512 100644 --- a/chunker/splitting_test.go +++ b/chunker/splitting_test.go @@ -2,6 +2,7 @@ package chunk import ( "bytes" + "errors" "io" "testing" @@ -33,7 +34,7 @@ func TestSizeSplitterOverAllocate(t *testing.T) { if err != nil { t.Fatal(err) } - if cap(chunk) > len(chunk) { + if cap(chunk)-len(chunk) > maxOverAllocBytes { t.Fatal("chunk capacity too large") } } @@ -89,7 +90,6 @@ func TestSizeSplitterFillsChunks(t *testing.T) { sofar := 0 whole := make([]byte, max) for chunk := range c { - bc := b[sofar : sofar+len(chunk)] if !bytes.Equal(bc, chunk) { t.Fatalf("chunk not correct: (sofar: %d) %d != %d, %v != %v", sofar, len(bc), len(chunk), bc[:100], chunk[:100]) } @@ -127,3 +127,60 @@ func BenchmarkDefault(b *testing.B) { return DefaultSplitter(r) }) } + +// BenchmarkFilesAllocPool benchmarks splitter that uses go-buffer-pool, +// simulating use in unixfs with many small files. +func BenchmarkFilesAllocPool(b *testing.B) { + const fileBlockSize = 4096 + + benchmarkFilesAlloc(b, func(r io.Reader) Splitter { + return NewSizeSplitter(r, fileBlockSize) + }) +} + +// BenchmarkFilesAllocNoPool benchmarks splitter that does not use +// go-buffer-pool, simulating use in unixfs with many small files. +func BenchmarkFilesAllocNoPool(b *testing.B) { + const fileBlockSize = 4096 + + benchmarkFilesAlloc(b, func(r io.Reader) Splitter { + return &sizeSplitterNoPool{ + r: r, + size: uint32(fileBlockSize), + } + }) +} + +// sizeSplitterNoPool implements Splitter that allocates without pool. Provided +// for benchmarking against implementation with pool. 
+type sizeSplitterNoPool struct { + r io.Reader + size uint32 + err error +} + +func (ss *sizeSplitterNoPool) NextBytes() ([]byte, error) { + if ss.err != nil { + return nil, ss.err + } + + full := make([]byte, ss.size) + n, err := io.ReadFull(ss.r, full) + if err != nil { + if errors.Is(err, io.ErrUnexpectedEOF) { + ss.err = io.EOF + if n == 0 { + return nil, nil + } + small := make([]byte, n) + copy(small, full) + return small, nil + } + return nil, err + } + return full, nil +} + +func (ss *sizeSplitterNoPool) Reader() io.Reader { + return ss.r +}