Skip to content

Commit

Permalink
Refactor beats lockfile to use timeout, retry (#34194)
Browse files Browse the repository at this point in the history
* move lockfile logic to a retries

* clean up

* add changelog, update docs

* change unlock operation, remove file first

* fix tests

* fix lock on windows

* remove debug line

* add docs

* split out files

* remove old OS checks

* fix error

* format

(cherry picked from commit 21b6128)

# Conflicts:
#	libbeat/cmd/instance/locks/lock.go
  • Loading branch information
fearful-symmetry authored and mergify[bot] committed Jan 31, 2023
1 parent 66abded commit 097f897
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 131 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]

*Affecting all Beats*

- Fix Windows service install/uninstall when Win32_Service returns error, add logic to wait until the Windows Service is stopped before proceeding. {pull}33322[33322]
- Support for multiline zookeeper logs {issue}2496[2496]
- Allow `clock_nanosleep` in the default seccomp profiles for amd64 and 386. Newer versions of glibc (e.g. 2.31) require it. {issue}33792[33792]
- Disable lockfile when running under elastic-agent. {pull}33988[33988]
- Fix lockfile logic, retry locking {pull}34194[34194]
- Add checks to ensure reloading of units if the configuration actually changed. {pull}34346[34346]
- Fix namespacing on self-monitoring {pull}32336[32336]
- Fix race condition when stopping runners {pull}32433[32433]
- Fix concurrent map writes when system/process code called from reporter code {pull}32491[32491]
Expand Down
76 changes: 31 additions & 45 deletions libbeat/cmd/instance/locks/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,85 +18,68 @@
package locks

import (
"encoding/json"
"errors"
"fmt"
"os"
"runtime"
"time"

"github.com/gofrs/flock"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/paths"
metricproc "github.com/elastic/elastic-agent-system-metrics/metric/system/process"
)

// Locker is a retrying file locker
type Locker struct {
fileLock *flock.Flock
logger *logp.Logger
beatName string
filePath string
beatStart time.Time
}

type pidfile struct {
PID int `json:"pid"`
WriteTime time.Time `json:"write_time"`
fileLock *flock.Flock
retryCount int
retrySleep time.Duration
logger *logp.Logger
}

var (
// ErrAlreadyLocked is returned when a lock on the data path is attempted but
// unsuccessful because another Beat instance already has the lock on the same
// data path.
ErrAlreadyLocked = fmt.Errorf("data path already locked by another beat. Please make sure that multiple beats are not sharing the same data path (path.data)")

// ErrLockfileEmpty is returned by readExistingPidfile() when an existing pidfile is found, but the file is empty.
ErrLockfileEmpty = fmt.Errorf("lockfile is empty")
)

// a little wrapper for the gitpid function to make testing easier.
var pidFetch = os.Getpid

// New returns a new pid-aware file locker
// all logic, including checking for existing locks, is performed lazily
// New returns a new file locker
func New(beatInfo beat.Info) *Locker {
return NewWithRetry(beatInfo, 4, time.Millisecond*400)
}

// NewWithRetry returns a new file locker with the given settings
func NewWithRetry(beatInfo beat.Info, retryCount int, retrySleep time.Duration) *Locker {
lockfilePath := paths.Resolve(paths.Data, beatInfo.Beat+".lock")
return &Locker{
fileLock: flock.New(lockfilePath),
logger: logp.L(),
beatName: beatInfo.Beat,
filePath: lockfilePath,
beatStart: beatInfo.StartTime,
fileLock: flock.New(lockfilePath),
retryCount: retryCount,
retrySleep: retrySleep,
logger: logp.L(),
}
}

