diff --git a/storage/pebble/approvals.go b/storage/pebble/approvals.go index 425e6de0c2a..2f4b1575197 100644 --- a/storage/pebble/approvals.go +++ b/storage/pebble/approvals.go @@ -3,6 +3,7 @@ package pebble import ( "errors" "fmt" + "sync" "github.com/cockroachdb/pebble" @@ -15,8 +16,9 @@ import ( // ResultApprovals implements persistent storage for result approvals. type ResultApprovals struct { - db *pebble.DB - cache *Cache[flow.Identifier, *flow.ResultApproval] + indexing *sync.Mutex // preventing concurrent indexing of approvals + db *pebble.DB + cache *Cache[flow.Identifier, *flow.ResultApproval] } func NewResultApprovals(collector module.CacheMetrics, db *pebble.DB) *ResultApprovals { @@ -34,7 +36,8 @@ func NewResultApprovals(collector module.CacheMetrics, db *pebble.DB) *ResultApp } res := &ResultApprovals{ - db: db, + indexing: new(sync.Mutex), + db: db, cache: newCache[flow.Identifier, *flow.ResultApproval](collector, metrics.ResourceResultApprovals, withLimit[flow.Identifier, *flow.ResultApproval](flow.DefaultTransactionExpiry+100), withStore[flow.Identifier, *flow.ResultApproval](store), @@ -73,27 +76,23 @@ func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, app return func(tx storage.PebbleReaderBatchWriter) error { r, w := tx.ReaderWriter() - err := operation.IndexResultApproval(resultID, chunkIndex, approvalID)(w) - if err == nil { - return nil - } + var storedApprovalID flow.Identifier + err := operation.LookupResultApproval(resultID, chunkIndex, &storedApprovalID)(r) + if err != nil { + if !errors.Is(err, storage.ErrNotFound) { + return fmt.Errorf("could not lookup result approval ID: %w", err) + } + + // no approval found, index the approval - if !errors.Is(err, storage.ErrAlreadyExists) { - return err + return operation.IndexResultApproval(resultID, chunkIndex, approvalID)(w) } - // When trying to index an approval for a result, and there is already - // an approval for the result, double check if the indexed approval is - // the same. + // an approval is already indexed, double check if it is the same // We don't allow indexing multiple approvals per chunk because the // store is only used within Verification nodes, and it is impossible // for a Verification node to compute different approvals for the same // chunk. - var storedApprovalID flow.Identifier - err = operation.LookupResultApproval(resultID, chunkIndex, &storedApprovalID)(r) - if err != nil { - return fmt.Errorf("there is an approval stored already, but cannot retrieve it: %w", err) - } if storedApprovalID != approvalID { return fmt.Errorf("attempting to store conflicting approval (result: %v, chunk index: %d): storing: %v, stored: %v. %w", @@ -104,7 +103,8 @@ func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, app } } -// Store stores a ResultApproval +// Store stores a ResultApproval and indexes a ResultApproval by chunk (ResultID + chunk index). +// this method is concurrent-safe func (r *ResultApprovals) Store(approval *flow.ResultApproval) error { return operation.WithReaderBatchWriter(r.db, r.store(approval)) } @@ -112,7 +112,16 @@ func (r *ResultApprovals) Store(approval *flow.ResultApproval) error { // Index indexes a ResultApproval by chunk (ResultID + chunk index). // operation is idempotent (repeated calls with the same value are equivalent to // just calling the method once; still the method succeeds on each call). +// this method is concurrent-safe func (r *ResultApprovals) Index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) error { + // acquring the lock to prevent dirty reads when checking conflicted approvals + // how it works: + // the lock can only be acquired after the index operation is committed to the database, + // since the index operation is the only operation that would affect the reads operation, + // no writes can go through util the lock is released, so locking here could prevent dirty reads. + r.indexing.Lock() + defer r.indexing.Unlock() + err := operation.WithReaderBatchWriter(r.db, r.index(resultID, chunkIndex, approvalID)) if err != nil { return fmt.Errorf("could not index result approval: %w", err) diff --git a/storage/pebble/approvals_test.go b/storage/pebble/approvals_test.go index 39f60c5bdaf..f1f6ed2ef45 100644 --- a/storage/pebble/approvals_test.go +++ b/storage/pebble/approvals_test.go @@ -1,12 +1,15 @@ package pebble_test import ( + "errors" + "sync" "testing" "github.com/cockroachdb/pebble" "github.com/stretchr/testify/require" "github.com/onflow/flow-go/module/metrics" + "github.com/onflow/flow-go/storage" bstorage "github.com/onflow/flow-go/storage/pebble" "github.com/onflow/flow-go/utils/unittest" ) @@ -72,9 +75,58 @@ func TestApprovalStoreTwoDifferentApprovalsShouldFail(t *testing.T) { err = store.Store(approval2) require.NoError(t, err) - // TODO: fix later once implement insert and upsert - // err = store.Index(approval1.Body.ExecutionResultID, approval1.Body.ChunkIndex, approval2.ID()) - // require.Error(t, err) - // require.True(t, errors.Is(err, storage.ErrDataMismatch)) + // index again with a different approval should fail + err = store.Index(approval1.Body.ExecutionResultID, approval1.Body.ChunkIndex, approval2.ID()) + require.Error(t, err) + require.True(t, errors.Is(err, storage.ErrDataMismatch)) + }) +} + +// verify that storing and indexing two conflicting approvals concurrently should fail +// one of them is succeed, the other one should fail +func TestApprovalStoreTwoDifferentApprovalsConcurrently(t *testing.T) { + unittest.RunWithPebbleDB(t, func(db *pebble.DB) { + metrics := metrics.NewNoopCollector() + store := bstorage.NewResultApprovals(metrics, db) + + approval1 := unittest.ResultApprovalFixture() + approval2 := unittest.ResultApprovalFixture() + + var wg sync.WaitGroup + wg.Add(2) + + var firstIndexErr, secondIndexErr error + + // First goroutine stores and indexes the first approval. + go func() { + defer wg.Done() + + err := store.Store(approval1) + require.NoError(t, err) + + firstIndexErr = store.Index(approval1.Body.ExecutionResultID, approval1.Body.ChunkIndex, approval1.ID()) + }() + + // Second goroutine stores and tries to index the second approval for the same chunk. + go func() { + defer wg.Done() + + err := store.Store(approval2) + require.NoError(t, err) + + secondIndexErr = store.Index(approval1.Body.ExecutionResultID, approval1.Body.ChunkIndex, approval2.ID()) + }() + + // Wait for both goroutines to finish + wg.Wait() + + // Check that one of the Index operations succeeded and the other failed + if firstIndexErr == nil { + require.Error(t, secondIndexErr) + require.True(t, errors.Is(secondIndexErr, storage.ErrDataMismatch)) + } else { + require.NoError(t, secondIndexErr) + require.True(t, errors.Is(firstIndexErr, storage.ErrDataMismatch)) + } }) }