Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compact: Made MarkForDeletion less strict; Added more debugability to block deletion logic, made meta sync explicit. #2385

Merged
merged 1 commit into from
Apr 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 57 additions & 31 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
"github.com/oklog/ulid"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -113,7 +114,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
waitInterval := cmd.Flag("wait-interval", "Wait interval between consecutive compaction runs and bucket refreshes. Only works when --wait flag specified.").
Default("5m").Duration()

generateMissingIndexCacheFiles := cmd.Flag("index.generate-missing-cache-file", "If enabled, on startup compactor runs an on-off job that scans all the blocks to find all blocks with missing index cache file. It generates those if needed and upload.").
generateMissingIndexCacheFiles := cmd.Flag("index.generate-missing-cache-file", "DEPRECATED flag. Will be removed in next release. If enabled, on startup compactor runs an on-off job that scans all the blocks to find all blocks with missing index cache file. It generates those if needed and upload.").
Hidden().Default("false").Bool()

disableDownsampling := cmd.Flag("downsampling.disable", "Disables downsampling. This is not recommended "+
Expand Down Expand Up @@ -290,29 +291,48 @@ func runCompact(
}()

// While fetching blocks, we filter out blocks that were marked for deletion by using IgnoreDeletionMarkFilter.
// The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet.
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, time.Duration(deleteDelay.Seconds()/2)*time.Second)
// The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet.
// This is to make sure compactor will not accidentally perform compactions with gap instead.
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, deleteDelay/2)
duplicateBlocksFilter := block.NewDeduplicateFilter()

baseMetaFetcher, err := block.NewBaseFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg))
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}
compactFetcher := baseMetaFetcher.NewMetaFetcher(extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
block.NewLabelShardedMetaFilter(relabelConfig),
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg)),
ignoreDeletionMarkFilter,
duplicateBlocksFilter,
}, []block.MetadataModifier{block.NewReplicaLabelRemover(logger, dedupReplicaLabels)})

enableVerticalCompaction := false
if len(dedupReplicaLabels) > 0 {
enableVerticalCompaction = true
level.Info(logger).Log("msg", "deduplication.replica-label specified, vertical compaction is enabled", "dedupReplicaLabels", strings.Join(dedupReplicaLabels, ","))
}

sy, err := compact.NewSyncer(logger, reg, bkt, compactFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, blockSyncConcurrency, acceptMalformedIndex, enableVerticalCompaction)
if err != nil {
return errors.Wrap(err, "create syncer")
compactorView := ui.NewBucketUI(logger, label, path.Join(externalPrefix, "/loaded"), prefixHeader)
var sy *compact.Syncer
{
// Make sure all compactor meta syncs are done through Syncer.SyncMeta for readability.
cf := baseMetaFetcher.NewMetaFetcher(
extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
block.NewLabelShardedMetaFilter(relabelConfig),
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg)),
ignoreDeletionMarkFilter,
duplicateBlocksFilter,
}, []block.MetadataModifier{block.NewReplicaLabelRemover(logger, dedupReplicaLabels)},
)
cf.UpdateOnChange(compactorView.Set)
sy, err = compact.NewSyncer(
logger,
reg,
bkt,
cf,
duplicateBlocksFilter,
ignoreDeletionMarkFilter,
blocksMarkedForDeletion,
blockSyncConcurrency,
acceptMalformedIndex, enableVerticalCompaction)
if err != nil {
return errors.Wrap(err, "create syncer")
}
}

levels, err := compactions.levels(maxCompactionLevel)
Expand Down Expand Up @@ -371,39 +391,52 @@ func runCompact(
// We run two passes of this to ensure that the 1h downsampling is generated
// for 5m downsamplings created in the first run.
level.Info(logger).Log("msg", "start first pass of downsampling")

if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, compactFetcher, downsamplingDir); err != nil {
if err := sy.SyncMetas(ctx); err != nil {
return errors.Wrap(err, "sync before first pass of downsampling")
}
if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir); err != nil {
return errors.Wrap(err, "first pass of downsampling failed")
}

