Add filequeue functionality #1601
Conversation
First round! It's looking good, happy it's comparably lean 👍
if err != nil {
	return "", err
}
err = q.writeFile(name, rBuf)
One file = one piece of data implies that we probably want a good level of batching for this queue, and it's probably worth calling out that it's going to work better with larger data.
In general yes; in the case of a single node exporter self-scrape it's kilobytes. In the full PR the default is to write 10,000 signals per file, with a flush timer to ensure it's timely. When testing internally, the files were megabytes after snappy compression.
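For illustration, here is a hypothetical batching layer in front of the queue along the lines described above: accumulate serialized signals, flush at a size threshold (a flush timer would call flush on the same path), snappy-compress, and hand one batch to Store. Only the Store(ctx, meta, data) shape follows this PR; the signal encoding, names, and thresholds are made up.

```go
// Hypothetical batching in front of the file queue: accumulate serialized
// signals and flush once maxBatch is reached (a flush timer would call flush
// on the same schedule). Only the Store signature follows this PR; everything
// else is illustrative.
package batching

import (
	"context"

	"github.com/golang/snappy"
)

type fileQueue interface {
	Store(ctx context.Context, meta map[string]string, data []byte) error
}

type batcher struct {
	q        fileQueue
	pending  [][]byte // serialized signals not yet written to disk
	maxBatch int      // e.g. 10,000 signals per file
}

func (b *batcher) add(ctx context.Context, signal []byte) error {
	b.pending = append(b.pending, signal)
	if len(b.pending) >= b.maxBatch {
		return b.flush(ctx)
	}
	return nil
}

func (b *batcher) flush(ctx context.Context) error {
	if len(b.pending) == 0 {
		return nil
	}
	var joined []byte
	for _, s := range b.pending {
		joined = append(joined, s...)
	}
	b.pending = b.pending[:0]
	// One file = one batch; snappy keeps multi-megabyte batches small on disk.
	return b.q.Store(ctx, nil, snappy.Encode(nil, joined))
}
```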
Yeah, that likely will work if you write entire scrapes to the queue and I'm fine with that... if you started to write single samples then it's gonna be a lot of overhead. That's all fine as long as we make it somewhat clear that the queue is optimised only for some use patterns.
BTW, I'd like to see some metrics on how the queue performs, but I'm happy for this to be in future PRs.
There are some e2e benchmarks in the full PR. The file queue is really cheap though.
"github.com/vladopajic/go-actor/actor" | ||
) | ||
|
||
func BenchmarkFileQueue(t *testing.B) { |
Nit: Can you add as a comment the current results of the benchmark for some future reference? I think it can be useful in certain cases.
Going to drop this benchmark, it's not super useful. There are end-to-end benchmarks that track this better.
	require.NoError(t, err)
	q.Start()
	defer q.Stop()
	err = q.Store(context.Background(), nil, []byte("test"))
I'd imagine we want to create one q and store something t.N times.
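A sketch of what that reshaping could look like; newTestQueue and the testQueue interface are stand-ins for this package's real constructor and queue type, not code from the PR.

```go
// Hypothetical reshaping of BenchmarkFileQueue: one queue, Store called b.N
// times inside the timed loop.
package filequeue

import (
	"context"
	"testing"

	"github.com/stretchr/testify/require"
)

// testQueue captures only the methods the benchmark needs.
type testQueue interface {
	Start()
	Stop()
	Store(ctx context.Context, meta map[string]string, data []byte) error
}

// Assumed helper wiring up the real queue under a temp directory.
var newTestQueue func(dir string) (testQueue, error)

func BenchmarkFileQueue(b *testing.B) {
	q, err := newTestQueue(b.TempDir())
	require.NoError(b, err)
	q.Start()
	defer q.Stop()

	payload := []byte("test")
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		require.NoError(b, q.Store(context.Background(), nil, payload))
	}
}
```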
	require.Len(t, meta, 0)

	// Ensure nothing new comes through.
	timer := time.NewTicker(100 * time.Millisecond)
Doesn't this make the benchmark always last 100ms+?
	require.NoError(t, err)
	q.Start()
	defer q.Stop()
	err = q.Store(context.Background(), nil, []byte("test"))
I wonder how it works for different message lengths too.
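A hedged sketch of how message length could be folded into the same benchmark via sub-benchmarks; it reuses the hypothetical newTestQueue from the sketch above and additionally needs fmt imported.

```go
// Hypothetical sub-benchmarks over a few payload sizes, to see how Store
// behaves for different message lengths.
func BenchmarkFileQueueSizes(b *testing.B) {
	for _, size := range []int{1 << 10, 64 << 10, 1 << 20} { // 1 KiB, 64 KiB, 1 MiB
		size := size
		b.Run(fmt.Sprintf("%dB", size), func(b *testing.B) {
			q, err := newTestQueue(b.TempDir())
			require.NoError(b, err)
			q.Start()
			defer q.Stop()

			payload := make([]byte, size)
			b.SetBytes(int64(size))
			b.ResetTimer()
			for i := 0; i < b.N; i++ {
				require.NoError(b, q.Store(context.Background(), nil, payload))
			}
		})
	}
}
```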
	require.NoError(t, err)

	// Send is async so may need to wait a bit for it to happen.
	require.Eventually(t, func() bool {
Could this be flaky, when Eventually doesn't catch the 1.committed file existing because it appears and disappears too fast? Would be good to run this test with -count 100 or something to validate it's not flaky, if you didn't already :)
I just realised that we won't delete the file until we call Get on the data handle... not sure if I like this side-effect, let's discuss in another thread.
	Get: func() (map[string]string, []byte, error) {
		return get(q.logger, name)
	},
So a call to get will delete the file, and thus types.DataHandle.Get() can only be called once because it has a side-effect (unexpected for a method called Get, btw).
I think that's an important behaviour that needs to be documented. But also, I'm not sure we want this... can you share some context on why we do lazy file reading via DataHandle and delete the file only after it's read?
What happens if Get is never called for some reason? Seems like we'd leak a file and it would be picked up again on the next run?
The assumption (that I should document) is that Get is called when the caller is ready to process the file, to limit the amount of lost data and limit the amount of memory in use. The out queue (shown in an upcoming PR) has a capacity of one, so if Get is never called then the data was never sent. This allows us to err on the side of dropping data versus sending duplicates.
I think we should make it super clear in the name that you can retrieve data once only and there's a side-effect of it being deleted forever. Maybe Pop, as it makes sense in the context of the queue.

> This allows us to err on the side of dropping data versus sending duplicates.
Wouldn't we prefer to send duplicates? Or is the retry / backoff handled further down, in next PRs?
I'm also not clear on the implications of Get never being called for some reason (e.g. error). Seems like a file won't be deleted, but we will continue sending DataHandles for the subsequent elements in the queue. So over time we can build up a bunch of unprocessed files left behind? At startup we would try to re-send them and likely fail due to too-old-timestamp. Maybe we should just make sure that Get is always called, even when error happens - it's not the cleanest way, but at least allows us to keep doing this lazy read from disk pattern. I'm cool if we decide that's what we want to do, but let's leave some comments behind ;)
Retry/backoff is handled in the network loop. Regarding Get not being called: the actual implementation of the callee processes the handles one at a time. If Get is not called then it should be retried on restart, which would requeue it. In the main PR the lifecycles of the FileQueue and the Endpoint (the callee in practice) are tied together at the component level. From the filequeue perspective, the endpoint processes the files sequentially, deserializing and then adding to the network buffer, and stops pulling files when the network buffer is full.
Pop is more accurate to what's going on, so I'll change that and add a comment.
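A minimal sketch of what a Pop-style handle could look like, mirroring the handle shape in the diff above (a closure returning metadata, bytes, and an error) with the delete-after-read behaviour made explicit; the helper names and the omission of metadata decoding are assumptions, not the PR's code.

```go
// Hypothetical Pop-style data handle: the file is read exactly once and then
// removed, making the read-once side effect explicit in the name.
package filequeue

import "os"

// DataHandle defers reading the file until the caller is ready to process it.
type DataHandle struct {
	Name string
	// Pop reads the file's contents exactly once and deletes it afterwards.
	Pop func() (map[string]string, []byte, error)
}

func newDataHandle(name string) DataHandle {
	return DataHandle{
		Name: name,
		Pop:  func() (map[string]string, []byte, error) { return pop(name) },
	}
}

// pop reads and then removes the backing file; a second call fails because
// the file no longer exists.
func pop(name string) (map[string]string, []byte, error) {
	buf, err := os.ReadFile(name)
	if err != nil {
		return nil, nil, err
	}
	// Only delete once the data is safely in memory, so a crash before this
	// point leaves the file on disk to be re-queued on restart.
	if err := os.Remove(name); err != nil {
		return nil, buf, err
	}
	// Metadata decoding is omitted here; the real implementation would unpack
	// whatever metadata Store wrote alongside the data.
	return nil, buf, nil
}
```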
FileQueue offers the lowest-level functionality: it writes a block of data and metadata to a file, then pushes the data out. This also serves as the introduction to the actor framework, primarily via the DoWork function, which allows us to avoid mutexes and gives a well-defined workflow that will be reused.
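A rough sketch of that DoWork pattern, assuming the go-actor Worker interface (DoWork returning WorkerContinue or WorkerEnd) as imported in the benchmark file; the request type, channel wiring, and helper names are illustrative rather than the PR's actual implementation.

```go
// Sketch of the DoWork pattern: a single worker owns the queue's state and
// processes requests from a channel, so no mutexes are needed.
package filequeue

import (
	"github.com/vladopajic/go-actor/actor"
)

type storeRequest struct {
	meta map[string]string
	data []byte
}

type queueWorker struct {
	in chan storeRequest // the real code would use the library's mailbox here
	// File-writing state (directory, next file ID, etc.) lives on this struct
	// and is touched only from DoWork, which is why no locking is required.
}

// DoWork is called repeatedly by the actor; returning WorkerEnd stops it.
func (w *queueWorker) DoWork(ctx actor.Context) actor.WorkerStatus {
	select {
	case <-ctx.Done():
		return actor.WorkerEnd
	case req := <-w.in:
		w.writeToFile(req)
		return actor.WorkerContinue
	}
}

func (w *queueWorker) writeToFile(req storeRequest) {
	// Omitted: serialize req.meta and req.data, write the next numbered file
	// on disk, and notify the consumer.
}

// newQueueActor wires the worker into an actor; Start/Stop match the calls
// seen in the tests above.
func newQueueActor() (actor.Actor, chan storeRequest) {
	in := make(chan storeRequest, 1)
	return actor.New(&queueWorker{in: in}), in
}
```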