Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd,pkg: flush storage when hashring changes #1480

Merged
merged 2 commits into from
Sep 30, 2019
Merged

Conversation

squat
Copy link
Member

@squat squat commented Aug 31, 2019

This PR implements one of the last tasks of the Receive proposal
(tracked in #1093).

In order to allow the hashring of receive nodes to scale at runtime, we
need to temporarily stop the storage, flush it, and upload the blocks.
This commit ensures that whenever a hashring change is detected, the
entire storage is flushed and the shipper is notified so that is uploads
any new blocks. It also ensures that when the receive component starts
up, any in-progress WAL is flushed and uploaded. This ensures that
new data that may belong to a different tenant is not mixed with old
data. Finally, it also ensures that the storage is flushed and uploaded
whenever the process shuts down.

Changes

  • flush storage and upload blocks on startup
  • stop, flush, and restart storage whenever hashring changes
  • flush storage and upload blocks on shutdown

Verification

Manual testing of modifying hashrings at runtime and confirming
WAL is flushed to blocks.

cc @brancz @bwplotka @metalmatze

@squat squat force-pushed the reshard branch 8 times, most recently from ce1a8f5 to c2a685a Compare September 4, 2019 10:17
Copy link
Member

@brancz brancz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This work is almost entirely in the cmd/thanos/receive.go file. I would suggest that we instead introduce a new file/struct under the pkg/receive package, that encapsulates the interactions at open/close times with the uploader. This will also keep the cmd/thanos/receive.go file a bit smaller.

cmd/thanos/receive.go Outdated Show resolved Hide resolved
cmd/thanos/receive.go Show resolved Hide resolved
cmd/thanos/receive.go Show resolved Hide resolved
cmd/thanos/receive.go Outdated Show resolved Hide resolved
@squat squat force-pushed the reshard branch 3 times, most recently from fba92d4 to fb0bec7 Compare September 10, 2019 08:40
@krasi-georgiev
Copy link
Contributor

pkg/receive/tsdb.go Outdated Show resolved Hide resolved
@squat
Copy link
Member Author

squat commented Sep 10, 2019

why is the db run without a lock file?

Hm does this PR introduce a regression?

@brancz
Copy link
Member

brancz commented Sep 10, 2019

It's not a regression, it's intentional as we always run this component in environments where the orchestration system ensure there is only one process connected to a volume, hence no need for a lockfile.

Copy link
Contributor

@krasi-georgiev krasi-georgiev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not a regression, it's intentional as we always run this component in environments where the orchestration system ensure there is only one process connected to a volume, hence no need for a lockfile.

but still why does it need to be disabled? Wouldn't be safer to have it enabled regardless?

cmd/thanos/receive.go Show resolved Hide resolved
// Before actually starting, we need to make sure
// the WAL is flushed.
startTimeMargin := int64(2 * time.Duration(tsdbCfg.MinBlockDuration).Seconds() * 1000)
if err := db.Open(); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not call the Open method inside Flush? this will remove some of the boilerplate.

The same for db.Get()

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes we could do that. I was worried that it would be confusing if flush left an open DB. Does you think it would be more expected for Flush to leave a running DB?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is most logical that flush should: open the db -> flush -> close the db

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So in this case flush shouldnt be a type and there should be no handle returned. Instead you think it should simply be a func that flushes the specified dir?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aaah I missed that it needs to be kept open for the Get() method.

How about something like:

func (f *FlushableStorage) Flush() error {
	f.Open()
	defer f.Close() 
	...
}

and

// Get opens a db instance and returns a reference to it.
// The caller is responsible for closing the db.
func (f *FlushableStorage) Get() *promtsdb.DB {
	f.Open()
	return f.DB
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure I just want to be clear about the API. I think it’s confusing if Flush leaves an open DB if the original state was closed. It may be clearest to say Flush will always leave the storage in the same state it was in: open storage will stay open; closed storage will remain closed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep , that is a great idea, but can you imagine a case where the storage would be open when running a flush?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The DB is open immediately before every flush (we are actively writing samples until we get the reshard signal and need to flush), someone has to close it, either the caller, or the flush method itself

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm yeah, maybe now with the new tsdb.Flush method can just have just use it directly and not need the separate FlushableStorage

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that’s what I was driving at earlier. It will reduce the code here, though we’ll have some more boilerplate in the caller since it will have to open a new DB after flushing. This type is convenient because we can have one single reference that takes care of re-opening itself after flushing if it was already open and the only thing the caller has to do is call Open once at the very beginning with all the args and then Flush

g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
// Before starting, ensure any old blocks are uploaded.
if uploaded, err := s.Sync(context.Background()); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not use some global cancel context to avoid this blocking as well?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The whole idea is that we want this to block so nothing is ingested before old blocks are flushed and uploaded.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't you think that it will cause issues when you try to exit the receiver in the middle of an upload and it blocks here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I already had this discussion with @brancz on this PR, but I'm not sure where GitHub put that comment history :/ . The original implementation had a 10-minute timeout on the context and the suggestion was to remove that. Let's cover that question in a follow rather than go back and forth.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry I wasn't aware you discussed it already.

When I said a global context I meant to be cancel-able like the rest of the actors groups.

cmd/thanos/receive.go Outdated Show resolved Hide resolved
@squat
Copy link
Member Author

squat commented Sep 26, 2019

ping @krasi-georgiev @brancz PTAL again (:

pkg/receive/tsdb.go Outdated Show resolved Hide resolved
Copy link
Member

@brancz brancz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm but giving @krasi-georgiev another chance to review

@squat squat force-pushed the reshard branch 2 times, most recently from 9b6c22a to 0e32610 Compare September 26, 2019 16:17
Copy link
Contributor

@krasi-georgiev krasi-georgiev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, I think it is much better than before.

func HashringFromConfig(ctx context.Context, updates chan<- Hashring, cw *ConfigWatcher) {
go cw.Run(ctx)
defer close(updates)
Copy link
Contributor

@krasi-georgiev krasi-georgiev Sep 27, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't it a widely accepted that only the sender should close a channel?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This goroutine IS the sender. In the single-node case, where there is no configuration watcher we have to close it ourselves.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry for my ignorance, could you expand a bit more on that,

I see no reason why it can't be closed somewhere in runReceive ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason we don't want to close it in runReceive is because by convention chans should always be closed by their sender to avoid races that send on closed chan. In this case, the HashringFromConfig func runs a goroutine that is the only sender of the given chan, and as such it should be the one to close it. Otherwise, we need to add another layer of synchronization to always ensure that the generator goroutine has returned before we close the given chan. This adds a lot of complexity for little benefit IMO.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes 100% correct, just got confused with the channel directions. Thanks for the patience!

pkg/receive/tsdb.go Show resolved Hide resolved
pkg/receive/handler.go Outdated Show resolved Hide resolved
Signed-off-by: Lucas Servén Marín <[email protected]>
In order to allow the hashring of receive nodes to scale at runtime, we
need to temporarily stop the storage, flush it, and upload the blocks.
This commit ensures that whenever a hashring change is detected, the
entire storage is flushed and the shipper is notified so that is uploads
any new blocks. It also ensures that when the receive component starts
up, any in-progress WAL is flushed and uploaded. This ensures that
new data that may belong to a different tenant is not mixed with old
data. Finally, it also ensures that the storage is flushed and uploaded
whenever the process shuts down.

Signed-off-by: Lucas Servén Marín <[email protected]>
@krasi-georgiev
Copy link
Contributor

LGTM

@brancz brancz merged commit 91fdbd6 into thanos-io:master Sep 30, 2019
@squat squat deleted the reshard branch September 30, 2019 11:50
@dtrejod dtrejod mentioned this pull request Oct 1, 2019
GiedriusS pushed a commit that referenced this pull request Oct 28, 2019
* pkg/receive: create flushable storage

Signed-off-by: Lucas Servén Marín <[email protected]>

* cmd,pkg: flush storage when hashring changes

In order to allow the hashring of receive nodes to scale at runtime, we
need to temporarily stop the storage, flush it, and upload the blocks.
This commit ensures that whenever a hashring change is detected, the
entire storage is flushed and the shipper is notified so that is uploads
any new blocks. It also ensures that when the receive component starts
up, any in-progress WAL is flushed and uploaded. This ensures that
new data that may belong to a different tenant is not mixed with old
data. Finally, it also ensures that the storage is flushed and uploaded
whenever the process shuts down.

Signed-off-by: Lucas Servén Marín <[email protected]>
Signed-off-by: Giedrius Statkevičius <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants