-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add max bytes in line limit #19552
Add max bytes in line limit #19552
Conversation
This pull request doesn't have a |
Pinging @elastic/integrations-services (Team:Services) |
hmm, seems like a problem with CI build
|
💔 Build FailedExpand to view the summary
Build stats
Test stats 🧪
Steps errorsExpand to view the steps failures
Log outputExpand to view the last 100 lines of log output
|
The today's comment on the original issue was a suggestion to skip the long lines instead of returning the error. Will update PR later today. |
Updated PR to skip the long lines, logging the warning and continuing processing. New log screenshot, running on the 100M suggested in the test case with couple of lines appended. Using the same MaxSize from the config that is used for the limiter at the end of the readers pipeline, multiplied by 4 since the checks and skipping happen before decoding the data. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is awesome! Thanks so much for the quick and best fix. Looking forward to deploying this.
libbeat/reader/readfile/line.go
Outdated
|
||
// If newLine is not found and exceeded max bytes limit | ||
if idx == -1 && r.maxBytes != 0 && r.inBuffer.Len() > r.maxBytes { | ||
return ErrExceededMaxBytesLimit |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this abort reading the file forever? If other words, if max_bytes is 1024 and line 50 is 3000 bytes, will line 51+ never be read?
If so, would be it possible to, in lieu of returning an error, continue reading until the new-line but do not add what we read to r.inBuffer
, effectively just ignoring the line. So in my example 50 would be skipped, but 51 could be read
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you commented on the first commit. This code was changed by the second commit in this PR.
// for the worst case scenario where incoming UTF32 charchers are decoded to the single byte UTF-8 characters. | ||
// This limit serves primarily to avoid memory bload or potential OOM with expectedly long lines in the file. | ||
// The further size limiting is performed by LimitReader at the end of the readers pipeline as needed. | ||
encReaderMaxBytes := h.config.MaxBytes * 4 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
may want to add a comment in docs for max_bytes that it can use up to 4x the number worth of memory for buffering
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure can do that. Which docs? Before this change it was taking all available memory :-)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-log.html
As a user I would want that information here
The maximum number of bytes that a single log message can have. All bytes after max_bytes are discarded and not sent., but up to 4x max_bytes will be buffered in-memory .This setting is especially useful for multiline log messages, which can get large. The default is 10MB (10485760).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On the second thought I'm not sure that this part of internal implementation should be documented.
Before this change the code was allocating as much as it was available and cutting the final message to the max byte limit before the output from the pipeline.
This change adds a protection from unlimited allocation.
I'll let the regular beats contributors that are going to review this PR to decide if this needs to be documented.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fair enough. I will make a quick pitch and leave it to you and reviewers to decide: without that kind of information, it is hard to plan. For example, how can I determine how much memory I will be safe giving filebeat? I may think that if I tell it to read only 1 file at a time, and tell it to use max_bytes of 25MB, that I am very safe giving it 50MB of memory, only to be sad when it OOMs because it actually used 100MB of memory for 1 line that I thought was limited to 25.
Personally I find it already very hard to figure out how much memory it needs because i have to do match like + max_events * num_harvesters * max_size - and im scared of more hidden values (like * 4)
Can reduce the memory footprint by not caching the line data before decoding and decoding every read. Need to measure how it affects the performance. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have you taken a look at the functions of streambuf
? Is AppendWithCapLimits
not usable here? (https://github.com/elastic/beats/blob/master/libbeat/common/streambuf/streambuf.go#L176)
@kvch if I'm reading the implementation correctly beats/libbeat/common/streambuf/streambuf.go Line 129 in efa5eba
it caps the the number of byte in append operation, while potentially doing huge (10MB default) allocations for backing array: beats/libbeat/common/streambuf/streambuf.go Line 145 in efa5eba
and not really limiting the buffer size growth beats/libbeat/common/streambuf/streambuf.go Line 158 in efa5eba
It looks like the purpose of Please explain, if I misunderstood the way you are proposing to use this. |
libbeat/reader/readfile/line.go
Outdated
@@ -28,12 +28,15 @@ import ( | |||
"github.com/elastic/beats/v7/libbeat/logp" | |||
) | |||
|
|||
const Unlimited = 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please do not export this variable if you are not using it outside the readfile
package.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This constant potentially could be used for the public NewLineReader config param.
Let me know if still want to hide it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Personally, I am not a huge fan of exposing something we would potentially use. In general, it is easier to export something unexported, then unexport something exported.
But I do not block this PR because of a philosophical question about a small constant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good, will update to not export.
libbeat/reader/readfile/line_test.go
Outdated
@@ -31,10 +35,12 @@ import ( | |||
) | |||
|
|||
// Sample texts are from http://www.columbia.edu/~kermit/utf8.html | |||
var tests = []struct { | |||
type LineTestSpec struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please do not export this struct as you are not using it from outside the package. Also, rather name it listTestCase
. The name you have given it is not idiomatic to this repo.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated
libbeat/reader/readfile/line_test.go
Outdated
} | ||
|
||
func TestMaxBytesLimit(t *testing.T) { | ||
const ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please extract the setup part of the test out into a separate function? The way the test is structured now is hard to read and understand.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extracted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code itself look good, I have a few minor comments. Please refactor the test you have provided.
@aleksmaus You are right. I was just wondering if we could use it. |
Addressed all of the comments. Let me know if anything else needs to be changed. |
jenkins run tests |
@aleksmaus Sorry for the long wait. The code in general looks good to me. However, when a line is truncated Filebeat flags that message by adding "truncated" to the list in |
@aleksmaus Indeed. Could you please rebase the PR and add a changelog entry to |
Have we also considered truncating the line before logging? We have seen ~1Gb of junk causing filebeat to OOM. It might be helpful to truncate first before logging. Perhaps only log the first 1024 characters or something for the dropped line? |
the line drop happens before decoding. would have to try to decode for that. the number of characters will be known only after decoding. also the junk will not be necessarily decodable. |
Ahh, not logging the line content itself. Just the fact that the line is dropped and the length in bytes. |
@aleksmaus Thank you for the contribution! |
(cherry picked from commit 0e049f0)
…-basis-when * upstream/master: [CI] run everything for branches/tags (elastic#20057) Limit the number of bytes read by LineReader in Filebeat (elastic#19552) Prefer testify.Error/NoError over NotNil/Nil (elastic#20044)
…0081) (cherry picked from commit 0e049f0) Co-authored-by: Aleksandr Maus <[email protected]>
(cherry picked from commit 0e049f0)
…0087) (cherry picked from commit 0e049f0) Co-authored-by: Aleksandr Maus <[email protected]>
chery pick from elastic#19552
* bugfix: Add max bytes in line limit elastic#19552 chery pick from elastic#19552 * feature: add file reuse harvester * bugfix: harvester cleanup & state update
What does this PR do?
Handles OOM on long lines gracefully, writes the error into log.
Why is it important?
Handle bad data gracefully without OOM.
Checklist
Author's Checklist
How to test this PR locally
Refer to #19500 for test case.
The filebeat doesn't get killed due to OOM with this change and the error is logged (see the screenshot attached).
Related issues
Screenshots
Logs