Skip to content

Commit

Permalink
(release/v2.2007) Add command to stream contents of DB into another D…
Browse files Browse the repository at this point in the history
…B. (#1486)

(cherry picked from commit dc653b0)
  • Loading branch information
martinmr authored Sep 14, 2020
1 parent d5a25b8 commit d12bee4
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 0 deletions.
98 changes: 98 additions & 0 deletions badger/cmd/stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright 2020 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cmd

import (
"io"
"math"
"os"

"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/options"
"github.com/pkg/errors"
"github.com/spf13/cobra"
)

// streamCmd is the cobra definition of the "stream" subcommand. It is attached
// to RootCmd in init and delegates its work to the stream function via RunE.
var streamCmd = &cobra.Command{
Use: "stream",
Short: "Stream DB into another DB with different options",
Long: `
This command streams the contents of this DB into another DB with the given options.
`,
RunE: stream,
}

// Flag-backed settings for the stream command; each is bound to a
// command-line flag in init.
var (
	// outDir is the path of the output DB (--out / -o).
	outDir string
	// numVersions is the maximum number of versions kept per key (--num_versions).
	numVersions int
	// compressionType selects the output DB compression (--compression).
	compressionType uint32
)

// init attaches streamCmd to RootCmd and binds the command's flags to the
// package-level variables that stream reads. truncate and readOnly are not
// declared in this file; they come from elsewhere in the package.
func init() {
	// TODO: Add more options.
	RootCmd.AddCommand(streamCmd)

	flags := streamCmd.Flags()
	flags.StringVarP(&outDir, "out", "o", "",
		"Path to output DB. The directory should be empty.")
	flags.BoolVarP(&truncate, "truncate", "", false, "Option to truncate the DBs")
	flags.BoolVarP(&readOnly, "read_only", "", true,
		"Option to open input DB in read-only mode")
	flags.IntVarP(&numVersions, "num_versions", "", 0,
		"Option to configure the maximum number of versions per key. "+
			"Values <= 0 will be considered to have the max number of versions.")
	flags.Uint32VarP(&compressionType, "compression", "", 0,
		"Option to configure the compression type in output DB. "+
			"0 to disable, 1 for Snappy, and 2 for ZSTD.")
}

// stream implements the "stream" subcommand. It opens the DB at sstDir in
// managed mode and streams its contents into a new DB at outDir, which is
// opened with the same options except for the directory, the compression type
// and read-only mode.
//
// It returns an error if outDir exists but is not an empty directory, if the
// compression flag is not 0, 1 or 2, if the input DB cannot be opened, or if
// StreamDB fails.
func stream(cmd *cobra.Command, args []string) error {
	// Check that outDir doesn't exist or is empty.
	if _, err := os.Stat(outDir); err == nil {
		f, err := os.Open(outDir)
		if err != nil {
			return err
		}
		defer f.Close()

		// Readdirnames(1) yields io.EOF only when the directory has no
		// entries. A nil error means at least one entry exists; any other
		// error (e.g. outDir is a regular file) is reported as such rather
		// than being mislabelled as "non-empty".
		_, err = f.Readdirnames(1)
		if err == nil {
			return errors.Errorf("cannot run stream tool on non-empty output directory %s", outDir)
		}
		if err != io.EOF {
			return errors.Wrapf(err, "cannot read output directory %s", outDir)
		}
	} else if !os.IsNotExist(err) {
		// A missing directory is fine; anything else should not be ignored.
		return errors.Wrapf(err, "cannot stat output directory %s", outDir)
	}

	// Options for input DB.
	if numVersions <= 0 {
		// Non-positive values mean "keep the maximum number of versions".
		numVersions = math.MaxInt32
	}
	inOpt := badger.DefaultOptions(sstDir).
		WithReadOnly(readOnly).
		WithTruncate(truncate).
		WithValueThreshold(1 << 10 /* 1KB */).
		WithNumVersionsToKeep(numVersions)

	// Options for output DB. compressionType is unsigned, so only the upper
	// bound needs checking.
	if compressionType > 2 {
		return errors.Errorf(
			"compression value must be one of 0 (disabled), 1 (Snappy), or 2 (ZSTD)")
	}
	// The output DB inherits the input options but must be writable.
	outOpt := inOpt.WithDir(outDir).WithValueDir(outDir).
		WithCompression(options.CompressionType(compressionType)).WithReadOnly(false)

	inDB, err := badger.OpenManaged(inOpt)
	if err != nil {
		return errors.Wrapf(err, "cannot open DB at %s", sstDir)
	}
	defer inDB.Close()
	return inDB.StreamDB(outOpt)
}
32 changes: 32 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"encoding/binary"
"expvar"
"fmt"
"math"
"os"
"path/filepath"
Expand Down Expand Up @@ -1786,3 +1787,34 @@ func createDirs(opt Options) error {
}
return nil
}

