Skip to content

Commit

Permalink
[8.14](backport #39392) [Bug] fix high IO after sudden filebeat stop (#…
Browse files Browse the repository at this point in the history
…35893) (#39842)

* Fix high IO after sudden filebeat stop (#35893) (#39392)

In case of a corrupted log file (which is likely to happen after a
sudden unclean system shutdown), we set a flag which causes us to
checkpoint immediately, but we never reset that flag afterwards. This
causes Filebeat to checkpoint on every log operation (therefore
causing a high IO load on the server and also causing Filebeat to fall
behind).

This change resets the logInvalid flag after a successful checkpoint.

Co-authored-by: Tiago Queiroz <[email protected]>
(cherry picked from commit 217f5a6)

* Update CHANGELOG.next.asciidoc

---------

Co-authored-by: emmanueltouzery <[email protected]>
Co-authored-by: Tiago Queiroz <[email protected]>
  • Loading branch information
3 people authored Jun 13, 2024
1 parent 3af457b commit 20fea92
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
*Filebeat*

- Convert netflow input to API v2 and disable event normalisation {pull}37901[37901]
- Fix high IO and handling of a corrupted registry log file. {pull}35893[35893]

*Heartbeat*

Expand Down
8 changes: 4 additions & 4 deletions libbeat/statestore/backend/memlog/diskstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"sort"
Expand Down Expand Up @@ -182,7 +181,7 @@ func (s *diskstore) tryOpenLog() error {
f.Close()
})

_, err = f.Seek(0, os.SEEK_END)
_, err = f.Seek(0, io.SeekEnd)
if err != nil {
return err
}
Expand Down Expand Up @@ -402,6 +401,7 @@ func (s *diskstore) checkpointClearLog() {
err := s.logFile.Truncate(0)
if err == nil {
_, err = s.logFile.Seek(0, io.SeekStart)
s.logInvalid = false
}

if err != nil {
Expand Down Expand Up @@ -438,7 +438,7 @@ func updateActiveMarker(log *logp.Logger, homePath, checkpointFilePath string) e
log.Errorf("Failed to remove old temporary active.dat.tmp file: %v", err)
return err
}
if err := ioutil.WriteFile(tmpLink, []byte(checkpointFilePath), 0600); err != nil {
if err := os.WriteFile(tmpLink, []byte(checkpointFilePath), 0600); err != nil {
log.Errorf("Failed to write temporary pointer file: %v", err)
return err
}
Expand Down Expand Up @@ -538,7 +538,7 @@ func readDataFile(path string, fn func(string, mapstr.M)) error {
var states []map[string]interface{}
dec := json.NewDecoder(f)
if err := dec.Decode(&states); err != nil {
return fmt.Errorf("%w: %v", ErrCorruptStore, err)
return fmt.Errorf("%w: %w", ErrCorruptStore, err)
}

for _, state := range states {
Expand Down
55 changes: 55 additions & 0 deletions libbeat/statestore/backend/memlog/store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package memlog

import (
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)

// TestRecoverFromCorruption opens a store on top of the
// "logfile_incomplete" fixture and verifies that:
//   - the store opens successfully but reports its log file as invalid;
//   - a single logged operation clears the invalid flag, produces the
//     checkpoint file and leaves the log file empty.
func TestRecoverFromCorruption(t *testing.T) {
	dir := t.TempDir()
	logp.DevelopmentSetup()

	// Work on a private copy of the fixture so the test data stays untouched.
	copyErr := copyPath(dir, "testdata/1/logfile_incomplete/")
	if copyErr != nil {
		t.Fatalf("Failed to copy test file to the temporary directory: %v", copyErr)
	}

	// The callback handed to openStore unconditionally returns false.
	// NOTE(review): presumably this is the checkpoint predicate, so any
	// checkpoint observed below is caused by the invalid log alone — confirm
	// against openStore.
	alwaysFalse := func(_ uint64) bool { return false }

	store, err := openStore(logp.NewLogger("test"), dir, 0660, 4096, false, alwaysFalse)
	require.NoError(t, err, "openStore must succeed")
	require.True(t, store.disk.logInvalid, "expecting the log file to be invalid")

	// One write is expected to be enough to recover from the invalid log.
	op := &opSet{K: "key", V: mapstr.M{"field": 42}}
	err = store.logOperation(op)
	require.NoError(t, err, "logOperation must succeed")
	require.False(t, store.disk.logInvalid, "log file must be valid")

	checkpointPath := filepath.Join(dir, "7.json")
	require.FileExistsf(t, checkpointPath, "expecting the checkpoint file to have been created")

	logPath := filepath.Join(dir, "log.json")
	info, statErr := os.Stat(logPath)
	require.NoError(t, statErr, "Stat on the log file must succeed")
	require.Equal(t, int64(0), info.Size(), "expecting the log file to be truncated")
}

0 comments on commit 20fea92

Please sign in to comment.