Skip to content

Commit

Permalink
feat: support using badgerDB for suppressions
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed Oct 26, 2022
1 parent dceaf29 commit 3ff091e
Show file tree
Hide file tree
Showing 9 changed files with 705 additions and 5 deletions.
210 changes: 210 additions & 0 deletions enterprise/suppress-user/internal/repository/badgerdb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
package repository

import (
"errors"
"fmt"
"io"
"strings"
"sync"
"time"

badger "github.com/dgraph-io/badger/v3"
"github.com/dgraph-io/badger/v3/options"
"github.com/rudderlabs/rudder-server/utils/logger"
)

// tokenKey is the reserved badger key under which the latest sync token is stored.
const tokenKey = "__token__"

// badgerRepository is a Repository implementation backed by an embedded
// badger key-value store persisted on disk at path.
type badgerRepository struct {
restoreLock sync.RWMutex // read-held by queries/writes, write-held during Restore
log logger.Logger
path string // filesystem directory holding the badger DB
db *badger.DB // nil until Start succeeds and after Stop
maxGoroutines int // concurrency passed to badger's Load during Restore
started bool // NOTE(review): read/written without synchronization — confirm Start/Stop are never called concurrently
done chan struct{} // closed by Stop to terminate the value-log GC goroutine
}

// NewBadgerRepository creates a badger-backed suppression repository rooted
// at the given filesystem path. Start must be called before the repository
// can serve requests.
func NewBadgerRepository(path string, log logger.Logger) Repository {
	return &badgerRepository{
		log:           log,
		path:          path,
		maxGoroutines: 1,
	}
}

// GetToken returns the latest sync token stored in the repository. A nil
// token with a nil error is returned when no token has been stored yet.
func (b *badgerRepository) GetToken() ([]byte, error) {
	b.restoreLock.RLock()
	defer b.restoreLock.RUnlock()
	if b.db == nil {
		return nil, ErrNotInitialized
	}
	var token []byte
	err := b.db.View(func(txn *badger.Txn) error {
		item, err := txn.Get([]byte(tokenKey))
		if err != nil {
			return err
		}
		// BUG FIX: the byte slice handed to Item.Value's callback is only
		// valid while the transaction is open; retaining it (token = val)
		// is unsafe. ValueCopy returns a safe, owned copy.
		token, err = item.ValueCopy(nil)
		return err
	})
	if err != nil && !errors.Is(err, badger.ErrKeyNotFound) {
		return nil, err
	}
	return token, nil
}

// Suppress reports whether events for the given user/workspace/source
// combination must be suppressed.
//
// A stored entry with an empty source list acts as a wildcard suppressing
// every source for that user; otherwise only the listed source IDs are
// suppressed. A missing entry means no suppression.
func (b *badgerRepository) Suppress(userID, workspaceID, sourceID string) (bool, error) {
	b.restoreLock.RLock()
	defer b.restoreLock.RUnlock()
	if b.db == nil {
		return false, ErrNotInitialized
	}
	key := fmt.Sprintf("%s:%s", workspaceID, userID)
	var sourceIDs []string
	err := b.db.View(func(txn *badger.Txn) error {
		item, err := txn.Get([]byte(key))
		if err != nil {
			return err
		}
		return item.Value(func(val []byte) error {
			if len(val) > 0 {
				// string(val) copies the bytes, so the parsed slice is safe
				// to retain after the transaction closes.
				sourceIDs = strings.Split(string(val), ",")
			}
			return nil
		})
	})
	if err != nil {
		// Use errors.Is (not ==) so wrapped errors are recognized,
		// consistent with GetToken.
		if errors.Is(err, badger.ErrKeyNotFound) {
			return false, nil
		}
		return false, err
	}
	if len(sourceIDs) == 0 { // wildcard entry: all sources suppressed
		return true, nil
	}
	for _, id := range sourceIDs {
		if id == sourceID {
			return true, nil
		}
	}
	return false, nil
}

