Skip to content

Commit

Permalink
insert approvals with badger batch update
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangchiqing committed Aug 21, 2024
1 parent 3bd915b commit d3ec64f
Show file tree
Hide file tree
Showing 6 changed files with 453 additions and 51 deletions.
84 changes: 44 additions & 40 deletions storage/badger/approvals.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package badger
import (
"errors"
"fmt"
"sync"

"github.com/dgraph-io/badger/v2"

Expand All @@ -11,46 +12,47 @@ import (
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/storage/badger/operation"
"github.com/onflow/flow-go/storage/badger/transaction"
)

// ResultApprovals implements persistent storage for result approvals.
type ResultApprovals struct {
db *badger.DB
cache *Cache[flow.Identifier, *flow.ResultApproval]
db *badger.DB
cache *CacheB[flow.Identifier, *flow.ResultApproval]
indexing *sync.Mutex // preventing concurrent indexing of approvals
}

func NewResultApprovals(collector module.CacheMetrics, db *badger.DB) *ResultApprovals {

store := func(key flow.Identifier, val *flow.ResultApproval) func(*transaction.Tx) error {
return transaction.WithTx(operation.SkipDuplicates(operation.InsertResultApproval(val)))
store := func(key flow.Identifier, val *flow.ResultApproval) func(storage.BadgerReaderBatchWriter) error {
return storage.OnlyBadgerWriter(operation.InsertResultApproval(val))
}

retrieve := func(approvalID flow.Identifier) func(tx *badger.Txn) (*flow.ResultApproval, error) {
retrieve := func(approvalID flow.Identifier) func(tx storage.Reader) (*flow.ResultApproval, error) {
var approval flow.ResultApproval
return func(tx *badger.Txn) (*flow.ResultApproval, error) {
return func(tx storage.Reader) (*flow.ResultApproval, error) {
err := operation.RetrieveResultApproval(approvalID, &approval)(tx)
return &approval, err
}
}

res := &ResultApprovals{
db: db,
cache: newCache[flow.Identifier, *flow.ResultApproval](collector, metrics.ResourceResultApprovals,
withLimit[flow.Identifier, *flow.ResultApproval](flow.DefaultTransactionExpiry+100),
withStore[flow.Identifier, *flow.ResultApproval](store),
withRetrieve[flow.Identifier, *flow.ResultApproval](retrieve)),
cache: newCacheB[flow.Identifier, *flow.ResultApproval](collector, metrics.ResourceResultApprovals,
withLimitB[flow.Identifier, *flow.ResultApproval](flow.DefaultTransactionExpiry+100),
withStoreB[flow.Identifier, *flow.ResultApproval](store),
withRetrieveB[flow.Identifier, *flow.ResultApproval](retrieve)),
indexing: new(sync.Mutex),
}

return res
}

func (r *ResultApprovals) store(approval *flow.ResultApproval) func(*transaction.Tx) error {
func (r *ResultApprovals) store(approval *flow.ResultApproval) func(storage.BadgerReaderBatchWriter) error {
return r.cache.PutTx(approval.ID(), approval)
}

func (r *ResultApprovals) byID(approvalID flow.Identifier) func(*badger.Txn) (*flow.ResultApproval, error) {
return func(tx *badger.Txn) (*flow.ResultApproval, error) {
func (r *ResultApprovals) byID(approvalID flow.Identifier) func(storage.Reader) (*flow.ResultApproval, error) {
return func(tx storage.Reader) (*flow.ResultApproval, error) {
val, err := r.cache.Get(approvalID)(tx)
if err != nil {
return nil, err
Expand All @@ -59,8 +61,8 @@ func (r *ResultApprovals) byID(approvalID flow.Identifier) func(*badger.Txn) (*f
}
}

func (r *ResultApprovals) byChunk(resultID flow.Identifier, chunkIndex uint64) func(*badger.Txn) (*flow.ResultApproval, error) {
return func(tx *badger.Txn) (*flow.ResultApproval, error) {
func (r *ResultApprovals) byChunk(resultID flow.Identifier, chunkIndex uint64) func(storage.Reader) (*flow.ResultApproval, error) {
return func(tx storage.Reader) (*flow.ResultApproval, error) {
var approvalID flow.Identifier
err := operation.LookupResultApproval(resultID, chunkIndex, &approvalID)(tx)
if err != nil {
Expand All @@ -70,29 +72,27 @@ func (r *ResultApprovals) byChunk(resultID flow.Identifier, chunkIndex uint64) f
}
}

func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(*badger.Txn) error {
return func(tx *badger.Txn) error {
err := operation.IndexResultApproval(resultID, chunkIndex, approvalID)(tx)
if err == nil {
return nil
}
func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(storage.BadgerReaderBatchWriter) error {
return func(tx storage.BadgerReaderBatchWriter) error {
r, w := tx.ReaderWriter()

var storedApprovalID flow.Identifier
err := operation.LookupResultApproval(resultID, chunkIndex, &storedApprovalID)(r)
if err != nil {
if !errors.Is(err, storage.ErrNotFound) {
return fmt.Errorf("could not lookup result approval ID: %w", err)
}

if !errors.Is(err, storage.ErrAlreadyExists) {
return err
// no approval found, index the approval

return operation.IndexResultApproval(resultID, chunkIndex, approvalID)(w)
}

// When trying to index an approval for a result, and there is already
// an approval for the result, double check if the indexed approval is
// the same.
// an approval is already indexed, double check if it is the same
// We don't allow indexing multiple approvals per chunk because the
// store is only used within Verification nodes, and it is impossible
// for a Verification node to compute different approvals for the same
// chunk.
var storedApprovalID flow.Identifier
err = operation.LookupResultApproval(resultID, chunkIndex, &storedApprovalID)(tx)
if err != nil {
return fmt.Errorf("there is an approval stored already, but cannot retrieve it: %w", err)
}

if storedApprovalID != approvalID {
return fmt.Errorf("attempting to store conflicting approval (result: %v, chunk index: %d): storing: %v, stored: %v. %w",
Expand All @@ -105,14 +105,22 @@ func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, app

// Store stores a ResultApproval
func (r *ResultApprovals) Store(approval *flow.ResultApproval) error {
return operation.RetryOnConflictTx(r.db, transaction.Update, r.store(approval))
return operation.WithBadgerReaderBatchWriter(r.db, r.store(approval))
}

// Index indexes a ResultApproval by chunk (ResultID + chunk index).
// operation is idempotent (repeated calls with the same value are equivalent to
// just calling the method once; still the method succeeds on each call).
func (r *ResultApprovals) Index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) error {
err := operation.RetryOnConflict(r.db.Update, r.index(resultID, chunkIndex, approvalID))
// acquring the lock to prevent dirty reads when checking conflicted approvals
// how it works:
// the lock can only be acquired after the index operation is committed to the database,
// since the index operation is the only operation that would affect the reads operation,
// no writes can go through util the lock is released, so locking here could prevent dirty reads.
r.indexing.Lock()
defer r.indexing.Unlock()

err := operation.WithBadgerReaderBatchWriter(r.db, r.index(resultID, chunkIndex, approvalID))
if err != nil {
return fmt.Errorf("could not index result approval: %w", err)
}
Expand All @@ -121,16 +129,12 @@ func (r *ResultApprovals) Index(resultID flow.Identifier, chunkIndex uint64, app

// ByID retrieves a ResultApproval by its ID
func (r *ResultApprovals) ByID(approvalID flow.Identifier) (*flow.ResultApproval, error) {
tx := r.db.NewTransaction(false)
defer tx.Discard()
return r.byID(approvalID)(tx)
return r.byID(approvalID)(operation.ToReader(r.db))
}

// ByChunk retrieves a ResultApproval by result ID and chunk index. The
// ResultApprovals store is only used within a verification node, where it is
// assumed that there is never more than one approval per chunk.
func (r *ResultApprovals) ByChunk(resultID flow.Identifier, chunkIndex uint64) (*flow.ResultApproval, error) {
tx := r.db.NewTransaction(false)
defer tx.Discard()
return r.byChunk(resultID, chunkIndex)(tx)
return r.byChunk(resultID, chunkIndex)(operation.ToReader(r.db))
}
152 changes: 152 additions & 0 deletions storage/badger/cache_b.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package badger

import (
"errors"
"fmt"

lru "github.com/hashicorp/golang-lru/v2"

"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/storage"
)

func withLimitB[K comparable, V any](limit uint) func(*CacheB[K, V]) {
return func(c *CacheB[K, V]) {
c.limit = limit
}
}

type storeFuncB[K comparable, V any] func(key K, val V) func(storage.BadgerReaderBatchWriter) error

func withStoreB[K comparable, V any](store storeFuncB[K, V]) func(*CacheB[K, V]) {
return func(c *CacheB[K, V]) {
c.store = store
}
}

func noStoreB[K comparable, V any](_ K, _ V) func(storage.BadgerReaderBatchWriter) error {
return func(tx storage.BadgerReaderBatchWriter) error {
return fmt.Errorf("no store function for cache put available")
}
}

// nolint: unused
func noopStoreB[K comparable, V any](_ K, _ V) func(storage.BadgerReaderBatchWriter) error {
return func(tx storage.BadgerReaderBatchWriter) error {
return nil
}
}

type retrieveFuncB[K comparable, V any] func(key K) func(storage.Reader) (V, error)

func withRetrieveB[K comparable, V any](retrieve retrieveFuncB[K, V]) func(*CacheB[K, V]) {
return func(c *CacheB[K, V]) {
c.retrieve = retrieve
}
}

func noRetrieveB[K comparable, V any](_ K) func(storage.Reader) (V, error) {
return func(tx storage.Reader) (V, error) {
var nullV V
return nullV, fmt.Errorf("no retrieve function for cache get available")
}
}

type CacheB[K comparable, V any] struct {
metrics module.CacheMetrics
limit uint
store storeFuncB[K, V]
retrieve retrieveFuncB[K, V]
resource string
cache *lru.Cache[K, V]
}

func newCacheB[K comparable, V any](collector module.CacheMetrics, resourceName string, options ...func(*CacheB[K, V])) *CacheB[K, V] {
c := CacheB[K, V]{
metrics: collector,
limit: 1000,
store: noStoreB[K, V],
retrieve: noRetrieveB[K, V],
resource: resourceName,
}
for _, option := range options {
option(&c)
}
c.cache, _ = lru.New[K, V](int(c.limit))
c.metrics.CacheEntries(c.resource, uint(c.cache.Len()))
return &c
}

// IsCached returns true if the key exists in the cache.
// It DOES NOT check whether the key exists in the underlying data store.
func (c *CacheB[K, V]) IsCached(key K) bool {
return c.cache.Contains(key)
}

// Get will try to retrieve the resource from cache first, and then from the
// injected. During normal operations, the following error returns are expected:
// - `storage.ErrNotFound` if key is unknown.
func (c *CacheB[K, V]) Get(key K) func(storage.Reader) (V, error) {
return func(tx storage.Reader) (V, error) {

// check if we have it in the cache
resource, cached := c.cache.Get(key)
if cached {
c.metrics.CacheHit(c.resource)
return resource, nil
}

// get it from the database
resource, err := c.retrieve(key)(tx)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
c.metrics.CacheNotFound(c.resource)
}
var nullV V
return nullV, fmt.Errorf("could not retrieve resource: %w", err)
}

c.metrics.CacheMiss(c.resource)

// cache the resource and eject least recently used one if we reached limit
evicted := c.cache.Add(key, resource)
if !evicted {
c.metrics.CacheEntries(c.resource, uint(c.cache.Len()))
}

return resource, nil
}
}

func (c *CacheB[K, V]) Remove(key K) {
c.cache.Remove(key)
}

// Insert will add a resource directly to the cache with the given ID
func (c *CacheB[K, V]) Insert(key K, resource V) {
// cache the resource and eject least recently used one if we reached limit
evicted := c.cache.Add(key, resource)
if !evicted {
c.metrics.CacheEntries(c.resource, uint(c.cache.Len()))
}
}

// PutTx will return tx which adds a resource to the cache with the given ID.
func (c *CacheB[K, V]) PutTx(key K, resource V) func(storage.BadgerReaderBatchWriter) error {
storeOps := c.store(key, resource) // assemble DB operations to store resource (no execution)

return func(tx storage.BadgerReaderBatchWriter) error {
tx.AddCallback(func(err error) {
if err != nil {
c.Insert(key, resource)
}
})

err := storeOps(tx) // execute operations to store resource
if err != nil {
return fmt.Errorf("could not store resource: %w", err)
}

return nil
}
}
22 changes: 12 additions & 10 deletions storage/badger/operation/approvals.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,33 @@
package operation

import (
"github.com/dgraph-io/badger/v2"

"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/storage"
)

// InsertResultApproval inserts a ResultApproval by ID.
func InsertResultApproval(approval *flow.ResultApproval) func(*badger.Txn) error {
return insert(makePrefix(codeResultApproval, approval.ID()), approval)
// The same key (`approval.ID()`) necessitates that the value (full `approval`) is
// also identical (otherwise, we would have a successful pre-image attack on our
// cryptographic hash function). Therefore, concurrent calls to this function are safe.
func InsertResultApproval(approval *flow.ResultApproval) func(storage.Writer) error {
return insertW(makePrefix(codeResultApproval, approval.ID()), approval)
}

// RetrieveResultApproval retrieves an approval by ID.
func RetrieveResultApproval(approvalID flow.Identifier, approval *flow.ResultApproval) func(*badger.Txn) error {
return retrieve(makePrefix(codeResultApproval, approvalID), approval)
func RetrieveResultApproval(approvalID flow.Identifier, approval *flow.ResultApproval) func(storage.Reader) error {
return retrieveR(makePrefix(codeResultApproval, approvalID), approval)
}

// IndexResultApproval inserts a ResultApproval ID keyed by ExecutionResult ID
// and chunk index. If a value for this key exists, a storage.ErrAlreadyExists
// error is returned. This operation is only used by the ResultApprovals store,
// which is only used within a Verification node, where it is assumed that there
// is only one approval per chunk.
func IndexResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(*badger.Txn) error {
return insert(makePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID)
func IndexResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(storage.Writer) error {
return insertW(makePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID)
}

// LookupResultApproval finds a ResultApproval by result ID and chunk index.
func LookupResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID *flow.Identifier) func(*badger.Txn) error {
return retrieve(makePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID)
func LookupResultApproval(resultID flow.Identifier, chunkIndex uint64, approvalID *flow.Identifier) func(storage.Reader) error {
return retrieveR(makePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID)
}
Loading

0 comments on commit d3ec64f

Please sign in to comment.