From 9d07cfaa675cd3d94f7d5f6a608788ee13da2d70 Mon Sep 17 00:00:00 2001 From: BananaLF <864685021@qq.com> Date: Sun, 10 Dec 2023 23:05:52 +0800 Subject: [PATCH 1/8] limit tx count of per peer --- app/config/config.go | 25 +++++++++++++++++-- dev/testnet/testnet.sh | 3 ++- .../cmd/tendermint/commands/run_node.go | 5 ++++ libs/tendermint/config/config.go | 10 +++++--- .../config/dynamic_config_okchain.go | 5 ++++ libs/tendermint/mempool/clist_mempool.go | 25 +++++++++++++++++++ 6 files changed, 67 insertions(+), 6 deletions(-) diff --git a/app/config/config.go b/app/config/config.go index 3bdeceb9cb..be3d0abd3c 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -124,6 +124,19 @@ type OecConfig struct { commitGapOffset int64 maxSubscriptionClients int + + maxTxLimitPerPeer uint64 +} + +func (c *OecConfig) SetMaxTxLimitPerPeer(maxTxLimitPerPeer int64) { + if maxTxLimitPerPeer < 0 { + return + } + c.maxTxLimitPerPeer = uint64(maxTxLimitPerPeer) +} + +func (c *OecConfig) GetMaxTxLimitPerPeer() uint64 { + return c.maxTxLimitPerPeer } const ( @@ -161,8 +174,9 @@ const ( FlagEnableHasBlockPartMsg = "enable-blockpart-ack" FlagDebugGcInterval = "debug.gc-interval" FlagCommitGapOffset = "commit-gap-offset" - - FlagMaxSubscriptionClients = "max-subscription-clients" + FlagEnableMempoolSimGuFactor = "enable-mem-sim-gu-factor" + FlagMaxSubscriptionClients = "max-subscription-clients" + FlagMaxTxLimitPerPeer = "mempool.max_tx_limit_per_peer" ) var ( @@ -278,6 +292,7 @@ func (c *OecConfig) loadFromConfig() { c.SetMempoolFlush(viper.GetBool(FlagMempoolFlush)) c.SetMempoolCheckTxCost(viper.GetBool(FlagMempoolCheckTxCost)) c.SetMaxTxNumPerBlock(viper.GetInt64(FlagMaxTxNumPerBlock)) + c.SetMaxTxLimitPerPeer(int64(viper.GetUint64(FlagMaxTxLimitPerPeer))) c.SetEnableDeleteMinGPTx(viper.GetBool(FlagMempoolEnableDeleteMinGPTx)) c.SetMaxGasUsedPerBlock(viper.GetInt64(FlagMaxGasUsedPerBlock)) c.SetEnablePGU(viper.GetBool(FlagEnablePGU)) @@ -463,6 +478,12 @@ func (c *OecConfig) updateFromKVStr(k, v string) { return } c.SetMaxTxNumPerBlock(r) + case FlagMaxTxLimitPerPeer: + r, err := strconv.ParseInt(v, 10, 64) + if err != nil { + return + } + c.SetMaxTxLimitPerPeer(r) case FlagMempoolEnableDeleteMinGPTx: r, err := strconv.ParseBool(v) if err != nil { diff --git a/dev/testnet/testnet.sh b/dev/testnet/testnet.sh index d049d9494d..2c28b69c43 100755 --- a/dev/testnet/testnet.sh +++ b/dev/testnet/testnet.sh @@ -150,6 +150,7 @@ run() { --enable-wtx=${WRAPPEDTX} \ --mempool.node_key_whitelist ${WHITE_LIST} \ --p2p.pex=false \ + --mempool.max_tx_limit_per_peer=1 \ --p2p.addr_book_strict=false \ $p2p_seed_opt $p2p_seed_arg \ --p2p.laddr tcp://${IP}:${p2pport} \ @@ -158,7 +159,7 @@ run() { --chain-id ${CHAIN_ID} \ --upload-delta=false \ --enable-gid \ - --consensus.timeout_commit 3800ms \ + --consensus.timeout_commit 10000ms \ --enable-blockpart-ack=false \ --append-pid=true \ ${LOG_SERVER} \ diff --git a/libs/tendermint/cmd/tendermint/commands/run_node.go b/libs/tendermint/cmd/tendermint/commands/run_node.go index 0a41d913cf..999f150c72 100644 --- a/libs/tendermint/cmd/tendermint/commands/run_node.go +++ b/libs/tendermint/cmd/tendermint/commands/run_node.go @@ -176,6 +176,11 @@ func AddNodeFlags(cmd *cobra.Command) { config.Mempool.PendingRemoveEvent, "Push event when remove a pending tx", ) + cmd.Flags().Uint64( + "mempool.max_tx_limit_per_peer", + config.Mempool.MaxTxLimitPerPeer, + "Max tx limit per peer", + ) cmd.Flags().String( "mempool.node_key_whitelist", diff --git a/libs/tendermint/config/config.go b/libs/tendermint/config/config.go index ae3f587780..b5559335b3 100644 --- a/libs/tendermint/config/config.go +++ b/libs/tendermint/config/config.go @@ -689,6 +689,7 @@ type MempoolConfig struct { PendingPoolMaxTxPerAddress int `mapstructure:"pending_pool_max_tx_per_address"` NodeKeyWhitelist []string `mapstructure:"node_key_whitelist"` PendingRemoveEvent bool `mapstructure:"pending_remove_event"` + MaxTxLimitPerPeer uint64 `mapstructure:"max_tx_limit_per_peer"` } // DefaultMempoolConfig returns a default configuration for the Tendermint mempool @@ -715,6 +716,7 @@ func DefaultMempoolConfig() *MempoolConfig { PendingPoolMaxTxPerAddress: 100, NodeKeyWhitelist: []string{}, PendingRemoveEvent: false, + MaxTxLimitPerPeer: 100, } } @@ -953,12 +955,14 @@ func (cfg *ConsensusConfig) ValidateBasic() error { return nil } -//----------------------------------------------------------------------------- +// ----------------------------------------------------------------------------- // TxIndexConfig // Remember that Event has the following structure: // type: [ -// key: value, -// ... +// +// key: value, +// ... +// // ] // // CompositeKeys are constructed by `type.key` diff --git a/libs/tendermint/config/dynamic_config_okchain.go b/libs/tendermint/config/dynamic_config_okchain.go index fd4a1b836e..58687743bb 100644 --- a/libs/tendermint/config/dynamic_config_okchain.go +++ b/libs/tendermint/config/dynamic_config_okchain.go @@ -34,6 +34,7 @@ type IDynamicConfig interface { GetDynamicGpMaxTxNum() int64 GetDynamicGpMaxGasUsed() int64 GetMaxSubscriptionClients() int + GetMaxTxLimitPerPeer() uint64 } var DynamicConfig IDynamicConfig = MockDynamicConfig{} @@ -198,3 +199,7 @@ func (d *MockDynamicConfig) SetMaxSubscriptionClients(value int) { } d.maxSubscriptionClients = value } + +func (c MockDynamicConfig) GetMaxTxLimitPerPeer() uint64 { + return DefaultMempoolConfig().MaxTxLimitPerPeer +} diff --git a/libs/tendermint/mempool/clist_mempool.go b/libs/tendermint/mempool/clist_mempool.go index c7e6cff30e..f5359771f1 100644 --- a/libs/tendermint/mempool/clist_mempool.go +++ b/libs/tendermint/mempool/clist_mempool.go @@ -108,6 +108,9 @@ type CListMempool struct { rmPendingTxChan chan types.EventDataRmPendingTx gpo *Oracle + + peersTxCountMtx sync.RWMutex + peersTxCount map[string]uint64 } func (mem *CListMempool) filterCMTx(tx abci.TxEssentials) bool { @@ -166,6 +169,7 @@ func NewCListMempool( simQueue: make(chan *mempoolTx, 100000), gasCache: gasCache, gpo: gpo, + peersTxCount: make(map[string]uint64, 0), } if config.PendingRemoveEvent { @@ -305,6 +309,22 @@ func (mem *CListMempool) TxsWaitChan() <-chan struct{} { // // Safe for concurrent use by multiple goroutines. func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) error { + mem.peersTxCountMtx.Lock() + if len(txInfo.SenderP2PID) != 0 { + peerTxCount, ok := mem.peersTxCount[string(txInfo.SenderP2PID)] + if !ok { + peerTxCount = 0 + } + if peerTxCount >= cfg.DynamicConfig.GetMaxTxLimitPerPeer() { + mem.peersTxCountMtx.Unlock() + mem.logger.Debug(fmt.Sprintf("%s has been over %d transaction, please wait a few second", txInfo.SenderP2PID, cfg.DynamicConfig.GetMaxTxLimitPerPeer())) + return fmt.Errorf("%s has been over %d transaction, please wait a few second", txInfo.SenderP2PID, cfg.DynamicConfig.GetMaxTxLimitPerPeer()) + } + peerTxCount++ + mem.peersTxCount[string(txInfo.SenderP2PID)] = peerTxCount + } + mem.peersTxCountMtx.Unlock() + timeStart := int64(0) if cfg.DynamicConfig.GetMempoolCheckTxCost() { timeStart = time.Now().UnixMicro() @@ -1010,6 +1030,11 @@ func (mem *CListMempool) Update( preCheck PreCheckFunc, postCheck PostCheckFunc, ) error { + mem.peersTxCountMtx.Lock() + for key := range mem.peersTxCount { + delete(mem.peersTxCount, key) + } + mem.peersTxCountMtx.Unlock() // no need to update when mempool is unavailable if mem.config.Sealed { return mem.updateSealed(height, txs, deliverTxResponses) From 5b3528e1711376f94d701aaf6d4ad74b6713a1c1 Mon Sep 17 00:00:00 2001 From: BananaLF <864685021@qq.com> Date: Mon, 11 Dec 2023 10:43:29 +0800 Subject: [PATCH 2/8] remove useles code --- app/config/config.go | 1 - 1 file changed, 1 deletion(-) diff --git a/app/config/config.go b/app/config/config.go index be3d0abd3c..6130ad286b 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -174,7 +174,6 @@ const ( FlagEnableHasBlockPartMsg = "enable-blockpart-ack" FlagDebugGcInterval = "debug.gc-interval" FlagCommitGapOffset = "commit-gap-offset" - FlagEnableMempoolSimGuFactor = "enable-mem-sim-gu-factor" FlagMaxSubscriptionClients = "max-subscription-clients" FlagMaxTxLimitPerPeer = "mempool.max_tx_limit_per_peer" ) From 46391dc1e550feca1a90e7966b9933988c9e5ae8 Mon Sep 17 00:00:00 2001 From: BananaLF <864685021@qq.com> Date: Mon, 11 Dec 2023 10:59:55 +0800 Subject: [PATCH 3/8] change code location --- app/config/config.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/app/config/config.go b/app/config/config.go index 6130ad286b..9bee4bc596 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -128,17 +128,6 @@ type OecConfig struct { maxTxLimitPerPeer uint64 } -func (c *OecConfig) SetMaxTxLimitPerPeer(maxTxLimitPerPeer int64) { - if maxTxLimitPerPeer < 0 { - return - } - c.maxTxLimitPerPeer = uint64(maxTxLimitPerPeer) -} - -func (c *OecConfig) GetMaxTxLimitPerPeer() uint64 { - return c.maxTxLimitPerPeer -} - const ( FlagEnableDynamic = "config.enable-dynamic" @@ -1119,3 +1108,14 @@ func (c *OecConfig) SetMaxSubscriptionClients(v int) { func (c *OecConfig) GetMaxSubscriptionClients() int { return c.maxSubscriptionClients } + +func (c *OecConfig) SetMaxTxLimitPerPeer(maxTxLimitPerPeer int64) { + if maxTxLimitPerPeer < 0 { + return + } + c.maxTxLimitPerPeer = uint64(maxTxLimitPerPeer) +} + +func (c *OecConfig) GetMaxTxLimitPerPeer() uint64 { + return c.maxTxLimitPerPeer +} From 3bc08c433123c2fa1dae717e2ba434895ad60fd0 Mon Sep 17 00:00:00 2001 From: BananaLF <864685021@qq.com> Date: Mon, 11 Dec 2023 11:07:10 +0800 Subject: [PATCH 4/8] can not limit tx count when limit == 0 --- libs/tendermint/mempool/clist_mempool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/tendermint/mempool/clist_mempool.go b/libs/tendermint/mempool/clist_mempool.go index f5359771f1..aad9104444 100644 --- a/libs/tendermint/mempool/clist_mempool.go +++ b/libs/tendermint/mempool/clist_mempool.go @@ -315,7 +315,7 @@ func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo Tx if !ok { peerTxCount = 0 } - if peerTxCount >= cfg.DynamicConfig.GetMaxTxLimitPerPeer() { + if cfg.DynamicConfig.GetMaxTxLimitPerPeer() != 0 && peerTxCount >= cfg.DynamicConfig.GetMaxTxLimitPerPeer() { mem.peersTxCountMtx.Unlock() mem.logger.Debug(fmt.Sprintf("%s has been over %d transaction, please wait a few second", txInfo.SenderP2PID, cfg.DynamicConfig.GetMaxTxLimitPerPeer())) return fmt.Errorf("%s has been over %d transaction, please wait a few second", txInfo.SenderP2PID, cfg.DynamicConfig.GetMaxTxLimitPerPeer()) From 48aff454955006fb49ca1f4344a32cdb29e4b1af Mon Sep 17 00:00:00 2001 From: BananaLF <864685021@qq.com> Date: Mon, 11 Dec 2023 11:08:40 +0800 Subject: [PATCH 5/8] can not limit tx count when limit == 0 --- libs/tendermint/cmd/tendermint/commands/run_node.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/tendermint/cmd/tendermint/commands/run_node.go b/libs/tendermint/cmd/tendermint/commands/run_node.go index 999f150c72..7ac2f28961 100644 --- a/libs/tendermint/cmd/tendermint/commands/run_node.go +++ b/libs/tendermint/cmd/tendermint/commands/run_node.go @@ -179,7 +179,7 @@ func AddNodeFlags(cmd *cobra.Command) { cmd.Flags().Uint64( "mempool.max_tx_limit_per_peer", config.Mempool.MaxTxLimitPerPeer, - "Max tx limit per peer", + "Max tx limit per peer. If set 0 ,this flag disable", ) cmd.Flags().String( From 4e0788ddd39b0ba6dd42afacd1e06897ca640ff1 Mon Sep 17 00:00:00 2001 From: BananaLF <864685021@qq.com> Date: Mon, 11 Dec 2023 11:15:11 +0800 Subject: [PATCH 6/8] optimize code --- libs/tendermint/mempool/clist_mempool.go | 41 +++++++++++++++--------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/libs/tendermint/mempool/clist_mempool.go b/libs/tendermint/mempool/clist_mempool.go index aad9104444..7c372d7314 100644 --- a/libs/tendermint/mempool/clist_mempool.go +++ b/libs/tendermint/mempool/clist_mempool.go @@ -300,31 +300,44 @@ func (mem *CListMempool) TxsWaitChan() <-chan struct{} { return mem.txs.TxsWaitChan() } -// It blocks if we're waiting on Update() or Reap(). -// cb: A callback from the CheckTx command. -// -// It gets called from another goroutine. -// -// CONTRACT: Either cb will get called, or err returned. -// -// Safe for concurrent use by multiple goroutines. -func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) error { +func (mem *CListMempool) validatePeerCount(txInfo TxInfo) error { mem.peersTxCountMtx.Lock() + defer mem.peersTxCountMtx.Unlock() if len(txInfo.SenderP2PID) != 0 { peerTxCount, ok := mem.peersTxCount[string(txInfo.SenderP2PID)] if !ok { peerTxCount = 0 } if cfg.DynamicConfig.GetMaxTxLimitPerPeer() != 0 && peerTxCount >= cfg.DynamicConfig.GetMaxTxLimitPerPeer() { - mem.peersTxCountMtx.Unlock() mem.logger.Debug(fmt.Sprintf("%s has been over %d transaction, please wait a few second", txInfo.SenderP2PID, cfg.DynamicConfig.GetMaxTxLimitPerPeer())) return fmt.Errorf("%s has been over %d transaction, please wait a few second", txInfo.SenderP2PID, cfg.DynamicConfig.GetMaxTxLimitPerPeer()) } peerTxCount++ mem.peersTxCount[string(txInfo.SenderP2PID)] = peerTxCount } - mem.peersTxCountMtx.Unlock() + return nil +} + +func (mem *CListMempool) resetPeerCount() { + mem.peersTxCountMtx.Lock() + defer mem.peersTxCountMtx.Unlock() + for key := range mem.peersTxCount { + delete(mem.peersTxCount, key) + } +} +// It blocks if we're waiting on Update() or Reap(). +// cb: A callback from the CheckTx command. +// +// It gets called from another goroutine. +// +// CONTRACT: Either cb will get called, or err returned. +// +// Safe for concurrent use by multiple goroutines. +func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) error { + if err := mem.validatePeerCount(txInfo); err != nil { + return err + } timeStart := int64(0) if cfg.DynamicConfig.GetMempoolCheckTxCost() { timeStart = time.Now().UnixMicro() @@ -1030,11 +1043,7 @@ func (mem *CListMempool) Update( preCheck PreCheckFunc, postCheck PostCheckFunc, ) error { - mem.peersTxCountMtx.Lock() - for key := range mem.peersTxCount { - delete(mem.peersTxCount, key) - } - mem.peersTxCountMtx.Unlock() + mem.resetPeerCount() // no need to update when mempool is unavailable if mem.config.Sealed { return mem.updateSealed(height, txs, deliverTxResponses) From 0faff8c77fd83a3ed2fc8c2fd85c5c878b161f4c Mon Sep 17 00:00:00 2001 From: BananaLF <864685021@qq.com> Date: Mon, 11 Dec 2023 11:17:17 +0800 Subject: [PATCH 7/8] optimize code --- libs/tendermint/mempool/clist_mempool.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/libs/tendermint/mempool/clist_mempool.go b/libs/tendermint/mempool/clist_mempool.go index 7c372d7314..dde2eea2ed 100644 --- a/libs/tendermint/mempool/clist_mempool.go +++ b/libs/tendermint/mempool/clist_mempool.go @@ -301,6 +301,9 @@ func (mem *CListMempool) TxsWaitChan() <-chan struct{} { } func (mem *CListMempool) validatePeerCount(txInfo TxInfo) error { + if cfg.DynamicConfig.GetMaxTxLimitPerPeer() == 0 { + return nil + } mem.peersTxCountMtx.Lock() defer mem.peersTxCountMtx.Unlock() if len(txInfo.SenderP2PID) != 0 { @@ -308,7 +311,7 @@ func (mem *CListMempool) validatePeerCount(txInfo TxInfo) error { if !ok { peerTxCount = 0 } - if cfg.DynamicConfig.GetMaxTxLimitPerPeer() != 0 && peerTxCount >= cfg.DynamicConfig.GetMaxTxLimitPerPeer() { + if peerTxCount >= cfg.DynamicConfig.GetMaxTxLimitPerPeer() { mem.logger.Debug(fmt.Sprintf("%s has been over %d transaction, please wait a few second", txInfo.SenderP2PID, cfg.DynamicConfig.GetMaxTxLimitPerPeer())) return fmt.Errorf("%s has been over %d transaction, please wait a few second", txInfo.SenderP2PID, cfg.DynamicConfig.GetMaxTxLimitPerPeer()) } From 735c0966d0311499dad659561c0af8f8b2962418 Mon Sep 17 00:00:00 2001 From: BananaLF <864685021@qq.com> Date: Mon, 11 Dec 2023 11:21:55 +0800 Subject: [PATCH 8/8] optimize code --- libs/tendermint/mempool/clist_mempool.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/libs/tendermint/mempool/clist_mempool.go b/libs/tendermint/mempool/clist_mempool.go index dde2eea2ed..bf63262b20 100644 --- a/libs/tendermint/mempool/clist_mempool.go +++ b/libs/tendermint/mempool/clist_mempool.go @@ -322,6 +322,9 @@ func (mem *CListMempool) validatePeerCount(txInfo TxInfo) error { } func (mem *CListMempool) resetPeerCount() { + if cfg.DynamicConfig.GetMaxTxLimitPerPeer() == 0 { + return + } mem.peersTxCountMtx.Lock() defer mem.peersTxCountMtx.Unlock() for key := range mem.peersTxCount {