Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce chunk write queue memory usage #131

Merged
merged 9 commits into from
Jun 2, 2022
Merged
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 68 additions & 9 deletions tsdb/chunks/chunk_write_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,21 @@ package chunks
import (
"errors"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/prometheus/prometheus/tsdb/chunkenc"
)

const (
// Minimum recorded peak since since the last shrinking of chunkWriteQueue.chunkrefMap to shrink it again.
chunkRefMapShrinkThreshold = 1000

// Minimum interval between shrinking of chunkWriteQueue.chunkRefMap.
chunkRefMapMinShrinkInterval = 10 * time.Minute
)

type chunkWriteJob struct {
cutFile bool
seriesRef HeadSeriesRef
Expand All @@ -38,21 +47,28 @@ type chunkWriteJob struct {
type chunkWriteQueue struct {
jobs chan chunkWriteJob

chunkRefMapMtx sync.RWMutex
chunkRefMap map[ChunkDiskMapperRef]chunkenc.Chunk
chunkRefMapMtx sync.RWMutex
chunkRefMap map[ChunkDiskMapperRef]chunkenc.Chunk
chunkRefMapPeakSize int // Largest size that chunkRefMap has grown to since the last time we shrank it.
chunkRefMapLastShrink time.Time // When the chunkRefMap has been shrunk the last time.

isRunningMtx sync.Mutex // Protects the isRunning property.
isRunning bool // Used to prevent that new jobs get added to the queue when the chan is already closed.
// isRunningMtx serves two purposes:
// 1. It protects isRunning field.
// 2. It serializes adding of jobs to the chunkRefMap in addJob() method. If jobs channel is full then addJob() will block
// while holding this mutex, which guarantees that chunkRefMap won't ever grow beyond the queue size + 1.
isRunningMtx sync.Mutex
isRunning bool // Used to prevent that new jobs get added to the queue when the chan is already closed.

workerWg sync.WaitGroup

writeChunk writeChunkF

// Keeping three separate counters instead of only a single CounterVec to improve the performance of the critical
// Keeping separate counters instead of only a single CounterVec to improve the performance of the critical
// addJob() method which otherwise would need to perform a WithLabelValues call on the CounterVec.
adds prometheus.Counter
gets prometheus.Counter
completed prometheus.Counter
shrink prometheus.Counter
}

// writeChunkF is a function which writes chunks, it is dynamic to allow mocking in tests.
Expand All @@ -68,13 +84,15 @@ func newChunkWriteQueue(reg prometheus.Registerer, size int, writeChunk writeChu
)

q := &chunkWriteQueue{
jobs: make(chan chunkWriteJob, size),
chunkRefMap: make(map[ChunkDiskMapperRef]chunkenc.Chunk, size),
writeChunk: writeChunk,
jobs: make(chan chunkWriteJob, size),
chunkRefMap: make(map[ChunkDiskMapperRef]chunkenc.Chunk),
chunkRefMapLastShrink: time.Now(),
writeChunk: writeChunk,

adds: counters.WithLabelValues("add"),
gets: counters.WithLabelValues("get"),
completed: counters.WithLabelValues("complete"),
shrink: counters.WithLabelValues("shrink"),
}

if reg != nil {
Expand Down Expand Up @@ -112,6 +130,42 @@ func (c *chunkWriteQueue) processJob(job chunkWriteJob) {
delete(c.chunkRefMap, job.ref)

c.completed.Inc()

c.shrinkChunkRefMap()
}

// shrinkChunkRefMap checks whether the conditions to shrink the chunkRefMap are met,
// if so chunkRefMap is reinitialized. The chunkRefMapMtx must be held when calling this method.
//
// We do this because Go runtime doesn't release internal memory used by map after map has been emptied.
// To achieve that we create new map instead and throw the old one away.
func (c *chunkWriteQueue) shrinkChunkRefMap() {
if len(c.chunkRefMap) > 0 {
// Can't shrink it while there is data in it.
return
}

if c.chunkRefMapPeakSize < chunkRefMapShrinkThreshold {
// Not shrinking it because it has not grown to the minimum threshold yet.
return
}

now := time.Now()

if now.Sub(c.chunkRefMapLastShrink) < chunkRefMapMinShrinkInterval {
// Not shrinking it because the minimum duration between shrink-events has not passed yet.
return
}

// Re-initialize the chunk ref map to half of the peak size that it has grown to since the last re-init event.
// We are trying to hit the sweet spot in the trade-off between initializing it to a very small size
// potentially resulting in many allocations to re-grow it, and initializing it to a large size potentially
// resulting in unused allocated memory.
c.chunkRefMap = make(map[ChunkDiskMapperRef]chunkenc.Chunk, c.chunkRefMapPeakSize/2)

c.chunkRefMapPeakSize = 0
c.chunkRefMapLastShrink = now
c.shrink.Inc()
}

func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) {
Expand All @@ -125,11 +179,16 @@ func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) {
defer c.isRunningMtx.Unlock()

if !c.isRunning {
return errors.New("queue is not started")
return errors.New("queue is not running")
}

c.chunkRefMapMtx.Lock()
c.chunkRefMap[job.ref] = job.chk

// Keep track of the peak usage of c.chunkRefMap.
if len(c.chunkRefMap) > c.chunkRefMapPeakSize {
c.chunkRefMapPeakSize = len(c.chunkRefMap)
}
c.chunkRefMapMtx.Unlock()

c.jobs <- job
Expand Down