-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
store+compactor: process index cache during compaction #986
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,17 +2,22 @@ package main | |
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"fmt" | ||
"os" | ||
"path" | ||
"path/filepath" | ||
"strconv" | ||
"strings" | ||
"time" | ||
|
||
"github.com/go-kit/kit/log" | ||
"github.com/go-kit/kit/log/level" | ||
"github.com/improbable-eng/thanos/pkg/block" | ||
"github.com/improbable-eng/thanos/pkg/block/metadata" | ||
"github.com/improbable-eng/thanos/pkg/compact" | ||
"github.com/improbable-eng/thanos/pkg/compact/downsample" | ||
"github.com/improbable-eng/thanos/pkg/objstore" | ||
"github.com/improbable-eng/thanos/pkg/objstore/client" | ||
"github.com/improbable-eng/thanos/pkg/runutil" | ||
"github.com/oklog/run" | ||
|
@@ -87,6 +92,9 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri | |
wait := cmd.Flag("wait", "Do not exit after all compactions have been processed and wait for new work."). | ||
Short('w').Bool() | ||
|
||
generateMissingIndexCacheFiles := cmd.Flag("index.generate-missing-cache-file", "If enabled, on startup compactor runs an on-off job that scans all the blocks to find all blocks with missing index cache file. It generates those if needed and upload."). | ||
Hidden().Default("false").Bool() | ||
|
||
// TODO(bplotka): Remove this flag once https://github.com/improbable-eng/thanos/issues/297 is fixed. | ||
disableDownsampling := cmd.Flag("debug.disable-downsampling", "Disables downsampling. This is not recommended "+ | ||
"as querying long time ranges without non-downsampled data is not efficient and not useful (is not possible to render all for human eye)."). | ||
|
@@ -110,6 +118,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri | |
*haltOnError, | ||
*acceptMalformedIndex, | ||
*wait, | ||
*generateMissingIndexCacheFiles, | ||
map[compact.ResolutionLevel]time.Duration{ | ||
compact.ResolutionLevelRaw: time.Duration(*retentionRaw), | ||
compact.ResolutionLevel5m: time.Duration(*retention5m), | ||
|
@@ -135,6 +144,7 @@ func runCompact( | |
haltOnError bool, | ||
acceptMalformedIndex bool, | ||
wait bool, | ||
generateMissingIndexCacheFiles bool, | ||
retentionByResolution map[compact.ResolutionLevel]time.Duration, | ||
component string, | ||
disableDownsampling bool, | ||
|
@@ -197,6 +207,7 @@ func runCompact( | |
var ( | ||
compactDir = path.Join(dataDir, "compact") | ||
downsamplingDir = path.Join(dataDir, "downsample") | ||
indexCacheDir = path.Join(dataDir, "index_cache") | ||
) | ||
|
||
if err := os.RemoveAll(downsamplingDir); err != nil { | ||
|
@@ -255,6 +266,13 @@ func runCompact( | |
g.Add(func() error { | ||
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client") | ||
|
||
// Generate index file. | ||
if generateMissingIndexCacheFiles { | ||
if err := genMissingIndexCacheFiles(ctx, logger, bkt, indexCacheDir); err != nil { | ||
bwplotka marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return err | ||
} | ||
} | ||
|
||
if !wait { | ||
return f() | ||
} | ||
|
@@ -300,3 +318,118 @@ func runCompact( | |
level.Info(logger).Log("msg", "starting compact node") | ||
return nil | ||
} | ||
|
||
// genMissingIndexCacheFiles scans over all blocks, generates missing index cache files and uploads them to object storage. | ||
func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, bkt objstore.Bucket, dir string) error { | ||
if err := os.RemoveAll(dir); err != nil { | ||
return errors.Wrap(err, "clean index cache directory") | ||
} | ||
if err := os.MkdirAll(dir, 0777); err != nil { | ||
return errors.Wrap(err, "create dir") | ||
} | ||
|
||
defer func() { | ||
if err := os.RemoveAll(dir); err != nil { | ||
level.Error(logger).Log("msg", "failed to remove index cache directory", "path", dir, "err", err) | ||
} | ||
}() | ||
|
||
level.Info(logger).Log("msg", "start index cache processing") | ||
|
||
var metas []*metadata.Meta | ||
|
||
if err := bkt.Iter(ctx, "", func(name string) error { | ||
id, ok := block.IsBlockDir(name) | ||
if !ok { | ||
return nil | ||
} | ||
|
||
rc, err := bkt.Get(ctx, path.Join(id.String(), block.MetaFilename)) | ||
if err != nil { | ||
// Probably not finished block, skip it. | ||
if bkt.IsObjNotFoundErr(err) { | ||
level.Warn(logger).Log("msg", "meta file wasn't found", "block", id.String()) | ||
bwplotka marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return nil | ||
} | ||
return errors.Wrapf(err, "get meta for block %s", id) | ||
} | ||
defer runutil.CloseWithLogOnErr(logger, rc, "block reader") | ||
|
||
var meta metadata.Meta | ||
if err := json.NewDecoder(rc).Decode(&meta); err != nil { | ||
return errors.Wrap(err, "decode meta") | ||
} | ||
|
||
// New version of compactor pushes index cache along with data block. | ||
// Skip uncompacted blocks. | ||
bwplotka marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if meta.Compaction.Level == 1 { | ||
return nil | ||
} | ||
|
||
metas = append(metas, &meta) | ||
|
||
return nil | ||
}); err != nil { | ||
return errors.Wrap(err, "retrieve bucket block metas") | ||
} | ||
|
||
for _, meta := range metas { | ||
if err := generateIndexCacheFile(ctx, bkt, logger, dir, meta); err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To be more verbose I would check if index cache is missing first in this loop before going to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's keep where it is, otherwise you have to pass There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. passing another parameter is not that bad ;p There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. but up to you There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not addressed I think |
||
return err | ||
} | ||
} | ||
|
||
level.Info(logger).Log("msg", "generating index cache files is done, you can remove startup argument `index.generate-missing-cache-file`") | ||
return nil | ||
} | ||
|
||
func generateIndexCacheFile( | ||
ctx context.Context, | ||
bkt objstore.Bucket, | ||
logger log.Logger, | ||
indexCacheDir string, | ||
meta *metadata.Meta, | ||
) error { | ||
id := meta.ULID | ||
|
||
bdir := filepath.Join(indexCacheDir, id.String()) | ||
if err := os.MkdirAll(bdir, 0777); err != nil { | ||
return errors.Wrap(err, "create block dir") | ||
} | ||
|
||
defer func() { | ||
if err := os.RemoveAll(bdir); err != nil { | ||
level.Error(logger).Log("msg", "failed to remove index cache directory", "path", bdir, "err", err) | ||
} | ||
}() | ||
|
||
cachePath := filepath.Join(bdir, block.IndexCacheFilename) | ||
cache := path.Join(meta.ULID.String(), block.IndexCacheFilename) | ||
|
||
ok, err := objstore.Exists(ctx, bkt, cache) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's move it as commented above |
||
if ok { | ||
return nil | ||
} | ||
if err != nil { | ||
return errors.Wrapf(err, "attempt to check if a cached index file exists") | ||
} | ||
|
||
level.Debug(logger).Log("msg", "make index cache", "block", id) | ||
|
||
// Try to download index file from obj store. | ||
indexPath := filepath.Join(bdir, block.IndexFilename) | ||
index := path.Join(id.String(), block.IndexFilename) | ||
|
||
if err := objstore.DownloadFile(ctx, logger, bkt, index, indexPath); err != nil { | ||
return errors.Wrap(err, "download index file") | ||
} | ||
|
||
if err := block.WriteIndexCache(logger, indexPath, cachePath); err != nil { | ||
return errors.Wrap(err, "write index cache") | ||
} | ||
|
||
if err := objstore.UploadFile(ctx, logger, bkt, cachePath, cache); err != nil { | ||
return errors.Wrap(err, "upload index cache") | ||
} | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could be reworded, but we can do it later on cut 0.4.0 👍