diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 7bd8f06e8abd..a45fa80df12c 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -39,6 +39,7 @@ https://github.com/elastic/beats/compare/v6.8.0...6.8.1[Check the HEAD diff] *Filebeat* +- Fix Filebeat OOMs on very long lines {issue}19500[19500], {pull}19552[19552] *Heartbeat* diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index c0422538642e..12a26172e2f8 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -557,6 +557,8 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) { var r reader.Reader var err error + logp.Debug("harvester", "newLogFileReader with config.MaxBytes: %d", h.config.MaxBytes) + // TODO: NewLineReader uses additional buffering to deal with encoding and testing // for new lines in input stream. Simple 8-bit based encodings, or plain // don't require 'complicated' logic. @@ -570,7 +572,13 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) { return nil, err } - r, err = readfile.NewEncodeReader(reader, h.encoding, h.config.BufferSize) + // Configure MaxBytes limit for EncodeReader as multiplied by 4 + // for the worst case scenario where incoming UTF32 charchers are decoded to the single byte UTF-8 characters. + // This limit serves primarily to avoid memory bload or potential OOM with expectedly long lines in the file. + // The further size limiting is performed by LimitReader at the end of the readers pipeline as needed. + encReaderMaxBytes := h.config.MaxBytes * 4 + + r, err = readfile.NewEncodeReader(reader, h.encoding, h.config.BufferSize, encReaderMaxBytes) if err != nil { return nil, err } diff --git a/filebeat/reader/multiline/multiline_test.go b/filebeat/reader/multiline/multiline_test.go index 96dfd6922ccc..205e69c18631 100644 --- a/filebeat/reader/multiline/multiline_test.go +++ b/filebeat/reader/multiline/multiline_test.go @@ -277,7 +277,7 @@ func createMultilineTestReader(t *testing.T, in *bytes.Buffer, cfg Config) reade } var r reader.Reader - r, err = readfile.NewEncodeReader(in, enc, 4096) + r, err = readfile.NewEncodeReader(in, enc, 4096, 4096) if err != nil { t.Fatalf("Failed to initialize line reader: %v", err) } diff --git a/filebeat/reader/readfile/encode.go b/filebeat/reader/readfile/encode.go index a86e6541bec6..b68591bc67e0 100644 --- a/filebeat/reader/readfile/encode.go +++ b/filebeat/reader/readfile/encode.go @@ -36,9 +36,9 @@ type EncoderReader struct { func NewEncodeReader( r io.Reader, codec encoding.Encoding, - bufferSize int, + bufferSize, maxBytes int, ) (EncoderReader, error) { - eReader, err := NewLineReader(r, codec, bufferSize) + eReader, err := NewLineReader(r, codec, bufferSize, maxBytes) return EncoderReader{eReader}, err } diff --git a/filebeat/reader/readfile/line.go b/filebeat/reader/readfile/line.go index a2656884868d..5df107f516d8 100644 --- a/filebeat/reader/readfile/line.go +++ b/filebeat/reader/readfile/line.go @@ -18,6 +18,7 @@ package readfile import ( + "bytes" "io" "golang.org/x/text/encoding" @@ -34,6 +35,7 @@ type LineReader struct { reader io.Reader codec encoding.Encoding bufferSize int + maxBytes int nl []byte inBuffer *streambuf.Buffer outBuffer *streambuf.Buffer @@ -43,7 +45,7 @@ type LineReader struct { } // New creates a new reader object -func NewLineReader(input io.Reader, codec encoding.Encoding, bufferSize int) (*LineReader, error) { +func NewLineReader(input io.Reader, codec encoding.Encoding, bufferSize, maxBytes int) (*LineReader, error) { encoder := codec.NewEncoder() // Create newline char based on encoding @@ -56,6 +58,7 @@ func NewLineReader(input io.Reader, codec encoding.Encoding, bufferSize int) (*L reader: input, codec: codec, bufferSize: bufferSize, + maxBytes: maxBytes, nl: nl, decoder: codec.NewDecoder(), inBuffer: streambuf.New(nil), @@ -138,6 +141,29 @@ func (r *LineReader) advance() error { // Check if buffer has newLine character idx = r.inBuffer.IndexFrom(r.inOffset, r.nl) + + // If max bytes limit per line is set, then drop the lines that are longer + if r.maxBytes != 0 { + // If newLine is found, drop the lines longer than maxBytes + for idx != -1 && idx > r.maxBytes { + logp.Warn("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, idx) + err = r.inBuffer.Advance(idx + len(r.nl)) + r.inBuffer.Reset() + r.inOffset = 0 + idx = r.inBuffer.IndexFrom(r.inOffset, r.nl) + } + + // If newLine is not found and the incoming data buffer exceeded max bytes limit, then skip until the next newLine + if idx == -1 && r.inBuffer.Len() > r.maxBytes { + skipped, err := r.skipUntilNewLine(buf) + if err != nil { + logp.Err("Error skipping until new line, err: %s", err) + return err + } + logp.Warn("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, skipped) + idx = r.inBuffer.IndexFrom(r.inOffset, r.nl) + } + } } // found encoded byte sequence for '\n' in buffer @@ -163,6 +189,49 @@ func (r *LineReader) advance() error { return err } +func (r *LineReader) skipUntilNewLine(buf []byte) (int, error) { + // The length of the line skipped + skipped := r.inBuffer.Len() + + // Clean up the buffer + err := r.inBuffer.Advance(skipped) + r.inBuffer.Reset() + + // Reset inOffset + r.inOffset = 0 + + if err != nil { + return 0, err + } + + // Read until the new line is found + for idx := -1; idx == -1; { + n, err := r.reader.Read(buf) + + // Check bytes read for newLine + if n > 0 { + idx = bytes.Index(buf[:n], r.nl) + + if idx != -1 { + r.inBuffer.Append(buf[idx+len(r.nl) : n]) + skipped += idx + } else { + skipped += n + } + } + + if err != nil { + return skipped, err + } + + if n == 0 { + return skipped, streambuf.ErrNoMoreBytes + } + } + + return skipped, nil +} + func (r *LineReader) decode(end int) (int, error) { var err error buffer := make([]byte, 1024) diff --git a/filebeat/reader/readfile/line_test.go b/filebeat/reader/readfile/line_test.go index 9d244f0decfa..e9d145015e12 100644 --- a/filebeat/reader/readfile/line_test.go +++ b/filebeat/reader/readfile/line_test.go @@ -21,8 +21,12 @@ package readfile import ( "bytes" + "encoding/hex" + "io" "math/rand" + "strings" "testing" + "time" "github.com/stretchr/testify/assert" "golang.org/x/text/transform" @@ -68,7 +72,7 @@ func TestReaderEncodings(t *testing.T) { } // create line reader - reader, err := NewLineReader(buffer, codec, 1024) + reader, err := NewLineReader(buffer, codec, 1024, 1024) if err != nil { t.Errorf("failed to initialize reader: %v", err) continue @@ -159,7 +163,8 @@ func testReadLines(t *testing.T, inputLines [][]byte) { // initialize reader buffer := bytes.NewBuffer(inputStream) codec, _ := encoding.Plain(buffer) - reader, err := NewLineReader(buffer, codec, buffer.Len()) + bufLen := buffer.Len() + reader, err := NewLineReader(buffer, codec, bufLen, bufLen) if err != nil { t.Fatalf("Error initializing reader: %v", err) } @@ -185,3 +190,137 @@ func testReadLines(t *testing.T, inputLines [][]byte) { func testReadLine(t *testing.T, line []byte) { testReadLines(t, [][]byte{line}) } + +func randomInt(r *rand.Rand, min, max int) int { + return r.Intn(max+1-min) + min +} + +func randomBool(r *rand.Rand) bool { + n := randomInt(r, 0, 1) + return n != 0 +} + +func randomBytes(r *rand.Rand, sz int) ([]byte, error) { + bytes := make([]byte, sz) + if _, err := rand.Read(bytes); err != nil { + return nil, err + } + return bytes, nil +} + +func randomString(r *rand.Rand, sz int) (string, error) { + if sz == 0 { + return "", nil + } + + var bytes []byte + var err error + if bytes, err = randomBytes(r, sz/2+sz%2); err != nil { + return "", err + } + s := hex.EncodeToString(bytes) + return s[:sz], nil +} + +func setupTestMaxBytesLimit(lineMaxLimit, lineLen int, nl []byte) (lines []string, data string, err error) { + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) + + lineCount := randomInt(rnd, 11, 142) + lines = make([]string, lineCount) + + var b strings.Builder + + for i := 0; i < lineCount; i++ { + var sz int + // Non-empty line + if randomBool(rnd) { + // Boundary to the lineMaxLimit + if randomBool(rnd) { + sz = randomInt(rnd, lineMaxLimit-1, lineMaxLimit+1) + } else { + sz = randomInt(rnd, 0, lineLen) + } + } else { + // Randomly empty or one characters lines(another possibly boundary conditions) + sz = randomInt(rnd, 0, 1) + } + + s, err := randomString(rnd, sz) + if err != nil { + return nil, "", err + } + + lines[i] = s + if len(s) > 0 { + b.WriteString(s) + } + b.Write(nl) + } + return lines, b.String(), nil +} + +func TestMaxBytesLimit(t *testing.T) { + const ( + enc = "plain" + numberOfLines = 102 + bufferSize = 1024 + lineMaxLimit = 3012 + lineLen = 5720 // exceeds lineMaxLimit + ) + + codecFactory, ok := encoding.FindEncoding(enc) + if !ok { + t.Fatalf("can not find encoding '%v'", enc) + } + + buffer := bytes.NewBuffer(nil) + codec, _ := codecFactory(buffer) + + // Generate random lines lengths including empty lines + nl := []byte("\n") + lines, input, err := setupTestMaxBytesLimit(lineMaxLimit, lineLen, nl) + if err != nil { + t.Fatal("failed to generate random input:", err) + } + + // Create line reader + reader, err := NewLineReader(strings.NewReader(input), codec, bufferSize, lineMaxLimit) + if err != nil { + t.Fatal("failed to initialize reader:", err) + } + + // Read decodec lines and test + var idx int + for i := 0; ; i++ { + b, n, err := reader.Next() + if err != nil { + if err == io.EOF { + break + } else { + t.Fatal("unexpected error:", err) + } + } + + // Find the next expected line from the original test array + var line string + for ; idx < len(lines); idx++ { + // Expected to be dropped + if len(lines[idx]) > lineMaxLimit { + continue + } + line = lines[idx] + idx++ + break + } + + gotLen := n - len(nl) + s := string(b[:len(b)-len(nl)]) + if len(line) != gotLen { + t.Fatalf("invalid line length, expected: %d got: %d", len(line), gotLen) + } + + if line != s { + t.Fatalf("lines do not match, expected: %s got: %s", line, s) + } + } +} diff --git a/filebeat/scripts/tester/main.go b/filebeat/scripts/tester/main.go index 3300b8ef623a..76460c60bbd8 100644 --- a/filebeat/scripts/tester/main.go +++ b/filebeat/scripts/tester/main.go @@ -133,7 +133,7 @@ func getLogsFromFile(logfile string, conf *logReaderConfig) ([]string, error) { } var r reader.Reader - r, err = readfile.NewEncodeReader(f, enc, 4096) + r, err = readfile.NewEncodeReader(f, enc, 4096, 4096) if err != nil { return nil, err }