From 55f74edc746bc5f89def13af8cb11cfae53e8140 Mon Sep 17 00:00:00 2001 From: Emmanuel Touzery Date: Fri, 3 May 2024 11:22:43 +0200 Subject: [PATCH] fix high IO after sudden filebeat stop (#35893) In case of corrupted log file (which has good chances to happen in case of sudden unclean system shutdown), we set a flag which causes us to checkpoint immediately, but never do anything else besides that. This causes filebeat to just checkpoint on each log operation (therefore causing a high IO load on the server and also causing filebeat to fall behind). This change resets the logInvalid flag after a successful checkpointing. --- .../statestore/backend/memlog/diskstore.go | 1 + .../statestore/backend/memlog/store_test.go | 53 +++++++++++++++++++ 2 files changed, 54 insertions(+) create mode 100644 libbeat/statestore/backend/memlog/store_test.go diff --git a/libbeat/statestore/backend/memlog/diskstore.go b/libbeat/statestore/backend/memlog/diskstore.go index bc44263206d8..1658ef1d55ad 100644 --- a/libbeat/statestore/backend/memlog/diskstore.go +++ b/libbeat/statestore/backend/memlog/diskstore.go @@ -402,6 +402,7 @@ func (s *diskstore) checkpointClearLog() { err := s.logFile.Truncate(0) if err == nil { _, err = s.logFile.Seek(0, io.SeekStart) + s.logInvalid = false } if err != nil { diff --git a/libbeat/statestore/backend/memlog/store_test.go b/libbeat/statestore/backend/memlog/store_test.go new file mode 100644 index 000000000000..192248acaa43 --- /dev/null +++ b/libbeat/statestore/backend/memlog/store_test.go @@ -0,0 +1,53 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package memlog + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +func init() { + logp.DevelopmentSetup() +} + +func TestRecoverFromCorruption(t *testing.T) { + path := t.TempDir() + defer os.RemoveAll(path) + + if err := copyPath(path, "testdata/1/logfile_incomplete/"); err != nil { + t.Fatalf("Failed to copy test file to the temporary directory: %v", err) + } + + store, err := openStore(logp.NewLogger("test"), path, 0660, 4096, false, func(_ uint64) bool { + return false + }) + + assert.Equal(t, true, store.disk.logInvalid) + + err = store.logOperation(&opSet{K: "key", V: mapstr.M{ + "field": 42, + }}) + assert.NoError(t, err) + assert.Equal(t, false, store.disk.logInvalid) +}