Skip to content

Commit

Permalink
Use prometheus/util/flock to lock data dirs
Browse files Browse the repository at this point in the history
- @danp identified race condition in previous version -- thank you!
- Use flock package from Prometheus instead of rolling our own
- That just does lock/unlock; amend to also delete on Release
- Fix tests and standardize some variable names
- Update manifest/lock with new dependency (sigh)
- It can't ever just be easy, can it
- I wish I was a carpenter sometimes
  • Loading branch information
peterbourgon committed Jan 18, 2017
1 parent b6cbaf4 commit eb5e933
Show file tree
Hide file tree
Showing 11 changed files with 142 additions and 110 deletions.
10 changes: 9 additions & 1 deletion lock.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"memo": "484cd35468b179fbd7450e05cb7cb56b68929f51fe5df5c4e2023c1f895c5838",
"memo": "3369fc05fb53f33a7030ef62ba488ade29bde856456f7c2570d233b76710062b",
"projects": [
{
"name": "github.com/armon/go-metrics",
Expand Down Expand Up @@ -178,6 +178,14 @@
"packages": [
"."
]
},
{
"name": "github.com/prometheus/prometheus",
"version": "v1.4.1",
"revision": "2a89e8733f240d3cd57a6520b52c36ac4744ce12",
"packages": [
"util/flock"
]
}
]
}
3 changes: 3 additions & 0 deletions manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
},
"github.com/prometheus/client_golang": {
"version": "v0.8.0"
},
"github.com/prometheus/prometheus": {
"version": "v1.4.1"
}
}
}
6 changes: 6 additions & 0 deletions pkg/fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Filesystem interface {
MkdirAll(path string) error
Chtimes(path string, atime, mtime time.Time) error
Walk(root string, walkFn filepath.WalkFunc) error
Lock(path string) (r Releaser, existed bool, err error)
}

// File is the subset of methods we use on an *os.File.
Expand All @@ -28,3 +29,8 @@ type File interface {
Size() int64
Sync() error
}

// Releaser is returned by Lock calls.
type Releaser interface {
Release() error
}
66 changes: 0 additions & 66 deletions pkg/fs/lock.go

This file was deleted.

45 changes: 30 additions & 15 deletions pkg/fs/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,35 @@ package fs
import "testing"

