Skip to content

Commit

Permalink
Merge pull request #6374 from onflow/leo/v0.33-pebble-storage-index-a…
Browse files Browse the repository at this point in the history
…pproval

[Pebble Refactor] Making indexing approval concurrent-safe
  • Loading branch information
zhangchiqing authored Sep 13, 2024
2 parents 51969ec + a01d317 commit 8eb0f74
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 22 deletions.
45 changes: 27 additions & 18 deletions storage/pebble/approvals.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pebble
import (
"errors"
"fmt"
"sync"

"github.com/cockroachdb/pebble"

Expand All @@ -15,8 +16,9 @@ import (

// ResultApprovals implements persistent storage for result approvals.
type ResultApprovals struct {
db *pebble.DB
cache *Cache[flow.Identifier, *flow.ResultApproval]
indexing *sync.Mutex // preventing concurrent indexing of approvals
db *pebble.DB
cache *Cache[flow.Identifier, *flow.ResultApproval]
}

func NewResultApprovals(collector module.CacheMetrics, db *pebble.DB) *ResultApprovals {
Expand All @@ -34,7 +36,8 @@ func NewResultApprovals(collector module.CacheMetrics, db *pebble.DB) *ResultApp
}

res := &ResultApprovals{
db: db,
indexing: new(sync.Mutex),
db: db,
cache: newCache[flow.Identifier, *flow.ResultApproval](collector, metrics.ResourceResultApprovals,
withLimit[flow.Identifier, *flow.ResultApproval](flow.DefaultTransactionExpiry+100),
withStore[flow.Identifier, *flow.ResultApproval](store),
Expand Down Expand Up @@ -73,27 +76,23 @@ func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, app
return func(tx storage.PebbleReaderBatchWriter) error {
r, w := tx.ReaderWriter()

err := operation.IndexResultApproval(resultID, chunkIndex, approvalID)(w)
if err == nil {
return nil
}
var storedApprovalID flow.Identifier
err := operation.LookupResultApproval(resultID, chunkIndex, &storedApprovalID)(r)
if err != nil {
if !errors.Is(err, storage.ErrNotFound) {
return fmt.Errorf("could not lookup result approval ID: %w", err)
}

// no approval found, index the approval

if !errors.Is(err, storage.ErrAlreadyExists) {
return err
return operation.IndexResultApproval(resultID, chunkIndex, approvalID)(w)
}

// When trying to index an approval for a result, and there is already
// an approval for the result, double check if the indexed approval is
// the same.
// an approval is already indexed, double check if it is the same
// We don't allow indexing multiple approvals per chunk because the
// store is only used within Verification nodes, and it is impossible
// for a Verification node to compute different approvals for the same
// chunk.
var storedApprovalID flow.Identifier
err = operation.LookupResultApproval(resultID, chunkIndex, &storedApprovalID)(r)
if err != nil {
return fmt.Errorf("there is an approval stored already, but cannot retrieve it: %w", err)
}

if storedApprovalID != approvalID {
return fmt.Errorf("attempting to store conflicting approval (result: %v, chunk index: %d): storing: %v, stored: %v. %w",
Expand All @@ -104,15 +103,25 @@ func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, app
}
}

// Store stores a ResultApproval
// Store stores a ResultApproval and indexes a ResultApproval by chunk (ResultID + chunk index).
// this method is concurrent-safe
func (r *ResultApprovals) Store(approval *flow.ResultApproval) error {
return operation.WithReaderBatchWriter(r.db, r.store(approval))
}

// Index indexes a ResultApproval by chunk (ResultID + chunk index).
// operation is idempotent (repeated calls with the same value are equivalent to
// just calling the method once; still the method succeeds on each call).
// this method is concurrent-safe
func (r *ResultApprovals) Index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) error {
// acquring the lock to prevent dirty reads when checking conflicted approvals
// how it works:
// the lock can only be acquired after the index operation is committed to the database,
// since the index operation is the only operation that would affect the reads operation,
// no writes can go through util the lock is released, so locking here could prevent dirty reads.
r.indexing.Lock()
defer r.indexing.Unlock()

err := operation.WithReaderBatchWriter(r.db, r.index(resultID, chunkIndex, approvalID))
if err != nil {
return fmt.Errorf("could not index result approval: %w", err)
Expand Down
60 changes: 56 additions & 4 deletions storage/pebble/approvals_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package pebble_test

import (
"errors"
"sync"
"testing"

"github.com/cockroachdb/pebble"
"github.com/stretchr/testify/require"

"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/storage"
bstorage "github.com/onflow/flow-go/storage/pebble"
"github.com/onflow/flow-go/utils/unittest"
)
Expand Down Expand Up @@ -72,9 +75,58 @@ func TestApprovalStoreTwoDifferentApprovalsShouldFail(t *testing.T) {
err = store.Store(approval2)
require.NoError(t, err)

// TODO: fix later once implement insert and upsert
// err = store.Index(approval1.Body.ExecutionResultID, approval1.Body.ChunkIndex, approval2.ID())
// require.Error(t, err)
// require.True(t, errors.Is(err, storage.ErrDataMismatch))
// index again with a different approval should fail
err = store.Index(approval1.Body.ExecutionResultID, approval1.Body.ChunkIndex, approval2.ID())
require.Error(t, err)
require.True(t, errors.Is(err, storage.ErrDataMismatch))
})
}

// verify that storing and indexing two conflicting approvals concurrently should fail
// one of them is succeed, the other one should fail
func TestApprovalStoreTwoDifferentApprovalsConcurrently(t *testing.T) {
unittest.RunWithPebbleDB(t, func(db *pebble.DB) {
metrics := metrics.NewNoopCollector()
store := bstorage.NewResultApprovals(metrics, db)

approval1 := unittest.ResultApprovalFixture()
approval2 := unittest.ResultApprovalFixture()

var wg sync.WaitGroup
wg.Add(2)

var firstIndexErr, secondIndexErr error

// First goroutine stores and indexes the first approval.
go func() {
defer wg.Done()

err := store.Store(approval1)
require.NoError(t, err)

firstIndexErr = store.Index(approval1.Body.ExecutionResultID, approval1.Body.ChunkIndex, approval1.ID())
}()

// Second goroutine stores and tries to index the second approval for the same chunk.
go func() {
defer wg.Done()

err := store.Store(approval2)
require.NoError(t, err)

secondIndexErr = store.Index(approval1.Body.ExecutionResultID, approval1.Body.ChunkIndex, approval2.ID())
}()

// Wait for both goroutines to finish
wg.Wait()

// Check that one of the Index operations succeeded and the other failed
if firstIndexErr == nil {
require.Error(t, secondIndexErr)
require.True(t, errors.Is(secondIndexErr, storage.ErrDataMismatch))
} else {
require.NoError(t, secondIndexErr)
require.True(t, errors.Is(firstIndexErr, storage.ErrDataMismatch))
}
})
}

0 comments on commit 8eb0f74

Please sign in to comment.