Skip to content

Commit

Permalink
Shipper: change upload compacted type from bool to a function (#6526)
Browse files Browse the repository at this point in the history
* change shipper upload compacted type from bool to a function

Signed-off-by: Ben Ye <[email protected]>

* add default to false

Signed-off-by: Ben Ye <[email protected]>

* reset uploadedCompacted to 0

Signed-off-by: Ben Ye <[email protected]>

---------

Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 authored Jul 19, 2023
1 parent 6fda58a commit a10de73
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 24 deletions.
2 changes: 1 addition & 1 deletion cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ func runRule(
}
}()

s := shipper.New(logger, reg, conf.dataDir, bkt, func() labels.Labels { return conf.lset }, metadata.RulerSource, false, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc))
s := shipper.New(logger, reg, conf.dataDir, bkt, func() labels.Labels { return conf.lset }, metadata.RulerSource, nil, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc))

ctx, cancel := context.WithCancel(context.Background())

Expand Down
3 changes: 2 additions & 1 deletion cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,8 +345,9 @@ func runSidecar(
return errors.Wrapf(err, "aborting as no external labels found after waiting %s", promReadyTimeout)
}

uploadCompactedFunc := func() bool { return conf.shipper.uploadCompacted }
s := shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource,
conf.shipper.uploadCompacted, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc))
uploadCompactedFunc, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc))

return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
if uploaded, err := s.Sync(ctx); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
t.bucket,
func() labels.Labels { return lset },
metadata.ReceiveSource,
false,
nil,
t.allowOutOfOrderUpload,
t.hashFunc,
)
Expand Down
31 changes: 17 additions & 14 deletions pkg/shipper/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type metrics struct {
uploadedCompacted prometheus.Gauge
}

func newMetrics(reg prometheus.Registerer, uploadCompacted bool) *metrics {
func newMetrics(reg prometheus.Registerer) *metrics {
var m metrics

m.dirSyncs = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Expand All @@ -59,15 +59,10 @@ func newMetrics(reg prometheus.Registerer, uploadCompacted bool) *metrics {
Name: "thanos_shipper_upload_failures_total",
Help: "Total number of block upload failures",
})
uploadCompactedGaugeOpts := prometheus.GaugeOpts{
m.uploadedCompacted = promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_shipper_upload_compacted_done",
Help: "If 1 it means shipper uploaded all compacted blocks from the filesystem.",
}
if uploadCompacted {
m.uploadedCompacted = promauto.With(reg).NewGauge(uploadCompactedGaugeOpts)
} else {
m.uploadedCompacted = promauto.With(nil).NewGauge(uploadCompactedGaugeOpts)
}
})
return &m
}

Expand All @@ -80,7 +75,7 @@ type Shipper struct {
bucket objstore.Bucket
source metadata.SourceType

uploadCompacted bool
uploadCompactedFunc func() bool
allowOutOfOrderUploads bool
hashFunc metadata.HashFunc

Expand All @@ -98,7 +93,7 @@ func New(
bucket objstore.Bucket,
lbls func() labels.Labels,
source metadata.SourceType,
uploadCompacted bool,
uploadCompactedFunc func() bool,
allowOutOfOrderUploads bool,
hashFunc metadata.HashFunc,
) *Shipper {
Expand All @@ -109,15 +104,20 @@ func New(
lbls = func() labels.Labels { return nil }
}

if uploadCompactedFunc == nil {
uploadCompactedFunc = func() bool {
return false
}
}
return &Shipper{
logger: logger,
dir: dir,
bucket: bucket,
labels: lbls,
metrics: newMetrics(r, uploadCompacted),
metrics: newMetrics(r),
source: source,
allowOutOfOrderUploads: allowOutOfOrderUploads,
uploadCompacted: uploadCompacted,
uploadCompactedFunc: uploadCompactedFunc,
hashFunc: hashFunc,
}
}
Expand Down Expand Up @@ -272,6 +272,7 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) {
uploadErrs int
)

