From d14293892132cdd4a512c4753cd1a75bd61b27d6 Mon Sep 17 00:00:00 2001 From: beer-1 Date: Fri, 29 Nov 2024 12:08:02 +0900 Subject: [PATCH] introduce mutex for state and lastCommitInfo to avoid race condition betwwen Commit and CreateQueryContext --- baseapp/abci.go | 72 +++++++++++++++++++++++----------------- baseapp/abci_test.go | 49 +++++++++++++++++++++++++++ baseapp/baseapp.go | 16 ++++++--- baseapp/test_helpers.go | 6 ++-- go.mod | 3 +- store/rootmulti/store.go | 33 +++++++++++++----- 6 files changed, 132 insertions(+), 47 deletions(-) diff --git a/baseapp/abci.go b/baseapp/abci.go index d9921a8e13d5..27e5eeabd82a 100644 --- a/baseapp/abci.go +++ b/baseapp/abci.go @@ -69,12 +69,13 @@ func (app *BaseApp) InitChain(req *abci.InitChainRequest) (*abci.InitChainRespon // initialize states with a correct header app.setState(execModeFinalize, initHeader) app.setState(execModeCheck, initHeader) + finalizeState := app.getState(execModeFinalize) // Store the consensus params in the BaseApp's param store. Note, this must be // done after the finalizeBlockState and context have been set as it's persisted // to state. if req.ConsensusParams != nil { - err := app.StoreConsensusParams(app.finalizeBlockState.Context(), *req.ConsensusParams) + err := app.StoreConsensusParams(finalizeState.Context(), *req.ConsensusParams) if err != nil { return nil, err } @@ -86,13 +87,14 @@ func (app *BaseApp) InitChain(req *abci.InitChainRequest) (*abci.InitChainRespon // handler, the block height is zero by default. However, after Commit is called // the height needs to reflect the true block height. initHeader.Height = req.InitialHeight - app.checkState.SetContext(app.checkState.Context().WithBlockHeader(initHeader). + checkState := app.getState(execModeCheck) + checkState.SetContext(checkState.Context().WithBlockHeader(initHeader). WithHeaderInfo(coreheader.Info{ ChainID: req.ChainId, Height: req.InitialHeight, Time: req.Time, })) - app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockHeader(initHeader). + finalizeState.SetContext(finalizeState.Context().WithBlockHeader(initHeader). WithHeaderInfo(coreheader.Info{ ChainID: req.ChainId, Height: req.InitialHeight, @@ -105,9 +107,9 @@ func (app *BaseApp) InitChain(req *abci.InitChainRequest) (*abci.InitChainRespon } // add block gas meter for any genesis transactions (allow infinite gas) - app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(storetypes.NewInfiniteGasMeter())) + finalizeState.SetContext(finalizeState.Context().WithBlockGasMeter(storetypes.NewInfiniteGasMeter())) - res, err := app.initChainer(app.finalizeBlockState.Context(), req) + res, err := app.initChainer(finalizeState.Context(), req) if err != nil { return nil, err } @@ -604,7 +606,7 @@ func (app *BaseApp) ExtendVote(_ context.Context, req *abci.ExtendVoteRequest) ( // finalizeBlockState context, otherwise we don't get the uncommitted data // from InitChain. if req.Height == app.initialHeight { - ctx, _ = app.finalizeBlockState.Context().CacheContext() + ctx, _ = app.getState(execModeFinalize).Context().CacheContext() } else { ms := app.cms.CacheMultiStore() ctx = sdk.NewContext(ms, false, app.logger).WithStreamingManager(app.streamingManager).WithChainID(app.chainID).WithBlockHeight(req.Height) @@ -684,7 +686,7 @@ func (app *BaseApp) VerifyVoteExtension(req *abci.VerifyVoteExtensionRequest) (r // finalizeBlockState context, otherwise we don't get the uncommitted data // from InitChain. if req.Height == app.initialHeight { - ctx, _ = app.finalizeBlockState.Context().CacheContext() + ctx, _ = app.getState(execModeFinalize).Context().CacheContext() } else { ms := app.cms.CacheMultiStore() ctx = sdk.NewContext(ms, false, app.logger).WithStreamingManager(app.streamingManager).WithChainID(app.chainID).WithBlockHeight(req.Height) @@ -742,7 +744,7 @@ func (app *BaseApp) VerifyVoteExtension(req *abci.VerifyVoteExtensionRequest) (r // internalFinalizeBlock executes the block, called by the Optimistic // Execution flow or by the FinalizeBlock ABCI method. The context received is -// only used to handle early cancellation, for anything related to state app.finalizeBlockState.Context() +// only used to handle early cancellation, for anything related to state app.getState(execModeFinalize).Context() // must be used. func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.FinalizeBlockRequest) (*abci.FinalizeBlockResponse, error) { var events []abci.Event @@ -773,12 +775,14 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz // finalizeBlockState should be set on InitChain or ProcessProposal. If it is // nil, it means we are replaying this block and we need to set the state here // given that during block replay ProcessProposal is not executed by CometBFT. - if app.finalizeBlockState == nil { + finalizeState := app.getState(execModeFinalize) + if finalizeState == nil { app.setState(execModeFinalize, header) + finalizeState = app.getState(execModeFinalize) } // Context is now updated with Header information. - app.finalizeBlockState.SetContext(app.finalizeBlockState.Context(). + finalizeState.SetContext(finalizeState.Context(). WithBlockHeader(header). WithHeaderHash(req.Hash). WithHeaderInfo(coreheader.Info{ @@ -788,7 +792,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz Hash: req.Hash, AppHash: app.LastCommitID().Hash, }). - WithConsensusParams(app.GetConsensusParams(app.finalizeBlockState.Context())). + WithConsensusParams(app.GetConsensusParams(finalizeState.Context())). WithVoteInfos(req.DecidedLastCommit.Votes). WithExecMode(sdk.ExecModeFinalize). WithCometInfo(corecomet.Info{ @@ -799,11 +803,11 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz })) // GasMeter must be set after we get a context with updated consensus params. - gasMeter := app.getBlockGasMeter(app.finalizeBlockState.Context()) - app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(gasMeter)) + gasMeter := app.getBlockGasMeter(finalizeState.Context()) + finalizeState.SetContext(finalizeState.Context().WithBlockGasMeter(gasMeter)) - if app.checkState != nil { - app.checkState.SetContext(app.checkState.Context(). + if checkState := app.getState(execModeCheck); checkState != nil { + checkState.SetContext(checkState.Context(). WithBlockGasMeter(gasMeter). WithHeaderHash(req.Hash)) } @@ -831,8 +835,8 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz events = append(events, beginBlock.Events...) // Reset the gas meter so that the AnteHandlers aren't required to - gasMeter = app.getBlockGasMeter(app.finalizeBlockState.Context()) - app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(gasMeter)) + gasMeter = app.getBlockGasMeter(finalizeState.Context()) + finalizeState.SetContext(finalizeState.Context().WithBlockGasMeter(gasMeter)) // Iterate over all raw transactions in the proposal and attempt to execute // them, gathering the execution results. @@ -861,11 +865,11 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz txResults = append(txResults, response) } - if app.finalizeBlockState.ms.TracingEnabled() { - app.finalizeBlockState.ms = app.finalizeBlockState.ms.SetTracingContext(nil).(storetypes.CacheMultiStore) + if finalizeState.ms.TracingEnabled() { + finalizeState.ms = finalizeState.ms.SetTracingContext(nil).(storetypes.CacheMultiStore) } - endBlock, err := app.endBlock(app.finalizeBlockState.Context()) + endBlock, err := app.endBlock(finalizeState.Context()) if err != nil { return nil, err } @@ -879,7 +883,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz } events = append(events, endBlock.Events...) - cp := app.GetConsensusParams(app.finalizeBlockState.Context()) + cp := app.GetConsensusParams(finalizeState.Context()) return &abci.FinalizeBlockResponse{ Events: events, @@ -903,7 +907,7 @@ func (app *BaseApp) FinalizeBlock(req *abci.FinalizeBlockRequest) (res *abci.Fin defer func() { // call the streaming service hooks with the FinalizeBlock messages for _, streamingListener := range app.streamingManager.ABCIListeners { - if streamErr := streamingListener.ListenFinalizeBlock(app.finalizeBlockState.Context(), *req, *res); streamErr != nil { + if streamErr := streamingListener.ListenFinalizeBlock(app.getState(execModeFinalize).Context(), *req, *res); streamErr != nil { app.logger.Error("ListenFinalizeBlock listening hook failed", "height", req.Height, "err", err) if app.streamingManager.StopNodeOnErr { // if StopNodeOnErr is set, we should return the streamErr in order to stop the node @@ -929,7 +933,10 @@ func (app *BaseApp) FinalizeBlock(req *abci.FinalizeBlockRequest) (res *abci.Fin } // if it was aborted, we need to reset the state + app.stateMut.Lock() app.finalizeBlockState = nil + app.stateMut.Unlock() + app.optimisticExec.Reset() } @@ -968,11 +975,12 @@ func (app *BaseApp) checkHalt(height int64, time time.Time) error { // against that height and gracefully halt if it matches the latest committed // height. func (app *BaseApp) Commit() (*abci.CommitResponse, error) { - header := app.finalizeBlockState.Context().BlockHeader() + finalizeState := app.getState(execModeFinalize) + header := finalizeState.Context().BlockHeader() retainHeight := app.GetBlockRetentionHeight(header.Height) if app.precommiter != nil { - app.precommiter(app.finalizeBlockState.Context()) + app.precommiter(finalizeState.Context()) } rms, ok := app.cms.(*rootmulti.Store) @@ -988,7 +996,7 @@ func (app *BaseApp) Commit() (*abci.CommitResponse, error) { abciListeners := app.streamingManager.ABCIListeners if len(abciListeners) > 0 { - ctx := app.finalizeBlockState.Context() + ctx := finalizeState.Context() blockHeight := ctx.BlockHeight() changeSet := app.cms.PopStateCache() @@ -1013,10 +1021,12 @@ func (app *BaseApp) Commit() (*abci.CommitResponse, error) { // Commit. Use the header from this latest block. app.setState(execModeCheck, header) + app.stateMut.Lock() app.finalizeBlockState = nil + app.stateMut.Unlock() if app.prepareCheckStater != nil { - app.prepareCheckStater(app.checkState.Context()) + app.prepareCheckStater(app.getState(execModeCheck).Context()) } // The SnapshotIfApplicable method will create the snapshot by starting the goroutine @@ -1034,7 +1044,7 @@ func (app *BaseApp) workingHash() []byte { // Write the FinalizeBlock state into branched storage and commit the MultiStore. // The write to the FinalizeBlock state writes all state transitions to the root // MultiStore (app.cms) so when Commit() is called it persists those values. - app.finalizeBlockState.ms.Write() + app.getState(execModeFinalize).ms.Write() // Get the hash of all writes in order to return the apphash to the comet in finalizeBlock. commitHash := app.cms.WorkingHash() @@ -1181,7 +1191,7 @@ func (app *BaseApp) FilterPeerByID(info string) *abci.QueryResponse { // access any state changes made in InitChain. func (app *BaseApp) getContextForProposal(ctx sdk.Context, height int64) sdk.Context { if height == app.initialHeight { - ctx, _ = app.finalizeBlockState.Context().CacheContext() + ctx, _ = app.getState(execModeFinalize).Context().CacheContext() // clear all context data set during InitChain to avoid inconsistent behavior ctx = ctx.WithHeaderInfo(coreheader.Info{}).WithBlockHeader(cmtproto.Header{}) @@ -1282,8 +1292,8 @@ func (app *BaseApp) CreateQueryContextWithCheckHeader(height int64, prove, check var header *cmtproto.Header isLatest := height == 0 for _, state := range []*state{ - app.checkState, - app.finalizeBlockState, + app.getState(execModeCheck), + app.getState(execModeFinalize), } { if state != nil { // branch the commit multi-store for safety @@ -1396,7 +1406,7 @@ func (app *BaseApp) GetBlockRetentionHeight(commitHeight int64) int64 { // evidence parameters instead of computing an estimated number of blocks based // on the unbonding period and block commitment time as the two should be // equivalent. - cp := app.GetConsensusParams(app.finalizeBlockState.Context()) + cp := app.GetConsensusParams(app.getState(execModeFinalize).Context()) if cp.Evidence != nil && cp.Evidence.MaxAgeNumBlocks > 0 { retentionHeight = commitHeight - cp.Evidence.MaxAgeNumBlocks } diff --git a/baseapp/abci_test.go b/baseapp/abci_test.go index e404f7c47932..4c36c71cb2f2 100644 --- a/baseapp/abci_test.go +++ b/baseapp/abci_test.go @@ -11,6 +11,7 @@ import ( "math/rand" "strconv" "strings" + "sync/atomic" "testing" "time" @@ -2779,3 +2780,51 @@ func TestABCI_Proposal_FailReCheckTx(t *testing.T) { require.NotEmpty(t, res.TxResults[0].Events) require.True(t, res.TxResults[0].IsOK(), fmt.Sprintf("%v", res)) } + +func TestABCI_Race_Commit_Query(t *testing.T) { + suite := NewBaseAppSuite(t, baseapp.SetChainID("test-chain-id")) + app := suite.baseApp + + _, err := app.InitChain(&abci.InitChainRequest{ + ChainId: "test-chain-id", + ConsensusParams: &cmtproto.ConsensusParams{Block: &cmtproto.BlockParams{MaxGas: 5000000}}, + InitialHeight: 1, + }) + require.NoError(t, err) + _, err = app.Commit() + require.NoError(t, err) + + counter := atomic.Uint64{} + counter.Store(0) + + ctx, cancel := context.WithCancel(context.Background()) + queryCreator := func() { + for { + select { + case <-ctx.Done(): + return + default: + _, err := app.CreateQueryContextWithCheckHeader(0, false, false) + require.NoError(t, err) + + counter.Add(1) + } + } + } + + for i := 0; i < 100; i++ { + go queryCreator() + } + + for i := 0; i < 1000; i++ { + _, err = app.FinalizeBlock(&abci.FinalizeBlockRequest{Height: app.LastBlockHeight() + 1}) + require.NoError(t, err) + + _, err = app.Commit() + require.NoError(t, err) + } + + cancel() + + require.Equal(t, int64(1001), app.GetContextForCheckTx(nil).BlockHeight()) +} diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index 98adf6e1a94e..730da6881dd8 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -124,6 +124,7 @@ type BaseApp struct { prepareProposalState *state processProposalState *state finalizeBlockState *state + stateMut sync.RWMutex // An inter-block write-through cache provided to the context during the ABCI // FinalizeBlock call. @@ -494,6 +495,9 @@ func (app *BaseApp) setState(mode execMode, h cmtproto.Header) { WithHeaderInfo(headerInfo), } + app.stateMut.Lock() + defer app.stateMut.Unlock() + switch mode { case execModeCheck: baseState.SetContext(baseState.Context().WithIsCheckTx(true).WithMinGasPrices(app.minGasPrices)) @@ -633,6 +637,9 @@ func validateBasicTxMsgs(router *MsgServiceRouter, msgs []sdk.Msg) error { } func (app *BaseApp) getState(mode execMode) *state { + app.stateMut.RLock() + defer app.stateMut.RUnlock() + switch mode { case execModeFinalize: return app.finalizeBlockState @@ -706,7 +713,8 @@ func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context func (app *BaseApp) preBlock(req *abci.FinalizeBlockRequest) ([]abci.Event, error) { var events []abci.Event if app.preBlocker != nil { - ctx := app.finalizeBlockState.Context().WithEventManager(sdk.NewEventManager()) + finalizeState := app.getState(execModeFinalize) + ctx := finalizeState.Context().WithEventManager(sdk.NewEventManager()) if err := app.preBlocker(ctx, req); err != nil { return nil, err } @@ -716,7 +724,7 @@ func (app *BaseApp) preBlock(req *abci.FinalizeBlockRequest) ([]abci.Event, erro // GasMeter must be set after we get a context with updated consensus params. gasMeter := app.getBlockGasMeter(ctx) ctx = ctx.WithBlockGasMeter(gasMeter) - app.finalizeBlockState.SetContext(ctx) + finalizeState.SetContext(ctx) events = ctx.EventManager().ABCIEvents() // append PreBlock attributes to all events @@ -738,7 +746,7 @@ func (app *BaseApp) beginBlock(_ *abci.FinalizeBlockRequest) (sdk.BeginBlock, er ) if app.beginBlocker != nil { - resp, err = app.beginBlocker(app.finalizeBlockState.Context()) + resp, err = app.beginBlocker(app.getState(execModeFinalize).Context()) if err != nil { return resp, err } @@ -801,7 +809,7 @@ func (app *BaseApp) endBlock(_ context.Context) (sdk.EndBlock, error) { var endblock sdk.EndBlock if app.endBlocker != nil { - eb, err := app.endBlocker(app.finalizeBlockState.Context()) + eb, err := app.endBlocker(app.getState(execModeFinalize).Context()) if err != nil { return endblock, err } diff --git a/baseapp/test_helpers.go b/baseapp/test_helpers.go index cffc2589f089..93c905f71809 100644 --- a/baseapp/test_helpers.go +++ b/baseapp/test_helpers.go @@ -44,18 +44,18 @@ func (app *BaseApp) SimDeliver(txEncoder sdk.TxEncoder, tx sdk.Tx) (sdk.GasInfo, // SimWriteState is an entrypoint for simulations only. They are not executed during the normal ABCI finalize // block step but later. Therefore, an extra call to the root multi-store (app.cms) is required to write the changes. func (app *BaseApp) SimWriteState() { - app.finalizeBlockState.ms.Write() + app.getState(execModeFinalize).ms.Write() } // NewContextLegacy returns a new sdk.Context with the provided header func (app *BaseApp) NewContextLegacy(isCheckTx bool, header cmtproto.Header) sdk.Context { if isCheckTx { - return sdk.NewContext(app.checkState.ms, true, app.logger). + return sdk.NewContext(app.getState(execModeCheck).ms, true, app.logger). WithMinGasPrices(app.minGasPrices). WithBlockHeader(header) } - return sdk.NewContext(app.finalizeBlockState.ms, false, app.logger).WithBlockHeader(header) + return sdk.NewContext(app.getState(execModeFinalize).ms, false, app.logger).WithBlockHeader(header) } // NewContext returns a new sdk.Context with a empty header diff --git a/go.mod b/go.mod index 173cda441476..b71cb63bf4cc 100644 --- a/go.mod +++ b/go.mod @@ -186,7 +186,6 @@ require ( // TODO remove after all modules have their own go.mods replace ( cosmossdk.io/api => ./api - cosmossdk.io/store => ./store cosmossdk.io/x/bank => ./x/bank cosmossdk.io/x/staking => ./x/staking cosmossdk.io/x/tx => ./x/tx @@ -216,3 +215,5 @@ retract ( // do not use v0.43.0 ) + +replace cosmossdk.io/store => ./store diff --git a/store/rootmulti/store.go b/store/rootmulti/store.go index e821e930de7f..a93adafa42e9 100644 --- a/store/rootmulti/store.go +++ b/store/rootmulti/store.go @@ -60,6 +60,7 @@ type Store struct { db corestore.KVStoreWithBatch logger iavltree.Logger lastCommitInfo *types.CommitInfo + lastCommitInfoMut sync.RWMutex pruningManager *pruning.Manager iavlCacheSize int iavlDisableFastNode bool @@ -288,7 +289,9 @@ func (rs *Store) loadVersion(ver int64, upgrades *types.StoreUpgrades) error { } } + rs.lastCommitInfoMut.Lock() rs.lastCommitInfo = cInfo + rs.lastCommitInfoMut.Unlock() rs.stores = newStores // load any snapshot heights we missed from disk to be pruned on the next run @@ -434,16 +437,24 @@ func (rs *Store) PopStateCache() []*types.StoreKVPair { // LatestVersion returns the latest version in the store func (rs *Store) LatestVersion() int64 { - if rs.lastCommitInfo == nil { + lastCommitInfo := rs.LastCommitInfo() + if lastCommitInfo == nil { return GetLatestVersion(rs.db) } - return rs.lastCommitInfo.Version + return lastCommitInfo.Version +} + +func (rs *Store) LastCommitInfo() *types.CommitInfo { + rs.lastCommitInfoMut.RLock() + defer rs.lastCommitInfoMut.RUnlock() + return rs.lastCommitInfo } // LastCommitID implements Committer/CommitStore. func (rs *Store) LastCommitID() types.CommitID { - if rs.lastCommitInfo == nil { + lastCommitInfo := rs.LastCommitInfo() + if lastCommitInfo == nil { emptyHash := sha256.Sum256([]byte{}) appHash := emptyHash[:] return types.CommitID{ @@ -451,16 +462,16 @@ func (rs *Store) LastCommitID() types.CommitID { Hash: appHash, // set empty apphash to sha256([]byte{}) if info is nil } } - if len(rs.lastCommitInfo.CommitID().Hash) == 0 { + if len(lastCommitInfo.CommitID().Hash) == 0 { emptyHash := sha256.Sum256([]byte{}) appHash := emptyHash[:] return types.CommitID{ - Version: rs.lastCommitInfo.Version, + Version: lastCommitInfo.Version, Hash: appHash, // set empty apphash to sha256([]byte{}) if hash is nil } } - return rs.lastCommitInfo.CommitID() + return lastCommitInfo.CommitID() } // PausePruning temporarily pauses the pruning of all individual stores which implement @@ -499,10 +510,15 @@ func (rs *Store) Commit() types.CommitID { rs.PausePruning(true) // unset the committing flag on all stores to continue the pruning defer rs.PausePruning(false) + rs.lastCommitInfoMut.Lock() rs.lastCommitInfo = commitStores(version, rs.stores, rs.removalMap) + rs.lastCommitInfoMut.Unlock() }() + rs.lastCommitInfoMut.Lock() rs.lastCommitInfo.Timestamp = rs.commitHeader.Time + rs.lastCommitInfoMut.Unlock() + defer rs.flushMetadata(rs.db, version, rs.lastCommitInfo) // remove remnants of removed stores @@ -781,8 +797,9 @@ func (rs *Store) Query(req *types.RequestQuery) (*types.ResponseQuery, error) { // Otherwise, we query for the commit info from disk. var commitInfo *types.CommitInfo - if res.Height == rs.lastCommitInfo.Version { - commitInfo = rs.lastCommitInfo + lastCommitInfo := rs.LastCommitInfo() + if res.Height == lastCommitInfo.Version { + commitInfo = lastCommitInfo } else { commitInfo, err = rs.GetCommitInfo(res.Height) if err != nil {