level.Info(logger).Log("msg", "start second pass of downsampling")

if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, compactFetcher, downsamplingDir); err != nil {
if err := sy.SyncMetas(ctx); err != nil {
return errors.Wrap(err, "sync before second pass of downsampling")
}
if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir); err != nil {
return errors.Wrap(err, "second pass of downsampling failed")
}
level.Info(logger).Log("msg", "downsampling iterations done")
} else {
level.Warn(logger).Log("msg", "downsampling was explicitly disabled")
level.Info(logger).Log("msg", "downsampling was explicitly disabled")
}

if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, compactFetcher, retentionByResolution, blocksMarkedForDeletion); err != nil {
// TODO(bwplotka): Find a way to avoid syncing if no op was done.
if err := sy.SyncMetas(ctx); err != nil {
return errors.Wrap(err, "sync before first pass of downsampling")
}

if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, sy.Metas(), retentionByResolution, blocksMarkedForDeletion); err != nil {
return errors.Wrap(err, fmt.Sprintf("retention failed"))
}

// No need to resync before partial uploads and delete marked blocks. Last sync should be valid.
compact.BestEffortCleanAbortedPartialUploads(ctx, logger, sy.Partial(), bkt, partialUploadDeleteAttempts, blocksMarkedForDeletion)
if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
return errors.Wrap(err, "error cleaning blocks")
}

compact.BestEffortCleanAbortedPartialUploads(ctx, logger, compactFetcher, bkt, partialUploadDeleteAttempts, blocksMarkedForDeletion)
return nil
}

g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

