Skip to content

Commit

Permalink
fix(replay) - Update head for LSM entires also (#1456) (#1516)
Browse files Browse the repository at this point in the history
#1372 tried to fix the `replay from
start` issue but it partially fixed the issue. The head was not being updated
in case all the entries are inserted only in the LSM tree.
This commit fixes it.

(cherry picked from commit 4c8fe7f)
  • Loading branch information
Ibrahim Jarif authored Sep 10, 2020
1 parent d583154 commit 05e7af9
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 13 deletions.
22 changes: 22 additions & 0 deletions backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func TestBackupRestore1(t *testing.T) {
return nil
})
require.NoError(t, err)
require.Equal(t, db.orc.nextTs(), uint64(3))
}

func TestBackupRestore2(t *testing.T) {
Expand Down Expand Up @@ -163,6 +164,9 @@ func TestBackupRestore2(t *testing.T) {
err = db2.Load(&backup, 16)
require.NoError(t, err)

// Check nextTs is correctly set.
require.Equal(t, db1.orc.nextTs(), db2.orc.nextTs())

for i := byte(1); i < N; i++ {
err = db2.View(func(tx *Txn) error {
k := append(key1, i)
Expand Down Expand Up @@ -210,6 +214,9 @@ func TestBackupRestore2(t *testing.T) {
err = db3.Load(&backup, 16)
require.NoError(t, err)

// Check nextTs is correctly set.
require.Equal(t, db2.orc.nextTs(), db3.orc.nextTs())

for i := byte(1); i < N; i++ {
err = db3.View(func(tx *Txn) error {
k := append(key1, i)
Expand Down Expand Up @@ -319,6 +326,7 @@ func TestBackupRestore3(t *testing.T) {
N := 1000
entries := createEntries(N)

var db1NextTs uint64
// backup
{
db1, err := Open(DefaultOptions(filepath.Join(tmpdir, "backup1")))
Expand All @@ -329,6 +337,8 @@ func TestBackupRestore3(t *testing.T) {

_, err = db1.Backup(&bb, 0)
require.NoError(t, err)

db1NextTs = db1.orc.nextTs()
require.NoError(t, db1.Close())
}
require.True(t, len(entries) == N)
Expand All @@ -339,7 +349,9 @@ func TestBackupRestore3(t *testing.T) {
require.NoError(t, err)

defer db2.Close()
require.NotEqual(t, db1NextTs, db2.orc.nextTs())
require.NoError(t, db2.Load(&bb, 16))
require.Equal(t, db1NextTs, db2.orc.nextTs())

// verify
err = db2.View(func(txn *Txn) error {
Expand Down Expand Up @@ -377,6 +389,7 @@ func TestBackupLoadIncremental(t *testing.T) {
updates := make(map[int]byte)
var bb bytes.Buffer

var db1NextTs uint64
// backup
{
db1, err := Open(DefaultOptions(filepath.Join(tmpdir, "backup2")))
Expand Down Expand Up @@ -433,6 +446,9 @@ func TestBackupLoadIncremental(t *testing.T) {
require.NoError(t, err)
_, err = db1.Backup(&bb, since)
require.NoError(t, err)

db1NextTs = db1.orc.nextTs()

require.NoError(t, db1.Close())
}
require.True(t, len(entries) == N)
Expand All @@ -444,7 +460,9 @@ func TestBackupLoadIncremental(t *testing.T) {

defer db2.Close()

require.NotEqual(t, db1NextTs, db2.orc.nextTs())
require.NoError(t, db2.Load(&bb, 16))
require.Equal(t, db1NextTs, db2.orc.nextTs())

// verify
actual := make(map[int]byte)
Expand Down Expand Up @@ -511,6 +529,8 @@ func TestBackupBitClear(t *testing.T) {
_, err = db.Backup(bak, 0)
require.NoError(t, err)
require.NoError(t, bak.Close())

oldValue := db.orc.nextTs()
require.NoError(t, db.Close())

opt = getTestOptions(dir)
Expand All @@ -524,6 +544,8 @@ func TestBackupBitClear(t *testing.T) {
defer bak.Close()

require.NoError(t, db.Load(bak, 16))
// Ensure nextTs is still the same.
require.Equal(t, oldValue, db.orc.nextTs())

require.NoError(t, db.View(func(txn *Txn) error {
e, err := txn.Get(key)
Expand Down
34 changes: 23 additions & 11 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,11 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error {
nv = make([]byte, vptrSize)
vp.Encode(nv)
meta = meta | bitValuePointer
// Update vhead. If the crash happens while replay was in progess
// and the head is not updated, we will end up replaying all the
// files again.
db.updateHead([]valuePointer{vp})
}
// Update vhead. If the crash happens while replay was in progess
// and the head is not updated, we will end up replaying all the
// files starting from file zero, again.
db.updateHead([]valuePointer{vp})

v := y.ValueStruct{
Value: nv,
Expand Down Expand Up @@ -871,17 +871,14 @@ type flushTask struct {
dropPrefix []byte
}

// handleFlushTask must be run serially.
func (db *DB) handleFlushTask(ft flushTask) error {
// There can be a scnerio, when empty memtable is flushed. For example, memtable is empty and
// after writing request to value log, rotation count exceeds db.LogRotatesToFlush.
if ft.mt.Empty() {
return nil
func (db *DB) pushHead(ft flushTask) error {
// Ensure we never push a zero valued head pointer.
if ft.vptr.IsZero() {
return errors.New("Head should not be zero")
}

// Store badger head even if vptr is zero, need it for readTs
db.opt.Debugf("Storing value log head: %+v\n", ft.vptr)
db.elog.Printf("Storing offset: %+v\n", ft.vptr)
offset := make([]byte, vptrSize)
ft.vptr.Encode(offset)

Expand All @@ -890,6 +887,21 @@ func (db *DB) handleFlushTask(ft flushTask) error {
headTs := y.KeyWithTs(head, db.orc.nextTs())
ft.mt.Put(headTs, y.ValueStruct{Value: offset})

return nil
}

// handleFlushTask must be run serially.
func (db *DB) handleFlushTask(ft flushTask) error {
// There can be a scenario, when empty memtable is flushed. For example, memtable is empty and
// after writing request to value log, rotation count exceeds db.LogRotatesToFlush.
if ft.mt.Empty() {
return nil
}

if err := db.pushHead(ft); err != nil {
return err
}

fileID := db.lc.reserveFileID()
fd, err := y.CreateSyncedFile(table.NewFilename(fileID, db.opt.Dir), true)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1861,8 +1861,9 @@ func TestNoCrash(t *testing.T) {
}

db.Lock()
// make head to point to first file
db.vhead = valuePointer{0, 0, 0}
// make head to point to second file. We cannot make it point to the first
// vlog file because we cannot push a zero head pointer.
db.vhead = valuePointer{1, 0, 0}
db.Unlock()
db.Close()

Expand Down

0 comments on commit 05e7af9

Please sign in to comment.