
Add filequeue functionality #1601

Merged: 16 commits merged into dev.new-wal from file_queue on Sep 6, 2024
Conversation

@mattdurham (Collaborator) commented Sep 3, 2024

FileQueue offers the lowest-level functionality: it writes a block of data and metadata to a file, then pushes the data out. This also serves as the introduction to the actor framework, primarily via the DoWork function. This lets us avoid mutexes and gives us a well-defined workflow that will be reused.
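The mutex-free workflow described above can be sketched with plain channels: a single worker goroutine owns all state, and callers hand it work over a channel. This is an illustrative stand-in for the pattern, not the actual go-actor API or the PR's types (the names `request`, `fileQueue`, and `Store` here are hypothetical).

```go
package main

import "fmt"

// request is a hypothetical unit of work handed to the worker goroutine.
type request struct {
	data []byte
	done chan error
}

// fileQueue owns its state exclusively inside doWork: all mutation happens
// on one goroutine, so no mutex is needed. This mirrors the actor pattern
// the PR adopts via a DoWork-style loop.
type fileQueue struct {
	in    chan request
	files int // touched only by the worker goroutine
}

func newFileQueue() *fileQueue {
	q := &fileQueue{in: make(chan request)}
	go q.doWork()
	return q
}

func (q *fileQueue) doWork() {
	for req := range q.in {
		// The real queue would write data+metadata to a file here;
		// counting writes is enough to show the single-owner state.
		q.files++
		req.done <- nil
	}
}

func (q *fileQueue) Store(data []byte) error {
	done := make(chan error)
	q.in <- request{data: data, done: done}
	return <-done
}

func main() {
	q := newFileQueue()
	for i := 0; i < 3; i++ {
		if err := q.Store([]byte("test")); err != nil {
			panic(err)
		}
	}
	fmt.Println(q.files) // 3
}
```

Because `Store` only returns after the worker replies on `done`, the channel hand-off also gives the happens-before ordering a mutex would otherwise provide.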

@mattdurham marked this pull request as ready for review September 3, 2024 18:32
@mattdurham changed the title from "WIP: Add filequeue section" to "Add filequeue functionality" Sep 3, 2024
@thampiotr (Contributor) left a comment:

First round! It's looking good; happy it's comparatively lean 👍

@mattdurham mattdurham requested a review from thampiotr September 5, 2024 14:42
if err != nil {
return "", err
}
err = q.writeFile(name, rBuf)
@thampiotr (Contributor):

One file = one piece of data implies that we probably want a good level of batching for this queue, and it's probably worth calling out that it's going to work better with larger data.

@mattdurham (Collaborator, author):

In general, yes; in the case of a single node exporter self-scraping, it's kilobytes. In the full PR the default writes 10,000 signals, with a flush timer to ensure it's timely. When testing internally, the files were megabytes after snappy compression.

@thampiotr (Contributor):

Yeah, that will likely work if you write entire scrapes to the queue, and I'm fine with that... if you started writing single samples, there would be a lot of overhead. That's all fine as long as we make it somewhat clear that the queue is optimised only for some usage patterns.

BTW, I'd like to see some metrics on how the queue performs, but I'm happy for this to be in future PRs.

@mattdurham (Collaborator, author):

There are some e2e benchmarks in the full PR. The file queue itself is really cheap, though.

"github.com/vladopajic/go-actor/actor"
)

func BenchmarkFileQueue(t *testing.B) {
@thampiotr (Contributor):

Nit: can you add the current results of the benchmark as a comment, for future reference? I think it can be useful in certain cases.

@mattdurham (Collaborator, author):

Going to drop this benchmark; it's not super useful. There are better end-to-end benchmarks that track this.

require.NoError(t, err)
q.Start()
defer q.Stop()
err = q.Store(context.Background(), nil, []byte("test"))
@thampiotr (Contributor):

I'd imagine we want to create one q and store something t.N times.

require.Len(t, meta, 0)

// Ensure nothing new comes through.
timer := time.NewTicker(100 * time.Millisecond)
@thampiotr (Contributor):

Doesn't this make the benchmark always last 100ms+?

require.NoError(t, err)
q.Start()
defer q.Stop()
err = q.Store(context.Background(), nil, []byte("test"))
@thampiotr (Contributor):

I wonder how it works for different message lengths too.

require.NoError(t, err)

// Send is async so may need to wait a bit for it happen.
require.Eventually(t, func() bool {
@thampiotr (Contributor):

Could this be flaky, when Eventually doesn't catch the 1.committed file existing because it appears and disappears too fast?

It would be good to run this test with -count 100 or something to validate it's not flaky, if you didn't already :)

@thampiotr (Contributor):

I just realised that we won't delete the file until we call Get on the data handle... not sure if I like this side-effect, let's discuss in another thread.

Comment on lines 121 to 123
Get: func() (map[string]string, []byte, error) {
return get(q.logger, name)
},
@thampiotr (Contributor):

So a call to get will delete the file, and thus types.DataHandle.Get() can only be called once because it has a side-effect (unexpected for a method called Get, btw)

I think that's an important behaviour that needs to be documented. But also not sure if we want this... can you share some context why we do a lazy file reading via DataHandle and delete it only after it's read?

What happens if Get is never called for some reason? Seems like we'd leak a file and it would be picked up again on next run?

@mattdurham (Collaborator, author):

The assumption (that I should document) is that Get is called when the caller is ready to process the file, to limit the amount of lost data and limit the amount of memory in use. The out queue (shown in upcoming PR) has a capacity of one, so if Get is never called then data was never sent. This allows us to err on the side of dropping data versus sending duplicates.

@thampiotr (Contributor):

I think we should make it super clear in the name that you can retrieve data once only and there's a side-effect of it being deleted forever. Maybe Pop as it makes sense in the context of the queue.

> This allows us to err on the side of dropping data versus sending duplicates.

Wouldn't we prefer to send duplicates? Or is the retry / backoff handled further down, in next PRs?

I'm also not clear on the implications of Get never being called for some reason (e.g. error). Seems like a file won't be deleted, but we will continue sending DataHandles for the subsequent elements in the queue. So over time we can build up a bunch of unprocessed files left behind? At startup we would try to re-send them and likely fail due to too-old-timestamp. Maybe we should just make sure that Get is always called, even when error happens - it's not the cleanest way, but at least allows us to keep doing this lazy read from disk pattern. I'm cool if we decide that's what we want to do, but let's leave some comments behind ;)

@mattdurham (Collaborator, author):

Retry/backoff is handled in the network loop. Regarding Get not being called: the actual implementation of the callee processes files one at a time. If Get is not called, the file is retried on restart, which requeues it. In the main PR the lifecycles of the FileQueue and the Endpoint (the callee in practice) are tied together at the component level. From the filequeue's perspective, the endpoint processes files sequentially, deserializing and adding to the network buffer, and stops pulling files when the network buffer is full.

@mattdurham (Collaborator, author):

Pop is more accurate to what's going on, so I will change that and add a comment.

@mattdurham mattdurham merged commit 4670f64 into dev.new-wal Sep 6, 2024
14 of 15 checks passed
@mattdurham mattdurham deleted the file_queue branch September 6, 2024 15:04
@github-actions github-actions bot locked as resolved and limited conversation to collaborators Oct 7, 2024