Skip to content

Commit

Permalink
Problem: block-stm is not integrated in sdk
Browse files Browse the repository at this point in the history
add patcher

add default

pre estimate

btree

fix patch
  • Loading branch information
mmsqe committed Dec 12, 2024
1 parent 274b4da commit d01a12a
Show file tree
Hide file tree
Showing 13 changed files with 231 additions and 15 deletions.
2 changes: 1 addition & 1 deletion baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -855,7 +855,7 @@ func (app *BaseApp) executeTxs(ctx context.Context, txs [][]byte) ([]*abci.ExecT
if app.txExecutor != nil {
return app.txExecutor(ctx, txs, app.finalizeBlockState.ms, func(i int, memTx sdk.Tx, ms storetypes.MultiStore, incarnationCache map[string]any) *abci.ExecTxResult {
return app.deliverTxWithMultiStore(txs[i], memTx, i, ms, incarnationCache)
})
}, app.txResponsePatcher)
}

txResults := make([]*abci.ExecTxResult, 0, len(txs))
Expand Down
3 changes: 3 additions & 0 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ type BaseApp struct {

// Optional alternative tx executor, used for block-stm parallel transaction execution.
txExecutor TxExecutor

// Optional alternative tx response patcher, used for block-stm parallel transaction execution.
txResponsePatcher TxResponsePatcher
}

// NewBaseApp returns a reference to an initialized BaseApp. It accepts a
Expand Down
10 changes: 10 additions & 0 deletions baseapp/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ func SetTxExecutor(executor TxExecutor) func(*BaseApp) {
return func(app *BaseApp) { app.txExecutor = executor }
}

// SetTxResponsePatcher sets a custom tx response patcher for the BaseApp, usually for parallel execution.
func SetTxResponsePatcher(patcher TxResponsePatcher) func(*BaseApp) {
return func(app *BaseApp) { app.txResponsePatcher = patcher }
}

