
compact: clarify purpose of streamed block writer
Add comments and close resources properly.
xjewer committed Oct 9, 2018
1 parent e8c13e0 commit 3546c3d
Showing 3 changed files with 50 additions and 41 deletions.
17 changes: 7 additions & 10 deletions pkg/compact/downsample/downsample.go
@@ -48,16 +48,13 @@ func Downsample(
}
defer runutil.CloseWithErrCapture(logger, &err, chunkr, "downsample chunk reader")

- // NewWriter downsampled data and puts chunks immediately into files, allow save lot of memory of aggregated data.
+ // Writes downsampled chunks right into the files, avoiding excess memory allocation.
// Flushes index and meta data after all aggregations.
// This is necessary since we need to inject special values at the end of chunks for
// some aggregations.
- writer, err := NewWriter(dir, logger, *origMeta, resolution)
- defer runutil.CloseWithErrCapture(logger, &err, writer, "downsample instant writer")
+ streamedBlockWriter, err := NewWriter(dir, logger, *origMeta, resolution)
if err != nil {
- return id, errors.Wrap(err, "get instantWriter")
+ return id, errors.Wrap(err, "get streamed block writer")
}
+ defer runutil.CloseWithErrCapture(logger, &err, streamedBlockWriter, "close stream block writer")

pall, err := indexr.Postings(index.AllPostingsKey())
if err != nil {
@@ -96,7 +93,7 @@ func Downsample(
return id, errors.Wrapf(err, "expand chunk %d, series %d", c.Ref, pall.At())
}
}
- if err := writer.AddSeries(&series{lset: lset, chunks: downsampleRaw(all, resolution)}); err != nil {
+ if err := streamedBlockWriter.AddSeries(&series{lset: lset, chunks: downsampleRaw(all, resolution)}); err != nil {
return id, errors.Wrapf(err, "downsample raw data, series: %d", pall.At())
}
continue
@@ -117,15 +114,15 @@ func Downsample(
if err != nil {
return id, errors.Wrapf(err, "downsample aggregate block, series: %d", pall.At())
}
- if err := writer.AddSeries(&series{lset: lset, chunks: res}); err != nil {
+ if err := streamedBlockWriter.AddSeries(&series{lset: lset, chunks: res}); err != nil {
return id, errors.Wrapf(err, "downsample aggregated block, series: %d", pall.At())
}
}
if pall.Err() != nil {
return id, errors.Wrap(pall.Err(), "iterate series set")
}

- id, err = writer.Flush()
+ id, err = streamedBlockWriter.Flush()
if err != nil {
return id, errors.Wrap(err, "compact head")
}
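The reordering in the first hunk is easy to miss: the deferred close is now registered only after the constructor's error check, and it reports close errors through the shared err variable, so a failure during the deferred Close still surfaces even after a successful Flush. Below is a minimal, self-contained sketch of that lifecycle; blockWriter and closeWithErrCapture are simplified stand-ins for illustration, not the Thanos types or runutil API.

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// blockWriter stands in for the streamed block writer: AddSeries streams data
// out immediately, Flush finalizes the block, Close releases what is left.
type blockWriter struct{ flushed bool }

func (w *blockWriter) AddSeries(name string) error { return nil }

func (w *blockWriter) Flush() error { w.flushed = true; return nil }

func (w *blockWriter) Close() error {
	if !w.flushed {
		return errors.New("closed before flush: block would be incomplete")
	}
	return nil
}

// closeWithErrCapture mimics the CloseWithErrCapture idea: close c and, if the
// caller is not already returning an error, surface the close error instead.
func closeWithErrCapture(err *error, c interface{ Close() error }, msg string) {
	if cerr := c.Close(); cerr != nil && *err == nil {
		*err = fmt.Errorf("%s: %w", msg, cerr)
	}
}

func writeBlock(series []string) (err error) {
	w := &blockWriter{}
	// Register the deferred close only after the constructor succeeded,
	// mirroring the reordering in the hunk above.
	defer closeWithErrCapture(&err, w, "close stream block writer")

	for _, s := range series {
		if err := w.AddSeries(s); err != nil {
			return fmt.Errorf("add series: %w", err)
		}
	}
	return w.Flush() // runs before the deferred Close
}

func main() {
	if err := writeBlock([]string{"up", "http_requests_total"}); err != nil {
		log.Fatal(err)
	}
	fmt.Println("block written")
}
```

The named return value is what makes the pattern work: the deferred helper can overwrite err after the function body has finished.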
2 changes: 1 addition & 1 deletion pkg/compact/downsample/downsample_test.go
@@ -376,7 +376,7 @@ func (it *sampleIterator) At() (t int64, v float64) {
}

// memBlock is an in-memory block that implements a subset of the tsdb.BlockReader interface
- // to allow tsdb.instantWriter to persist the data as a block.
+ // to allow tsdb.StreamedBlockWriter to persist the data as a block.
type memBlock struct {
// Dummies to implement unused methods.
tsdb.IndexReader
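The memBlock trick this comment describes is a common Go testing pattern: embed the wide interface so the compiler is satisfied, then override only the methods the test actually exercises. A small illustrative sketch with assumed, simplified types (BlockReader here is hypothetical, not the real tsdb.BlockReader):

```go
package main

import "fmt"

// BlockReader stands in for a wide interface like tsdb.BlockReader.
type BlockReader interface {
	Chunks() [][]byte
	Symbols() map[string]struct{}
	Postings() []uint64
}

// memBlock implements only what the test needs; the embedded nil interface
// satisfies the remaining methods at compile time, and calling one of those
// dummies would panic at runtime, which is acceptable for unused methods.
type memBlock struct {
	BlockReader // dummy to implement unused methods
	chunks      [][]byte
}

func (b *memBlock) Chunks() [][]byte { return b.chunks }

func main() {
	var r BlockReader = &memBlock{chunks: [][]byte{{0x01, 0x02}}}
	fmt.Println("chunks:", len(r.Chunks())) // uses the override, not the dummy
}
```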
72 changes: 42 additions & 30 deletions pkg/compact/downsample/streamed_block_writer.go
@@ -21,11 +21,13 @@ import (
)

type symbols map[string]struct{}

type labelValues map[string]struct{}

func (lv labelValues) add(value string) {
lv[value] = struct{}{}
}

func (lv labelValues) get(set *[]string) {
for value := range lv {
*set = append(*set, value)
@@ -46,14 +48,14 @@ func (lv labelsValues) add(labelSet labels.Labels) {
}
}

- // InstantWriter writes downsampled block to a new data block. Chunks will be written immediately in order to avoid
- // memory consumption.
- type instantWriter struct {
- dir string
- tmpDir string
- logger log.Logger
- uid ulid.ULID
- resolution int64
+ // StreamedBlockWriter writes downsampled blocks to a new data block. It saves memory
+ // by writing chunk data straight into the files instead of keeping it in memory.
+ // Index and meta data should be flushed afterwards, once there are no more series to process.
+ type StreamedBlockWriter struct {
+ dir string
+ tmpDir string
+ logger log.Logger
+ uid ulid.ULID

symbols symbols
postings []uint64
@@ -65,11 +67,13 @@ type instantWriter struct {
totalSamples uint64
}

- func NewWriter(dir string, l log.Logger, originMeta block.Meta, resolution int64) (*instantWriter, error) {
+ // NewWriter returns a StreamedBlockWriter instance. The caller is responsible for finalizing
+ // the writing via the Flush method, which writes the meta and index files, and for closing all io.Closers.
+ func NewWriter(dir string, l log.Logger, originMeta block.Meta, resolution int64) (*StreamedBlockWriter, error) {
var err error
var chunkWriter tsdb.ChunkWriter

- // Generate new block id
+ // Generate new block id.
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
uid := ulid.MustNew(ulid.Now(), entropy)

@@ -81,30 +85,30 @@ func NewWriter(dir string, l log.Logger, originMeta block.Meta, resolution int64
return nil, err
}

- chunkDir := func(dir string) string {
- return filepath.Join(dir, block.ChunksDirname)
- }
-
- chunkWriter, err = chunks.NewWriter(chunkDir(tmpDir))
+ chunkWriter, err = chunks.NewWriter(filepath.Join(tmpDir, block.ChunksDirname))
if err != nil {
return nil, errors.Wrap(err, "create tmp chunk instantWriter")
return nil, errors.Wrap(err, "create tmp chunk StreamedBlockWriter")
}

- return &instantWriter{
+ originMeta.Thanos.Downsample.Resolution = resolution
+
+ return &StreamedBlockWriter{
logger: l,
dir: dir,
tmpDir: tmpDir,
symbols: symbols{},
chunkWriter: chunkWriter,
uid: uid,
meta: originMeta,
- resolution: resolution,
}, nil
}

- func (w *instantWriter) AddSeries(s *series) error {
+ func (w *StreamedBlockWriter) AddSeries(s *series) error {
if w.chunkWriter == nil {
panic("Series can't be added, ChunkWriter has been closed")
}
if len(s.chunks) == 0 {
- level.Info(w.logger).Log("empty chunks happened", s.lset)
+ level.Warn(w.logger).Log("empty chunks happened", s.lset)
}

if err := w.chunkWriter.WriteChunks(s.chunks...); err != nil {
@@ -129,20 +133,29 @@ func (w *instantWriter) AddSeries(s *series) error {
return nil
}

- func (w *instantWriter) Flush() (ulid.ULID, error) {
+ // Flush saves the prepared index and meta data to the corresponding files.
+ // Call it only after all series have been handled; no more series can be added afterwards.
+ func (w *StreamedBlockWriter) Flush() (ulid.ULID, error) {
var err error

+ // All the chunks have been written by this moment, so the chunk writer can be closed.
if err := w.chunkWriter.Close(); err != nil {
return w.uid, errors.Wrap(err, "close chunk writer")
}
+ w.chunkWriter = nil

indexw, err := index.NewWriter(filepath.Join(w.tmpDir, block.IndexFilename))
if err != nil {
return w.uid, errors.Wrap(err, "open index instantWriter")
return w.uid, errors.Wrap(err, "open index StreamedBlockWriter")
}

+ defer func() {
+ if indexw != nil {
+ if err := indexw.Close(); err != nil {
+ level.Error(w.logger).Log(err, "close index StreamedBlockWriter")
+ }
+ }
+ }()

if err := w.populateBlock(indexw); err != nil {
return w.uid, errors.Wrap(err, "write compaction")
}
@@ -152,8 +165,9 @@ func (w *instantWriter) Flush() (ulid.ULID, error) {
}

if err = indexw.Close(); err != nil {
return w.uid, errors.Wrap(err, "close index instantWriter")
return w.uid, errors.Wrap(err, "close index StreamedBlockWriter")
}
+ indexw = nil

df, err := fileutil.OpenDir(w.tmpDir)
if err != nil {
@@ -197,7 +211,7 @@ func (w *instantWriter) Flush() (ulid.ULID, error) {

// populateBlock fills the index and chunk writers with new data gathered as the union
// of the provided blocks. It returns meta information for the new block.
- func (w *instantWriter) populateBlock(indexWriter tsdb.IndexWriter) error {
+ func (w *StreamedBlockWriter) populateBlock(indexWriter tsdb.IndexWriter) error {
var (
i = uint64(0)
labelsValues = labelsValues{}
@@ -213,9 +227,7 @@ func (w *instantWriter) populateBlock(indexWriter tsdb.IndexWriter) error {
})

all := index.NewListPostings(w.postings)
- // all := w.postings.All()
for all.Next() {
- // i := all.At()
s := w.series[i]
// Skip the series with all deleted chunks.
if len(s.chunks) == 0 {
@@ -252,11 +264,10 @@ func (w *instantWriter) populateBlock(indexWriter tsdb.IndexWriter) error {
}

// TODO probably tsdb.BlockMeta should expose method writeToFile /w encode.
- func (w *instantWriter) writeMetaFile(dest string) error {
+ func (w *StreamedBlockWriter) writeMetaFile(dest string) error {
w.meta.ULID = w.uid
w.meta.Version = 1
w.meta.Thanos.Source = block.CompactorSource
- w.meta.Thanos.Downsample.Resolution = w.resolution
w.meta.Stats.NumChunks = w.totalChunks
w.meta.Stats.NumSamples = w.totalSamples
w.meta.Stats.NumSeries = uint64(len(w.series))
@@ -290,7 +301,7 @@ func (w *instantWriter) writeMetaFile(dest string) error {
return nil
}

- func (w *instantWriter) Close() error {
+ func (w *StreamedBlockWriter) Close() error {
var merr tsdb.MultiError

if w.tmpDir != "" {
@@ -299,6 +310,7 @@ func (w *instantWriter) Close() error {

if w.chunkWriter != nil {
merr.Add(w.chunkWriter.Close())
+ w.chunkWriter = nil
}

if merr.Err() != nil {
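The w.chunkWriter = nil and indexw = nil assignments are the core of "close resources properly": Flush releases each resource as soon as it is done with it, and the later Close (deferred by the caller) skips anything already released instead of closing it twice. A self-contained sketch of that idempotent-close discipline, using hypothetical stand-in types rather than the real tsdb writers:

```go
package main

import (
	"errors"
	"fmt"
)

// handle stands in for a chunk or index writer that must not be closed twice.
type handle struct{ closed bool }

func (h *handle) Close() error {
	if h.closed {
		return errors.New("double close")
	}
	h.closed = true
	return nil
}

// writer follows the diff's discipline: Flush closes the handle eagerly and
// nils the field, so a later deferred Close degrades to a no-op instead of
// a double close.
type writer struct{ chunkw *handle }

func (w *writer) Flush() error {
	if err := w.chunkw.Close(); err != nil {
		return err
	}
	w.chunkw = nil // released; later calls can detect this and refuse work
	return nil
}

func (w *writer) Close() error {
	if w.chunkw == nil {
		return nil // already released by Flush
	}
	err := w.chunkw.Close()
	w.chunkw = nil
	return err
}

func main() {
	w := &writer{chunkw: &handle{}}
	defer w.Close() // safe even though Flush already closed the handle

	if err := w.Flush(); err != nil {
		fmt.Println("flush:", err)
		return
	}
	fmt.Println("ok")
}
```

The same nil check is what lets AddSeries panic with a clear message when called after Flush, rather than failing obscurely inside a closed chunk writer.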
