Skip to content

Commit

Permalink
finish sync txn coding
Browse files Browse the repository at this point in the history
Signed-off-by: billfort <[email protected]>
  • Loading branch information
billfort committed Dec 2, 2022
1 parent 53cb516 commit 26caceb
Show file tree
Hide file tree
Showing 12 changed files with 1,652 additions and 223 deletions.
6 changes: 3 additions & 3 deletions api/websocket/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,10 @@ func (ws *WsServer) registryMethod() {

return api.RespPacking(nil, errcode.INVALID_SIGNATURE)
} else {
log.Infof("client auth pass")
log.Infof("Client auth pass")
}
} else {
log.Infof("client doesn't send signature, it should be old version sdk")
log.Infof("Client doesn't send signature, it should be old version sdk")
}

newSessionID := hex.EncodeToString(clientID)
Expand Down Expand Up @@ -336,7 +336,7 @@ func (ws *WsServer) websocketHandler(w http.ResponseWriter, r *http.Request) {
// client auth
err = ws.sendClientAuthChallenge(sess)
if err != nil {
log.Error("send client auth challenge: ", err)
log.Error("Send client auth challenge: ", err)
return
}

Expand Down
131 changes: 131 additions & 0 deletions chain/pool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ func nanoPayKey(sender, recipient string, nonce uint64) nanoPay {
return nanoPay{sender, recipient, nonce}
}

// sync txn
const MaxSyncTxnInterval = 16000 // in milli seconds
type TxnWithTime struct {
ArriveTime int64
Txn *transaction.Transaction
}

// TxnPool is a list of txns that need to by add to ledger sent by user.
type TxnPool struct {
TxLists sync.Map // NonceSortedTxs instance to store user's account.
Expand All @@ -46,12 +53,19 @@ type TxnPool struct {

sync.RWMutex
lastDroppedTxn *transaction.Transaction

// sync txn
latSyncTime int64 // last sync time in second
mapTxnWithTime sync.Map // FIFO txn of last MaxSyncTxnInterval seconds
txnWithTimeCount int // simple counter of mapTxnWithTie
xorHashOfTxnWithTime []byte // xor of txn hash in txnWithTimeList
}

func NewTxPool() *TxnPool {
tp := &TxnPool{
blockValidationState: chain.NewBlockValidationState(),
txnCount: 0,
xorHashOfTxnWithTime: make([]byte, 32),
}

go func() {
Expand Down Expand Up @@ -224,9 +238,14 @@ func (tp *TxnPool) AppendTxnPool(txn *transaction.Transaction) error {
return err
}

// sync txn, we trace if this txn is exsit this pool
bExist := false
if _, err := list.GetByNonce(txn.UnsignedTx.Nonce); err != nil && list.Full() {
return errors.New("account txpool full, too many transaction in list")
}
if err == nil { // this txn is in the pool already.
bExist = true
}

// 2. verify txn
if err := txvalidator.VerifyTransaction(txn, chain.DefaultLedger.Store.GetHeight()+1); err != nil {
Expand All @@ -243,6 +262,11 @@ func (tp *TxnPool) AppendTxnPool(txn *transaction.Transaction) error {
return err
}

// 5. sync txn, add to txn with time list
if !bExist { // append it into list only when it doesn't exist in the pool before.
tp.AppendTxnWithTime(txn)
}

return nil
}

Expand Down Expand Up @@ -609,6 +633,10 @@ func (tp *TxnPool) removeTransactions(txns []*transaction.Transaction) []*transa

func (tp *TxnPool) CleanSubmittedTransactions(txns []*transaction.Transaction) error {
txnsRemoved := tp.removeTransactions(txns)

// sync txn
tp.removeFromTxnWithTimeList(txnsRemoved)

tp.blockValidationState.Lock()
defer tp.blockValidationState.Unlock()
return tp.CleanBlockValidationState(txnsRemoved)
Expand Down Expand Up @@ -678,3 +706,106 @@ func (tp *TxnPool) GetNonceByTxnPool(addr common.Uint160) (uint64, error) {
func shortHashToKey(shortHash []byte) string {
return string(shortHash)
}

// sync txn
// update xorHashOfTxnWithTime
func (tp *TxnPool) updateXorHashOfTxnWithTime(txnHash common.Uint256) {
byteTxnHash := txnHash.ToArray()
if len(tp.xorHashOfTxnWithTime) == 0 {
tp.xorHashOfTxnWithTime = byteTxnHash
} else {
for i := 0; i < len(byteTxnHash); i++ {
tp.xorHashOfTxnWithTime[i] ^= byteTxnHash[i]
}
}
}

// when append new txn to pool, we add this txn pointer to txtWithTime list too.
func (tp *TxnPool) AppendTxnWithTime(txn *transaction.Transaction) {

tp.RemoveTimeoverTxnWithTime()

now := time.Now().UnixMilli()

txnWithTime := TxnWithTime{
ArriveTime: now,
Txn: txn,
}
tp.mapTxnWithTime.Store(txn.Hash(), txnWithTime)
tp.txnWithTimeCount++

tp.updateXorHashOfTxnWithTime(txn.Hash())

return

}

// set txn arrive local time by duration.
func (tp *TxnPool) SetTxnTime(txnHash common.Uint256, duration int64) {
if duration <= 0 {
return
}
v, ok := tp.mapTxnWithTime.Load(txnHash)
if !ok {
return
}
txnWithTime, ok := v.(TxnWithTime)
if !ok {
return
}
txnWithTime.ArriveTime = time.Now().UnixMilli() - duration
tp.mapTxnWithTime.Store(txnHash, txnWithTime)
}

// romve time over items from txn with time list
func (tp *TxnPool) RemoveTimeoverTxnWithTime() {

iNow := time.Now().UnixMilli()

tp.mapTxnWithTime.Range(func(k, v interface{}) bool {
txnWithTime := v.(TxnWithTime)
if iNow-txnWithTime.ArriveTime > MaxSyncTxnInterval { // check if it is over time
tp.mapTxnWithTime.Delete(k)
tp.txnWithTimeCount--
txnHash := k.(common.Uint256)
tp.updateXorHashOfTxnWithTime(txnHash)
}
return true
})

}

// remove submitted txn from txn with time list
func (tp *TxnPool) removeFromTxnWithTimeList(txnsRemoved []*transaction.Transaction) {
for _, txn := range txnsRemoved {
txnHash := txn.Hash()
_, ok := tp.mapTxnWithTime.Load(txnHash)
if ok {
tp.mapTxnWithTime.Delete(txnHash)
tp.txnWithTimeCount--
tp.updateXorHashOfTxnWithTime(txnHash)
}
}
}

// get txn with time list
func (tp *TxnPool) GetMapTxnWithTime() sync.Map {
return tp.mapTxnWithTime
}

// get xorHashOfTxnTime
func (tp *TxnPool) GetXorHashAndCount() ([]byte, int) {
tp.RemoveTimeoverTxnWithTime()

return tp.xorHashOfTxnWithTime, tp.txnWithTimeCount
}

// set last sync time
func (tp *TxnPool) SetSyncTime() {
tp.latSyncTime = time.Now().UnixMilli()
}

// set last sync time
func (tp *TxnPool) GetSyncTime() int64 {
return tp.latSyncTime
}
9 changes: 9 additions & 0 deletions lnode/syncblock.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,15 @@ func (localNode *LocalNode) initSyncing() {
localNode.AddMessageHandler(pb.MessageType_GET_BLOCKS, localNode.getBlocksMessageHandler)
localNode.AddMessageHandler(pb.MessageType_GET_STATES, localNode.getStatesMessageHandler)
localNode.ResetSyncing()

// sync txn
localNode.AddMessageHandler(pb.MessageType_REQ_TXN_POOL_HASH, localNode.reqTxnPoolHashHandler)
localNode.AddMessageHandler(pb.MessageType_REQ_SYNC_TXN_POOL, localNode.reqSyncTxnPoolHandler)
go localNode.StartSyncTxnPool()
// sync txn, random address
localNode.AddMessageHandler(pb.MessageType_REQ_ADDR_NONCE, localNode.reqAddrNonceHandler)
localNode.AddMessageHandler(pb.MessageType_REQ_SYNC_ADDR_TXN, localNode.reqSyncAddrTxnHandler)
go localNode.StartSyncRandAddrTxn()
}

func removeStoppedNeighbors(neighbors []*node.RemoteNode) []*node.RemoteNode {
Expand Down
Loading

0 comments on commit 26caceb

Please sign in to comment.