Skip to content

Commit

Permalink
store+compactor: process index cache during compaction
Browse files Browse the repository at this point in the history
Add few steps during compaction:

1. Generate index cache for old blocks made by compactor until this version.
2. Generate index cache during group compaction.
3. Generate index cache during downsampling.
4. Add index cache version to cache file.

Store downloads index cache files from object store or generate on the
fly if they don't exist.

Signed-off-by: Aleksei Semiglazov <[email protected]>
  • Loading branch information
xjewer committed Apr 12, 2019
1 parent e4a9fa5 commit 5c32ed5
Show file tree
Hide file tree
Showing 9 changed files with 242 additions and 31 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ New options:

* `--store.grpc.series-sample-limit` limits the amount of samples that might be retrieved on a single Series() call. By default it is 0. Consider enabling it by setting it to more than 0 if you are running on limited resources.
* `--store.grpc.series-max-concurrency` limits the number of concurrent Series() calls in Thanos Store. By default it is 20. Considering making it lower or bigger depending on the scale of your deployment.
* `--index.generate-missing-cache-file` if enabled, on startup compactor runs an on-off job that scans all the blocks to find all blocks with missing index cache file. It generates those if needed and upload. By default is disabled. Check logs on existence the line `generating index cache files is done`, then you can disable this flag.