// Add applies a batch of suppressions to the store and records the sync
// token in the same write batch. Canceled suppressions delete their entry;
// active ones store the comma-joined source ID list under "workspace:user".
func (b *badgerRepository) Add(suppressions []Suppression, token []byte) error {
	b.restoreLock.RLock()
	defer b.restoreLock.RUnlock()
	if b.db == nil {
		return ErrNotInitialized
	}
	batch := b.db.NewWriteBatch()
	defer batch.Cancel()
	for _, s := range suppressions {
		key := []byte(fmt.Sprintf("%s:%s", s.WorkspaceID, s.UserID))
		var opErr error
		if s.Canceled {
			opErr = batch.Delete(key)
		} else {
			opErr = batch.Set(key, []byte(strings.Join(s.SourceIDs, ",")))
		}
		if opErr != nil {
			return opErr
		}
	}
	if err := batch.Set([]byte(tokenKey), token); err != nil {
		return err
	}
	return batch.Flush()
}

// Start opens the badger database at the configured path and launches a
// background goroutine that runs value-log garbage collection every five
// minutes. Calling Start on an already-started repository is a no-op.
func (b *badgerRepository) Start() error {
	if b.started {
		return nil
	}
	b.done = make(chan struct{})
	opts := badger.
		DefaultOptions(b.path).
		WithLogger(blogger{b.log}).
		WithCompression(options.None).
		WithNumMemtables(1).
		WithMemTableSize(16 << 20).
		WithNumLevelZeroTables(2).
		WithNumLevelZeroTablesStall(4).
		WithIndexCacheSize(10 << 20).
		WithNumGoroutines(b.maxGoroutines)
	db, err := badger.Open(opts)
	if err != nil {
		return err
	}
	b.db = db
	b.started = true
	go func() {
		ticker := time.NewTicker(5 * time.Minute)
		defer ticker.Stop()
		for {
			select {
			case <-b.done:
				return
			case <-ticker.C:
				// Keep collecting while GC reports progress: a nil result
				// means a value-log file was rewritten, so try again.
				for b.db.RunValueLogGC(0.7) == nil {
				}
			}
		}
	}()
	return nil
}

// Stop shuts down the GC goroutine and closes the database. It is a no-op
// when the repository was never started.
func (b *badgerRepository) Stop() error {
	if b.db == nil {
		return nil
	}
	close(b.done)
	err := b.db.Close()
	b.started = false
	b.db = nil
	return err
}

// Backup streams a full backup of the database to w.
func (b *badgerRepository) Backup(w io.Writer) error {
	b.restoreLock.RLock()
	defer b.restoreLock.RUnlock()
	// BUG FIX: every other method guards against an unstarted repository;
	// without this check a call before Start dereferences a nil *badger.DB.
	if b.db == nil {
		return ErrNotInitialized
	}
	_, err := b.db.Backup(w, 0)
	return err
}

// Restore replaces the repository's contents with a previously taken backup
// read from r. It holds the write lock for the duration, blocking all other
// operations.
func (b *badgerRepository) Restore(r io.Reader) (err error) {
	b.restoreLock.Lock()
	defer b.restoreLock.Unlock()
	// Consistency fix: guard against an unstarted repository like the other
	// methods do, instead of dereferencing a nil *badger.DB below.
	if b.db == nil {
		return ErrNotInitialized
	}
	defer func() {
		// badger may panic on a corrupt backup stream; convert to an error.
		// (Named rec to avoid shadowing the reader parameter r.)
		if rec := recover(); rec != nil {
			err = fmt.Errorf("panic during restore: %v", rec)
		}
	}()
	return b.db.Load(r, b.maxGoroutines)
}

// NewBadgerTestHelper wraps a Repository in a BadgerTestHelper, failing when
// the repository is not badger-backed.
func NewBadgerTestHelper(r Repository) (*BadgerTestHelper, error) {
	br, ok := r.(*badgerRepository)
	if !ok {
		return nil, fmt.Errorf("repository is not a badger repository")
	}
	return &BadgerTestHelper{br}, nil
}

// BadgerTestHelper exposes internals of a badger repository for tests.
type BadgerTestHelper struct {
b *badgerRepository
}

