Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stream): add support for incremental stream writer #1722

Merged
merged 7 commits into from
Jul 6, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
allow doing incremental write when DB is empty
NamanJain8 committed Jul 6, 2021

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit f21e4ddc8dea195d069bd36bff45da8af88797c9
8 changes: 7 additions & 1 deletion stream_writer.go
Original file line number Diff line number Diff line change
@@ -95,7 +95,7 @@ func (sw *StreamWriter) PrepareIncremental() error {
// Ensure that done() is never called more than once.
var once sync.Once

// prepareToDrop will stop all the incomming write and flushes any pending flush tasks.
// prepareToDrop will stop all the incoming writes and process any pending flush tasks.
// Before we start writing, we'll stop the compactions because no one else should be writing to
// the same level as the stream writer is writing to.
f, err := sw.db.prepareToDrop()
@@ -110,11 +110,17 @@ func (sw *StreamWriter) PrepareIncremental() error {
}
sw.done = func() { once.Do(done) }

isEmptyDB := true
for _, level := range sw.db.Levels() {
if level.NumTables > 0 {
sw.prevLevel = level.Level
isEmptyDB = false
}
}
if isEmptyDB {
// If DB is empty, we should allow doing incremental stream write.
return nil
}
if sw.prevLevel == 0 {
return fmt.Errorf("Unable to do incremental writes because L0 has data")
}
58 changes: 38 additions & 20 deletions stream_writer_test.go
Original file line number Diff line number Diff line change
@@ -604,34 +604,52 @@ func TestStreamWriterWithLargeValue(t *testing.T) {
}

func TestStreamWriterIncremental(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
addIncremtal := func(t *testing.T, db *DB) {
buf := z.NewBuffer(10<<20, "test")
defer buf.Release()
KVToBuffer(&pb.KV{
Key: []byte("key-1"),
Key: []byte("key-2"),
Value: []byte("val"),
Version: 1,
}, buf)
// Now do an incremental stream write.
sw := db.NewStreamWriter()
require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
require.NoError(t, sw.PrepareIncremental(), "sw.PrepareIncremental() failed")
require.NoError(t, sw.Write(buf), "sw.Write() failed")
require.NoError(t, sw.Flush(), "sw.Flush() failed")
buf.Reset()
KVToBuffer(&pb.KV{
Key: []byte("key-2"),
Value: []byte("val"),
Version: 1,
}, buf)
// Now do an incremental stream write.
sw2 := db.NewStreamWriter()
require.NoError(t, sw2.PrepareIncremental(), "sw.PrepareIncremental() failed")
require.NoError(t, sw2.Write(buf), "sw.Write() failed")
require.NoError(t, sw2.Flush(), "sw.Flush() failed")
txn := db.NewTransaction(false)
defer txn.Discard()
_, err := txn.Get([]byte("key-1"))
require.NoError(t, err)
_, err = txn.Get([]byte("key-2"))
require.NoError(t, err)
}

t.Run("incremental on non-empty DB", func(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
buf := z.NewBuffer(10<<20, "test")
defer buf.Release()
KVToBuffer(&pb.KV{
Key: []byte("key-1"),
Value: []byte("val"),
Version: 1,
}, buf)
sw := db.NewStreamWriter()
require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
require.NoError(t, sw.Write(buf), "sw.Write() failed")
require.NoError(t, sw.Flush(), "sw.Flush() failed")

addIncremtal(t, db)

txn := db.NewTransaction(false)
defer txn.Discard()
_, err := txn.Get([]byte("key-1"))
require.NoError(t, err)
_, err = txn.Get([]byte("key-2"))
require.NoError(t, err)
})
})
t.Run("incremental on empty DB", func(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
addIncremtal(t, db)
txn := db.NewTransaction(false)
defer txn.Discard()
_, err := txn.Get([]byte("key-2"))
require.NoError(t, err)
})
})
}