Skip to content

Commit

Permalink
Implement txn pool syncing by random address and time window
Browse files Browse the repository at this point in the history
Signed-off-by: billfort <[email protected]>
  • Loading branch information
billfort committed Dec 12, 2022
1 parent 4bca218 commit 02ae58c
Show file tree
Hide file tree
Showing 8 changed files with 1,463 additions and 216 deletions.
137 changes: 136 additions & 1 deletion chain/pool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/nknorg/nkn/v2/pb"
"github.com/nknorg/nkn/v2/transaction"
"github.com/nknorg/nkn/v2/util/log"
om "github.com/wk8/go-ordered-map"
)

var (
Expand All @@ -34,6 +35,13 @@ func nanoPayKey(sender, recipient string, nonce uint64) nanoPay {
return nanoPay{sender, recipient, nonce}
}

// sync txn
const MaxSyncTxnInterval = 16000 // in milli seconds
type TxnWithTime struct { // abbreviation is twt: txn with time
ArriveTime int64
Txn *transaction.Transaction
}

// TxnPool is a list of txns that need to by add to ledger sent by user.
type TxnPool struct {
TxLists sync.Map // NonceSortedTxs instance to store user's account.
Expand All @@ -46,12 +54,21 @@ type TxnPool struct {

sync.RWMutex
lastDroppedTxn *transaction.Transaction

// sync txn
lastSyncTime int64 // last sync time in milli-second
twtMap *om.OrderedMap // txn with time ordered map, FIFO txn of last MaxSyncTxnInterval milli-seconds
twtMapMu sync.RWMutex // concurrent mutex for twtMap
twtCount int32 // simple counter of twtMap
twtFingerprint []byte // xor of txn hash in twtMap
}

func NewTxPool() *TxnPool {
tp := &TxnPool{
blockValidationState: chain.NewBlockValidationState(),
txnCount: 0,
twtFingerprint: make([]byte, 32),
twtMap: om.New(),
}

go func() {
Expand Down Expand Up @@ -224,7 +241,9 @@ func (tp *TxnPool) AppendTxnPool(txn *transaction.Transaction) error {
return err
}

if _, err := list.GetByNonce(txn.UnsignedTx.Nonce); err != nil && list.Full() {
// sync txn, we trace if this txn is exsit this pool
var oldTxn *transaction.Transaction
if oldTxn, err = list.GetByNonce(txn.UnsignedTx.Nonce); err != nil && list.Full() {
return errors.New("account txpool full, too many transaction in list")
}

Expand All @@ -243,6 +262,12 @@ func (tp *TxnPool) AppendTxnPool(txn *transaction.Transaction) error {
return err
}

// 5. sync txn, add to txn with time map
// append it into list only when it doesn't exist in the pool before or hash is different
if oldTxn == nil || oldTxn.Hash() != txn.Hash() {
tp.AppendTwt(txn)
}

return nil
}

Expand Down Expand Up @@ -609,6 +634,10 @@ func (tp *TxnPool) removeTransactions(txns []*transaction.Transaction) []*transa

func (tp *TxnPool) CleanSubmittedTransactions(txns []*transaction.Transaction) error {
txnsRemoved := tp.removeTransactions(txns)

// sync txn
tp.removeFromTwtMap(txnsRemoved)

tp.blockValidationState.Lock()
defer tp.blockValidationState.Unlock()
return tp.CleanBlockValidationState(txnsRemoved)
Expand Down Expand Up @@ -678,3 +707,109 @@ func (tp *TxnPool) GetNonceByTxnPool(addr common.Uint160) (uint64, error) {
func shortHashToKey(shortHash []byte) string {
return string(shortHash)
}

// sync txn
// update twtFingerprint
func (tp *TxnPool) updateTwtFingerprint(txnHash common.Uint256) {
byteTxnHash := txnHash.ToArray()
if len(tp.twtFingerprint) == 0 {
tp.twtFingerprint = byteTxnHash
} else {
for i := 0; i < len(byteTxnHash); i++ {
tp.twtFingerprint[i] ^= byteTxnHash[i]
}
}
}

// when append new txn to pool, we add this txn pointer to txtWithTime map too.
func (tp *TxnPool) AppendTwt(txn *transaction.Transaction) {

now := time.Now().UnixMilli()

twt := TxnWithTime{
ArriveTime: now,
Txn: txn,
}

tp.twtMapMu.Lock()
tp.twtMap.Set(txn.Hash(), twt)
tp.updateTwtFingerprint(txn.Hash())
tp.twtMapMu.Unlock()

atomic.AddInt32(&tp.twtCount, 1)

return

}

// romve time over items from txn with time list
func (tp *TxnPool) RemoveExpiredTwt() {

now := time.Now().UnixMilli()
expiredThreshold := now - MaxSyncTxnInterval

delKeys := make([]interface{}, 0)

pair := tp.twtMap.Oldest()
for pair != nil {
twt := pair.Value.(TxnWithTime)
if twt.ArriveTime < expiredThreshold { // expired
delKeys = append(delKeys, pair.Key)
} else {
break
}
pair = pair.Next()
}

tp.twtMapMu.Lock()
defer tp.twtMapMu.Unlock()

for _, key := range delKeys {
tp.twtMap.Delete(key)
tp.updateTwtFingerprint(key.(common.Uint256))
}

}

// remove submitted txn from txn with time list
func (tp *TxnPool) removeFromTwtMap(txnsRemoved []*transaction.Transaction) {
var totalRemoved int32 = 0

tp.twtMapMu.Lock()
for _, txn := range txnsRemoved {
txnHash := txn.Hash()
_, ok := tp.twtMap.Load(txnHash)
if ok {
tp.twtMap.Delete(txnHash)
tp.updateTwtFingerprint(txnHash)
totalRemoved++
}
}
tp.twtMapMu.Unlock()

if totalRemoved > 0 {
atomic.AddInt32(&tp.twtCount, -totalRemoved)
}
}

// get txn with time list
func (tp *TxnPool) GetTwtMap() *om.OrderedMap {
return tp.twtMap
}

// get xorHashOfTxnTime
func (tp *TxnPool) GetTwtFingerprint() ([]byte, int32) {
tp.RemoveExpiredTwt()

return tp.twtFingerprint, tp.twtCount
}

// set last sync time
func (tp *TxnPool) SetSyncTime() {
tp.lastSyncTime = time.Now().UnixMilli()
}

// set last sync time
func (tp *TxnPool) GetSyncTime() int64 {
return tp.lastSyncTime
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ require (
github.com/spf13/cobra v1.4.0
github.com/spf13/pflag v1.0.5
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954
github.com/wk8/go-ordered-map v1.0.0 // indirect
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324
google.golang.org/protobuf v1.26.0
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954 h1:xQdMZ1WLrgkkvOZ/LDQxjVxMLdby7osSh4ZEVa5sIjs=
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954/go.mod h1:u2MKkTVTVJWe5D1rCvame8WqhBd88EuIwODJZ1VHCPM=
github.com/templexxx/cpufeat v0.0.0-20180724012125-cef66df7f161 h1:89CEmDvlq/F7SJEOqkIdNDGJXrQIhuIx9D2DBXjavSU=
Expand All @@ -372,6 +373,8 @@ github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/vultr/govultr v0.4.2/go.mod h1:TUuUizMOFc7z+PNMssb6iGjKjQfpw5arIaOLfocVudQ=
github.com/wk8/go-ordered-map v1.0.0 h1:BV7z+2PaK8LTSd/mWgY12HyMAo5CEgkHqbkVq2thqr8=
github.com/wk8/go-ordered-map v1.0.0/go.mod h1:9ZIbRunKbuvfPKyBP1SIKLcXNlv74YCOZ3t3VTS6gRk=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
Expand Down Expand Up @@ -653,6 +656,7 @@ gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
9 changes: 9 additions & 0 deletions lnode/syncblock.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,15 @@ func (localNode *LocalNode) initSyncing() {
localNode.AddMessageHandler(pb.MessageType_GET_BLOCKS, localNode.getBlocksMessageHandler)
localNode.AddMessageHandler(pb.MessageType_GET_STATES, localNode.getStatesMessageHandler)
localNode.ResetSyncing()

// sync txn poo
localNode.AddMessageHandler(pb.MessageType_REQ_TXN_POOL_HASH, localNode.requestTxnPoolHashHandler)
localNode.AddMessageHandler(pb.MessageType_REQ_SYNC_TXN_POOL, localNode.requestSyncTxnPoolHandler)
go localNode.StartSyncTxnPool()
// sync txn, random address
localNode.AddMessageHandler(pb.MessageType_REQ_ADDR_NONCE, localNode.requestAddrNonceHandler)
localNode.AddMessageHandler(pb.MessageType_REQ_SYNC_ADDR_TXN, localNode.requestSyncAddrTxnHandler)
go localNode.StartSyncRandAddrTxn()
}

func removeStoppedNeighbors(neighbors []*node.RemoteNode) []*node.RemoteNode {
Expand Down
Loading

0 comments on commit 02ae58c

Please sign in to comment.