New metrics:
* `thanos_bucket_store_queries_dropped_total` shows how many queries were dropped due to the samples limit;
Expand All @@ -35,6 +36,7 @@ New tracing span:
- [#970](https://github.com/improbable-eng/thanos/pull/970) Added `PartialResponseStrategy` field for `RuleGroups` for `Ruler`.
- [#1016](https://github.com/improbable-eng/thanos/pull/1016) Added option for another DNS resolver (miekg/dns client).
This to have SRV resolution working on [Golang 1.11+ with KubeDNS below v1.14](https://github.com/golang/go/issues/27546)
- [#986](https://github.com/improbable-eng/thanos/pull/986) Store index cache files in object storage, reduces store start-up time by skipping the generating the index cache for all blocks and only do this for recently created uncompacted blocks.

### Changed
- [#970](https://github.com/improbable-eng/thanos/pull/970) Deprecated partial_response_disabled proto field. Added partial_response_strategy instead. Both in gRPC and Query API.
Expand Down
133 changes: 133 additions & 0 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,22 @@ package main

import (
"context"
"encoding/json"
"fmt"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/block"
"github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/compact"
"github.com/improbable-eng/thanos/pkg/compact/downsample"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/objstore/client"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/oklog/run"
Expand Down Expand Up @@ -87,6 +92,9 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
wait := cmd.Flag("wait", "Do not exit after all compactions have been processed and wait for new work.").
Short('w').Bool()

generateMissingIndexCacheFiles := cmd.Flag("index.generate-missing-cache-file", "If enabled, on startup compactor runs an on-off job that scans all the blocks to find all blocks with missing index cache file. It generates those if needed and upload.").
Hidden().Default("false").Bool()

// TODO(bplotka): Remove this flag once https://github.com/improbable-eng/thanos/issues/297 is fixed.
disableDownsampling := cmd.Flag("debug.disable-downsampling", "Disables downsampling. This is not recommended "+
"as querying long time ranges without non-downsampled data is not efficient and not useful (is not possible to render all for human eye).").
Expand All @@ -110,6 +118,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
*haltOnError,
*acceptMalformedIndex,
*wait,
*generateMissingIndexCacheFiles,
map[compact.ResolutionLevel]time.Duration{
compact.ResolutionLevelRaw: time.Duration(*retentionRaw),
compact.ResolutionLevel5m: time.Duration(*retention5m),
Expand All @@ -135,6 +144,7 @@ func runCompact(
haltOnError bool,
acceptMalformedIndex bool,
wait bool,
generateMissingIndexCacheFiles bool,
retentionByResolution map[compact.ResolutionLevel]time.Duration,
component string,
disableDownsampling bool,
Expand Down Expand Up @@ -197,6 +207,7 @@ func runCompact(
var (
compactDir = path.Join(dataDir, "compact")
downsamplingDir = path.Join(dataDir, "downsample")
indexCacheDir = path.Join(dataDir, "index_cache")
)

if err := os.RemoveAll(downsamplingDir); err != nil {
Expand Down Expand Up @@ -255,6 +266,13 @@ func runCompact(
g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

// Generate index file.
if generateMissingIndexCacheFiles {
if err := genMissingIndexCacheFiles(ctx, logger, bkt, indexCacheDir); err != nil {
return err
}
}

if !wait {
return f()
}
Expand Down Expand Up @@ -300,3 +318,118 @@ func runCompact(
level.Info(logger).Log("msg", "starting compact node")
return nil
}

// genMissingIndexCacheFiles scans over all blocks, generates missing index cache files and uploads them to object storage.
func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, bkt objstore.Bucket, dir string) error {
if err := os.RemoveAll(dir); err != nil {
return errors.Wrap(err, "clean index cache directory")
}
if err := os.MkdirAll(dir, 0777); err != nil {
return errors.Wrap(err, "create dir")
}

defer func() {
if err := os.RemoveAll(dir); err != nil {
level.Error(logger).Log("msg", "failed to remove index cache directory", "path", dir, "err", err)
}
}()

level.Info(logger).Log("msg", "start index cache processing")

var metas []*metadata.Meta

if err := bkt.Iter(ctx, "", func(name string) error {
id, ok := block.IsBlockDir(name)
if !ok {
return nil
}

rc, err := bkt.Get(ctx, path.Join(id.String(), block.MetaFilename))
if err != nil {
// Probably not finished block, skip it.
if bkt.IsObjNotFoundErr(err) {
level.Warn(logger).Log("msg", "meta file wasn't found", "block", id.String())
return nil
}
return errors.Wrapf(err, "get meta for block %s", id)
}
defer runutil.CloseWithLogOnErr(logger, rc, "block reader")

var meta metadata.Meta
if err := json.NewDecoder(rc).Decode(&meta); err != nil {
return errors.Wrap(err, "decode meta")
}

// New version of compactor pushes index cache along with data block.
// Skip uncompacted blocks.
if meta.Compaction.Level == 1 {
return nil
}

metas = append(metas, &meta)

return nil
}); err != nil {
return errors.Wrap(err, "retrieve bucket block metas")
}

for _, meta := range metas {
if err := generateIndexCacheFile(ctx, bkt, logger, dir, meta); err != nil {
return err
}
}

level.Info(logger).Log("msg", "generating index cache files is done, you can remove startup argument `index.generate-missing-cache-file`")
return nil
}

func generateIndexCacheFile(
ctx context.Context,
bkt objstore.Bucket,
logger log.Logger,
indexCacheDir string,
meta *metadata.Meta,
) error {
id := meta.ULID

bdir := filepath.Join(indexCacheDir, id.String())
if err := os.MkdirAll(bdir, 0777); err != nil {
return errors.Wrap(err, "create block dir")
}

defer func() {
if err := os.RemoveAll(bdir); err != nil {
level.Error(logger).Log("msg", "failed to remove index cache directory", "path", bdir, "err", err)
}
}()

cachePath := filepath.Join(bdir, block.IndexCacheFilename)
cache := path.Join(meta.ULID.String(), block.IndexCacheFilename)

ok, err := objstore.Exists(ctx, bkt, cache)
if ok {
return nil
}
if err != nil {
return errors.Wrapf(err, "attempt to check if a cached index file exists")
}

level.Debug(logger).Log("msg", "make index cache", "block", id)

// Try to download index file from obj store.
indexPath := filepath.Join(bdir, block.IndexFilename)
index := path.Join(id.String(), block.IndexFilename)

if err := objstore.DownloadFile(ctx, logger, bkt, index, indexPath); err != nil {
return errors.Wrap(err, "download index file")
}

if err := block.WriteIndexCache(logger, indexPath, cachePath); err != nil {
return errors.Wrap(err, "write index cache")
}

if err := objstore.UploadFile(ctx, logger, bkt, cachePath, cache); err != nil {
return errors.Wrap(err, "upload index cache")
}
return nil
}
8 changes: 8 additions & 0 deletions pkg/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const (
MetaFilename = "meta.json"
// IndexFilename is the known index file for block index.
IndexFilename = "index"
// IndexCacheFilename is the canonical name for index cache file that stores essential information needed.
IndexCacheFilename = "index.cache.json"
// ChunksDirname is the known dir name for chunks with compressed samples.
ChunksDirname = "chunks"

Expand Down Expand Up @@ -93,6 +95,12 @@ func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st
return cleanUp(bkt, id, errors.Wrap(err, "upload index"))
}

if meta.Thanos.Source == metadata.CompactorSource {
if err := objstore.UploadFile(ctx, logger, bkt, path.Join(bdir, IndexCacheFilename), path.Join(id.String(), IndexCacheFilename)); err != nil {
return cleanUp(bkt, id, errors.Wrap(err, "upload index cache"))
}
}

// Meta.json always need to be uploaded as a last item. This will allow to assume block directories without meta file
// to be pending uploads.
if err := objstore.UploadFile(ctx, logger, bkt, path.Join(bdir, MetaFilename), path.Join(id.String(), MetaFilename)); err != nil {
Expand Down
22 changes: 13 additions & 9 deletions pkg/block/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,22 @@ import (
"github.com/prometheus/tsdb/labels"
)

// IndexCacheFilename is the canonical name for index cache files.
const IndexCacheFilename = "index.cache.json"
const (
// IndexCacheVersion is a enumeration of index cache versions supported by Thanos.
IndexCacheVersion1 = iota + 1
)

type postingsRange struct {
Name, Value string
Start, End int64
}

type indexCache struct {
Version int
Symbols map[uint32]string
LabelValues map[string][]string
Postings []postingsRange
Version int
CacheVersion int
Symbols map[uint32]string
LabelValues map[string][]string
Postings []postingsRange
}

type realByteSlice []byte
Expand Down Expand Up @@ -112,9 +115,10 @@ func WriteIndexCache(logger log.Logger, indexFn string, fn string) error {
defer runutil.CloseWithLogOnErr(logger, f, "index cache writer")

v := indexCache{
Version: indexr.Version(),
Symbols: symbols,
LabelValues: map[string][]string{},
Version: indexr.Version(),
CacheVersion: IndexCacheVersion1,
Symbols: symbols,
LabelValues: map[string][]string{},
}

// Extract label value indices.
Expand Down
2 changes: 1 addition & 1 deletion pkg/block/metadata/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const (
)

const (
// MetaVersion is a enumeration of versions supported by Thanos.
// MetaVersion is a enumeration of meta versions supported by Thanos.
MetaVersion1 = iota + 1
)

Expand Down
24 changes: 19 additions & 5 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package compact
import (
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sort"
Expand All @@ -11,8 +12,6 @@ import (

"github.com/improbable-eng/thanos/pkg/block/metadata"

"io/ioutil"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/block"
Expand Down Expand Up @@ -61,6 +60,9 @@ type syncerMetrics struct {
garbageCollectionDuration prometheus.Histogram
compactions *prometheus.CounterVec
compactionFailures *prometheus.CounterVec
indexCacheBlocks prometheus.Counter
indexCacheTraverse prometheus.Counter
indexCacheFailures prometheus.Counter
}

func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {
Expand Down Expand Up @@ -535,7 +537,6 @@ func (cg *Group) Compact(ctx context.Context, dir string, comp tsdb.Compactor) (
cg.compactionFailures.Inc()
}
cg.compactions.Inc()

return shouldRerun, compID, err
}

Expand Down Expand Up @@ -813,6 +814,8 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
"blocks", fmt.Sprintf("%v", plan), "duration", time.Since(begin))

bdir := filepath.Join(dir, compID.String())
index := filepath.Join(bdir, block.IndexFilename)
indexCache := filepath.Join(bdir, block.IndexCacheFilename)

newMeta, err := metadata.InjectThanos(cg.logger, bdir, metadata.Thanos{
Labels: cg.labels.Map(),
Expand All @@ -828,7 +831,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
}

// Ensure the output block is valid.
if err := block.VerifyIndex(cg.logger, filepath.Join(bdir, block.IndexFilename), newMeta.MinTime, newMeta.MaxTime); !cg.acceptMalformedIndex && err != nil {
if err := block.VerifyIndex(cg.logger, index, newMeta.MinTime, newMeta.MaxTime); !cg.acceptMalformedIndex && err != nil {
return false, ulid.ULID{}, halt(errors.Wrapf(err, "invalid result block %s", bdir))
}

Expand All @@ -837,6 +840,10 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
return false, ulid.ULID{}, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir))
}

if err := block.WriteIndexCache(cg.logger, index, indexCache); err != nil {
return false, ulid.ULID{}, errors.Wrap(err, "write index cache")
}

begin = time.Now()

if err := block.Upload(ctx, cg.logger, cg.bkt, bdir); err != nil {
Expand Down Expand Up @@ -888,7 +895,14 @@ type BucketCompactor struct {
}

// NewBucketCompactor creates a new bucket compactor.
func NewBucketCompactor(logger log.Logger, sy *Syncer, comp tsdb.Compactor, compactDir string, bkt objstore.Bucket, concurrency int) (*BucketCompactor, error) {
func NewBucketCompactor(
logger log.Logger,
sy *Syncer,
comp tsdb.Compactor,
compactDir string,
bkt objstore.Bucket,
concurrency int,
) (*BucketCompactor, error) {
if concurrency <= 0 {
return nil, errors.New("invalid concurrency level (%d), concurrency level must be > 0")
}
Expand Down
14 changes: 14 additions & 0 deletions pkg/compact/downsample/streamed_block_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,10 @@ func (w *streamedBlockWriter) finalize() error {
return errors.Wrap(err, "write mem postings")
}

if err := w.writeIndexCache(); err != nil {
return errors.Wrap(err, "write index cache")
}

if err := w.writeMetaFile(); err != nil {
return errors.Wrap(err, "write meta meta")
}
Expand Down Expand Up @@ -253,6 +257,16 @@ func (w *streamedBlockWriter) writeMemPostings() error {
return nil
}

func (w *streamedBlockWriter) writeIndexCache() error {
indexFile := filepath.Join(w.blockDir, block.IndexFilename)
indexCacheFile := filepath.Join(w.blockDir, block.IndexCacheFilename)
if err := block.WriteIndexCache(w.logger, indexFile, indexCacheFile); err != nil {
return errors.Wrap(err, "write index cache")
}

return nil
}

// writeMetaFile writes meta file.
func (w *streamedBlockWriter) writeMetaFile() error {
w.meta.Version = metadata.MetaVersion1
Expand Down
Loading

0 comments on commit 5c32ed5

Please sign in to comment.