Skip to content

Commit

Permalink
add manifest rewriting unit test that will fail using a injected late…
Browse files Browse the repository at this point in the history
…ncy on os.file.Sync()
  • Loading branch information
iluminae committed Nov 2, 2021
1 parent cba20b9 commit e0eabc6
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 1 deletion.
5 changes: 4 additions & 1 deletion manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,12 @@ func (mf *manifestFile) addChanges(changesParam []*pb.ManifestChange) error {
}

mf.appendLock.Unlock()
return mf.fp.Sync()
return syncFunc(mf.fp)
}

// this function is saved here to allow injection of fake filesystem latency at test time.
var syncFunc = func(f *os.File) error { return f.Sync() }

// Has to be 4 bytes. The value can never change, ever, anyway.
var magicText = [4]byte{'B', 'd', 'g', 'r'}

Expand Down
41 changes: 41 additions & 0 deletions manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (
"os"
"path/filepath"
"sort"
"sync"
"testing"
"time"

otrace "go.opencensus.io/trace"

Expand Down Expand Up @@ -245,3 +247,42 @@ func TestManifestRewrite(t *testing.T) {
uint64(deletionsThreshold * 3): {Level: 0},
}, m.Tables)
}

func TestConcurrentManifestCompaction(t *testing.T) {
dir := t.TempDir()

// set this low so rewrites will happen more often
deletionsThreshold := 1

// overwrite the sync function to make this race condition easily reproducible
syncFunc = func(f *os.File) error {
// effectively making the Sync() take around 1s makes this reproduce every time
time.Sleep(1 * time.Second)
return f.Sync()
}

mf, _, err := helpOpenOrCreateManifestFile(dir, false, 0, deletionsThreshold)
require.NoError(t, err)

cs := &pb.ManifestChangeSet{}
for i := uint64(0); i < 1000; i++ {
cs.Changes = append(cs.Changes,
newCreateChange(i, 0, 0, 0),
newDeleteChange(i),
)
}

// simulate 2 concurrent compaction threads
n := 2
wg := sync.WaitGroup{}
wg.Add(n)
for i := 0; i < n; i++ {
go func() {
defer wg.Done()
require.NoError(t, mf.addChanges(cs.Changes))
}()
}
wg.Wait()

require.NoError(t, mf.close())
}

0 comments on commit e0eabc6

Please sign in to comment.