func TestLock(t *testing.T) {
var (
fs = NewVirtualFilesystem()
path = "test-lock"
)
if err := ClaimLock(fs, path); err != nil {
t.Fatalf("initial claim: %v", err)
}
if err := ClaimLock(fs, path); err == nil {
t.Fatal("double-claim: expected error, got none")
}
if err := ReleaseLock(fs, path); err != nil {
t.Fatalf("initial release: %v", err)
}
if err := ReleaseLock(fs, path); err == nil {
t.Fatalf("double-release: expected error, got none")
for _, testcase := range []struct {
name string
filesys Filesystem
}{
{"virtual", NewVirtualFilesystem()},
{"real", NewRealFilesystem(false)},
} {
lock := "TESTLOCK"
t.Run(testcase.name, func(t *testing.T) {
r, existed, err := testcase.filesys.Lock(lock)
if err != nil {
t.Fatalf("initial claim: %v", err)
}
if existed {
t.Fatal("initial claim: lock file already exists")
}
if _, existed, _ = testcase.filesys.Lock(lock); !existed {
t.Fatal("second claim: want existed true, have false")
}
if err := r.Release(); err != nil {
t.Fatalf("initial release: %v", err)
}
if err := r.Release(); err == nil {
t.Fatal("second release: want error, have none")
}
})
if testcase.filesys.Exists(lock) {
t.Errorf("%s: %s still exists", testcase.name, lock)
}
testcase.filesys.Remove(lock)
}
}
5 changes: 5 additions & 0 deletions pkg/fs/nop.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func (nopFilesystem) Exists(path string) bool { return
func (nopFilesystem) MkdirAll(path string) error { return nil }
func (nopFilesystem) Chtimes(path string, atime, mtime time.Time) error { return nil }
func (nopFilesystem) Walk(root string, walkFn filepath.WalkFunc) error { return nil }
func (nopFilesystem) Lock(path string) (Releaser, bool, error) { return nopReleaser{}, false, nil }

type nopFile struct{}

Expand All @@ -29,3 +30,7 @@ func (nopFile) Close() error { return nil }
func (nopFile) Name() string { return "" }
func (nopFile) Size() int64 { return 0 }
func (nopFile) Sync() error { return nil }

type nopReleaser struct{}

func (nopReleaser) Release() error { return nil }
21 changes: 21 additions & 0 deletions pkg/fs/real.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"strings"
"time"

"github.com/prometheus/prometheus/util/flock"

"github.com/oklog/oklog/pkg/ioext"
"github.com/oklog/oklog/pkg/mmap"
)
Expand Down Expand Up @@ -78,6 +80,25 @@ func (realFilesystem) Walk(root string, walkFn filepath.WalkFunc) error {
return filepath.Walk(root, walkFn)
}

func (realFilesystem) Lock(path string) (r Releaser, existed bool, err error) {
r, existed, err = flock.New(path)
r = deletingReleaser{path, r}
return r, existed, err
}

type deletingReleaser struct {
path string
r Releaser
}

func (dr deletingReleaser) Release() error {
// Remove before Release should be safe, and prevents a race.
if err := os.Remove(dr.path); err != nil {
return err
}
return dr.r.Release()
}

type realFile struct {
*os.File
io.Reader
Expand Down
17 changes: 17 additions & 0 deletions pkg/fs/virtual.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,19 @@ func (fs *virtualFilesystem) Walk(root string, walkFn filepath.WalkFunc) error {
return nil
}

func (fs *virtualFilesystem) Lock(path string) (r Releaser, existed bool, err error) {
fs.mtx.Lock()
defer fs.mtx.Unlock()
if _, ok := fs.files[path]; ok {
return virtualReleaser(func() error { return fs.Remove(path) }), true, nil
}

// Copy/paste.
fs.files[path] = &virtualFile{path, bytes.Buffer{}, time.Now(), time.Now()}

return virtualReleaser(func() error { return fs.Remove(path) }), false, nil
}

type virtualFile struct {
name string
buf bytes.Buffer
Expand All @@ -128,3 +141,7 @@ func (fi virtualFileInfo) Mode() os.FileMode { return os.FileMode(0666) }
func (fi virtualFileInfo) ModTime() time.Time { return fi.mtime }
func (fi virtualFileInfo) IsDir() bool { return false }
func (fi virtualFileInfo) Sys() interface{} { return nil }

type virtualReleaser func() error

func (r virtualReleaser) Release() error { return r() }
5 changes: 5 additions & 0 deletions pkg/ingest/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func (fs *mockFilesystem) Exists(path string) bool { r
func (fs *mockFilesystem) MkdirAll(path string) error { return nil }
func (fs *mockFilesystem) Chtimes(path string, atime, mtime time.Time) error { return nil }
func (fs *mockFilesystem) Walk(root string, walkFn filepath.WalkFunc) error { return nil }
func (fs *mockFilesystem) Lock(string) (fs.Releaser, bool, error) { return mockReleaser{}, false, nil }

type mockFile struct{ closures *uint64 }

Expand All @@ -130,3 +131,7 @@ func (f *mockFile) Close() error { atomic.AddUint64(f.closures, 1
func (f *mockFile) Name() string { return "" }
func (f *mockFile) Size() int64 { return 0 }
func (f *mockFile) Sync() error { return nil }

type mockReleaser struct{}

func (mockReleaser) Release() error { return nil }
39 changes: 24 additions & 15 deletions pkg/ingest/file_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ const (
extActive = ".active"
extFlushed = ".flushed"
extPending = ".pending"

lockFile = "LOCK"
)

// NewFileLog returns a Log implemented via the filesystem.
Expand All @@ -24,30 +26,37 @@ func NewFileLog(filesys fs.Filesystem, root string) (Log, error) {
if err := filesys.MkdirAll(root); err != nil {
return nil, err
}
if err := fs.ClaimLock(filesys, root); err != nil {
return nil, err
lock := filepath.Join(root, lockFile)
r, existed, err := filesys.Lock(lock)
if err != nil {
return nil, errors.Wrapf(err, "locking %s", lock)
}
if existed {
return nil, errors.Errorf("%s already exists; another process is running, or the file is stale", lock)
}
return &fileLog{
fs: filesys,
root: root,
root: root,
filesys: filesys,
releaser: r,
}, nil
}

type fileLog struct {
fs fs.Filesystem
root string
root string
filesys fs.Filesystem
releaser fs.Releaser
}

// Create returns a new writable segment.
func (log *fileLog) Create() (WriteSegment, error) {
filename := filepath.Join(log.root, fmt.Sprintf("%s%s", uuid.New(), extActive))

f, err := log.fs.Create(filename)
f, err := log.filesys.Create(filename)
if err != nil {
return nil, err
}

return fileWriteSegment{log.fs, f}, nil
return fileWriteSegment{log.filesys, f}, nil
}

// Oldest returns the oldest flushed segment.
Expand All @@ -56,7 +65,7 @@ func (log *fileLog) Oldest() (ReadSegment, error) {
oldest = time.Now()
chosen string
)
log.fs.Walk(log.root, func(path string, info os.FileInfo, err error) error {
log.filesys.Walk(log.root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
Expand All @@ -78,24 +87,24 @@ func (log *fileLog) Oldest() (ReadSegment, error) {
// This can be racy. But if the rename fails, no problem.
// Someone else got it; our client can just try again.
newname := modifyExtension(chosen, extPending)
if err := log.fs.Rename(chosen, newname); err != nil {
if err := log.filesys.Rename(chosen, newname); err != nil {
return nil, errors.New("race when fetching oldest; please try again")
}

f, err := log.fs.Open(newname)
f, err := log.filesys.Open(newname)
if err != nil {
if renameErr := log.fs.Rename(newname, chosen); renameErr != nil {
if renameErr := log.filesys.Rename(newname, chosen); renameErr != nil {
panic(renameErr)
}
return nil, err
}

return fileReadSegment{log.fs, f}, nil
return fileReadSegment{log.filesys, f}, nil
}

func (log *fileLog) Stats() (LogStats, error) {
var stats LogStats
log.fs.Walk(log.root, func(path string, info os.FileInfo, err error) error {
log.filesys.Walk(log.root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
Expand All @@ -119,7 +128,7 @@ func (log *fileLog) Stats() (LogStats, error) {
}

func (log *fileLog) Close() error {
return fs.ReleaseLock(log.fs, log.root)
return log.releaser.Release()
}

type fileWriteSegment struct {
Expand Down
Loading

0 comments on commit eb5e933

Please sign in to comment.