Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Badger Batch] Insert approvals with badger batch update #6381

Closed
wants to merge 10 commits into from
83 changes: 43 additions & 40 deletions storage/badger/approvals.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package badger
import (
"errors"
"fmt"
"sync"

"github.com/dgraph-io/badger/v2"

Expand All @@ -11,46 +12,47 @@ import (
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/storage/badger/operation"
"github.com/onflow/flow-go/storage/badger/transaction"
)

// ResultApprovals implements persistent storage for result approvals.
type ResultApprovals struct {
db *badger.DB
cache *Cache[flow.Identifier, *flow.ResultApproval]
db *badger.DB
cache *CacheB[flow.Identifier, *flow.ResultApproval]
indexing *sync.Mutex // preventing concurrent indexing of approvals
}

func NewResultApprovals(collector module.CacheMetrics, db *badger.DB) *ResultApprovals {

store := func(key flow.Identifier, val *flow.ResultApproval) func(*transaction.Tx) error {
return transaction.WithTx(operation.SkipDuplicates(operation.InsertResultApproval(val)))
store := func(key flow.Identifier, val *flow.ResultApproval) func(storage.BadgerReaderBatchWriter) error {
return storage.OnlyBadgerWriter(operation.InsertResultApproval(val))
}

retrieve := func(approvalID flow.Identifier) func(tx *badger.Txn) (*flow.ResultApproval, error) {
retrieve := func(approvalID flow.Identifier) func(tx storage.Reader) (*flow.ResultApproval, error) {
var approval flow.ResultApproval
return func(tx *badger.Txn) (*flow.ResultApproval, error) {
return func(tx storage.Reader) (*flow.ResultApproval, error) {
err := operation.RetrieveResultApproval(approvalID, &approval)(tx)
return &approval, err
}
}

res := &ResultApprovals{
db: db,
cache: newCache[flow.Identifier, *flow.ResultApproval](collector, metrics.ResourceResultApprovals,
withLimit[flow.Identifier, *flow.ResultApproval](flow.DefaultTransactionExpiry+100),
withStore[flow.Identifier, *flow.ResultApproval](store),
withRetrieve[flow.Identifier, *flow.ResultApproval](retrieve)),
cache: newCacheB[flow.Identifier, *flow.ResultApproval](collector, metrics.ResourceResultApprovals,
withLimitB[flow.Identifier, *flow.ResultApproval](flow.DefaultTransactionExpiry+100),
withStoreB[flow.Identifier, *flow.ResultApproval](store),
withRetrieveB[flow.Identifier, *flow.ResultApproval](retrieve)),
indexing: new(sync.Mutex),
}

return res
}

func (r *ResultApprovals) store(approval *flow.ResultApproval) func(*transaction.Tx) error {
func (r *ResultApprovals) store(approval *flow.ResultApproval) func(storage.BadgerReaderBatchWriter) error {
return r.cache.PutTx(approval.ID(), approval)
}

func (r *ResultApprovals) byID(approvalID flow.Identifier) func(*badger.Txn) (*flow.ResultApproval, error) {
return func(tx *badger.Txn) (*flow.ResultApproval, error) {
func (r *ResultApprovals) byID(approvalID flow.Identifier) func(storage.Reader) (*flow.ResultApproval, error) {
return func(tx storage.Reader) (*flow.ResultApproval, error) {
val, err := r.cache.Get(approvalID)(tx)
if err != nil {
return nil, err
Expand All @@ -59,8 +61,8 @@ func (r *ResultApprovals) byID(approvalID flow.Identifier) func(*badger.Txn) (*f
}
}

func (r *ResultApprovals) byChunk(resultID flow.Identifier, chunkIndex uint64) func(*badger.Txn) (*flow.ResultApproval, error) {
return func(tx *badger.Txn) (*flow.ResultApproval, error) {
func (r *ResultApprovals) byChunk(resultID flow.Identifier, chunkIndex uint64) func(storage.Reader) (*flow.ResultApproval, error) {
return func(tx storage.Reader) (*flow.ResultApproval, error) {
var approvalID flow.Identifier
err := operation.LookupResultApproval(resultID, chunkIndex, &approvalID)(tx)
if err != nil {
Expand All @@ -70,29 +72,26 @@ func (r *ResultApprovals) byChunk(resultID flow.Identifier, chunkIndex uint64) f
}
}

func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(*badger.Txn) error {
return func(tx *badger.Txn) error {
err := operation.IndexResultApproval(resultID, chunkIndex, approvalID)(tx)
if err == nil {
return nil
}
// CAUTION: Caller must acquire `indexing` lock.
func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(storage.BadgerReaderBatchWriter) error {
zhangchiqing marked this conversation as resolved.
Show resolved Hide resolved
return func(rw storage.BadgerReaderBatchWriter) error {
var storedApprovalID flow.Identifier
err := operation.LookupResultApproval(resultID, chunkIndex, &storedApprovalID)(rw.GlobalReader())
if err != nil {
if !errors.Is(err, storage.ErrNotFound) {
return fmt.Errorf("could not lookup result approval ID: %w", err)
}

// no approval found, index the approval

if !errors.Is(err, storage.ErrAlreadyExists) {
return err
return operation.UnsafeIndexResultApproval(resultID, chunkIndex, approvalID)(rw.Writer())
}

// When trying to index an approval for a result, and there is already
// an approval for the result, double check if the indexed approval is
// the same.
// an approval is already indexed, double check if it is the same
// We don't allow indexing multiple approvals per chunk because the
// store is only used within Verification nodes, and it is impossible
// for a Verification node to compute different approvals for the same
// chunk.
var storedApprovalID flow.Identifier
err = operation.LookupResultApproval(resultID, chunkIndex, &storedApprovalID)(tx)
if err != nil {
return fmt.Errorf("there is an approval stored already, but cannot retrieve it: %w", err)
}

if storedApprovalID != approvalID {
return fmt.Errorf("attempting to store conflicting approval (result: %v, chunk index: %d): storing: %v, stored: %v. %w",
Expand All @@ -105,14 +104,22 @@ func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, app

// Store stores a ResultApproval
func (r *ResultApprovals) Store(approval *flow.ResultApproval) error {
return operation.RetryOnConflictTx(r.db, transaction.Update, r.store(approval))
return operation.WithReaderBatchWriter(r.db, r.store(approval))
}

// Index indexes a ResultApproval by chunk (ResultID + chunk index).
// operation is idempotent (repeated calls with the same value are equivalent to
// just calling the method once; still the method succeeds on each call).
func (r *ResultApprovals) Index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) error {
err := operation.RetryOnConflict(r.db.Update, r.index(resultID, chunkIndex, approvalID))
// Acquire the lock to prevent dirty reads when checking for conflicting approvals.
// How it works:
// the lock can only be acquired after a preceding index operation has been committed to the database.
// Since the index operation is the only operation that affects the lookup performed below,
// no writes can go through until the lock is released, so locking here prevents dirty reads.
r.indexing.Lock()
defer r.indexing.Unlock()

err := operation.WithReaderBatchWriter(r.db, r.index(resultID, chunkIndex, approvalID))
if err != nil {
return fmt.Errorf("could not index result approval: %w", err)
}
Expand All @@ -121,16 +128,12 @@ func (r *ResultApprovals) Index(resultID flow.Identifier, chunkIndex uint64, app

// ByID retrieves a ResultApproval by its ID
func (r *ResultApprovals) ByID(approvalID flow.Identifier) (*flow.ResultApproval, error) {
tx := r.db.NewTransaction(false)
defer tx.Discard()
return r.byID(approvalID)(tx)
return r.byID(approvalID)(operation.ToReader(r.db))
}

// ByChunk retrieves a ResultApproval by result ID and chunk index. The
// ResultApprovals store is only used within a verification node, where it is
// assumed that there is never more than one approval per chunk.
func (r *ResultApprovals) ByChunk(resultID flow.Identifier, chunkIndex uint64) (*flow.ResultApproval, error) {
tx := r.db.NewTransaction(false)
defer tx.Discard()
return r.byChunk(resultID, chunkIndex)(tx)
return r.byChunk(resultID, chunkIndex)(operation.ToReader(r.db))
}
50 changes: 50 additions & 0 deletions storage/badger/approvals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package badger_test

import (
"errors"
"sync"
"testing"

"github.com/dgraph-io/badger/v2"
Expand Down Expand Up @@ -79,3 +80,52 @@ func TestApprovalStoreTwoDifferentApprovalsShouldFail(t *testing.T) {
require.True(t, errors.Is(err, storage.ErrDataMismatch))
})
}

// TestApprovalStoreTwoDifferentApprovalsConcurrently verifies that storing and
// indexing two conflicting approvals for the same chunk concurrently results in
// exactly one Index call succeeding, while the other one fails with
// storage.ErrDataMismatch.
func TestApprovalStoreTwoDifferentApprovalsConcurrently(t *testing.T) {
	unittest.RunWithBadgerDB(t, func(db *badger.DB) {
		collector := metrics.NewNoopCollector()
		store := bstorage.NewResultApprovals(collector, db)

		approval1 := unittest.ResultApprovalFixture()
		approval2 := unittest.ResultApprovalFixture()

		var wg sync.WaitGroup
		wg.Add(2)

		// Errors are only recorded inside the goroutines and asserted after
		// wg.Wait(): the require helpers abort via t.FailNow, which must be
		// called from the goroutine running the test.
		var firstStoreErr, secondStoreErr error
		var firstIndexErr, secondIndexErr error

		// First goroutine stores and indexes the first approval.
		go func() {
			defer wg.Done()

			firstStoreErr = store.Store(approval1)
			if firstStoreErr != nil {
				return
			}

			firstIndexErr = store.Index(approval1.Body.ExecutionResultID, approval1.Body.ChunkIndex, approval1.ID())
		}()

		// Second goroutine stores the second approval and tries to index it
		// under the same (result ID, chunk index) key as the first approval.
		go func() {
			defer wg.Done()

			secondStoreErr = store.Store(approval2)
			if secondStoreErr != nil {
				return
			}

			secondIndexErr = store.Index(approval1.Body.ExecutionResultID, approval1.Body.ChunkIndex, approval2.ID())
		}()

		// Wait for both goroutines to finish
		wg.Wait()

		require.NoError(t, firstStoreErr)
		require.NoError(t, secondStoreErr)

		// Check that one of the Index operations succeeded and the other failed
		if firstIndexErr == nil {
			require.Error(t, secondIndexErr)
			require.True(t, errors.Is(secondIndexErr, storage.ErrDataMismatch))
			return
		}
		require.NoError(t, secondIndexErr)
		require.True(t, errors.Is(firstIndexErr, storage.ErrDataMismatch))
	})
}
150 changes: 150 additions & 0 deletions storage/badger/cache_b.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package badger

import (
"errors"
"fmt"

lru "github.com/hashicorp/golang-lru/v2"

"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/storage"
)

// withLimitB builds a CacheB option that overrides the cache's entry limit.
// newCacheB applies options before it allocates the underlying LRU, so the
// limit set here determines the size of that LRU.
func withLimitB[K comparable, V any](limit uint) func(*CacheB[K, V]) {
	applyLimit := func(target *CacheB[K, V]) {
		target.limit = limit
	}
	return applyLimit
}

// storeFuncB is the signature of the function a CacheB uses to persist a
// key/value pair: given the key and value, it returns a deferred operation
// that is executed against a storage.BadgerReaderBatchWriter.
type storeFuncB[K comparable, V any] func(key K, val V) func(storage.BadgerReaderBatchWriter) error

// withStoreB builds a CacheB option that installs the given store function,
// replacing the default one which always fails.
func withStoreB[K comparable, V any](store storeFuncB[K, V]) func(*CacheB[K, V]) {
	applyStore := func(target *CacheB[K, V]) {
		target.store = store
	}
	return applyStore
}

// noStoreB is the store function newCacheB installs by default. The operation
// it returns ignores both the key/value pair and the batch writer and always
// fails, signalling that the cache was built without a store function.
func noStoreB[K comparable, V any](_ K, _ V) func(storage.BadgerReaderBatchWriter) error {
	return func(storage.BadgerReaderBatchWriter) error {
		return errors.New("no store function for cache put available")
	}
}

// noopStoreB is a store function whose returned operation does nothing and
// always succeeds; neither the key/value pair nor the batch writer is touched.
//
// nolint: unused
func noopStoreB[K comparable, V any](_ K, _ V) func(storage.BadgerReaderBatchWriter) error {
	succeed := func(storage.BadgerReaderBatchWriter) error {
		return nil
	}
	return succeed
}

// retrieveFuncB is the signature of the function a CacheB uses to load a value
// that is not cached: given the key, it returns a deferred read that is
// executed against a storage.Reader.
type retrieveFuncB[K comparable, V any] func(key K) func(storage.Reader) (V, error)

// withRetrieveB builds a CacheB option that installs the given retrieve
// function, replacing the default one which always fails.
func withRetrieveB[K comparable, V any](retrieve retrieveFuncB[K, V]) func(*CacheB[K, V]) {
	applyRetrieve := func(target *CacheB[K, V]) {
		target.retrieve = retrieve
	}
	return applyRetrieve
}

// noRetrieveB is the retrieve function newCacheB installs by default. The read
// it returns ignores the key and the reader and always fails with the zero
// value of V, signalling that the cache was built without a retrieve function.
func noRetrieveB[K comparable, V any](_ K) func(storage.Reader) (V, error) {
	return func(storage.Reader) (V, error) {
		var zero V
		return zero, errors.New("no retrieve function for cache get available")
	}
}

// CacheB is an LRU cache in front of a database, parameterized by key type K
// and value type V. Reads go through the injected retrieve function on a cache
// miss, and writes go through the injected store function.
type CacheB[K comparable, V any] struct {
// metrics receives hit / miss / not-found events and the entry count.
metrics module.CacheMetrics
// limit is the size passed to the LRU when it is created.
limit uint
// store assembles the database operation used by PutTx.
store storeFuncB[K, V]
// retrieve assembles the database read used by Get on a cache miss.
retrieve retrieveFuncB[K, V]
// resource is the label passed along with every metrics call.
resource string
// cache is the underlying LRU holding the cached values.
cache *lru.Cache[K, V]
}

// newCacheB creates a CacheB that reports to the given metrics collector under
// the given resource name. Defaults are a limit of 1000 entries and store /
// retrieve functions that always fail; each can be overridden through options,
// which are applied in order before the underlying LRU is allocated.
//
// It panics if the LRU cannot be created, which lru.New reports for a
// non-positive size (e.g. a limit of 0). Previously this error was discarded
// and the constructor crashed with a nil-pointer dereference on the next line;
// the explicit panic keeps the fail-fast behavior but names the cause.
func newCacheB[K comparable, V any](collector module.CacheMetrics, resourceName string, options ...func(*CacheB[K, V])) *CacheB[K, V] {
	c := CacheB[K, V]{
		metrics:  collector,
		limit:    1000,
		store:    noStoreB[K, V],
		retrieve: noRetrieveB[K, V],
		resource: resourceName,
	}
	for _, option := range options {
		option(&c)
	}

	cache, err := lru.New[K, V](int(c.limit))
	if err != nil {
		panic(fmt.Sprintf("could not create LRU cache for resource %s with limit %d: %v", c.resource, c.limit, err))
	}
	c.cache = cache

	// publish the initial (empty) entry count
	c.metrics.CacheEntries(c.resource, uint(c.cache.Len()))
	return &c
}

// IsCached reports whether the key is currently held in the in-memory cache.
// It only consults the cache; the underlying data store is NOT checked.
func (c *CacheB[K, V]) IsCached(key K) bool {
	present := c.cache.Contains(key)
	return present
}

// Get returns a read operation that looks the key up in the cache first and
// falls back to the injected retrieve function on a miss. A value loaded via
// the retrieve function is added to the cache before it is returned.
//
// Errors from the retrieve function are wrapped with %w, so sentinel checks
// still work. During normal operations, the following error returns are expected:
//   - `storage.ErrNotFound` if key is unknown.
func (c *CacheB[K, V]) Get(key K) func(storage.Reader) (V, error) {
	return func(r storage.Reader) (V, error) {
		// fast path: serve from the cache
		if hit, ok := c.cache.Get(key); ok {
			c.metrics.CacheHit(c.resource)
			return hit, nil
		}

		// slow path: load from the database
		loaded, err := c.retrieve(key)(r)
		if err != nil {
			if errors.Is(err, storage.ErrNotFound) {
				c.metrics.CacheNotFound(c.resource)
			}
			var zero V
			return zero, fmt.Errorf("could not retrieve resource: %w", err)
		}
		c.metrics.CacheMiss(c.resource)

		// Remember the value. The entry count metric is only refreshed when
		// adding did not evict another entry.
		if evicted := c.cache.Add(key, loaded); !evicted {
			c.metrics.CacheEntries(c.resource, uint(c.cache.Len()))
		}

		return loaded, nil
	}
}

// Remove drops the key from the in-memory cache. Whether the key was present
// is not reported, and the entry count metric is not updated here.
func (c *CacheB[K, V]) Remove(key K) {
	_ = c.cache.Remove(key)
}

// Insert puts the resource into the in-memory cache under the given key,
// without touching the database. The entry count metric is refreshed only
// when adding the resource did not evict another entry.
func (c *CacheB[K, V]) Insert(key K, resource V) {
	if evicted := c.cache.Add(key, resource); evicted {
		// an entry was evicted to make room, so the count metric is left as is
		return
	}
	c.metrics.CacheEntries(c.resource, uint(c.cache.Len()))
}

// PutTx returns an operation that persists the resource through the injected
// store function and arranges for it to be added to the cache under the given key.
//
// The store operation is assembled immediately, but only executed when the
// returned function is called. The cache insertion is registered via
// storage.OnCommitSucceed before the store operation runs.
// NOTE(review): presumably the callback fires only once the batch has been
// committed successfully — confirm against storage.OnCommitSucceed.
func (c *CacheB[K, V]) PutTx(key K, resource V) func(storage.BadgerReaderBatchWriter) error {
	// assemble DB operations to store resource (no execution)
	persist := c.store(key, resource)

	return func(rw storage.BadgerReaderBatchWriter) error {
		cacheOnCommit := func() {
			c.Insert(key, resource)
		}
		storage.OnCommitSucceed(rw, cacheOnCommit)

		// execute operations to store resource
		if err := persist(rw); err != nil {
			return fmt.Errorf("could not store resource: %w", err)
		}
		return nil
	}
}
Loading
Loading