func (app *BaseApp) SetName(name string) {
if app.sealed {
panic("SetName() on sealed BaseApp")
Expand Down Expand Up @@ -393,6 +398,11 @@ func (app *BaseApp) SetTxExecutor(executor TxExecutor) {
app.txExecutor = executor
}

// SetTxResponsePatcher sets a custom tx response patcher for the BaseApp, usually for parallel execution.
func (app *BaseApp) SetTxResponsePatcher(patcher TxResponsePatcher) {
app.txResponsePatcher = patcher
}

// SetMsgServiceRouter sets the MsgServiceRouter of a BaseApp.
func (app *BaseApp) SetMsgServiceRouter(msgServiceRouter *MsgServiceRouter) {
app.msgServiceRouter = msgServiceRouter
Expand Down
145 changes: 145 additions & 0 deletions baseapp/txexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,161 @@ package baseapp

import (
"context"
"io"
"sync/atomic"

abci "github.com/cometbft/cometbft/abci/types"
sdk "github.com/cosmos/cosmos-sdk/types"

"cosmossdk.io/store/cachemulti"
"cosmossdk.io/store/types"
blockstm "github.com/crypto-org-chain/go-block-stm"
)

type TxResponsePatcher interface {
Patch(input []*abci.ExecTxResult) []*abci.ExecTxResult
}

type stmMultiStoreWrapper struct {
types.MultiStore
}

var _ types.MultiStore = stmMultiStoreWrapper{}

type msWrapper struct {
blockstm.MultiStore
}

var _ types.MultiStore = msWrapper{}

func (ms msWrapper) getCacheWrapper(key types.StoreKey) types.CacheWrapper {
return ms.GetStore(key)
}

func (ms msWrapper) CacheMultiStore() types.CacheMultiStore {
return cachemulti.NewFromParent(ms.getCacheWrapper, nil, nil)
}

// Implements CacheWrapper.
func (ms msWrapper) CacheWrap() types.CacheWrap {
return ms.CacheMultiStore().(types.CacheWrap)
}

// GetStoreType returns the type of the store.
func (ms msWrapper) GetStoreType() types.StoreType {
return types.StoreTypeMulti
}

// Implements interface MultiStore
func (ms msWrapper) SetTracer(io.Writer) types.MultiStore {
return nil
}

// Implements interface MultiStore
func (ms msWrapper) SetTracingContext(types.TraceContext) types.MultiStore {
return nil
}

// Implements interface MultiStore
func (ms msWrapper) TracingEnabled() bool {
return false
}

type TxExecutor func(
ctx context.Context,
block [][]byte,
cms types.MultiStore,
deliverTxWithMultiStore func(int, sdk.Tx, types.MultiStore, map[string]any) *abci.ExecTxResult,
patcher TxResponsePatcher,
) ([]*abci.ExecTxResult, error)

func DefaultTxExecutor(_ context.Context,
txs [][]byte,
ms types.MultiStore,
deliverTxWithMultiStore func(int, sdk.Tx, types.MultiStore, map[string]any) *abci.ExecTxResult,
patcher TxResponsePatcher,
) ([]*abci.ExecTxResult, error) {
blockSize := len(txs)
results := make([]*abci.ExecTxResult, blockSize)
for i := 0; i < blockSize; i++ {
results[i] = deliverTxWithMultiStore(i, nil, ms, nil)
}
if patcher != nil {
return patcher.Patch(results), nil
}
return results, nil
}

func STMTxExecutor(
stores []types.StoreKey,
workers int,
txDecoder sdk.TxDecoder,
preEstimates func(txs [][]byte, workers int, txDecoder sdk.TxDecoder, ms types.MultiStore) ([]sdk.Tx, []blockstm.MultiLocations),
) TxExecutor {
index := make(map[types.StoreKey]int, len(stores))
for i, k := range stores {
index[k] = i
}
return func(
ctx context.Context,
txs [][]byte,
ms types.MultiStore,
deliverTxWithMultiStore func(int, sdk.Tx, types.MultiStore, map[string]any) *abci.ExecTxResult,
patcher TxResponsePatcher,
) ([]*abci.ExecTxResult, error) {
blockSize := len(txs)
if blockSize == 0 {
return nil, nil
}
results := make([]*abci.ExecTxResult, blockSize)
incarnationCache := make([]atomic.Pointer[map[string]any], blockSize)
for i := 0; i < blockSize; i++ {
m := make(map[string]any)
incarnationCache[i].Store(&m)
}

var (
estimates []blockstm.MultiLocations
memTxs []sdk.Tx
)
if preEstimates != nil {
// pre-estimation
memTxs, estimates = preEstimates(txs, workers, txDecoder, ms)
}

if err := blockstm.ExecuteBlockWithEstimates(
ctx,
blockSize,
index,
stmMultiStoreWrapper{ms},
workers,
estimates,
func(txn blockstm.TxnIndex, ms blockstm.MultiStore) {
var cache map[string]any

// only one of the concurrent incarnations gets the cache if there are any, otherwise execute without
// cache, concurrent incarnations should be rare.
v := incarnationCache[txn].Swap(nil)
if v != nil {
cache = *v
}

var memTx sdk.Tx
if memTxs != nil {
memTx = memTxs[txn]
}
results[txn] = deliverTxWithMultiStore(int(txn), memTx, msWrapper{ms}, cache)

if v != nil {
incarnationCache[txn].Store(v)
}
},
); err != nil {
return nil, err
}
if patcher != nil {
return patcher.Patch(results), nil
}
return results, nil
}
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ require (
github.com/cosmos/gogogateway v1.2.0
github.com/cosmos/gogoproto v1.7.0
github.com/cosmos/ledger-cosmos-go v0.13.3
github.com/crypto-org-chain/go-block-stm v0.0.0-20240919080136-6c49aef68716
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.4
Expand Down Expand Up @@ -184,6 +185,7 @@ replace (
github.com/gin-gonic/gin => github.com/gin-gonic/gin v1.9.1
// replace broken goleveldb
github.com/syndtr/goleveldb => github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
github.com/tidwall/btree => github.com/crypto-org-chain/btree v0.0.0-20240406140148-2687063b042c
)

retract (
Expand Down
8 changes: 6 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:ma
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/crypto-org-chain/btree v0.0.0-20240406140148-2687063b042c h1:MOgfS4+FBB8cMkDE2j2VBVsbY+HCkPIu0YsJ/9bbGeQ=
github.com/crypto-org-chain/btree v0.0.0-20240406140148-2687063b042c/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
github.com/crypto-org-chain/go-block-stm v0.0.0-20240919080136-6c49aef68716 h1:OvD5Rm0B6LHUJk6z858UgwdP72jU2DuUdXeclRyKpDI=
github.com/crypto-org-chain/go-block-stm v0.0.0-20240919080136-6c49aef68716/go.mod h1:iwQTX9xMX8NV9k3o2BiWXA0SswpsZrDk5q3gA7nWYiE=
github.com/danieljoos/wincred v1.1.2 h1:QLdCxFs1/Yl4zduvBdcHB8goaYk9RARS2SgLLRuAyr0=
github.com/danieljoos/wincred v1.1.2/go.mod h1:GijpziifJoIBfYh+S7BbkdUTU4LfM+QnGqR5Vl2tAx0=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -709,8 +713,8 @@ github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc=
github.com/tendermint/go-amino v0.16.0 h1:GyhmgQKvqF82e2oZeuMSp9JTN0N09emoSZlb2lyGa2E=
github.com/tendermint/go-amino v0.16.0/go.mod h1:TQU0M1i/ImAo+tYpZi73AU3V/dKeCoMC9Sphe2ZwGME=
github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI=
github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
github.com/test-go/testify v1.1.4 h1:Tf9lntrKUMHiXQ07qBScBTSA0dhYQlu83hswqelv1iE=
github.com/test-go/testify v1.1.4/go.mod h1:rH7cfJo/47vWGdi4GPj16x3/t1xGOj2YxzmNQzk2ghU=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
Expand Down
25 changes: 24 additions & 1 deletion server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"math"

"github.com/cometbft/cometbft/libs/strings"
"github.com/spf13/viper"

pruningtypes "cosmossdk.io/store/pruning/types"
Expand All @@ -29,6 +30,9 @@ const (
// DefaultGRPCMaxSendMsgSize defines the default gRPC max message size in
// bytes the server can send.
DefaultGRPCMaxSendMsgSize = math.MaxInt32

BlockExecutorSequential = "sequential"
BlockExecutorBlockSTM = "block-stm"
)

// BaseConfig defines the server's basic configuration
Expand Down Expand Up @@ -182,6 +186,16 @@ type (
}
)

// BlockStmConfig defines the block stm configuration.
type BlockSTMConfig struct {
// Executor sets the executor type, "block-stm" for parallel execution, "sequential" for sequential execution.
Executor string `mapstructure:"executor"`
// Workers is the number of workers for block-stm execution, 0 means using all available CPUs.
Workers uint64 `mapstructure:"workers"`
// PreEstimate is the flag to enable pre-estimation for block-stm execution.
PreEstimate bool `mapstructure:"pre-estimate"`
}

// Config defines the server's top level configuration
type Config struct {
BaseConfig `mapstructure:",squash"`
Expand All @@ -194,6 +208,7 @@ type Config struct {
StateSync StateSyncConfig `mapstructure:"state-sync"`
Streaming StreamingConfig `mapstructure:"streaming"`
Mempool MempoolConfig `mapstructure:"mempool"`
BlockSTM BlockSTMConfig `mapstructure:"block-stm"`
}

// SetMinGasPrices sets the validator's minimum gas prices.
Expand Down Expand Up @@ -265,6 +280,11 @@ func DefaultConfig() *Config {
Mempool: MempoolConfig{
MaxTxs: -1,
},
BlockSTM: BlockSTMConfig{
Executor: BlockExecutorSequential,
Workers: 0,
PreEstimate: false,
},
}
}

Expand All @@ -287,6 +307,9 @@ func (c Config) ValidateBasic() error {
"cannot enable state sync snapshots with '%s' pruning setting", pruningtypes.PruningOptionEverything,
)
}

blockExecutors := []string{BlockExecutorSequential, BlockExecutorBlockSTM}
if c.BlockSTM.Executor != "" && !strings.StringInSlice(c.BlockSTM.Executor, blockExecutors) {
return fmt.Errorf("invalid block executor type %s, available types: %v", c.BlockSTM.Executor, blockExecutors)
}
return nil
}
15 changes: 15 additions & 0 deletions server/config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,21 @@ stop-node-on-err = {{ .Streaming.ABCI.StopNodeOnErr }}
# Note, this configuration only applies to SDK built-in app-side mempool
# implementations.
max-txs = {{ .Mempool.MaxTxs }}
###############################################################################
### Block STM ###
###############################################################################
[block-stm]
# Executor sets the executor type, "block-stm" for parallel execution, "sequential" for sequential execution.
executor = "{{ .BlockSTM.Executor }}"
# STMWorkers is the number of workers for block-stm execution, 0 means using all available CPUs.
workers = {{ .BlockSTM.Workers }}
# PreEstimate is the flag to enable pre-estimation for block-stm execution.
pre-estimate = {{ .BlockSTM.PreEstimate }}
`

var configTemplate *template.Template
Expand Down
8 changes: 8 additions & 0 deletions server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ const (
// mempool flags
FlagMempoolMaxTxs = "mempool.max-txs"

// block-stm flags
FlagBlockSTMExecutor = "block-stm.executor"
FlagBlockSTMWorkers = "block-stm.workers"
FlagBlockSTMPreEstimate = "block-stm.pre-estimate"

// testnet keys
KeyIsTestnet = "is-testnet"
KeyNewChainID = "new-chain-ID"
Expand Down Expand Up @@ -997,6 +1002,9 @@ func addStartNodeFlags(cmd *cobra.Command, opts StartCmdOptions) {
cmd.Flags().Uint32(FlagStateSyncSnapshotKeepRecent, 2, "State sync snapshot to keep")
cmd.Flags().Bool(FlagDisableIAVLFastNode, false, "Disable fast node for IAVL tree")
cmd.Flags().Int(FlagMempoolMaxTxs, mempool.DefaultMaxTx, "Sets MaxTx value for the app-side mempool")
cmd.Flags().String(FlagBlockSTMExecutor, serverconfig.BlockExecutorSequential, "Sets the executor type (block-stm|sequential)")
cmd.Flags().Int(FlagBlockSTMWorkers, 0, "Sets the number of workers for block-stm execution, 0 means using all available CPUs")
cmd.Flags().Bool(FlagBlockSTMPreEstimate, false, "Sets the flag to enable pre-estimation for block-stm execution")
cmd.Flags().Duration(FlagShutdownGrace, 0*time.Second, "On Shutdown, duration to wait for resource clean up")

// support old flags name for backwards compatibility
Expand Down
2 changes: 2 additions & 0 deletions simapp/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ require (
github.com/cosmos/ledger-cosmos-go v0.13.3 // indirect
github.com/creachadair/atomicfile v0.3.1 // indirect
github.com/creachadair/tomledit v0.0.24 // indirect
github.com/crypto-org-chain/go-block-stm v0.0.0-20240919080136-6c49aef68716 // indirect
github.com/danieljoos/wincred v1.1.2 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
Expand Down Expand Up @@ -219,4 +220,5 @@ replace (
github.com/gin-gonic/gin => github.com/gin-gonic/gin v1.9.1
// replace broken goleveldb
github.com/syndtr/goleveldb => github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
github.com/tidwall/btree => github.com/crypto-org-chain/btree v0.0.0-20240406140148-2687063b042c
)
8 changes: 6 additions & 2 deletions simapp/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,10 @@ github.com/creachadair/tomledit v0.0.24 h1:5Xjr25R2esu1rKCbQEmjZYlrhFkDspoAbAKb6
github.com/creachadair/tomledit v0.0.24/go.mod h1:9qHbShRWQzSCcn617cMzg4eab1vbLCOjOshAWSzWr8U=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/crypto-org-chain/btree v0.0.0-20240406140148-2687063b042c h1:MOgfS4+FBB8cMkDE2j2VBVsbY+HCkPIu0YsJ/9bbGeQ=
github.com/crypto-org-chain/btree v0.0.0-20240406140148-2687063b042c/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
github.com/crypto-org-chain/go-block-stm v0.0.0-20240919080136-6c49aef68716 h1:OvD5Rm0B6LHUJk6z858UgwdP72jU2DuUdXeclRyKpDI=
github.com/crypto-org-chain/go-block-stm v0.0.0-20240919080136-6c49aef68716/go.mod h1:iwQTX9xMX8NV9k3o2BiWXA0SswpsZrDk5q3gA7nWYiE=
github.com/danieljoos/wincred v1.1.2 h1:QLdCxFs1/Yl4zduvBdcHB8goaYk9RARS2SgLLRuAyr0=
github.com/danieljoos/wincred v1.1.2/go.mod h1:GijpziifJoIBfYh+S7BbkdUTU4LfM+QnGqR5Vl2tAx0=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -1009,8 +1013,8 @@ github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc=
github.com/tendermint/go-amino v0.16.0 h1:GyhmgQKvqF82e2oZeuMSp9JTN0N09emoSZlb2lyGa2E=
github.com/tendermint/go-amino v0.16.0/go.mod h1:TQU0M1i/ImAo+tYpZi73AU3V/dKeCoMC9Sphe2ZwGME=
github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI=
github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
github.com/test-go/testify v1.1.4 h1:Tf9lntrKUMHiXQ07qBScBTSA0dhYQlu83hswqelv1iE=
github.com/test-go/testify v1.1.4/go.mod h1:rH7cfJo/47vWGdi4GPj16x3/t1xGOj2YxzmNQzk2ghU=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
Expand Down
Loading

0 comments on commit d01a12a

Please sign in to comment.