Skip to content

Commit

Permalink
[Filebeat] keep track of bytes read when max_bytes exceeded (elastic#…
Browse files Browse the repository at this point in the history
…28352)

* [Filebeat] keep track of bytes read when max_bytes exceeded in LineReader

Closes elastic#28317
  • Loading branch information
leehinman authored and wiwen committed Nov 1, 2021
1 parent 5758a4b commit fc05b81
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ for a few releases. Please use other tools provided by Elastic to fetch data fro
- Tolerate faults when Windows Event Log session is interrupted {issue}27947[27947] {pull}28191[28191]
- Add support for username in cisco asa security negotiation logs {pull}26975[26975]
- Relax time parsing and capture group and session type in Cisco ASA module {issue}24710[24710] {pull}28325[28325]
- Correctly track bytes read when max_bytes is exceeded. {issue}28317[28317] {pull}28352[28352]

*Heartbeat*

Expand Down
11 changes: 9 additions & 2 deletions libbeat/reader/readfile/line.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,13 @@ func NewLineReader(input io.ReadCloser, config Config) (*LineReader, error) {
}, nil
}

// Next reads the next line until the new line character
func (r *LineReader) Next() ([]byte, int, error) {
// Next reads the next line until the new line character. The return
// value b is the byte slice that contains the next line. The return
// value n is the number of bytes that were consumed from the
// underlying reader to read the next line. If the LineReader is
// configured with maxBytes n may be larger than the length of b due
// to skipped lines.
func (r *LineReader) Next() (b []byte, n int, err error) {
// This loop is need in case advance detects an line ending which turns out
// not to be one when decoded. If that is the case, reading continues.
for {
Expand Down Expand Up @@ -162,6 +167,7 @@ func (r *LineReader) advance() error {
for idx != -1 && idx > r.maxBytes {
r.logger.Warnf("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, idx)
err = r.inBuffer.Advance(idx + len(r.nl))
r.byteCount += idx + len(r.nl)
r.inBuffer.Reset()
r.inOffset = 0
idx = r.inBuffer.IndexFrom(r.inOffset, r.nl)
Expand All @@ -175,6 +181,7 @@ func (r *LineReader) advance() error {
return err
}
r.logger.Warnf("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, skipped)
r.byteCount += skipped
idx = r.inBuffer.IndexFrom(r.inOffset, r.nl)
}
}
Expand Down
7 changes: 1 addition & 6 deletions libbeat/reader/readfile/line_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ func TestMaxBytesLimit(t *testing.T) {
// Read decodec lines and test
var idx int
for i := 0; ; i++ {
b, n, err := reader.Next()
b, _, err := reader.Next()
if err != nil {
if err == io.EOF {
break
Expand All @@ -387,12 +387,7 @@ func TestMaxBytesLimit(t *testing.T) {
break
}

gotLen := n - len(nl)
s := string(b[:len(b)-len(nl)])
if len(line) != gotLen {
t.Fatalf("invalid line length, expected: %d got: %d", len(line), gotLen)
}

if line != s {
t.Fatalf("lines do not match, expected: %s got: %s", line, s)
}
Expand Down

0 comments on commit fc05b81

Please sign in to comment.