// Lock attempts to acquire a lock on the data path for the currently-running
// Beat instance. If another Beats instance already has a lock on the same data path
// an ErrAlreadyLocked error is returned.
func (lock *Locker) Lock() error {
new := pidfile{PID: pidFetch(), WriteTime: time.Now()}
encoded, err := json.Marshal(&new)
if err != nil {
return fmt.Errorf("error encoding json for pidfile: %w", err)
}

// The combination of O_CREATE and O_EXCL will ensure we return an error if we don't
// manage to create the file
fh, openErr := os.OpenFile(lock.filePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o600)
// Don't trust different OSes to report the errors we expect, just try to recover regardless
if openErr != nil {
err = lock.handleFailedCreate()
for i := 0; i < lock.retryCount; i++ {
// note that TryLock doesn't set an os.O_EXCL flag,
// which means that we could be locking a file we didn't create.
// This makes it easy to recover from a failed shutdown or panic,
// as the OS will clean up the lock and we'll re-lock the same file.
// However, can create odd races if you're not careful, since you don't know if you're locking "your" file.
gotLock, err := lock.fileLock.TryLock()
if err != nil {
return fmt.Errorf("cannot obtain lockfile: %w", err)
return fmt.Errorf("unable to try a lock of the data path: %w", err)
}
// If something fails here, it's probably unrecoverable
fh, err = os.OpenFile(lock.filePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o600)
if err != nil {
return fmt.Errorf("cannot re-obtain lockfile %s: %w", lock.filePath, err)
if gotLock {
return nil
}
lock.logger.Debugf("Could not obtain lock for file %s, retrying %d times", lock.fileLock.Path(), (lock.retryCount - i))
time.Sleep(lock.retrySleep)
}
<<<<<<< HEAD

// a Process can't write to its own locked file on all platforms, write first
_, err = fh.Write(encoded)
Expand Down Expand Up @@ -259,4 +242,7 @@ func (lock *Locker) readExistingPidfile() (pidfile, error) {
return pidfile{}, fmt.Errorf("error reading JSON from pid file %s: %w", lock.filePath, err)
}
return foundPidFile, nil
=======
return fmt.Errorf("%s: %w", lock.fileLock.Path(), ErrAlreadyLocked)
>>>>>>> 21b6128c95 (Refactor beats lockfile to use timeout, retry (#34194))
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,28 @@
// specific language governing permissions and limitations
// under the License.

//go:build (!darwin || !cgo) && !freebsd && !linux && !windows && !aix
//go:build !windows
// +build !windows

package locks

import (
"fmt"
"runtime"

"github.com/elastic/elastic-agent-system-metrics/metric/system/process"
"os"
)

func findMatchingPID(pid int) (process.PidState, error) {
return process.Dead, fmt.Errorf("findMatchingPID not supported on platform: %s", runtime.GOOS)
// Unlock attempts to release the lock on a data path previously acquired via Lock(). This will first remove the file, then unlock the file handle.
func (lock *Locker) Unlock() error {
// Unlock will remove the file while we still have the lock, so we reduce the odds of another beat swooping in to start between the Unlock() and Remove() operation.
err := os.Remove(lock.fileLock.Path())
if err != nil {
lock.logger.Warnf("could not remove lockfile at %s: %s", lock.fileLock.Path(), err)
}

err = lock.fileLock.Unlock()
if err != nil {
return fmt.Errorf("unable to unlock data path: %w", err)
}

return nil
}
122 changes: 49 additions & 73 deletions libbeat/cmd/instance/locks/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ import (
"fmt"
"os"
"testing"
"time"

"github.com/gofrs/uuid"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/beat"
Expand Down Expand Up @@ -60,96 +58,74 @@ func TestMain(m *testing.M) {
os.Exit(exit)
}

func TestLockWithDeadPid(t *testing.T) {
// create old lockfile
pidFetch = fakeDeadPid
testBeat := beat.Info{Beat: mustNewUUID(t), StartTime: time.Now()}
locker := New(testBeat)
err := locker.Lock()
require.NoError(t, err)
func TestLocker(t *testing.T) {
// Setup two beats with same name and data path
const beatName = "testbeat-testlocker"

// create new locker
pidFetch = os.Getpid
newLocker := New(testBeat)
err = newLocker.Lock()
require.NoError(t, err)
}
b1 := beat.Info{}
b1.Beat = beatName

func TestLockWithTwoBeats(t *testing.T) {
testBeat := beat.Info{Beat: mustNewUUID(t), StartTime: time.Now()}
// emulate two beats trying to run from the same data path
locker := New(testBeat)
// use the parent process as another random beat
pidFetch = os.Getppid
err := locker.Lock()
require.NoError(t, err)
b2 := beat.Info{}
b2.Beat = beatName

// create new locker for this beat
pidFetch = os.Getpid
newLocker := New(testBeat)
err = newLocker.Lock()
require.Error(t, err)
t.Logf("Got desired error: %s", err)
}

func TestDoubleLock(t *testing.T) {
testBeat := beat.Info{Beat: mustNewUUID(t), StartTime: time.Now()}
locker := New(testBeat)
err := locker.Lock()
// Try to get a lock for the first beat. Expect it to succeed.
bl1 := New(b1)
err := bl1.Lock()
require.NoError(t, err)

newLocker := New(testBeat)
err = newLocker.Lock()
// Try to get a lock for the second beat. Expect it to fail because the
// first beat already has the lock.
bl2 := New(b2)
err = bl2.Lock()
require.Error(t, err)
t.Logf("Got desired error: %s", err)

}

func TestUnlock(t *testing.T) {
testBeat := beat.Info{Beat: mustNewUUID(t), StartTime: time.Now()}
locker := New(testBeat)
err := locker.Lock()
require.NoError(t, err)
const beatName = "testbeat-testunlock"

err = locker.Unlock()
require.NoError(t, err)
}
b1 := beat.Info{}
b1.Beat = beatName

func TestRestartWithSamePID(t *testing.T) {
// create old lockfile
testBeatName := mustNewUUID(t)
testBeat := beat.Info{Beat: testBeatName, StartTime: time.Now().Add(-time.Second * 20)}
locker := New(testBeat)
err := locker.Lock()
require.NoError(t, err)
// create new lockfile with the same PID but a newer time
// create old lockfile
testNewBeat := beat.Info{Name: testBeatName, StartTime: time.Now()}
lockerNew := New(testNewBeat)
err = lockerNew.Lock()
b2 := beat.Info{}
b2.Beat = beatName
bl2 := New(b2)

// Try to get a lock for the first beat. Expect it to succeed.
bl1 := New(b1)
err := bl1.Lock()
require.NoError(t, err)
}

func TestEmptyLockfile(t *testing.T) {
testBeat := beat.Info{Beat: mustNewUUID(t), StartTime: time.Now().Add(-time.Second * 1)}
deadLock := New(testBeat)
// Create an empty lockfile
// Might happen in cases where a beat shut down at *just* the right time.
fh, err := os.Create(deadLock.filePath)
// now unlock
err = bl1.Unlock()
require.NoError(t, err)
fh.Close()

newBeat := New(testBeat)
err = newBeat.Lock()
// try with other lockfile
err = bl2.Lock()
require.NoError(t, err)

}

func mustNewUUID(t *testing.T) string {
uuid, err := uuid.NewV4()
func TestUnlockWithRemainingFile(t *testing.T) {
const beatName = "testbeat-testunlockwithfile"

b1 := beat.Info{}
b1.Beat = beatName

b2 := beat.Info{}
b2.Beat = beatName
bl2 := New(b2)

// Try to get a lock for the first beat. Expect it to succeed.
bl1 := New(b1)
err := bl1.Lock()
require.NoError(t, err)
return uuid.String()
}

func fakeDeadPid() int {
return 99999
// unlock the underlying FD, so we don't remove the file
err = bl1.fileLock.Unlock()
require.NoError(t, err)

// now lock new handle with the same file
err = bl2.Lock()
require.NoError(t, err)
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,28 @@
// specific language governing permissions and limitations
// under the License.

//go:build (darwin && cgo) || freebsd || linux || windows || aix

package locks

import (
"github.com/elastic/elastic-agent-system-metrics/metric/system/process"
"github.com/elastic/elastic-agent-system-metrics/metric/system/resolve"
"fmt"
"os"
)

// findMatchingPID is a small wrapper to deal with cgo compat issues in libbeat's CI
func findMatchingPID(pid int) (process.PidState, error) {
return process.GetPIDState(resolve.NewTestResolver("/"), pid)
// Unlock attempts to release the lock on a data path previously acquired via Lock(). This will unlock the file before it removes it.
func (lock *Locker) Unlock() error {
// Removing a file that's locked seems to be an unsupported or undefined, and will often fail on Windows.
// Reverse the order of operations, and unlock first, then remove.
// This will slightly increase the odds of a race on Windows if we're in a tight restart loop,
// as another beat can swoop in and lock the file before this beat removes it.
err := lock.fileLock.Unlock()
if err != nil {
return fmt.Errorf("unable to unlock data path: %w", err)
}

err = os.Remove(lock.fileLock.Path())
if err != nil {
lock.logger.Warnf("could not remove lockfile at %s: %s", lock.fileLock.Path(), err)
}

return nil
}

0 comments on commit 097f897

Please sign in to comment.