Support summary and detailed file metrics in filestream input #25045
Conversation
💔 Tests Failed
Test stats 🧪
| Test | Results |
|---|---|
| Failed | 1 |
| Passed | 5235 |
| Skipped | 644 |
| Total | 5880 |
Genuine test errors
💔 There are test failures that are not known flaky tests, most likely a genuine test failure.
- Name: Build&Test / filebeat-build / TestFilestreamCloseRemoved – github.com/elastic/beats/v7/filebeat/input/filestream
+1 on metrics. I wonder if we could rather have the prospector provide the metrics? Then we can also report files that are not actively collected (e.g. due to harvester_limit). Related discussion on organizing metrics for inputs: https://github.com/elastic/obs-dc-team/issues/401
I thought the same thing, but I decided against it because if we add metrics to the … Also, why would we want to report files that we are not collecting? Maybe we should add metrics for the file scanner?
Force-pushed from c4480de to 46d729a
Pinging @elastic/agent (Team:Agent)
After sleeping on it, I moved global metrics to …
Failing tests are unrelated.
This pull request is now in conflicts. Could you fix it? 🙏
As a user I'm curious to know about the amount of unprocessed data and whether it is growing or shrinking.
By not taking into account files that we do not harvest, we can't measure the total disk usage of the files we are watching, and we can't tell the exact backlog in case a harvester_limit has been configured. The files we actively read are just a subset of the files we are watching. The measurement should not only contain the files we are collecting: as long as a file is not removed from disk (or is still held open by FB), it should be accounted for.
That might be an option. Or the prospector. Essentially the entity that is responsible for filtering files by path.
So if I understand correctly, you want to collect detailed …?
Yes. All files that match …
Force-pushed from 507b602 to de08033
This pull request is now in conflicts. Could you fix it? 🙏
Force-pushed from 03d1110 to 367b2bc
filebeat/input/filestream/input.go
Outdated
inp.detailedMetrics[srcID].updateCurrentSize(fi.Size())
return nil
})
}()
Alternatively to actively probing the file with fstat, we could define a function-based metric to save CPU. The function-based metric would query the size only on demand, and only if the last file size update is older than 10s.
What do you mean by "only if the last file size update is >10s"? The file was updated 10 seconds ago?
I mean that instead of proactively updating the metrics over and over again, we could make it lazy via the metrics callback. Instead of spinning up a go-routine we check the state on disk lazily.
The callback would cache the last size (or maybe even get its update from the last file watcher scan), and only call fstat if the cached value has not been updated within the last 10 seconds.
By making it 'cheaper' when the metrics are not required, we don't really need a setting to enable/disable metrics support. It will just always be there.
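A minimal sketch of that lazy, function-based approach, assuming a hypothetical lazySizeMetric type and a 10-second staleness window (none of this is code from the PR):

```go
package filestream

import (
	"os"
	"sync"
	"time"
)

// lazySizeMetric caches the last known file size and only re-stats the
// file when a metrics snapshot is requested and the cached value is stale.
type lazySizeMetric struct {
	mu      sync.Mutex
	path    string
	size    int64
	updated time.Time
	maxAge  time.Duration // e.g. 10 * time.Second
}

// Update lets e.g. the file watcher scan push a fresh size into the cache,
// resetting the staleness timer.
func (m *lazySizeMetric) Update(size int64) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.size = size
	m.updated = time.Now()
}

// Get is what a function-based metric callback would invoke: it only
// touches the file system when the cached value is older than maxAge.
func (m *lazySizeMetric) Get() int64 {
	m.mu.Lock()
	defer m.mu.Unlock()
	if time.Since(m.updated) > m.maxAge {
		if fi, err := os.Stat(m.path); err == nil {
			m.size = fi.Size()
			m.updated = time.Now()
		}
	}
	return m.size
}
```

Because the Stat call only happens when the metric is read and the cached value is stale, collection stays cheap even when nobody requests the metrics, which is why no enable/disable setting would be needed.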
filebeat/input/filestream/input.go
Outdated
inp.detailedMetrics[srcID] = newProgressMetrics(inp.ID, path, srcID, monitorCancel)
go func() {
	timed.Periodic(monitorCtx, 30*time.Second, func() error {
		fi, err := f.Stat()
This go-routine is not cleaned up when the harvester is shut down. That is, we will likely fail here because the file was already closed. Unfortunately we shouldn't keep the file open, in order to not exhaust file descriptors. Alternatively we might want the prospector/file watcher to update the size.
This reminds me: if possible, the reader should also update the internal 'size' once it hits EOF. That is, the reader should not read until EOF, but until the internal size. Once the limit is reached, it should use Stat to update the size and continue reading if the size has increased. This is helpful e.g. when reading from NFS, as the stat circumvents zero-byte reads, which can force extra roundtrips at the protocol level.
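A rough sketch of that read-until-known-size idea; sizeLimitedReader and its fields are hypothetical names, not code from this PR:

```go
package filestream

import (
	"io"
	"os"
)

// sizeLimitedReader reads only up to the last known file size. When that
// limit is reached it re-stats the file: if the file has grown, reading
// continues; otherwise io.EOF is returned. The explicit Stat avoids
// repeated zero-byte reads, which is useful e.g. when reading from NFS.
type sizeLimitedReader struct {
	f      *os.File
	offset int64 // bytes read so far
	size   int64 // last known file size
}

func (r *sizeLimitedReader) Read(p []byte) (int, error) {
	if r.offset >= r.size {
		fi, err := r.f.Stat()
		if err != nil {
			return 0, err
		}
		if fi.Size() <= r.offset {
			return 0, io.EOF // no new data since the last size update
		}
		r.size = fi.Size()
	}
	if remaining := r.size - r.offset; int64(len(p)) > remaining {
		p = p[:remaining]
	}
	n, err := r.f.Read(p)
	r.offset += int64(n)
	return n, err
}
```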
Why isn't it cleaned up? The canceler is from the input.Context, so if the parent is cancelled, it should be cancelled as well. What did I miss?
Oh, I missed that the monitorCtx is derived from the input context. Nevermind.
There is still a potential race on f, as the overall input shutdown does not wait for the monitor to finish before the file is closed.
Force-pushed from 7fae87b to 6ebf0ba
I have updated the PR.
"github.com/elastic/beats/v7/libbeat/common" | ||
"github.com/elastic/beats/v7/libbeat/common/match" | ||
"github.com/elastic/beats/v7/libbeat/reader/readfile" | ||
) | ||
|
||
// Config stores the options of a file stream. | ||
type config struct { | ||
ID string `config:"id"` |
Can we document how ID is used?
Is it used for registry state?
Is it used for monitoring?
Added documentation.
@@ -43,6 +45,8 @@ const pluginName = "filestream"

type state struct {
	Offset int64 `json:"offset" struct:"offset"`

	id string
The state is going to be stored in the registry file. Do we really need the ID here? Does the ID change?
go func() {
	m := m
	timed.Periodic(monitorCtx, 30*time.Second, func() error {
		fi, err := f.Stat()
The (*filestream).Run method will close f on shutdown. This go-routine should be guaranteed to be stopped before Run closes the file.
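One way to guarantee that ordering is to track the monitor goroutine with a WaitGroup and have shutdown cancel, wait, and only then close the file; a hedged sketch with illustrative names, not code from this PR:

```go
package filestream

import (
	"context"
	"os"
	"sync"
	"time"
)

// startSizeMonitor stats f on every tick until ctx is cancelled. The
// returned WaitGroup lets the caller block until the goroutine has exited,
// so the file is never closed while the monitor might still use it.
func startSizeMonitor(ctx context.Context, f *os.File, interval time.Duration, update func(int64)) *sync.WaitGroup {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				if fi, err := f.Stat(); err == nil {
					update(fi.Size())
				}
			}
		}
	}()
	return &wg
}
```

On shutdown the caller would cancel the monitor context, call wg.Wait(), and only afterwards close f.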
filebeat/input/filestream/input.go
Outdated
f, err := inp.openFile(log, path, offset)
if err != nil {
	return nil, err
}

inp.startMonitoring(log, canceler, srcID, path, f)
Maybe we want to move startMonitoring to Run, in order to make it more obvious which tasks an input is running. This would require us to split open into openFile and newReader.
@@ -48,25 +48,36 @@ type Harvester interface {
	// Run is the event loop which reads from the source
	// and forwards it to the publisher.
	Run(input.Context, Source, Cursor, Publisher) error
	// Monitor is required for detailed metrics.
	Monitor(input.Context, Source)
Instead of having a Monitor per harvester, I wonder if we should have the file watcher do the reporting. The file watcher would report the size update and we would find and update the actual state.
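If the file watcher did the reporting, one possible shape is a small shared store that the watcher writes into on every scan and the metrics snapshot reads from; sizeRegistry and its methods are hypothetical names, not code from this PR:

```go
package filestream

import "sync"

// sizeRegistry is a shared store of the last observed size per source.
// The file watcher calls Report on every scan; the metrics code calls Get
// when building a snapshot, so no per-harvester Monitor goroutine is needed.
type sizeRegistry struct {
	mu    sync.RWMutex
	sizes map[string]int64 // keyed by source ID
}

func newSizeRegistry() *sizeRegistry {
	return &sizeRegistry{sizes: make(map[string]int64)}
}

// Report records the size the file watcher observed for a source.
func (r *sizeRegistry) Report(srcID string, size int64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.sizes[srcID] = size
}

// Get returns the last reported size for a source, if any.
func (r *sizeRegistry) Get(srcID string) (int64, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	size, ok := r.sizes[srcID]
	return size, ok
}
```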
This pull request is now in conflicts. Could you fix it? 🙏
Force-pushed from 160affa to b202b46
This pull request is now in conflicts. Could you fix it? 🙏
Hi! We're labeling this issue as …
Hi! …
I am reopening the PR because it has to be added to filestream.
This pull request does not have a backport label. Could you fix it @kvch? 🙏
NOTE: …
Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)
@kvch is this PR pending review?
Stalled, we will need time to clean this up.
What does this PR do?
This PR adds support for metrics in the filestream input. The summary of reader metrics is enabled by default, so the same numbers are reported as in the log input.
More detailed metrics are disabled by default. Users can enable them by setting detailed_metrics to true. These metrics are the same as what the log input provides, but as we have received numerous complaints about them not being configurable, I have separated them from the other metrics. Now users can configure whether they would like to collect these metrics or not.
Why is it important?
It makes the inner parts of the filestream input observable. Required for feature parity with the log input.
Checklist
- [ ] I have made corresponding changes to the documentation
- [ ] I have added tests that prove my fix is effective or that my feature works
- [ ] I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.