Skip to content

Commit

Permalink
introduce mutex for state and lastCommitInfo to avoid race condition …
Browse files Browse the repository at this point in the history
…betwwen Commit and CreateQueryContext
  • Loading branch information
beer-1 committed Nov 29, 2024
1 parent 6cfe2dc commit d142938
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 47 deletions.
72 changes: 41 additions & 31 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,13 @@ func (app *BaseApp) InitChain(req *abci.InitChainRequest) (*abci.InitChainRespon
// initialize states with a correct header
app.setState(execModeFinalize, initHeader)
app.setState(execModeCheck, initHeader)
finalizeState := app.getState(execModeFinalize)

// Store the consensus params in the BaseApp's param store. Note, this must be
// done after the finalizeBlockState and context have been set as it's persisted
// to state.
if req.ConsensusParams != nil {
err := app.StoreConsensusParams(app.finalizeBlockState.Context(), *req.ConsensusParams)
err := app.StoreConsensusParams(finalizeState.Context(), *req.ConsensusParams)
if err != nil {
return nil, err
}
Expand All @@ -86,13 +87,14 @@ func (app *BaseApp) InitChain(req *abci.InitChainRequest) (*abci.InitChainRespon
// handler, the block height is zero by default. However, after Commit is called
// the height needs to reflect the true block height.
initHeader.Height = req.InitialHeight
app.checkState.SetContext(app.checkState.Context().WithBlockHeader(initHeader).
checkState := app.getState(execModeCheck)
checkState.SetContext(checkState.Context().WithBlockHeader(initHeader).
WithHeaderInfo(coreheader.Info{
ChainID: req.ChainId,
Height: req.InitialHeight,
Time: req.Time,
}))
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockHeader(initHeader).
finalizeState.SetContext(finalizeState.Context().WithBlockHeader(initHeader).
WithHeaderInfo(coreheader.Info{
ChainID: req.ChainId,
Height: req.InitialHeight,
Expand All @@ -105,9 +107,9 @@ func (app *BaseApp) InitChain(req *abci.InitChainRequest) (*abci.InitChainRespon
}

// add block gas meter for any genesis transactions (allow infinite gas)
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(storetypes.NewInfiniteGasMeter()))
finalizeState.SetContext(finalizeState.Context().WithBlockGasMeter(storetypes.NewInfiniteGasMeter()))

res, err := app.initChainer(app.finalizeBlockState.Context(), req)
res, err := app.initChainer(finalizeState.Context(), req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -604,7 +606,7 @@ func (app *BaseApp) ExtendVote(_ context.Context, req *abci.ExtendVoteRequest) (
// finalizeBlockState context, otherwise we don't get the uncommitted data
// from InitChain.
if req.Height == app.initialHeight {
ctx, _ = app.finalizeBlockState.Context().CacheContext()
ctx, _ = app.getState(execModeFinalize).Context().CacheContext()
} else {
ms := app.cms.CacheMultiStore()
ctx = sdk.NewContext(ms, false, app.logger).WithStreamingManager(app.streamingManager).WithChainID(app.chainID).WithBlockHeight(req.Height)
Expand Down Expand Up @@ -684,7 +686,7 @@ func (app *BaseApp) VerifyVoteExtension(req *abci.VerifyVoteExtensionRequest) (r
// finalizeBlockState context, otherwise we don't get the uncommitted data
// from InitChain.
if req.Height == app.initialHeight {
ctx, _ = app.finalizeBlockState.Context().CacheContext()
ctx, _ = app.getState(execModeFinalize).Context().CacheContext()
} else {
ms := app.cms.CacheMultiStore()
ctx = sdk.NewContext(ms, false, app.logger).WithStreamingManager(app.streamingManager).WithChainID(app.chainID).WithBlockHeight(req.Height)
Expand Down Expand Up @@ -742,7 +744,7 @@ func (app *BaseApp) VerifyVoteExtension(req *abci.VerifyVoteExtensionRequest) (r

// internalFinalizeBlock executes the block, called by the Optimistic
// Execution flow or by the FinalizeBlock ABCI method. The context received is
// only used to handle early cancellation, for anything related to state app.finalizeBlockState.Context()
// only used to handle early cancellation, for anything related to state app.getState(execModeFinalize).Context()
// must be used.
func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.FinalizeBlockRequest) (*abci.FinalizeBlockResponse, error) {
var events []abci.Event
Expand Down Expand Up @@ -773,12 +775,14 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
// finalizeBlockState should be set on InitChain or ProcessProposal. If it is
// nil, it means we are replaying this block and we need to set the state here
// given that during block replay ProcessProposal is not executed by CometBFT.
if app.finalizeBlockState == nil {
finalizeState := app.getState(execModeFinalize)
if finalizeState == nil {
app.setState(execModeFinalize, header)
finalizeState = app.getState(execModeFinalize)
}

// Context is now updated with Header information.
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().
finalizeState.SetContext(finalizeState.Context().
WithBlockHeader(header).
WithHeaderHash(req.Hash).
WithHeaderInfo(coreheader.Info{
Expand All @@ -788,7 +792,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
Hash: req.Hash,
AppHash: app.LastCommitID().Hash,
}).
WithConsensusParams(app.GetConsensusParams(app.finalizeBlockState.Context())).
WithConsensusParams(app.GetConsensusParams(finalizeState.Context())).
WithVoteInfos(req.DecidedLastCommit.Votes).
WithExecMode(sdk.ExecModeFinalize).
WithCometInfo(corecomet.Info{
Expand All @@ -799,11 +803,11 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
}))

// GasMeter must be set after we get a context with updated consensus params.
gasMeter := app.getBlockGasMeter(app.finalizeBlockState.Context())
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(gasMeter))
gasMeter := app.getBlockGasMeter(finalizeState.Context())
finalizeState.SetContext(finalizeState.Context().WithBlockGasMeter(gasMeter))

if app.checkState != nil {
app.checkState.SetContext(app.checkState.Context().
if checkState := app.getState(execModeCheck); checkState != nil {
checkState.SetContext(checkState.Context().
WithBlockGasMeter(gasMeter).
WithHeaderHash(req.Hash))
}
Expand Down Expand Up @@ -831,8 +835,8 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
events = append(events, beginBlock.Events...)

// Reset the gas meter so that the AnteHandlers aren't required to
gasMeter = app.getBlockGasMeter(app.finalizeBlockState.Context())
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(gasMeter))
gasMeter = app.getBlockGasMeter(finalizeState.Context())
finalizeState.SetContext(finalizeState.Context().WithBlockGasMeter(gasMeter))

// Iterate over all raw transactions in the proposal and attempt to execute
// them, gathering the execution results.
Expand Down Expand Up @@ -861,11 +865,11 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
txResults = append(txResults, response)
}

if app.finalizeBlockState.ms.TracingEnabled() {
app.finalizeBlockState.ms = app.finalizeBlockState.ms.SetTracingContext(nil).(storetypes.CacheMultiStore)
if finalizeState.ms.TracingEnabled() {
finalizeState.ms = finalizeState.ms.SetTracingContext(nil).(storetypes.CacheMultiStore)
}

endBlock, err := app.endBlock(app.finalizeBlockState.Context())
endBlock, err := app.endBlock(finalizeState.Context())
if err != nil {
return nil, err
}
Expand All @@ -879,7 +883,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
}

events = append(events, endBlock.Events...)
cp := app.GetConsensusParams(app.finalizeBlockState.Context())
cp := app.GetConsensusParams(finalizeState.Context())

return &abci.FinalizeBlockResponse{
Events: events,
Expand All @@ -903,7 +907,7 @@ func (app *BaseApp) FinalizeBlock(req *abci.FinalizeBlockRequest) (res *abci.Fin
defer func() {
// call the streaming service hooks with the FinalizeBlock messages
for _, streamingListener := range app.streamingManager.ABCIListeners {
if streamErr := streamingListener.ListenFinalizeBlock(app.finalizeBlockState.Context(), *req, *res); streamErr != nil {
if streamErr := streamingListener.ListenFinalizeBlock(app.getState(execModeFinalize).Context(), *req, *res); streamErr != nil {
app.logger.Error("ListenFinalizeBlock listening hook failed", "height", req.Height, "err", err)
if app.streamingManager.StopNodeOnErr {
// if StopNodeOnErr is set, we should return the streamErr in order to stop the node
Expand All @@ -929,7 +933,10 @@ func (app *BaseApp) FinalizeBlock(req *abci.FinalizeBlockRequest) (res *abci.Fin
}

// if it was aborted, we need to reset the state
app.stateMut.Lock()
app.finalizeBlockState = nil
app.stateMut.Unlock()

app.optimisticExec.Reset()
}

Expand Down Expand Up @@ -968,11 +975,12 @@ func (app *BaseApp) checkHalt(height int64, time time.Time) error {
// against that height and gracefully halt if it matches the latest committed
// height.
func (app *BaseApp) Commit() (*abci.CommitResponse, error) {
header := app.finalizeBlockState.Context().BlockHeader()
finalizeState := app.getState(execModeFinalize)
header := finalizeState.Context().BlockHeader()
retainHeight := app.GetBlockRetentionHeight(header.Height)

if app.precommiter != nil {
app.precommiter(app.finalizeBlockState.Context())
app.precommiter(finalizeState.Context())
}

rms, ok := app.cms.(*rootmulti.Store)
Expand All @@ -988,7 +996,7 @@ func (app *BaseApp) Commit() (*abci.CommitResponse, error) {

abciListeners := app.streamingManager.ABCIListeners
if len(abciListeners) > 0 {
ctx := app.finalizeBlockState.Context()
ctx := finalizeState.Context()
blockHeight := ctx.BlockHeight()
changeSet := app.cms.PopStateCache()

Expand All @@ -1013,10 +1021,12 @@ func (app *BaseApp) Commit() (*abci.CommitResponse, error) {
// Commit. Use the header from this latest block.
app.setState(execModeCheck, header)

app.stateMut.Lock()
app.finalizeBlockState = nil
app.stateMut.Unlock()

if app.prepareCheckStater != nil {
app.prepareCheckStater(app.checkState.Context())
app.prepareCheckStater(app.getState(execModeCheck).Context())
}

// The SnapshotIfApplicable method will create the snapshot by starting the goroutine
Expand All @@ -1034,7 +1044,7 @@ func (app *BaseApp) workingHash() []byte {
// Write the FinalizeBlock state into branched storage and commit the MultiStore.
// The write to the FinalizeBlock state writes all state transitions to the root
// MultiStore (app.cms) so when Commit() is called it persists those values.
app.finalizeBlockState.ms.Write()
app.getState(execModeFinalize).ms.Write()

// Get the hash of all writes in order to return the apphash to the comet in finalizeBlock.
commitHash := app.cms.WorkingHash()
Expand Down Expand Up @@ -1181,7 +1191,7 @@ func (app *BaseApp) FilterPeerByID(info string) *abci.QueryResponse {
// access any state changes made in InitChain.
func (app *BaseApp) getContextForProposal(ctx sdk.Context, height int64) sdk.Context {
if height == app.initialHeight {
ctx, _ = app.finalizeBlockState.Context().CacheContext()
ctx, _ = app.getState(execModeFinalize).Context().CacheContext()

// clear all context data set during InitChain to avoid inconsistent behavior
ctx = ctx.WithHeaderInfo(coreheader.Info{}).WithBlockHeader(cmtproto.Header{})
Expand Down Expand Up @@ -1282,8 +1292,8 @@ func (app *BaseApp) CreateQueryContextWithCheckHeader(height int64, prove, check
var header *cmtproto.Header
isLatest := height == 0
for _, state := range []*state{
app.checkState,
app.finalizeBlockState,
app.getState(execModeCheck),
app.getState(execModeFinalize),
} {
if state != nil {
// branch the commit multi-store for safety
Expand Down Expand Up @@ -1396,7 +1406,7 @@ func (app *BaseApp) GetBlockRetentionHeight(commitHeight int64) int64 {
// evidence parameters instead of computing an estimated number of blocks based
// on the unbonding period and block commitment time as the two should be
// equivalent.
cp := app.GetConsensusParams(app.finalizeBlockState.Context())
cp := app.GetConsensusParams(app.getState(execModeFinalize).Context())
if cp.Evidence != nil && cp.Evidence.MaxAgeNumBlocks > 0 {
retentionHeight = commitHeight - cp.Evidence.MaxAgeNumBlocks
}
Expand Down
49 changes: 49 additions & 0 deletions baseapp/abci_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"math/rand"
"strconv"
"strings"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -2779,3 +2780,51 @@ func TestABCI_Proposal_FailReCheckTx(t *testing.T) {
require.NotEmpty(t, res.TxResults[0].Events)
require.True(t, res.TxResults[0].IsOK(), fmt.Sprintf("%v", res))
}

func TestABCI_Race_Commit_Query(t *testing.T) {
suite := NewBaseAppSuite(t, baseapp.SetChainID("test-chain-id"))
app := suite.baseApp

_, err := app.InitChain(&abci.InitChainRequest{
ChainId: "test-chain-id",
ConsensusParams: &cmtproto.ConsensusParams{Block: &cmtproto.BlockParams{MaxGas: 5000000}},
InitialHeight: 1,
})
require.NoError(t, err)
_, err = app.Commit()
require.NoError(t, err)

counter := atomic.Uint64{}
counter.Store(0)

ctx, cancel := context.WithCancel(context.Background())
queryCreator := func() {
for {
select {
case <-ctx.Done():
return
default:
_, err := app.CreateQueryContextWithCheckHeader(0, false, false)
require.NoError(t, err)

counter.Add(1)
}
}
}

for i := 0; i < 100; i++ {
go queryCreator()
}

for i := 0; i < 1000; i++ {
_, err = app.FinalizeBlock(&abci.FinalizeBlockRequest{Height: app.LastBlockHeight() + 1})
require.NoError(t, err)

_, err = app.Commit()
require.NoError(t, err)
}

cancel()

require.Equal(t, int64(1001), app.GetContextForCheckTx(nil).BlockHeight())
}
16 changes: 12 additions & 4 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ type BaseApp struct {
prepareProposalState *state
processProposalState *state
finalizeBlockState *state
stateMut sync.RWMutex

// An inter-block write-through cache provided to the context during the ABCI
// FinalizeBlock call.
Expand Down Expand Up @@ -494,6 +495,9 @@ func (app *BaseApp) setState(mode execMode, h cmtproto.Header) {
WithHeaderInfo(headerInfo),
}

app.stateMut.Lock()
defer app.stateMut.Unlock()

switch mode {
case execModeCheck:
baseState.SetContext(baseState.Context().WithIsCheckTx(true).WithMinGasPrices(app.minGasPrices))
Expand Down Expand Up @@ -633,6 +637,9 @@ func validateBasicTxMsgs(router *MsgServiceRouter, msgs []sdk.Msg) error {
}

func (app *BaseApp) getState(mode execMode) *state {
app.stateMut.RLock()
defer app.stateMut.RUnlock()

switch mode {
case execModeFinalize:
return app.finalizeBlockState
Expand Down Expand Up @@ -706,7 +713,8 @@ func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context
func (app *BaseApp) preBlock(req *abci.FinalizeBlockRequest) ([]abci.Event, error) {
var events []abci.Event
if app.preBlocker != nil {
ctx := app.finalizeBlockState.Context().WithEventManager(sdk.NewEventManager())
finalizeState := app.getState(execModeFinalize)
ctx := finalizeState.Context().WithEventManager(sdk.NewEventManager())
if err := app.preBlocker(ctx, req); err != nil {
return nil, err
}
Expand All @@ -716,7 +724,7 @@ func (app *BaseApp) preBlock(req *abci.FinalizeBlockRequest) ([]abci.Event, erro
// GasMeter must be set after we get a context with updated consensus params.
gasMeter := app.getBlockGasMeter(ctx)
ctx = ctx.WithBlockGasMeter(gasMeter)
app.finalizeBlockState.SetContext(ctx)
finalizeState.SetContext(ctx)
events = ctx.EventManager().ABCIEvents()

// append PreBlock attributes to all events
Expand All @@ -738,7 +746,7 @@ func (app *BaseApp) beginBlock(_ *abci.FinalizeBlockRequest) (sdk.BeginBlock, er
)

if app.beginBlocker != nil {
resp, err = app.beginBlocker(app.finalizeBlockState.Context())
resp, err = app.beginBlocker(app.getState(execModeFinalize).Context())
if err != nil {
return resp, err
}
Expand Down Expand Up @@ -801,7 +809,7 @@ func (app *BaseApp) endBlock(_ context.Context) (sdk.EndBlock, error) {
var endblock sdk.EndBlock

if app.endBlocker != nil {
eb, err := app.endBlocker(app.finalizeBlockState.Context())
eb, err := app.endBlocker(app.getState(execModeFinalize).Context())
if err != nil {
return endblock, err
}
Expand Down
6 changes: 3 additions & 3 deletions baseapp/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,18 @@ func (app *BaseApp) SimDeliver(txEncoder sdk.TxEncoder, tx sdk.Tx) (sdk.GasInfo,
// SimWriteState is an entrypoint for simulations only. They are not executed during the normal ABCI finalize
// block step but later. Therefore, an extra call to the root multi-store (app.cms) is required to write the changes.
func (app *BaseApp) SimWriteState() {
app.finalizeBlockState.ms.Write()
app.getState(execModeFinalize).ms.Write()
}

// NewContextLegacy returns a new sdk.Context with the provided header
func (app *BaseApp) NewContextLegacy(isCheckTx bool, header cmtproto.Header) sdk.Context {
if isCheckTx {
return sdk.NewContext(app.checkState.ms, true, app.logger).
return sdk.NewContext(app.getState(execModeCheck).ms, true, app.logger).
WithMinGasPrices(app.minGasPrices).
WithBlockHeader(header)
}

return sdk.NewContext(app.finalizeBlockState.ms, false, app.logger).WithBlockHeader(header)
return sdk.NewContext(app.getState(execModeFinalize).ms, false, app.logger).WithBlockHeader(header)
}

// NewContext returns a new sdk.Context with a empty header
Expand Down
Loading

0 comments on commit d142938

Please sign in to comment.