// Generate index file.
// Generate index files.
// TODO(bwplotka): Remove this in next release.
if generateMissingIndexCacheFiles {
if err := genMissingIndexCacheFiles(ctx, logger, reg, bkt, compactFetcher, indexCacheDir); err != nil {
if err := sy.SyncMetas(ctx); err != nil {
return err
}
if err := genMissingIndexCacheFiles(ctx, logger, reg, bkt, sy.Metas(), indexCacheDir); err != nil {
return err
}
}
Expand Down Expand Up @@ -451,16 +484,14 @@ func runCompact(
r := route.New()

ins := extpromhttp.NewInstrumentationMiddleware(reg)
compactorView := ui.NewBucketUI(logger, label, path.Join(externalPrefix, "/loaded"), prefixHeader)
compactorView.Register(r, ins)
compactFetcher.UpdateOnChange(compactorView.Set)

global := ui.NewBucketUI(logger, label, path.Join(externalPrefix, "/global"), prefixHeader)
global.Register(r, ins)

// Separate fetcher for global view.
// TODO(bwplotka): Allow Bucket UI to visualize the state of the block as well.
f := baseMetaFetcher.NewMetaFetcher(extprom.WrapRegistererWithPrefix("thanos_bucket_ui", reg), nil, nil)
f := baseMetaFetcher.NewMetaFetcher(extprom.WrapRegistererWithPrefix("thanos_bucket_ui", reg), nil, nil, "component", "globalBucketUI")
f.UpdateOnChange(global.Set)

srv.Handle("/", r)
Expand Down Expand Up @@ -494,7 +525,7 @@ func runCompact(
}

// genMissingIndexCacheFiles scans over all blocks, generates missing index cache files and uploads them to object storage.
func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, fetcher block.MetadataFetcher, dir string) error {
func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, metas map[ulid.ULID]*metadata.Meta, dir string) error {
genIndex := promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: metricIndexGenerateName,
Help: metricIndexGenerateHelp,
Expand All @@ -515,11 +546,6 @@ func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prom

level.Info(logger).Log("msg", "start index cache processing")

metas, _, err := fetcher.Fetch(ctx)
if err != nil {
return errors.Wrap(err, "fetch metas")
}

for _, meta := range metas {
// New version of compactor pushes index cache along with data block.
// Skip uncompacted blocks.
Expand Down
21 changes: 11 additions & 10 deletions cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,20 @@ func RunDownsample(
statusProber.Ready()

level.Info(logger).Log("msg", "start first pass of downsampling")

if err := downsampleBucket(ctx, logger, metrics, bkt, metaFetcher, dataDir); err != nil {
metas, _, err := metaFetcher.Fetch(ctx)
if err != nil {
return errors.Wrap(err, "sync before first pass of downsampling")
}
if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir); err != nil {
return errors.Wrap(err, "downsampling failed")
}

level.Info(logger).Log("msg", "start second pass of downsampling")

if err := downsampleBucket(ctx, logger, metrics, bkt, metaFetcher, dataDir); err != nil {
metas, _, err = metaFetcher.Fetch(ctx)
if err != nil {
return errors.Wrap(err, "sync before second pass of downsampling")
}
if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir); err != nil {
return errors.Wrap(err, "downsampling failed")
}

Expand Down Expand Up @@ -144,7 +150,7 @@ func downsampleBucket(
logger log.Logger,
metrics *DownsampleMetrics,
bkt objstore.Bucket,
fetcher block.MetadataFetcher,
metas map[ulid.ULID]*metadata.Meta,
dir string,
) error {
if err := os.RemoveAll(dir); err != nil {
Expand All @@ -160,11 +166,6 @@ func downsampleBucket(
}
}()

metas, _, err := fetcher.Fetch(ctx)
if err != nil {
return errors.Wrap(err, "downsampling meta fetch")
}

// mapping from a hash over all source IDs to blocks. We don't need to downsample a block
// if a downsampled version with the same hash already exists.
sources5m := map[ulid.ULID]struct{}{}
Expand Down
8 changes: 6 additions & 2 deletions cmd/thanos/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ func TestCleanupIndexCacheFolder(t *testing.T) {
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil, nil)
testutil.Ok(t, err)

testutil.Ok(t, genMissingIndexCacheFiles(ctx, logger, reg, bkt, metaFetcher, dir))
metas, _, err := metaFetcher.Fetch(ctx)
testutil.Ok(t, err)
testutil.Ok(t, genMissingIndexCacheFiles(ctx, logger, reg, bkt, metas, dir))

genIndexExp.Inc()
testutil.GatherAndCompare(t, expReg, reg, metricIndexGenerateName)
Expand Down Expand Up @@ -119,7 +121,9 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) {
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil, nil)
testutil.Ok(t, err)

testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metaFetcher, dir))
metas, _, err := metaFetcher.Fetch(ctx)
testutil.Ok(t, err)
testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metas, dir))
testutil.Equals(t, 1.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta.Thanos))))

_, err = os.Stat(dir)
Expand Down
9 changes: 6 additions & 3 deletions pkg/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/objstore"
Expand Down Expand Up @@ -129,14 +130,15 @@ func cleanUp(logger log.Logger, bkt objstore.Bucket, id ulid.ULID, err error) er
}

