diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go
index a7f3fff550e..022fc0af697 100644
--- a/cmd/thanos/compact.go
+++ b/cmd/thanos/compact.go
@@ -2,17 +2,22 @@ package main

 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"os"
 	"path"
+	"path/filepath"
 	"strconv"
 	"strings"
 	"time"

 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
+	"github.com/improbable-eng/thanos/pkg/block"
+	"github.com/improbable-eng/thanos/pkg/block/metadata"
 	"github.com/improbable-eng/thanos/pkg/compact"
 	"github.com/improbable-eng/thanos/pkg/compact/downsample"
+	"github.com/improbable-eng/thanos/pkg/objstore"
 	"github.com/improbable-eng/thanos/pkg/objstore/client"
 	"github.com/improbable-eng/thanos/pkg/runutil"
 	"github.com/oklog/run"
@@ -87,6 +92,9 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
 	wait := cmd.Flag("wait", "Do not exit after all compactions have been processed and wait for new work.").
 		Short('w').Bool()

+	generateMissingIndexCacheFiles := cmd.Flag("index.generate-missing-cache-file", "Generate missing index cache files for already-compacted blocks and upload them to object storage.").
+		Hidden().Default("false").Bool()
+
 	// TODO(bplotka): Remove this flag once https://github.com/improbable-eng/thanos/issues/297 is fixed.
 	disableDownsampling := cmd.Flag("debug.disable-downsampling", "Disables downsampling. This is not recommended "+
 		"as querying long time ranges without non-downsampled data is not efficient and not useful (is not possible to render all for human eye).").
@@ -110,6 +118,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
 			*haltOnError,
 			*acceptMalformedIndex,
 			*wait,
+			*generateMissingIndexCacheFiles,
 			map[compact.ResolutionLevel]time.Duration{
 				compact.ResolutionLevelRaw: time.Duration(*retentionRaw),
 				compact.ResolutionLevel5m:  time.Duration(*retention5m),
@@ -135,6 +144,7 @@ func runCompact(
 	haltOnError bool,
 	acceptMalformedIndex bool,
 	wait bool,
+	generateMissingIndexCacheFiles bool,
 	retentionByResolution map[compact.ResolutionLevel]time.Duration,
 	component string,
 	disableDownsampling bool,
@@ -197,6 +207,7 @@ func runCompact(
 	var (
 		compactDir      = path.Join(dataDir, "compact")
 		downsamplingDir = path.Join(dataDir, "downsample")
+		indexCacheDir   = path.Join(dataDir, "index_cache")
 	)

 	if err := os.RemoveAll(downsamplingDir); err != nil {
@@ -255,6 +266,13 @@ func runCompact(
 		g.Add(func() error {
 			defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

+			// Generate any missing index cache files before entering the regular compaction loop.
+			if generateMissingIndexCacheFiles {
+				if err := genMissingIndexCacheFiles(ctx, logger, bkt, indexCacheDir); err != nil {
+					return err
+				}
+			}
+
 			if !wait {
 				return f()
 			}
@@ -300,3 +318,121 @@ func runCompact(
 	level.Info(logger).Log("msg", "starting compact node")
 	return nil
 }
+
+// genMissingIndexCacheFiles generates missing index cache files and uploads them to object storage.
+func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, bkt objstore.Bucket, dir string) error {
+	if err := os.RemoveAll(dir); err != nil {
+		return errors.Wrap(err, "clean index cache directory")
+	}
+	if err := os.MkdirAll(dir, 0777); err != nil {
+		return errors.Wrap(err, "create dir")
+	}
+
+	defer func() {
+		if err := os.RemoveAll(dir); err != nil {
+			level.Error(logger).Log("msg", "failed to remove index cache directory", "path", dir, "err", err)
+		}
+	}()
+
+	level.Info(logger).Log("msg", "start index cache processing")
+
+	var metas []*metadata.Meta
+
+	err := bkt.Iter(ctx, "", func(name string) error {
+		id, ok := block.IsBlockDir(name)
+		if !ok {
+			return nil
+		}
+
+		rc, err := bkt.Get(ctx, path.Join(id.String(), block.MetaFilename))
+		if err != nil {
+			// Probably not a finished block; skip it.
+			if bkt.IsObjNotFoundErr(err) {
+				level.Warn(logger).Log("msg", "meta file wasn't found", "block", id.String())
+				return nil
+			}
+			return errors.Wrapf(err, "get meta for block %s", id)
+		}
+		defer runutil.CloseWithLogOnErr(logger, rc, "block reader")

+		var meta metadata.Meta
+		if err := json.NewDecoder(rc).Decode(&meta); err != nil {
+			return errors.Wrap(err, "decode meta")
+		}
+
+		// Newer compactor versions upload the index cache file along with the compacted block,
+		// so only compacted blocks can be missing one. Skip uncompacted (level 1) blocks.
+		if meta.Compaction.Level == 1 {
+			return nil
+		}
+
+		metas = append(metas, &meta)
+
+		return nil
+	})
+	if err != nil {
+		return errors.Wrap(err, "retrieve bucket block metas")
+	}
+
+	for _, meta := range metas {
+		if err := generateIndexCacheFile(ctx, bkt, logger, dir, meta); err != nil {
+			return err
+		}
+	}
+
+	level.Info(logger).Log("msg", "generating index cache files is done, you can remove the startup argument `index.generate-missing-cache-file`")
+	return nil
+}
+
+func generateIndexCacheFile(
+	ctx context.Context,
+	bkt objstore.Bucket,
+	logger log.Logger,
+	indexCacheDir string,
+	meta *metadata.Meta,
+) error {
+	id := meta.ULID
+
+	bdir := filepath.Join(indexCacheDir, id.String())
+	if err := os.MkdirAll(bdir, 0777); err != nil {
+		return errors.Wrap(err, "create block dir")
+	}
+
+	// RemoveAll, not Remove: by this point the directory holds the downloaded index and the cache file.
+	defer func() {
+		if err := os.RemoveAll(bdir); err != nil {
+			level.Error(logger).Log("msg", "failed to remove index cache directory", "path", bdir, "err", err)
+		}
+	}()
+
+	cachePath := filepath.Join(bdir, block.IndexCacheFilename)
+	cache := path.Join(meta.ULID.String(), block.IndexCacheFilename)
+
+	// Check the error before the existence flag so bucket failures are not silently ignored.
+	ok, err := objstore.Exists(ctx, bkt, cache)
+	if err != nil {
+		return errors.Wrap(err, "attempt to check if a cached index file exists")
+	}
+	if ok {
+		return nil
+	}
+
+	level.Debug(logger).Log("msg", "make index cache", "block", id)
+
+	// Try to download the index file from the object store.
+	indexPath := filepath.Join(bdir, block.IndexFilename)
+	index := path.Join(id.String(), block.IndexFilename)
+
+	if err := objstore.DownloadFile(ctx, logger, bkt, index, indexPath); err != nil {
+		return errors.Wrap(err, "download index file")
+	}
+
+	if err := block.WriteIndexCache(logger, indexPath, cachePath); err != nil {
+		return errors.Wrap(err, "write index cache")
+	}
+
+	if err := objstore.UploadFile(ctx, logger, bkt, cachePath, cache); err != nil {
+		return errors.Wrap(err, "upload index cache")
+	}
+	return nil
+}
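For context, a minimal sketch of driving the new one-off job directly, e.g. from a test. The in-memory bucket from pkg/objstore/inmem and the scratch directory are illustrative assumptions, not part of this diff:

	package main

	import (
		"context"

		"github.com/go-kit/kit/log"
		"github.com/improbable-eng/thanos/pkg/objstore/inmem"
	)

	// exampleGenMissingIndexCacheFiles is a hypothetical helper showing the call.
	func exampleGenMissingIndexCacheFiles() error {
		bkt := inmem.NewBucket() // assumed in-memory objstore.Bucket implementation
		// The job scans every block in the bucket, skips level-1 blocks (new
		// compactions upload their cache file directly), and backfills
		// index.cache.json for the already-compacted rest.
		return genMissingIndexCacheFiles(context.Background(), log.NewNopLogger(), bkt, "/tmp/index_cache")
	}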
diff --git a/pkg/block/block.go b/pkg/block/block.go
index 008e1798d4b..5c1033a7d8d 100644
--- a/pkg/block/block.go
+++ b/pkg/block/block.go
@@ -25,6 +25,8 @@ const (
 	MetaFilename = "meta.json"
 	// IndexFilename is the known index file for block index.
 	IndexFilename = "index"
+	// IndexCacheFilename is the canonical name for the index cache file that stores the essential index information.
+	IndexCacheFilename = "index.cache.json"
 	// ChunksDirname is the known dir name for chunks with compressed samples.
 	ChunksDirname = "chunks"

@@ -93,6 +95,12 @@ func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st
 		return cleanUp(bkt, id, errors.Wrap(err, "upload index"))
 	}

+	if meta.Thanos.Source == metadata.CompactorSource {
+		if err := objstore.UploadFile(ctx, logger, bkt, path.Join(bdir, IndexCacheFilename), path.Join(id.String(), IndexCacheFilename)); err != nil {
+			return cleanUp(bkt, id, errors.Wrap(err, "upload index cache"))
+		}
+	}
+
 	// Meta.json always need to be uploaded as a last item. This will allow to assume block directories without meta file
 	// to be pending uploads.
 	if err := objstore.UploadFile(ctx, logger, bkt, path.Join(bdir, MetaFilename), path.Join(id.String(), MetaFilename)); err != nil {
diff --git a/pkg/block/index.go b/pkg/block/index.go
index 777654f2e6b..41294e76f28 100644
--- a/pkg/block/index.go
+++ b/pkg/block/index.go
@@ -26,8 +26,10 @@ import (
 	"github.com/prometheus/tsdb/labels"
 )

-// IndexCacheFilename is the canonical name for index cache files.
-const IndexCacheFilename = "index.cache.json"
+const (
+	// IndexCacheVersion1 is an enumeration of index cache versions supported by Thanos.
+	IndexCacheVersion1 = iota + 1
+)

 type postingsRange struct {
 	Name, Value string
@@ -35,10 +37,11 @@ type postingsRange struct {
 }

 type indexCache struct {
-	Version     int
-	Symbols     map[uint32]string
-	LabelValues map[string][]string
-	Postings    []postingsRange
+	Version      int
+	CacheVersion int
+	Symbols      map[uint32]string
+	LabelValues  map[string][]string
+	Postings     []postingsRange
 }

 type realByteSlice []byte
@@ -112,9 +115,10 @@ func WriteIndexCache(logger log.Logger, indexFn string, fn string) error {
 	defer runutil.CloseWithLogOnErr(logger, f, "index cache writer")

 	v := indexCache{
-		Version:     indexr.Version(),
-		Symbols:     symbols,
-		LabelValues: map[string][]string{},
+		Version:      indexr.Version(),
+		CacheVersion: IndexCacheVersion1,
+		Symbols:      symbols,
+		LabelValues:  map[string][]string{},
 	}

 	// Extract label value indices.
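To make the new on-disk format concrete, a v1 cache payload now looks roughly like the literal below. The field values are made up for illustration; the shape follows the structs above, with the postings Start/End byte offsets taken from the source index:

	// Illustrative only: values are invented, the structure matches indexCache above.
	v := indexCache{
		Version:      2,                  // TSDB index format version of the source block.
		CacheVersion: IndexCacheVersion1, // Version of the cache file format itself.
		Symbols:      map[uint32]string{1: "__name__", 2: "up"},
		LabelValues:  map[string][]string{"__name__": {"up"}},
		Postings: []postingsRange{
			{Name: "__name__", Value: "up", Start: 100, End: 164},
		},
	}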
diff --git a/pkg/block/metadata/meta.go b/pkg/block/metadata/meta.go
index 0d8b22dd190..765c3533db4 100644
--- a/pkg/block/metadata/meta.go
+++ b/pkg/block/metadata/meta.go
@@ -36,7 +36,7 @@ const (
 )

 const (
-	// MetaVersion is a enumeration of versions supported by Thanos.
+	// MetaVersion1 is an enumeration of meta versions supported by Thanos.
 	MetaVersion1 = iota + 1
 )

diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go
index 2de928ab6a0..dc0ab1682bf 100644
--- a/pkg/compact/compact.go
+++ b/pkg/compact/compact.go
@@ -3,6 +3,7 @@ package compact
 import (
 	"context"
 	"fmt"
+	"io/ioutil"
 	"os"
 	"path/filepath"
 	"sort"
@@ -11,8 +12,6 @@ import (

 	"github.com/improbable-eng/thanos/pkg/block/metadata"

-	"io/ioutil"
-
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
 	"github.com/improbable-eng/thanos/pkg/block"
@@ -61,6 +60,9 @@ type syncerMetrics struct {
 	garbageCollectionDuration prometheus.Histogram
 	compactions               *prometheus.CounterVec
 	compactionFailures        *prometheus.CounterVec
+	indexCacheBlocks          prometheus.Counter
+	indexCacheTraverse        prometheus.Counter
+	indexCacheFailures        prometheus.Counter
 }

 func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {
@@ -535,7 +537,6 @@ func (cg *Group) Compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 		cg.compactionFailures.Inc()
 	}
 	cg.compactions.Inc()
-
 	return shouldRerun, compID, err
 }

@@ -813,6 +814,8 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 		"blocks", fmt.Sprintf("%v", plan), "duration", time.Since(begin))

 	bdir := filepath.Join(dir, compID.String())
+	index := filepath.Join(bdir, block.IndexFilename)
+	indexCache := filepath.Join(bdir, block.IndexCacheFilename)

 	newMeta, err := metadata.InjectThanos(cg.logger, bdir, metadata.Thanos{
 		Labels:     cg.labels.Map(),
@@ -828,7 +831,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 	}

 	// Ensure the output block is valid.
-	if err := block.VerifyIndex(cg.logger, filepath.Join(bdir, block.IndexFilename), newMeta.MinTime, newMeta.MaxTime); !cg.acceptMalformedIndex && err != nil {
+	if err := block.VerifyIndex(cg.logger, index, newMeta.MinTime, newMeta.MaxTime); !cg.acceptMalformedIndex && err != nil {
 		return false, ulid.ULID{}, halt(errors.Wrapf(err, "invalid result block %s", bdir))
 	}

@@ -837,6 +840,10 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 		return false, ulid.ULID{}, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir))
 	}

+	if err := block.WriteIndexCache(cg.logger, index, indexCache); err != nil {
+		return false, ulid.ULID{}, errors.Wrap(err, "write index cache")
+	}
+
 	begin = time.Now()

 	if err := block.Upload(ctx, cg.logger, cg.bkt, bdir); err != nil {
@@ -888,7 +895,14 @@ type BucketCompactor struct {
 }

 // NewBucketCompactor creates a new bucket compactor.
-func NewBucketCompactor(logger log.Logger, sy *Syncer, comp tsdb.Compactor, compactDir string, bkt objstore.Bucket, concurrency int) (*BucketCompactor, error) {
+func NewBucketCompactor(
+	logger log.Logger,
+	sy *Syncer,
+	comp tsdb.Compactor,
+	compactDir string,
+	bkt objstore.Bucket,
+	concurrency int,
+) (*BucketCompactor, error) {
 	if concurrency <= 0 {
 		return nil, errors.New("invalid concurrency level (%d), concurrency level must be > 0")
 	}
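Call sites are unaffected by the reformatted constructor; a minimal sketch, assuming sy, comp, compactDir, bkt and concurrency come from the surrounding runCompact code (they are not part of this hunk):

	compactor, err := compact.NewBucketCompactor(logger, sy, comp, compactDir, bkt, concurrency)
	if err != nil {
		return errors.Wrap(err, "create bucket compactor") // concurrency must be > 0
	}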
diff --git a/pkg/compact/downsample/streamed_block_writer.go b/pkg/compact/downsample/streamed_block_writer.go
index 2e2921ac344..16213c0cd01 100644
--- a/pkg/compact/downsample/streamed_block_writer.go
+++ b/pkg/compact/downsample/streamed_block_writer.go
@@ -195,6 +195,10 @@ func (w *streamedBlockWriter) finalize() error {
 		return errors.Wrap(err, "write mem postings")
 	}

+	if err := w.writeIndexCache(); err != nil {
+		return errors.Wrap(err, "write index cache")
+	}
+
 	if err := w.writeMetaFile(); err != nil {
 		return errors.Wrap(err, "write meta meta")
 	}
@@ -253,6 +257,16 @@ func (w *streamedBlockWriter) writeMemPostings() error {
 	return nil
 }

+func (w *streamedBlockWriter) writeIndexCache() error {
+	indexFile := filepath.Join(w.blockDir, block.IndexFilename)
+	indexCacheFile := filepath.Join(w.blockDir, block.IndexCacheFilename)
+	if err := block.WriteIndexCache(w.logger, indexFile, indexCacheFile); err != nil {
+		return errors.Wrap(err, "write index cache")
+	}
+
+	return nil
+}
+
 // writeMetaFile writes meta file.
 func (w *streamedBlockWriter) writeMetaFile() error {
 	w.meta.Version = metadata.MetaVersion1
diff --git a/pkg/objstore/objstore.go b/pkg/objstore/objstore.go
index 4b241e972a1..81d52d7d457 100644
--- a/pkg/objstore/objstore.go
+++ b/pkg/objstore/objstore.go
@@ -106,7 +106,7 @@ func DeleteDir(ctx context.Context, bkt Bucket, dir string) error {
 // DownloadFile downloads the src file from the bucket to dst. If dst is an existing
 // directory, a file with the same name as the source is created in dst.
 // If destination file is already existing, download file will overwrite it.
-func DownloadFile(ctx context.Context, logger log.Logger, bkt BucketReader, src, dst string) error {
+func DownloadFile(ctx context.Context, logger log.Logger, bkt BucketReader, src, dst string) (err error) {
 	if fi, err := os.Stat(dst); err == nil {
 		if fi.IsDir() {
 			dst = filepath.Join(dst, filepath.Base(src))
@@ -125,8 +125,6 @@ func DownloadFile(ctx context.Context, logger log.Logger, bkt BucketReader, src,
 	if err != nil {
 		return errors.Wrap(err, "create file")
 	}
-	defer runutil.CloseWithLogOnErr(logger, f, "download block's output file")
-
 	defer func() {
 		if err != nil {
 			if rerr := os.Remove(dst); rerr != nil {
@@ -134,6 +132,8 @@ func DownloadFile(ctx context.Context, logger log.Logger, bkt BucketReader, src,
 			}
 		}
 	}()
+	defer runutil.CloseWithLogOnErr(logger, f, "download block's output file")
+
 	if _, err = io.Copy(f, rc); err != nil {
 		return errors.Wrap(err, "copy object to file")
 	}
@@ -170,6 +170,23 @@ func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, src,
 	return nil
 }

+// Exists returns true if the given object exists, false with a nil error when it
+// is not found (IsObjNotFoundErr), and false with the underlying error otherwise.
+func Exists(ctx context.Context, bkt Bucket, src string) (bool, error) {
+	rc, err := bkt.Get(ctx, src)
+	if rc != nil {
+		_ = rc.Close()
+	}
+	if err != nil {
+		if bkt.IsObjNotFoundErr(err) {
+			return false, nil
+		}
+		return false, errors.Wrap(err, "stat object")
+	}
+
+	return true, nil
+}
+
 // BucketWithMetrics takes a bucket and registers metrics with the given registry for
 // operations run against the bucket.
 func BucketWithMetrics(name string, b Bucket, r prometheus.Registerer) Bucket {
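A short usage sketch of the new helper, mirroring generateIndexCacheFile above; ctx, bkt and id are assumed to be in scope. Because a not-found object yields (false, nil), callers can distinguish "missing" from a real bucket error:

	ok, err := objstore.Exists(ctx, bkt, path.Join(id.String(), block.IndexCacheFilename))
	if err != nil {
		return errors.Wrap(err, "check index cache existence") // Real bucket failure.
	}
	if ok {
		return nil // Cache object already uploaded; nothing to do.
	}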
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index d2070229f10..8e6c3533c58 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -1033,10 +1033,11 @@ type bucketBlock struct {
 	indexVersion int
 	symbols      map[uint32]string
+	symbolsV2    map[string]struct{}
 	lvals        map[string][]string
 	postings     map[labels.Label]index.Range

-	indexObj  string
+	id        ulid.ULID
 	chunkObjs []string

 	pendingReaders sync.WaitGroup
@@ -1057,7 +1058,7 @@ func newBucketBlock(
 	b = &bucketBlock{
 		logger:     logger,
 		bucket:     bkt,
-		indexObj:   path.Join(id.String(), block.IndexFilename),
+		id:         id,
 		indexCache: indexCache,
 		chunkPool:  chunkPool,
 		dir:        dir,
@@ -1080,6 +1081,14 @@ func newBucketBlock(
 	return b, nil
 }

+func (b *bucketBlock) indexFilename() string {
+	return path.Join(b.id.String(), block.IndexFilename)
+}
+
+func (b *bucketBlock) indexCacheFilename() string {
+	return path.Join(b.id.String(), block.IndexCacheFilename)
+}
+
 func (b *bucketBlock) loadMeta(ctx context.Context, id ulid.ULID) error {
 	// If we haven't seen the block before download the meta.json file.
 	if _, err := os.Stat(b.dir); os.IsNotExist(err) {
@@ -1104,20 +1113,29 @@ func (b *bucketBlock) loadMeta(ctx context.Context, id ulid.ULID) error {

 func (b *bucketBlock) loadIndexCache(ctx context.Context) (err error) {
 	cachefn := filepath.Join(b.dir, block.IndexCacheFilename)
-
-	b.indexVersion, b.symbols, b.lvals, b.postings, err = block.ReadIndexCache(b.logger, cachefn)
-	if err == nil {
+	if err = b.loadIndexCacheFromFile(ctx, cachefn); err == nil {
 		return nil
 	}
 	if !os.IsNotExist(errors.Cause(err)) {
 		return errors.Wrap(err, "read index cache")
 	}
-	// No cache exists is on disk yet, build it from the downloaded index and retry.
+
+	// Try to download the index cache file from the object store.
+	if err = objstore.DownloadFile(ctx, b.logger, b.bucket, b.indexCacheFilename(), cachefn); err == nil {
+		return b.loadIndexCacheFromFile(ctx, cachefn)
+	}
+
+	if !b.bucket.IsObjNotFoundErr(errors.Cause(err)) {
+		return errors.Wrap(err, "download index cache file")
+	}
+
+	// No cache exists on disk yet, build it from the downloaded index and retry.
 	fn := filepath.Join(b.dir, block.IndexFilename)
-	if err := objstore.DownloadFile(ctx, b.logger, b.bucket, b.indexObj, fn); err != nil {
+	if err := objstore.DownloadFile(ctx, b.logger, b.bucket, b.indexFilename(), fn); err != nil {
 		return errors.Wrap(err, "download index file")
 	}
+
 	defer func() {
 		if rerr := os.Remove(fn); rerr != nil {
 			level.Error(b.logger).Log("msg", "failed to remove temp index file", "path", fn, "err", rerr)
@@ -1128,15 +1146,16 @@ func (b *bucketBlock) loadIndexCache(ctx context.Context) (err error) {
 		return errors.Wrap(err, "write index cache")
 	}

-	b.indexVersion, b.symbols, b.lvals, b.postings, err = block.ReadIndexCache(b.logger, cachefn)
-	if err != nil {
-		return errors.Wrap(err, "read index cache")
-	}
-	return nil
+	return errors.Wrap(b.loadIndexCacheFromFile(ctx, cachefn), "read index cache")
+}
+
+func (b *bucketBlock) loadIndexCacheFromFile(ctx context.Context, cache string) (err error) {
+	b.indexVersion, b.symbols, b.lvals, b.postings, err = block.ReadIndexCache(b.logger, cache)
+	return err
 }

 func (b *bucketBlock) readIndexRange(ctx context.Context, off, length int64) ([]byte, error) {
-	r, err := b.bucket.GetRange(ctx, b.indexObj, off, length)
+	r, err := b.bucket.GetRange(ctx, b.indexFilename(), off, length)
 	if err != nil {
 		return nil, errors.Wrap(err, "get range reader")
 	}
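Taken together, the store's cache loading now falls back in three steps; a condensed sketch of (*bucketBlock).loadIndexCache after this diff, with error plumbing elided:

	// 1. Use the cache file already on local disk, if any.
	if err := b.loadIndexCacheFromFile(ctx, cachefn); err == nil {
		return nil
	}
	// 2. Otherwise fetch the pre-built index.cache.json object uploaded by the
	//    compactor (or by the --index.generate-missing-cache-file backfill job).
	// 3. Only if the bucket has none, download the full index, rebuild the cache
	//    via block.WriteIndexCache, and load the freshly written file.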