From a9a5761449146b29abf530e1bc2453f73726c7c5 Mon Sep 17 00:00:00 2001
From: Aman Mangal
Date: Tue, 14 Feb 2023 21:22:03 +0530
Subject: [PATCH] feat(stream): add support for incremental stream writer
 (#1722)

This PR adds support for writing to the DB incrementally through the
stream writer, by adding a new API: StreamWriter.PrepareIncremental.
(A usage sketch follows the diff below.)

This PR also includes the bug fix from PR #1723.
---
 badger/cmd/stream.go  |  1 +
 db.go                 |  2 +-
 stream_writer.go      | 64 +++++++++++++++++++++++++---
 stream_writer_test.go | 99 +++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 159 insertions(+), 7 deletions(-)

diff --git a/badger/cmd/stream.go b/badger/cmd/stream.go
index 98f1efd0c..45846d733 100644
--- a/badger/cmd/stream.go
+++ b/badger/cmd/stream.go
@@ -118,6 +118,7 @@ func stream(cmd *cobra.Command, args []string) error {
         WithValueDir(so.outDir).
         WithNumVersionsToKeep(so.numVersions).
         WithCompression(options.CompressionType(so.compressionType)).
+        WithEncryptionKey(encKey).
         WithReadOnly(false)
 
     err = inDB.StreamDB(outOpt)
diff --git a/db.go b/db.go
index 0dc8f020c..f5910b988 100644
--- a/db.go
+++ b/db.go
@@ -1623,7 +1623,7 @@ func (db *DB) prepareToDrop() (func(), error) {
     // write it to db. Then, flush all the pending memtable. So that, we
     // don't miss any entries.
     if err := db.blockWrite(); err != nil {
-        return nil, err
+        return func() {}, err
     }
     reqs := make([]*request, 0, 10)
     for {
diff --git a/stream_writer.go b/stream_writer.go
index 6618b2f7c..1cad64ce7 100644
--- a/stream_writer.go
+++ b/stream_writer.go
@@ -48,6 +48,7 @@ type StreamWriter struct {
     throttle   *y.Throttle
     maxVersion uint64
     writers    map[uint32]*sortedWriter
+    prevLevel  int
 }
 
 // NewStreamWriter creates a StreamWriter. Right after creating StreamWriter, Prepare must be
@@ -67,18 +68,58 @@ func (db *DB) NewStreamWriter() *StreamWriter {
 // Prepare should be called before writing any entry to StreamWriter. It deletes all data present in
 // existing DB, stops compactions and any writes being done by other means. Be very careful when
 // calling Prepare, because it could result in permanent data loss. Not calling Prepare would result
-// in a corrupt Badger instance.
+// in a corrupt Badger instance. Use PrepareIncremental to do an incremental stream write.
 func (sw *StreamWriter) Prepare() error {
     sw.writeLock.Lock()
     defer sw.writeLock.Unlock()
 
     done, err := sw.db.dropAll()
+    // Ensure that done() is never called more than once.
+    var once sync.Once
+    sw.done = func() { once.Do(done) }
+    return err
+}
+
+// PrepareIncremental should be called before writing any entry to StreamWriter incrementally.
+// In an incremental stream write, the tables are written one level above the current base level.
+func (sw *StreamWriter) PrepareIncremental() error {
+    sw.writeLock.Lock()
+    defer sw.writeLock.Unlock()
 
     // Ensure that done() is never called more than once.
     var once sync.Once
+
+    // prepareToDrop will stop all the incoming writes and process any pending flush tasks.
+    // Before we start writing, we'll stop the compactions because no one else should be writing
+    // to the same level as the stream writer is writing to.
+    f, err := sw.db.prepareToDrop()
+    if err != nil {
+        sw.done = func() { once.Do(f) }
+        return err
+    }
+    sw.db.stopCompactions()
+    done := func() {
+        sw.db.startCompactions()
+        f()
+    }
     sw.done = func() { once.Do(done) }
-    return err
+
+    isEmptyDB := true
+    for _, level := range sw.db.Levels() {
+        if level.NumTables > 0 {
+            sw.prevLevel = level.Level
+            isEmptyDB = false
+            break
+        }
+    }
+    if isEmptyDB {
+        // If the DB is empty, allow the incremental stream write to proceed.
+        return nil
+    }
+    if sw.prevLevel == 0 {
+        return fmt.Errorf("Unable to do incremental writes because L0 has data")
+    }
+    return nil
 }
 
 // Write writes KVList to DB. Each KV within the list contains the stream id which StreamWriter
@@ -110,6 +151,18 @@ func (sw *StreamWriter) Write(buf *z.Buffer) error {
             panic(fmt.Sprintf("write performed on closed stream: %d", kv.StreamId))
         }
 
+        sw.writeLock.Lock()
+        if sw.maxVersion < kv.Version {
+            sw.maxVersion = kv.Version
+        }
+        if sw.prevLevel == 0 {
+            // If prevLevel is 0, we have not written anything yet, so we can write to the max
+            // level. newWriter writes to prevLevel - 1, so set prevLevel to len(levels).
+            sw.prevLevel = len(sw.db.lc.levels)
+        }
+        sw.writeLock.Unlock()
+
         var meta, userMeta byte
         if len(kv.Meta) > 0 {
             meta = kv.Meta[0]
@@ -117,9 +170,6 @@ func (sw *StreamWriter) Write(buf *z.Buffer) error {
         if len(kv.UserMeta) > 0 {
             userMeta = kv.UserMeta[0]
         }
-        if sw.maxVersion < kv.Version {
-            sw.maxVersion = kv.Version
-        }
         e := &Entry{
             Key:   y.KeyWithTs(kv.Key, kv.Version),
             Value: y.Copy(kv.Value),
@@ -285,6 +335,7 @@ type sortedWriter struct {
     builder *table.Builder
     lastKey []byte
+    level    int
     streamID uint32
     reqCh    chan *request
     // Have separate closer for each writer, as it can be closed at any time.
@@ -304,6 +355,7 @@ func (sw *StreamWriter) newWriter(streamID uint32) (*sortedWriter, error) {
         builder: table.NewTableBuilder(bopts),
         reqCh:   make(chan *request, 3),
         closer:  z.NewCloser(1),
+        level:   sw.prevLevel - 1, // Write at the level just above the one we were writing to.
     }
 
     go w.handleRequests()
@@ -435,7 +487,7 @@ func (w *sortedWriter) createTable(builder *table.Builder) error {
     }
 
     lc := w.db.lc
-    lhandler := lc.levels[len(lc.levels)-1]
+    lhandler := lc.levels[w.level]
     // Now that table can be opened successfully, let's add this to the MANIFEST.
     change := &pb.ManifestChange{
         Id: tbl.ID(),
diff --git a/stream_writer_test.go b/stream_writer_test.go
index 2535fd22f..da4bde736 100644
--- a/stream_writer_test.go
+++ b/stream_writer_test.go
@@ -577,3 +577,102 @@ func TestStreamWriterEncrypted(t *testing.T) {
 
     require.NoError(t, db.Close())
 }
+
+// Test that the stream writer does not crash with large values in managed mode.
+func TestStreamWriterWithLargeValue(t *testing.T) {
+    opts := DefaultOptions("")
+    opts.managedTxns = true
+    runBadgerTest(t, &opts, func(t *testing.T, db *DB) {
+        buf := z.NewBuffer(10<<20, "test")
+        defer func() { require.NoError(t, buf.Release()) }()
+        val := make([]byte, 10<<20)
+        _, err := rand.Read(val)
+        require.NoError(t, err)
+        KVToBuffer(&pb.KV{
+            Key:     []byte("key"),
+            Value:   val,
+            Version: 1,
+        }, buf)
+
+        sw := db.NewStreamWriter()
+        require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
+        require.NoError(t, sw.Write(buf), "sw.Write() failed")
+        require.NoError(t, sw.Flush(), "sw.Flush() failed")
+    })
+}
+
+func TestStreamWriterIncremental(t *testing.T) {
+    addIncremental := func(t *testing.T, db *DB, keys [][]byte) {
+        buf := z.NewBuffer(10<<20, "test")
+        defer func() { require.NoError(t, buf.Release()) }()
+        for _, key := range keys {
+            KVToBuffer(&pb.KV{
+                Key:     key,
+                Value:   []byte("val"),
+                Version: 1,
+            }, buf)
+        }
+        // Now do an incremental stream write.
+        sw := db.NewStreamWriter()
+        require.NoError(t, sw.PrepareIncremental(), "sw.PrepareIncremental() failed")
+        require.NoError(t, sw.Write(buf), "sw.Write() failed")
+        require.NoError(t, sw.Flush(), "sw.Flush() failed")
+    }
+
+    t.Run("incremental on non-empty DB", func(t *testing.T) {
+        runBadgerTest(t, nil, func(t *testing.T, db *DB) {
+            buf := z.NewBuffer(10<<20, "test")
+            defer func() { require.NoError(t, buf.Release()) }()
+            KVToBuffer(&pb.KV{
+                Key:     []byte("key-1"),
+                Value:   []byte("val"),
+                Version: 1,
+            }, buf)
+            sw := db.NewStreamWriter()
+            require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
+            require.NoError(t, sw.Write(buf), "sw.Write() failed")
+            require.NoError(t, sw.Flush(), "sw.Flush() failed")
+
+            addIncremental(t, db, [][]byte{[]byte("key-2")})
+
+            txn := db.NewTransaction(false)
+            defer txn.Discard()
+            _, err := txn.Get([]byte("key-1"))
+            require.NoError(t, err)
+            _, err = txn.Get([]byte("key-2"))
+            require.NoError(t, err)
+        })
+    })
+
+    t.Run("incremental on empty DB", func(t *testing.T) {
+        runBadgerTest(t, nil, func(t *testing.T, db *DB) {
+            addIncremental(t, db, [][]byte{[]byte("key-1")})
+            txn := db.NewTransaction(false)
+            defer txn.Discard()
+            _, err := txn.Get([]byte("key-1"))
+            require.NoError(t, err)
+        })
+    })
+
+    t.Run("multiple incremental", func(t *testing.T) {
+        runBadgerTest(t, nil, func(t *testing.T, db *DB) {
+            addIncremental(t, db, [][]byte{[]byte("a1"), []byte("c1")})
+            addIncremental(t, db, [][]byte{[]byte("a2"), []byte("c2")})
+            addIncremental(t, db, [][]byte{[]byte("a3"), []byte("c3")})
+            txn := db.NewTransaction(false)
+            defer txn.Discard()
+            _, err := txn.Get([]byte("a1"))
+            require.NoError(t, err)
+            _, err = txn.Get([]byte("c1"))
+            require.NoError(t, err)
+            _, err = txn.Get([]byte("a2"))
+            require.NoError(t, err)
+            _, err = txn.Get([]byte("c2"))
+            require.NoError(t, err)
+            _, err = txn.Get([]byte("a3"))
+            require.NoError(t, err)
+            _, err = txn.Get([]byte("c3"))
+            require.NoError(t, err)
+        })
+    })
+}
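
Usage sketch (not part of the patch): the snippet below shows how the new
PrepareIncremental API slots into the existing Stream -> StreamWriter wiring,
mirroring what db.StreamDB already does internally. It is a minimal sketch
under assumptions: the streamInto helper, log prefix, and directory paths are
invented for illustration, and the import paths assume the v4 module layout.

package main

import (
    "context"
    "log"

    badger "github.com/dgraph-io/badger/v4"
    "github.com/dgraph-io/ristretto/z"
)

// streamInto streams every key from src into dst. The first load must use
// Prepare, which drops all existing data in dst; later loads can pass
// incremental=true so that dst keeps what it already holds.
func streamInto(src, dst *badger.DB, incremental bool) error {
    sw := dst.NewStreamWriter()
    prepare := sw.Prepare
    if incremental {
        // Errors out if L0 of dst already has tables (see PrepareIncremental).
        prepare = sw.PrepareIncremental
    }
    if err := prepare(); err != nil {
        return err
    }

    stream := src.NewStream()
    stream.LogPrefix = "StreamInto" // illustrative
    // Orchestrate hands Send buffers already in the format Write expects.
    stream.Send = func(buf *z.Buffer) error { return sw.Write(buf) }
    if err := stream.Orchestrate(context.Background()); err != nil {
        return err
    }
    // Flush finishes the tables and re-enables compactions via done().
    return sw.Flush()
}

func main() {
    // Directory paths are placeholders.
    src, err := badger.OpenManaged(badger.DefaultOptions("/tmp/badger-src"))
    if err != nil {
        log.Fatal(err)
    }
    defer src.Close()
    dst, err := badger.OpenManaged(badger.DefaultOptions("/tmp/badger-dst"))
    if err != nil {
        log.Fatal(err)
    }
    defer dst.Close()

    if err := streamInto(src, dst, false); err != nil { // initial, destructive load
        log.Fatal(err)
    }
    if err := streamInto(src, dst, true); err != nil { // incremental top-up
        log.Fatal(err)
    }
}

As the PrepareIncremental hunk shows, incremental mode places new tables one
level above the topmost non-empty level and fails fast when L0 already has
data, so callers should be prepared to fall back to a full (destructive)
stream write in that case.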