Skip to content

Commit

Permalink
Improve ingester flush memory usage. (#3200)
Browse files Browse the repository at this point in the history
* Add basic benchmark.

Signed-off-by: Cyril Tovena <[email protected]>

* Improves memory usage of checkpointer series iterator.

Signed-off-by: Cyril Tovena <[email protected]>

* make lint.

Signed-off-by: Cyril Tovena <[email protected]>

* Add benchmark for flush loop.

```
goos: darwin
goarch: amd64
pkg: github.com/grafana/loki/pkg/ingester
Benchmark_FlushLoop-16    	      16	  70040359 ns/op	241901661 B/op	    1112 allocs/op
PASS
ok  	github.com/grafana/loki/pkg/ingester	4.829s
```

Signed-off-by: Cyril Tovena <[email protected]>

* Improve benchmark.

Signed-off-by: Cyril Tovena <[email protected]>

* Re-use buffer when flushing chunks.

benchcmp

```
❯ benchcmp  before.txt after.txt
benchmark                  old ns/op     new ns/op     delta
Benchmark_FlushLoop-16     104723243     9233780       -91.18%

benchmark                  old allocs     new allocs     delta
Benchmark_FlushLoop-16     1115           568            -49.06%

benchmark                  old bytes     new bytes     delta
Benchmark_FlushLoop-16     241857243     989590        -99.59%
```

Signed-off-by: Cyril Tovena <[email protected]>

* better size computation.

Signed-off-by: Cyril Tovena <[email protected]>

* Fixes test ordering flakyness.

Signed-off-by: Cyril Tovena <[email protected]>

* lint.

Signed-off-by: Cyril Tovena <[email protected]>
  • Loading branch information
cyriltovena authored Jan 21, 2021
1 parent 07ece2b commit ea488f9
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 9 deletions.
6 changes: 3 additions & 3 deletions pkg/ingester/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var (
// Starting with something sane first then we can refine with more experience.

// Buckets [1KB 2KB 4KB 16KB 32KB to 4MB] by 2
blocksBufferPool = pool.NewBuffer(1024, 4*1024*1024, 2)
chunksBufferPool = pool.NewBuffer(1024, 4*1024*1024, 2)
// Buckets [64B 128B 256B 512B... to 2MB] by 2
headBufferPool = pool.NewBuffer(64, 2*1024*1024, 2)
)
Expand All @@ -45,7 +45,7 @@ type chunkWithBuffer struct {
func toWireChunks(descs []chunkDesc, wireChunks []chunkWithBuffer) ([]chunkWithBuffer, error) {
// release memory from previous list of chunks.
for _, wc := range wireChunks {
blocksBufferPool.Put(wc.blocks)
chunksBufferPool.Put(wc.blocks)
headBufferPool.Put(wc.head)
wc.Data = nil
wc.Head = nil
Expand All @@ -70,7 +70,7 @@ func toWireChunks(descs []chunkDesc, wireChunks []chunkWithBuffer) ([]chunkWithB
LastUpdated: d.lastUpdated,
Synced: d.synced,
},
blocks: blocksBufferPool.Get(chunkSize),
blocks: chunksBufferPool.Get(chunkSize),
head: headBufferPool.Get(headSize),
}

Expand Down
20 changes: 15 additions & 5 deletions pkg/ingester/flush.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ingester

import (
"bytes"
"fmt"
"net/http"
"sync"
Expand Down Expand Up @@ -321,32 +322,41 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP
labelsBuilder.Set(nameLabel, logsValue)
metric := labelsBuilder.Labels()

wireChunks := make([]chunk.Chunk, 0, len(cs))
wireChunks := make([]chunk.Chunk, len(cs))
buffers := make([]*bytes.Buffer, len(cs))
defer func() {
for j := range buffers {
chunksBufferPool.Put(buffers[j])
}
}()

// use anonymous function to make lock releasing simpler.
err = func() error {
chunkMtx.Lock()
defer chunkMtx.Unlock()

for _, c := range cs {
for j, c := range cs {
// Ensure that new blocks are cut before flushing as data in the head block is not included otherwise.
if err := c.chunk.Close(); err != nil {
return err
}
firstTime, lastTime := loki_util.RoundToMilliseconds(c.chunk.Bounds())
c := chunk.NewChunk(
ch := chunk.NewChunk(
userID, fp, metric,
chunkenc.NewFacade(c.chunk, i.cfg.BlockSize, i.cfg.TargetChunkSize),
firstTime,
lastTime,
)

chunkSize := c.chunk.BytesSize() + 4*1024 // size + 4kB should be enough room for cortex header
buffer := chunksBufferPool.Get(chunkSize)
start := time.Now()
if err := c.Encode(); err != nil {
if err := ch.EncodeTo(buffer); err != nil {
return err
}
chunkEncodeTime.Observe(time.Since(start).Seconds())
wireChunks = append(wireChunks, c)
buffers[j] = buffer
wireChunks[j] = ch
}
return nil
}()
Expand Down
44 changes: 43 additions & 1 deletion pkg/ingester/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,48 @@ type fullWAL struct{}
func (fullWAL) Log(_ *WALRecord) error { return &os.PathError{Err: syscall.ENOSPC} }
func (fullWAL) Stop() error { return nil }

func Benchmark_FlushLoop(b *testing.B) {
var (
size = 5
descs [][]*chunkDesc
lbs = makeRandomLabels()
ctx = user.InjectOrgID(context.Background(), "foo")
_, ing = newTestStore(b, defaultIngesterTestConfig(b), nil)
)

for i := 0; i < size; i++ {
descs = append(descs, buildChunkDecs(b))
}

b.ResetTimer()
b.ReportAllocs()

for n := 0; n < b.N; n++ {
var wg sync.WaitGroup
for i := 0; i < size; i++ {
wg.Add(1)
go func(loop int) {
defer wg.Done()
require.NoError(b, ing.flushChunks(ctx, 0, lbs, descs[loop], &sync.RWMutex{}))
}(i)
}
wg.Wait()
}
}

func buildChunkDecs(t testing.TB) []*chunkDesc {
res := make([]*chunkDesc, 10)
for i := range res {
res[i] = &chunkDesc{
closed: true,
chunk: chunkenc.NewMemChunk(chunkenc.EncSnappy, dummyConf().BlockSize, dummyConf().TargetChunkSize),
}
fillChunk(t, res[i].chunk)
require.NoError(t, res[i].chunk.Close())
}
return res
}

func TestWALFullFlush(t *testing.T) {
// technically replaced with a fake wal, but the ingester New() function creates a regular wal first,
// so we enable creation/cleanup even though it remains unused.
Expand Down Expand Up @@ -211,7 +253,7 @@ func newTestStore(t require.TestingT, cfg Config, walOverride WAL) (*testStore,
}

// nolint
func defaultIngesterTestConfig(t *testing.T) Config {
func defaultIngesterTestConfig(t testing.TB) Config {
kvClient, err := kv.NewClient(kv.Config{Store: "inmemory"}, ring.GetCodec(), nil)
require.NoError(t, err)

Expand Down

0 comments on commit ea488f9

Please sign in to comment.