Skip to content

Commit

Permalink
Implement txn pool syncing by random address and time window
Browse files Browse the repository at this point in the history
Signed-off-by: billfort <[email protected]>
  • Loading branch information
billfort committed Dec 19, 2022
1 parent c32148c commit 2d5434a
Show file tree
Hide file tree
Showing 10 changed files with 1,900 additions and 216 deletions.
20 changes: 19 additions & 1 deletion chain/pool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/nknorg/nkn/v2/pb"
"github.com/nknorg/nkn/v2/transaction"
"github.com/nknorg/nkn/v2/util/log"
om "github.com/wk8/go-ordered-map"
)

var (
Expand Down Expand Up @@ -46,12 +47,22 @@ type TxnPool struct {

sync.RWMutex
lastDroppedTxn *transaction.Transaction

// sync txn
lastSyncTime int64 // last sync time in milli-second
twtMap *om.OrderedMap // txn with time ordered map, FIFO txn of last MaxSyncTxnInterval milli-seconds
twtMapMu sync.RWMutex // concurrent mutex for twtMap
twtCount int // simple counter of twtMap

fingerprintMu sync.RWMutex // mutex for twtFingerprint
twtFingerprint []byte // xor of txn hash in twtMap
}

func NewTxPool() *TxnPool {
tp := &TxnPool{
blockValidationState: chain.NewBlockValidationState(),
txnCount: 0,
twtMap: om.New(),
}

go func() {
Expand Down Expand Up @@ -224,7 +235,7 @@ func (tp *TxnPool) AppendTxnPool(txn *transaction.Transaction) error {
return err
}

if _, err := list.GetByNonce(txn.UnsignedTx.Nonce); err != nil && list.Full() {
if _, err = list.GetByNonce(txn.UnsignedTx.Nonce); err != nil && list.Full() {
return errors.New("account txpool full, too many transaction in list")
}

Expand All @@ -243,6 +254,9 @@ func (tp *TxnPool) AppendTxnPool(txn *transaction.Transaction) error {
return err
}

// 5. sync txn, add to txn with time map
tp.appendTwt(txn)

return nil
}

Expand Down Expand Up @@ -609,6 +623,10 @@ func (tp *TxnPool) removeTransactions(txns []*transaction.Transaction) []*transa

func (tp *TxnPool) CleanSubmittedTransactions(txns []*transaction.Transaction) error {
txnsRemoved := tp.removeTransactions(txns)

// sync txn
tp.removeFromTwtMap(txnsRemoved)

tp.blockValidationState.Lock()
defer tp.blockValidationState.Unlock()
return tp.CleanBlockValidationState(txnsRemoved)
Expand Down
220 changes: 220 additions & 0 deletions chain/pool/txwithtime.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
package pool

import (
"fmt"
"time"

"github.com/nknorg/nkn/v2/common"
"github.com/nknorg/nkn/v2/pb"
"github.com/nknorg/nkn/v2/transaction"
)

// sync txn
const MaxSyncTxnInterval = 16000 // in milli seconds

type txnWithTime struct { // abbreviation is twt: txn with time
arriveTime int64
txn *transaction.Transaction
}

// xor of byte slices
func xorBytes(b1 []byte, b2 []byte) ([]byte, error) {
if len(b1) != len(b2) {
return nil, fmt.Errorf("Input slices don't have same length")
}

b := make([]byte, len(b1))
for i := 0; i < len(b1); i++ {
b[i] = b1[i] ^ b2[i]
}

return b, nil
}

// update twtFingerprint
func (tp *TxnPool) updateTwtFingerprint(txnHash common.Uint256) []byte {

tp.fingerprintMu.Lock()
defer tp.fingerprintMu.Unlock()

if tp.twtFingerprint == nil {
tp.twtFingerprint = make([]byte, 0, len(txnHash.ToArray()))
tp.twtFingerprint = append(tp.twtFingerprint, txnHash.ToArray()...)
} else {
tp.twtFingerprint, _ = xorBytes(tp.twtFingerprint, txnHash.ToArray())
}
return tp.twtFingerprint
}

// when append new txn to pool, we add this txn pointer to txtWithTime map too.
func (tp *TxnPool) appendTwt(txn *transaction.Transaction) *txnWithTime {

now := time.Now().UnixMilli()

twt := &txnWithTime{
arriveTime: now,
txn: txn,
}

tp.twtMapMu.Lock()
defer tp.twtMapMu.Unlock()

tp.twtMap.Set(txn.Hash(), twt)
tp.updateTwtFingerprint(txn.Hash())
tp.twtCount++

return twt
}

// remove a txn from twtMap
func (tp *TxnPool) removeTwt(txnHash common.Uint256) (twt *txnWithTime) {
tp.twtMapMu.Lock()
defer tp.twtMapMu.Unlock()

v, ok := tp.twtMap.Delete(txnHash)
if ok {
tp.twtCount--
twt = v.(*txnWithTime)
txnHash := twt.txn.Hash()
tp.twtFingerprint, _ = xorBytes(tp.twtFingerprint, txnHash.ToArray())
}

return
}

// romve time over items from txn with time list
func (tp *TxnPool) RemoveExpiredTwt() (expiredTwt []*txnWithTime) {

now := time.Now().UnixMilli()
expiredThreshold := now - MaxSyncTxnInterval

delKeys := make([]interface{}, 0)

tp.twtMapMu.Lock()
defer tp.twtMapMu.Unlock()

for pair := tp.twtMap.Oldest(); pair != nil; pair = pair.Next() {
twt := pair.Value.(*txnWithTime)
if twt.arriveTime < expiredThreshold { // expired
delKeys = append(delKeys, pair.Key)
expiredTwt = append(expiredTwt, pair.Value.(*txnWithTime))
} else {
break
}
}

for _, key := range delKeys {
tp.twtMap.Delete(key)
tp.twtCount--
tp.updateTwtFingerprint(key.(common.Uint256))
}

return
}

// remove submitted txns from txn with time map
func (tp *TxnPool) removeFromTwtMap(txnsRemoved []*transaction.Transaction) {
var totalRemoved int = 0

tp.twtMapMu.Lock()
defer tp.twtMapMu.Unlock()

for _, txn := range txnsRemoved {
txnHash := txn.Hash()
_, ok := tp.twtMap.Load(txnHash)
if ok {
tp.twtMap.Delete(txnHash)
tp.updateTwtFingerprint(txnHash)
totalRemoved++
}
}
tp.twtCount -= totalRemoved

}

// get address and nonce from txn with time map
func (tp *TxnPool) GetTwtAddrNonce() map[common.Uint160]uint64 {

mapAddrNonce := make(map[common.Uint160]uint64)

tp.twtMapMu.Lock()
defer tp.twtMapMu.Unlock()

for pair := tp.twtMap.Oldest(); pair != nil; pair = pair.Next() {

twt := pair.Value.(*txnWithTime)
sender, err := twt.txn.GetProgramHashes()
if err != nil {
return nil
}
nonce, _ := mapAddrNonce[sender[0]]
if twt.txn.UnsignedTx.Nonce > nonce {
mapAddrNonce[sender[0]] = twt.txn.UnsignedTx.Nonce
}
}

return mapAddrNonce
}

// get transactions later than specific time from txn with time map
// earliest is in milli-second
func (tp *TxnPool) GetTwtTxnAfter(earliest int64, reqAddrNonce map[common.Uint160]uint64) ([]*pb.Transaction, error) {
respTxns := make([]*pb.Transaction, 0)

tp.twtMapMu.Lock()
defer tp.twtMapMu.Unlock()

for pair := tp.twtMap.Newest(); pair != nil; pair = pair.Prev() {

twt := pair.Value.(*txnWithTime)
if twt.arriveTime < earliest { // skip too old txn
break
}

sender, err := twt.txn.GetProgramHashes()
if err != nil {
return nil, err
}

addr := sender[0]
nonce, ok := reqAddrNonce[addr]

if ok { // if addr is in request list, we append greater nonce txns to response list

if twt.txn.UnsignedTx.Nonce > nonce {
respTxns = append(respTxns, &pb.Transaction{
UnsignedTx: twt.txn.UnsignedTx,
Programs: twt.txn.Programs,
})
}
} else { // if the address is not in request list, we append it to response list
respTxns = append(respTxns, &pb.Transaction{
UnsignedTx: twt.txn.UnsignedTx,
Programs: twt.txn.Programs,
})
}
}

return respTxns, nil
}

// get xorHashOfTxnTime
func (tp *TxnPool) GetTwtFingerprintAndCount() ([]byte, int) {
tp.fingerprintMu.Lock()
defer tp.fingerprintMu.Unlock()

fp := make([]byte, 0, len(tp.twtFingerprint))
fp = append(fp, tp.twtFingerprint...)

return fp, tp.twtCount
}

// set last sync time
func (tp *TxnPool) SetSyncTime() {
tp.lastSyncTime = time.Now().UnixMilli()
}

// set last sync time
func (tp *TxnPool) GetSyncTime() int64 {
return tp.lastSyncTime
}
Loading

0 comments on commit 2d5434a

Please sign in to comment.