Improve ingester flush memory usage. #3200

Merged: 12 commits, Jan 21, 2021
6 changes: 3 additions & 3 deletions pkg/ingester/checkpoint.go
@@ -31,7 +31,7 @@ var (
// Starting with something sane first then we can refine with more experience.

// Buckets [1KB 2KB 4KB 16KB 32KB to 4MB] by 2
blocksBufferPool = pool.NewBuffer(1024, 4*1024*1024, 2)
chunksBufferPool = pool.NewBuffer(1024, 4*1024*1024, 2)
// Buckets [64B 128B 256B 512B... to 2MB] by 2
headBufferPool = pool.NewBuffer(64, 2*1024*1024, 2)
)
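
A note on the pools above: pool.NewBuffer itself is not part of this diff, so the following is only a rough, self-contained sketch of the behaviour the call sites rely on — a bucketed pool of *bytes.Buffer values where Get(size) hands out a buffer from the smallest bucket that fits and Put recycles it. The bufferPool type and newBufferPool constructor are illustrative stand-ins, not the real implementation.

package main

import (
	"bytes"
	"fmt"
	"sync"
)

// bufferPool is an illustrative stand-in for pool.NewBuffer(minSize, maxSize, factor):
// one sync.Pool per bucket, with bucket sizes growing by `factor` from minSize to maxSize.
type bufferPool struct {
	sizes []int
	pools []*sync.Pool
}

func newBufferPool(minSize, maxSize, factor int) *bufferPool {
	p := &bufferPool{}
	for s := minSize; s <= maxSize; s *= factor {
		size := s // capture the per-bucket size for the New closure
		p.sizes = append(p.sizes, size)
		p.pools = append(p.pools, &sync.Pool{
			New: func() interface{} { return bytes.NewBuffer(make([]byte, 0, size)) },
		})
	}
	return p
}

// Get returns a reset buffer from the smallest bucket that can hold `size` bytes,
// or a freshly allocated buffer when `size` exceeds the largest bucket.
func (p *bufferPool) Get(size int) *bytes.Buffer {
	for i, s := range p.sizes {
		if size <= s {
			buf := p.pools[i].Get().(*bytes.Buffer)
			buf.Reset()
			return buf
		}
	}
	return bytes.NewBuffer(make([]byte, 0, size))
}

// Put hands a buffer back to the first bucket large enough for its capacity,
// so the next Get of a similar size reuses the allocation instead of making a new one.
func (p *bufferPool) Put(buf *bytes.Buffer) {
	if buf == nil {
		return
	}
	for i, s := range p.sizes {
		if buf.Cap() <= s {
			p.pools[i].Put(buf)
			return
		}
	}
}

func main() {
	// Mirrors the chunksBufferPool declaration: buckets 1KB, 2KB, 4KB, ... up to 4MB.
	chunksBufferPool := newBufferPool(1024, 4*1024*1024, 2)

	buf := chunksBufferPool.Get(3 * 1024) // served from the 4KB bucket
	fmt.Println(buf.Cap() >= 3*1024)      // true
	chunksBufferPool.Put(buf)
}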
@@ -45,7 +45,7 @@ type chunkWithBuffer struct {
func toWireChunks(descs []chunkDesc, wireChunks []chunkWithBuffer) ([]chunkWithBuffer, error) {
// release memory from previous list of chunks.
for _, wc := range wireChunks {
blocksBufferPool.Put(wc.blocks)
chunksBufferPool.Put(wc.blocks)
headBufferPool.Put(wc.head)
wc.Data = nil
wc.Head = nil
@@ -70,7 +70,7 @@ func toWireChunks(descs []chunkDesc, wireChunks []chunkWithBuffer) ([]chunkWithB
LastUpdated: d.lastUpdated,
Synced: d.synced,
},
blocks: blocksBufferPool.Get(chunkSize),
blocks: chunksBufferPool.Get(chunkSize),
head: headBufferPool.Get(headSize),
}

20 changes: 15 additions & 5 deletions pkg/ingester/flush.go
@@ -1,6 +1,7 @@
package ingester

import (
"bytes"
"fmt"
"net/http"
"sync"
@@ -321,32 +322,41 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP
labelsBuilder.Set(nameLabel, logsValue)
metric := labelsBuilder.Labels()

wireChunks := make([]chunk.Chunk, 0, len(cs))
wireChunks := make([]chunk.Chunk, len(cs))
buffers := make([]*bytes.Buffer, len(cs))
defer func() {
for j := range buffers {
chunksBufferPool.Put(buffers[j])
}
}()

// use anonymous function to make lock releasing simpler.
err = func() error {
chunkMtx.Lock()
defer chunkMtx.Unlock()

for _, c := range cs {
for j, c := range cs {
// Ensure that new blocks are cut before flushing as data in the head block is not included otherwise.
if err := c.chunk.Close(); err != nil {
return err
}
firstTime, lastTime := loki_util.RoundToMilliseconds(c.chunk.Bounds())
c := chunk.NewChunk(
ch := chunk.NewChunk(
userID, fp, metric,
chunkenc.NewFacade(c.chunk, i.cfg.BlockSize, i.cfg.TargetChunkSize),
firstTime,
lastTime,
)

chunkSize := c.chunk.BytesSize() + 4*1024 // size + 4kB should be enough room for cortex header
buffer := chunksBufferPool.Get(chunkSize)
start := time.Now()
if err := c.Encode(); err != nil {
if err := ch.EncodeTo(buffer); err != nil {
return err
}
chunkEncodeTime.Observe(time.Since(start).Seconds())
wireChunks = append(wireChunks, c)
buffers[j] = buffer
wireChunks[j] = ch
}
return nil
}()
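The core of the flush.go change, reduced to a standalone sketch: instead of letting each chunk allocate its own encoded output (the old c.Encode() path, presumably allocating internally), the encoded bytes are written into a buffer taken from the pool, sized to the chunk plus ~4KB of header slack, and every buffer is returned via defer once the flush is done with it. fakeChunk, EncodeTo, flush and encodeBufPool below are hypothetical stand-ins used only to show the lifetime pattern; they are not the Loki or Cortex types.

package main

import (
	"bytes"
	"fmt"
	"sync"
)

// fakeChunk is a hypothetical stand-in for the real chunk; only the buffer
// lifetime matters here, not the actual encoding.
type fakeChunk struct{ payload []byte }

// EncodeTo mirrors the EncodeTo(buffer) call in the diff: serialize into a
// caller-owned buffer rather than allocating a fresh slice per chunk.
func (c *fakeChunk) EncodeTo(buf *bytes.Buffer) error {
	if _, err := buf.WriteString("hdr:"); err != nil { // stand-in for the header
		return err
	}
	_, err := buf.Write(c.payload)
	return err
}

// A single-bucket pool keeps the sketch short; the real code uses the bucketed
// chunksBufferPool and sizes each Get to BytesSize() + 4*1024.
var encodeBufPool = sync.Pool{
	New: func() interface{} { return new(bytes.Buffer) },
}

func flush(chunks []*fakeChunk) error {
	buffers := make([]*bytes.Buffer, len(chunks))
	// Return every buffer to the pool once the encoded chunks have been handed
	// off (here: once flush returns), matching the defer in flushChunks.
	defer func() {
		for _, b := range buffers {
			if b != nil {
				b.Reset()
				encodeBufPool.Put(b)
			}
		}
	}()

	for i, c := range chunks {
		buf := encodeBufPool.Get().(*bytes.Buffer)
		if err := c.EncodeTo(buf); err != nil {
			return err
		}
		buffers[i] = buf // keep the buffer alive exactly as long as the encoded chunk
		fmt.Printf("chunk %d: %d encoded bytes\n", i, buf.Len())
	}
	return nil
}

func main() {
	chunks := []*fakeChunk{
		{payload: make([]byte, 10_000)},
		{payload: make([]byte, 100_000)},
	}
	if err := flush(chunks); err != nil {
		panic(err)
	}
}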
44 changes: 43 additions & 1 deletion pkg/ingester/flush_test.go
@@ -68,6 +68,48 @@ type fullWAL struct{}
func (fullWAL) Log(_ *WALRecord) error { return &os.PathError{Err: syscall.ENOSPC} }
func (fullWAL) Stop() error { return nil }

func Benchmark_FlushLoop(b *testing.B) {
var (
size = 5
descs [][]*chunkDesc
lbs = makeRandomLabels()
ctx = user.InjectOrgID(context.Background(), "foo")
_, ing = newTestStore(b, defaultIngesterTestConfig(b), nil)
)

for i := 0; i < size; i++ {
descs = append(descs, buildChunkDecs(b))
}

b.ResetTimer()
b.ReportAllocs()

for n := 0; n < b.N; n++ {
var wg sync.WaitGroup
for i := 0; i < size; i++ {
wg.Add(1)
go func(loop int) {
defer wg.Done()
require.NoError(b, ing.flushChunks(ctx, 0, lbs, descs[loop], &sync.RWMutex{}))
}(i)
}
wg.Wait()
}
}

func buildChunkDecs(t testing.TB) []*chunkDesc {
res := make([]*chunkDesc, 10)
for i := range res {
res[i] = &chunkDesc{
closed: true,
chunk: chunkenc.NewMemChunk(chunkenc.EncSnappy, dummyConf().BlockSize, dummyConf().TargetChunkSize),
}
fillChunk(t, res[i].chunk)
require.NoError(t, res[i].chunk.Close())
}
return res
}

func TestWALFullFlush(t *testing.T) {
// technically replaced with a fake wal, but the ingester New() function creates a regular wal first,
// so we enable creation/cleanup even though it remains unused.
@@ -211,7 +253,7 @@ func newTestStore(t require.TestingT, cfg Config, walOverride WAL) (*testStore,
}

// nolint
func defaultIngesterTestConfig(t *testing.T) Config {
func defaultIngesterTestConfig(t testing.TB) Config {
kvClient, err := kv.NewClient(kv.Config{Store: "inmemory"}, ring.GetCodec(), nil)
require.NoError(t, err)

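The new Benchmark_FlushLoop (together with widening defaultIngesterTestConfig to testing.TB so a *testing.B can use it) gives a way to measure the allocation win. Assuming only standard Go tooling, nothing Loki-specific, a comparison could look like this:

go test ./pkg/ingester -run '^$' -bench Benchmark_FlushLoop -benchmem -count 10 > after.txt
(check out the parent commit, repeat into before.txt, then:)
benchstat before.txt after.txt

benchstat is golang.org/x/perf/cmd/benchstat, and -benchmem is what surfaces the B/op and allocs/op columns this PR aims to drive down.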