From 1145585049dabc161e62b140db43da8e81316adf Mon Sep 17 00:00:00 2001 From: Aleksei Semiglazov Date: Tue, 18 Sep 2018 23:35:03 +0100 Subject: [PATCH 1/6] compact: avoid memory leak while downsampling Add instant writer implementation to shrink memory consumption during the downsampling stage. Encoded chunks are written to chunks blob files right away after series was handled. Flush method closes chunk writer and sync all symbols, series, labels, posting and meta data to files. It still works in one thread, hence operates only on one core. Estimated memory consumption is unlikely more than 1Gb, but depends on data set, labels size and series' density: chunk data size (512MB) + encoded buffers + index data Fixes #297 --- .errcheck_excludes.txt | 2 +- pkg/compact/downsample/downsample.go | 149 ++-------- pkg/compact/downsample/downsample_test.go | 113 ++++++- pkg/compact/downsample/writer.go | 342 ++++++++++++++++++++++ 4 files changed, 466 insertions(+), 140 deletions(-) create mode 100644 pkg/compact/downsample/writer.go diff --git a/.errcheck_excludes.txt b/.errcheck_excludes.txt index 5fad7c252e..9e2e3a71e1 100644 --- a/.errcheck_excludes.txt +++ b/.errcheck_excludes.txt @@ -1,3 +1,3 @@ (github.com/improbable-eng/thanos/vendor/github.com/go-kit/kit/log.Logger).Log fmt.Fprintln -fmt.Fprint \ No newline at end of file +fmt.Fprint diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go index f5afecdcd0..c0bc5a2344 100644 --- a/pkg/compact/downsample/downsample.go +++ b/pkg/compact/downsample/downsample.go @@ -2,16 +2,12 @@ package downsample import ( "math" - "path/filepath" - "sort" "github.com/improbable-eng/thanos/pkg/block/metadata" "github.com/prometheus/prometheus/pkg/value" "github.com/prometheus/tsdb/chunkenc" - "os" - "github.com/go-kit/kit/log" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/oklog/ulid" @@ -53,13 +49,16 @@ func Downsample( } defer runutil.CloseWithErrCapture(logger, &err, chunkr, "downsample chunk reader") - rng := origMeta.MaxTime - origMeta.MinTime - - // Write downsampled data in a custom memory block where we have fine-grained control - // over created chunks. + // NewWriter downsampled data and puts chunks immediately into files, allow save lot of memory of aggregated data. + // Flushes index and meta data afterwards aggregations. // This is necessary since we need to inject special values at the end of chunks for // some aggregations. 
- newb := newMemBlock() + writer, err := NewWriter(dir, logger, *origMeta, resolution) + defer runutil.CloseWithErrCapture(logger, &err, writer, "downsample instant writer") + + if err != nil { + return id, errors.Wrap(err, "get instantWriter") + } pall, err := indexr.Postings(index.AllPostingsKey()) if err != nil { @@ -86,7 +85,7 @@ func Downsample( for i, c := range chks { chk, err := chunkr.Chunk(c.Ref) if err != nil { - return id, errors.Wrapf(err, "get chunk %d", c.Ref) + return id, errors.Wrapf(err, "get chunk %d, series %d", c.Ref, pall.At()) } chks[i].Chunk = chk } @@ -95,10 +94,12 @@ func Downsample( if origMeta.Thanos.Downsample.Resolution == 0 { for _, c := range chks { if err := expandChunkIterator(c.Chunk.Iterator(), &all); err != nil { - return id, errors.Wrapf(err, "expand chunk %d", c.Ref) + return id, errors.Wrapf(err, "expand chunk %d, series %d", c.Ref, pall.At()) } } - newb.addSeries(&series{lset: lset, chunks: downsampleRaw(all, resolution)}) + if err := writer.AddSeries(&series{lset: lset, chunks: downsampleRaw(all, resolution)}); err != nil { + return id, errors.Wrapf(err, "downsample raw data, series: %d", pall.At()) + } continue } @@ -116,134 +117,24 @@ func Downsample( ) if err != nil { - return id, errors.Wrap(err, "downsample aggregate block") + return id, errors.Wrapf(err, "downsample aggregate block, series: %d", pall.At()) + } + if err := writer.AddSeries(&series{lset: lset, chunks: res}); err != nil { + return id, errors.Wrapf(err, "downsample aggregated block, series: %d", pall.At()) } - newb.addSeries(&series{lset: lset, chunks: res}) } if pall.Err() != nil { return id, errors.Wrap(pall.Err(), "iterate series set") } - comp, err := tsdb.NewLeveledCompactor(nil, log.NewNopLogger(), []int64{rng}, NewPool()) - if err != nil { - return id, errors.Wrap(err, "create compactor") - } - id, err = comp.Write(dir, newb, origMeta.MinTime, origMeta.MaxTime, &origMeta.BlockMeta) - if err != nil { - return id, errors.Wrap(err, "compact head") - } - bdir := filepath.Join(dir, id.String()) - - var tmeta metadata.Thanos - tmeta = origMeta.Thanos - tmeta.Source = metadata.CompactorSource - tmeta.Downsample.Resolution = resolution - _, err = metadata.InjectThanos(logger, bdir, tmeta, &origMeta.BlockMeta) + id, err = writer.Flush() if err != nil { - return id, errors.Wrapf(err, "failed to finalize the block %s", bdir) + return id, errors.Wrap(err, "compact head") } - if err = os.Remove(filepath.Join(bdir, "tombstones")); err != nil { - return id, errors.Wrap(err, "remove tombstones") - } return id, nil } -// memBlock is an in-memory block that implements a subset of the tsdb.BlockReader interface -// to allow tsdb.LeveledCompactor to persist the data as a block. -type memBlock struct { - // Dummies to implement unused methods. 
- tsdb.IndexReader - - symbols map[string]struct{} - postings []uint64 - series []*series - chunks []chunkenc.Chunk -} - -func newMemBlock() *memBlock { - return &memBlock{symbols: map[string]struct{}{}} -} - -func (b *memBlock) addSeries(s *series) { - sid := uint64(len(b.series)) - b.postings = append(b.postings, sid) - b.series = append(b.series, s) - - for _, l := range s.lset { - b.symbols[l.Name] = struct{}{} - b.symbols[l.Value] = struct{}{} - } - - for i, cm := range s.chunks { - cid := uint64(len(b.chunks)) - s.chunks[i].Ref = cid - b.chunks = append(b.chunks, cm.Chunk) - } -} - -func (b *memBlock) Postings(name, val string) (index.Postings, error) { - allName, allVal := index.AllPostingsKey() - - if name != allName || val != allVal { - return nil, errors.New("unsupported call to Postings()") - } - sort.Slice(b.postings, func(i, j int) bool { - return labels.Compare(b.series[b.postings[i]].lset, b.series[b.postings[j]].lset) < 0 - }) - return index.NewListPostings(b.postings), nil -} - -func (b *memBlock) Series(id uint64, lset *labels.Labels, chks *[]chunks.Meta) error { - if id >= uint64(len(b.series)) { - return errors.Wrapf(tsdb.ErrNotFound, "series with ID %d does not exist", id) - } - s := b.series[id] - - *lset = append((*lset)[:0], s.lset...) - *chks = append((*chks)[:0], s.chunks...) - - return nil -} - -func (b *memBlock) Chunk(id uint64) (chunkenc.Chunk, error) { - if id >= uint64(len(b.chunks)) { - return nil, errors.Wrapf(tsdb.ErrNotFound, "chunk with ID %d does not exist", id) - } - return b.chunks[id], nil -} - -func (b *memBlock) Symbols() (map[string]struct{}, error) { - return b.symbols, nil -} - -func (b *memBlock) SortedPostings(p index.Postings) index.Postings { - return p -} - -func (b *memBlock) Index() (tsdb.IndexReader, error) { - return b, nil -} - -func (b *memBlock) Chunks() (tsdb.ChunkReader, error) { - return b, nil -} - -func (b *memBlock) Tombstones() (tsdb.TombstoneReader, error) { - return emptyTombstoneReader{}, nil -} - -func (b *memBlock) Close() error { - return nil -} - -type emptyTombstoneReader struct{} - -func (emptyTombstoneReader) Get(ref uint64) (tsdb.Intervals, error) { return nil, nil } -func (emptyTombstoneReader) Iter(func(uint64, tsdb.Intervals) error) error { return nil } -func (emptyTombstoneReader) Total() uint64 { return 0 } -func (emptyTombstoneReader) Close() error { return nil } - // currentWindow returns the end timestamp of the window that t falls into. func currentWindow(t, r int64) int64 { // The next timestamp is the next number after s.t that's aligned with window. @@ -492,7 +383,7 @@ func downsampleAggr(chks []*AggrChunk, buf *[]sample, mint, maxt, inRes, outRes return res, nil } -// expandChunkIterator reads all samples from the iterater and appends them to buf. +// expandChunkIterator reads all samples from the iterator and appends them to buf. // Stale markers and out of order samples are skipped. func expandChunkIterator(it chunkenc.Iterator, buf *[]sample) error { // For safety reasons, we check for each sample that it does not go back in time. 
diff --git a/pkg/compact/downsample/downsample_test.go b/pkg/compact/downsample/downsample_test.go index f55cfc7e61..177113fde5 100644 --- a/pkg/compact/downsample/downsample_test.go +++ b/pkg/compact/downsample/downsample_test.go @@ -5,6 +5,7 @@ import ( "math" "os" "path/filepath" + "sort" "testing" "time" @@ -13,6 +14,7 @@ import ( "github.com/improbable-eng/thanos/pkg/block" "github.com/improbable-eng/thanos/pkg/block/metadata" "github.com/improbable-eng/thanos/pkg/testutil" + "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/value" "github.com/prometheus/tsdb" "github.com/prometheus/tsdb/chunkenc" @@ -68,30 +70,30 @@ func TestDownsampleAggr(t *testing.T) { { lset: labels.FromStrings("__name__", "a"), inAggr: map[AggrType][]sample{ - AggrCount: []sample{ + AggrCount: { {199, 5}, {299, 1}, {399, 10}, {400, 3}, {499, 10}, {699, 0}, {999, 100}, }, - AggrSum: []sample{ + AggrSum: { {199, 5}, {299, 1}, {399, 10}, {400, 3}, {499, 10}, {699, 0}, {999, 100}, }, - AggrMin: []sample{ + AggrMin: { {199, 5}, {299, 1}, {399, 10}, {400, -3}, {499, 10}, {699, 0}, {999, 100}, }, - AggrMax: []sample{ + AggrMax: { {199, 5}, {299, 1}, {399, 10}, {400, -3}, {499, 10}, {699, 0}, {999, 100}, }, - AggrCounter: []sample{ + AggrCounter: { {99, 100}, {299, 150}, {499, 210}, {499, 10}, // chunk 1 {599, 20}, {799, 50}, {999, 120}, {999, 50}, // chunk 2, no reset {1099, 40}, {1199, 80}, {1299, 110}, // chunk 3, reset }, }, output: map[AggrType][]sample{ - AggrCount: []sample{{499, 29}, {999, 100}}, - AggrSum: []sample{{499, 29}, {999, 100}}, - AggrMin: []sample{{499, -3}, {999, 0}}, - AggrMax: []sample{{499, 10}, {999, 100}}, - AggrCounter: []sample{{499, 210}, {999, 320}, {1299, 430}, {1299, 110}}, + AggrCount: {{499, 29}, {999, 100}}, + AggrSum: {{499, 29}, {999, 100}}, + AggrMin: {{499, -3}, {999, 0}}, + AggrMax: {{499, 10}, {999, 100}}, + AggrCounter: {{499, 210}, {999, 320}, {1299, 430}, {1299, 110}}, }, }, } @@ -375,3 +377,94 @@ func (it *sampleIterator) Seek(int64) bool { func (it *sampleIterator) At() (t int64, v float64) { return it.l[it.i].t, it.l[it.i].v } + +// memBlock is an in-memory block that implements a subset of the tsdb.BlockReader interface +// to allow tsdb.instantWriter to persist the data as a block. +type memBlock struct { + // Dummies to implement unused methods. 
+ tsdb.IndexReader + + symbols map[string]struct{} + postings []uint64 + series []*series + chunks []chunkenc.Chunk + + numberOfChunks uint64 +} + +func newMemBlock() *memBlock { + return &memBlock{symbols: map[string]struct{}{}} +} + +func (b *memBlock) addSeries(s *series) { + sid := uint64(len(b.series)) + b.postings = append(b.postings, sid) + b.series = append(b.series, s) + + for _, l := range s.lset { + b.symbols[l.Name] = struct{}{} + b.symbols[l.Value] = struct{}{} + } + + for i, cm := range s.chunks { + s.chunks[i].Ref = b.numberOfChunks + b.chunks = append(b.chunks, cm.Chunk) + b.numberOfChunks++ + } +} + +func (b *memBlock) Postings(name, val string) (index.Postings, error) { + allName, allVal := index.AllPostingsKey() + + if name != allName || val != allVal { + return nil, errors.New("unsupported call to Postings()") + } + sort.Slice(b.postings, func(i, j int) bool { + return labels.Compare(b.series[b.postings[i]].lset, b.series[b.postings[j]].lset) < 0 + }) + return index.NewListPostings(b.postings), nil +} + +func (b *memBlock) Series(id uint64, lset *labels.Labels, chks *[]chunks.Meta) error { + if id >= uint64(len(b.series)) { + return errors.Wrapf(tsdb.ErrNotFound, "series with ID %d does not exist", id) + } + s := b.series[id] + + *lset = append((*lset)[:0], s.lset...) + *chks = append((*chks)[:0], s.chunks...) + + return nil +} + +func (b *memBlock) Chunk(id uint64) (chunkenc.Chunk, error) { + if id >= uint64(b.numberOfChunks) { + return nil, errors.Wrapf(tsdb.ErrNotFound, "chunk with ID %d does not exist", id) + } + + return b.chunks[id], nil +} + +func (b *memBlock) Symbols() (map[string]struct{}, error) { + return b.symbols, nil +} + +func (b *memBlock) SortedPostings(p index.Postings) index.Postings { + return p +} + +func (b *memBlock) Index() (tsdb.IndexReader, error) { + return b, nil +} + +func (b *memBlock) Chunks() (tsdb.ChunkReader, error) { + return b, nil +} + +func (b *memBlock) Tombstones() (tsdb.TombstoneReader, error) { + return tsdb.EmptyTombstoneReader(), nil +} + +func (b *memBlock) Close() error { + return nil +} diff --git a/pkg/compact/downsample/writer.go b/pkg/compact/downsample/writer.go new file mode 100644 index 0000000000..b7c13469f9 --- /dev/null +++ b/pkg/compact/downsample/writer.go @@ -0,0 +1,342 @@ +package downsample + +import ( + "encoding/json" + "math/rand" + "os" + "path/filepath" + "sort" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/improbable-eng/thanos/pkg/block" + "github.com/oklog/ulid" + "github.com/pkg/errors" + "github.com/prometheus/tsdb" + "github.com/prometheus/tsdb/chunks" + "github.com/prometheus/tsdb/fileutil" + "github.com/prometheus/tsdb/index" + "github.com/prometheus/tsdb/labels" +) + +type symbols map[string]struct{} +type labelValues map[string]struct{} + +func (lv labelValues) add(value string) { + lv[value] = struct{}{} +} +func (lv labelValues) get(set *[]string) { + for value := range lv { + *set = append(*set, value) + } +} + +type labelsValues map[string]labelValues + +func (lv labelsValues) add(labelSet labels.Labels) { + for _, label := range labelSet { + values, ok := lv[label.Name] + if !ok { + // Add new label. + values = labelValues{} + lv[label.Name] = values + } + values.add(label.Value) + } +} + +// InstantWriter writes downsampled block to a new data block. Chunks will be written immediately in order to avoid +// memory consumption. 
+type instantWriter struct { + dir string + tmpDir string + logger log.Logger + uid ulid.ULID + resolution int64 + + symbols symbols + postings []uint64 + series []*series + + chunkWriter tsdb.ChunkWriter + meta block.Meta + totalChunks uint64 + totalSamples uint64 +} + +func NewWriter(dir string, l log.Logger, originMeta block.Meta, resolution int64) (*instantWriter, error) { + var err error + var chunkWriter tsdb.ChunkWriter + + // Generate new block id + entropy := rand.New(rand.NewSource(time.Now().UnixNano())) + uid := ulid.MustNew(ulid.Now(), entropy) + + // Populate chunk, meta and index files into temporary directory with + // data of all blocks. + dir = filepath.Join(dir, uid.String()) + tmpDir, err := createTmpDir(dir) + if err != nil { + return nil, err + } + + chunkDir := func(dir string) string { + return filepath.Join(dir, block.ChunksDirname) + } + + chunkWriter, err = chunks.NewWriter(chunkDir(tmpDir)) + if err != nil { + return nil, errors.Wrap(err, "create tmp chunk instantWriter") + } + + return &instantWriter{ + logger: l, + dir: dir, + tmpDir: tmpDir, + symbols: symbols{}, + chunkWriter: chunkWriter, + uid: uid, + meta: originMeta, + resolution: resolution, + }, nil +} + +func (w *instantWriter) AddSeries(s *series) error { + if len(s.chunks) == 0 { + level.Info(w.logger).Log("empty chunks happened", s.lset) + } + + if err := w.chunkWriter.WriteChunks(s.chunks...); err != nil { + return errors.Wrap(err, "add series") + } + + w.postings = append(w.postings, uint64(len(w.series))) + w.series = append(w.series, s) + + for _, l := range s.lset { + w.symbols[l.Name] = struct{}{} + w.symbols[l.Value] = struct{}{} + } + + w.totalChunks += uint64(len(s.chunks)) + for i := range s.chunks { + chk := &s.chunks[i] + w.totalSamples += uint64(chk.Chunk.NumSamples()) + chk.Chunk = nil + } + + return nil +} + +func (w *instantWriter) Flush() (ulid.ULID, error) { + var err error + + // All the chunks have been written by this moment, can close writer. + if err := w.chunkWriter.Close(); err != nil { + return w.uid, errors.Wrap(err, "close chunk writer") + } + w.chunkWriter = nil + + indexw, err := index.NewWriter(filepath.Join(w.tmpDir, block.IndexFilename)) + if err != nil { + return w.uid, errors.Wrap(err, "open index instantWriter") + } + + if err := w.populateBlock(indexw); err != nil { + return w.uid, errors.Wrap(err, "write compaction") + } + + if err = w.writeMetaFile(w.tmpDir); err != nil { + return w.uid, errors.Wrap(err, "write merged meta") + } + + if err = indexw.Close(); err != nil { + return w.uid, errors.Wrap(err, "close index instantWriter") + } + + df, err := fileutil.OpenDir(w.tmpDir) + if err != nil { + return w.uid, errors.Wrap(err, "open temporary block dir") + } + defer func() { + if df != nil { + if err := df.Close(); err != nil { + log.Logger(w.logger).Log(err, "close temporary block dir") + } + } + }() + + if err := fileutil.Fsync(df); err != nil { + return w.uid, errors.Wrap(err, "sync temporary dir") + } + + // Close temp dir before rename block dir (for windows platform). + if err = df.Close(); err != nil { + return w.uid, errors.Wrap(err, "close temporary dir") + } + df = nil + + // Block successfully written, make visible and remove old ones. 
+ err = renameFile(w.tmpDir, w.dir) + // Assume we cleaned tmp dir up + w.tmpDir = "" + if err != nil { + return w.uid, errors.Wrap(err, "rename block dir") + } + + level.Info(w.logger).Log( + "msg", "write downsampled block", + "mint", w.meta.MinTime, + "maxt", w.meta.MaxTime, + "ulid", w.meta.ULID, + "resolution", w.meta.Thanos.Downsample.Resolution, + ) + return w.uid, nil +} + +// populateBlock fills the index and chunk writers with new data gathered as the union +// of the provided blocks. It returns meta information for the new block. +func (w *instantWriter) populateBlock(indexWriter tsdb.IndexWriter) error { + var ( + i = uint64(0) + labelsValues = labelsValues{} + memPostings = index.NewUnorderedMemPostings() + ) + + if err := indexWriter.AddSymbols(w.symbols); err != nil { + return errors.Wrap(err, "add symbols") + } + + sort.Slice(w.postings, func(i, j int) bool { + return labels.Compare(w.series[w.postings[i]].lset, w.series[w.postings[j]].lset) < 0 + }) + + all := index.NewListPostings(w.postings) + // all := w.postings.All() + for all.Next() { + // i := all.At() + s := w.series[i] + // Skip the series with all deleted chunks. + if len(s.chunks) == 0 { + level.Info(w.logger).Log("empty chunks", i, s.lset) + continue + } + + if err := indexWriter.AddSeries(uint64(i), s.lset, s.chunks...); err != nil { + return errors.Wrap(err, "add series") + } + + labelsValues.add(s.lset) + memPostings.Add(i, s.lset) + i++ + } + + s := make([]string, 0, 256) + for n, v := range labelsValues { + s = s[:0] + v.get(&s) + if err := indexWriter.WriteLabelIndex([]string{n}, s); err != nil { + return errors.Wrap(err, "write label index") + } + } + + memPostings.EnsureOrder() + + for _, l := range memPostings.SortedKeys() { + if err := indexWriter.WritePostings(l.Name, l.Value, memPostings.Get(l.Name, l.Value)); err != nil { + return errors.Wrap(err, "write postings") + } + } + return nil +} + +// TODO probably tsdb.BlockMeta should expose method writeToFile /w encode. +func (w *instantWriter) writeMetaFile(dest string) error { + w.meta.ULID = w.uid + w.meta.Version = 1 + w.meta.Thanos.Source = block.CompactorSource + w.meta.Thanos.Downsample.Resolution = w.resolution + w.meta.Stats.NumChunks = w.totalChunks + w.meta.Stats.NumSamples = w.totalSamples + w.meta.Stats.NumSeries = uint64(len(w.series)) + + // Make any changes to the file appear atomic. 
+ path := filepath.Join(dest, block.MetaFilename) + tmp := path + ".tmp" + + f, err := os.Create(tmp) + if err != nil { + return errors.Wrapf(err, "create tmp meta file %s", tmp) + } + + enc := json.NewEncoder(f) + enc.SetIndent("", "\t") + + var merr tsdb.MultiError + + if merr.Add(enc.Encode(w.meta)); merr.Err() != nil { + merr.Add(f.Close()) + return errors.Wrapf(merr.Err(), "encoding meta file to json %s", tmp) + } + if err := f.Close(); err != nil { + return errors.Wrapf(err, "close tmp meta file %s", tmp) + } + + if err := renameFile(tmp, path); err != nil { + return errors.Wrapf(err, "rename tmp meta file %s", tmp) + } + + return nil +} + +func (w *instantWriter) Close() error { + var merr tsdb.MultiError + + if w.tmpDir != "" { + merr.Add(os.RemoveAll(w.tmpDir)) + } + + if w.chunkWriter != nil { + merr.Add(w.chunkWriter.Close()) + } + + if merr.Err() != nil { + return errors.Wrap(merr.Err(), "close chunk writer") + } + return nil +} + +func renameFile(from, to string) error { + if err := os.RemoveAll(to); err != nil { + return err + } + if err := os.Rename(from, to); err != nil { + return err + } + + // Directory was renamed; sync parent dir to persist rename. + pdir, err := fileutil.OpenDir(filepath.Dir(to)) + if err != nil { + return err + } + + var merr tsdb.MultiError + merr.Add(fileutil.Fsync(pdir)) + merr.Add(pdir.Close()) + return merr.Err() +} + +func createTmpDir(parent string) (string, error) { + tmp := parent + ".tmp" + + if err := os.RemoveAll(tmp); err != nil { + return "", errors.Wrap(err, "removing tmp dir") + } + + if err := os.MkdirAll(tmp, 0777); err != nil { + return "", errors.Wrap(err, "mkdir tmp dir") + } + + return tmp, nil +} From 9467c0321403d31d933f33af3b6a9aeff2500ca8 Mon Sep 17 00:00:00 2001 From: Aleksei Semiglazov Date: Tue, 9 Oct 2018 18:46:45 +0100 Subject: [PATCH 2/6] compact: clarify purpose of streamed block writer Add comments and close resources properly. --- pkg/compact/downsample/downsample.go | 17 ++--- pkg/compact/downsample/downsample_test.go | 2 +- .../{writer.go => streamed_block_writer.go} | 72 +++++++++++-------- 3 files changed, 50 insertions(+), 41 deletions(-) rename pkg/compact/downsample/{writer.go => streamed_block_writer.go} (79%) diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go index c0bc5a2344..fdb2fb7b83 100644 --- a/pkg/compact/downsample/downsample.go +++ b/pkg/compact/downsample/downsample.go @@ -49,16 +49,13 @@ func Downsample( } defer runutil.CloseWithErrCapture(logger, &err, chunkr, "downsample chunk reader") - // NewWriter downsampled data and puts chunks immediately into files, allow save lot of memory of aggregated data. + // Writes downsampled chunks right into the files, avoiding excess memory allocation. // Flushes index and meta data afterwards aggregations. - // This is necessary since we need to inject special values at the end of chunks for - // some aggregations. 
- writer, err := NewWriter(dir, logger, *origMeta, resolution) - defer runutil.CloseWithErrCapture(logger, &err, writer, "downsample instant writer") - + streamedBlockWriter, err := NewWriter(dir, logger, *origMeta, resolution) if err != nil { - return id, errors.Wrap(err, "get instantWriter") + return id, errors.Wrap(err, "get streamed block writer") } + defer runutil.CloseWithErrCapture(logger, &err, streamedBlockWriter, "close stream block writer") pall, err := indexr.Postings(index.AllPostingsKey()) if err != nil { @@ -97,7 +94,7 @@ func Downsample( return id, errors.Wrapf(err, "expand chunk %d, series %d", c.Ref, pall.At()) } } - if err := writer.AddSeries(&series{lset: lset, chunks: downsampleRaw(all, resolution)}); err != nil { + if err := streamedBlockWriter.AddSeries(&series{lset: lset, chunks: downsampleRaw(all, resolution)}); err != nil { return id, errors.Wrapf(err, "downsample raw data, series: %d", pall.At()) } continue @@ -119,7 +116,7 @@ func Downsample( if err != nil { return id, errors.Wrapf(err, "downsample aggregate block, series: %d", pall.At()) } - if err := writer.AddSeries(&series{lset: lset, chunks: res}); err != nil { + if err := streamedBlockWriter.AddSeries(&series{lset: lset, chunks: res}); err != nil { return id, errors.Wrapf(err, "downsample aggregated block, series: %d", pall.At()) } } @@ -127,7 +124,7 @@ func Downsample( return id, errors.Wrap(pall.Err(), "iterate series set") } - id, err = writer.Flush() + id, err = streamedBlockWriter.Flush() if err != nil { return id, errors.Wrap(err, "compact head") } diff --git a/pkg/compact/downsample/downsample_test.go b/pkg/compact/downsample/downsample_test.go index 177113fde5..9d4bb5e753 100644 --- a/pkg/compact/downsample/downsample_test.go +++ b/pkg/compact/downsample/downsample_test.go @@ -379,7 +379,7 @@ func (it *sampleIterator) At() (t int64, v float64) { } // memBlock is an in-memory block that implements a subset of the tsdb.BlockReader interface -// to allow tsdb.instantWriter to persist the data as a block. +// to allow tsdb.StreamedBlockWriter to persist the data as a block. type memBlock struct { // Dummies to implement unused methods. tsdb.IndexReader diff --git a/pkg/compact/downsample/writer.go b/pkg/compact/downsample/streamed_block_writer.go similarity index 79% rename from pkg/compact/downsample/writer.go rename to pkg/compact/downsample/streamed_block_writer.go index b7c13469f9..c251c5ca15 100644 --- a/pkg/compact/downsample/writer.go +++ b/pkg/compact/downsample/streamed_block_writer.go @@ -21,11 +21,13 @@ import ( ) type symbols map[string]struct{} + type labelValues map[string]struct{} func (lv labelValues) add(value string) { lv[value] = struct{}{} } + func (lv labelValues) get(set *[]string) { for value := range lv { *set = append(*set, value) @@ -46,14 +48,14 @@ func (lv labelsValues) add(labelSet labels.Labels) { } } -// InstantWriter writes downsampled block to a new data block. Chunks will be written immediately in order to avoid -// memory consumption. -type instantWriter struct { - dir string - tmpDir string - logger log.Logger - uid ulid.ULID - resolution int64 +// StreamedBlockWriter writes downsampled blocks to a new data block. Implemented to save memory consumption +// by means writing chunks data right into the files, omitting keeping them in-memory. Index and meta data should be +// flushed afterwards, when there aren't more series to process. 
+type StreamedBlockWriter struct { + dir string + tmpDir string + logger log.Logger + uid ulid.ULID symbols symbols postings []uint64 @@ -65,11 +67,13 @@ type instantWriter struct { totalSamples uint64 } -func NewWriter(dir string, l log.Logger, originMeta block.Meta, resolution int64) (*instantWriter, error) { +// NewWriter returns StreamedBlockWriter instance. Caller is responsible to finalize the writing with Flush method to write +// the meta and index file and Close all io.Closers +func NewWriter(dir string, l log.Logger, originMeta block.Meta, resolution int64) (*StreamedBlockWriter, error) { var err error var chunkWriter tsdb.ChunkWriter - // Generate new block id + // Generate new block id. entropy := rand.New(rand.NewSource(time.Now().UnixNano())) uid := ulid.MustNew(ulid.Now(), entropy) @@ -81,16 +85,14 @@ func NewWriter(dir string, l log.Logger, originMeta block.Meta, resolution int64 return nil, err } - chunkDir := func(dir string) string { - return filepath.Join(dir, block.ChunksDirname) - } - - chunkWriter, err = chunks.NewWriter(chunkDir(tmpDir)) + chunkWriter, err = chunks.NewWriter(filepath.Join(tmpDir, block.ChunksDirname)) if err != nil { - return nil, errors.Wrap(err, "create tmp chunk instantWriter") + return nil, errors.Wrap(err, "create tmp chunk StreamedBlockWriter") } - return &instantWriter{ + originMeta.Thanos.Downsample.Resolution = resolution + + return &StreamedBlockWriter{ logger: l, dir: dir, tmpDir: tmpDir, @@ -98,13 +100,15 @@ func NewWriter(dir string, l log.Logger, originMeta block.Meta, resolution int64 chunkWriter: chunkWriter, uid: uid, meta: originMeta, - resolution: resolution, }, nil } -func (w *instantWriter) AddSeries(s *series) error { +func (w *StreamedBlockWriter) AddSeries(s *series) error { + if w.chunkWriter == nil { + panic("Series can't be added, ChunkWriter has been closed") + } if len(s.chunks) == 0 { - level.Info(w.logger).Log("empty chunks happened", s.lset) + level.Warn(w.logger).Log("empty chunks happened", s.lset) } if err := w.chunkWriter.WriteChunks(s.chunks...); err != nil { @@ -129,10 +133,11 @@ func (w *instantWriter) AddSeries(s *series) error { return nil } -func (w *instantWriter) Flush() (ulid.ULID, error) { +// Flush saves prepared index and meta data to corresponding files. +// Be sure to call this, if all series have to be handled by this moment, as +func (w *StreamedBlockWriter) Flush() (ulid.ULID, error) { var err error - // All the chunks have been written by this moment, can close writer. 
if err := w.chunkWriter.Close(); err != nil { return w.uid, errors.Wrap(err, "close chunk writer") } @@ -140,9 +145,17 @@ func (w *instantWriter) Flush() (ulid.ULID, error) { indexw, err := index.NewWriter(filepath.Join(w.tmpDir, block.IndexFilename)) if err != nil { - return w.uid, errors.Wrap(err, "open index instantWriter") + return w.uid, errors.Wrap(err, "open index StreamedBlockWriter") } + defer func() { + if indexw != nil { + if err := indexw.Close(); err != nil { + level.Error(w.logger).Log(err, "close index StreamedBlockWriter") + } + } + }() + if err := w.populateBlock(indexw); err != nil { return w.uid, errors.Wrap(err, "write compaction") } @@ -152,8 +165,9 @@ func (w *instantWriter) Flush() (ulid.ULID, error) { } if err = indexw.Close(); err != nil { - return w.uid, errors.Wrap(err, "close index instantWriter") + return w.uid, errors.Wrap(err, "close index StreamedBlockWriter") } + indexw = nil df, err := fileutil.OpenDir(w.tmpDir) if err != nil { @@ -197,7 +211,7 @@ func (w *instantWriter) Flush() (ulid.ULID, error) { // populateBlock fills the index and chunk writers with new data gathered as the union // of the provided blocks. It returns meta information for the new block. -func (w *instantWriter) populateBlock(indexWriter tsdb.IndexWriter) error { +func (w *StreamedBlockWriter) populateBlock(indexWriter tsdb.IndexWriter) error { var ( i = uint64(0) labelsValues = labelsValues{} @@ -213,9 +227,7 @@ func (w *instantWriter) populateBlock(indexWriter tsdb.IndexWriter) error { }) all := index.NewListPostings(w.postings) - // all := w.postings.All() for all.Next() { - // i := all.At() s := w.series[i] // Skip the series with all deleted chunks. if len(s.chunks) == 0 { @@ -252,11 +264,10 @@ func (w *instantWriter) populateBlock(indexWriter tsdb.IndexWriter) error { } // TODO probably tsdb.BlockMeta should expose method writeToFile /w encode. -func (w *instantWriter) writeMetaFile(dest string) error { +func (w *StreamedBlockWriter) writeMetaFile(dest string) error { w.meta.ULID = w.uid w.meta.Version = 1 w.meta.Thanos.Source = block.CompactorSource - w.meta.Thanos.Downsample.Resolution = w.resolution w.meta.Stats.NumChunks = w.totalChunks w.meta.Stats.NumSamples = w.totalSamples w.meta.Stats.NumSeries = uint64(len(w.series)) @@ -290,7 +301,7 @@ func (w *instantWriter) writeMetaFile(dest string) error { return nil } -func (w *instantWriter) Close() error { +func (w *StreamedBlockWriter) Close() error { var merr tsdb.MultiError if w.tmpDir != "" { @@ -299,6 +310,7 @@ func (w *instantWriter) Close() error { if w.chunkWriter != nil { merr.Add(w.chunkWriter.Close()) + w.chunkWriter = nil } if merr.Err() != nil { From cbca7e478e3b5eebcc10c64c84bf1e58d956b611 Mon Sep 17 00:00:00 2001 From: Aleksei Semiglazov Date: Wed, 14 Nov 2018 01:22:42 +0000 Subject: [PATCH 3/6] downsample: fix postings index Use proper posting index to fetch series data with label set and chunks --- pkg/compact/downsample/streamed_block_writer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/compact/downsample/streamed_block_writer.go b/pkg/compact/downsample/streamed_block_writer.go index c251c5ca15..7ec60ec932 100644 --- a/pkg/compact/downsample/streamed_block_writer.go +++ b/pkg/compact/downsample/streamed_block_writer.go @@ -228,7 +228,7 @@ func (w *StreamedBlockWriter) populateBlock(indexWriter tsdb.IndexWriter) error all := index.NewListPostings(w.postings) for all.Next() { - s := w.series[i] + s := w.series[all.At()] // Skip the series with all deleted chunks. 
if len(s.chunks) == 0 { level.Info(w.logger).Log("empty chunks", i, s.lset) From b695d1316c2af3871c2266f63df04ac1ec436f1d Mon Sep 17 00:00:00 2001 From: Aleksei Semiglazov Date: Thu, 20 Dec 2018 21:20:22 +0000 Subject: [PATCH 4/6] Add stream writer an ability to write index data right during the downsampling process. One of the trade-offs is to preserve symbols from raw blocks, as we have to write them before preserving the series. Stream writer allows downsample a huge data blocks with no needs to keep all series in RAM, the only need it preserve label values and postings references. --- cmd/thanos/downsample.go | 4 +- pkg/compact/downsample/downsample.go | 54 ++-- pkg/compact/downsample/downsample_test.go | 5 + .../downsample/streamed_block_writer.go | 296 +++++++++--------- 4 files changed, 177 insertions(+), 182 deletions(-) diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go index e53cd7680b..fed8946bd4 100644 --- a/cmd/thanos/downsample.go +++ b/cmd/thanos/downsample.go @@ -173,7 +173,7 @@ func downsampleBucket( continue } if err := processDownsampling(ctx, logger, bkt, m, dir, 5*60*1000); err != nil { - return err + return errors.Wrap(err, "downsampling to 5 min") } case 5 * 60 * 1000: @@ -194,7 +194,7 @@ func downsampleBucket( continue } if err := processDownsampling(ctx, logger, bkt, m, dir, 60*60*1000); err != nil { - return err + return errors.Wrap(err, "downsampling to 60 min") } } } diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go index fdb2fb7b83..43b4c4624b 100644 --- a/pkg/compact/downsample/downsample.go +++ b/pkg/compact/downsample/downsample.go @@ -51,7 +51,7 @@ func Downsample( // Writes downsampled chunks right into the files, avoiding excess memory allocation. // Flushes index and meta data afterwards aggregations. - streamedBlockWriter, err := NewWriter(dir, logger, *origMeta, resolution) + streamedBlockWriter, err := NewWriter(dir, indexr, logger, *origMeta, resolution) if err != nil { return id, errors.Wrap(err, "get streamed block writer") } @@ -65,9 +65,10 @@ func Downsample( aggrChunks []*AggrChunk all []sample chks []chunks.Meta + lset labels.Labels ) for pall.Next() { - var lset labels.Labels + lset = lset[:0] chks = chks[:0] all = all[:0] aggrChunks = aggrChunks[:0] @@ -94,30 +95,28 @@ func Downsample( return id, errors.Wrapf(err, "expand chunk %d, series %d", c.Ref, pall.At()) } } - if err := streamedBlockWriter.AddSeries(&series{lset: lset, chunks: downsampleRaw(all, resolution)}); err != nil { + if err := streamedBlockWriter.AddSeries(lset, downsampleRaw(all, resolution)); err != nil { return id, errors.Wrapf(err, "downsample raw data, series: %d", pall.At()) } - continue - } - - // Downsample a block that contains aggregate chunks already. - for _, c := range chks { - aggrChunks = append(aggrChunks, c.Chunk.(*AggrChunk)) - } - res, err := downsampleAggr( - aggrChunks, - &all, - chks[0].MinTime, - chks[len(chks)-1].MaxTime, - origMeta.Thanos.Downsample.Resolution, - resolution, - ) - - if err != nil { - return id, errors.Wrapf(err, "downsample aggregate block, series: %d", pall.At()) - } - if err := streamedBlockWriter.AddSeries(&series{lset: lset, chunks: res}); err != nil { - return id, errors.Wrapf(err, "downsample aggregated block, series: %d", pall.At()) + } else { + // Downsample a block that contains aggregate chunks already. 
+ for _, c := range chks { + aggrChunks = append(aggrChunks, c.Chunk.(*AggrChunk)) + } + downsampledChunks, err := downsampleAggr( + aggrChunks, + &all, + chks[0].MinTime, + chks[len(chks)-1].MaxTime, + origMeta.Thanos.Downsample.Resolution, + resolution, + ) + if err != nil { + return id, errors.Wrapf(err, "downsample aggregate block, series: %d", pall.At()) + } + if err := streamedBlockWriter.AddSeries(lset, downsampledChunks); err != nil { + return id, errors.Wrapf(err, "downsample aggregated block, series: %d", pall.At()) + } } } if pall.Err() != nil { @@ -126,7 +125,7 @@ func Downsample( id, err = streamedBlockWriter.Flush() if err != nil { - return id, errors.Wrap(err, "compact head") + return id, errors.Wrap(err, "flush data in stream data") } return id, nil @@ -502,11 +501,6 @@ type sample struct { v float64 } -type series struct { - lset labels.Labels - chunks []chunks.Meta -} - // CounterSeriesIterator iterates over an ordered sequence of chunks and treats decreasing // values as counter reset. // Additionally, it can deal with downsampled counter chunks, which set the last value of a chunk diff --git a/pkg/compact/downsample/downsample_test.go b/pkg/compact/downsample/downsample_test.go index 9d4bb5e753..65e49831e0 100644 --- a/pkg/compact/downsample/downsample_test.go +++ b/pkg/compact/downsample/downsample_test.go @@ -392,6 +392,11 @@ type memBlock struct { numberOfChunks uint64 } +type series struct { + lset labels.Labels + chunks []chunks.Meta +} + func newMemBlock() *memBlock { return &memBlock{symbols: map[string]struct{}{}} } diff --git a/pkg/compact/downsample/streamed_block_writer.go b/pkg/compact/downsample/streamed_block_writer.go index 7ec60ec932..8e426ae4d4 100644 --- a/pkg/compact/downsample/streamed_block_writer.go +++ b/pkg/compact/downsample/streamed_block_writer.go @@ -2,10 +2,10 @@ package downsample import ( "encoding/json" + "io" "math/rand" "os" "path/filepath" - "sort" "time" "github.com/go-kit/kit/log" @@ -20,8 +20,6 @@ import ( "github.com/prometheus/tsdb/labels" ) -type symbols map[string]struct{} - type labelValues map[string]struct{} func (lv labelValues) add(value string) { @@ -50,32 +48,43 @@ func (lv labelsValues) add(labelSet labels.Labels) { // StreamedBlockWriter writes downsampled blocks to a new data block. Implemented to save memory consumption // by means writing chunks data right into the files, omitting keeping them in-memory. Index and meta data should be -// flushed afterwards, when there aren't more series to process. +// sealed afterwards, when there aren't more series to process. type StreamedBlockWriter struct { dir string tmpDir string logger log.Logger uid ulid.ULID - symbols symbols - postings []uint64 - series []*series + // postings is a current posting position. + postings uint64 + + chunkWriter tsdb.ChunkWriter + indexWriter tsdb.IndexWriter + indexReader tsdb.IndexReader + closers []io.Closer - chunkWriter tsdb.ChunkWriter meta block.Meta totalChunks uint64 totalSamples uint64 + + // labelsValues list of used label sets: name -> []values. + labelsValues labelsValues + + // memPostings contains references from label name:value -> postings. + memPostings *index.MemPostings + sealed bool } -// NewWriter returns StreamedBlockWriter instance. 
Caller is responsible to finalize the writing with Flush method to write -// the meta and index file and Close all io.Closers -func NewWriter(dir string, l log.Logger, originMeta block.Meta, resolution int64) (*StreamedBlockWriter, error) { +// NewWriter returns StreamedBlockWriter instance. +// Caller is responsible to finalize the writing with Flush method to write the meta and index file and Close all io.Closers +func NewWriter(dir string, indexReader tsdb.IndexReader, l log.Logger, originMeta block.Meta, resolution int64) (*StreamedBlockWriter, error) { var err error - var chunkWriter tsdb.ChunkWriter + + // change downsampling resolution to the new one. + originMeta.Thanos.Downsample.Resolution = resolution // Generate new block id. - entropy := rand.New(rand.NewSource(time.Now().UnixNano())) - uid := ulid.MustNew(ulid.Now(), entropy) + uid := ulid.MustNew(ulid.Now(), rand.New(rand.NewSource(time.Now().UnixNano()))) // Populate chunk, meta and index files into temporary directory with // data of all blocks. @@ -85,93 +94,139 @@ func NewWriter(dir string, l log.Logger, originMeta block.Meta, resolution int64 return nil, err } - chunkWriter, err = chunks.NewWriter(filepath.Join(tmpDir, block.ChunksDirname)) + sw := &StreamedBlockWriter{ + logger: l, + dir: dir, + indexReader: indexReader, + tmpDir: tmpDir, + uid: uid, + meta: originMeta, + closers: make([]io.Closer, 0), + labelsValues: make(labelsValues, 1024), + memPostings: index.NewUnorderedMemPostings(), + } + + sw.chunkWriter, err = chunks.NewWriter(filepath.Join(tmpDir, block.ChunksDirname)) if err != nil { return nil, errors.Wrap(err, "create tmp chunk StreamedBlockWriter") } + sw.closers = append(sw.closers, sw.chunkWriter) - originMeta.Thanos.Downsample.Resolution = resolution + sw.indexWriter, err = index.NewWriter(filepath.Join(tmpDir, block.IndexFilename)) + if err != nil { + return nil, errors.Wrap(err, "open index StreamedBlockWriter") + } + sw.closers = append(sw.closers, sw.indexWriter) - return &StreamedBlockWriter{ - logger: l, - dir: dir, - tmpDir: tmpDir, - symbols: symbols{}, - chunkWriter: chunkWriter, - uid: uid, - meta: originMeta, - }, nil + if err := sw.init(); err != nil { + return nil, err + } + + return sw, nil } -func (w *StreamedBlockWriter) AddSeries(s *series) error { - if w.chunkWriter == nil { - panic("Series can't be added, ChunkWriter has been closed") +// AddSeries writes chunks data to the chunkWriter, writes lset and chunks Metas to indexWrites and adds label sets to +// labelsValues sets and memPostings to be written on the Flush state in the end of downsampling process. 
+func (w *StreamedBlockWriter) AddSeries(lset labels.Labels, chunks []chunks.Meta) error { + if w.sealed { + panic("Series can't be added, writers has been flushed|closed") } - if len(s.chunks) == 0 { - level.Warn(w.logger).Log("empty chunks happened", s.lset) + + if len(chunks) == 0 { + level.Warn(w.logger).Log("empty chunks happened, skip series", lset) + return nil } - if err := w.chunkWriter.WriteChunks(s.chunks...); err != nil { + if err := w.chunkWriter.WriteChunks(chunks...); err != nil { return errors.Wrap(err, "add series") } - w.postings = append(w.postings, uint64(len(w.series))) - w.series = append(w.series, s) - - for _, l := range s.lset { - w.symbols[l.Name] = struct{}{} - w.symbols[l.Value] = struct{}{} + if err := w.indexWriter.AddSeries(w.postings, lset, chunks...); err != nil { + return errors.Wrap(err, "add series") } - w.totalChunks += uint64(len(s.chunks)) - for i := range s.chunks { - chk := &s.chunks[i] - w.totalSamples += uint64(chk.Chunk.NumSamples()) - chk.Chunk = nil + w.labelsValues.add(lset) + w.memPostings.Add(w.postings, lset) + w.postings++ + + w.totalChunks += uint64(len(chunks)) + for i := range chunks { + w.totalSamples += uint64(chunks[i].Chunk.NumSamples()) } return nil } // Flush saves prepared index and meta data to corresponding files. -// Be sure to call this, if all series have to be handled by this moment, as +// Be sure to call this, if all series have to be handled by this moment, you can't call AddSeries afterwards. func (w *StreamedBlockWriter) Flush() (ulid.ULID, error) { var err error + w.sealed = true - if err := w.chunkWriter.Close(); err != nil { - return w.uid, errors.Wrap(err, "close chunk writer") + if err = w.writeLabelSets(); err != nil { + return w.uid, errors.Wrap(err, "write label sets") } - w.chunkWriter = nil - indexw, err := index.NewWriter(filepath.Join(w.tmpDir, block.IndexFilename)) - if err != nil { - return w.uid, errors.Wrap(err, "open index StreamedBlockWriter") + if err = w.writeMemPostings(); err != nil { + return w.uid, errors.Wrap(err, "write mem postings") } - defer func() { - if indexw != nil { - if err := indexw.Close(); err != nil { - level.Error(w.logger).Log(err, "close index StreamedBlockWriter") - } - } - }() + if err = w.writeMetaFile(); err != nil { + return w.uid, errors.Wrap(err, "write meta meta") + } - if err := w.populateBlock(indexw); err != nil { - return w.uid, errors.Wrap(err, "write compaction") + if err = w.finalize(); err != nil { + return w.uid, errors.Wrap(err, "sync and rename tmp dir") } - if err = w.writeMetaFile(w.tmpDir); err != nil { - return w.uid, errors.Wrap(err, "write merged meta") + level.Info(w.logger).Log( + "msg", "write downsampled block", + "mint", w.meta.MinTime, + "maxt", w.meta.MaxTime, + "ulid", w.meta.ULID, + "resolution", w.meta.Thanos.Downsample.Resolution, + ) + return w.uid, nil +} + +// Close closes all io.CLoser writers +func (w *StreamedBlockWriter) Close() error { + var merr tsdb.MultiError + w.sealed = true + + if w.tmpDir != "" { + merr.Add(os.RemoveAll(w.tmpDir)) + } + + for _, cl := range w.closers { + merr.Add(cl.Close()) + } + + w.chunkWriter = nil + w.indexWriter = nil + + return errors.Wrap(merr.Err(), "close closers") +} + +// init writes all available symbols in the beginning of the index file. 
+func (w *StreamedBlockWriter) init() error { + symbols, err := w.indexReader.Symbols() + if err != nil { + return errors.Wrap(err, "read symbols") } - if err = indexw.Close(); err != nil { - return w.uid, errors.Wrap(err, "close index StreamedBlockWriter") + if err := w.indexWriter.AddSymbols(symbols); err != nil { + return errors.Wrap(err, "add symbols") } - indexw = nil + return nil +} + +// finalize sync tmp dir on disk and rename to dir. +func (w *StreamedBlockWriter) finalize() error { df, err := fileutil.OpenDir(w.tmpDir) if err != nil { - return w.uid, errors.Wrap(err, "open temporary block dir") + return errors.Wrap(err, "open temporary block dir") } defer func() { if df != nil { @@ -182,12 +237,12 @@ func (w *StreamedBlockWriter) Flush() (ulid.ULID, error) { }() if err := fileutil.Fsync(df); err != nil { - return w.uid, errors.Wrap(err, "sync temporary dir") + return errors.Wrap(err, "sync temporary dir") } // Close temp dir before rename block dir (for windows platform). if err = df.Close(); err != nil { - return w.uid, errors.Wrap(err, "close temporary dir") + return errors.Wrap(err, "close temporary dir") } df = nil @@ -196,67 +251,30 @@ func (w *StreamedBlockWriter) Flush() (ulid.ULID, error) { // Assume we cleaned tmp dir up w.tmpDir = "" if err != nil { - return w.uid, errors.Wrap(err, "rename block dir") + return errors.Wrap(err, "rename block dir") } - level.Info(w.logger).Log( - "msg", "write downsampled block", - "mint", w.meta.MinTime, - "maxt", w.meta.MaxTime, - "ulid", w.meta.ULID, - "resolution", w.meta.Thanos.Downsample.Resolution, - ) - return w.uid, nil + return nil } -// populateBlock fills the index and chunk writers with new data gathered as the union -// of the provided blocks. It returns meta information for the new block. -func (w *StreamedBlockWriter) populateBlock(indexWriter tsdb.IndexWriter) error { - var ( - i = uint64(0) - labelsValues = labelsValues{} - memPostings = index.NewUnorderedMemPostings() - ) - - if err := indexWriter.AddSymbols(w.symbols); err != nil { - return errors.Wrap(err, "add symbols") - } - - sort.Slice(w.postings, func(i, j int) bool { - return labels.Compare(w.series[w.postings[i]].lset, w.series[w.postings[j]].lset) < 0 - }) - - all := index.NewListPostings(w.postings) - for all.Next() { - s := w.series[all.At()] - // Skip the series with all deleted chunks. - if len(s.chunks) == 0 { - level.Info(w.logger).Log("empty chunks", i, s.lset) - continue - } - - if err := indexWriter.AddSeries(uint64(i), s.lset, s.chunks...); err != nil { - return errors.Wrap(err, "add series") - } - - labelsValues.add(s.lset) - memPostings.Add(i, s.lset) - i++ - } - +// writeLabelSets fills the index writer with label sets. +func (w *StreamedBlockWriter) writeLabelSets() error { s := make([]string, 0, 256) - for n, v := range labelsValues { + for n, v := range w.labelsValues { s = s[:0] v.get(&s) - if err := indexWriter.WriteLabelIndex([]string{n}, s); err != nil { + if err := w.indexWriter.WriteLabelIndex([]string{n}, s); err != nil { return errors.Wrap(err, "write label index") } } + return nil +} - memPostings.EnsureOrder() - - for _, l := range memPostings.SortedKeys() { - if err := indexWriter.WritePostings(l.Name, l.Value, memPostings.Get(l.Name, l.Value)); err != nil { +// writeMemPostings fills the index writer with mem postings. 
+func (w *StreamedBlockWriter) writeMemPostings() error { + w.memPostings.EnsureOrder() + for _, l := range w.memPostings.SortedKeys() { + if err := w.indexWriter.WritePostings(l.Name, l.Value, w.memPostings.Get(l.Name, l.Value)); err != nil { return errors.Wrap(err, "write postings") } } @@ -264,59 +282,37 @@ func (w *StreamedBlockWriter) populateBlock(indexWriter tsdb.IndexWriter) error } // TODO probably tsdb.BlockMeta should expose method writeToFile /w encode. -func (w *StreamedBlockWriter) writeMetaFile(dest string) error { +// writeMetaFile writes meta file +func (w *StreamedBlockWriter) writeMetaFile() error { + var merr tsdb.MultiError + w.meta.ULID = w.uid w.meta.Version = 1 w.meta.Thanos.Source = block.CompactorSource w.meta.Stats.NumChunks = w.totalChunks w.meta.Stats.NumSamples = w.totalSamples - w.meta.Stats.NumSeries = uint64(len(w.series)) + w.meta.Stats.NumSeries = w.postings // Make any changes to the file appear atomic. - path := filepath.Join(dest, block.MetaFilename) - tmp := path + ".tmp" + path := filepath.Join(w.tmpDir, block.MetaFilename) - f, err := os.Create(tmp) + f, err := os.Create(path) if err != nil { - return errors.Wrapf(err, "create tmp meta file %s", tmp) + return errors.Wrapf(err, "create tmp meta file %s", path) } enc := json.NewEncoder(f) enc.SetIndent("", "\t") - var merr tsdb.MultiError - if merr.Add(enc.Encode(w.meta)); merr.Err() != nil { merr.Add(f.Close()) - return errors.Wrapf(merr.Err(), "encoding meta file to json %s", tmp) - } - if err := f.Close(); err != nil { - return errors.Wrapf(err, "close tmp meta file %s", tmp) - } - - if err := renameFile(tmp, path); err != nil { - return errors.Wrapf(err, "rename tmp meta file %s", tmp) - } - - return nil -} - -func (w *StreamedBlockWriter) Close() error { - var merr tsdb.MultiError - - if w.tmpDir != "" { - merr.Add(os.RemoveAll(w.tmpDir)) + return errors.Wrapf(merr.Err(), "encoding meta file to json %s", path) } - if w.chunkWriter != nil { - merr.Add(w.chunkWriter.Close()) - w.chunkWriter = nil - } + merr.Add(errors.Wrapf(fileutil.Fsync(f), "sync meta file %s", path)) + merr.Add(errors.Wrapf(f.Close(), "close meta file %s", path)) - if merr.Err() != nil { - return errors.Wrap(merr.Err(), "close chunk writer") - } - return nil + return merr.Err() } func renameFile(from, to string) error { From ff9c7767be9e8c685a1c7244a8412a37a2fbd178 Mon Sep 17 00:00:00 2001 From: Aleksei Semiglazov Date: Tue, 5 Feb 2019 22:48:18 +0000 Subject: [PATCH 5/6] fix nitpicks --- pkg/compact/downsample/downsample.go | 30 +++++++++---------- .../downsample/streamed_block_writer.go | 20 ++++++------- 2 files changed, 23 insertions(+), 27 deletions(-) diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go index 43b4c4624b..e8b08917e3 100644 --- a/pkg/compact/downsample/downsample.go +++ b/pkg/compact/downsample/downsample.go @@ -3,16 +3,14 @@ package downsample import ( "math" - "github.com/improbable-eng/thanos/pkg/block/metadata" - - "github.com/prometheus/prometheus/pkg/value" - "github.com/prometheus/tsdb/chunkenc" - "github.com/go-kit/kit/log" + "github.com/improbable-eng/thanos/pkg/block/metadata" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/oklog/ulid" "github.com/pkg/errors" + "github.com/prometheus/prometheus/pkg/value" "github.com/prometheus/tsdb" + "github.com/prometheus/tsdb/chunkenc" "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" @@ -57,7 +55,7 @@ func Downsample( 
} defer runutil.CloseWithErrCapture(logger, &err, streamedBlockWriter, "close stream block writer") - pall, err := indexr.Postings(index.AllPostingsKey()) + postings, err := indexr.Postings(index.AllPostingsKey()) if err != nil { return id, errors.Wrap(err, "get all postings list") } @@ -67,7 +65,7 @@ func Downsample( chks []chunks.Meta lset labels.Labels ) - for pall.Next() { + for postings.Next() { lset = lset[:0] chks = chks[:0] all = all[:0] @@ -75,15 +73,15 @@ func Downsample( // Get series labels and chunks. Downsampled data is sensitive to chunk boundaries // and we need to preserve them to properly downsample previously downsampled data. - if err := indexr.Series(pall.At(), &lset, &chks); err != nil { - return id, errors.Wrapf(err, "get series %d", pall.At()) + if err := indexr.Series(postings.At(), &lset, &chks); err != nil { + return id, errors.Wrapf(err, "get series %d", postings.At()) } // While #183 exists, we sanitize the chunks we retrieved from the block // before retrieving their samples. for i, c := range chks { chk, err := chunkr.Chunk(c.Ref) if err != nil { - return id, errors.Wrapf(err, "get chunk %d, series %d", c.Ref, pall.At()) + return id, errors.Wrapf(err, "get chunk %d, series %d", c.Ref, postings.At()) } chks[i].Chunk = chk } @@ -92,11 +90,11 @@ func Downsample( if origMeta.Thanos.Downsample.Resolution == 0 { for _, c := range chks { if err := expandChunkIterator(c.Chunk.Iterator(), &all); err != nil { - return id, errors.Wrapf(err, "expand chunk %d, series %d", c.Ref, pall.At()) + return id, errors.Wrapf(err, "expand chunk %d, series %d", c.Ref, postings.At()) } } if err := streamedBlockWriter.AddSeries(lset, downsampleRaw(all, resolution)); err != nil { - return id, errors.Wrapf(err, "downsample raw data, series: %d", pall.At()) + return id, errors.Wrapf(err, "downsample raw data, series: %d", postings.At()) } } else { // Downsample a block that contains aggregate chunks already. @@ -112,15 +110,15 @@ func Downsample( resolution, ) if err != nil { - return id, errors.Wrapf(err, "downsample aggregate block, series: %d", pall.At()) + return id, errors.Wrapf(err, "downsample aggregate block, series: %d", postings.At()) } if err := streamedBlockWriter.AddSeries(lset, downsampledChunks); err != nil { - return id, errors.Wrapf(err, "downsample aggregated block, series: %d", pall.At()) + return id, errors.Wrapf(err, "downsample aggregated block, series: %d", postings.At()) } } } - if pall.Err() != nil { - return id, errors.Wrap(pall.Err(), "iterate series set") + if postings.Err() != nil { + return id, errors.Wrap(postings.Err(), "iterate series set") } id, err = streamedBlockWriter.Flush() diff --git a/pkg/compact/downsample/streamed_block_writer.go b/pkg/compact/downsample/streamed_block_writer.go index 8e426ae4d4..5d51345ebe 100644 --- a/pkg/compact/downsample/streamed_block_writer.go +++ b/pkg/compact/downsample/streamed_block_writer.go @@ -11,6 +11,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/block" + "github.com/improbable-eng/thanos/pkg/block/metadata" "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/tsdb" @@ -63,7 +64,7 @@ type StreamedBlockWriter struct { indexReader tsdb.IndexReader closers []io.Closer - meta block.Meta + meta metadata.Meta totalChunks uint64 totalSamples uint64 @@ -77,9 +78,7 @@ type StreamedBlockWriter struct { // NewWriter returns StreamedBlockWriter instance. 
// Caller is responsible to finalize the writing with Flush method to write the meta and index file and Close all io.Closers -func NewWriter(dir string, indexReader tsdb.IndexReader, l log.Logger, originMeta block.Meta, resolution int64) (*StreamedBlockWriter, error) { - var err error - +func NewWriter(dir string, indexReader tsdb.IndexReader, l log.Logger, originMeta metadata.Meta, resolution int64) (*StreamedBlockWriter, error) { // change downsampling resolution to the new one. originMeta.Thanos.Downsample.Resolution = resolution @@ -129,7 +128,7 @@ func NewWriter(dir string, indexReader tsdb.IndexReader, l log.Logger, originMet // labelsValues sets and memPostings to be written on the Flush state in the end of downsampling process. func (w *StreamedBlockWriter) AddSeries(lset labels.Labels, chunks []chunks.Meta) error { if w.sealed { - panic("Series can't be added, writers has been flushed|closed") + return errors.Errorf("Series can't be added, writers has been flushed|closed") } if len(chunks) == 0 { @@ -160,22 +159,21 @@ func (w *StreamedBlockWriter) AddSeries(lset labels.Labels, chunks []chunks.Meta // Flush saves prepared index and meta data to corresponding files. // Be sure to call this, if all series have to be handled by this moment, you can't call AddSeries afterwards. func (w *StreamedBlockWriter) Flush() (ulid.ULID, error) { - var err error w.sealed = true - if err = w.writeLabelSets(); err != nil { + if err := w.writeLabelSets(); err != nil { return w.uid, errors.Wrap(err, "write label sets") } - if err = w.writeMemPostings(); err != nil { + if err := w.writeMemPostings(); err != nil { return w.uid, errors.Wrap(err, "write mem postings") } - if err = w.writeMetaFile(); err != nil { + if err := w.writeMetaFile(); err != nil { return w.uid, errors.Wrap(err, "write meta meta") } - if err = w.finalize(); err != nil { + if err := w.finalize(); err != nil { return w.uid, errors.Wrap(err, "sync and rename tmp dir") } @@ -288,7 +286,7 @@ func (w *StreamedBlockWriter) writeMetaFile() error { w.meta.ULID = w.uid w.meta.Version = 1 - w.meta.Thanos.Source = block.CompactorSource + w.meta.Thanos.Source = metadata.CompactorSource w.meta.Stats.NumChunks = w.totalChunks w.meta.Stats.NumSamples = w.totalSamples w.meta.Stats.NumSeries = w.postings From 12ffbe3adaaa0ae8081aa7b7dc7d9795d2ebe261 Mon Sep 17 00:00:00 2001 From: Aleksei Semiglazov Date: Wed, 6 Feb 2019 02:00:16 +0000 Subject: [PATCH 6/6] downsampling: simplify StreamedBlockWriter interface Reduce of use public Flush method to finalize index and meta files. In case of error, a caller has to remove block directory with a preserved garbage inside. Rid of use tmp directories and renaming, syncing the final block on disk before upload. --- pkg/block/metadata/meta.go | 7 +- pkg/compact/downsample/downsample.go | 48 ++- pkg/compact/downsample/downsample_test.go | 9 +- .../downsample/streamed_block_writer.go | 309 +++++++----------- 4 files changed, 163 insertions(+), 210 deletions(-) diff --git a/pkg/block/metadata/meta.go b/pkg/block/metadata/meta.go index 44f1d3768d..0d8b22dd19 100644 --- a/pkg/block/metadata/meta.go +++ b/pkg/block/metadata/meta.go @@ -35,6 +35,11 @@ const ( MetaFilename = "meta.json" ) +const ( + // MetaVersion is a enumeration of versions supported by Thanos. + MetaVersion1 = iota + 1 +) + // Meta describes the a block's meta. It wraps the known TSDB meta structure and // extends it by Thanos-specific fields. 
type Meta struct { @@ -135,7 +140,7 @@ func Read(dir string) (*Meta, error) { if err := json.Unmarshal(b, &m); err != nil { return nil, err } - if m.Version != 1 { + if m.Version != MetaVersion1 { return nil, errors.Errorf("unexpected meta file version %d", m.Version) } return &m, nil diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go index e8b08917e3..2263d61359 100644 --- a/pkg/compact/downsample/downsample.go +++ b/pkg/compact/downsample/downsample.go @@ -2,6 +2,10 @@ package downsample import ( "math" + "math/rand" + "os" + "path/filepath" + "time" "github.com/go-kit/kit/log" "github.com/improbable-eng/thanos/pkg/block/metadata" @@ -47,9 +51,33 @@ func Downsample( } defer runutil.CloseWithErrCapture(logger, &err, chunkr, "downsample chunk reader") + // Generate new block id. + uid := ulid.MustNew(ulid.Now(), rand.New(rand.NewSource(time.Now().UnixNano()))) + + // Create block directory to populate with chunks, meta and index files into. + blockDir := filepath.Join(dir, uid.String()) + if err := os.MkdirAll(blockDir, 0777); err != nil { + return id, errors.Wrap(err, "mkdir block dir") + } + + // Remove blockDir in case of errors. + defer func() { + if err != nil { + var merr tsdb.MultiError + merr.Add(err) + merr.Add(os.RemoveAll(blockDir)) + err = merr.Err() + } + }() + + // Copy original meta to the new one. Update downsampling resolution and ULID for a new block. + newMeta := *origMeta + newMeta.Thanos.Downsample.Resolution = resolution + newMeta.ULID = uid + // Writes downsampled chunks right into the files, avoiding excess memory allocation. - // Flushes index and meta data afterwards aggregations. - streamedBlockWriter, err := NewWriter(dir, indexr, logger, *origMeta, resolution) + // Flushes index and meta data after aggregations. + streamedBlockWriter, err := NewStreamedBlockWriter(blockDir, indexr, logger, newMeta) if err != nil { return id, errors.Wrap(err, "get streamed block writer") } @@ -93,11 +121,11 @@ func Downsample( return id, errors.Wrapf(err, "expand chunk %d, series %d", c.Ref, postings.At()) } } - if err := streamedBlockWriter.AddSeries(lset, downsampleRaw(all, resolution)); err != nil { + if err := streamedBlockWriter.WriteSeries(lset, downsampleRaw(all, resolution)); err != nil { return id, errors.Wrapf(err, "downsample raw data, series: %d", postings.At()) } } else { - // Downsample a block that contains aggregate chunks already. + // Downsample a block that contains aggregated chunks already. for _, c := range chks { aggrChunks = append(aggrChunks, c.Chunk.(*AggrChunk)) } @@ -112,8 +140,8 @@ func Downsample( if err != nil { return id, errors.Wrapf(err, "downsample aggregate block, series: %d", postings.At()) } - if err := streamedBlockWriter.AddSeries(lset, downsampledChunks); err != nil { - return id, errors.Wrapf(err, "downsample aggregated block, series: %d", postings.At()) + if err := streamedBlockWriter.WriteSeries(lset, downsampledChunks); err != nil { + return id, errors.Wrapf(err, "write series: %d", postings.At()) } } } @@ -121,12 +149,8 @@ func Downsample( return id, errors.Wrap(postings.Err(), "iterate series set") } - id, err = streamedBlockWriter.Flush() - if err != nil { - return id, errors.Wrap(err, "flush data in stream data") - } - - return id, nil + id = uid + return } // currentWindow returns the end timestamp of the window that t falls into. 
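Reviewer note, not part of the patch: the hunk above establishes the new contract that Downsample creates the block directory in its final location and removes it again if any later step fails. Below is a minimal standalone sketch of that create-then-clean-up pattern; populateBlock is a hypothetical stand-in for the streamed writing this patch performs, while the ULID generation, MkdirAll and MultiError cleanup mirror calls already present in the hunk.

package example

import (
	"math/rand"
	"os"
	"path/filepath"
	"time"

	"github.com/oklog/ulid"
	"github.com/prometheus/tsdb"
)

// populateBlock is a hypothetical stand-in for writing chunk, index and meta
// files into blockDir; in the patch this is done by the streamed block writer.
func populateBlock(blockDir string) error { return nil }

// writeBlock creates the block directory under dir, fills it, and removes it
// again on failure, so callers never observe a half-written block.
func writeBlock(dir string) (_ ulid.ULID, err error) {
	uid := ulid.MustNew(ulid.Now(), rand.New(rand.NewSource(time.Now().UnixNano())))
	blockDir := filepath.Join(dir, uid.String())

	if err = os.MkdirAll(blockDir, 0777); err != nil {
		return uid, err
	}

	// Remove blockDir in case of errors, exactly as the hunk above does.
	defer func() {
		if err != nil {
			var merr tsdb.MultiError
			merr.Add(err)
			merr.Add(os.RemoveAll(blockDir))
			err = merr.Err()
		}
	}()

	err = populateBlock(blockDir)
	return uid, err
}

Because the directory is removed by the caller on error, no .tmp staging directory or rename step is needed, which is what the rest of this patch relies on.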
diff --git a/pkg/compact/downsample/downsample_test.go b/pkg/compact/downsample/downsample_test.go index 65e49831e0..d54d31f0e9 100644 --- a/pkg/compact/downsample/downsample_test.go +++ b/pkg/compact/downsample/downsample_test.go @@ -467,9 +467,16 @@ func (b *memBlock) Chunks() (tsdb.ChunkReader, error) { } func (b *memBlock) Tombstones() (tsdb.TombstoneReader, error) { - return tsdb.EmptyTombstoneReader(), nil + return emptyTombstoneReader{}, nil } func (b *memBlock) Close() error { return nil } + +type emptyTombstoneReader struct{} + +func (emptyTombstoneReader) Get(ref uint64) (tsdb.Intervals, error) { return nil, nil } +func (emptyTombstoneReader) Iter(func(uint64, tsdb.Intervals) error) error { return nil } +func (emptyTombstoneReader) Total() uint64 { return 0 } +func (emptyTombstoneReader) Close() error { return nil } diff --git a/pkg/compact/downsample/streamed_block_writer.go b/pkg/compact/downsample/streamed_block_writer.go index 5d51345ebe..2e2921ac34 100644 --- a/pkg/compact/downsample/streamed_block_writer.go +++ b/pkg/compact/downsample/streamed_block_writer.go @@ -1,18 +1,14 @@ package downsample import ( - "encoding/json" "io" - "math/rand" - "os" "path/filepath" - "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/block" "github.com/improbable-eng/thanos/pkg/block/metadata" - "github.com/oklog/ulid" + "github.com/improbable-eng/thanos/pkg/runutil" "github.com/pkg/errors" "github.com/prometheus/tsdb" "github.com/prometheus/tsdb/chunks" @@ -47,88 +43,95 @@ func (lv labelsValues) add(labelSet labels.Labels) { } } -// StreamedBlockWriter writes downsampled blocks to a new data block. Implemented to save memory consumption -// by means writing chunks data right into the files, omitting keeping them in-memory. Index and meta data should be +// streamedBlockWriter writes downsampled blocks to a new data block. Implemented to save memory consumption +// by writing chunks data right into the files, omitting keeping them in-memory. Index and meta data should be // sealed afterwards, when there aren't more series to process. -type StreamedBlockWriter struct { - dir string - tmpDir string - logger log.Logger - uid ulid.ULID - - // postings is a current posting position. - postings uint64 +type streamedBlockWriter struct { + blockDir string + finalized bool // set to true, if Close was called + logger log.Logger + ignoreFinalize bool // if true Close does not finalize block due to internal error. + meta metadata.Meta + totalChunks uint64 + totalSamples uint64 chunkWriter tsdb.ChunkWriter indexWriter tsdb.IndexWriter indexReader tsdb.IndexReader closers []io.Closer - meta metadata.Meta - totalChunks uint64 - totalSamples uint64 - - // labelsValues list of used label sets: name -> []values. - labelsValues labelsValues - - // memPostings contains references from label name:value -> postings. - memPostings *index.MemPostings - sealed bool + labelsValues labelsValues // labelsValues list of used label sets: name -> []values. + memPostings *index.MemPostings // memPostings contains references from label name:value -> postings. + postings uint64 // postings is a current posting position. } -// NewWriter returns StreamedBlockWriter instance. 
-// Caller is responsible to finalize the writing with Flush method to write the meta and index file and Close all io.Closers -func NewWriter(dir string, indexReader tsdb.IndexReader, l log.Logger, originMeta metadata.Meta, resolution int64) (*StreamedBlockWriter, error) { - // change downsampling resolution to the new one. - originMeta.Thanos.Downsample.Resolution = resolution - - // Generate new block id. - uid := ulid.MustNew(ulid.Now(), rand.New(rand.NewSource(time.Now().UnixNano()))) +// NewStreamedBlockWriter returns streamedBlockWriter instance, it's not concurrency safe. +// Caller is responsible to Close all io.Closers by calling the Close when downsampling is done. +// In case if error happens outside of the StreamedBlockWriter during the processing, +// index and meta files will be written anyway, so the caller is always responsible for removing block directory with +// a garbage on error. +// This approach simplifies StreamedBlockWriter interface, which is a best trade-off taking into account the error is an +// exception, not a general case. +func NewStreamedBlockWriter( + blockDir string, + indexReader tsdb.IndexReader, + logger log.Logger, + originMeta metadata.Meta, +) (w *streamedBlockWriter, err error) { + closers := make([]io.Closer, 0, 2) + + // We should close any opened Closer up to an error. + defer func() { + if err != nil { + var merr tsdb.MultiError + merr.Add(err) + for _, cl := range closers { + merr.Add(cl.Close()) + } + err = merr.Err() + } + }() - // Populate chunk, meta and index files into temporary directory with - // data of all blocks. - dir = filepath.Join(dir, uid.String()) - tmpDir, err := createTmpDir(dir) + chunkWriter, err := chunks.NewWriter(filepath.Join(blockDir, block.ChunksDirname)) if err != nil { - return nil, err - } - - sw := &StreamedBlockWriter{ - logger: l, - dir: dir, - indexReader: indexReader, - tmpDir: tmpDir, - uid: uid, - meta: originMeta, - closers: make([]io.Closer, 0), - labelsValues: make(labelsValues, 1024), - memPostings: index.NewUnorderedMemPostings(), + return nil, errors.Wrap(err, "create chunk writer in streamedBlockWriter") } + closers = append(closers, chunkWriter) - sw.chunkWriter, err = chunks.NewWriter(filepath.Join(tmpDir, block.ChunksDirname)) + indexWriter, err := index.NewWriter(filepath.Join(blockDir, block.IndexFilename)) if err != nil { - return nil, errors.Wrap(err, "create tmp chunk StreamedBlockWriter") + return nil, errors.Wrap(err, "open index writer in streamedBlockWriter") } - sw.closers = append(sw.closers, sw.chunkWriter) + closers = append(closers, indexWriter) - sw.indexWriter, err = index.NewWriter(filepath.Join(tmpDir, block.IndexFilename)) + symbols, err := indexReader.Symbols() if err != nil { - return nil, errors.Wrap(err, "open index StreamedBlockWriter") + return nil, errors.Wrap(err, "read symbols") } - sw.closers = append(sw.closers, sw.indexWriter) - if err := sw.init(); err != nil { - return nil, err + err = indexWriter.AddSymbols(symbols) + if err != nil { + return nil, errors.Wrap(err, "add symbols") } - return sw, nil + return &streamedBlockWriter{ + logger: logger, + blockDir: blockDir, + indexReader: indexReader, + indexWriter: indexWriter, + chunkWriter: chunkWriter, + meta: originMeta, + closers: closers, + labelsValues: make(labelsValues, 1024), + memPostings: index.NewUnorderedMemPostings(), + }, nil } -// AddSeries writes chunks data to the chunkWriter, writes lset and chunks Metas to indexWrites and adds label sets to -// labelsValues sets and memPostings to be written on the 
Flush state in the end of downsampling process. -func (w *StreamedBlockWriter) AddSeries(lset labels.Labels, chunks []chunks.Meta) error { - if w.sealed { - return errors.Errorf("Series can't be added, writers has been flushed|closed") +// WriteSeries writes chunks data to the chunkWriter, writes lset and chunks Metas to indexWrites and adds label sets to +// labelsValues sets and memPostings to be written on the finalize state in the end of downsampling process. +func (w *streamedBlockWriter) WriteSeries(lset labels.Labels, chunks []chunks.Meta) error { + if w.finalized || w.ignoreFinalize { + return errors.Errorf("series can't be added, writers has been closed or internal error happened") } if len(chunks) == 0 { @@ -137,10 +140,12 @@ func (w *StreamedBlockWriter) AddSeries(lset labels.Labels, chunks []chunks.Meta } if err := w.chunkWriter.WriteChunks(chunks...); err != nil { - return errors.Wrap(err, "add series") + w.ignoreFinalize = true + return errors.Wrap(err, "add chunks") } if err := w.indexWriter.AddSeries(w.postings, lset, chunks...); err != nil { + w.ignoreFinalize = true return errors.Wrap(err, "add series") } @@ -156,25 +161,46 @@ func (w *StreamedBlockWriter) AddSeries(lset labels.Labels, chunks []chunks.Meta return nil } -// Flush saves prepared index and meta data to corresponding files. -// Be sure to call this, if all series have to be handled by this moment, you can't call AddSeries afterwards. -func (w *StreamedBlockWriter) Flush() (ulid.ULID, error) { - w.sealed = true +// Close calls finalizer to complete index and meta files and closes all io.CLoser writers. +// Idempotent. +func (w *streamedBlockWriter) Close() error { + if w.finalized { + return nil + } + + var merr tsdb.MultiError + w.finalized = true + + // Finalise data block only if there wasn't any internal errors. + if !w.ignoreFinalize { + merr.Add(w.finalize()) + } + + for _, cl := range w.closers { + merr.Add(cl.Close()) + } + + return errors.Wrap(merr.Err(), "close closers") +} +// finalize saves prepared index and meta data to corresponding files. +// It is called on Close. Even if an error happened outside of StreamWriter, it will finalize the block anyway, +// so it's a caller's responsibility to remove the block's directory. 
+func (w *streamedBlockWriter) finalize() error { if err := w.writeLabelSets(); err != nil { - return w.uid, errors.Wrap(err, "write label sets") + return errors.Wrap(err, "write label sets") } if err := w.writeMemPostings(); err != nil { - return w.uid, errors.Wrap(err, "write mem postings") + return errors.Wrap(err, "write mem postings") } if err := w.writeMetaFile(); err != nil { - return w.uid, errors.Wrap(err, "write meta meta") + return errors.Wrap(err, "write meta meta") } - if err := w.finalize(); err != nil { - return w.uid, errors.Wrap(err, "sync and rename tmp dir") + if err := w.syncDir(); err != nil { + return errors.Wrap(err, "sync blockDir") } level.Info(w.logger).Log( @@ -184,79 +210,27 @@ func (w *StreamedBlockWriter) Flush() (ulid.ULID, error) { "ulid", w.meta.ULID, "resolution", w.meta.Thanos.Downsample.Resolution, ) - return w.uid, nil -} - -// Close closes all io.CLoser writers -func (w *StreamedBlockWriter) Close() error { - var merr tsdb.MultiError - w.sealed = true - - if w.tmpDir != "" { - merr.Add(os.RemoveAll(w.tmpDir)) - } - - for _, cl := range w.closers { - merr.Add(cl.Close()) - } - - w.chunkWriter = nil - w.indexWriter = nil - - return errors.Wrap(merr.Err(), "close closers") -} - -// init writes all available symbols in the beginning of the index file. -func (w *StreamedBlockWriter) init() error { - symbols, err := w.indexReader.Symbols() - if err != nil { - return errors.Wrap(err, "read symbols") - } - - if err := w.indexWriter.AddSymbols(symbols); err != nil { - return errors.Wrap(err, "add symbols") - } - return nil } -// finalize sync tmp dir on disk and rename to dir. -func (w *StreamedBlockWriter) finalize() error { - df, err := fileutil.OpenDir(w.tmpDir) +// syncDir syncs blockDir on disk. +func (w *streamedBlockWriter) syncDir() (err error) { + df, err := fileutil.OpenDir(w.blockDir) if err != nil { - return errors.Wrap(err, "open temporary block dir") - } - defer func() { - if df != nil { - if err := df.Close(); err != nil { - log.Logger(w.logger).Log(err, "close temporary block dir") - } - } - }() - - if err := fileutil.Fsync(df); err != nil { - return errors.Wrap(err, "sync temporary dir") + return errors.Wrap(err, "open temporary block blockDir") } - // Close temp dir before rename block dir (for windows platform). - if err = df.Close(); err != nil { - return errors.Wrap(err, "close temporary dir") - } - df = nil + defer runutil.CloseWithErrCapture(w.logger, &err, df, "close temporary block blockDir") - // Block successfully written, make visible and remove old ones. - err = renameFile(w.tmpDir, w.dir) - // Assume we cleaned tmp dir up - w.tmpDir = "" - if err != nil { - return errors.Wrap(err, "rename block dir") + if err := fileutil.Fsync(df); err != nil { + return errors.Wrap(err, "sync temporary blockDir") } return nil } // writeLabelSets fills the index writer with label sets. -func (w *StreamedBlockWriter) writeLabelSets() error { +func (w *streamedBlockWriter) writeLabelSets() error { s := make([]string, 0, 256) for n, v := range w.labelsValues { s = s[:0] @@ -269,7 +243,7 @@ func (w *StreamedBlockWriter) writeLabelSets() error { } // writeMemPostings fills the index writer with mem postings. 
-func (w *StreamedBlockWriter) writeMemPostings() error { +func (w *streamedBlockWriter) writeMemPostings() error { w.memPostings.EnsureOrder() for _, l := range w.memPostings.SortedKeys() { if err := w.indexWriter.WritePostings(l.Name, l.Value, w.memPostings.Get(l.Name, l.Value)); err != nil { @@ -279,70 +253,13 @@ func (w *StreamedBlockWriter) writeMemPostings() error { return nil } -// TODO probably tsdb.BlockMeta should expose method writeToFile /w encode. -// writeMetaFile writes meta file -func (w *StreamedBlockWriter) writeMetaFile() error { - var merr tsdb.MultiError - - w.meta.ULID = w.uid - w.meta.Version = 1 +// writeMetaFile writes meta file. +func (w *streamedBlockWriter) writeMetaFile() error { + w.meta.Version = metadata.MetaVersion1 w.meta.Thanos.Source = metadata.CompactorSource w.meta.Stats.NumChunks = w.totalChunks w.meta.Stats.NumSamples = w.totalSamples w.meta.Stats.NumSeries = w.postings - // Make any changes to the file appear atomic. - path := filepath.Join(w.tmpDir, block.MetaFilename) - - f, err := os.Create(path) - if err != nil { - return errors.Wrapf(err, "create tmp meta file %s", path) - } - - enc := json.NewEncoder(f) - enc.SetIndent("", "\t") - - if merr.Add(enc.Encode(w.meta)); merr.Err() != nil { - merr.Add(f.Close()) - return errors.Wrapf(merr.Err(), "encoding meta file to json %s", path) - } - - merr.Add(errors.Wrapf(fileutil.Fsync(f), "sync meta file %s", path)) - merr.Add(errors.Wrapf(f.Close(), "close meta file %s", path)) - - return merr.Err() -} - -func renameFile(from, to string) error { - if err := os.RemoveAll(to); err != nil { - return err - } - if err := os.Rename(from, to); err != nil { - return err - } - - // Directory was renamed; sync parent dir to persist rename. - pdir, err := fileutil.OpenDir(filepath.Dir(to)) - if err != nil { - return err - } - - var merr tsdb.MultiError - merr.Add(fileutil.Fsync(pdir)) - merr.Add(pdir.Close()) - return merr.Err() -} - -func createTmpDir(parent string) (string, error) { - tmp := parent + ".tmp" - - if err := os.RemoveAll(tmp); err != nil { - return "", errors.Wrap(err, "removing tmp dir") - } - - if err := os.MkdirAll(tmp, 0777); err != nil { - return "", errors.Wrap(err, "mkdir tmp dir") - } - - return tmp, nil + return metadata.Write(w.logger, w.blockDir, &w.meta) }
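Reviewer note, not part of the patch: taken together, the change reduces the writer's surface to NewStreamedBlockWriter, WriteSeries and Close. The following is a hedged usage sketch, written as an external caller purely for illustration; the series struct, its field names and the package name are assumptions, while the constructor, WriteSeries and CloseWithErrCapture calls match the code above (the tsdb/labels import path is assumed to be the one used by this vendoring).

package example

import (
	"github.com/go-kit/kit/log"
	"github.com/improbable-eng/thanos/pkg/block/metadata"
	"github.com/improbable-eng/thanos/pkg/compact/downsample"
	"github.com/improbable-eng/thanos/pkg/runutil"
	"github.com/pkg/errors"
	"github.com/prometheus/tsdb"
	"github.com/prometheus/tsdb/chunks"
	"github.com/prometheus/tsdb/labels"
)

// series pairs a label set with its downsampled chunk metas (illustrative only).
type series struct {
	lset labels.Labels
	chks []chunks.Meta
}

// writeDownsampledBlock streams every series into the writer and lets Close
// finalize the index and meta files. On error the caller removes blockDir.
func writeDownsampledBlock(
	blockDir string,
	indexReader tsdb.IndexReader,
	logger log.Logger,
	meta metadata.Meta,
	all []series,
) (err error) {
	w, err := downsample.NewStreamedBlockWriter(blockDir, indexReader, logger, meta)
	if err != nil {
		return errors.Wrap(err, "create streamed block writer")
	}
	// Close finalizes index and meta files unless an internal write error occurred.
	defer runutil.CloseWithErrCapture(logger, &err, w, "close streamed block writer")

	for _, s := range all {
		// Chunk data hits disk inside WriteSeries; only label values and
		// postings stay in memory until Close.
		if err := w.WriteSeries(s.lset, s.chks); err != nil {
			return errors.Wrap(err, "write series")
		}
	}
	return nil
}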
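Reviewer note, not part of the patch: since the block is now populated directly in its final directory, durability comes from fsyncing that directory rather than from the removed tmp-dir-and-rename step. A small sketch of that step, assuming only the fileutil and runutil helpers already imported above; it isolates what syncDir implements, to show why renameFile and createTmpDir could be deleted.

package example

import (
	"github.com/go-kit/kit/log"
	"github.com/improbable-eng/thanos/pkg/runutil"
	"github.com/pkg/errors"
	"github.com/prometheus/tsdb/fileutil"
)

// syncBlockDir makes the already-written chunk, index and meta files durable by
// fsyncing the block directory itself, mirroring streamedBlockWriter.syncDir.
func syncBlockDir(logger log.Logger, dir string) (err error) {
	df, err := fileutil.OpenDir(dir)
	if err != nil {
		return errors.Wrap(err, "open block dir")
	}
	defer runutil.CloseWithErrCapture(logger, &err, df, "close block dir")

	return errors.Wrap(fileutil.Fsync(df), "fsync block dir")
}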