// MarkForDeletion creates a file which stores information about when the block was marked for deletion.
func MarkForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) error {
func MarkForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, markedForDeletion prometheus.Counter) error {
deletionMarkFile := path.Join(id.String(), metadata.DeletionMarkFilename)
deletionMarkExists, err := bkt.Exists(ctx, deletionMarkFile)
if err != nil {
return errors.Wrapf(err, "check exists %s in bucket", deletionMarkFile)
}
if deletionMarkExists {
return errors.Errorf("file %s already exists in bucket", deletionMarkFile)
level.Warn(logger).Log("msg", "requested to mark for deletion, but file already exists; this should not happen; investigate", "err", errors.Errorf("file %s already exists in bucket", deletionMarkFile))
return nil
}

deletionMark, err := json.Marshal(metadata.DeletionMark{
Expand All @@ -151,7 +153,7 @@ func MarkForDeletion(ctx context.Context, logger log.Logger, bkt objstore.Bucket
if err := bkt.Upload(ctx, deletionMarkFile, bytes.NewBuffer(deletionMark)); err != nil {
return errors.Wrapf(err, "upload file %s to bucket", deletionMarkFile)
}

markedForDeletion.Inc()
level.Info(logger).Log("msg", "block has been marked for deletion", "block", id)
return nil
}
Expand All @@ -168,6 +170,7 @@ func Delete(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid
if err != nil {
return errors.Wrapf(err, "stat %s", metaFile)
}

if ok {
if err := bkt.Delete(ctx, metaFile); err != nil {
return errors.Wrapf(err, "delete %s", metaFile)
Expand Down
82 changes: 45 additions & 37 deletions pkg/block/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path"
Expand All @@ -17,6 +16,9 @@ import (

"github.com/fortytw2/leaktest"
"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/objstore"
Expand Down Expand Up @@ -232,44 +234,50 @@ func TestMarkForDeletion(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()

bkt := objstore.NewInMemBucket()
{
blockWithoutDeletionMark, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
{{Name: "a", Value: "1"}},
{{Name: "a", Value: "2"}},
{{Name: "a", Value: "3"}},
{{Name: "a", Value: "4"}},
{{Name: "b", Value: "1"}},
}, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124)
testutil.Ok(t, err)
testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, blockWithoutDeletionMark.String())))
for _, tcase := range []struct {
name string
preUpload func(t testing.TB, id ulid.ULID, bkt objstore.Bucket)

testutil.Ok(t, MarkForDeletion(ctx, log.NewNopLogger(), bkt, blockWithoutDeletionMark))
exists, err := bkt.Exists(ctx, path.Join(blockWithoutDeletionMark.String(), metadata.DeletionMarkFilename))
testutil.Ok(t, err)
testutil.Equals(t, true, exists)
}
{
blockWithDeletionMark, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
{{Name: "a", Value: "1"}},
{{Name: "a", Value: "2"}},
{{Name: "a", Value: "3"}},
{{Name: "a", Value: "4"}},
{{Name: "b", Value: "1"}},
}, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124)
testutil.Ok(t, err)
testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, blockWithDeletionMark.String())))
blocksMarked int
}{
{
name: "block marked for deletion",
preUpload: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) {},
blocksMarked: 1,
},
{
name: "block with deletion mark already, expected log and no metric increment",
preUpload: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) {
deletionMark, err := json.Marshal(metadata.DeletionMark{
ID: id,
DeletionTime: time.Now().Unix(),
Version: metadata.DeletionMarkVersion1,
})
testutil.Ok(t, err)
testutil.Ok(t, bkt.Upload(ctx, path.Join(id.String(), metadata.DeletionMarkFilename), bytes.NewReader(deletionMark)))
},
blocksMarked: 0,
},
} {
t.Run(tcase.name, func(t *testing.T) {
bkt := objstore.NewInMemBucket()
id, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
{{Name: "a", Value: "1"}},
{{Name: "a", Value: "2"}},
{{Name: "a", Value: "3"}},
{{Name: "a", Value: "4"}},
{{Name: "b", Value: "1"}},
}, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124)
testutil.Ok(t, err)

deletionMark, err := json.Marshal(metadata.DeletionMark{
ID: blockWithDeletionMark,
DeletionTime: time.Now().Unix(),
Version: metadata.DeletionMarkVersion1,
})
testutil.Ok(t, err)
testutil.Ok(t, bkt.Upload(ctx, path.Join(blockWithDeletionMark.String(), metadata.DeletionMarkFilename), bytes.NewReader(deletionMark)))
tcase.preUpload(t, id, bkt)

err = MarkForDeletion(ctx, log.NewNopLogger(), bkt, blockWithDeletionMark)
testutil.NotOk(t, err)
testutil.Equals(t, fmt.Sprintf("file %s already exists in bucket", path.Join(blockWithDeletionMark.String(), metadata.DeletionMarkFilename)), err.Error())
testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, id.String())))

c := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
err = MarkForDeletion(ctx, log.NewNopLogger(), bkt, id, c)
testutil.Ok(t, err)
testutil.Equals(t, float64(tcase.blocksMarked), promtest.ToFloat64(c))
})
}
}
Loading