diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go
index 1628cd3999c..c2cff7e09bd 100644
--- a/pkg/compact/downsample/downsample.go
+++ b/pkg/compact/downsample/downsample.go
@@ -48,16 +48,13 @@ func Downsample(
 	}
 	defer runutil.CloseWithErrCapture(logger, &err, chunkr, "downsample chunk reader")
 
-	// NewWriter downsampled data and puts chunks immediately into files, allow save lot of memory of aggregated data.
+	// Writes downsampled chunks right into the files, avoiding excess memory allocation.
 	// Flushes index and meta data afterwards aggregations.
-	// This is necessary since we need to inject special values at the end of chunks for
-	// some aggregations.
-	writer, err := NewWriter(dir, logger, *origMeta, resolution)
-	defer runutil.CloseWithErrCapture(logger, &err, writer, "downsample instant writer")
-
+	streamedBlockWriter, err := NewWriter(dir, logger, *origMeta, resolution)
 	if err != nil {
-		return id, errors.Wrap(err, "get instantWriter")
+		return id, errors.Wrap(err, "get streamed block writer")
 	}
+	defer runutil.CloseWithErrCapture(logger, &err, streamedBlockWriter, "close stream block writer")
 
 	pall, err := indexr.Postings(index.AllPostingsKey())
 	if err != nil {
@@ -96,7 +93,7 @@ func Downsample(
 				return id, errors.Wrapf(err, "expand chunk %d, series %d", c.Ref, pall.At())
 			}
 		}
-		if err := writer.AddSeries(&series{lset: lset, chunks: downsampleRaw(all, resolution)}); err != nil {
+		if err := streamedBlockWriter.AddSeries(&series{lset: lset, chunks: downsampleRaw(all, resolution)}); err != nil {
 			return id, errors.Wrapf(err, "downsample raw data, series: %d", pall.At())
 		}
 		continue
@@ -117,7 +114,7 @@ func Downsample(
 		if err != nil {
 			return id, errors.Wrapf(err, "downsample aggregate block, series: %d", pall.At())
 		}
-		if err := writer.AddSeries(&series{lset: lset, chunks: res}); err != nil {
+		if err := streamedBlockWriter.AddSeries(&series{lset: lset, chunks: res}); err != nil {
 			return id, errors.Wrapf(err, "downsample aggregated block, series: %d", pall.At())
 		}
 	}
@@ -125,7 +122,7 @@ func Downsample(
 		return id, errors.Wrap(pall.Err(), "iterate series set")
 	}
 
-	id, err = writer.Flush()
+	id, err = streamedBlockWriter.Flush()
 	if err != nil {
 		return id, errors.Wrap(err, "compact head")
 	}
diff --git a/pkg/compact/downsample/downsample_test.go b/pkg/compact/downsample/downsample_test.go
index f550d1191ed..3df038d5a43 100644
--- a/pkg/compact/downsample/downsample_test.go
+++ b/pkg/compact/downsample/downsample_test.go
@@ -376,7 +376,7 @@ func (it *sampleIterator) At() (t int64, v float64) {
 }
 
 // memBlock is an in-memory block that implements a subset of the tsdb.BlockReader interface
-// to allow tsdb.instantWriter to persist the data as a block.
+// to allow StreamedBlockWriter to persist the data as a block.
 type memBlock struct {
 	// Dummies to implement unused methods.
 	tsdb.IndexReader
diff --git a/pkg/compact/downsample/writer.go b/pkg/compact/downsample/streamed_block_writer.go
similarity index 79%
rename from pkg/compact/downsample/writer.go
rename to pkg/compact/downsample/streamed_block_writer.go
index b7c13469f9e..c251c5ca150 100644
--- a/pkg/compact/downsample/writer.go
+++ b/pkg/compact/downsample/streamed_block_writer.go
@@ -21,11 +21,13 @@ import (
 )
 
 type symbols map[string]struct{}
+
 type labelValues map[string]struct{}
 
 func (lv labelValues) add(value string) {
 	lv[value] = struct{}{}
 }
+
 func (lv labelValues) get(set *[]string) {
 	for value := range lv {
 		*set = append(*set, value)
@@ -46,14 +48,14 @@ func (lv labelsValues) add(labelSet labels.Labels) {
 	}
 }
 
-// InstantWriter writes downsampled block to a new data block. Chunks will be written immediately in order to avoid
-// memory consumption.
-type instantWriter struct {
-	dir        string
-	tmpDir     string
-	logger     log.Logger
-	uid        ulid.ULID
-	resolution int64
+// StreamedBlockWriter writes downsampled blocks to a new data block. It saves memory by streaming
+// chunk data right into the files instead of keeping it in memory. Index and meta data are
+// flushed afterwards, once there are no more series to process.
+type StreamedBlockWriter struct {
+	dir    string
+	tmpDir string
+	logger log.Logger
+	uid    ulid.ULID
 
 	symbols  symbols
 	postings []uint64
@@ -65,11 +67,13 @@
 	totalSamples uint64
 }
 
-func NewWriter(dir string, l log.Logger, originMeta block.Meta, resolution int64) (*instantWriter, error) {
+// NewWriter returns a StreamedBlockWriter instance. The caller is responsible for finalizing the writing
+// via Flush, which writes the meta and index files, and for releasing all io.Closers via Close.
+func NewWriter(dir string, l log.Logger, originMeta block.Meta, resolution int64) (*StreamedBlockWriter, error) {
 	var err error
 	var chunkWriter tsdb.ChunkWriter
 
-	// Generate new block id
+	// Generate new block id.
 	entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
 	uid := ulid.MustNew(ulid.Now(), entropy)
 
@@ -81,16 +85,14 @@ func NewWriter(dir string, l log.Logger, originMeta block.Meta, resolution int64
 		return nil, err
 	}
 
-	chunkDir := func(dir string) string {
-		return filepath.Join(dir, block.ChunksDirname)
-	}
-
-	chunkWriter, err = chunks.NewWriter(chunkDir(tmpDir))
+	chunkWriter, err = chunks.NewWriter(filepath.Join(tmpDir, block.ChunksDirname))
 	if err != nil {
-		return nil, errors.Wrap(err, "create tmp chunk instantWriter")
+		return nil, errors.Wrap(err, "create tmp chunk StreamedBlockWriter")
 	}
 
-	return &instantWriter{
+	originMeta.Thanos.Downsample.Resolution = resolution
+
+	return &StreamedBlockWriter{
 		logger:      l,
 		dir:         dir,
 		tmpDir:      tmpDir,
@@ -98,13 +100,15 @@ func NewWriter(dir string, l log.Logger, originMeta block.Meta, resolution int64
 		chunkWriter: chunkWriter,
 		uid:         uid,
 		meta:        originMeta,
-		resolution:  resolution,
 	}, nil
 }
 
-func (w *instantWriter) AddSeries(s *series) error {
+func (w *StreamedBlockWriter) AddSeries(s *series) error {
+	if w.chunkWriter == nil {
+		panic("Series can't be added, ChunkWriter has been closed")
+	}
 	if len(s.chunks) == 0 {
-		level.Info(w.logger).Log("empty chunks happened", s.lset)
+		level.Warn(w.logger).Log("empty chunks happened", s.lset)
 	}
 
 	if err := w.chunkWriter.WriteChunks(s.chunks...); err != nil {
@@ -129,10 +133,11 @@
 	return nil
 }
 
-func (w *instantWriter) Flush() (ulid.ULID, error) {
+// Flush saves the prepared index and meta data to their corresponding files.
+// Call it only once all series have been added, as it also closes the chunk writer.
+func (w *StreamedBlockWriter) Flush() (ulid.ULID, error) {
 	var err error
 
-	// All the chunks have been written by this moment, can close writer.
 	if err := w.chunkWriter.Close(); err != nil {
 		return w.uid, errors.Wrap(err, "close chunk writer")
 	}
@@ -140,9 +145,17 @@
 
 	indexw, err := index.NewWriter(filepath.Join(w.tmpDir, block.IndexFilename))
 	if err != nil {
-		return w.uid, errors.Wrap(err, "open index instantWriter")
+		return w.uid, errors.Wrap(err, "open index StreamedBlockWriter")
 	}
+	defer func() {
+		if indexw != nil {
+			if err := indexw.Close(); err != nil {
+				level.Error(w.logger).Log(err, "close index StreamedBlockWriter")
+			}
+		}
+	}()
+
 	if err := w.populateBlock(indexw); err != nil {
 		return w.uid, errors.Wrap(err, "write compaction")
 	}
 
@@ -152,8 +165,9 @@
 	}
 
 	if err = indexw.Close(); err != nil {
-		return w.uid, errors.Wrap(err, "close index instantWriter")
+		return w.uid, errors.Wrap(err, "close index StreamedBlockWriter")
	}
+	indexw = nil
 
 	df, err := fileutil.OpenDir(w.tmpDir)
 	if err != nil {
@@ -197,7 +211,7 @@ func (w *instantWriter) Flush() (ulid.ULID, error) {
 
 // populateBlock fills the index and chunk writers with new data gathered as the union
 // of the provided blocks. It returns meta information for the new block.
-func (w *instantWriter) populateBlock(indexWriter tsdb.IndexWriter) error {
+func (w *StreamedBlockWriter) populateBlock(indexWriter tsdb.IndexWriter) error {
 	var (
 		i            = uint64(0)
 		labelsValues = labelsValues{}
@@ -213,9 +227,7 @@ func (w *instantWriter) populateBlock(indexWriter tsdb.IndexWriter) error {
 	})
 
 	all := index.NewListPostings(w.postings)
-	// all := w.postings.All()
 	for all.Next() {
-		// i := all.At()
 		s := w.series[i]
 		// Skip the series with all deleted chunks.
 		if len(s.chunks) == 0 {
@@ -252,11 +264,10 @@ func (w *instantWriter) populateBlock(indexWriter tsdb.IndexWriter) error {
 }
 
 // TODO probably tsdb.BlockMeta should expose method writeToFile /w encode.
-func (w *instantWriter) writeMetaFile(dest string) error {
+func (w *StreamedBlockWriter) writeMetaFile(dest string) error {
 	w.meta.ULID = w.uid
 	w.meta.Version = 1
 	w.meta.Thanos.Source = block.CompactorSource
-	w.meta.Thanos.Downsample.Resolution = w.resolution
 	w.meta.Stats.NumChunks = w.totalChunks
 	w.meta.Stats.NumSamples = w.totalSamples
 	w.meta.Stats.NumSeries = uint64(len(w.series))
@@ -290,7 +301,7 @@ func (w *instantWriter) writeMetaFile(dest string) error {
 	return nil
 }
 
-func (w *instantWriter) Close() error {
+func (w *StreamedBlockWriter) Close() error {
 	var merr tsdb.MultiError
 
 	if w.tmpDir != "" {
@@ -299,6 +310,7 @@ func (w *instantWriter) Close() error {
 
 	if w.chunkWriter != nil {
 		merr.Add(w.chunkWriter.Close())
+		w.chunkWriter = nil
 	}
 
 	if merr.Err() != nil {
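For reviewers, the call pattern the renamed writer expects, mirroring the Downsample hunks above, looks roughly like the following. This is a minimal sketch assumed to live inside the downsample package; the writeBlock helper, its parameters, the input map, and the __name__ label are purely illustrative and not part of this change, the import paths are the ones the package already uses, and the series helper type is assumed to take a label set and []chunks.Meta as in the AddSeries calls above.

package downsample

import (
	"github.com/go-kit/kit/log"
	"github.com/improbable-eng/thanos/pkg/block"
	"github.com/improbable-eng/thanos/pkg/runutil"
	"github.com/oklog/ulid"
	"github.com/pkg/errors"
	"github.com/prometheus/tsdb/chunks"
	"github.com/prometheus/tsdb/labels"
)

// writeBlock is an illustrative helper, not part of this PR: it streams the given
// series through a StreamedBlockWriter and returns the ULID of the resulting block.
func writeBlock(logger log.Logger, dir string, origMeta block.Meta, resolution int64, input map[string][]chunks.Meta) (id ulid.ULID, err error) {
	// NewWriter prepares a temporary block directory and a chunk writer;
	// chunk data hits the disk as soon as series are added.
	w, err := NewWriter(dir, logger, origMeta, resolution)
	if err != nil {
		return id, errors.Wrap(err, "create streamed block writer")
	}
	// Same defer pattern as in Downsample: Close cleans up the temporary
	// directory and the chunk writer on every exit path.
	defer runutil.CloseWithErrCapture(logger, &err, w, "close streamed block writer")

	// AddSeries persists the series' chunks immediately; only the label set and
	// chunk references stay in memory until Flush.
	for name, chks := range input {
		lset := labels.FromStrings("__name__", name)
		if err := w.AddSeries(&series{lset: lset, chunks: chks}); err != nil {
			return id, errors.Wrapf(err, "add series %s", lset)
		}
	}

	// Flush closes the chunk writer and writes the index and meta files,
	// finalizing the block under dir.
	return w.Flush()
}

Nothing here is new API; it simply strings together NewWriter, AddSeries, Flush, and the deferred Close exactly the way the Downsample changes do, which is what keeps only label sets and chunk references in memory while chunk data streams to disk.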