diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index aeda1c64968b..df7a5e4b7df7 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -196,6 +196,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add support for username in cisco asa security negotiation logs {pull}26975[26975] - Relax time parsing and capture group and session type in Cisco ASA module {issue}24710[24710] {pull}28325[28325] - Fix initialization of http client in Cloudfoundry input. {issue}28271[28271] {pull}28277[28277] +- Correctly track bytes read when max_bytes is exceeded. {issue}28317[28317] {pull}28352[28352] *Heartbeat* diff --git a/libbeat/reader/readfile/line.go b/libbeat/reader/readfile/line.go index 78331a7d246f..d2db172706b7 100644 --- a/libbeat/reader/readfile/line.go +++ b/libbeat/reader/readfile/line.go @@ -75,8 +75,13 @@ func NewLineReader(input io.ReadCloser, config Config) (*LineReader, error) { }, nil } -// Next reads the next line until the new line character -func (r *LineReader) Next() ([]byte, int, error) { +// Next reads the next line until the new line character. The return +// value b is the byte slice that contains the next line. The return +// value n is the number of bytes that were consumed from the +// underlying reader to read the next line. If the LineReader is +// configured with maxBytes n may be larger than the length of b due +// to skipped lines. +func (r *LineReader) Next() (b []byte, n int, err error) { // This loop is need in case advance detects an line ending which turns out // not to be one when decoded. If that is the case, reading continues. for { @@ -162,6 +167,7 @@ func (r *LineReader) advance() error { for idx != -1 && idx > r.maxBytes { r.logger.Warnf("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, idx) err = r.inBuffer.Advance(idx + len(r.nl)) + r.byteCount += idx + len(r.nl) r.inBuffer.Reset() r.inOffset = 0 idx = r.inBuffer.IndexFrom(r.inOffset, r.nl) @@ -175,6 +181,7 @@ func (r *LineReader) advance() error { return err } r.logger.Warnf("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, skipped) + r.byteCount += skipped idx = r.inBuffer.IndexFrom(r.inOffset, r.nl) } } diff --git a/libbeat/reader/readfile/line_test.go b/libbeat/reader/readfile/line_test.go index 4f20e4de1610..8ed0a07fae46 100644 --- a/libbeat/reader/readfile/line_test.go +++ b/libbeat/reader/readfile/line_test.go @@ -365,7 +365,7 @@ func TestMaxBytesLimit(t *testing.T) { // Read decodec lines and test var idx int for i := 0; ; i++ { - b, n, err := reader.Next() + b, _, err := reader.Next() if err != nil { if err == io.EOF { break @@ -386,12 +386,7 @@ func TestMaxBytesLimit(t *testing.T) { break } - gotLen := n - len(nl) s := string(b[:len(b)-len(nl)]) - if len(line) != gotLen { - t.Fatalf("invalid line length, expected: %d got: %d", len(line), gotLen) - } - if line != s { t.Fatalf("lines do not match, expected: %s got: %s", line, s) }