-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
cmd,pkg: flush storage when hashring changes #1480
Conversation
ce1a8f5
to
c2a685a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This work is almost entirely in the cmd/thanos/receive.go
file. I would suggest that we instead introduce a new file/struct under the pkg/receive
package, that encapsulates the interactions at open/close times with the uploader. This will also keep the cmd/thanos/receive.go
file a bit smaller.
fba92d4
to
fb0bec7
Compare
why is the db run without a lock file? |
Hm does this PR introduce a regression? |
It's not a regression, it's intentional as we always run this component in environments where the orchestration system ensure there is only one process connected to a volume, hence no need for a lockfile. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not a regression, it's intentional as we always run this component in environments where the orchestration system ensure there is only one process connected to a volume, hence no need for a lockfile.
but still why does it need to be disabled? Wouldn't be safer to have it enabled regardless?
// Before actually starting, we need to make sure | ||
// the WAL is flushed. | ||
startTimeMargin := int64(2 * time.Duration(tsdbCfg.MinBlockDuration).Seconds() * 1000) | ||
if err := db.Open(); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not call the Open method inside Flush? this will remove some of the boilerplate.
The same for db.Get()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes we could do that. I was worried that it would be confusing if flush left an open DB. Does you think it would be more expected for Flush to leave a running DB?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is most logical that flush should: open the db -> flush -> close the db
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So in this case flush shouldnt be a type and there should be no handle returned. Instead you think it should simply be a func that flushes the specified dir?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aaah I missed that it needs to be kept open for the Get()
method.
How about something like:
func (f *FlushableStorage) Flush() error {
f.Open()
defer f.Close()
...
}
and
// Get opens a db instance and returns a reference to it.
// The caller is responsible for closing the db.
func (f *FlushableStorage) Get() *promtsdb.DB {
f.Open()
return f.DB
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure I just want to be clear about the API. I think it’s confusing if Flush leaves an open DB if the original state was closed. It may be clearest to say Flush will always leave the storage in the same state it was in: open storage will stay open; closed storage will remain closed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep , that is a great idea, but can you imagine a case where the storage would be open when running a flush?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The DB is open immediately before every flush (we are actively writing samples until we get the reshard signal and need to flush), someone has to close it, either the caller, or the flush method itself
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hm yeah, maybe now with the new tsdb.Flush method can just have just use it directly and not need the separate FlushableStorage
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes that’s what I was driving at earlier. It will reduce the code here, though we’ll have some more boilerplate in the caller since it will have to open a new DB after flushing. This type is convenient because we can have one single reference that takes care of re-opening itself after flushing if it was already open and the only thing the caller has to do is call Open
once at the very beginning with all the args and then Flush
g.Add(func() error { | ||
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client") | ||
// Before starting, ensure any old blocks are uploaded. | ||
if uploaded, err := s.Sync(context.Background()); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not use some global cancel context to avoid this blocking as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The whole idea is that we want this to block so nothing is ingested before old blocks are flushed and uploaded.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't you think that it will cause issues when you try to exit the receiver in the middle of an upload and it blocks here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I already had this discussion with @brancz on this PR, but I'm not sure where GitHub put that comment history :/ . The original implementation had a 10-minute timeout on the context and the suggestion was to remove that. Let's cover that question in a follow rather than go back and forth.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry I wasn't aware you discussed it already.
When I said a global context I meant to be cancel-able like the rest of the actors groups.
3836574
to
4647afc
Compare
ping @krasi-georgiev @brancz PTAL again (: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm but giving @krasi-georgiev another chance to review
9b6c22a
to
0e32610
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep, I think it is much better than before.
func HashringFromConfig(ctx context.Context, updates chan<- Hashring, cw *ConfigWatcher) { | ||
go cw.Run(ctx) | ||
defer close(updates) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isn't it a widely accepted that only the sender should close a channel?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This goroutine IS the sender. In the single-node case, where there is no configuration watcher we have to close it ourselves.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry for my ignorance, could you expand a bit more on that,
I see no reason why it can't be closed somewhere in runReceive
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason we don't want to close it in runReceive
is because by convention chans should always be closed by their sender to avoid races that send on closed chan
. In this case, the HashringFromConfig
func runs a goroutine that is the only sender of the given chan, and as such it should be the one to close it. Otherwise, we need to add another layer of synchronization to always ensure that the generator goroutine has returned before we close the given chan. This adds a lot of complexity for little benefit IMO.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ping @krasi-georgiev
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes 100% correct, just got confused with the channel directions. Thanks for the patience!
Signed-off-by: Lucas Servén Marín <[email protected]>
In order to allow the hashring of receive nodes to scale at runtime, we need to temporarily stop the storage, flush it, and upload the blocks. This commit ensures that whenever a hashring change is detected, the entire storage is flushed and the shipper is notified so that is uploads any new blocks. It also ensures that when the receive component starts up, any in-progress WAL is flushed and uploaded. This ensures that new data that may belong to a different tenant is not mixed with old data. Finally, it also ensures that the storage is flushed and uploaded whenever the process shuts down. Signed-off-by: Lucas Servén Marín <[email protected]>
LGTM |
* pkg/receive: create flushable storage Signed-off-by: Lucas Servén Marín <[email protected]> * cmd,pkg: flush storage when hashring changes In order to allow the hashring of receive nodes to scale at runtime, we need to temporarily stop the storage, flush it, and upload the blocks. This commit ensures that whenever a hashring change is detected, the entire storage is flushed and the shipper is notified so that is uploads any new blocks. It also ensures that when the receive component starts up, any in-progress WAL is flushed and uploaded. This ensures that new data that may belong to a different tenant is not mixed with old data. Finally, it also ensures that the storage is flushed and uploaded whenever the process shuts down. Signed-off-by: Lucas Servén Marín <[email protected]> Signed-off-by: Giedrius Statkevičius <[email protected]>
This PR implements one of the last tasks of the Receive proposal
(tracked in #1093).
In order to allow the hashring of receive nodes to scale at runtime, we
need to temporarily stop the storage, flush it, and upload the blocks.
This commit ensures that whenever a hashring change is detected, the
entire storage is flushed and the shipper is notified so that is uploads
any new blocks. It also ensures that when the receive component starts
up, any in-progress WAL is flushed and uploaded. This ensures that
new data that may belong to a different tenant is not mixed with old
data. Finally, it also ensures that the storage is flushed and uploaded
whenever the process shuts down.
Changes
Verification
Manual testing of modifying hashrings at runtime and confirming
WAL is flushed to blocks.
cc @brancz @bwplotka @metalmatze