Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Pebble Refactor] Making indexing approval concurrent-safe #6374

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 27 additions & 18 deletions storage/pebble/approvals.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pebble
import (
"errors"
"fmt"
"sync"

"github.com/cockroachdb/pebble"

Expand All @@ -15,8 +16,9 @@ import (

// ResultApprovals implements persistent storage for result approvals.
type ResultApprovals struct {
db *pebble.DB
cache *Cache[flow.Identifier, *flow.ResultApproval]
indexing *sync.Mutex // preventing concurrent indexing of approvals
db *pebble.DB
cache *Cache[flow.Identifier, *flow.ResultApproval]
}

func NewResultApprovals(collector module.CacheMetrics, db *pebble.DB) *ResultApprovals {
Expand All @@ -34,7 +36,8 @@ func NewResultApprovals(collector module.CacheMetrics, db *pebble.DB) *ResultApp
}

res := &ResultApprovals{
db: db,
indexing: new(sync.Mutex),
db: db,
cache: newCache[flow.Identifier, *flow.ResultApproval](collector, metrics.ResourceResultApprovals,
withLimit[flow.Identifier, *flow.ResultApproval](flow.DefaultTransactionExpiry+100),
withStore[flow.Identifier, *flow.ResultApproval](store),
Expand Down Expand Up @@ -73,27 +76,23 @@ func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, app
return func(tx storage.PebbleReaderBatchWriter) error {
r, w := tx.ReaderWriter()

err := operation.IndexResultApproval(resultID, chunkIndex, approvalID)(w)
if err == nil {
return nil
}
var storedApprovalID flow.Identifier
err := operation.LookupResultApproval(resultID, chunkIndex, &storedApprovalID)(r)
if err != nil {
if !errors.Is(err, storage.ErrNotFound) {
return fmt.Errorf("could not lookup result approval ID: %w", err)
}

// no approval found, index the approval

if !errors.Is(err, storage.ErrAlreadyExists) {
return err
return operation.IndexResultApproval(resultID, chunkIndex, approvalID)(w)
}

// When trying to index an approval for a result, and there is already
// an approval for the result, double check if the indexed approval is
// the same.
// an approval is already indexed, double check if it is the same
// We don't allow indexing multiple approvals per chunk because the
// store is only used within Verification nodes, and it is impossible
// for a Verification node to compute different approvals for the same
// chunk.
var storedApprovalID flow.Identifier
err = operation.LookupResultApproval(resultID, chunkIndex, &storedApprovalID)(r)
if err != nil {
return fmt.Errorf("there is an approval stored already, but cannot retrieve it: %w", err)
}

if storedApprovalID != approvalID {
return fmt.Errorf("attempting to store conflicting approval (result: %v, chunk index: %d): storing: %v, stored: %v. %w",
Expand All @@ -104,15 +103,25 @@ func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, app
}
}

// Store stores a ResultApproval
// Store stores a ResultApproval and indexes a ResultApproval by chunk (ResultID + chunk index).
// this method is concurrent-safe
func (r *ResultApprovals) Store(approval *flow.ResultApproval) error {
return operation.WithReaderBatchWriter(r.db, r.store(approval))
}

// Index indexes a ResultApproval by chunk (ResultID + chunk index).
// operation is idempotent (repeated calls with the same value are equivalent to
// just calling the method once; still the method succeeds on each call).
// this method is concurrent-safe
func (r *ResultApprovals) Index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) error {
// acquring the lock to prevent dirty reads when checking conflicted approvals
// how it works:
// the lock can only be acquired after the index operation is committed to the database,
// since the index operation is the only operation that would affect the reads operation,
// no writes can go through util the lock is released, so locking here could prevent dirty reads.
r.indexing.Lock()
defer r.indexing.Unlock()

err := operation.WithReaderBatchWriter(r.db, r.index(resultID, chunkIndex, approvalID))
if err != nil {
return fmt.Errorf("could not index result approval: %w", err)
Expand Down
60 changes: 56 additions & 4 deletions storage/pebble/approvals_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package pebble_test

import (
"errors"
"sync"
"testing"

"github.com/cockroachdb/pebble"
"github.com/stretchr/testify/require"

"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/storage"
bstorage "github.com/onflow/flow-go/storage/pebble"
"github.com/onflow/flow-go/utils/unittest"
)
Expand Down Expand Up @@ -72,9 +75,58 @@ func TestApprovalStoreTwoDifferentApprovalsShouldFail(t *testing.T) {
err = store.Store(approval2)
require.NoError(t, err)

// TODO: fix later once implement insert and upsert
// err = store.Index(approval1.Body.ExecutionResultID, approval1.Body.ChunkIndex, approval2.ID())
// require.Error(t, err)
// require.True(t, errors.Is(err, storage.ErrDataMismatch))
// index again with a different approval should fail
err = store.Index(approval1.Body.ExecutionResultID, approval1.Body.ChunkIndex, approval2.ID())
require.Error(t, err)
require.True(t, errors.Is(err, storage.ErrDataMismatch))
})
}

// verify that storing and indexing two conflicting approvals concurrently should fail
// one of them is succeed, the other one should fail
func TestApprovalStoreTwoDifferentApprovalsConcurrently(t *testing.T) {
unittest.RunWithPebbleDB(t, func(db *pebble.DB) {
metrics := metrics.NewNoopCollector()
store := bstorage.NewResultApprovals(metrics, db)

approval1 := unittest.ResultApprovalFixture()
approval2 := unittest.ResultApprovalFixture()

var wg sync.WaitGroup
wg.Add(2)

var firstIndexErr, secondIndexErr error

// First goroutine stores and indexes the first approval.
go func() {
defer wg.Done()

err := store.Store(approval1)
require.NoError(t, err)

firstIndexErr = store.Index(approval1.Body.ExecutionResultID, approval1.Body.ChunkIndex, approval1.ID())
}()

// Second goroutine stores and tries to index the second approval for the same chunk.
go func() {
defer wg.Done()

err := store.Store(approval2)
require.NoError(t, err)

secondIndexErr = store.Index(approval1.Body.ExecutionResultID, approval1.Body.ChunkIndex, approval2.ID())
}()

// Wait for both goroutines to finish
wg.Wait()

// Check that one of the Index operations succeeded and the other failed
if firstIndexErr == nil {
require.Error(t, secondIndexErr)
require.True(t, errors.Is(secondIndexErr, storage.ErrDataMismatch))
} else {
require.NoError(t, secondIndexErr)
require.True(t, errors.Is(firstIndexErr, storage.ErrDataMismatch))
}
})
}
Loading