Skip to content

Commit

Permalink
Recover validation txs on after long session from tx keeper (#831)
Browse files Browse the repository at this point in the history
  • Loading branch information
sidenaio authored Oct 20, 2021
1 parent a5fe5ca commit 7dc7fc1
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 25 deletions.
5 changes: 3 additions & 2 deletions blockchain/blockchain_mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package blockchain
import (
"crypto/ecdsa"
"github.com/idena-network/idena-go/blockchain/types"
"github.com/idena-network/idena-go/blockchain/validation"
"github.com/idena-network/idena-go/common"
"github.com/idena-network/idena-go/common/eventbus"
"github.com/idena-network/idena-go/config"
Expand Down Expand Up @@ -137,7 +138,7 @@ type TestBlockchain struct {
}

func (chain *TestBlockchain) AddTx(tx *types.Transaction) error {
return chain.txpool.AddExternalTxs(tx)
return chain.txpool.AddExternalTxs(validation.InboundTx, tx)
}

func (chain *TestBlockchain) Copy() (*TestBlockchain, *appstate.AppState) {
Expand Down Expand Up @@ -202,7 +203,7 @@ func (chain *TestBlockchain) GenerateBlocks(count int, txsInBlock int) *TestBloc
if err != nil {
panic(err)
}
if err = chain.txpool.AddExternalTxs(tx); err != nil {
if err = chain.txpool.AddExternalTxs(validation.InboundTx, tx); err != nil {
panic(err)
}
}
Expand Down
8 changes: 4 additions & 4 deletions blockchain/validation/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ const (
GodValidUntilNetworkSize = 10
)

type TxType int
type TxType = int

const (
InBlockTx = 1
MempoolTx = 2
InboundTx = 3
InBlockTx = TxType(1)
MempoolTx = TxType(2)
InboundTx = TxType(3)
)

var (
Expand Down
7 changes: 4 additions & 3 deletions consensus/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"github.com/idena-network/idena-go/blockchain"
"github.com/idena-network/idena-go/blockchain/types"
"github.com/idena-network/idena-go/blockchain/validation"
"github.com/idena-network/idena-go/common"
"github.com/idena-network/idena-go/common/hexutil"
"github.com/idena-network/idena-go/common/math"
Expand Down Expand Up @@ -162,7 +163,7 @@ func (engine *Engine) loop() {
if engine.forkResolver.HasLoadedFork() {
if revertedTxs, err := engine.forkResolver.ApplyFork(); err == nil {
if len(revertedTxs) > 0 {
engine.txpool.AddExternalTxs(revertedTxs...)
engine.txpool.AddExternalTxs(validation.MempoolTx, revertedTxs...)
}
} else {
engine.log.Warn("fork apply error", "err", err)
Expand Down Expand Up @@ -191,7 +192,7 @@ func (engine *Engine) loop() {
roundStart := time.Now().UTC()

shardId, _ := engine.chain.CoinbaseShard()
engine.log.Info("Start loop", "round", round, "head", head.Hash().Hex(),"shardId", shardId , "p2p-shardId", engine.pm.OwnPeeringShardId(), "total-peers",
engine.log.Info("Start loop", "round", round, "head", head.Hash().Hex(), "shardId", shardId, "p2p-shardId", engine.pm.OwnPeeringShardId(), "total-peers",
engine.pm.PeersCount(), "own-shard-peers", engine.pm.OwnShardPeersCount(), "online-nodes", engine.appState.ValidatorsCache.OnlineSize(),
"network", engine.appState.ValidatorsCache.NetworkSize())

Expand Down Expand Up @@ -240,7 +241,7 @@ func (engine *Engine) loop() {
engine.log.Error("error occurred during applying of fork", "err", err)
} else {
if len(revertedTxs) > 0 {
engine.txpool.AddExternalTxs(revertedTxs...)
engine.txpool.AddExternalTxs(validation.MempoolTx, revertedTxs...)
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion core/flip/flipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/idena-network/idena-go/blockchain/attachments"
"github.com/idena-network/idena-go/blockchain/types"
"github.com/idena-network/idena-go/blockchain/validation"
"github.com/idena-network/idena-go/common"
"github.com/idena-network/idena-go/common/eventbus"
"github.com/idena-network/idena-go/core/appstate"
Expand Down Expand Up @@ -177,7 +178,7 @@ func (fp *Flipper) addNewFlip(flip *types.Flip, local bool) error {
if local {
err = fp.txpool.AddInternalTx(flip.Tx)
} else {
err = fp.txpool.AddExternalTxs(flip.Tx)
err = fp.txpool.AddExternalTxs(validation.InboundTx, flip.Tx)
}
if err != nil && err != mempool.DuplicateTxError {
return err
Expand Down
5 changes: 3 additions & 2 deletions core/mempool/async_txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"github.com/idena-network/idena-go/blockchain/types"
"github.com/idena-network/idena-go/blockchain/validation"
"github.com/idena-network/idena-go/common"
)

Expand Down Expand Up @@ -31,7 +32,7 @@ func (pool *AsyncTxPool) AddInternalTx(tx *types.Transaction) error {
panic("not implemented")
}

func (pool *AsyncTxPool) AddExternalTxs(txs ...*types.Transaction) error {
func (pool *AsyncTxPool) AddExternalTxs(txType validation.TxType, txs ...*types.Transaction) error {
skipped := 0
for _, tx := range txs {
select {
Expand Down Expand Up @@ -65,6 +66,6 @@ func (pool *AsyncTxPool) loop() {
break batchLoop
}
}
pool.txPool.AddExternalTxs(batch...)
pool.txPool.AddExternalTxs(validation.InboundTx, batch...)
}
}
16 changes: 8 additions & 8 deletions core/mempool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ var (

type TransactionPool interface {
AddInternalTx(tx *types.Transaction) error
AddExternalTxs(txs ...*types.Transaction) error
AddExternalTxs(txType validation.TxType, txs ...*types.Transaction) error
GetPendingTransaction(noFilter bool, id common.ShardId, count bool) []*types.Transaction
IsSyncing() bool
}
Expand Down Expand Up @@ -112,7 +112,7 @@ func (pool *TxPool) Initialize(head *types.Header, coinbase common.Address, useT

keeperTxs := pool.txKeeper.List()
pool.txKeeper.Clear()
pool.AddExternalTxs(keeperTxs...)
pool.AddExternalTxs(validation.MempoolTx, keeperTxs...)
}
}

Expand Down Expand Up @@ -214,7 +214,7 @@ func (pool *TxPool) validate(tx *types.Transaction, appState *appstate.AppState,
return validation.ValidateTx(appState, tx, minFeePerGas, txType)
}

func (pool *TxPool) AddExternalTxs(txs ...*types.Transaction) error {
func (pool *TxPool) AddExternalTxs(txType validation.TxType, txs ...*types.Transaction) error {
appState, err := pool.appState.Readonly(pool.head.Height())

if err != nil {
Expand All @@ -240,7 +240,7 @@ func (pool *TxPool) AddExternalTxs(txs ...*types.Transaction) error {
continue
}

if err = pool.add(tx, appState, sender == pool.coinbase); err != nil && len(txs) == 1 {
if err = pool.add(tx, appState, sender == pool.coinbase, txType); err != nil && len(txs) == 1 {
return err
}
if err == nil {
Expand All @@ -259,7 +259,7 @@ func (pool *TxPool) AddInternalTx(tx *types.Transaction) error {
pool.txKeeper.AddTx(tx)
}
appState, _ := pool.appState.Readonly(pool.head.Height())
if err := pool.add(tx, appState, true); err != nil {
if err := pool.add(tx, appState, true, validation.InboundTx); err != nil {
if _, ok := priorityTypes[tx.Type]; ok {
sender, _ := types.Sender(tx)
pool.bus.Publish(&events.NewTxEvent{
Expand All @@ -279,15 +279,15 @@ func (pool *TxPool) AddInternalTx(tx *types.Transaction) error {
if err != nil {
return errors.WithMessage(err, "tx can't be validated")
}
if err = pool.add(tx, appState, true); err == nil {
if err = pool.add(tx, appState, true, validation.InboundTx); err == nil {
if pool.txKeeper != nil {
pool.txKeeper.AddTx(tx)
}
}
return err
}

func (pool *TxPool) add(tx *types.Transaction, appState *appstate.AppState, own bool) error {
func (pool *TxPool) add(tx *types.Transaction, appState *appstate.AppState, own bool, txType validation.TxType) error {
if _, ok := pool.all.Get(tx.Hash()); ok {
return DuplicateTxError
}
Expand All @@ -302,7 +302,7 @@ func (pool *TxPool) add(tx *types.Transaction, appState *appstate.AppState, own

sender, _ := types.Sender(tx)

if err := pool.validate(tx, appState, validation.InboundTx); err != nil {
if err := pool.validate(tx, appState, txType); err != nil {
pool.mutex.Unlock()
if sender == pool.coinbase {
log.Warn("Tx is not valid", "hash", tx.Hash().Hex(), "err", err)
Expand Down
86 changes: 84 additions & 2 deletions core/mempool/txpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package mempool

import (
"crypto/ecdsa"
"github.com/idena-network/idena-go/blockchain/attachments"
"github.com/idena-network/idena-go/blockchain/types"
"github.com/idena-network/idena-go/blockchain/validation"
"github.com/idena-network/idena-go/common"
"github.com/idena-network/idena-go/common/eventbus"
"github.com/idena-network/idena-go/config"
"github.com/idena-network/idena-go/core/appstate"
"github.com/idena-network/idena-go/core/state"
"github.com/idena-network/idena-go/crypto"
"github.com/idena-network/idena-go/secstore"
"github.com/idena-network/idena-go/stats/collector"
Expand Down Expand Up @@ -311,14 +314,14 @@ func TestTxPool_AddWithTxKeeper(t *testing.T) {
}
pool.txKeeper.persist()
for i := 0; i < 20; i++ {
require.NoError(t, pool.AddExternalTxs(getTx(keys[i])))
require.NoError(t, pool.AddExternalTxs(validation.InboundTx, getTx(keys[i])))
}
time.Sleep(time.Second)
require.Len(t, pool.txKeeper.txs, 320)

pool.txKeeper.RemoveTxs([]common.Hash{pool.GetPendingTransaction(false, common.MultiShard, false)[0].Hash()})
time.Sleep(time.Second)

prevPool := pool

prevPool.ResetTo(&types.Block{Header: &types.Header{
Expand Down Expand Up @@ -365,3 +368,82 @@ func TestTxPool_AddWithTxKeeper(t *testing.T) {
txKeeper.Load()
require.Len(t, pool.txKeeper.txs, 0)
}

func TestTxPool_RecoverValidationTxs_OnAfterLongSession(t *testing.T) {

txKeeperPersistInterval = time.Millisecond * 200

pool := getPool()

key, _ := crypto.GenerateKey()
address := crypto.PubkeyToAddress(key.PublicKey)
pool.appState.State.SetBalance(address, big.NewInt(0).Mul(big.NewInt(10000), common.DnaBase))
pool.appState.State.SetState(address, state.Candidate)

pool.appState.State.SetValidationPeriod(state.LongSessionPeriod)

pool.appState.Commit(nil)
pool.appState.Initialize(1)
pool.Initialize(&types.Header{
EmptyBlockHeader: &types.EmptyBlockHeader{
Height: 1,
},
}, common.Address{0x1}, true)

getTx := func(key *ecdsa.PrivateKey) *types.Transaction {

address := crypto.PubkeyToAddress(key.PublicKey)

nonce := pool.appState.NonceCache.GetNonce(address, 0)

attachment := attachments.CreateShortAnswerAttachment([]byte{0x1}, 1, 1)

tx := &types.Transaction{
AccountNonce: nonce + 1,
Epoch: 0,
Type: types.SubmitShortAnswersTx,
Payload: attachment,
}

tx, _ = types.SignTx(tx, key)
return tx
}

require.NoError(t, pool.AddExternalTxs(validation.InboundTx, getTx(key)))

time.Sleep(time.Millisecond * 500)
prevPool := pool
pool = getPool()
pool.appState = prevPool.appState
pool.appState.State.SetValidationPeriod(state.AfterLongSessionPeriod)

pool.appState.Commit(nil)

pool.Initialize(&types.Header{
EmptyBlockHeader: &types.EmptyBlockHeader{
Height: 2,
},
}, common.Address{0x1}, true)

require.Len(t, pool.all.txs, 1)

key2, _ := crypto.GenerateKey()
address2 := crypto.PubkeyToAddress(key2.PublicKey)
pool.appState.State.SetBalance(address2, big.NewInt(0).Mul(big.NewInt(10000), common.DnaBase))
pool.appState.State.SetState(address2, state.Candidate)
pool.appState.Commit(nil)

pool = getPool()
pool.appState = prevPool.appState

pool.Initialize(&types.Header{
EmptyBlockHeader: &types.EmptyBlockHeader{
Height: 3,
},
}, common.Address{0x1}, true)

require.Error(t, pool.AddInternalTx(getTx(key2)))
require.Error(t, pool.AddExternalTxs(validation.InboundTx, getTx(key2)))
require.Len(t, pool.all.txs, 1)
require.NoError(t, pool.AddExternalTxs(validation.InBlockTx, getTx(key2)))
}
3 changes: 2 additions & 1 deletion deferredtx/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package deferredtx
import (
"github.com/idena-network/idena-go/blockchain"
"github.com/idena-network/idena-go/blockchain/types"
"github.com/idena-network/idena-go/blockchain/validation"
"github.com/idena-network/idena-go/common"
"github.com/idena-network/idena-go/config"
"github.com/idena-network/idena-go/core/appstate"
Expand Down Expand Up @@ -39,7 +40,7 @@ func (f *fakeTxPool) AddInternalTx(tx *types.Transaction) error {
f.counter++
return nil
}
func (f *fakeTxPool) AddExternalTxs(txs ...*types.Transaction) error {
func (f *fakeTxPool) AddExternalTxs(txType validation.TxType, txs ...*types.Transaction) error {
panic("implement me")
}

Expand Down
5 changes: 3 additions & 2 deletions protocol/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/idena-network/idena-go/blockchain"
"github.com/idena-network/idena-go/blockchain/types"
"github.com/idena-network/idena-go/blockchain/validation"
"github.com/idena-network/idena-go/common"
"github.com/idena-network/idena-go/common/eventbus"
"github.com/idena-network/idena-go/common/maputil"
Expand Down Expand Up @@ -287,7 +288,7 @@ func (h *IdenaGossipHandler) handle(p *protoPeer) error {
return nil
}
p.markKey(key)
if err := h.txpool.AddExternalTxs(tx); err != nil {
if err := h.txpool.AddExternalTxs(validation.InboundTx, tx); err != nil {
h.throttlingLogger.Warn("Failed to add external txs", "err", err)
}
case GetBlockByHash:
Expand Down Expand Up @@ -1006,7 +1007,7 @@ func (h *IdenaGossipHandler) watchShardSubscription() {
for {
time.Sleep(time.Second * 20)
ownShard := h.OwnPeeringShardId()
if ownShard!=common.MultiShard && (sub == nil || topicShard != ownShard) {
if ownShard != common.MultiShard && (sub == nil || topicShard != ownShard) {
if sub != nil {
sub.Cancel()
}
Expand Down

0 comments on commit 7dc7fc1

Please sign in to comment.