pingpong: improve transaction scheduling (#3478)
tolikzinovyev authored Jun 1, 2022
1 parent 9237316 commit 4e968a2
Showing 4 changed files with 25 additions and 94 deletions.
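This commit replaces pingpong's ad-hoc rate limiting (the throttleTransactionRate helper, the DelayBetweenTxn/timeCredit bookkeeping in sendFromTo, and the --delay/--rest flags with their DelayBetweenTxn/RestTime config fields) with a single schedule helper: each sender sleeps until a shared nextSendTime deadline, then advances that deadline by one interval of time.Second / TxnPerSec. Below is a minimal, self-contained sketch of that pacing technique; the schedule body mirrors the helper added in shared/pingpong/pingpong.go, while the main loop, the tps value, and the simulated slow broadcast are illustrative assumptions, not code from this commit.

package main

import (
    "fmt"
    "time"
)

// schedule waits until *nextSendTime, then advances it by one
// inter-send interval (time.Second / tps); mirrors the helper
// introduced in shared/pingpong/pingpong.go.
func schedule(tps uint64, nextSendTime *time.Time) {
    dur := time.Until(*nextSendTime)
    if dur > 0 {
        time.Sleep(dur)
    }
    *nextSendTime = nextSendTime.Add(time.Second / time.Duration(tps))
}

func main() {
    const tps = 10 // illustrative target rate
    nextSendTime := time.Now()
    start := time.Now()
    for i := 0; i < 20; i++ {
        schedule(tps, &nextSendTime)
        // Simulate an occasional slow broadcast: the fixed-step deadline
        // falls behind wall-clock time, so the next few schedule calls
        // sleep zero and the long-run rate stays near tps.
        if i%5 == 4 {
            time.Sleep(300 * time.Millisecond)
        }
    }
    fmt.Printf("20 paced sends took %v (2s nominal at %d TPS)\n", time.Since(start), tps)
}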
18 changes: 0 additions & 18 deletions cmd/pingpong/runCmd.go
@@ -43,9 +43,7 @@ var minFee uint64
var randomFee, noRandomFee bool
var randomAmount, noRandomAmount bool
var randomDst bool
var delayBetween string
var runTime string
var restTime string
var refreshTime string
var saveConfig bool
var useDefault bool
@@ -84,9 +82,7 @@ func init() {
runCmd.Flags().BoolVar(&randomFee, "rf", false, "Set to enable random fees (between minf and mf)")
runCmd.Flags().BoolVar(&noRandomFee, "nrf", false, "Set to disable random fees")
runCmd.Flags().BoolVar(&randomDst, "rd", false, "Send money to randomly-generated addresses")
runCmd.Flags().StringVar(&delayBetween, "delay", "", "Delay (ms) between every transaction (0 means none)")
runCmd.Flags().StringVar(&runTime, "run", "", "Duration of time (seconds) to run transfers before resting (0 means non-stop)")
runCmd.Flags().StringVar(&restTime, "rest", "", "Duration of time (seconds) to rest between transfer periods (0 means no rest)")
runCmd.Flags().StringVar(&refreshTime, "refresh", "", "Duration of time (seconds) between refilling accounts with money (0 means no refresh)")
runCmd.Flags().StringVar(&logicProg, "program", "", "File containing the compiled program to include as a logic sig")
runCmd.Flags().BoolVar(&saveConfig, "save", false, "Save the effective configuration to disk")
@@ -187,27 +183,13 @@ var runCmd = &cobra.Command{
}
cfg.RandomizeDst = randomDst
cfg.Quiet = quietish
if delayBetween != "" {
val, err := strconv.ParseUint(delayBetween, 10, 32)
if err != nil {
reportErrorf("Invalid value specified for --delay: %v\n", err)
}
cfg.DelayBetweenTxn = time.Duration(uint32(val)) * time.Millisecond
}
if runTime != "" {
val, err := strconv.ParseUint(runTime, 10, 32)
if err != nil {
reportErrorf("Invalid value specified for --run: %v\n", err)
}
cfg.RunTime = time.Duration(uint32(val)) * time.Second
}
if restTime != "" {
val, err := strconv.ParseUint(restTime, 10, 32)
if err != nil {
reportErrorf("Invalid value specified for --rest: %v\n", err)
}
cfg.RestTime = time.Duration(uint32(val)) * time.Second
}
if refreshTime != "" {
val, err := strconv.ParseUint(refreshTime, 10, 32)
if err != nil {
40 changes: 7 additions & 33 deletions shared/pingpong/accounts.go
@@ -19,7 +19,6 @@ package pingpong
import (
"fmt"
"io/ioutil"
"math"
"math/rand"
"os"
"path/filepath"
@@ -36,7 +35,6 @@ import (
"github.com/algorand/go-algorand/data/transactions/logic"
"github.com/algorand/go-algorand/libgoal"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/util"
"github.com/algorand/go-algorand/util/db"
)

@@ -132,17 +130,6 @@ func (pps *WorkerState) ensureAccounts(ac libgoal.Client, initCfg PpConfig) (acc
return
}

// throttle transaction rate
func throttleTransactionRate(startTime time.Time, cfg PpConfig, totalSent uint64) {
localTimeDelta := time.Since(startTime)
currentTps := float64(totalSent) / localTimeDelta.Seconds()
if currentTps > float64(cfg.TxnPerSec) {
sleepSec := float64(totalSent)/float64(cfg.TxnPerSec) - localTimeDelta.Seconds()
sleepTime := time.Duration(int64(math.Round(sleepSec*1000))) * time.Millisecond
util.NanoSleep(sleepTime)
}
}

// Prepare assets for asset transaction testing
// Step 1) Create X assets for each of the participant accounts
// Step 2) For each participant account, opt-in to assets of all other participant accounts
@@ -153,13 +140,14 @@ func (pps *WorkerState) prepareAssets(accounts map[string]*pingPongAccount, clie
return
}

var startTime = time.Now()
var totalSent uint64 = 0
resultAssetMaps = make(map[uint64]v1.AssetParams)

// optIns contains own and explicitly opted-in assets
optIns = make(map[uint64][]string)
numCreatedAssetsByAddr := make(map[string]int, len(accounts))

nextSendTime := time.Now()

// 1) Create X assets for each of the participant accounts
for addr := range accounts {
if addr == pps.cfg.SrcAccount {
@@ -179,6 +167,7 @@ func (pps *WorkerState) prepareAssets(accounts map[string]*pingPongAccount, clie
fmt.Printf("cfg.NumAsset %v, addrAccount.AssetParams %v\n", pps.cfg.NumAsset, addrAccount.AssetParams)

totalSupply := pps.cfg.MinAccountAsset * uint64(pps.cfg.NumPartAccounts) * 9 * uint64(pps.cfg.GroupSize) * uint64(pps.cfg.RefreshTime.Seconds()) / pps.cfg.TxnPerSec

// create assets in participant account
for i := 0; i < toCreate; i++ {
var metaLen = 32
@@ -205,14 +194,12 @@ func (pps *WorkerState) prepareAssets(accounts map[string]*pingPongAccount, clie
return
}
tx.Note = pps.makeNextUniqueNoteField()
schedule(pps.cfg.TxnPerSec, &nextSendTime)
_, err = signAndBroadcastTransaction(accounts[addr], tx, client)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "signing and broadcasting asset creation failed with error %v\n", err)
return
}

totalSent++
throttleTransactionRate(startTime, pps.cfg, totalSent)
}
}

@@ -255,10 +242,6 @@ func (pps *WorkerState) prepareAssets(accounts map[string]*pingPongAccount, clie
// optInsByAddr tracks only explicitly opted-in assets
optInsByAddr := make(map[string]map[uint64]bool)

// reset rate-control
startTime = time.Now()
totalSent = 0

// 2) For each participant account, opt-in up to proto.MaxAssetsPerAccount assets of all other participant accounts
for addr := range accounts {
if addr == pps.cfg.SrcAccount {
@@ -308,17 +291,14 @@ func (pps *WorkerState) prepareAssets(accounts map[string]*pingPongAccount, clie
}
tx.Note = pps.makeNextUniqueNoteField()

schedule(pps.cfg.TxnPerSec, &nextSendTime)
_, err = signAndBroadcastTransaction(accounts[addr], tx, client)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "signing and broadcasting asset optin failed with error %v\n", err)
return
}
totalSent++

optIns[k] = append(optIns[k], addr)
optInsByAddr[addr][k] = true

throttleTransactionRate(startTime, pps.cfg, totalSent)
}
}

@@ -354,10 +334,6 @@ func (pps *WorkerState) prepareAssets(accounts map[string]*pingPongAccount, clie
}
}

// reset rate-control
startTime = time.Now()
totalSent = 0

// Step 3) Evenly distribute the assets across all opted-in accounts
for k, creator := range allAssets {
if !pps.cfg.Quiet {
@@ -403,14 +379,12 @@ func (pps *WorkerState) prepareAssets(accounts map[string]*pingPongAccount, clie
}
}

schedule(pps.cfg.TxnPerSec, &nextSendTime)
_, err = signAndBroadcastTransaction(accounts[creator], tx, client)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "signing and broadcasting asset distribution failed with error %v\n", err)
return
}

totalSent++
throttleTransactionRate(startTime, pps.cfg, totalSent)
}
// append the asset to the result assets
resultAssetMaps[k] = assetParams[k]
4 changes: 0 additions & 4 deletions shared/pingpong/config.go
@@ -31,7 +31,6 @@ const ConfigFilename = "ppconfig.json"
// PpConfig defines configuration structure for
type PpConfig struct {
SrcAccount string
DelayBetweenTxn time.Duration
RandomizeFee bool
RandomizeAmt bool
RandomizeDst bool
@@ -41,7 +40,6 @@ type PpConfig struct {
TxnPerSec uint64
NumPartAccounts uint32
RunTime time.Duration
RestTime time.Duration
RefreshTime time.Duration
MinAccountFunds uint64
Quiet bool
@@ -71,7 +69,6 @@ type PpConfig struct {
// DefaultConfig object for Ping Pong
var DefaultConfig = PpConfig{
SrcAccount: "",
DelayBetweenTxn: 100,
RandomizeFee: false,
RandomizeAmt: false,
RandomizeDst: false,
@@ -81,7 +78,6 @@ var DefaultConfig = PpConfig{
TxnPerSec: 200,
NumPartAccounts: 10,
RunTime: 10 * time.Second,
RestTime: 1 * time.Hour, // Long default rest to avoid accidental DoS
RefreshTime: 10 * time.Second,
MinAccountFunds: 100000,
GroupSize: 1,
57 changes: 18 additions & 39 deletions shared/pingpong/pingpong.go
@@ -262,6 +262,16 @@ func computeAccountMinBalance(client libgoal.Client, cfg PpConfig) (fundingRequi
return
}

// Wait for `*nextSendTime` and update it afterwards.
func schedule(tps uint64, nextSendTime *time.Time) {
dur := time.Until(*nextSendTime)
if dur > 0 {
time.Sleep(dur)
}

*nextSendTime = nextSendTime.Add(time.Second / time.Duration(tps))
}

func (pps *WorkerState) fundAccounts(accounts map[string]*pingPongAccount, client libgoal.Client, cfg PpConfig) error {
var srcFunds, minFund uint64
var err error
@@ -272,7 +282,6 @@ func (pps *WorkerState) fundAccounts(accounts map[string]*pingPongAccount, clien
return err
}

startTime := time.Now()
var totalSent uint64

// Fee of 0 will cause the function to use the fee suggested by the network
@@ -282,12 +291,12 @@ func (pps *WorkerState) fundAccounts(accounts map[string]*pingPongAccount, clien
if err != nil {
return err
}

fmt.Printf("adjusting account balance to %d\n", minFund)

nextSendTime := time.Now()
for {
accountsAdjusted := 0
for addr, acct := range accounts {

if addr == pps.cfg.SrcAccount {
continue
}
@@ -307,6 +316,7 @@ func (pps *WorkerState) fundAccounts(accounts map[string]*pingPongAccount, clien
fmt.Printf("adjusting balance of account %v by %d\n ", addr, toSend)
}

schedule(cfg.TxnPerSec, &nextSendTime)
tx, err = pps.sendPaymentFromSourceAccount(client, addr, fee, toSend)
if err != nil {
if strings.Contains(err.Error(), "broadcast queue full") {
@@ -323,7 +333,6 @@ func (pps *WorkerState) fundAccounts(accounts map[string]*pingPongAccount, clien
}

totalSent++
throttleTransactionRate(startTime, cfg, totalSent)
}
accounts[cfg.SrcAccount].setBalance(srcFunds)
// wait until all the above transactions are sent, or that we have no more transactions
@@ -462,7 +471,6 @@ func (pps *WorkerState) RunPingPong(ctx context.Context, ac libgoal.Client) {
if cfg.MaxRuntime > 0 {
endTime = time.Now().Add(cfg.MaxRuntime)
}
restTime := cfg.RestTime
refreshTime := time.Now().Add(cfg.RefreshTime)

var nftThrottler *throttler
@@ -473,6 +481,7 @@ func (pps *WorkerState) RunPingPong(ctx context.Context, ac libgoal.Client) {
lastLog := time.Now()
nextLog := lastLog.Add(logPeriod)

nextSendTime := time.Now()
for {
if ctx.Err() != nil {
_, _ = fmt.Fprintf(os.Stderr, "error bad context in RunPingPong: %v\n", ctx.Err())
@@ -520,7 +529,7 @@ func (pps *WorkerState) RunPingPong(ctx context.Context, ac libgoal.Client) {
}
toList := listSufficientAccounts(pps.accounts, minimumAmount, cfg.SrcAccount)

sent, succeeded, err := pps.sendFromTo(fromList, toList, ac)
sent, succeeded, err := pps.sendFromTo(fromList, toList, ac, &nextSendTime)
totalSent += sent
totalSucceeded += succeeded
if err != nil {
@@ -535,16 +544,10 @@ func (pps *WorkerState) RunPingPong(ctx context.Context, ac libgoal.Client) {

refreshTime = refreshTime.Add(cfg.RefreshTime)
}

throttleTransactionRate(startTime, cfg, totalSent)
}

timeDelta := time.Since(startTime)
_, _ = fmt.Fprintf(os.Stdout, "Sent %d transactions (%d attempted) in %d seconds\n", totalSucceeded, totalSent, int(math.Round(timeDelta.Seconds())))
if cfg.RestTime > 0 {
_, _ = fmt.Fprintf(os.Stdout, "Pausing %d seconds before sending more transactions\n", int(math.Round(cfg.RestTime.Seconds())))
time.Sleep(restTime)
}
}
}

@@ -672,7 +675,7 @@ func (pps *WorkerState) makeNftTraffic(client libgoal.Client) (sentCount uint64,

func (pps *WorkerState) sendFromTo(
fromList, toList []string,
client libgoal.Client,
client libgoal.Client, nextSendTime *time.Time,
) (sentCount, successCount uint64, err error) {
accounts := pps.accounts
cinfo := pps.cinfo
@@ -693,8 +696,6 @@ func (pps *WorkerState) sendFromTo(
*ap = p
assetsByCreator[c] = append(assetsByCreator[c], ap)
}
lastTransactionTime := time.Now()
timeCredit := time.Duration(0)
for i := 0; i < len(fromList); i = (i + 1) % len(fromList) {
from := fromList[i]

@@ -770,6 +771,7 @@ func (pps *WorkerState) sendFromTo(
return
}

schedule(cfg.TxnPerSec, nextSendTime)
sentCount++
_, sendErr = client.BroadcastTransaction(stxn)
} else {
@@ -856,6 +858,7 @@ func (pps *WorkerState) sendFromTo(
}
}

schedule(cfg.TxnPerSec, nextSendTime)
sentCount++
sendErr = client.BroadcastTransactionGroup(stxGroup)
}
@@ -871,30 +874,6 @@ func (pps *WorkerState) sendFromTo(
accounts[from].addBalance(fromBalanceChange)
// avoid updating the "to" account.

// the logic here would sleep for the remaining of time to match the desired cfg.DelayBetweenTxn
if cfg.DelayBetweenTxn > 0 {
time.Sleep(cfg.DelayBetweenTxn)
}
if cfg.TxnPerSec > 0 {
timeCredit += time.Second / time.Duration(cfg.TxnPerSec)

now := time.Now()
took := now.Sub(lastTransactionTime)
timeCredit -= took
if timeCredit > 0 {
time.Sleep(timeCredit)
timeCredit -= time.Since(now)
} else if timeCredit < -1000*time.Millisecond {
// cap the "time debt" to 1000 ms.
timeCredit = -1000 * time.Millisecond
}
lastTransactionTime = time.Now()

// since we just slept enough here, we can take it off the counters
sentCount--
successCount--
// fmt.Printf("itration took %v\n", took)
}
}
return
}
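Design note (a reading of the diff, not part of the commit message): each sending operation — prepareAssets, fundAccounts, and the main RunPingPong loop — creates one nextSendTime and threads it by pointer into every send site (sendFromTo now takes a *time.Time parameter), so all transactions issued by that operation share a single pacing clock driven by the TxnPerSec config field. Because schedule advances the deadline by a fixed step instead of recomputing sleep from elapsed time and a running send count, a slow broadcast is simply followed by zero-sleep sends until the clock catches up; that is what allows dropping throttleTransactionRate, the timeCredit/time-debt capping and the sentCount--/successCount-- adjustments in sendFromTo, and the --delay and --rest options.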