fetcher: Made metaFetcher goroutine safe; Fixed multiple bucket UI + fetcher issues. #2354

Merged 1 commit on Apr 1, 2020
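Every call site of block.NewMetaFetcher in the diff below gains an extra trailing argument. A minimal sketch of the new call shape, assembled only from the call sites in this PR: the final parameter appears to be a modifiers slice (block.NewReplicaLabelRemover is the only modifier used here), but the exact signature is inferred from the diff rather than from package documentation, and the variable names are illustrative.

```go
// Sketch only: argument order inferred from the call sites changed in this PR.
fetcher, err := block.NewMetaFetcher(
	logger,
	32,  // fetch concurrency
	bkt, // objstore bucket client
	"",  // local cache dir (unused here)
	extprom.WrapRegistererWithPrefix("thanos_", reg),
	[]block.MetadataFilter{block.NewDeduplicateFilter()}, // filters; nil when unused
	nil, // apparently modifiers, e.g. []block.MetadataModifier{block.NewReplicaLabelRemover(logger, replicaLabels)}
)
if err != nil {
	return errors.Wrap(err, "create meta fetcher")
}
```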
53 changes: 6 additions & 47 deletions cmd/thanos/bucket.go
@@ -140,7 +140,7 @@ func registerBucketVerify(m map[string]setupFunc, root *kingpin.CmdClause, name
issues = append(issues, issueFn)
}

fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil)
if err != nil {
return err
}
@@ -189,7 +189,7 @@ func registerBucketLs(m map[string]setupFunc, root *kingpin.CmdClause, name stri
return err
}

fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil)
if err != nil {
return err
}
@@ -289,7 +289,7 @@ func registerBucketInspect(m map[string]setupFunc, root *kingpin.CmdClause, name
return err
}

fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil)
if err != nil {
return err
}
@@ -373,7 +373,8 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str
return errors.Wrap(err, "bucket client")
}

fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
// TODO(bwplotka): Allow Bucket UI to visualisate the state of block as well.
Review comment: nit: s/visualisate/visualize/
fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil, nil)
if err != nil {
return err
}
@@ -382,7 +383,7 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str
g.Add(func() error {
statusProber.Ready()
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
return refresh(ctx, logger, bucketUI, *interval, *timeout, fetcher)
return bucketUI.RunRefreshLoop(ctx, fetcher, *interval, *timeout)
}, func(error) {
cancel()
})
@@ -459,48 +460,6 @@ func registerBucketDownsample(m map[string]setupFunc, root *kingpin.CmdClause, n
}
}

// refresh metadata from remote storage periodically and update the UI.
func refresh(ctx context.Context, logger log.Logger, bucketUI *ui.Bucket, duration time.Duration, timeout time.Duration, fetcher *block.MetaFetcher) error {
return runutil.Repeat(duration, ctx.Done(), func() error {
return runutil.RetryWithLog(logger, time.Minute, ctx.Done(), func() error {
iterCtx, iterCancel := context.WithTimeout(ctx, timeout)
defer iterCancel()

blocks, err := download(iterCtx, logger, fetcher)
if err != nil {
bucketUI.Set("[]", err)
return err
}

data, err := json.Marshal(blocks)
if err != nil {
bucketUI.Set("[]", err)
return err
}
bucketUI.Set(string(data), nil)
return nil
})
})
}

func download(ctx context.Context, logger log.Logger, fetcher *block.MetaFetcher) ([]metadata.Meta, error) {
level.Info(logger).Log("msg", "synchronizing block metadata")

metas, _, err := fetcher.Fetch(ctx)
if err != nil {
return nil, err
}

blocks := []metadata.Meta{}

for _, meta := range metas {
blocks = append(blocks, *meta)
}

level.Info(logger).Log("msg", "downloaded blocks meta.json", "num", len(blocks))
return blocks, nil
}
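The refresh and download helpers deleted above are replaced by a single bucketUI.RunRefreshLoop call in registerBucketWeb. The method's implementation is not part of this diff; the sketch below is a hypothetical reconstruction based directly on the deleted code, showing what the UI-side loop has to do.

```go
// Hypothetical sketch: the real RunRefreshLoop lives in the ui package and is
// not shown in this diff; this just folds the deleted refresh/download logic
// into a method on ui.Bucket.
func (b *Bucket) RunRefreshLoop(ctx context.Context, fetcher *block.MetaFetcher, interval, timeout time.Duration) error {
	return runutil.Repeat(interval, ctx.Done(), func() error {
		iterCtx, iterCancel := context.WithTimeout(ctx, timeout)
		defer iterCancel()

		// Fetch block metadata from object storage.
		metas, _, err := fetcher.Fetch(iterCtx)
		if err != nil {
			b.Set("[]", err)
			return err
		}

		blocks := make([]metadata.Meta, 0, len(metas))
		for _, meta := range metas {
			blocks = append(blocks, *meta)
		}

		data, err := json.Marshal(blocks)
		if err != nil {
			b.Set("[]", err)
			return err
		}
		b.Set(string(data), nil)
		return nil
	})
}
```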

func printTable(blockMetas []*metadata.Meta, selectorLabels labels.Labels, sortBy []string) error {
header := inspectColumns

31 changes: 15 additions & 16 deletions cmd/thanos/compact.go
@@ -301,27 +301,25 @@ func runCompact(
// The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet.
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, time.Duration(deleteDelay.Seconds()/2)*time.Second)
duplicateBlocksFilter := block.NewDeduplicateFilter()
prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg)

metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", prometheusRegisterer, []block.MetadataFilter{
baseMetaFetcher, err := block.NewBaseFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg))
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}
metaFetcherFilters := []block.MetadataFilter{
block.NewLabelShardedMetaFilter(relabelConfig),
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer),
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg)),
ignoreDeletionMarkFilter,
duplicateBlocksFilter,
},
block.NewReplicaLabelRemover(logger, dedupReplicaLabels),
)
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}

compactFetcher := baseMetaFetcher.WithFilters(extprom.WrapRegistererWithPrefix("thanos_", reg), metaFetcherFilters, []block.MetadataModifier{block.NewReplicaLabelRemover(logger, dedupReplicaLabels)})
enableVerticalCompaction := false
if len(dedupReplicaLabels) > 0 {
enableVerticalCompaction = true
level.Info(logger).Log("msg", "deduplication.replica-label specified, vertical compaction is enabled", "dedupReplicaLabels", strings.Join(dedupReplicaLabels, ","))
}

sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, blockSyncConcurrency, acceptMalformedIndex, enableVerticalCompaction)
sy, err := compact.NewSyncer(logger, reg, bkt, compactFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, blockSyncConcurrency, acceptMalformedIndex, enableVerticalCompaction)
if err != nil {
return errors.Wrap(err, "create syncer")
}
@@ -383,29 +381,29 @@ func runCompact(
// for 5m downsamplings created in the first run.
level.Info(logger).Log("msg", "start first pass of downsampling")

if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, metaFetcher, downsamplingDir); err != nil {
if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, compactFetcher, downsamplingDir); err != nil {
return errors.Wrap(err, "first pass of downsampling failed")
}

level.Info(logger).Log("msg", "start second pass of downsampling")

if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, metaFetcher, downsamplingDir); err != nil {
if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, compactFetcher, downsamplingDir); err != nil {
return errors.Wrap(err, "second pass of downsampling failed")
}
level.Info(logger).Log("msg", "downsampling iterations done")
} else {
level.Warn(logger).Log("msg", "downsampling was explicitly disabled")
}

if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, metaFetcher, retentionByResolution, blocksMarkedForDeletion); err != nil {
if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, compactFetcher, retentionByResolution, blocksMarkedForDeletion); err != nil {
return errors.Wrap(err, fmt.Sprintf("retention failed"))
}

if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
return errors.Wrap(err, "error cleaning blocks")
}

compact.BestEffortCleanAbortedPartialUploads(ctx, logger, metaFetcher, bkt, partialUploadDeleteAttempts, blocksMarkedForDeletion)
compact.BestEffortCleanAbortedPartialUploads(ctx, logger, compactFetcher, bkt, partialUploadDeleteAttempts, blocksMarkedForDeletion)
return nil
}

@@ -414,7 +412,7 @@ func runCompact(

// Generate index file.
if generateMissingIndexCacheFiles {
if err := genMissingIndexCacheFiles(ctx, logger, reg, bkt, metaFetcher, indexCacheDir); err != nil {
if err := genMissingIndexCacheFiles(ctx, logger, reg, bkt, compactFetcher, indexCacheDir); err != nil {
return err
}
}
@@ -465,7 +463,8 @@ func runCompact(
srv.Handle("/", router)

g.Add(func() error {
return refresh(ctx, logger, bucketUI, waitInterval, time.Minute, metaFetcher)
// TODO(bwplotka): Allow Bucket UI to visualisate the state of the block as well.
Review comment: nit: same (visualisate → visualize).
return bucketUI.RunRefreshLoop(ctx, baseMetaFetcher.WithFilters(extprom.WrapRegistererWithPrefix("thanos_bucket_ui", reg), metaFetcherFilters, nil), waitInterval, time.Minute)
}, func(error) {
cancel()
})
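The compact.go changes above are where the goroutine-safety fix pays off: a single BaseFetcher performs the bucket listing, and each consumer derives its own fetcher via WithFilters, with its own filters, modifiers, and metric prefix. Because the compactor's syncer and the bucket UI refresh loop run in separate run-group goroutines, they call Fetch on views of the same base fetcher concurrently. A condensed sketch of the pattern, using only calls that appear in this diff (variable names are illustrative):

```go
// One shared base fetcher for the process.
base, err := block.NewBaseFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg))
if err != nil {
	return errors.Wrap(err, "create meta fetcher")
}

filters := []block.MetadataFilter{
	block.NewLabelShardedMetaFilter(relabelConfig),
	block.NewDeduplicateFilter(),
}

// Compactor view: replica labels removed so replicated blocks deduplicate.
compactFetcher := base.WithFilters(
	extprom.WrapRegistererWithPrefix("thanos_", reg),
	filters,
	[]block.MetadataModifier{block.NewReplicaLabelRemover(logger, dedupReplicaLabels)},
)

// Bucket UI view: same base listing, no modifiers, its own metric prefix.
uiFetcher := base.WithFilters(extprom.WrapRegistererWithPrefix("thanos_bucket_ui", reg), filters, nil)
```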
4 changes: 3 additions & 1 deletion cmd/thanos/downsample.go
@@ -72,7 +72,9 @@ func RunDownsample(
return err
}

metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), nil)
metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
block.NewDeduplicateFilter(),
}, nil)
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}
4 changes: 2 additions & 2 deletions cmd/thanos/main_test.go
@@ -76,7 +76,7 @@ func TestCleanupIndexCacheFolder(t *testing.T) {
Name: metricIndexGenerateName,
Help: metricIndexGenerateHelp,
})
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil)
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil, nil)
testutil.Ok(t, err)

testutil.Ok(t, genMissingIndexCacheFiles(ctx, logger, reg, bkt, metaFetcher, dir))
@@ -116,7 +116,7 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) {

metrics := newDownsampleMetrics(prometheus.NewRegistry())
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta.Thanos))))
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil)
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil, nil)
testutil.Ok(t, err)

testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metaFetcher, dir))
16 changes: 8 additions & 8 deletions cmd/thanos/store.go
@@ -241,14 +241,14 @@ func runStore(
}

ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, ignoreDeletionMarksDelay)
prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg)
metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, prometheusRegisterer, []block.MetadataFilter{
block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime),
block.NewLabelShardedMetaFilter(relabelConfig),
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer),
ignoreDeletionMarkFilter,
block.NewDeduplicateFilter(),
})
metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg),
[]block.MetadataFilter{
block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime),
block.NewLabelShardedMetaFilter(relabelConfig),
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg)),
ignoreDeletionMarkFilter,
block.NewDeduplicateFilter(),
}, nil)
if err != nil {
return errors.Wrap(err, "meta fetcher")
}
1 change: 1 addition & 0 deletions go.mod
@@ -19,6 +19,7 @@ require (
github.com/go-kit/kit v0.9.0
github.com/go-openapi/strfmt v0.19.2
github.com/gogo/protobuf v1.3.1
github.com/golang/groupcache v0.0.0-20191027212112-611e8accdfc9
github.com/golang/snappy v0.0.1
github.com/googleapis/gax-go v2.0.2+incompatible
github.com/gophercloud/gophercloud v0.6.0
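The only dependency change is the addition of golang/groupcache. The fetcher internals are not shown in this diff, but groupcache ships a singleflight package whose pattern fits the "goroutine safe" goal in the PR title: concurrent callers asking for the same key share one execution of the underlying work. The standalone example below illustrates that pattern; it is an assumption about why the dependency was added, not code from this PR.

```go
package main

import (
	"fmt"
	"sync"

	"github.com/golang/groupcache/singleflight"
)

func main() {
	var g singleflight.Group
	var wg sync.WaitGroup

	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// All four goroutines ask for the same key; singleflight runs the
			// expensive function once per in-flight key and shares the result.
			v, err := g.Do("bucket-meta-sync", func() (interface{}, error) {
				return "synced", nil
			})
			fmt.Println(v, err)
		}()
	}
	wg.Wait()
}
```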