From 0d5596f3f5bba6229f1b33cb15a308f9985c34de Mon Sep 17 00:00:00 2001 From: liuwenping Date: Tue, 7 Jun 2022 19:35:12 +0800 Subject: [PATCH] [Filebeat] keep track of bytes read when max_bytes exceeded (#31824) Keep track of bytes read when max_bytes exceeded Add test code for exceeding max bytes in filebeat --- libbeat/reader/readfile/line.go | 2 +- libbeat/reader/readfile/line_test.go | 13 +++++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/libbeat/reader/readfile/line.go b/libbeat/reader/readfile/line.go index 2f21f225aa90..e13b2a8fb00d 100644 --- a/libbeat/reader/readfile/line.go +++ b/libbeat/reader/readfile/line.go @@ -235,7 +235,7 @@ func (r *LineReader) skipUntilNewLine() (int, error) { if idx != -1 { r.inBuffer.Write(r.tempBuffer[idx+len(r.nl) : n]) - skipped += idx + skipped += idx + len(r.nl) } else { skipped += n } diff --git a/libbeat/reader/readfile/line_test.go b/libbeat/reader/readfile/line_test.go index 17fdfcf10391..68f257bf45db 100644 --- a/libbeat/reader/readfile/line_test.go +++ b/libbeat/reader/readfile/line_test.go @@ -364,9 +364,13 @@ func TestMaxBytesLimit(t *testing.T) { } // Read decodec lines and test - var idx int + var ( + idx int + readLen int + ) + for i := 0; ; i++ { - b, _, err := reader.Next() + b, n, err := reader.Next() if err != nil { if err == io.EOF { break @@ -387,11 +391,16 @@ func TestMaxBytesLimit(t *testing.T) { break } + readLen += n s := string(b[:len(b)-len(nl)]) if line != s { t.Fatalf("lines do not match, expected: %s got: %s", line, s) } } + + if len(input) != readLen { + t.Fatalf("the bytes read are not equal to the bytes input, expected: %d got: %d", len(input), readLen) + } } // test_exceed_buffer from test_harvester.py