Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Commit

Permalink
First pass at merging chunks together during compaction
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Marchbanks <[email protected]>
  • Loading branch information
csmarchbanks committed Sep 26, 2018
1 parent 3140f72 commit ef12bcb
Showing 1 changed file with 60 additions and 9 deletions.
69 changes: 60 additions & 9 deletions compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,12 @@ type Compactor interface {

// LeveledCompactor implements the Compactor interface.
type LeveledCompactor struct {
dir string
metrics *compactorMetrics
logger log.Logger
ranges []int64
chunkPool chunkenc.Pool
dir string
metrics *compactorMetrics
logger log.Logger
ranges []int64
chunkPool chunkenc.Pool
mergeChunks bool
}

type compactorMetrics struct {
Expand Down Expand Up @@ -137,10 +138,11 @@ func NewLeveledCompactor(r prometheus.Registerer, l log.Logger, ranges []int64,
pool = chunkenc.NewPool()
}
return &LeveledCompactor{
ranges: ranges,
chunkPool: pool,
logger: l,
metrics: newCompactorMetrics(r),
ranges: ranges,
chunkPool: pool,
logger: l,
metrics: newCompactorMetrics(r),
mergeChunks: true,
}, nil
}

Expand Down Expand Up @@ -620,6 +622,11 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
chks[i].Chunk = newChunk
}
}

if c.mergeChunks {
chks = mergeChunks(chks)
}

if err := chunkw.WriteChunks(chks...); err != nil {
return errors.Wrap(err, "write chunks")
}
Expand Down Expand Up @@ -676,6 +683,50 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
return nil
}

// mergeChunks combines runs of adjacent chunks into single XOR chunks so long
// as the combined chunk stays at or below mergeSampleLimit samples. Chunk
// order and the overall sample stream are preserved. Merging is best-effort:
// on any appender or iterator error the original slice is returned unchanged.
func mergeChunks(chks []chunks.Meta) []chunks.Meta {
	// Maximum number of samples allowed in a merged chunk. Was previously
	// hard-coded in two places; keep a single named constant so the outer
	// guard and the inner merge loop cannot drift apart.
	const mergeSampleLimit = 480

	newChks := make([]chunks.Meta, 0, len(chks))
	for i := 0; i < len(chks); i++ {
		// Guard clause: last chunk, or the pair would exceed the limit —
		// keep the chunk as-is and move on.
		if i >= len(chks)-1 || chks[i].Chunk.NumSamples()+chks[i+1].Chunk.NumSamples() > mergeSampleLimit {
			newChks = append(newChks, chks[i])
			continue
		}

		// Start a fresh chunk seeded with all samples of chunk i.
		newChunk := chunkenc.NewXORChunk()
		app, err := newChunk.Appender()
		if err != nil {
			return chks
		}
		it := chks[i].Chunk.Iterator()
		for it.Next() {
			app.Append(it.At())
		}
		if it.Err() != nil {
			return chks
		}
		minTime := chks[i].MinTime
		maxTime := chks[i].MaxTime
		// NOTE(review): the merged chunk reuses the first source chunk's Ref;
		// presumably WriteChunks assigns the real ref later — confirm.
		ref := chks[i].Ref

		// Fold in following chunks while the merged chunk stays within the
		// sample limit. The first iteration always succeeds because the
		// guard above already checked chunks i and i+1 together.
		for i < len(chks)-1 && newChunk.NumSamples()+chks[i+1].Chunk.NumSamples() <= mergeSampleLimit {
			it = chks[i+1].Chunk.Iterator()
			for it.Next() {
				app.Append(it.At())
			}
			if it.Err() != nil {
				return chks
			}
			maxTime = chks[i+1].MaxTime
			i++
		}
		newChks = append(newChks, chunks.Meta{
			MinTime: minTime,
			MaxTime: maxTime,
			Ref:     ref,
			Chunk:   newChunk,
		})
	}
	return newChks
}

type compactionSeriesSet struct {
p index.Postings
index IndexReader
Expand Down

0 comments on commit ef12bcb

Please sign in to comment.