// StreamDB streams the contents of this DB to a new DB that is opened in
// managed mode with outOptions; the new DB lives in outOptions.Dir.
//
// The output DB is opened, filled through a StreamWriter, flushed and closed
// before StreamDB returns. An error is returned if the output DB cannot be
// opened, if the stream writer cannot be prepared, or if streaming or the
// final flush fails.
func (db *DB) StreamDB(outOptions Options) error {
	outDir := outOptions.Dir

	// Open output DB.
	outDB, err := OpenManaged(outOptions)
	if err != nil {
		return errors.Wrapf(err, "cannot open out DB at %s", outDir)
	}
	defer outDB.Close()
	writer := outDB.NewStreamWriter()
	if err := writer.Prepare(); err != nil {
		// Propagate the failure: writing through an unprepared writer must
		// not be attempted.
		return errors.Wrapf(err, "cannot create stream writer in out DB at %s", outDir)
	}

	// Stream contents of DB to the output DB.
	// NOTE(review): math.MaxUint64 is presumably the read timestamp, chosen
	// so that every committed version is visible — confirm against NewStreamAt.
	stream := db.NewStreamAt(math.MaxUint64)
	stream.LogPrefix = fmt.Sprintf("Streaming DB to new DB at %s", outDir)
	// Forward every batch produced by the stream straight to the writer.
	stream.Send = func(kvs *pb.KVList) error {
		return writer.Write(kvs)
	}
	if err := stream.Orchestrate(context.Background()); err != nil {
		return errors.Wrapf(err, "cannot stream DB to out DB at %s", outDir)
	}
	if err := writer.Flush(); err != nil {
		return errors.Wrapf(err, "cannot flush writer")
	}
	return nil
}
43 changes: 43 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,49 @@ func TestForceCompactL0(t *testing.T) {
require.NoError(t, db.Close())
}

// TestStreamDB writes 100 key/value pairs at version 1 into a ZSTD-compressed
// managed DB, streams that DB into a second directory with StreamDB, then
// reopens the output DB and checks that every key, value and user meta byte
// was copied.
func TestStreamDB(t *testing.T) {
	dir, err := ioutil.TempDir("", "badger-test")
	require.NoError(t, err)
	defer removeDir(dir)
	opts := getTestOptions(dir).WithCompression(options.ZSTD)

	db, err := OpenManaged(opts)
	require.NoError(t, err)
	defer func() {
		require.NoError(t, db.Close())
	}()

	writer := db.NewManagedWriteBatch()
	for i := 0; i < 100; i++ {
		key := []byte(fmt.Sprintf("key%d", i))
		val := []byte(fmt.Sprintf("val%d", i))
		require.NoError(t, writer.SetEntryAt(NewEntry(key, val).WithMeta(0x00), 1))
	}
	require.NoError(t, writer.Flush())

	outDir, err := ioutil.TempDir("", "badger-test")
	require.NoError(t, err)
	// Clean up the output directory as well; deferred before the Close below
	// so that it runs after the output DB has been closed.
	defer removeDir(outDir)
	outOpt := getTestOptions(outDir)
	require.NoError(t, db.StreamDB(outOpt))

	// StreamDB closes its own handle on the output DB, so reopen it to verify.
	outDB, err := OpenManaged(outOpt)
	require.NoError(t, err)
	defer func() {
		require.NoError(t, outDB.Close())
	}()

	for i := 0; i < 100; i++ {
		key := []byte(fmt.Sprintf("key%d", i))
		val := []byte(fmt.Sprintf("val%d", i))
		txn := outDB.NewTransactionAt(1, false)
		item, err := txn.Get(key)
		require.NoError(t, err)
		require.EqualValues(t, val, getItemValue(t, item))
		require.Equal(t, byte(0x00), item.UserMeta())
		txn.Discard()
	}
}

func dirSize(path string) (int64, error) {
var size int64
err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
Expand Down

0 comments on commit d12bee4

Please sign in to comment.