uploadCompacted := s.uploadCompactedFunc()
metas, err := s.blockMetasFromOldest()
if err != nil {
return 0, err
Expand All @@ -292,7 +293,7 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) {

// We only ship of the first compacted block level as normal flow.
if m.Compaction.Level > 1 {
if !s.uploadCompacted {
if !uploadCompacted {
continue
}
}
Expand Down Expand Up @@ -339,8 +340,10 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) {
return uploaded, errors.Errorf("failed to sync %v blocks", uploadErrs)
}

if s.uploadCompacted {
if uploadCompacted {
s.metrics.uploadedCompacted.Set(1)
} else {
s.metrics.uploadedCompacted.Set(0)
}
return uploaded, nil
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/shipper/shipper_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestShipper_SyncBlocks_e2e(t *testing.T) {
dir := t.TempDir()

extLset := labels.FromStrings("prometheus", "prom-1")
shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, metricsBucket, func() labels.Labels { return extLset }, metadata.TestSource, false, false, metadata.NoneFunc)
shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, metricsBucket, func() labels.Labels { return extLset }, metadata.TestSource, nil, false, metadata.NoneFunc)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -219,7 +219,8 @@ func TestShipper_SyncBlocksWithMigrating_e2e(t *testing.T) {
defer upcancel2()
testutil.Ok(t, p.WaitPrometheusUp(upctx2, logger))

shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, true, false, metadata.NoneFunc)
uploadCompactedFunc := func() bool { return true }
shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, uploadCompactedFunc, false, metadata.NoneFunc)

// Create 10 new blocks. 9 of them (non compacted) should be actually uploaded.
var (
Expand Down Expand Up @@ -374,8 +375,9 @@ func TestShipper_SyncOverlapBlocks_e2e(t *testing.T) {
defer upcancel2()
testutil.Ok(t, p.WaitPrometheusUp(upctx2, logger))

uploadCompactedFunc := func() bool { return true }
// Here, the allowOutOfOrderUploads flag is set to true, which allows blocks with overlaps to be uploaded.
shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, true, true, metadata.NoneFunc)
shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, uploadCompactedFunc, true, metadata.NoneFunc)

// Creating 2 overlapping blocks - both uploaded when OOO uploads allowed.
var (
Expand Down
8 changes: 4 additions & 4 deletions pkg/shipper/shipper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
func TestShipperTimestamps(t *testing.T) {
dir := t.TempDir()

s := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false, metadata.NoneFunc)
s := New(nil, nil, dir, nil, nil, metadata.TestSource, nil, false, metadata.NoneFunc)

// Missing thanos meta file.
_, _, err := s.Timestamps()
Expand Down Expand Up @@ -122,7 +122,7 @@ func TestIterBlockMetas(t *testing.T) {
},
}.WriteToDir(log.NewNopLogger(), path.Join(dir, id3.String())))

shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false, metadata.NoneFunc)
shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, nil, false, metadata.NoneFunc)
metas, err := shipper.blockMetasFromOldest()
testutil.Ok(t, err)
testutil.Equals(t, sort.SliceIsSorted(metas, func(i, j int) bool {
Expand Down Expand Up @@ -153,7 +153,7 @@ func BenchmarkIterBlockMetas(b *testing.B) {
})
b.ResetTimer()

shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false, metadata.NoneFunc)
shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, nil, false, metadata.NoneFunc)

_, err := shipper.blockMetasFromOldest()
testutil.Ok(b, err)
Expand All @@ -165,7 +165,7 @@ func TestShipperAddsSegmentFiles(t *testing.T) {
inmemory := objstore.NewInMemBucket()

lbls := []labels.Label{{Name: "test", Value: "test"}}
s := New(nil, nil, dir, inmemory, func() labels.Labels { return lbls }, metadata.TestSource, false, false, metadata.NoneFunc)
s := New(nil, nil, dir, inmemory, func() labels.Labels { return lbls }, metadata.TestSource, nil, false, metadata.NoneFunc)

id := ulid.MustNew(1, nil)
blockDir := path.Join(dir, id.String())
Expand Down

0 comments on commit a10de73

Please sign in to comment.