From 19725aa73c1e113e8bbb623c9e0b4a24ace82915 Mon Sep 17 00:00:00 2001 From: Emmanuel Touzery Date: Fri, 3 May 2024 11:22:43 +0200 Subject: [PATCH 1/3] fix high IO after sudden filebeat stop (#35893) In case of a corrupted log file (which is likely to happen after a sudden unclean system shutdown), we set a flag which causes us to checkpoint immediately, but never do anything else besides that; in particular, the flag is never cleared. This causes filebeat to just checkpoint on each log operation (therefore causing a high IO load on the server and also causing filebeat to fall behind). This change resets the logInvalid flag after a successful checkpoint. --- CHANGELOG.next.asciidoc | 1 + .../statestore/backend/memlog/diskstore.go | 1 + .../statestore/backend/memlog/store_test.go | 54 +++++++++++++++++++ 3 files changed, 56 insertions(+) create mode 100644 libbeat/statestore/backend/memlog/store_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index faa8c6b58d27..f87dc01bb151 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -42,6 +42,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Removed deprecated Sophos UTM from Beats. Use the https://docs.elastic.co/integrations/sophos[Sophos] Elastic integration instead. {pull}38037[38037] - Introduce input/netmetrics and refactor netflow input metrics {pull}38055[38055] - Update Salesforce module to use new Salesforce input. {pull}37509[37509] +- Fix high IO and disrupted operation after sudden filebeat stop. 
{pull}35893[35893] *Heartbeat* diff --git a/libbeat/statestore/backend/memlog/diskstore.go b/libbeat/statestore/backend/memlog/diskstore.go index bc44263206d8..1658ef1d55ad 100644 --- a/libbeat/statestore/backend/memlog/diskstore.go +++ b/libbeat/statestore/backend/memlog/diskstore.go @@ -402,6 +402,7 @@ func (s *diskstore) checkpointClearLog() { err := s.logFile.Truncate(0) if err == nil { _, err = s.logFile.Seek(0, io.SeekStart) + s.logInvalid = false } if err != nil { diff --git a/libbeat/statestore/backend/memlog/store_test.go b/libbeat/statestore/backend/memlog/store_test.go new file mode 100644 index 000000000000..4128228db3c4 --- /dev/null +++ b/libbeat/statestore/backend/memlog/store_test.go @@ -0,0 +1,54 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package memlog + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +func init() { + logp.DevelopmentSetup() +} + +func TestRecoverFromCorruption(t *testing.T) { + path := t.TempDir() + defer os.RemoveAll(path) + + if err := copyPath(path, "testdata/1/logfile_incomplete/"); err != nil { + t.Fatalf("Failed to copy test file to the temporary directory: %v", err) + } + + store, err := openStore(logp.NewLogger("test"), path, 0660, 4096, false, func(_ uint64) bool { + return false + }) + assert.NoError(t, err) + + assert.Equal(t, true, store.disk.logInvalid) + + err = store.logOperation(&opSet{K: "key", V: mapstr.M{ + "field": 42, + }}) + assert.NoError(t, err) + assert.Equal(t, false, store.disk.logInvalid) +} From 10e920d0935f8ea43462036abb80813733df35e1 Mon Sep 17 00:00:00 2001 From: Emmanuel Touzery Date: Sun, 19 May 2024 16:41:07 +0200 Subject: [PATCH 2/3] fix diskstore deprecation warnings --- libbeat/statestore/backend/memlog/diskstore.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/libbeat/statestore/backend/memlog/diskstore.go b/libbeat/statestore/backend/memlog/diskstore.go index 1658ef1d55ad..a62c031d2ee9 100644 --- a/libbeat/statestore/backend/memlog/diskstore.go +++ b/libbeat/statestore/backend/memlog/diskstore.go @@ -23,7 +23,6 @@ import ( "errors" "fmt" "io" - "io/ioutil" "os" "path/filepath" "sort" @@ -182,7 +181,7 @@ func (s *diskstore) tryOpenLog() error { f.Close() }) - _, err = f.Seek(0, os.SEEK_END) + _, err = f.Seek(0, io.SeekEnd) if err != nil { return err } @@ -439,7 +438,7 @@ func updateActiveMarker(log *logp.Logger, homePath, checkpointFilePath string) e log.Errorf("Failed to remove old temporary active.dat.tmp file: %v", err) return err } - if err := ioutil.WriteFile(tmpLink, []byte(checkpointFilePath), 0600); err != nil { + if err := os.WriteFile(tmpLink, []byte(checkpointFilePath), 0600); 
err != nil { log.Errorf("Failed to write temporary pointer file: %v", err) return err } @@ -539,7 +538,7 @@ func readDataFile(path string, fn func(string, mapstr.M)) error { var states []map[string]interface{} dec := json.NewDecoder(f) if err := dec.Decode(&states); err != nil { - return fmt.Errorf("%w: %v", ErrCorruptStore, err) + return fmt.Errorf("%w: %w", ErrCorruptStore, err) } for _, state := range states { From 2c16d592f7a44154ff97e43f74b5310552473ae1 Mon Sep 17 00:00:00 2001 From: emmanueltouzery Date: Tue, 21 May 2024 21:32:38 +0200 Subject: [PATCH 3/3] Apply suggestions from code review Co-authored-by: Tiago Queiroz --- CHANGELOG.next.asciidoc | 2 +- .../statestore/backend/memlog/store_test.go | 23 ++++++++++--------- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index f87dc01bb151..30f8599351db 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -42,7 +42,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Removed deprecated Sophos UTM from Beats. Use the https://docs.elastic.co/integrations/sophos[Sophos] Elastic integration instead. {pull}38037[38037] - Introduce input/netmetrics and refactor netflow input metrics {pull}38055[38055] - Update Salesforce module to use new Salesforce input. {pull}37509[37509] -- Fix high IO and disrupted operation after sudden filebeat stop. {pull}35893[35893] +- Fix high IO and handling of a corrupted registry log file. 
{pull}35893[35893] *Heartbeat* diff --git a/libbeat/statestore/backend/memlog/store_test.go b/libbeat/statestore/backend/memlog/store_test.go index 4128228db3c4..d0bc650db198 100644 --- a/libbeat/statestore/backend/memlog/store_test.go +++ b/libbeat/statestore/backend/memlog/store_test.go @@ -19,21 +19,18 @@ package memlog import ( "os" + "path/filepath" "testing" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" ) -func init() { - logp.DevelopmentSetup() -} - func TestRecoverFromCorruption(t *testing.T) { path := t.TempDir() - defer os.RemoveAll(path) + logp.DevelopmentSetup() if err := copyPath(path, "testdata/1/logfile_incomplete/"); err != nil { t.Fatalf("Failed to copy test file to the temporary directory: %v", err) @@ -42,13 +39,17 @@ func TestRecoverFromCorruption(t *testing.T) { store, err := openStore(logp.NewLogger("test"), path, 0660, 4096, false, func(_ uint64) bool { return false }) - assert.NoError(t, err) - - assert.Equal(t, true, store.disk.logInvalid) + require.NoError(t, err, "openStore must succeed") + require.True(t, store.disk.logInvalid, "expecting the log file to be invalid") err = store.logOperation(&opSet{K: "key", V: mapstr.M{ "field": 42, }}) - assert.NoError(t, err) - assert.Equal(t, false, store.disk.logInvalid) + require.NoError(t, err, "logOperation must succeed") + require.False(t, store.disk.logInvalid, "log file must be valid") + require.FileExistsf(t, filepath.Join(path, "7.json"), "expecting the checkpoint file to have been created") + + file, err := os.Stat(filepath.Join(path, "log.json")) + require.NoError(t, err, "Stat on the log file must succeed") + require.Equal(t, int64(0), file.Size(), "expecting the log file to be truncated") }