From 4e968a2217066c6f5a61525aac95636088643636 Mon Sep 17 00:00:00 2001 From: Tolik Zinovyev Date: Wed, 1 Jun 2022 19:20:18 -0400 Subject: [PATCH] pingpong: improve transaction scheduling (#3478) --- cmd/pingpong/runCmd.go | 18 ------------ shared/pingpong/accounts.go | 40 +++++--------------------- shared/pingpong/config.go | 4 --- shared/pingpong/pingpong.go | 57 ++++++++++++------------------------- 4 files changed, 25 insertions(+), 94 deletions(-) diff --git a/cmd/pingpong/runCmd.go b/cmd/pingpong/runCmd.go index cb62840360..21bd8c1aa8 100644 --- a/cmd/pingpong/runCmd.go +++ b/cmd/pingpong/runCmd.go @@ -43,9 +43,7 @@ var minFee uint64 var randomFee, noRandomFee bool var randomAmount, noRandomAmount bool var randomDst bool -var delayBetween string var runTime string -var restTime string var refreshTime string var saveConfig bool var useDefault bool @@ -84,9 +82,7 @@ func init() { runCmd.Flags().BoolVar(&randomFee, "rf", false, "Set to enable random fees (between minf and mf)") runCmd.Flags().BoolVar(&noRandomFee, "nrf", false, "Set to disable random fees") runCmd.Flags().BoolVar(&randomDst, "rd", false, "Send money to randomly-generated addresses") - runCmd.Flags().StringVar(&delayBetween, "delay", "", "Delay (ms) between every transaction (0 means none)") runCmd.Flags().StringVar(&runTime, "run", "", "Duration of time (seconds) to run transfers before resting (0 means non-stop)") - runCmd.Flags().StringVar(&restTime, "rest", "", "Duration of time (seconds) to rest between transfer periods (0 means no rest)") runCmd.Flags().StringVar(&refreshTime, "refresh", "", "Duration of time (seconds) between refilling accounts with money (0 means no refresh)") runCmd.Flags().StringVar(&logicProg, "program", "", "File containing the compiled program to include as a logic sig") runCmd.Flags().BoolVar(&saveConfig, "save", false, "Save the effective configuration to disk") @@ -187,13 +183,6 @@ var runCmd = &cobra.Command{ } cfg.RandomizeDst = randomDst cfg.Quiet = quietish - if delayBetween != "" { - val, err := strconv.ParseUint(delayBetween, 10, 32) - if err != nil { - reportErrorf("Invalid value specified for --delay: %v\n", err) - } - cfg.DelayBetweenTxn = time.Duration(uint32(val)) * time.Millisecond - } if runTime != "" { val, err := strconv.ParseUint(runTime, 10, 32) if err != nil { @@ -201,13 +190,6 @@ var runCmd = &cobra.Command{ } cfg.RunTime = time.Duration(uint32(val)) * time.Second } - if restTime != "" { - val, err := strconv.ParseUint(restTime, 10, 32) - if err != nil { - reportErrorf("Invalid value specified for --rest: %v\n", err) - } - cfg.RestTime = time.Duration(uint32(val)) * time.Second - } if refreshTime != "" { val, err := strconv.ParseUint(refreshTime, 10, 32) if err != nil { diff --git a/shared/pingpong/accounts.go b/shared/pingpong/accounts.go index 7cf8de4054..fa8cfecdf0 100644 --- a/shared/pingpong/accounts.go +++ b/shared/pingpong/accounts.go @@ -19,7 +19,6 @@ package pingpong import ( "fmt" "io/ioutil" - "math" "math/rand" "os" "path/filepath" @@ -36,7 +35,6 @@ import ( "github.com/algorand/go-algorand/data/transactions/logic" "github.com/algorand/go-algorand/libgoal" "github.com/algorand/go-algorand/protocol" - "github.com/algorand/go-algorand/util" "github.com/algorand/go-algorand/util/db" ) @@ -132,17 +130,6 @@ func (pps *WorkerState) ensureAccounts(ac libgoal.Client, initCfg PpConfig) (acc return } -// throttle transaction rate -func throttleTransactionRate(startTime time.Time, cfg PpConfig, totalSent uint64) { - localTimeDelta := time.Since(startTime) - currentTps := float64(totalSent) / localTimeDelta.Seconds() - if currentTps > float64(cfg.TxnPerSec) { - sleepSec := float64(totalSent)/float64(cfg.TxnPerSec) - localTimeDelta.Seconds() - sleepTime := time.Duration(int64(math.Round(sleepSec*1000))) * time.Millisecond - util.NanoSleep(sleepTime) - } -} - // Prepare assets for asset transaction testing // Step 1) Create X assets for each of the participant accounts // Step 2) For each participant account, opt-in to assets of all other participant accounts @@ -153,13 +140,14 @@ func (pps *WorkerState) prepareAssets(accounts map[string]*pingPongAccount, clie return } - var startTime = time.Now() - var totalSent uint64 = 0 resultAssetMaps = make(map[uint64]v1.AssetParams) // optIns contains own and explicitly opted-in assets optIns = make(map[uint64][]string) numCreatedAssetsByAddr := make(map[string]int, len(accounts)) + + nextSendTime := time.Now() + // 1) Create X assets for each of the participant accounts for addr := range accounts { if addr == pps.cfg.SrcAccount { @@ -179,6 +167,7 @@ func (pps *WorkerState) prepareAssets(accounts map[string]*pingPongAccount, clie fmt.Printf("cfg.NumAsset %v, addrAccount.AssetParams %v\n", pps.cfg.NumAsset, addrAccount.AssetParams) totalSupply := pps.cfg.MinAccountAsset * uint64(pps.cfg.NumPartAccounts) * 9 * uint64(pps.cfg.GroupSize) * uint64(pps.cfg.RefreshTime.Seconds()) / pps.cfg.TxnPerSec + // create assets in participant account for i := 0; i < toCreate; i++ { var metaLen = 32 @@ -205,14 +194,12 @@ func (pps *WorkerState) prepareAssets(accounts map[string]*pingPongAccount, clie return } tx.Note = pps.makeNextUniqueNoteField() + schedule(pps.cfg.TxnPerSec, &nextSendTime) _, err = signAndBroadcastTransaction(accounts[addr], tx, client) if err != nil { _, _ = fmt.Fprintf(os.Stderr, "signing and broadcasting asset creation failed with error %v\n", err) return } - - totalSent++ - throttleTransactionRate(startTime, pps.cfg, totalSent) } } @@ -255,10 +242,6 @@ func (pps *WorkerState) prepareAssets(accounts map[string]*pingPongAccount, clie // optInsByAddr tracks only explicitly opted-in assetsA optInsByAddr := make(map[string]map[uint64]bool) - // reset rate-control - startTime = time.Now() - totalSent = 0 - // 2) For each participant account, opt-in up to proto.MaxAssetsPerAccount assets of all other participant accounts for addr := range accounts { if addr == pps.cfg.SrcAccount { @@ -308,17 +291,14 @@ func (pps *WorkerState) prepareAssets(accounts map[string]*pingPongAccount, clie } tx.Note = pps.makeNextUniqueNoteField() + schedule(pps.cfg.TxnPerSec, &nextSendTime) _, err = signAndBroadcastTransaction(accounts[addr], tx, client) if err != nil { _, _ = fmt.Fprintf(os.Stderr, "signing and broadcasting asset optin failed with error %v\n", err) return } - totalSent++ - optIns[k] = append(optIns[k], addr) optInsByAddr[addr][k] = true - - throttleTransactionRate(startTime, pps.cfg, totalSent) } } @@ -354,10 +334,6 @@ func (pps *WorkerState) prepareAssets(accounts map[string]*pingPongAccount, clie } } - // reset rate-control - startTime = time.Now() - totalSent = 0 - // Step 3) Evenly distribute the assets across all opted-in accounts for k, creator := range allAssets { if !pps.cfg.Quiet { @@ -403,14 +379,12 @@ func (pps *WorkerState) prepareAssets(accounts map[string]*pingPongAccount, clie } } + schedule(pps.cfg.TxnPerSec, &nextSendTime) _, err = signAndBroadcastTransaction(accounts[creator], tx, client) if err != nil { _, _ = fmt.Fprintf(os.Stderr, "signing and broadcasting asset distribution failed with error %v\n", err) return } - - totalSent++ - throttleTransactionRate(startTime, pps.cfg, totalSent) } // append the asset to the result assets resultAssetMaps[k] = assetParams[k] diff --git a/shared/pingpong/config.go b/shared/pingpong/config.go index 73ad4e4ec7..db6cbb4ed1 100644 --- a/shared/pingpong/config.go +++ b/shared/pingpong/config.go @@ -31,7 +31,6 @@ const ConfigFilename = "ppconfig.json" // PpConfig defines configuration structure for type PpConfig struct { SrcAccount string - DelayBetweenTxn time.Duration RandomizeFee bool RandomizeAmt bool RandomizeDst bool @@ -41,7 +40,6 @@ type PpConfig struct { TxnPerSec uint64 NumPartAccounts uint32 RunTime time.Duration - RestTime time.Duration RefreshTime time.Duration MinAccountFunds uint64 Quiet bool @@ -71,7 +69,6 @@ type PpConfig struct { // DefaultConfig object for Ping Pong var DefaultConfig = PpConfig{ SrcAccount: "", - DelayBetweenTxn: 100, RandomizeFee: false, RandomizeAmt: false, RandomizeDst: false, @@ -81,7 +78,6 @@ var DefaultConfig = PpConfig{ TxnPerSec: 200, NumPartAccounts: 10, RunTime: 10 * time.Second, - RestTime: 1 * time.Hour, // Long default rest to avoid accidental DoS RefreshTime: 10 * time.Second, MinAccountFunds: 100000, GroupSize: 1, diff --git a/shared/pingpong/pingpong.go b/shared/pingpong/pingpong.go index 71d787f792..5fcde03737 100644 --- a/shared/pingpong/pingpong.go +++ b/shared/pingpong/pingpong.go @@ -262,6 +262,16 @@ func computeAccountMinBalance(client libgoal.Client, cfg PpConfig) (fundingRequi return } +// Wait for `*nextSendTime` and update it afterwards. +func schedule(tps uint64, nextSendTime *time.Time) { + dur := time.Until(*nextSendTime) + if dur > 0 { + time.Sleep(dur) + } + + *nextSendTime = nextSendTime.Add(time.Second / time.Duration(tps)) +} + func (pps *WorkerState) fundAccounts(accounts map[string]*pingPongAccount, client libgoal.Client, cfg PpConfig) error { var srcFunds, minFund uint64 var err error @@ -272,7 +282,6 @@ func (pps *WorkerState) fundAccounts(accounts map[string]*pingPongAccount, clien return err } - startTime := time.Now() var totalSent uint64 // Fee of 0 will make cause the function to use the suggested one by network @@ -282,12 +291,12 @@ func (pps *WorkerState) fundAccounts(accounts map[string]*pingPongAccount, clien if err != nil { return err } - fmt.Printf("adjusting account balance to %d\n", minFund) + + nextSendTime := time.Now() for { accountsAdjusted := 0 for addr, acct := range accounts { - if addr == pps.cfg.SrcAccount { continue } @@ -307,6 +316,7 @@ func (pps *WorkerState) fundAccounts(accounts map[string]*pingPongAccount, clien fmt.Printf("adjusting balance of account %v by %d\n ", addr, toSend) } + schedule(cfg.TxnPerSec, &nextSendTime) tx, err = pps.sendPaymentFromSourceAccount(client, addr, fee, toSend) if err != nil { if strings.Contains(err.Error(), "broadcast queue full") { @@ -323,7 +333,6 @@ func (pps *WorkerState) fundAccounts(accounts map[string]*pingPongAccount, clien } totalSent++ - throttleTransactionRate(startTime, cfg, totalSent) } accounts[cfg.SrcAccount].setBalance(srcFunds) // wait until all the above transactions are sent, or that we have no more transactions @@ -462,7 +471,6 @@ func (pps *WorkerState) RunPingPong(ctx context.Context, ac libgoal.Client) { if cfg.MaxRuntime > 0 { endTime = time.Now().Add(cfg.MaxRuntime) } - restTime := cfg.RestTime refreshTime := time.Now().Add(cfg.RefreshTime) var nftThrottler *throttler @@ -473,6 +481,7 @@ func (pps *WorkerState) RunPingPong(ctx context.Context, ac libgoal.Client) { lastLog := time.Now() nextLog := lastLog.Add(logPeriod) + nextSendTime := time.Now() for { if ctx.Err() != nil { _, _ = fmt.Fprintf(os.Stderr, "error bad context in RunPingPong: %v\n", ctx.Err()) @@ -520,7 +529,7 @@ func (pps *WorkerState) RunPingPong(ctx context.Context, ac libgoal.Client) { } toList := listSufficientAccounts(pps.accounts, minimumAmount, cfg.SrcAccount) - sent, succeeded, err := pps.sendFromTo(fromList, toList, ac) + sent, succeeded, err := pps.sendFromTo(fromList, toList, ac, &nextSendTime) totalSent += sent totalSucceeded += succeeded if err != nil { @@ -535,16 +544,10 @@ func (pps *WorkerState) RunPingPong(ctx context.Context, ac libgoal.Client) { refreshTime = refreshTime.Add(cfg.RefreshTime) } - - throttleTransactionRate(startTime, cfg, totalSent) } timeDelta := time.Since(startTime) _, _ = fmt.Fprintf(os.Stdout, "Sent %d transactions (%d attempted) in %d seconds\n", totalSucceeded, totalSent, int(math.Round(timeDelta.Seconds()))) - if cfg.RestTime > 0 { - _, _ = fmt.Fprintf(os.Stdout, "Pausing %d seconds before sending more transactions\n", int(math.Round(cfg.RestTime.Seconds()))) - time.Sleep(restTime) - } } } @@ -672,7 +675,7 @@ func (pps *WorkerState) makeNftTraffic(client libgoal.Client) (sentCount uint64, func (pps *WorkerState) sendFromTo( fromList, toList []string, - client libgoal.Client, + client libgoal.Client, nextSendTime *time.Time, ) (sentCount, successCount uint64, err error) { accounts := pps.accounts cinfo := pps.cinfo @@ -693,8 +696,6 @@ func (pps *WorkerState) sendFromTo( *ap = p assetsByCreator[c] = append(assetsByCreator[c], ap) } - lastTransactionTime := time.Now() - timeCredit := time.Duration(0) for i := 0; i < len(fromList); i = (i + 1) % len(fromList) { from := fromList[i] @@ -770,6 +771,7 @@ func (pps *WorkerState) sendFromTo( return } + schedule(cfg.TxnPerSec, nextSendTime) sentCount++ _, sendErr = client.BroadcastTransaction(stxn) } else { @@ -856,6 +858,7 @@ func (pps *WorkerState) sendFromTo( } } + schedule(cfg.TxnPerSec, nextSendTime) sentCount++ sendErr = client.BroadcastTransactionGroup(stxGroup) } @@ -871,30 +874,6 @@ func (pps *WorkerState) sendFromTo( accounts[from].addBalance(fromBalanceChange) // avoid updating the "to" account. - // the logic here would sleep for the remaining of time to match the desired cfg.DelayBetweenTxn - if cfg.DelayBetweenTxn > 0 { - time.Sleep(cfg.DelayBetweenTxn) - } - if cfg.TxnPerSec > 0 { - timeCredit += time.Second / time.Duration(cfg.TxnPerSec) - - now := time.Now() - took := now.Sub(lastTransactionTime) - timeCredit -= took - if timeCredit > 0 { - time.Sleep(timeCredit) - timeCredit -= time.Since(now) - } else if timeCredit < -1000*time.Millisecond { - // cap the "time debt" to 1000 ms. - timeCredit = -1000 * time.Millisecond - } - lastTransactionTime = time.Now() - - // since we just slept enough here, we can take it off the counters - sentCount-- - successCount-- - // fmt.Printf("itration took %v\n", took) - } } return }