Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Snappy mem fix #970

Closed
wants to merge 6 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 8 additions & 31 deletions tempodb/encoding/v2/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,10 +248,9 @@ func (pool *LZ4Pool) ResetWriter(dst io.Writer, resetWriter io.WriteCloser) (io.
return writer, nil
}

// SnappyPool is a really cool looking pool. Dang that pool is _snappy_.
// SnappyPool is a really cool looking pool. Dang that pool is _snappy_. Reusing readers/writers causes a memory leak. So the pool acts more like
// a traditional factory.
type SnappyPool struct {
readers sync.Pool
writers sync.Pool
}

// Encoding implements WriterPool and ReaderPool
Expand All @@ -261,39 +260,28 @@ func (pool *SnappyPool) Encoding() backend.Encoding {

// GetReader gets or creates a new CompressionReader and reset it to read from src
func (pool *SnappyPool) GetReader(src io.Reader) (io.Reader, error) {
if r := pool.readers.Get(); r != nil {
reader := r.(*s2.Reader)
reader.Reset(src)
return reader, nil
}
return snappy.NewReader(src), nil
}

// PutReader places back in the pool a CompressionReader
func (pool *SnappyPool) PutReader(reader io.Reader) {
pool.readers.Put(reader)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:(

}

// ResetReader implements ReaderPool
func (pool *SnappyPool) ResetReader(src io.Reader, resetReader io.Reader) (io.Reader, error) {
reader := resetReader.(*s2.Reader)
reader := resetReader.(*snappy.Reader)
reader.Reset(src)
return reader, nil
}

// GetWriter gets or creates a new CompressionWriter and reset it to write to dst
func (pool *SnappyPool) GetWriter(dst io.Writer) (io.WriteCloser, error) {
if w := pool.writers.Get(); w != nil {
writer := w.(*s2.Writer)
writer.Reset(dst)
return writer, nil
}
return snappy.NewBufferedWriter(dst), nil
}

// PutWriter places back in the pool a CompressionWriter
func (pool *SnappyPool) PutWriter(writer io.WriteCloser) {
pool.writers.Put(writer)
_ = writer.(*snappy.Writer).Close()
}

// ResetWriter implements WriterPool
Expand Down Expand Up @@ -400,10 +388,9 @@ func (pool *ZstdPool) ResetWriter(dst io.Writer, resetWriter io.WriteCloser) (io
return writer, nil
}

// S2Pool is one s short of s3
// S2Pool is one s short of S3Pool. Reusing readers/writers causes a memory leak. So the pool acts more like
// a traditional factory.
type S2Pool struct {
readers sync.Pool
writers sync.Pool
}

// Encoding implements WriterPool and ReaderPool
Expand All @@ -413,17 +400,11 @@ func (pool *S2Pool) Encoding() backend.Encoding {

// GetReader gets or creates a new CompressionReader and reset it to read from src
func (pool *S2Pool) GetReader(src io.Reader) (io.Reader, error) {
if r := pool.readers.Get(); r != nil {
reader := r.(*s2.Reader)
reader.Reset(src)
return reader, nil
}
return s2.NewReader(src), nil
}

// PutReader places back in the pool a CompressionReader
func (pool *S2Pool) PutReader(reader io.Reader) {
pool.readers.Put(reader)
}

// ResetReader implements ReaderPool
Expand All @@ -435,17 +416,13 @@ func (pool *S2Pool) ResetReader(src io.Reader, resetReader io.Reader) (io.Reader

// GetWriter gets or creates a new CompressionWriter and reset it to write to dst
func (pool *S2Pool) GetWriter(dst io.Writer) (io.WriteCloser, error) {
if w := pool.writers.Get(); w != nil {
writer := w.(*s2.Writer)
writer.Reset(dst)
return writer, nil
}
return s2.NewWriter(dst), nil
}

// PutWriter places back in the pool a CompressionWriter
func (pool *S2Pool) PutWriter(writer io.WriteCloser) {
pool.writers.Put(writer)
// only put the writer back in the pool if close succeeds (in case erroring puts it in a bad state)
_ = writer.(*s2.Writer).Close()
}

// ResetWriter implements WriterPool
Expand Down