// StopBadgerDB stops the wrapped repository, closing its badger database.
func (h *BadgerTestHelper) StopBadgerDB() error {
return h.b.Stop()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package repository_test

import (
"bufio"
"fmt"
"os"
"path"
"runtime"
"runtime/debug"
"testing"
"time"

"github.com/rudderlabs/rudder-server/enterprise/suppress-user/internal/repository"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/stretchr/testify/require"
)

// BenchmarkBackupRestore benchmarks the backup and restore time of the badger repository
// after seeding it with a very large number of suppressions.
// BenchmarkBackupRestore benchmarks the backup and restore time of the badger
// repository after seeding it with a very large number of suppressions.
func BenchmarkBackupRestore(b *testing.B) {
	debug.SetMemoryLimit(1 << 30) // 1GB
	totalSuppressions := 50_000_000
	batchSize := 5000
	backupFilename := path.Join(b.TempDir(), "backup.badger")

	b.Run("backup", func(b *testing.B) {
		repo := repository.NewBadgerRepository(path.Join(b.TempDir(), "repo-1"), logger.NOP)
		require.NoError(b, repo.Start())
		defer func() { _ = repo.Stop() }()
		// Seed the repository in batches.
		// NOTE(review): each iteration seeds batchSize/2 records, so only
		// totalSuppressions/2 end up stored — confirm this is intended.
		for i := 0; i < totalSuppressions/batchSize; i++ {
			suppressions := generateSuppressions(i*batchSize/2, batchSize/2)
			token := []byte(fmt.Sprintf("token%d", i))
			require.NoError(b, repo.Add(suppressions, token))
		}
		debug.FreeOSMemory()
		var m runtime.MemStats
		runtime.ReadMemStats(&m)
		b.Logf("before backup memory: %d MB", m.Sys/1024/1024)

		start := time.Now()
		f, err := os.Create(backupFilename)
		// BUG FIX: the error must be checked before f is used; previously
		// bufio.NewWriter(f) ran ahead of require.NoError.
		require.NoError(b, err)
		defer func() { _ = f.Close() }()
		w := bufio.NewWriter(f)
		require.NoError(b, repo.Backup(w))
		// BUG FIX: the bufio.Writer was never flushed, leaving the tail of
		// the backup in memory — the on-disk file was truncated and the
		// reported size/duration were wrong.
		require.NoError(b, w.Flush())
		dur := time.Since(start)
		fileInfo, err := f.Stat()
		require.NoError(b, err)
		b.ReportMetric(float64(fileInfo.Size()/1024/1024), "filesize(MB)")
		b.ReportMetric(dur.Seconds(), "duration(sec)")
		collectMemStats(b)
	})

	b.Run("restore", func(b *testing.B) {
		repo := repository.NewBadgerRepository(path.Join(b.TempDir(), "repo-2"), logger.NOP)
		require.NoError(b, repo.Start())
		defer func() { _ = repo.Stop() }()

		start := time.Now()
		f, err := os.Open(backupFilename)
		require.NoError(b, err)
		r := bufio.NewReader(f)
		fileInfo, err := f.Stat()
		require.NoError(b, err)
		b.ReportMetric(float64(fileInfo.Size()/1024/1024), "filesize(MB)")

		defer func() { _ = f.Close() }()
		require.NoError(b, repo.Restore(r))
		b.ReportMetric(time.Since(start).Seconds(), "duration(sec)")
		collectMemStats(b)
	})
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package repository_test

import (
"fmt"
"math/rand"
"runtime"
"runtime/debug"
"testing"
"time"

"github.com/rudderlabs/rudder-server/enterprise/suppress-user/internal/repository"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/stretchr/testify/require"
)

// BenchmarkAddAndSuppress measures write (Add) and read (Suppress) throughput
// for both the badger-backed and the in-memory repository implementations
// after seeding them with a large number of suppressions.
func BenchmarkAddAndSuppress(b *testing.B) {
debug.SetMemoryLimit(1 << 30) // 1GB
totalSuppressions := 20_000_000
totalReads := 10_000
batchSize := 5000

// runAddSuppressionsBenchmark seeds the repository in batches, timing only
// the Add calls, and reports add throughput as suppressions/s.
runAddSuppressionsBenchmark := func(b *testing.B, repo repository.Repository, batchSize, totalSuppressions int) {
require.NoError(b, repo.Start())
var totalTime time.Duration
// NOTE(review): accumulated below but never reported — dead bookkeeping?
var totalAddSuppressions int
for i := 0; i < totalSuppressions/batchSize; i++ {
// NOTE(review): both batches use the same startFrom (i*batchSize/2), so the
// second Add rewrites the first batch's keys — confirm the overlap is intended.
suppressions1 := generateSuppressions(i*batchSize/2, batchSize/2)
suppressions2 := generateSuppressions(i*batchSize/2, batchSize/2)
token := []byte(fmt.Sprintf("token%d", i))
start := time.Now()
require.NoError(b, repo.Add(suppressions1, token))
require.NoError(b, repo.Add(suppressions2, token))
totalTime += time.Since(start)
totalAddSuppressions += batchSize
}
b.ReportMetric(float64(totalSuppressions)/totalTime.Seconds(), "suppressions/s(add)")
}

// runSuppressBenchmark performs totalReads random lookups, timing only the
// Suppress calls, and reports read throughput.
runSuppressBenchmark := func(b *testing.B, repo repository.Repository, totalSuppressions, totalReads int) {
require.NoError(b, repo.Start())
var totalTime time.Duration
for i := 0; i < totalReads; i++ {
start := time.Now()
idx := randomInt(totalSuppressions)
_, err := repo.Suppress(fmt.Sprintf("user%d", idx), fmt.Sprintf("workspace%d", idx), fmt.Sprintf("source%d", idx))
require.NoError(b, err)
totalTime += time.Since(start)
}
// NOTE(review): divides totalSuppressions (not totalReads) by the elapsed
// read time — verify which rate this metric is meant to express.
b.ReportMetric(float64(totalSuppressions)/totalTime.Seconds(), "suppressions/s(read)")
}

// runAddAndSuppressBenchmark chains the two phases and reports memory usage.
runAddAndSuppressBenchmark := func(b *testing.B, repo repository.Repository, totalSuppressions, batchSize, totalReads int) {
runAddSuppressionsBenchmark(b, repo, batchSize, totalSuppressions)
runSuppressBenchmark(b, repo, totalSuppressions, totalReads)
collectMemStats(b)
}
b.Run("badger", func(b *testing.B) {
repo := repository.NewBadgerRepository(b.TempDir(), logger.NOP)
defer func() { _ = repo.Stop() }()
runAddAndSuppressBenchmark(b, repo, totalSuppressions, batchSize, totalReads)
})

b.Run("memory", func(b *testing.B) {
repo := repository.NewMemoryRepository(logger.NOP)
defer func() { _ = repo.Stop() }()
runAddAndSuppressBenchmark(b, repo, totalSuppressions, batchSize, totalReads)
})
}

// generateSuppressions produces batchSize synthetic suppression records with
// sequential workspace/user/source IDs starting at startFrom. Roughly half of
// the records are randomly marked as canceled.
func generateSuppressions(startFrom, batchSize int) []repository.Suppression {
	// Pre-size the slice: the final length is known up front, avoiding
	// repeated growth copies during append.
	res := make([]repository.Suppression, 0, batchSize)
	for i := startFrom; i < startFrom+batchSize; i++ {
		res = append(res, repository.Suppression{
			Canceled:    randomInt(2) == 0,
			WorkspaceID: fmt.Sprintf("workspace%d", i),
			UserID:      fmt.Sprintf("user%d", i),
			SourceIDs:   []string{fmt.Sprintf("source%d", i), "otherSource", "anotherSource"},
		})
	}
	return res
}

// Seed the global math/rand source so the benchmarks' key selection varies
// between runs.
// NOTE(review): rand.Seed is deprecated as of Go 1.20 — consider a local
// rand.New(rand.NewSource(...)) if the toolchain is upgraded.
func init() {
rand.Seed(time.Now().UnixNano())
}

// randomInt returns a pseudo-random int in [0, lt). It panics when lt <= 0.
func randomInt(lt int) int {
	// rand.Intn is uniform; the previous rand.Int() % lt form carries a
	// (tiny) modulo bias and is less idiomatic.
	return rand.Intn(lt)
}

func collectMemStats(b *testing.B) {
debug.FreeOSMemory()
var m runtime.MemStats
runtime.ReadMemStats(&m)
b.ReportMetric(float64(m.Sys/1024/1024), "memory(MB)")
}
Loading

0 comments on commit 3ff091e

Please sign in to comment.