-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added shipper capability to sync compacted blocks once. #731
Conversation
767d905
to
75a13c3
Compare
f0b17e4
to
44fadad
Compare
75a13c3
to
0e0b864
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice PR! LGTM! just some minor nits for me!
0e0b864
to
bbd0b52
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great, ok from me just few minor suggestions or nits.
cmd/thanos/bucket.go
Outdated
if *dataDir != "" { | ||
level.Warn(logger).Log("msg", "is Prometheus operating on specified data dir? If yes, turn it's local compaction or leave dataDir empty to safely snapshot data to avoid races.") | ||
} | ||
level.Warn(logger).Log("msg", "have you turned off compactor? Currently it is unsafe to run compactor in the same time as this command! (y/N)") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe adding flag --force-yes
would be good to allow automating?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
on other hand, you can do the same with just yes
command on unix, so let's skip this for now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's true, you are right we can skip it.
bbd0b52
to
a8391db
Compare
8c9de1f
to
d12b918
Compare
a8391db
to
8abd8d1
Compare
d12b918
to
abcc887
Compare
737cf23
to
e15f983
Compare
ad0c1fa
to
bbdbe00
Compare
Moved from command to proper migration handling in sidecar. Tested that during FOSDEM demo. (: |
43fa7f3
to
17c496b
Compare
17c496b
to
ca773c7
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice :) just few questions
@@ -51,13 +57,18 @@ func newMetrics(r prometheus.Registerer) *metrics { | |||
Name: "thanos_shipper_upload_failures_total", | |||
Help: "Total number of failed object uploads", | |||
}) | |||
m.uploadedCompacted = prometheus.NewGauge(prometheus.GaugeOpts{ | |||
Name: "thanos_shipper_upload_compacted_done", | |||
Help: "If 1 it means shipper uploaded all compacted blocks from the filesystem.", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just out of curiosity what is the use-case for this metric?
It strikes me that shipping of compacted blocks is one time issue during migration of pure Prometheus setup.
In that case you should see it in the logs because you know it happens in this time and after that you will disable the compacted syncing I suppose?
Maybe I'm just missing the use-case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, at a minimum, this metric should only be registered if uploadCompacted
is true.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea, but wanted just more flexible way to tell if that one operation is done or not....will register it only on that flag good point 👍
pkg/testutil/prometheus.go
Outdated
return errors.New("method Start was not invoked.") | ||
} | ||
return runutil.Retry(time.Second, ctx.Done(), func() error { | ||
r, err := http.Get(fmt.Sprintf("http://%s/metrics", p.addr)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be better to check the /-/ready
endpoint? IIRC /metrics
is ready sooner than the instance is ready to answer queries for example.
pkg/testutil/prometheus.go
Outdated
return errors.Wrap(err, "failed to kill Prometheus. Kill it manually") | ||
} | ||
|
||
time.Sleep(time.Second / 2) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could we use p.wait()
? https://golang.org/pkg/os/#Process.Wait
in case of lazy stop running two instances on the same data could cause issues.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Just a small question.
pkg/shipper/shipper.go
Outdated
@@ -295,6 +457,7 @@ func hardlinkBlock(src, dst string) error { | |||
type Meta struct { | |||
Version int `json:"version"` | |||
Uploaded []ulid.ULID `json:"uploaded"` | |||
Ignored []ulid.ULID `json:"ignored"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where is this filed used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not, good point!
pkg/block/metadata/meta.go
Outdated
@@ -27,6 +27,7 @@ const ( | |||
CompactorRepairSource SourceType = "compactor.repair" | |||
RulerSource SourceType = "ruler" | |||
BucketRepairSource SourceType = "bucket.repair" | |||
BucketSyncSource SourceType = "bucket.sync" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe it should be removed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes!
cmd/thanos/sidecar.go
Outdated
@@ -51,6 +51,8 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application, name stri | |||
|
|||
objStoreConfig := regCommonObjStoreFlags(cmd, "", false) | |||
|
|||
uploadCompacted := cmd.Flag("shipper.upload-compacted", "[Experimental] If true sidecar will try to upload compacted blocks as well. Useful for migration purposes. Works only if compaction is disabled on Prometheus.").Hidden().Bool() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably be explicit and Default("false")
here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree
ca773c7
to
bc57e25
Compare
bc57e25
to
81f9257
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM - minor naming nits :)
pkg/shipper/shipper.go
Outdated
// NewMigrating creates a new shipper that detects new TSDB blocks in dir and uploads them | ||
// to remote if necessary, including compacted blocks which are already in filesystem. | ||
// It attaches the Thanos metadata section in each meta JSON file. | ||
func NewMigrating( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit - would update to NewWithCompacted
its a shipper that also ships compacted blocks.
// If updload | ||
// | ||
// It is not concurrency-safe, however it is compactor-safe (running concurrently with compactor is ok) | ||
func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit - uploaded
is used a few times in this func ... might be clearer to be something like uploadedCount
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mhm, I would stay with upload int
- you can see integer so you can assume this?
pkg/shipper/shipper.go
Outdated
) | ||
// Sync non compacted blocks first. | ||
if err := s.iterBlockMetas(func(m *metadata.Meta) error { | ||
// Do not sync a block if we already uploaded or ignored it. If it is no longer found in the bucket, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit - remove it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
which it
? (:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess later - mgm not sure
Changes: * Use shipper metrics properly. * Do not add compacted blocks to `uploaded` arr in thanos shipper meta if they are not uploaded. * Added flag for syncing compacted blocks once. * Enable working on different work dir. * Enable using snapshot API to avoid race conditions. * Ignore empty blocks. Signed-off-by: Bartek Plotka <[email protected]>
81f9257
to
99ca07f
Compare
Added shipper capability to sync compacted blocks once.
Changes:
uploaded
arr in thanos shipper meta if they are not uploaded.Signed-off-by: Bartek Plotka [email protected]