Skip to content

Commit

Permalink
feat(stream): add support for incremental stream writer (#1722)
Browse files Browse the repository at this point in the history
This PR adds support for writing a stream to the DB incrementally.
It adds a new API: StreamWriter.PrepareIncremental

This PR also has the bug fix from PR #1723.
  • Loading branch information
mangalaman93 committed Mar 2, 2023
1 parent edb2318 commit efc9502
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 3 deletions.
2 changes: 1 addition & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1630,7 +1630,7 @@ func (db *DB) prepareToDrop() (func(), error) {
// write it to db. Then, flush all the pending flushtask. So that, we
// don't miss any entries.
if err := db.blockWrite(); err != nil {
return nil, err
return func() {}, err
}
reqs := make([]*request, 0, 10)
for {
Expand Down
44 changes: 42 additions & 2 deletions stream_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,58 @@ func (db *DB) NewStreamWriter() *StreamWriter {
// Prepare must be invoked before the first entry is written to the StreamWriter. It wipes all
// data currently in the DB and halts compactions as well as writes coming in through any other
// path, so call it with great care: the data loss is permanent. Skipping Prepare leaves the
// Badger instance corrupt. For an incremental stream write, use PrepareIncremental instead.
func (sw *StreamWriter) Prepare() error {
	sw.writeLock.Lock()
	defer sw.writeLock.Unlock()

	// Guards the cleanup callback so that sw.done can be invoked any number of times while the
	// callback returned by dropAll runs at most once.
	var runOnce sync.Once

	release, dropErr := sw.db.dropAll()
	sw.done = func() {
		runOnce.Do(release)
	}
	return dropErr
}

// PrepareIncremental should be called before writing any entry to StreamWriter incrementally.
// In incremental stream write, the tables are written at one level above the current base level.
//
// It blocks incoming writes through prepareToDrop, stops compactions, and installs sw.done as a
// run-once cleanup that restarts compactions and invokes the callback returned by prepareToDrop.
// It then records in sw.prevLevel the first level, in the order returned by db.Levels(), that
// holds at least one table.
//
// It returns the error from prepareToDrop if blocking writes fails, and an error if the first
// populated level is L0. An empty DB is accepted and returns nil.
func (sw *StreamWriter) PrepareIncremental() error {
	sw.writeLock.Lock()
	defer sw.writeLock.Unlock()

	// Ensure that done() is never called more than once.
	var once sync.Once

	// prepareToDrop will stop all the incoming writes and process any pending flush tasks.
	// Before we start writing, we'll stop the compactions because no one else should be writing to
	// the same level as the stream writer is writing to.
	f, err := sw.db.prepareToDrop()
	if f == nil {
		// Never hand a nil func to once.Do: invoking sw.done would panic.
		f = func() {}
	}
	if err != nil {
		sw.done = func() { once.Do(f) }
		return err
	}
	sw.db.stopCompactions()
	done := func() {
		sw.db.startCompactions()
		f()
	}
	sw.done = func() { once.Do(done) }

	// Find the first level that already holds tables. This scan must run on the success path;
	// returning before it would leave sw.prevLevel unset and skip the L0 check below.
	isEmptyDB := true
	for _, level := range sw.db.Levels() {
		if level.NumTables > 0 {
			sw.prevLevel = level.Level
			isEmptyDB = false
			break
		}
	}
	if isEmptyDB {
		// If DB is empty, we should allow doing incremental stream write.
		return nil
	}
	if sw.prevLevel == 0 {
		// sw.done stays installed on this path.
		// NOTE(review): presumably the caller still releases the write block via the
		// StreamWriter's cleanup path after this error — confirm against Flush/Cancel.
		return fmt.Errorf("Unable to do incremental writes because L0 has data")
	}
	return nil
}

// Write writes KVList to DB. Each KV within the list contains the stream id which StreamWriter
Expand Down
76 changes: 76 additions & 0 deletions stream_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,3 +600,79 @@ func TestStreamWriterWithLargeValue(t *testing.T) {
require.NoError(t, sw.Flush(), "sw.Flush() failed")
})
}

// TestStreamWriterIncremental covers StreamWriter.PrepareIncremental in three scenarios: a DB
// first populated through Prepare, an empty DB, and several back-to-back incremental writes.
// In each scenario every key that was written must be readable afterwards.
func TestStreamWriterIncremental(t *testing.T) {
	// addIncremental stream-writes one version-1 KV per key using PrepareIncremental.
	addIncremental := func(t *testing.T, db *DB, keys ...string) {
		buf := z.NewBuffer(10<<20, "test")
		defer func() { require.NoError(t, buf.Release()) }()
		for _, k := range keys {
			kv := &pb.KV{
				Key:     []byte(k),
				Value:   []byte("val"),
				Version: 1,
			}
			KVToBuffer(kv, buf)
		}
		// Now do an incremental stream write.
		sw := db.NewStreamWriter()
		require.NoError(t, sw.PrepareIncremental(), "sw.PrepareIncremental() failed")
		require.NoError(t, sw.Write(buf), "sw.Write() failed")
		require.NoError(t, sw.Flush(), "sw.Flush() failed")
	}

	// requireReadable checks, within a single read-only transaction, that every key can be
	// fetched without error. It stops at the first key that fails.
	requireReadable := func(t *testing.T, db *DB, keys ...string) {
		txn := db.NewTransaction(false)
		defer txn.Discard()
		for _, k := range keys {
			_, err := txn.Get([]byte(k))
			require.NoError(t, err)
		}
	}

	t.Run("incremental on non-empty DB", func(t *testing.T) {
		runBadgerTest(t, nil, func(t *testing.T, db *DB) {
			// Seed the DB with a regular (non-incremental) stream write first.
			buf := z.NewBuffer(10<<20, "test")
			defer func() { require.NoError(t, buf.Release()) }()
			seed := &pb.KV{
				Key:     []byte("key-1"),
				Value:   []byte("val"),
				Version: 1,
			}
			KVToBuffer(seed, buf)
			sw := db.NewStreamWriter()
			require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
			require.NoError(t, sw.Write(buf), "sw.Write() failed")
			require.NoError(t, sw.Flush(), "sw.Flush() failed")

			addIncremental(t, db, "key-2")

			requireReadable(t, db, "key-1", "key-2")
		})
	})

	t.Run("incremental on empty DB", func(t *testing.T) {
		runBadgerTest(t, nil, func(t *testing.T, db *DB) {
			addIncremental(t, db, "key-1")
			requireReadable(t, db, "key-1")
		})
	})

	t.Run("multiple incremental", func(t *testing.T) {
		runBadgerTest(t, nil, func(t *testing.T, db *DB) {
			addIncremental(t, db, "a1", "c1")
			addIncremental(t, db, "a2", "c2")
			addIncremental(t, db, "a3", "c3")
			requireReadable(t, db, "a1", "c1", "a2", "c2", "a3", "c3")
		})
	})
}

0 comments on commit efc9502

Please sign in to comment.