Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compact: Improved memory usage while downsampling #529

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .errcheck_excludes.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
(github.com/improbable-eng/thanos/vendor/github.com/go-kit/kit/log.Logger).Log
fmt.Fprintln
fmt.Fprint
xjewer marked this conversation as resolved.
Show resolved Hide resolved
fmt.Fprint
4 changes: 2 additions & 2 deletions cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func downsampleBucket(
continue
}
if err := processDownsampling(ctx, logger, bkt, m, dir, 5*60*1000); err != nil {
return err
return errors.Wrap(err, "downsampling to 5 min")
}

case 5 * 60 * 1000:
Expand All @@ -194,7 +194,7 @@ func downsampleBucket(
continue
}
if err := processDownsampling(ctx, logger, bkt, m, dir, 60*60*1000); err != nil {
return err
return errors.Wrap(err, "downsampling to 60 min")
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/block/metadata/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ const (
MetaFilename = "meta.json"
)

const (
// MetaVersion is a enumeration of versions supported by Thanos.
MetaVersion1 = iota + 1
)

// Meta describes the a block's meta. It wraps the known TSDB meta structure and
// extends it by Thanos-specific fields.
type Meta struct {
Expand Down Expand Up @@ -135,7 +140,7 @@ func Read(dir string) (*Meta, error) {
if err := json.Unmarshal(b, &m); err != nil {
return nil, err
}
if m.Version != 1 {
if m.Version != MetaVersion1 {
return nil, errors.Errorf("unexpected meta file version %d", m.Version)
}
return &m, nil
Expand Down
238 changes: 71 additions & 167 deletions pkg/compact/downsample/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,19 @@ package downsample

import (
"math"
"path/filepath"
"sort"

"github.com/improbable-eng/thanos/pkg/block/metadata"

"github.com/prometheus/prometheus/pkg/value"
"github.com/prometheus/tsdb/chunkenc"

"math/rand"
"os"
"path/filepath"
"time"

"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/value"
"github.com/prometheus/tsdb"
"github.com/prometheus/tsdb/chunkenc"
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels"
Expand Down Expand Up @@ -53,40 +51,65 @@ func Downsample(
}
defer runutil.CloseWithErrCapture(logger, &err, chunkr, "downsample chunk reader")

rng := origMeta.MaxTime - origMeta.MinTime
// Generate new block id.
uid := ulid.MustNew(ulid.Now(), rand.New(rand.NewSource(time.Now().UnixNano())))

// Write downsampled data in a custom memory block where we have fine-grained control
xjewer marked this conversation as resolved.
Show resolved Hide resolved
// over created chunks.
// This is necessary since we need to inject special values at the end of chunks for
// some aggregations.
newb := newMemBlock()
// Create block directory to populate with chunks, meta and index files into.
blockDir := filepath.Join(dir, uid.String())
if err := os.MkdirAll(blockDir, 0777); err != nil {
return id, errors.Wrap(err, "mkdir block dir")
}

// Remove blockDir in case of errors.
defer func() {
if err != nil {
var merr tsdb.MultiError
merr.Add(err)
xjewer marked this conversation as resolved.
Show resolved Hide resolved
merr.Add(os.RemoveAll(blockDir))
err = merr.Err()
}
}()

// Copy original meta to the new one. Update downsampling resolution and ULID for a new block.
newMeta := *origMeta
newMeta.Thanos.Downsample.Resolution = resolution
newMeta.ULID = uid

pall, err := indexr.Postings(index.AllPostingsKey())
// Writes downsampled chunks right into the files, avoiding excess memory allocation.
// Flushes index and meta data after aggregations.
streamedBlockWriter, err := NewStreamedBlockWriter(blockDir, indexr, logger, newMeta)
if err != nil {
return id, errors.Wrap(err, "get streamed block writer")
}
defer runutil.CloseWithErrCapture(logger, &err, streamedBlockWriter, "close stream block writer")

postings, err := indexr.Postings(index.AllPostingsKey())
if err != nil {
return id, errors.Wrap(err, "get all postings list")
}
var (
aggrChunks []*AggrChunk
all []sample
chks []chunks.Meta
lset labels.Labels
)
for pall.Next() {
var lset labels.Labels
for postings.Next() {
lset = lset[:0]
chks = chks[:0]
all = all[:0]
aggrChunks = aggrChunks[:0]

// Get series labels and chunks. Downsampled data is sensitive to chunk boundaries
// and we need to preserve them to properly downsample previously downsampled data.
if err := indexr.Series(pall.At(), &lset, &chks); err != nil {
return id, errors.Wrapf(err, "get series %d", pall.At())
if err := indexr.Series(postings.At(), &lset, &chks); err != nil {
return id, errors.Wrapf(err, "get series %d", postings.At())
}
// While #183 exists, we sanitize the chunks we retrieved from the block
// before retrieving their samples.
for i, c := range chks {
chk, err := chunkr.Chunk(c.Ref)
if err != nil {
return id, errors.Wrapf(err, "get chunk %d", c.Ref)
return id, errors.Wrapf(err, "get chunk %d, series %d", c.Ref, postings.At())
}
chks[i].Chunk = chk
}
Expand All @@ -95,155 +118,41 @@ func Downsample(
if origMeta.Thanos.Downsample.Resolution == 0 {
for _, c := range chks {
if err := expandChunkIterator(c.Chunk.Iterator(), &all); err != nil {
return id, errors.Wrapf(err, "expand chunk %d", c.Ref)
return id, errors.Wrapf(err, "expand chunk %d, series %d", c.Ref, postings.At())
}
}
newb.addSeries(&series{lset: lset, chunks: downsampleRaw(all, resolution)})
continue
}

// Downsample a block that contains aggregate chunks already.
for _, c := range chks {
aggrChunks = append(aggrChunks, c.Chunk.(*AggrChunk))
}
res, err := downsampleAggr(
aggrChunks,
&all,
chks[0].MinTime,
chks[len(chks)-1].MaxTime,
origMeta.Thanos.Downsample.Resolution,
resolution,
)

if err != nil {
return id, errors.Wrap(err, "downsample aggregate block")
if err := streamedBlockWriter.WriteSeries(lset, downsampleRaw(all, resolution)); err != nil {
return id, errors.Wrapf(err, "downsample raw data, series: %d", postings.At())
}
} else {
// Downsample a block that contains aggregated chunks already.
for _, c := range chks {
aggrChunks = append(aggrChunks, c.Chunk.(*AggrChunk))
}
downsampledChunks, err := downsampleAggr(
aggrChunks,
&all,
chks[0].MinTime,
chks[len(chks)-1].MaxTime,
origMeta.Thanos.Downsample.Resolution,
resolution,
)
if err != nil {
return id, errors.Wrapf(err, "downsample aggregate block, series: %d", postings.At())
}
if err := streamedBlockWriter.WriteSeries(lset, downsampledChunks); err != nil {
return id, errors.Wrapf(err, "write series: %d", postings.At())
}
}
newb.addSeries(&series{lset: lset, chunks: res})
}
if pall.Err() != nil {
return id, errors.Wrap(pall.Err(), "iterate series set")
}
comp, err := tsdb.NewLeveledCompactor(nil, log.NewNopLogger(), []int64{rng}, NewPool())
if err != nil {
return id, errors.Wrap(err, "create compactor")
}
id, err = comp.Write(dir, newb, origMeta.MinTime, origMeta.MaxTime, &origMeta.BlockMeta)
if err != nil {
return id, errors.Wrap(err, "compact head")
}
bdir := filepath.Join(dir, id.String())

var tmeta metadata.Thanos
tmeta = origMeta.Thanos
tmeta.Source = metadata.CompactorSource
tmeta.Downsample.Resolution = resolution

_, err = metadata.InjectThanos(logger, bdir, tmeta, &origMeta.BlockMeta)
if err != nil {
return id, errors.Wrapf(err, "failed to finalize the block %s", bdir)
if postings.Err() != nil {
return id, errors.Wrap(postings.Err(), "iterate series set")
}

if err = os.Remove(filepath.Join(bdir, "tombstones")); err != nil {
return id, errors.Wrap(err, "remove tombstones")
}
return id, nil
id = uid
return
}

// memBlock is an in-memory block that implements a subset of the tsdb.BlockReader interface
// to allow tsdb.LeveledCompactor to persist the data as a block.
type memBlock struct {
// Dummies to implement unused methods.
tsdb.IndexReader

symbols map[string]struct{}
postings []uint64
series []*series
chunks []chunkenc.Chunk
}

func newMemBlock() *memBlock {
return &memBlock{symbols: map[string]struct{}{}}
}

func (b *memBlock) addSeries(s *series) {
sid := uint64(len(b.series))
b.postings = append(b.postings, sid)
b.series = append(b.series, s)

for _, l := range s.lset {
b.symbols[l.Name] = struct{}{}
b.symbols[l.Value] = struct{}{}
}

for i, cm := range s.chunks {
cid := uint64(len(b.chunks))
s.chunks[i].Ref = cid
b.chunks = append(b.chunks, cm.Chunk)
}
}

func (b *memBlock) Postings(name, val string) (index.Postings, error) {
allName, allVal := index.AllPostingsKey()

if name != allName || val != allVal {
return nil, errors.New("unsupported call to Postings()")
}
sort.Slice(b.postings, func(i, j int) bool {
return labels.Compare(b.series[b.postings[i]].lset, b.series[b.postings[j]].lset) < 0
})
return index.NewListPostings(b.postings), nil
}

func (b *memBlock) Series(id uint64, lset *labels.Labels, chks *[]chunks.Meta) error {
if id >= uint64(len(b.series)) {
return errors.Wrapf(tsdb.ErrNotFound, "series with ID %d does not exist", id)
}
s := b.series[id]

*lset = append((*lset)[:0], s.lset...)
*chks = append((*chks)[:0], s.chunks...)

return nil
}

func (b *memBlock) Chunk(id uint64) (chunkenc.Chunk, error) {
if id >= uint64(len(b.chunks)) {
return nil, errors.Wrapf(tsdb.ErrNotFound, "chunk with ID %d does not exist", id)
}
return b.chunks[id], nil
}

func (b *memBlock) Symbols() (map[string]struct{}, error) {
return b.symbols, nil
}

func (b *memBlock) SortedPostings(p index.Postings) index.Postings {
return p
}

func (b *memBlock) Index() (tsdb.IndexReader, error) {
return b, nil
}

func (b *memBlock) Chunks() (tsdb.ChunkReader, error) {
return b, nil
}

func (b *memBlock) Tombstones() (tsdb.TombstoneReader, error) {
return emptyTombstoneReader{}, nil
}

func (b *memBlock) Close() error {
return nil
}

type emptyTombstoneReader struct{}

func (emptyTombstoneReader) Get(ref uint64) (tsdb.Intervals, error) { return nil, nil }
func (emptyTombstoneReader) Iter(func(uint64, tsdb.Intervals) error) error { return nil }
func (emptyTombstoneReader) Total() uint64 { return 0 }
func (emptyTombstoneReader) Close() error { return nil }

// currentWindow returns the end timestamp of the window that t falls into.
func currentWindow(t, r int64) int64 {
// The next timestamp is the next number after s.t that's aligned with window.
Expand Down Expand Up @@ -492,7 +401,7 @@ func downsampleAggr(chks []*AggrChunk, buf *[]sample, mint, maxt, inRes, outRes
return res, nil
}

// expandChunkIterator reads all samples from the iterater and appends them to buf.
// expandChunkIterator reads all samples from the iterator and appends them to buf.
// Stale markers and out of order samples are skipped.
func expandChunkIterator(it chunkenc.Iterator, buf *[]sample) error {
// For safety reasons, we check for each sample that it does not go back in time.
Expand Down Expand Up @@ -614,11 +523,6 @@ type sample struct {
v float64
}

type series struct {
lset labels.Labels
chunks []chunks.Meta
}

// CounterSeriesIterator iterates over an ordered sequence of chunks and treats decreasing
// values as counter reset.
// Additionally, it can deal with downsampled counter chunks, which set the last value of a chunk
Expand Down
Loading