Skip to content

Commit

Permalink
fetcher: Made metaFetcher go routine safe; Fixed multiple bucket UI +…
Browse files Browse the repository at this point in the history
… fetcher issues. (#2354)

Fixed #2349
Fixed races (we were reusing fetcher by both bucket UI and compaction syncs...
Fixed logging
Added singleflight to ensure we don't synchronize too often.

Signed-off-by: Bartlomiej Plotka <[email protected]>
  • Loading branch information
bwplotka authored Apr 1, 2020
1 parent b5298ba commit 0aa9954
Show file tree
Hide file tree
Showing 20 changed files with 297 additions and 223 deletions.
53 changes: 6 additions & 47 deletions cmd/thanos/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func registerBucketVerify(m map[string]setupFunc, root *kingpin.CmdClause, name
issues = append(issues, issueFn)
}

fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -189,7 +189,7 @@ func registerBucketLs(m map[string]setupFunc, root *kingpin.CmdClause, name stri
return err
}

fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -289,7 +289,7 @@ func registerBucketInspect(m map[string]setupFunc, root *kingpin.CmdClause, name
return err
}

fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -373,7 +373,8 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str
return errors.Wrap(err, "bucket client")
}

fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
// TODO(bwplotka): Allow Bucket UI to visualisate the state of block as well.
fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil)
if err != nil {
return err
}
Expand All @@ -382,7 +383,7 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str
g.Add(func() error {
statusProber.Ready()
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
return refresh(ctx, logger, bucketUI, *interval, *timeout, fetcher)
return bucketUI.RunRefreshLoop(ctx, fetcher, *interval, *timeout)
}, func(error) {
cancel()
})
Expand Down Expand Up @@ -459,48 +460,6 @@ func registerBucketDownsample(m map[string]setupFunc, root *kingpin.CmdClause, n
}
}

// refresh metadata from remote storage periodically and update the UI.
func refresh(ctx context.Context, logger log.Logger, bucketUI *ui.Bucket, duration time.Duration, timeout time.Duration, fetcher *block.MetaFetcher) error {
return runutil.Repeat(duration, ctx.Done(), func() error {
return runutil.RetryWithLog(logger, time.Minute, ctx.Done(), func() error {
iterCtx, iterCancel := context.WithTimeout(ctx, timeout)
defer iterCancel()

blocks, err := download(iterCtx, logger, fetcher)
if err != nil {
bucketUI.Set("[]", err)
return err
}

data, err := json.Marshal(blocks)
if err != nil {
bucketUI.Set("[]", err)
return err
}
bucketUI.Set(string(data), nil)
return nil
})
})
}

func download(ctx context.Context, logger log.Logger, fetcher *block.MetaFetcher) ([]metadata.Meta, error) {
level.Info(logger).Log("msg", "synchronizing block metadata")

metas, _, err := fetcher.Fetch(ctx)
if err != nil {
return nil, err
}

blocks := []metadata.Meta{}

for _, meta := range metas {
blocks = append(blocks, *meta)
}

level.Info(logger).Log("msg", "downloaded blocks meta.json", "num", len(blocks))
return blocks, nil
}

func printTable(blockMetas []*metadata.Meta, selectorLabels labels.Labels, sortBy []string) error {
header := inspectColumns

Expand Down
31 changes: 15 additions & 16 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,27 +301,25 @@ func runCompact(
// The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet.
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, time.Duration(deleteDelay.Seconds()/2)*time.Second)
duplicateBlocksFilter := block.NewDeduplicateFilter()
prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg)

metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", prometheusRegisterer, []block.MetadataFilter{
baseMetaFetcher, err := block.NewBaseFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg))
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}
metaFetcherFilters := []block.MetadataFilter{
block.NewLabelShardedMetaFilter(relabelConfig),
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer),
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg)),
ignoreDeletionMarkFilter,
duplicateBlocksFilter,
},
block.NewReplicaLabelRemover(logger, dedupReplicaLabels),
)
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}

compactFetcher := baseMetaFetcher.WithFilters(extprom.WrapRegistererWithPrefix("thanos_", reg), metaFetcherFilters, []block.MetadataModifier{block.NewReplicaLabelRemover(logger, dedupReplicaLabels)})
enableVerticalCompaction := false
if len(dedupReplicaLabels) > 0 {
enableVerticalCompaction = true
level.Info(logger).Log("msg", "deduplication.replica-label specified, vertical compaction is enabled", "dedupReplicaLabels", strings.Join(dedupReplicaLabels, ","))
}

sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, blockSyncConcurrency, acceptMalformedIndex, enableVerticalCompaction)
sy, err := compact.NewSyncer(logger, reg, bkt, compactFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, blockSyncConcurrency, acceptMalformedIndex, enableVerticalCompaction)
if err != nil {
return errors.Wrap(err, "create syncer")
}
Expand Down Expand Up @@ -383,29 +381,29 @@ func runCompact(
// for 5m downsamplings created in the first run.
level.Info(logger).Log("msg", "start first pass of downsampling")

if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, metaFetcher, downsamplingDir); err != nil {
if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, compactFetcher, downsamplingDir); err != nil {
return errors.Wrap(err, "first pass of downsampling failed")
}

level.Info(logger).Log("msg", "start second pass of downsampling")

if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, metaFetcher, downsamplingDir); err != nil {
if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, compactFetcher, downsamplingDir); err != nil {
return errors.Wrap(err, "second pass of downsampling failed")
}
level.Info(logger).Log("msg", "downsampling iterations done")
} else {
level.Warn(logger).Log("msg", "downsampling was explicitly disabled")
}

if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, metaFetcher, retentionByResolution, blocksMarkedForDeletion); err != nil {
if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, compactFetcher, retentionByResolution, blocksMarkedForDeletion); err != nil {
return errors.Wrap(err, fmt.Sprintf("retention failed"))
}

if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
return errors.Wrap(err, "error cleaning blocks")
}

compact.BestEffortCleanAbortedPartialUploads(ctx, logger, metaFetcher, bkt, partialUploadDeleteAttempts, blocksMarkedForDeletion)
compact.BestEffortCleanAbortedPartialUploads(ctx, logger, compactFetcher, bkt, partialUploadDeleteAttempts, blocksMarkedForDeletion)
return nil
}

Expand All @@ -414,7 +412,7 @@ func runCompact(

// Generate index file.
if generateMissingIndexCacheFiles {
if err := genMissingIndexCacheFiles(ctx, logger, reg, bkt, metaFetcher, indexCacheDir); err != nil {
if err := genMissingIndexCacheFiles(ctx, logger, reg, bkt, compactFetcher, indexCacheDir); err != nil {
return err
}
}
Expand Down Expand Up @@ -465,7 +463,8 @@ func runCompact(
srv.Handle("/", router)

g.Add(func() error {
return refresh(ctx, logger, bucketUI, waitInterval, time.Minute, metaFetcher)
// TODO(bwplotka): Allow Bucket UI to visualisate the state of the block as well.
return bucketUI.RunRefreshLoop(ctx, baseMetaFetcher.WithFilters(extprom.WrapRegistererWithPrefix("thanos_bucket_ui", reg), metaFetcherFilters, nil), waitInterval, time.Minute)
}, func(error) {
cancel()
})
Expand Down
4 changes: 3 additions & 1 deletion cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ func RunDownsample(
return err
}

metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), nil)
metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
block.NewDeduplicateFilter(),
}, nil)
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/thanos/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestCleanupIndexCacheFolder(t *testing.T) {
Name: metricIndexGenerateName,
Help: metricIndexGenerateHelp,
})
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil)
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil, nil)
testutil.Ok(t, err)

testutil.Ok(t, genMissingIndexCacheFiles(ctx, logger, reg, bkt, metaFetcher, dir))
Expand Down Expand Up @@ -116,7 +116,7 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) {

metrics := newDownsampleMetrics(prometheus.NewRegistry())
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta.Thanos))))
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil)
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil, nil)
testutil.Ok(t, err)

testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metaFetcher, dir))
Expand Down
16 changes: 8 additions & 8 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,14 +241,14 @@ func runStore(
}

ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, ignoreDeletionMarksDelay)
prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg)
metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, prometheusRegisterer, []block.MetadataFilter{
block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime),
block.NewLabelShardedMetaFilter(relabelConfig),
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer),
ignoreDeletionMarkFilter,
block.NewDeduplicateFilter(),
})
metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg),
[]block.MetadataFilter{
block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime),
block.NewLabelShardedMetaFilter(relabelConfig),
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg)),
ignoreDeletionMarkFilter,
block.NewDeduplicateFilter(),
}, nil)
if err != nil {
return errors.Wrap(err, "meta fetcher")
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
github.com/go-kit/kit v0.9.0
github.com/go-openapi/strfmt v0.19.2
github.com/gogo/protobuf v1.3.1
github.com/golang/groupcache v0.0.0-20191027212112-611e8accdfc9
github.com/golang/snappy v0.0.1
github.com/googleapis/gax-go v2.0.2+incompatible
github.com/gophercloud/gophercloud v0.6.0
Expand Down
Loading

0 comments on commit 0aa9954

Please sign in to comment.