diff --git a/cmd/pingpong/runCmd.go b/cmd/pingpong/runCmd.go index 452231f4df..9df0052ac1 100644 --- a/cmd/pingpong/runCmd.go +++ b/cmd/pingpong/runCmd.go @@ -19,11 +19,14 @@ package main import ( "context" "encoding/base64" + "encoding/json" "fmt" + "math/rand" "os" "path/filepath" "runtime/pprof" "strconv" + "strings" "time" "github.com/spf13/cobra" @@ -67,16 +70,21 @@ var rekey bool var nftAsaPerSecond uint32 var pidFile string var cpuprofile string +var randSeed int64 +var deterministicKeys bool +var generatedAccountsCount uint32 +var generatedAccountSampleMethod string +var configPath string func init() { rootCmd.AddCommand(runCmd) runCmd.PersistentFlags().StringVarP(&dataDir, "datadir", "d", "", "Data directory for the node") - runCmd.Flags().StringVarP(&srcAddress, "src", "s", "", "Account address to use as funding source for new accounts)") + runCmd.Flags().StringVarP(&srcAddress, "src", "s", "", "Account address to use as funding source for new accounts") runCmd.Flags().Uint32VarP(&numAccounts, "numaccounts", "n", 0, "The number of accounts to include in the transfers") runCmd.Flags().Uint64VarP(&maxAmount, "ma", "a", 0, "The (max) amount to be transferred") runCmd.Flags().Uint64VarP(&minAccountFunds, "minaccount", "", 0, "The minimum amount to fund a test account with") - runCmd.Flags().Uint64VarP(&txnPerSec, "tps", "t", 200, "Number of Txn per second that pingpong sends") + runCmd.Flags().Uint64VarP(&txnPerSec, "tps", "t", 0, "Number of Txn per second that pingpong sends") runCmd.Flags().Int64VarP(&maxFee, "mf", "f", -1, "The MAX fee to be used for transactions, a value of '0' tells the server to use a suggested fee.") runCmd.Flags().Uint64VarP(&minFee, "minf", "m", 1000, "The MIN fee to be used for randomFee transactions") runCmd.Flags().BoolVar(&randomAmount, "ra", false, "Set to enable random amounts (up to maxamount)") @@ -87,6 +95,7 @@ func init() { runCmd.Flags().StringVar(&runTime, "run", "", "Duration of time (seconds) to run transfers before resting (0 means non-stop)") runCmd.Flags().StringVar(&refreshTime, "refresh", "", "Duration of time (seconds) between refilling accounts with money (0 means no refresh)") runCmd.Flags().StringVar(&logicProg, "program", "", "File containing the compiled program to include as a logic sig") + runCmd.Flags().StringVar(&configPath, "config", "", "path to read config json from, or json literal") runCmd.Flags().BoolVar(&saveConfig, "save", false, "Save the effective configuration to disk") runCmd.Flags().BoolVar(&useDefault, "reset", false, "Reset to the default configuration (not read from disk)") runCmd.Flags().BoolVar(&quietish, "quiet", false, "quietish stdout logging") @@ -107,6 +116,10 @@ func init() { runCmd.Flags().Uint32Var(&nftAsaPerSecond, "nftasapersecond", 0, "The number of NFT-style ASAs to create per second") runCmd.Flags().StringVar(&pidFile, "pidfile", "", "path to write process id of this pingpong") runCmd.Flags().StringVar(&cpuprofile, "cpuprofile", "", "write cpu profile to `file`") + runCmd.Flags().Int64Var(&randSeed, "seed", 0, "input to math/rand.Seed(), defaults to time.Now().UnixNano()") + runCmd.Flags().BoolVar(&deterministicKeys, "deterministicKeys", false, "Draw from set of netgoal-created accounts using deterministic keys") + runCmd.Flags().Uint32Var(&generatedAccountsCount, "genaccounts", 0, "The total number of accounts pre-generated by netgoal") + runCmd.Flags().StringVar(&generatedAccountSampleMethod, "gensamplemethod", "random", "The method of sampling from the total # of pre-generated accounts") } var runCmd = &cobra.Command{ @@ -155,17 +168,45 @@ var runCmd = &cobra.Command{ } // Prepare configuration + dataDirCfgPath := filepath.Join(ac.DataDir(), pingpong.ConfigFilename) var cfg pingpong.PpConfig - cfgPath := filepath.Join(ac.DataDir(), pingpong.ConfigFilename) - if useDefault { - cfg = pingpong.DefaultConfig + if configPath != "" { + if configPath[0] == '{' { + // json literal as arg + cfg = pingpong.DefaultConfig + lf := strings.NewReader(configPath) + dec := json.NewDecoder(lf) + err = dec.Decode(&cfg) + if err != nil { + reportErrorf("-config: bad config json, %v", err) + } + fmt.Fprintf(os.Stdout, "config from --config:\n") + cfg.Dump(os.Stdout) + } else { + cfg, err = pingpong.LoadConfigFromFile(configPath) + if err != nil { + reportErrorf("%s: bad config json, %v", configPath, err) + } + fmt.Fprintf(os.Stdout, "config from %#v:\n", configPath) + cfg.Dump(os.Stdout) + } } else { - cfg, err = pingpong.LoadConfigFromFile(cfgPath) - if err != nil && !os.IsNotExist(err) { - reportErrorf("Error loading configuration from '%s': %v\n", cfgPath, err) + if useDefault { + cfg = pingpong.DefaultConfig + } else { + cfg, err = pingpong.LoadConfigFromFile(dataDirCfgPath) + if err != nil && !os.IsNotExist(err) { + reportErrorf("Error loading configuration from '%s': %v\n", dataDirCfgPath, err) + } } } + if randSeed == 0 { + rand.Seed(time.Now().UnixNano()) + } else { + rand.Seed(randSeed) + } + if srcAddress != "" { cfg.SrcAccount = srcAddress } @@ -185,10 +226,12 @@ var runCmd = &cobra.Command{ cfg.MinAccountFunds = minAccountFunds } - if txnPerSec == 0 { + if txnPerSec != 0 { + cfg.TxnPerSec = txnPerSec + } + if cfg.TxnPerSec == 0 { reportErrorf("cannot set tps to 0") } - cfg.TxnPerSec = txnPerSec if randomFee { if cfg.MinFee > cfg.MaxFee { @@ -205,15 +248,15 @@ var runCmd = &cobra.Command{ if randomAmount { cfg.RandomizeAmt = true } - cfg.RandomLease = randomLease + cfg.RandomLease = randomLease || cfg.RandomLease if noRandomAmount { if randomAmount { reportErrorf("Error --ra and --nra can't both be specified\n") } cfg.RandomizeAmt = false } - cfg.RandomizeDst = randomDst - cfg.Quiet = quietish + cfg.RandomizeDst = randomDst || cfg.RandomizeDst + cfg.Quiet = quietish || cfg.Quiet if runTime != "" { val, err := strconv.ParseUint(runTime, 10, 32) if err != nil { @@ -274,17 +317,27 @@ var runCmd = &cobra.Command{ reportErrorf("Invalid group size: %v\n", groupSize) } - if numAsset <= 1000 { + if numAsset == 0 { + // nop + } else if numAsset <= 1000 { cfg.NumAsset = numAsset } else { reportErrorf("Invalid number of assets: %d, (valid number: 0 - 1000)\n", numAsset) } - cfg.AppProgOps = appProgOps - cfg.AppProgHashes = appProgHashes - cfg.AppProgHashSize = appProgHashSize + if appProgOps != 0 { + cfg.AppProgOps = appProgOps + } + if appProgHashes != 0 { + cfg.AppProgHashes = appProgHashes + } + if appProgHashSize != "sha256" { + cfg.AppProgHashSize = appProgHashSize + } - if numApp <= 1000 { + if numApp == 0 { + // nop + } else if numApp <= 1000 { cfg.NumApp = numApp } else { reportErrorf("Invalid number of apps: %d, (valid number: 0 - 1000)\n", numApp) @@ -294,7 +347,9 @@ var runCmd = &cobra.Command{ reportErrorf("Cannot opt in %d times of %d total apps\n", numAppOptIn, numApp) } - cfg.NumAppOptIn = numAppOptIn + if numAppOptIn != 0 { + cfg.NumAppOptIn = numAppOptIn + } if appProgGlobKeys > 0 { cfg.AppGlobKeys = appProgGlobKeys @@ -303,10 +358,6 @@ var runCmd = &cobra.Command{ cfg.AppLocalKeys = appProgLocalKeys } - if numAsset != 0 && numApp != 0 { - reportErrorf("only one of numapp and numasset may be specified\n") - } - if rekey { cfg.Rekey = rekey if !cfg.RandomLease && !cfg.RandomNote && !cfg.RandomizeFee && !cfg.RandomizeAmt { @@ -317,7 +368,32 @@ var runCmd = &cobra.Command{ } } - cfg.NftAsaPerSecond = nftAsaPerSecond + if nftAsaPerSecond != 0 { + cfg.NftAsaPerSecond = nftAsaPerSecond + } + + if deterministicKeys && generatedAccountsCount == 0 { + reportErrorf("deterministicKeys requires setting generatedAccountsCount") + } + if !deterministicKeys && generatedAccountsCount > 0 { + reportErrorf("generatedAccountsCount requires deterministicKeys=true") + } + if deterministicKeys && numAccounts > generatedAccountsCount { + reportErrorf("numAccounts must be <= generatedAccountsCount") + } + cfg.DeterministicKeys = deterministicKeys || cfg.DeterministicKeys + if generatedAccountsCount != 0 { + cfg.GeneratedAccountsCount = generatedAccountsCount + } + if generatedAccountSampleMethod != "" { + cfg.GeneratedAccountSampleMethod = generatedAccountSampleMethod + } + + cfg.SetDefaultWeights() + err = cfg.Check() + if err != nil { + reportErrorf("%v", err) + } reportInfof("Preparing to initialize PingPong with config:\n") cfg.Dump(os.Stdout) @@ -325,20 +401,23 @@ var runCmd = &cobra.Command{ pps := pingpong.NewPingpong(cfg) // Initialize accounts if necessary - err = pps.PrepareAccounts(ac) + err = pps.PrepareAccounts(&ac) if err != nil { reportErrorf("Error preparing accounts for transfers: %v\n", err) } if saveConfig { - cfg.Save(cfgPath) + err = cfg.Save(dataDirCfgPath) + if err != nil { + reportErrorf("%s: could not save config, %v\n", dataDirCfgPath, err) + } } reportInfof("Preparing to run PingPong with config:\n") cfg.Dump(os.Stdout) // Kick off the real processing - pps.RunPingPong(context.Background(), ac) + pps.RunPingPong(context.Background(), &ac) }, } diff --git a/libgoal/libgoal.go b/libgoal/libgoal.go index e0910e1758..3ec8cd45c6 100644 --- a/libgoal/libgoal.go +++ b/libgoal/libgoal.go @@ -62,6 +62,10 @@ type Client struct { consensus config.ConsensusProtocols algodVersionAffinity algodclient.APIVersion kmdVersionAffinity kmdclient.APIVersion + + suggestedParamsCache v1.TransactionParams + suggestedParamsExpire time.Time + suggestedParamsMaxAge time.Duration } // ClientConfig is data to configure a Client @@ -513,7 +517,7 @@ func (c *Client) signAndBroadcastTransactionWithWallet(walletHandle, pw []byte, // M | M | error // func (c *Client) ComputeValidityRounds(firstValid, lastValid, validRounds uint64) (first, last, latest uint64, err error) { - params, err := c.SuggestedParams() + params, err := c.cachedSuggestedParams() if err != nil { return 0, 0, 0, err } @@ -576,7 +580,7 @@ func (c *Client) ConstructPayment(from, to string, fee, amount uint64, note []by } // Get current round, protocol, genesis ID - params, err := c.SuggestedParams() + params, err := c.cachedSuggestedParams() if err != nil { return transactions.Transaction{}, err } @@ -920,6 +924,23 @@ func (c *Client) SuggestedParams() (params v1.TransactionParams, err error) { return } +// SetSuggestedParamsCacheAge sets the maximum age for an internal cached version of SuggestedParams() used internally to many libgoal Client functions. +func (c *Client) SetSuggestedParamsCacheAge(maxAge time.Duration) { + c.suggestedParamsMaxAge = maxAge +} + +func (c *Client) cachedSuggestedParams() (params v1.TransactionParams, err error) { + if c.suggestedParamsMaxAge == 0 || time.Now().After(c.suggestedParamsExpire) { + params, err = c.SuggestedParams() + if err == nil && c.suggestedParamsMaxAge != 0 { + c.suggestedParamsCache = params + c.suggestedParamsExpire = time.Now().Add(c.suggestedParamsMaxAge) + } + return + } + return c.suggestedParamsCache, nil +} + // GetPendingTransactions gets a snapshot of current pending transactions on the node. // If maxTxns = 0, fetches as many transactions as possible. func (c *Client) GetPendingTransactions(maxTxns uint64) (resp v1.PendingTransactions, err error) { diff --git a/libgoal/transactions.go b/libgoal/transactions.go index c28fd0216a..fb788f0249 100644 --- a/libgoal/transactions.go +++ b/libgoal/transactions.go @@ -24,7 +24,7 @@ import ( "github.com/algorand/go-algorand/crypto" "github.com/algorand/go-algorand/crypto/merklesignature" "github.com/algorand/go-algorand/daemon/algod/api/server/v2/generated" - "github.com/algorand/go-algorand/daemon/algod/api/spec/v1" + v1 "github.com/algorand/go-algorand/daemon/algod/api/spec/v1" "github.com/algorand/go-algorand/data/account" "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/data/transactions" @@ -253,7 +253,7 @@ func generateRegistrationTransaction(part generated.ParticipationKey, fee basics func (c *Client) MakeRegistrationTransactionWithGenesisID(part account.Participation, fee, txnFirstValid, txnLastValid uint64, leaseBytes [32]byte, includeStateProofKeys bool) (transactions.Transaction, error) { // Get current round, protocol, genesis ID - params, err := c.SuggestedParams() + params, err := c.cachedSuggestedParams() if err != nil { return transactions.Transaction{}, err } @@ -293,7 +293,7 @@ func (c *Client) MakeUnsignedGoOnlineTx(address string, firstValid, lastValid, f } // Get current round, protocol, genesis ID - params, err := c.SuggestedParams() + params, err := c.cachedSuggestedParams() if err != nil { return transactions.Transaction{}, err } @@ -350,7 +350,7 @@ func (c *Client) MakeUnsignedGoOfflineTx(address string, firstValid, lastValid, return transactions.Transaction{}, err } - params, err := c.SuggestedParams() + params, err := c.cachedSuggestedParams() if err != nil { return transactions.Transaction{}, err } @@ -405,7 +405,7 @@ func (c *Client) MakeUnsignedBecomeNonparticipatingTx(address string, firstValid return transactions.Transaction{}, err } - params, err := c.SuggestedParams() + params, err := c.cachedSuggestedParams() if err != nil { return transactions.Transaction{}, err } @@ -460,7 +460,7 @@ func (c *Client) FillUnsignedTxTemplate(sender string, firstValid, lastValid, fe return transactions.Transaction{}, err } - params, err := c.SuggestedParams() + params, err := c.cachedSuggestedParams() if err != nil { return transactions.Transaction{}, err } @@ -637,7 +637,7 @@ func (c *Client) MakeUnsignedAssetCreateTx(total uint64, defaultFrozen bool, man } // Get consensus params so we can get max field lengths - params, err := c.SuggestedParams() + params, err := c.cachedSuggestedParams() if err != nil { return transactions.Transaction{}, err } diff --git a/shared/pingpong/accounts.go b/shared/pingpong/accounts.go index b63214fdd2..63cd17ef9b 100644 --- a/shared/pingpong/accounts.go +++ b/shared/pingpong/accounts.go @@ -17,7 +17,9 @@ package pingpong import ( + "encoding/binary" "fmt" + "log" "math/rand" "os" "path/filepath" @@ -33,14 +35,61 @@ import ( "github.com/algorand/go-algorand/data/transactions" "github.com/algorand/go-algorand/data/transactions/logic" "github.com/algorand/go-algorand/libgoal" - "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/util/db" ) -func (pps *WorkerState) ensureAccounts(ac libgoal.Client, initCfg PpConfig) (accounts map[string]*pingPongAccount, cfg PpConfig, err error) { - accounts = make(map[string]*pingPongAccount) - cfg = initCfg +func deterministicAccounts(initCfg PpConfig) <-chan *crypto.SignatureSecrets { + out := make(chan *crypto.SignatureSecrets) + if initCfg.GeneratedAccountSampleMethod == "" || initCfg.GeneratedAccountSampleMethod == "random" { + go randomDeterministicAccounts(initCfg, out) + } else if initCfg.GeneratedAccountSampleMethod == "sequential" { + go sequentialDeterministicAccounts(initCfg, out) + } + return out +} +func randomDeterministicAccounts(initCfg PpConfig, out chan *crypto.SignatureSecrets) { + numAccounts := initCfg.NumPartAccounts + totalAccounts := initCfg.GeneratedAccountsCount + if totalAccounts < numAccounts*4 { + // simpler rand strategy for smaller totalAccounts + order := rand.Perm(int(totalAccounts))[:numAccounts] + for _, acct := range order { + var seed crypto.Seed + binary.LittleEndian.PutUint64(seed[:], uint64(acct)) + out <- crypto.GenerateSignatureSecrets(seed) + } + } else { + // randomly select numAccounts from generatedAccountsCount + // better for generatedAccountsCount much bigger than numAccounts + selected := make(map[uint32]bool, numAccounts) + for uint32(len(selected)) < numAccounts { + acct := uint32(rand.Int31n(int32(totalAccounts))) + if selected[acct] { + continue // already picked this account + } + // generate deterministic secret key from integer ID + // same uint64 seed used as netdeploy/remote/deployedNetwork.go + var seed crypto.Seed + binary.LittleEndian.PutUint64(seed[:], uint64(acct)) + out <- crypto.GenerateSignatureSecrets(seed) + selected[acct] = true + } + } + close(out) +} + +func sequentialDeterministicAccounts(initCfg PpConfig, out chan *crypto.SignatureSecrets) { + for i := uint32(0); i < initCfg.NumPartAccounts; i++ { + acct := uint64(i) + uint64(initCfg.GeneratedAccountsOffset) + var seed crypto.Seed + binary.LittleEndian.PutUint64(seed[:], uint64(acct)) + out <- crypto.GenerateSignatureSecrets(seed) + } +} + +// load accounts from ${ALGORAND_DATA}/${netname}-${version}/*.rootkey +func fileAccounts(ac *libgoal.Client) (out <-chan *crypto.SignatureSecrets, err error) { genID, err2 := ac.GenesisID() if err2 != nil { err = err2 @@ -53,10 +102,12 @@ func (pps *WorkerState) ensureAccounts(ac libgoal.Client, initCfg PpConfig) (acc return } - var srcAcctPresent bool - var richestAccount string - var richestBalance uint64 + ch := make(chan *crypto.SignatureSecrets) + go enumerateFileAccounts(files, genesisDir, ch) + return ch, nil +} +func enumerateFileAccounts(files []os.DirEntry, genesisDir string, out chan<- *crypto.SignatureSecrets) { for _, info := range files { var handle db.Accessor @@ -66,7 +117,7 @@ func (pps *WorkerState) ensureAccounts(ac libgoal.Client, initCfg PpConfig) (acc } // Fetch a handle to this database - handle, err = db.MakeErasableAccessor(filepath.Join(genesisDir, info.Name())) + handle, err := db.MakeErasableAccessor(filepath.Join(genesisDir, info.Name())) if err != nil { // Couldn't open it, skip it continue @@ -80,339 +131,301 @@ func (pps *WorkerState) ensureAccounts(ac libgoal.Client, initCfg PpConfig) (acc continue } - publicKey := root.Secrets().SignatureVerifier - accountAddress := basics.Address(publicKey) + out <- root.Secrets() + } + close(out) +} - if accountAddress.String() == cfg.SrcAccount { - srcAcctPresent = true - } +func (pps *WorkerState) ensureAccounts(ac *libgoal.Client) (err error) { + if pps.accounts == nil { + pps.accounts = make(map[string]*pingPongAccount) + } - amt, err := ac.GetBalance(accountAddress.String()) - if err != nil { - return nil, PpConfig{}, err - } + if pps.cinfo.OptIns == nil { + pps.cinfo.OptIns = make(map[uint64][]string, pps.cfg.NumAsset+pps.cfg.NumApp) + } + if pps.cinfo.AssetParams == nil { + pps.cinfo.AssetParams = make(map[uint64]v1.AssetParams, pps.cfg.NumAsset) + } + if pps.cinfo.AppParams == nil { + pps.cinfo.AppParams = make(map[uint64]v1.AppParams, pps.cfg.NumApp) + } - if !srcAcctPresent && amt > richestBalance { - richestAccount = accountAddress.String() - richestBalance = amt - } + sources := make([]<-chan *crypto.SignatureSecrets, 0, 2) + // read file accounts for local big source money + var fileSource <-chan *crypto.SignatureSecrets + fileSource, err = fileAccounts(ac) + if err != nil { + return + } + sources = append(sources, fileSource) + if pps.cfg.DeterministicKeys { + // add deterministic key accounts for re-use across runs + detSource := deterministicAccounts(pps.cfg) + sources = append(sources, detSource) + } - if !initCfg.Quiet { - fmt.Printf("Found local account: %s -> %v\n", accountAddress.String(), amt) - } + var srcAcctPresent bool + var richestAccount string + var richestBalance uint64 + + for _, source := range sources { + for secret := range source { + publicKey := secret.SignatureVerifier + accountAddress := basics.Address(publicKey) + addr := accountAddress.String() + + if addr == pps.cfg.SrcAccount { + srcAcctPresent = true + } + + // TODO: switch to v2 API + //ai, err := ac.AccountInformationV2(addr, false) + ai, err := ac.AccountInformation(addr) + if err != nil { + return err + } + amt := ai.Amount + + if !srcAcctPresent && amt > richestBalance { + richestAccount = addr + richestBalance = amt + } - accounts[accountAddress.String()] = &pingPongAccount{ - balance: amt, - sk: root.Secrets(), - pk: accountAddress, + ppa := &pingPongAccount{ + balance: amt, + sk: secret, + pk: accountAddress, + } + + pps.integrateAccountInfo(addr, ppa, ai) + + if !pps.cfg.Quiet { + fmt.Printf("Found local account: %s\n", ppa.String()) + } + + pps.accounts[addr] = ppa } } if !srcAcctPresent { - if cfg.SrcAccount != "" { - err = fmt.Errorf("specified Source Account '%s' not found", cfg.SrcAccount) + if pps.cfg.SrcAccount != "" { + err = fmt.Errorf("specified Source Account '%s' not found", pps.cfg.SrcAccount) return } - if richestBalance >= cfg.MinAccountFunds { - cfg.SrcAccount = richestAccount + if richestBalance >= pps.cfg.MinAccountFunds { + pps.cfg.SrcAccount = richestAccount fmt.Printf("Identified richest account to use for Source Account: %s -> %v\n", richestAccount, richestBalance) } else { - err = fmt.Errorf("no accounts found with sufficient stake (> %d)", cfg.MinAccountFunds) + err = fmt.Errorf("no accounts found with sufficient stake (> %d)", pps.cfg.MinAccountFunds) return } } else { - fmt.Printf("Located Source Account: %s -> %v\n", cfg.SrcAccount, accounts[cfg.SrcAccount]) + fmt.Printf("Located Source Account: %s -> %v\n", pps.cfg.SrcAccount, pps.accounts[pps.cfg.SrcAccount]) } return } -// Prepare assets for asset transaction testing -// Step 1) Create X assets for each of the participant accounts -// Step 2) For each participant account, opt-in to assets of all other participant accounts -// Step 3) Evenly distribute the assets across all participant accounts -func (pps *WorkerState) prepareAssets(accounts map[string]*pingPongAccount, client libgoal.Client) (resultAssetMaps map[uint64]v1.AssetParams, optIns map[uint64][]string, err error) { - proto, err := getProto(client) - if err != nil { - return +func (pps *WorkerState) integrateAccountInfo(addr string, ppa *pingPongAccount, ai v1.Account) { + ppa.balance = ai.Amount + // assets this account has created + for assetID, ap := range ai.AssetParams { + pps.cinfo.OptIns[assetID] = uniqueAppend(pps.cinfo.OptIns[assetID], addr) + pps.cinfo.AssetParams[assetID] = ap } + // assets held + for assetID, holding := range ai.Assets { + pps.cinfo.OptIns[assetID] = uniqueAppend(pps.cinfo.OptIns[assetID], addr) + if ppa.holdings == nil { + ppa.holdings = make(map[uint64]uint64) + } + ppa.holdings[assetID] = holding.Amount + } + // apps created by this account + for appID, ap := range ai.AppParams { + pps.cinfo.OptIns[appID] = uniqueAppend(pps.cinfo.OptIns[appID], addr) + pps.cinfo.AppParams[appID] = ap + } + // apps opted into + for appID := range ai.AppLocalStates { + pps.cinfo.OptIns[appID] = uniqueAppend(pps.cinfo.OptIns[appID], addr) + } +} - resultAssetMaps = make(map[uint64]v1.AssetParams) +type assetopti struct { + assetID uint64 + params v1.AssetParams // TODO: switch to v2 API + optins []string // addr strings +} - // optIns contains own and explicitly opted-in assets - optIns = make(map[uint64][]string) - numCreatedAssetsByAddr := make(map[string]int, len(accounts)) +type assetSet []assetopti - nextSendTime := time.Now() +// Len is part of sort.Interface +func (as *assetSet) Len() int { + return len(*as) +} - // 1) Create X assets for each of the participant accounts - for addr := range accounts { - if addr == pps.cfg.SrcAccount { - continue - } - addrAccount, addrErr := client.AccountInformation(addr) - if addrErr != nil { - fmt.Printf("Cannot lookup source account %v\n", addr) - err = addrErr - return - } +// Less is part of sort.Interface +// This is a reversed sort, higher values first +func (as *assetSet) Less(a, b int) bool { + return len((*as)[a].optins) > len((*as)[b].optins) +} - toCreate := int(pps.cfg.NumAsset) - len(addrAccount.AssetParams) - numCreatedAssetsByAddr[addr] = toCreate +// Swap is part of sort.Interface +func (as *assetSet) Swap(a, b int) { + t := (*as)[a] + (*as)[a] = (*as)[b] + (*as)[b] = t +} - fmt.Printf("Creating %v create asset transaction for account %v \n", toCreate, addr) - fmt.Printf("cfg.NumAsset %v, addrAccount.AssetParams %v\n", pps.cfg.NumAsset, addrAccount.AssetParams) +func (pps *WorkerState) prepareAssets(client *libgoal.Client) (err error) { + if pps.cinfo.AssetParams == nil { + pps.cinfo.AssetParams = make(map[uint64]v1.AssetParams) + } + if pps.cinfo.OptIns == nil { + pps.cinfo.OptIns = make(map[uint64][]string) + } - totalSupply := pps.cfg.MinAccountAsset * uint64(pps.cfg.NumPartAccounts) * 9 * uint64(pps.cfg.GroupSize) * uint64(pps.cfg.RefreshTime.Seconds()) / pps.cfg.TxnPerSec + // create new assets as needed + err = pps.makeNewAssets(client) + if err != nil { + return + } - // create assets in participant account - for i := 0; i < toCreate; i++ { - var metaLen = 32 - meta := make([]byte, metaLen) - crypto.RandBytes(meta[:]) + // find the most-opted-in assets to work with + assets := make([]assetopti, len(pps.cinfo.AssetParams)) + pos := 0 + for assetID, params := range pps.cinfo.AssetParams { + assets[pos].assetID = assetID + assets[pos].params = params + assets[pos].optins = pps.cinfo.OptIns[assetID] + pos++ + } + ta := assetSet(assets) + sort.Sort(&ta) + if len(assets) > int(pps.cfg.NumAsset) { + assets = assets[:pps.cfg.NumAsset] + nap := make(map[uint64]v1.AssetParams, pps.cfg.NumAsset) + for _, asset := range assets { + nap[asset.assetID] = asset.params + } + pps.cinfo.AssetParams = nap + } + + // opt-in more accounts as needed + for assetID := range pps.cinfo.AssetParams { + for addr, acct := range pps.accounts { + _, has := acct.holdings[assetID] + if !has { + tx, sendErr := client.MakeUnsignedAssetSendTx(assetID, 0, addr, "", "") + if sendErr != nil { + fmt.Printf("Cannot initiate asset optin %v in account %v\n", assetID, addr) + err = sendErr + continue + } - if totalSupply < pps.cfg.MinAccountAsset { // overflow - fmt.Printf("Too many NumPartAccounts\n") - return - } - assetName := fmt.Sprintf("pong%d", i) - if !pps.cfg.Quiet { - fmt.Printf("Creating asset %s\n", assetName) - } - tx, createErr := client.MakeUnsignedAssetCreateTx(totalSupply, false, addr, addr, addr, addr, "ping", assetName, "", meta, 0) - if createErr != nil { - fmt.Printf("Cannot make asset create txn with meta %v\n", meta) - err = createErr - return - } - tx, err = client.FillUnsignedTxTemplate(addr, 0, 0, pps.cfg.MaxFee, tx) - if err != nil { - fmt.Printf("Cannot fill asset creation txn\n") - return - } - tx.Note = pps.makeNextUniqueNoteField() - schedule(pps.cfg.TxnPerSec, &nextSendTime) - _, err = signAndBroadcastTransaction(accounts[addr], tx, client) - if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "signing and broadcasting asset creation failed with error %v\n", err) - return - } - } - } + tx, err = client.FillUnsignedTxTemplate(addr, 0, 0, pps.cfg.MaxFee, tx) + if err != nil { + fmt.Printf("Cannot fill asset optin %v in account %v\n", assetID, addr) + continue + } + tx.Note = pps.makeNextUniqueNoteField() - // wait until all the assets created - allAssets := make(map[uint64]string, int(pps.cfg.NumAsset)*len(accounts)) - for addr := range accounts { - if addr == pps.cfg.SrcAccount { - continue - } - var account v1.Account - deadline := time.Now().Add(3 * time.Minute) - for { - account, err = client.AccountInformation(addr) - if err != nil { - fmt.Printf("Warning: cannot lookup source account after assets creation") - time.Sleep(1 * time.Second) - continue - } - if len(account.AssetParams) >= numCreatedAssetsByAddr[addr] { - break - } - if time.Now().After(deadline) { - err = fmt.Errorf("asset creation took too long") - fmt.Printf("Error: %s\n", err.Error()) - return + pps.schedule(1) + _, err = signAndBroadcastTransaction(acct, tx, client) + if err != nil { + _, _ = fmt.Fprintf(os.Stderr, "signing and broadcasting asset optin failed with error %v\n", err) + continue + } + pps.cinfo.OptIns[assetID] = uniqueAppend(pps.cinfo.OptIns[assetID], addr) } - waitForNextRoundOrSleep(client, 500*time.Millisecond) - } - assetParams := account.AssetParams - if !pps.cfg.Quiet { - fmt.Printf("Configured %d assets %+v\n", len(assetParams), assetParams) - } - // add own asset to opt-ins since asset creators are auto-opted in - for k := range account.AssetParams { - optIns[k] = append(optIns[k], addr) - allAssets[k] = addr } } - // optInsByAddr tracks only explicitly opted-in assetsA - optInsByAddr := make(map[string]map[uint64]bool) + // Could distribute value here, but just waits till constructAssetTxn() + return +} - // 2) For each participant account, opt-in up to proto.MaxAssetsPerAccount assets of all other participant accounts - for addr := range accounts { - if addr == pps.cfg.SrcAccount { - continue +const totalSupply = 10_000_000_000_000_000 + +func (pps *WorkerState) makeNewAssets(client *libgoal.Client) (err error) { + if len(pps.cinfo.AssetParams) >= int(pps.cfg.NumAsset) { + return + } + assetsNeeded := int(pps.cfg.NumAsset) - len(pps.cinfo.AssetParams) + newAssetAddrs := make(map[string]*pingPongAccount, assetsNeeded) + for addr, acct := range pps.accounts { + if assetsNeeded <= 0 { + break } + assetsNeeded-- + var meta [32]byte + crypto.RandBytes(meta[:]) + assetName := fmt.Sprintf("pong%d_%d", len(pps.cinfo.AssetParams), rand.Intn(8999)+1000) if !pps.cfg.Quiet { - fmt.Printf("Opting to account %v\n", addr) + fmt.Printf("Creating asset %s\n", assetName) } - - acct, addrErr := client.AccountInformation(addr) - if addrErr != nil { - fmt.Printf("Cannot lookup optin account\n") - err = addrErr + tx, createErr := client.MakeUnsignedAssetCreateTx(totalSupply, false, addr, addr, addr, addr, "ping", assetName, "", meta[:], 0) + if createErr != nil { + fmt.Printf("Cannot make asset create txn with meta %v\n", meta) + err = createErr return } - maxAssetsPerAccount := proto.MaxAssetsPerAccount - // TODO : given that we've added unlimited asset support, we should revise this - // code so that we'll have control on how many asset/account we want to create. - // for now, I'm going to keep the previous max values until we have refactored this code. - if maxAssetsPerAccount == 0 { - maxAssetsPerAccount = config.Consensus[protocol.ConsensusV30].MaxAssetsPerAccount - } - numSlots := maxAssetsPerAccount - len(acct.Assets) - optInsByAddr[addr] = make(map[uint64]bool) - for k, creator := range allAssets { - if creator == addr { - continue - } - // do we have any more asset slots for this? - if numSlots <= 0 { - break - } - numSlots-- - - // opt-in asset k for addr - tx, sendErr := client.MakeUnsignedAssetSendTx(k, 0, addr, "", "") - if sendErr != nil { - fmt.Printf("Cannot initiate asset optin %v in account %v\n", k, addr) - err = sendErr - return - } - - tx, err = client.FillUnsignedTxTemplate(addr, 0, 0, pps.cfg.MaxFee, tx) - if err != nil { - fmt.Printf("Cannot fill asset optin %v in account %v\n", k, addr) - return - } - tx.Note = pps.makeNextUniqueNoteField() - - schedule(pps.cfg.TxnPerSec, &nextSendTime) - _, err = signAndBroadcastTransaction(accounts[addr], tx, client) - if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "signing and broadcasting asset optin failed with error %v\n", err) - return - } - optIns[k] = append(optIns[k], addr) - optInsByAddr[addr][k] = true + tx, err = client.FillUnsignedTxTemplate(addr, 0, 0, pps.cfg.MaxFee, tx) + if err != nil { + fmt.Printf("Cannot fill asset creation txn\n") + return } - } - - // wait until all opt-ins completed - waitForNextRoundOrSleep(client, 500*time.Millisecond) - for addr := range accounts { - if addr == pps.cfg.SrcAccount { - continue + tx.Note = pps.makeNextUniqueNoteField() + pps.schedule(1) + _, err = signAndBroadcastTransaction(pps.accounts[addr], tx, client) + if err != nil { + _, _ = fmt.Fprintf(os.Stderr, "signing and broadcasting asset creation failed with error %v\n", err) + return } - expectedAssets := numCreatedAssetsByAddr[addr] + len(optInsByAddr[addr]) - var account v1.Account - deadline := time.Now().Add(3 * time.Minute) - for { - account, err = client.AccountInformation(addr) + newAssetAddrs[addr] = acct + } + // wait for new assets to be created, fetch account data for them + newAssets := make(map[uint64]v1.AssetParams, assetsNeeded) + timeout := time.Now().Add(10 * time.Second) + for len(newAssets) < assetsNeeded { + for addr, acct := range newAssetAddrs { + // TODO: switch to v2 API + ai, err := client.AccountInformation(addr) if err != nil { - fmt.Printf("Warning: cannot lookup source account after assets opt in") + fmt.Printf("Warning: cannot lookup source account after assets creation") time.Sleep(1 * time.Second) continue } - if len(account.Assets) == expectedAssets { - break - } else if len(account.Assets) > expectedAssets { - err = fmt.Errorf("account %v has too many assets %d > %d ", addr, len(account.Assets), expectedAssets) - return - } - - if time.Now().After(deadline) { - err = fmt.Errorf("asset opting in took too long") - fmt.Printf("Error: %s\n", err.Error()) - return - } - waitForNextRoundOrSleep(client, 500*time.Millisecond) - } - } - - // Step 3) Evenly distribute the assets across all opted-in accounts - for k, creator := range allAssets { - if !pps.cfg.Quiet { - fmt.Printf("Distributing asset %+v from account %v\n", k, creator) - } - creatorAccount, creatorErr := client.AccountInformation(creator) - if creatorErr != nil { - fmt.Printf("Cannot lookup source account\n") - err = creatorErr - return - } - assetParams := creatorAccount.AssetParams - - for _, addr := range optIns[k] { - assetAmt := assetParams[k].Total / uint64(len(optIns[k])) - if !pps.cfg.Quiet { - fmt.Printf("Distributing assets from %v to %v \n", creator, addr) - } - - tx, sendErr := client.MakeUnsignedAssetSendTx(k, assetAmt, addr, "", "") - if sendErr != nil { - _, _ = fmt.Fprintf(os.Stdout, "error making unsigned asset send tx %v\n", sendErr) - err = fmt.Errorf("error making unsigned asset send tx : %w", sendErr) - return - } - tx.Note = pps.makeNextUniqueNoteField() - tx, sendErr = client.FillUnsignedTxTemplate(creator, 0, 0, pps.cfg.MaxFee, tx) - if sendErr != nil { - _, _ = fmt.Fprintf(os.Stdout, "error making unsigned asset send tx %v\n", sendErr) - err = fmt.Errorf("error making unsigned asset send tx : %w", sendErr) - return - } - tx.LastValid = tx.FirstValid + 5 - if pps.cfg.MaxFee == 0 { - var suggestedFee uint64 - suggestedFee, err = client.SuggestedFee() - if err != nil { - _, _ = fmt.Fprintf(os.Stdout, "error retrieving suggestedFee: %v\n", err) - return - } - if suggestedFee > tx.Fee.Raw { - tx.Fee.Raw = suggestedFee + for assetID, ap := range ai.AssetParams { + pps.cinfo.OptIns[assetID] = uniqueAppend(pps.cinfo.OptIns[assetID], addr) + _, has := pps.cinfo.AssetParams[assetID] + if !has { + newAssets[assetID] = ap } } - - schedule(pps.cfg.TxnPerSec, &nextSendTime) - _, err = signAndBroadcastTransaction(accounts[creator], tx, client) - if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "signing and broadcasting asset distribution failed with error %v\n", err) - return + for assetID, holding := range ai.Assets { + pps.cinfo.OptIns[assetID] = uniqueAppend(pps.cinfo.OptIns[assetID], addr) + acct.holdings[assetID] = holding.Amount } } - // append the asset to the result assets - resultAssetMaps[k] = assetParams[k] - } - - // wait for all transfers acceptance - waitForNextRoundOrSleep(client, 500*time.Millisecond) - deadline := time.Now().Add(3 * time.Minute) - var pending v1.PendingTransactions - for { - pending, err = client.GetPendingTransactions(100) - if err != nil { - fmt.Printf("Warning: cannot get pending txn") - time.Sleep(1 * time.Second) - continue - } - if pending.TotalTxns == 0 { + if time.Now().After(timeout) { + // complain, but try to keep running on what assets we have + log.Printf("WARNING took too long to create new assets") + // TODO: error? break } - if time.Now().After(deadline) { - fmt.Printf("Warning: assets distribution took too long") - break - } - waitForNextRoundOrSleep(client, 500*time.Millisecond) } - return + for assetID, ap := range newAssets { + pps.cinfo.AssetParams[assetID] = ap + } + return nil } -func signAndBroadcastTransaction(senderAccount *pingPongAccount, tx transactions.Transaction, client libgoal.Client) (txID string, err error) { +func signAndBroadcastTransaction(senderAccount *pingPongAccount, tx transactions.Transaction, client *libgoal.Client) (txID string, err error) { signedTx := tx.Sign(senderAccount.sk) txID, err = client.BroadcastTransaction(signedTx) if err != nil { @@ -580,7 +593,7 @@ func genAppProgram(numOps uint32, numHashes uint32, hashSize string, numGlobalKe return ops.Program, progAsm } -func waitForNextRoundOrSleep(client libgoal.Client, waitTime time.Duration) { +func waitForNextRoundOrSleep(client *libgoal.Client, waitTime time.Duration) { status, err := client.Status() if err == nil { status, err = client.WaitForRound(status.LastRound) @@ -591,7 +604,7 @@ func waitForNextRoundOrSleep(client libgoal.Client, waitTime time.Duration) { time.Sleep(waitTime) } -func (pps *WorkerState) sendAsGroup(txgroup []transactions.Transaction, client libgoal.Client, senders []string) (err error) { +func (pps *WorkerState) sendAsGroup(txgroup []transactions.Transaction, client *libgoal.Client, senders []string) (err error) { if len(txgroup) == 0 { err = fmt.Errorf("sendAsGroup: empty group") return @@ -623,7 +636,7 @@ repeat: var proto *config.ConsensusParams -func getProto(client libgoal.Client) (config.ConsensusParams, error) { +func getProto(client *libgoal.Client) (config.ConsensusParams, error) { if proto == nil { var err error status, err := client.Status() @@ -640,207 +653,136 @@ func getProto(client libgoal.Client) (config.ConsensusParams, error) { return *proto, nil } -func (pps *WorkerState) prepareApps(accounts map[string]*pingPongAccount, client libgoal.Client, cfg PpConfig) (appParams map[uint64]v1.AppParams, optIns map[uint64][]string, err error) { - proto, err := getProto(client) - if err != nil { - return +// ensure that cfg.NumPartAccounts have cfg.NumAppOptIn opted in selecting from cfg.NumApp +func (pps *WorkerState) prepareApps(client *libgoal.Client) (err error) { + if pps.cinfo.AppParams == nil { + pps.cinfo.AppParams = make(map[uint64]v1.AppParams) } - toCreate := int(cfg.NumApp) - appsPerAcct := proto.MaxAppsCreated - // TODO : given that we've added unlimited app support, we should revise this - // code so that we'll have control on how many app/account we want to create. - // for now, I'm going to keep the previous max values until we have refactored this code. - if appsPerAcct == 0 { - appsPerAcct = config.Consensus[protocol.ConsensusV30].MaxAppsCreated + if pps.cinfo.OptIns == nil { + pps.cinfo.OptIns = make(map[uint64][]string, pps.cfg.NumAsset+pps.cfg.NumApp) } - // create min(groupSize, maxAppsPerAcct) per account to optimize sending in batches - groupSize := proto.MaxTxGroupSize - if appsPerAcct > groupSize { - appsPerAcct = groupSize - } - - acctNeeded := toCreate / appsPerAcct - if toCreate%appsPerAcct != 0 { - acctNeeded++ - } - if acctNeeded >= len(accounts) { // >= because cfg.SrcAccount is skipped - err = fmt.Errorf("need %d accts to create %d apps but got only %d accts", acctNeeded, toCreate, len(accounts)) - return - } - maxOptIn := uint32(config.Consensus[protocol.ConsensusCurrentVersion].MaxAppsOptedIn) - if maxOptIn > 0 && cfg.NumAppOptIn > maxOptIn { - err = fmt.Errorf("each acct can only opt in to %d but %d requested", maxOptIn, cfg.NumAppOptIn) - return - } - - appAccounts := make([]v1.Account, len(accounts)) - accountsCount := 0 - for acctAddr := range accounts { - if acctAddr == cfg.SrcAccount { - continue + // generate new apps + var txgroup []transactions.Transaction + var senders []string + for addr, acct := range pps.accounts { + if len(pps.cinfo.AppParams) >= int(pps.cfg.NumApp) { + break } - appAccounts[accountsCount], err = client.AccountInformation(acctAddr) + var tx transactions.Transaction + tx, err = pps.newApp(addr, client) if err != nil { - fmt.Printf("Warning, cannot lookup acctAddr account %s", acctAddr) return } - accountsCount++ - if accountsCount == acctNeeded { - break + acct.addBalance(-int64(pps.cfg.MaxFee)) + txgroup = append(txgroup, tx) + senders = append(senders, addr) + if len(txgroup) == int(pps.cfg.GroupSize) { + pps.schedule(len(txgroup)) + err = pps.sendAsGroup(txgroup, client, senders) + if err != nil { + return + } + txgroup = txgroup[:0] + senders = senders[:0] } } - appAccounts = appAccounts[:accountsCount] - - if !cfg.Quiet { - fmt.Printf("Selected temp account:\n") - for _, acct := range appAccounts { - fmt.Printf("%s\n", acct.Address) + if len(txgroup) > 0 { + pps.schedule(len(txgroup)) + err = pps.sendAsGroup(txgroup, client, senders) + if err != nil { + return } + txgroup = txgroup[:0] + senders = senders[:0] } - // generate app program with roughly some number of operations - prog, asm := genAppProgram(cfg.AppProgOps, cfg.AppProgHashes, cfg.AppProgHashSize, cfg.AppGlobKeys, cfg.AppLocalKeys) - if !cfg.Quiet { - fmt.Printf("generated program: \n%s\n", asm) - } - globSchema := basics.StateSchema{NumByteSlice: proto.MaxGlobalSchemaEntries} - locSchema := basics.StateSchema{NumByteSlice: proto.MaxLocalSchemaEntries} - - // for each account, store the number of expected applications. - accountsApplicationCount := make(map[string]int) - - // create apps - for idx, appAccount := range appAccounts { - begin := idx * appsPerAcct - end := (idx + 1) * appsPerAcct - if end > toCreate { - end = toCreate - } - - var txgroup []transactions.Transaction - var senders []string - for i := begin; i < end; i++ { + // opt-in more accounts to apps + acctPerApp := (pps.cfg.NumAppOptIn * pps.cfg.NumPartAccounts) / pps.cfg.NumApp + for appid := range pps.cinfo.AppParams { + optins := pps.cinfo.OptIns[appid] + for addr, acct := range pps.accounts { + if len(optins) >= int(acctPerApp) { + break + } + // opt-in the account to the app var tx transactions.Transaction - - tx, err = client.MakeUnsignedAppCreateTx(transactions.NoOpOC, prog, prog, globSchema, locSchema, nil, nil, nil, nil, 0) + tx, err = pps.appOptIn(addr, appid, client) if err != nil { - fmt.Printf("Cannot create app txn\n") - panic(err) - // TODO : if we fail here for too long, we should re-create new accounts, etc. + return } - - tx, err = client.FillUnsignedTxTemplate(appAccount.Address, 0, 0, cfg.MaxFee, tx) - if err != nil { - fmt.Printf("Cannot fill app creation txn\n") - panic(err) - // TODO : if we fail here for too long, we should re-create new accounts, etc. + acct.addBalance(-int64(pps.cfg.MaxFee)) + txgroup = append(txgroup, tx) + senders = append(senders, addr) + if len(txgroup) == int(pps.cfg.GroupSize) { + pps.schedule(len(txgroup)) + err = pps.sendAsGroup(txgroup, client, senders) + if err != nil { + return + } + txgroup = txgroup[:0] + senders = senders[:0] } - // Ensure different txids - tx.Note = pps.makeNextUniqueNoteField() - - txgroup = append(txgroup, tx) - accounts[appAccount.Address].addBalance(-int64(tx.Fee.Raw)) - senders = append(senders, appAccount.Address) - accountsApplicationCount[appAccount.Address]++ } - + } + if len(txgroup) > 0 { + pps.schedule(len(txgroup)) err = pps.sendAsGroup(txgroup, client, senders) if err != nil { - balance, err2 := client.GetBalance(appAccount.Address) - if err2 == nil { - fmt.Printf("account %v balance is %d, logged balance is %d\n", appAccount.Address, balance, accounts[appAccount.Address].getBalance()) - } else { - fmt.Printf("account %v balance cannot be determined : %v\n", appAccount.Address, err2) - } return } - if !cfg.Quiet { - fmt.Printf("Created new %d apps\n", len(txgroup)) - } + //txgroup = txgroup[:0] + //senders = senders[:0] } + return +} - // get these apps - var aidxs []uint64 - appParams = make(map[uint64]v1.AppParams) - for _, appAccount := range appAccounts { - var account v1.Account - for { - account, err = client.AccountInformation(appAccount.Address) - if err != nil { - fmt.Printf("Warning, cannot lookup source account") - return - } - if len(account.AppParams) >= accountsApplicationCount[appAccount.Address] { - break - } - waitForNextRoundOrSleep(client, 500*time.Millisecond) - // TODO : if we fail here for too long, we should re-create new accounts, etc. - } - for idx, v := range account.AppParams { - appParams[idx] = v - aidxs = append(aidxs, idx) - } +func (pps *WorkerState) newApp(addr string, client *libgoal.Client) (tx transactions.Transaction, err error) { + // generate app program with roughly some number of operations + prog, asm := genAppProgram(pps.cfg.AppProgOps, pps.cfg.AppProgHashes, pps.cfg.AppProgHashSize, pps.cfg.AppGlobKeys, pps.cfg.AppLocalKeys) + if !pps.cfg.Quiet { + fmt.Printf("generated program: \n%s\n", asm) } - if len(aidxs) != len(appParams) { - err = fmt.Errorf("duplicates in aidxs, %d != %d", len(aidxs), len(appParams)) - return + globSchema := basics.StateSchema{NumByteSlice: proto.MaxGlobalSchemaEntries} + locSchema := basics.StateSchema{NumByteSlice: proto.MaxLocalSchemaEntries} + + tx, err = client.MakeUnsignedAppCreateTx(transactions.NoOpOC, prog, prog, globSchema, locSchema, nil, nil, nil, nil, 0) + if err != nil { + fmt.Printf("Cannot create app txn\n") + panic(err) + // TODO : if we fail here for too long, we should re-create new accounts, etc. } - // time to opt in to these apps - if cfg.NumAppOptIn > 0 { - optIns = make(map[uint64][]string) - for addr := range accounts { - if addr == cfg.SrcAccount { - continue - } - var txgroup []transactions.Transaction - var senders []string - permAppIndices := rand.Perm(len(aidxs)) - for i := uint32(0); i < cfg.NumAppOptIn; i++ { - j := permAppIndices[i] - aidx := aidxs[j] - var tx transactions.Transaction - tx, err = client.MakeUnsignedAppOptInTx(aidx, nil, nil, nil, nil) - if err != nil { - fmt.Printf("Cannot create app txn\n") - panic(err) - } + tx, err = client.FillUnsignedTxTemplate(addr, 0, 0, pps.cfg.MaxFee, tx) + if err != nil { + fmt.Printf("Cannot fill app creation txn\n") + panic(err) + // TODO : if we fail here for too long, we should re-create new accounts, etc. + } - tx, err = client.FillUnsignedTxTemplate(addr, 0, 0, cfg.MaxFee, tx) - if err != nil { - fmt.Printf("Cannot fill app creation txn\n") - panic(err) - } + // Ensure different txids + tx.Note = pps.makeNextUniqueNoteField() - // Ensure different txids - tx.Note = pps.makeNextUniqueNoteField() + return tx, err +} - optIns[aidx] = append(optIns[aidx], addr) - - txgroup = append(txgroup, tx) - senders = append(senders, addr) - if len(txgroup) == groupSize { - err = pps.sendAsGroup(txgroup, client, senders) - if err != nil { - return - } - txgroup = txgroup[:0] - senders = senders[:0] - } - } - // broadcast leftovers - if len(txgroup) > 0 { - err = pps.sendAsGroup(txgroup, client, senders) - if err != nil { - return - } - } - } +func (pps *WorkerState) appOptIn(addr string, appID uint64, client *libgoal.Client) (tx transactions.Transaction, err error) { + tx, err = client.MakeUnsignedAppOptInTx(appID, nil, nil, nil, nil) + if err != nil { + fmt.Printf("Cannot create app txn\n") + panic(err) } + tx, err = client.FillUnsignedTxTemplate(addr, 0, 0, pps.cfg.MaxFee, tx) + if err != nil { + fmt.Printf("Cannot fill app creation txn\n") + panic(err) + } + + // Ensure different txids + tx.Note = pps.makeNextUniqueNoteField() return } @@ -871,17 +813,28 @@ func takeTopAccounts(allAccounts map[string]*pingPongAccount, numAccounts uint32 return } -func generateAccounts(allAccounts map[string]*pingPongAccount, numAccounts uint32) { +// generate random ephemeral accounts +// TODO: don't do this and _always_ use the deterministic account mechanism? +func (pps *WorkerState) generateAccounts() { var seed crypto.Seed - for accountsRequired := int(numAccounts+1) - len(allAccounts); accountsRequired > 0; accountsRequired-- { + for accountsRequired := int(pps.cfg.NumPartAccounts+1) - len(pps.accounts); accountsRequired > 0; accountsRequired-- { crypto.RandBytes(seed[:]) privateKey := crypto.GenerateSignatureSecrets(seed) publicKey := basics.Address(privateKey.SignatureVerifier) - allAccounts[publicKey.String()] = &pingPongAccount{ + pps.accounts[publicKey.String()] = &pingPongAccount{ sk: privateKey, pk: publicKey, } } } + +func uniqueAppend(they []string, x string) []string { + for _, v := range they { + if v == x { + return they + } + } + return append(they, x) +} diff --git a/shared/pingpong/accounts_test.go b/shared/pingpong/accounts_test.go new file mode 100644 index 0000000000..7f2f0a737b --- /dev/null +++ b/shared/pingpong/accounts_test.go @@ -0,0 +1,60 @@ +// Copyright (C) 2019-2022 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see . + +package pingpong + +import ( + "encoding/binary" + "testing" + + "github.com/algorand/go-algorand/crypto" + "github.com/algorand/go-algorand/data/basics" + "github.com/stretchr/testify/assert" +) + +func makeKeyFromSeed(i uint64) *crypto.SignatureSecrets { + var seed crypto.Seed + binary.LittleEndian.PutUint64(seed[:], i) + s := crypto.GenerateSignatureSecrets(seed) + return s +} + +func TestDeterministicAccounts(t *testing.T) { + initCfg := PpConfig{ + NumPartAccounts: 20, + DeterministicKeys: true, + GeneratedAccountsCount: 100, + } + + // created expected set of keys in a similar way as netgoal generate --deterministic + expectedPubKeys := make(map[crypto.PublicKey]*crypto.SignatureSecrets) + for i := 0; i < int(initCfg.GeneratedAccountsCount); i++ { + key := makeKeyFromSeed(uint64(i)) + expectedPubKeys[key.SignatureVerifier] = key + } + assert.Len(t, expectedPubKeys, int(initCfg.GeneratedAccountsCount)) + + // call pingpong acct generator and assert its separately-generated secrets are equal + accountSecrets := deterministicAccounts(initCfg) + cnt := 0 + for secret := range accountSecrets { + t.Log("Got address", basics.Address(secret.SignatureVerifier)) + assert.Contains(t, expectedPubKeys, secret.SignatureVerifier) + assert.Equal(t, *expectedPubKeys[secret.SignatureVerifier], *secret) + cnt++ + } + assert.Equal(t, int(initCfg.NumPartAccounts), cnt) +} diff --git a/shared/pingpong/config.go b/shared/pingpong/config.go index db6cbb4ed1..5b9224c017 100644 --- a/shared/pingpong/config.go +++ b/shared/pingpong/config.go @@ -18,6 +18,7 @@ package pingpong import ( "encoding/json" + "fmt" "io" "os" "time" @@ -30,6 +31,7 @@ const ConfigFilename = "ppconfig.json" // PpConfig defines configuration structure for type PpConfig struct { + // SrcAccount is address to use as funding source for new accounts SrcAccount string RandomizeFee bool RandomizeAmt bool @@ -45,12 +47,19 @@ type PpConfig struct { Quiet bool RandomNote bool RandomLease bool - Program []byte - LogicArgs [][]byte - GroupSize uint32 - NumAsset uint32 + + Program []byte + LogicArgs [][]byte + ProgramProbability float64 + + GroupSize uint32 + // NumAsset is the number of assets each account holds + NumAsset uint32 + // MinAccountAsset MinAccountAsset uint64 - NumApp uint32 + // NumApp is the total number of apps to create + NumApp uint32 + // NumAppOptIn is the number of apps each account opts in to NumAppOptIn uint32 AppProgOps uint32 AppProgHashes uint32 @@ -64,6 +73,18 @@ type PpConfig struct { NftAsaPerSecond uint32 // e.g. 100 NftAsaPerAccount uint32 // 0..999 NftAsaAccountInFlight uint32 + + // configuration related to using bootstrapped ledgers built by netgoal + // TODO: support generatedAssetsCount, generatedApplicationCount + DeterministicKeys bool + GeneratedAccountsCount uint32 + GeneratedAccountSampleMethod string + GeneratedAccountsOffset uint32 + + WeightPayment float64 + WeightAsset float64 + WeightApp float64 + WeightNFTCreation float64 } // DefaultConfig object for Ping Pong @@ -78,7 +99,7 @@ var DefaultConfig = PpConfig{ TxnPerSec: 200, NumPartAccounts: 10, RunTime: 10 * time.Second, - RefreshTime: 10 * time.Second, + RefreshTime: 3600 * time.Second, MinAccountFunds: 100000, GroupSize: 1, NumAsset: 0, @@ -90,6 +111,8 @@ var DefaultConfig = PpConfig{ Rekey: false, MaxRuntime: 0, + ProgramProbability: 1, + NftAsaAccountInFlight: 5, NftAsaPerAccount: 900, } @@ -125,3 +148,50 @@ func (cfg PpConfig) Dump(stream io.Writer) { enc := codecs.NewFormattedJSONEncoder(stream) enc.Encode(cfg) } + +// SetDefaultWeights ensures a reasonable configuration of traffic generation weights. +// With no weights set, and old args about what mode to run, each activated traffic type gets a weight of 1. +// With no weights set and some activated traffic type other than payment, payment gets deactivated (zero weight) to maintain compatibility with prior behavior. WeightPayment must be explicitly set to add it to the mix if other modes are activated. +func (cfg *PpConfig) SetDefaultWeights() { + const epsilon = 0.0000001 + if cfg.WeightPayment+cfg.WeightAsset+cfg.WeightApp+cfg.WeightNFTCreation < epsilon { + // set up some sensible run probability weights + if cfg.NumAsset > 0 && cfg.WeightAsset < epsilon { + cfg.WeightAsset = 1 + } + if cfg.NumApp > 0 && cfg.WeightApp < epsilon { + cfg.WeightApp = 1 + } + if cfg.NftAsaPerSecond > 0 && cfg.WeightNFTCreation < epsilon { + cfg.WeightNFTCreation = 1 + } + if cfg.NumAsset == 0 && cfg.NumApp == 0 && cfg.NftAsaPerSecond == 0 && cfg.WeightPayment < epsilon { + // backwards compatibility, if a mode is specified we wouldn't run payment traffic, so only set it when no mode is specified + cfg.WeightPayment = 1 + } + } +} + +var accountSampleMethods = []string{ + "", + "random", + "sequential", +} + +// Check returns an error if config is invalid. +func (cfg *PpConfig) Check() error { + sampleOk := false + for _, v := range accountSampleMethods { + if v == cfg.GeneratedAccountSampleMethod { + sampleOk = true + break + } + } + if !sampleOk { + return fmt.Errorf("unknown GeneratedAccountSampleMethod: %s", cfg.GeneratedAccountSampleMethod) + } + if cfg.DeterministicKeys && (cfg.GeneratedAccountsOffset+cfg.NumPartAccounts > cfg.GeneratedAccountsCount) { + return fmt.Errorf("(GeneratedAccountsOffset %d) + (NumPartAccounts %d) > (GeneratedAccountsCount %d)", cfg.GeneratedAccountsOffset, cfg.NumPartAccounts, cfg.GeneratedAccountsCount) + } + return nil +} diff --git a/shared/pingpong/pingpong.go b/shared/pingpong/pingpong.go index 1592ba6c9f..e0ee6812a7 100644 --- a/shared/pingpong/pingpong.go +++ b/shared/pingpong/pingpong.go @@ -19,11 +19,13 @@ package pingpong import ( "context" "encoding/binary" + "errors" "fmt" "math" "math/rand" "os" "strings" + "sync/atomic" "time" "github.com/algorand/go-deadlock" @@ -36,7 +38,6 @@ import ( "github.com/algorand/go-algorand/data/transactions/logic" "github.com/algorand/go-algorand/libgoal" "github.com/algorand/go-algorand/protocol" - "github.com/algorand/go-algorand/util" ) // CreatablesInfo has information about created assets, apps and opting in @@ -49,145 +50,146 @@ type CreatablesInfo struct { // pingPongAccount represents the account state for each account in the pingpong application // This includes the current balance and public/private keys tied to the account type pingPongAccount struct { + balance uint64 + balanceRound uint64 + deadlock.Mutex sk *crypto.SignatureSecrets pk basics.Address - balance uint64 - balanceRound uint64 + // asset holdings + holdings map[uint64]uint64 } func (ppa *pingPongAccount) getBalance() uint64 { - ppa.Lock() - defer ppa.Unlock() - return ppa.balance + return atomic.LoadUint64(&ppa.balance) } func (ppa *pingPongAccount) setBalance(balance uint64) { + atomic.StoreUint64(&ppa.balance, balance) +} + +func (ppa *pingPongAccount) addBalance(offset int64) { + if offset >= 0 { + atomic.AddUint64(&ppa.balance, uint64(offset)) + return + } + for { + v := atomic.LoadUint64(&ppa.balance) + nv := v - uint64(-offset) + done := atomic.CompareAndSwapUint64(&ppa.balance, v, nv) + if done { + return + } + } +} + +func (ppa *pingPongAccount) getAsset(aid uint64) (v uint64, ok bool) { + ppa.Lock() + defer ppa.Unlock() + v, ok = ppa.holdings[aid] + return +} +func (ppa *pingPongAccount) setAsset(aid, value uint64) { + ppa.Lock() + defer ppa.Unlock() + ppa.holdings[aid] = value +} +func (ppa *pingPongAccount) addAsset(aid uint64, dv int64) { ppa.Lock() defer ppa.Unlock() - ppa.balance = balance + v := ppa.holdings[aid] + if dv >= 0 { + v += uint64(dv) + } else { + v -= uint64(-dv) + } + ppa.holdings[aid] = v } -func (ppa *pingPongAccount) addBalance(offset int64) { +func (ppa *pingPongAccount) String() string { ppa.Lock() defer ppa.Unlock() - ppa.balance = uint64(int64(ppa.balance) + offset) + var ow strings.Builder + fmt.Fprintf(&ow, "%s %d", ppa.pk.String(), ppa.balance) + if len(ppa.holdings) > 0 { + fmt.Fprintf(&ow, "[") + first := true + for assetID, av := range ppa.holdings { + if first { + first = false + } else { + fmt.Fprintf(&ow, ", ") + } + fmt.Fprintf(&ow, "a%d=%d", assetID, av) + } + fmt.Fprintf(&ow, "]") + } + return ow.String() } // WorkerState object holds a running pingpong worker type WorkerState struct { - cfg PpConfig - accounts map[string]*pingPongAccount - accountsMu deadlock.RWMutex - cinfo CreatablesInfo + cfg PpConfig + accounts map[string]*pingPongAccount + cinfo CreatablesInfo nftStartTime int64 localNftIndex uint64 nftHolders map[string]int incTransactionSalt uint64 - muSuggestedParams deadlock.Mutex - suggestedParams v1.TransactionParams - pendingTxns v1.PendingTransactions + nextSendTime time.Time + scheduleActionTime time.Duration + scheduleCalls uint64 + scheduleSteps uint64 + + refreshAddrs []string + refreshPos int + + client *libgoal.Client } // PrepareAccounts to set up accounts and asset accounts required for Ping Pong run -func (pps *WorkerState) PrepareAccounts(ac libgoal.Client) (err error) { - pps.accounts, pps.cfg, err = pps.ensureAccounts(ac, pps.cfg) +func (pps *WorkerState) PrepareAccounts(ac *libgoal.Client) (err error) { + pps.client = ac + pps.nextSendTime = time.Now() + durPerTxn := time.Second / time.Duration(pps.cfg.TxnPerSec) + fmt.Printf("duration per txn %s\n", durPerTxn) + + err = pps.ensureAccounts(ac) if err != nil { _, _ = fmt.Fprintf(os.Stderr, "ensure accounts failed %v\n", err) return } - cfg := pps.cfg - if cfg.NumAsset > 0 { - // zero out max amount for asset transactions - cfg.MaxAmt = 0 + // create new ephemeral random accounts + pps.generateAccounts() - var assetAccounts map[string]*pingPongAccount - assetAccounts, err = pps.prepareNewAccounts(ac) - if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "prepare new accounts failed: %v\n", err) - return - } + err = pps.fundAccounts(ac) + if err != nil { + _, _ = fmt.Fprintf(os.Stderr, "fund accounts failed %v\n", err) + return + } - pps.cinfo.AssetParams, pps.cinfo.OptIns, err = pps.prepareAssets(assetAccounts, ac) + if pps.cfg.NumAsset > 0 { + err = pps.prepareAssets(ac) if err != nil { _, _ = fmt.Fprintf(os.Stderr, "prepare assets failed %v\n", err) return } - - if !cfg.Quiet { - for addr := range assetAccounts { - if addr != pps.cfg.SrcAccount { - fmt.Printf("final prepareAccounts, account addr: %s, balance: %d\n", addr, pps.accounts[addr].getBalance()) - } - } - } - } else if cfg.NumApp > 0 { - var appAccounts map[string]*pingPongAccount - appAccounts, err = pps.prepareNewAccounts(ac) - if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "prepare new accounts failed: %v\n", err) - return - } - pps.cinfo.AppParams, pps.cinfo.OptIns, err = pps.prepareApps(appAccounts, ac, cfg) - if err != nil { - return - } - if !cfg.Quiet { - for addr := range appAccounts { - if addr != pps.cfg.SrcAccount { - fmt.Printf("final prepareAccounts, account addr: %s, balance: %d\n", addr, pps.accounts[addr].getBalance()) - } - } - } - } else { - // If we have more accounts than requested, pick the top N (not including src) - if len(pps.accounts) > int(cfg.NumPartAccounts+1) { - fmt.Printf("Finding the richest %d accounts to use for transacting\n", cfg.NumPartAccounts) - pps.accounts = takeTopAccounts(pps.accounts, cfg.NumPartAccounts, cfg.SrcAccount) - } else { - // Not enough accounts yet (or just enough). Create more if needed - fmt.Printf("Not enough accounts - creating %d more\n", int(cfg.NumPartAccounts+1)-len(pps.accounts)) - generateAccounts(pps.accounts, cfg.NumPartAccounts) - } - - err = pps.fundAccounts(pps.accounts, ac, cfg) + } + if pps.cfg.NumApp > 0 { + err = pps.prepareApps(ac) if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "fund accounts failed %v\n", err) return } - go pps.roundMonitor(ac) } - - pps.cfg = cfg - return -} - -func (pps *WorkerState) prepareNewAccounts(client libgoal.Client) (newAccounts map[string]*pingPongAccount, err error) { - // create new accounts for testing - newAccounts = make(map[string]*pingPongAccount) - generateAccounts(newAccounts, pps.cfg.NumPartAccounts) - // copy the source account, as needed. - if srcAcct, has := pps.accounts[pps.cfg.SrcAccount]; has { - newAccounts[pps.cfg.SrcAccount] = srcAcct - } - pps.accounts = newAccounts - - err = pps.fundAccounts(newAccounts, client, pps.cfg) - if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "fund accounts failed %v\n", err) - return - } - - go pps.roundMonitor(client) return } // determine the min balance per participant account -func computeAccountMinBalance(client libgoal.Client, cfg PpConfig) (fundingRequiredBalance uint64, runningRequiredBalance uint64, err error) { +func computeAccountMinBalance(client *libgoal.Client, cfg PpConfig) (fundingRequiredBalance uint64, runningRequiredBalance uint64, err error) { proto, err := getProto(client) if err != nil { return @@ -207,17 +209,6 @@ func computeAccountMinBalance(client libgoal.Client, cfg PpConfig) (fundingRequi fee *= uint64(cfg.GroupSize) } - if cfg.NumApp > 0 { - amount := uint64(0) - - runningRequiredBalance = (amount + fee) * 10 * 2 - setupCost := uint64(proto.MaxTxGroupSize) * (uint64(proto.AppFlatParamsMinBalance*2) + fee) - // todo: add the cfg.NumAppOptIn to the setup cost. - fundingRequiredBalance = proto.MinBalance + cfg.MinAccountFunds + (amount+fee)*10*2*cfg.TxnPerSec*uint64(math.Ceil(cfg.RefreshTime.Seconds())) + setupCost - fmt.Printf("required min balance for app accounts: %d\n", fundingRequiredBalance) - return - } - fundingRequiredBalance = minActiveAccountBalance runningRequiredBalance = minActiveAccountBalance @@ -263,21 +254,90 @@ func computeAccountMinBalance(client libgoal.Client, cfg PpConfig) (fundingRequi return } -// Wait for `*nextSendTime` and update it afterwards. -func schedule(tps uint64, nextSendTime *time.Time) { - dur := time.Until(*nextSendTime) - if dur > 0 { - time.Sleep(dur) +func (pps *WorkerState) scheduleAction() bool { + if pps.refreshPos >= len(pps.refreshAddrs) { + if pps.refreshAddrs == nil { + pps.refreshAddrs = make([]string, 0, len(pps.accounts)) + } else { + pps.refreshAddrs = pps.refreshAddrs[:0] + } + for addr := range pps.accounts { + pps.refreshAddrs = append(pps.refreshAddrs, addr) + } + pps.refreshPos = 0 } + addr := pps.refreshAddrs[pps.refreshPos] + ai, err := pps.client.AccountInformation(addr) + if err == nil { + ppa := pps.accounts[addr] - *nextSendTime = nextSendTime.Add(time.Second / time.Duration(tps)) + pps.integrateAccountInfo(addr, ppa, ai) + } else { + if !pps.cfg.Quiet { + fmt.Printf("background refresh err: %v\n", err) + } + return false + } + pps.refreshPos++ + return true +} + +const durationEpsilon = time.Microsecond * 10 +const scheduleActionTimeAlpha = 6 + +// schedule consuming n txn time slots +func (pps *WorkerState) schedule(n int) { + pps.scheduleCalls++ + now := time.Now() + ok := true + timePerStep := time.Second / time.Duration(pps.cfg.TxnPerSec) + nextSendTime := pps.nextSendTime + if n > 1 { + nextSendTime = nextSendTime.Add(timePerStep * time.Duration(n-1)) + } + for { + if now.After(nextSendTime) { + break + } + dur := nextSendTime.Sub(now) + if dur < durationEpsilon { + break + } + if dur < pps.scheduleActionTime || !ok { + time.Sleep(dur) + now = time.Now() + } else { + ok = pps.scheduleAction() + nn := time.Now() + dt := nn.Sub(now) + // alpha blend to keep running approximation + pps.scheduleActionTime = ((pps.scheduleActionTime * scheduleActionTimeAlpha) + dt) / (scheduleActionTimeAlpha + 1) + now = nn + } + } + + steps := 0 + for now.After(nextSendTime) { + if steps > 0 { + dt := now.Sub(nextSendTime) + if dt < timePerStep/2 { + // good enough + break + } + } + pps.scheduleSteps++ + nextSendTime = nextSendTime.Add(timePerStep) + steps++ + } + pps.nextSendTime = nextSendTime + //fmt.Printf("schedule now=%s next=%s\n", now, pps.nextSendTime) } -func (pps *WorkerState) fundAccounts(accounts map[string]*pingPongAccount, client libgoal.Client, cfg PpConfig) error { +func (pps *WorkerState) fundAccounts(client *libgoal.Client) error { var srcFunds, minFund uint64 var err error var tx transactions.Transaction - srcFunds, err = client.GetBalance(cfg.SrcAccount) + srcFunds, err = client.GetBalance(pps.cfg.SrcAccount) if err != nil { return err @@ -288,19 +348,19 @@ func (pps *WorkerState) fundAccounts(accounts map[string]*pingPongAccount, clien // Fee of 0 will make cause the function to use the suggested one by network fee := uint64(0) - minFund, _, err = computeAccountMinBalance(client, cfg) + minFund, _, err = computeAccountMinBalance(client, pps.cfg) if err != nil { return err } fmt.Printf("adjusting account balance to %d\n", minFund) - srcAcct := accounts[cfg.SrcAccount] + srcAcct := pps.accounts[pps.cfg.SrcAccount] - nextSendTime := time.Now() - for { - accountsAdjusted := 0 + accountsAdjusted := 1 + for accountsAdjusted > 0 { + accountsAdjusted = 0 adjStart := time.Now() - for addr, acct := range accounts { + for addr, acct := range pps.accounts { if addr == pps.cfg.SrcAccount { continue } @@ -308,19 +368,19 @@ func (pps *WorkerState) fundAccounts(accounts map[string]*pingPongAccount, clien if acct.getBalance() >= minFund { continue } - if !cfg.Quiet { + if !pps.cfg.Quiet { fmt.Printf("adjusting balance of account %v\n", addr) } toSend := minFund - acct.getBalance() if srcFunds <= toSend { - return fmt.Errorf("source account %s has insufficient funds %d - needs %d", cfg.SrcAccount, srcFunds, toSend) + return fmt.Errorf("source account %s has insufficient funds %d - needs %d", pps.cfg.SrcAccount, srcFunds, toSend) } srcFunds -= toSend - if !cfg.Quiet { + if !pps.cfg.Quiet { fmt.Printf("adjusting balance of account %v by %d\n ", addr, toSend) } - schedule(cfg.TxnPerSec, &nextSendTime) + pps.schedule(1) tx, err = pps.sendPaymentFromSourceAccount(client, addr, fee, toSend, srcAcct) if err != nil { if strings.Contains(err.Error(), "broadcast queue full") { @@ -332,32 +392,29 @@ func (pps *WorkerState) fundAccounts(accounts map[string]*pingPongAccount, clien } srcFunds -= tx.Fee.Raw accountsAdjusted++ - if !cfg.Quiet { + if !pps.cfg.Quiet { fmt.Printf("account balance for key %s will be %d\n", addr, minFund) } acct.setBalance(minFund) totalSent++ } - accounts[cfg.SrcAccount].setBalance(srcFunds) + pps.accounts[pps.cfg.SrcAccount].setBalance(srcFunds) waitStart := time.Now() // wait until all the above transactions are sent, or that we have no more transactions // in our pending transaction pool coming from the source account. - err = waitPendingTransactions([]string{cfg.SrcAccount}, client) + err = waitPendingTransactions([]string{pps.cfg.SrcAccount}, client) if err != nil { return err } waitStop := time.Now() - if !cfg.Quiet { + if !pps.cfg.Quiet { fmt.Printf("%d sent (%s); waited %s\n", accountsAdjusted, waitStart.Sub(adjStart).String(), waitStop.Sub(waitStart).String()) } - if accountsAdjusted == 0 { - break - } } return err } -func (pps *WorkerState) sendPaymentFromSourceAccount(client libgoal.Client, to string, fee, amount uint64, srcAcct *pingPongAccount) (transactions.Transaction, error) { +func (pps *WorkerState) sendPaymentFromSourceAccount(client *libgoal.Client, to string, fee, amount uint64, srcAcct *pingPongAccount) (transactions.Transaction, error) { // generate a unique note to avoid duplicate transaction failures note := pps.makeNextUniqueNoteField() @@ -388,7 +445,7 @@ func (pps *WorkerState) sendPaymentFromSourceAccount(client libgoal.Client, to s // accounts map have been cleared out of the transaction pool. A prerequesite for this is that // there is no other source who might be generating transactions that would come from these account // addresses. -func waitPendingTransactions(accounts []string, client libgoal.Client) error { +func waitPendingTransactions(accounts []string, client *libgoal.Client) error { for _, from := range accounts { repeat: pendingTxns, err := client.GetPendingTransactionsByAddress(from, 0) @@ -411,13 +468,11 @@ func waitPendingTransactions(accounts []string, client libgoal.Client) error { return nil } -func (pps *WorkerState) refreshAccounts(client libgoal.Client, cfg PpConfig) error { - pps.accountsMu.Lock() +func (pps *WorkerState) refreshAccounts(client *libgoal.Client) error { addrs := make([]string, 0, len(pps.accounts)) for addr := range pps.accounts { addrs = append(addrs, addr) } - pps.accountsMu.Unlock() // wait until all the pending transactions have been sent; otherwise, getting the balance // is pretty much meaningless. fmt.Printf("waiting for all transactions to be accepted before refreshing accounts.\n") @@ -436,13 +491,11 @@ func (pps *WorkerState) refreshAccounts(client libgoal.Client, cfg PpConfig) err balanceUpdates[addr] = amount } - pps.accountsMu.Lock() - defer pps.accountsMu.Unlock() for addr, amount := range balanceUpdates { pps.accounts[addr].setBalance(amount) } - return pps.fundAccounts(pps.accounts, client, cfg) + return pps.fundAccounts(client) } // return a shuffled list of accounts with some minimum balance @@ -463,7 +516,7 @@ func listSufficientAccounts(accounts map[string]*pingPongAccount, minimumAmount var logPeriod = 5 * time.Second // RunPingPong starts ping pong process -func (pps *WorkerState) RunPingPong(ctx context.Context, ac libgoal.Client) { +func (pps *WorkerState) RunPingPong(ctx context.Context, ac *libgoal.Client) { // Infinite loop given: // - accounts -> map of accounts to include in transfers (including src account, which we don't want to use) // - cfg -> configuration for how to proceed @@ -480,23 +533,21 @@ func (pps *WorkerState) RunPingPong(ctx context.Context, ac libgoal.Client) { // error = fundAccounts() // } - cfg := pps.cfg + pps.nextSendTime = time.Now() + ac.SetSuggestedParamsCacheAge(200 * time.Millisecond) + pps.client = ac + var runTime time.Duration - if cfg.RunTime > 0 { - runTime = cfg.RunTime + if pps.cfg.RunTime > 0 { + runTime = pps.cfg.RunTime } else { runTime = 10000 * time.Hour // Effectively 'forever' } var endTime time.Time - if cfg.MaxRuntime > 0 { - endTime = time.Now().Add(cfg.MaxRuntime) - } - refreshTime := time.Now().Add(cfg.RefreshTime) - - var nftThrottler *throttler - if pps.cfg.NftAsaPerSecond > 0 { - nftThrottler = newThrottler(20, float64(pps.cfg.NftAsaPerSecond)) + if pps.cfg.MaxRuntime > 0 { + endTime = time.Now().Add(pps.cfg.MaxRuntime) } + refreshTime := time.Now().Add(pps.cfg.RefreshTime) lastLog := time.Now() nextLog := lastLog.Add(logPeriod) @@ -518,7 +569,7 @@ func (pps *WorkerState) RunPingPong(ctx context.Context, ac libgoal.Client) { } if now.After(nextLog) { dt := now.Sub(lastLog) - fmt.Printf("%d sent, %0.2f/s (%d total)\n", totalSent-lastTotalSent, float64(totalSent-lastTotalSent)/dt.Seconds(), totalSent) + fmt.Printf("%d sent, %0.2f/s (%d total) (%d sc %d sts)\n", totalSent-lastTotalSent, float64(totalSent-lastTotalSent)/dt.Seconds(), totalSent, pps.scheduleCalls, pps.scheduleSteps) lastTotalSent = totalSent for now.After(nextLog) { nextLog = nextLog.Add(logPeriod) @@ -526,32 +577,18 @@ func (pps *WorkerState) RunPingPong(ctx context.Context, ac libgoal.Client) { lastLog = now } - if cfg.MaxRuntime > 0 && time.Now().After(endTime) { - fmt.Printf("Terminating after max run time of %.f seconds\n", cfg.MaxRuntime.Seconds()) + if pps.cfg.MaxRuntime > 0 && time.Now().After(endTime) { + fmt.Printf("Terminating after max run time of %.f seconds\n", pps.cfg.MaxRuntime.Seconds()) return } - if pps.cfg.NftAsaPerSecond > 0 { - sent, err := pps.makeNftTraffic(ac) - if err != nil { - fmt.Fprintf(os.Stderr, "error sending nft transactions: %v\n", err) - } - nftThrottler.maybeSleep(int(sent)) - totalSent += sent - continue - } - - minimumAmount := cfg.MinAccountFunds + (cfg.MaxAmt+cfg.MaxFee)*2 - pps.accountsMu.RLock() - fromList := listSufficientAccounts(pps.accounts, minimumAmount, cfg.SrcAccount) - pps.accountsMu.RUnlock() + minimumAmount := pps.cfg.MinAccountFunds + (pps.cfg.MaxAmt+pps.cfg.MaxFee)*2 + fromList := listSufficientAccounts(pps.accounts, minimumAmount, pps.cfg.SrcAccount) // in group tests txns are sent back and forth, so both parties need funds var toList []string - if cfg.GroupSize == 1 { + if pps.cfg.GroupSize == 1 { minimumAmount = 0 - pps.accountsMu.RLock() - toList = listSufficientAccounts(pps.accounts, minimumAmount, cfg.SrcAccount) - pps.accountsMu.RUnlock() + toList = listSufficientAccounts(pps.accounts, minimumAmount, pps.cfg.SrcAccount) } else { // same selection with another shuffle toList = make([]string, len(fromList)) @@ -563,16 +600,18 @@ func (pps *WorkerState) RunPingPong(ctx context.Context, ac libgoal.Client) { totalSent += sent totalSucceeded += succeeded if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "error sending transactions: %v\n", err) + _, _ = fmt.Fprintf(os.Stderr, "error sending transactions, sleeping .5 seconds: %v\n", err) + pps.nextSendTime = time.Now().Add(500 * time.Millisecond) + pps.schedule(1) } - if cfg.RefreshTime > 0 && time.Now().After(refreshTime) { - err = pps.refreshAccounts(ac, cfg) + if pps.cfg.RefreshTime > 0 && time.Now().After(refreshTime) { + err = pps.refreshAccounts(ac) if err != nil { _, _ = fmt.Fprintf(os.Stderr, "error refreshing: %v\n", err) } - refreshTime = refreshTime.Add(cfg.RefreshTime) + refreshTime = refreshTime.Add(pps.cfg.RefreshTime) } } @@ -586,157 +625,58 @@ func NewPingpong(cfg PpConfig) *WorkerState { return &WorkerState{cfg: cfg, nftHolders: make(map[string]int)} } -func randomizeCreatableID(cfg PpConfig, cinfo CreatablesInfo) (aidx uint64) { - if cfg.NumAsset > 0 { - rindex := rand.Intn(len(cinfo.AssetParams)) - i := 0 - for k := range cinfo.AssetParams { - if i == rindex { - aidx = k - break - } - i++ - } - } else if cfg.NumApp > 0 { - rindex := rand.Intn(len(cinfo.AppParams)) - i := 0 - for k := range cinfo.AppParams { - if i == rindex { - aidx = k - break - } - i++ +func (pps *WorkerState) randAssetID() (aidx uint64) { + if len(pps.cinfo.AssetParams) == 0 { + return 0 + } + rindex := rand.Intn(len(pps.cinfo.AssetParams)) + i := 0 + for k := range pps.cinfo.AssetParams { + if i == rindex { + return k } + i++ } return } - -func (pps *WorkerState) fee() uint64 { - cfg := pps.cfg - fee := cfg.MaxFee - if cfg.RandomizeFee { - fee = rand.Uint64()%(cfg.MaxFee-cfg.MinFee) + cfg.MinFee +func (pps *WorkerState) randAppID() (aidx uint64) { + if len(pps.cinfo.AppParams) == 0 { + return 0 } - return fee -} - -func (pps *WorkerState) makeNftTraffic(client libgoal.Client) (sentCount uint64, err error) { - fee := pps.fee() - var srcCost uint64 - if (len(pps.nftHolders) == 0) || ((float64(int(pps.cfg.NftAsaAccountInFlight)-len(pps.nftHolders)) / float64(pps.cfg.NftAsaAccountInFlight)) >= rand.Float64()) { - var addr string - - var seed [32]byte - crypto.RandBytes(seed[:]) - privateKey := crypto.GenerateSignatureSecrets(seed) - publicKey := basics.Address(privateKey.SignatureVerifier) - - pps.accountsMu.Lock() - pps.accounts[publicKey.String()] = &pingPongAccount{ - sk: privateKey, - pk: publicKey, - } - pps.accountsMu.Unlock() - addr = publicKey.String() - - fmt.Printf("new NFT holder %s\n", addr) - var proto config.ConsensusParams - proto, err = getProto(client) - if err != nil { - return - } - // enough for the per-asa minbalance and more than enough for the txns to create them - toSend := proto.MinBalance * uint64(pps.cfg.NftAsaPerAccount+1) * 2 - pps.nftHolders[addr] = 0 - var tx transactions.Transaction - srcAcct := pps.acct(pps.cfg.SrcAccount) - tx, err = pps.sendPaymentFromSourceAccount(client, addr, fee, toSend, srcAcct) - if err != nil { - return + rindex := rand.Intn(len(pps.cinfo.AppParams)) + i := 0 + for k := range pps.cinfo.AppParams { + if i == rindex { + return k } - srcCost += tx.Fee.Raw + toSend - sentCount++ - // we ran one txn above already to fund the new addr, - // we'll run a second txn below - } - pps.accountsMu.Lock() - pps.accounts[pps.cfg.SrcAccount].addBalance(-int64(srcCost)) - pps.accountsMu.Unlock() - // pick a random sender from nft holder sub accounts - pick := rand.Intn(len(pps.nftHolders)) - pos := 0 - var sender string - var senderNftCount int - for addr, nftCount := range pps.nftHolders { - sender = addr - senderNftCount = nftCount - if pos == pick { - break - } - pos++ - - } - var meta [32]byte - rand.Read(meta[:]) - assetName := pps.nftSpamAssetName() - const totalSupply = 1 - txn, err := client.MakeUnsignedAssetCreateTx(totalSupply, false, sender, sender, sender, sender, "ping", assetName, "", meta[:], 0) - if err != nil { - fmt.Printf("Cannot make asset create txn with meta %v\n", meta) - return - } - txn, err = client.FillUnsignedTxTemplate(sender, 0, 0, pps.cfg.MaxFee, txn) - if err != nil { - fmt.Printf("Cannot fill asset creation txn\n") - return - } - if senderNftCount+1 >= int(pps.cfg.NftAsaPerAccount) { - delete(pps.nftHolders, sender) - } else { - pps.nftHolders[sender] = senderNftCount + 1 - } - signer := pps.acct(sender) - stxn, err := signTxn(signer, txn, pps.cfg) - if err != nil { - return + i++ } + return +} - _, err = client.BroadcastTransaction(stxn) - if err != nil { - return +func (pps *WorkerState) fee() uint64 { + fee := pps.cfg.MaxFee + if pps.cfg.RandomizeFee { + fee = rand.Uint64()%(pps.cfg.MaxFee-pps.cfg.MinFee) + pps.cfg.MinFee } - sentCount++ - return + return fee } func (pps *WorkerState) acct(from string) *pingPongAccount { - pps.accountsMu.RLock() - defer pps.accountsMu.RUnlock() return pps.accounts[from] } func (pps *WorkerState) sendFromTo( fromList, toList []string, - client libgoal.Client, nextSendTime *time.Time, + client *libgoal.Client, nextSendTime *time.Time, ) (sentCount, successCount uint64, err error) { - cinfo := pps.cinfo - cfg := pps.cfg - - amt := cfg.MaxAmt var minAccountRunningBalance uint64 - _, minAccountRunningBalance, err = computeAccountMinBalance(client, cfg) + _, minAccountRunningBalance, err = computeAccountMinBalance(client, pps.cfg) if err != nil { return 0, 0, err } belowMinBalanceAccounts := make(map[string] /*basics.Address*/ bool) - assetsByCreator := make(map[string][]*v1.AssetParams) - for _, p := range cinfo.AssetParams { - c := p.Creator - ap := &v1.AssetParams{} - *ap = p - assetsByCreator[c] = append(assetsByCreator[c], ap) - } for i, from := range fromList { // keep going until the balances of at least 20% of the accounts is too low. @@ -749,14 +689,10 @@ func (pps *WorkerState) sendFromTo( continue } - if cfg.RandomizeAmt { - amt = ((rand.Uint64() % cfg.MaxAmt) + 1) % cfg.MaxAmt - } - fee := pps.fee() to := toList[i] - if cfg.RandomizeDst { + if pps.cfg.RandomizeDst { var addr basics.Address crypto.RandBytes(addr[:]) to = addr.String() @@ -772,22 +708,15 @@ func (pps *WorkerState) sendFromTo( // Broadcast transaction var sendErr error - fromBalanceChange := int64(0) - toBalanceChange := int64(0) - if cfg.NumAsset > 0 { - amt = 1 - } else if cfg.NumApp > 0 { - amt = 0 - } - fromAcct := pps.acct(from) - if cfg.GroupSize == 1 { - // generate random assetID or appId if we send asset/app txns - aidx := randomizeCreatableID(cfg, cinfo) + var fromAcct *pingPongAccount + var update txnUpdate + var updates []txnUpdate + if pps.cfg.GroupSize == 1 { var txn transactions.Transaction var consErr error // Construct single txn - txn, from, consErr = pps.constructTxn(from, to, fee, amt, aidx, client) + txn, from, update, consErr = pps.constructTxn(from, to, fee, client) if consErr != nil { err = consErr _, _ = fmt.Fprintf(os.Stderr, "constructTxn failed: %v\n", err) @@ -795,26 +724,29 @@ func (pps *WorkerState) sendFromTo( } // would we have enough money after taking into account the current updated fees ? - if fromAcct.getBalance() <= (txn.Fee.Raw + amt + minAccountRunningBalance) { - _, _ = fmt.Fprintf(os.Stdout, "Skipping sending %d: %s -> %s; Current cost too high(%d <= %d + %d + %d).\n", amt, from, to, fromAcct.getBalance(), txn.Fee.Raw, amt, minAccountRunningBalance) + fromAcct = pps.acct(from) + if fromAcct == nil { + err = fmt.Errorf("tx %v from %s -> no acct", txn, from) + fmt.Fprintf(os.Stderr, "%s\n", err.Error()) + return + } + + if fromAcct.getBalance() <= (txn.Fee.Raw + pps.cfg.MaxAmt + minAccountRunningBalance) { + _, _ = fmt.Fprintf(os.Stdout, "Skipping sending %d: %s -> %s; Current cost too high(%d <= %d + %d + %d).\n", pps.cfg.MaxAmt, from, to, fromAcct.getBalance(), txn.Fee.Raw, pps.cfg.MaxAmt, minAccountRunningBalance) belowMinBalanceAccounts[from] = true continue } - fromBalanceChange = -int64(txn.Fee.Raw + amt) - toBalanceChange = int64(amt) - // Sign txn - signer := pps.acct(from) - stxn, signErr := signTxn(signer, txn, cfg) + stxn, signErr := signTxn(fromAcct, txn, pps.cfg) if signErr != nil { err = signErr _, _ = fmt.Fprintf(os.Stderr, "signTxn failed: %v\n", err) return } - schedule(cfg.TxnPerSec, nextSendTime) sentCount++ + pps.schedule(1) _, sendErr = client.BroadcastTransaction(stxn) } else { // Generate txn group @@ -826,31 +758,22 @@ func (pps *WorkerState) sendFromTo( var txGroup []transactions.Transaction var txSigners []string - for j := 0; j < int(cfg.GroupSize); j++ { + for j := 0; j < int(pps.cfg.GroupSize); j++ { var txn transactions.Transaction var signer string if j%2 == 0 { - txn, signer, err = pps.constructTxn(from, to, fee, amt, 0, client) - fromBalanceChange -= int64(txn.Fee.Raw + amt) - toBalanceChange += int64(amt) - } else if cfg.GroupSize == 2 && cfg.Rekey { - txn, _, err = pps.constructTxn(from, to, fee, amt, 0, client) - fromBalanceChange -= int64(txn.Fee.Raw + amt) - toBalanceChange += int64(amt) + txn, signer, update, err = pps.constructTxn(from, to, fee, client) + } else if pps.cfg.GroupSize == 2 && pps.cfg.Rekey { + txn, _, update, err = pps.constructTxn(from, to, fee, client) signer = to } else { - txn, signer, err = pps.constructTxn(to, from, fee, amt, 0, client) - toBalanceChange -= int64(txn.Fee.Raw + amt) - fromBalanceChange += int64(amt) + txn, signer, update, err = pps.constructTxn(to, from, fee, client) } if err != nil { _, _ = fmt.Fprintf(os.Stderr, "group tx failed: %v\n", err) return } - if cfg.RandomizeAmt && j%2 == 1 { - amt = rand.Uint64()%cfg.MaxAmt + 1 - } - if cfg.Rekey { + if pps.cfg.Rekey { if from == signer { // rekey to the receiver the first txn of the rekeying pair txn.RekeyTo, err = basics.UnmarshalChecksumAddress(to) @@ -865,17 +788,7 @@ func (pps *WorkerState) sendFromTo( } txGroup = append(txGroup, txn) txSigners = append(txSigners, signer) - } - - // would we have enough money after taking into account the current updated fees ? - if int64(fromAcct.getBalance())+fromBalanceChange <= int64(cfg.MinAccountFunds) { - _, _ = fmt.Fprintf(os.Stdout, "Skipping sending %d : %s -> %s; Current cost too high.\n", amt, from, to) - continue - } - toAcct := pps.acct(to) - if int64(toAcct.getBalance())+toBalanceChange <= int64(cfg.MinAccountFunds) { - _, _ = fmt.Fprintf(os.Stdout, "Skipping sending back %d : %s -> %s; Current cost too high.\n", amt, to, from) - continue + updates = append(updates, update) } // Generate group ID @@ -885,7 +798,7 @@ func (pps *WorkerState) sendFromTo( return } - if !cfg.Quiet { + if !pps.cfg.Quiet { _, _ = fmt.Fprintf(os.Stdout, "Sending TxnGroup: ID %v, size %v \n", gid, len(txGroup)) } @@ -895,29 +808,34 @@ func (pps *WorkerState) sendFromTo( for j, txn := range txGroup { txn.Group = gid signer := pps.acct(txSigners[j]) - stxGroup[j], signErr = signTxn(signer, txn, cfg) + stxGroup[j], signErr = signTxn(signer, txn, pps.cfg) if signErr != nil { err = signErr return } } - schedule(cfg.TxnPerSec, nextSendTime) - sentCount++ + sentCount += uint64(len(txGroup)) + pps.schedule(len(txGroup)) sendErr = client.BroadcastTransactionGroup(stxGroup) } if sendErr != nil { - _, _ = fmt.Fprintf(os.Stderr, "error sending Transaction, sleeping .5 seconds: %v\n", sendErr) err = sendErr - time.Sleep(500 * time.Millisecond) return } - successCount++ - fromAcct.addBalance(fromBalanceChange) - // avoid updating the "to" account. + // assume that if it was accepted by an algod, it got processed + // (this is a bad assumption, we should be checking pending status or reading blocks to see if our txid were committed) + if len(updates) > 0 { + for _, ud := range updates { + ud.apply(pps) + } + } else if update != nil { + update.apply(pps) + } + successCount++ } return } @@ -936,74 +854,14 @@ func (pps *WorkerState) makeNextUniqueNoteField() []byte { return noteField[:usedBytes] } -func (pps *WorkerState) roundMonitor(client libgoal.Client) { - var minFund uint64 - var err error - for { - minFund, _, err = computeAccountMinBalance(client, pps.cfg) - if err == nil { - break - } - } - var newBalance uint64 - for { - paramsResp, err := client.SuggestedParams() - if err != nil { - time.Sleep(5 * time.Millisecond) - continue - } - pendingTxns, err := client.GetPendingTransactions(0) - if err != nil { - time.Sleep(5 * time.Millisecond) - continue - } - pps.muSuggestedParams.Lock() - pps.suggestedParams = paramsResp - pps.pendingTxns = pendingTxns - pps.muSuggestedParams.Unlock() - - // take a quick snapshot of accounts to decrease mutex shadow - pps.accountsMu.Lock() - accountsSnapshot := make([]*pingPongAccount, 0, len(pps.accounts)) - for _, acct := range pps.accounts { - accountsSnapshot = append(accountsSnapshot, acct) - } - pps.accountsMu.Unlock() - - for _, acct := range accountsSnapshot { - acct.Lock() - needRefresh := acct.balance < minFund && acct.balanceRound < paramsResp.LastRound - acct.Unlock() - if needRefresh { - newBalance, err = client.GetBalance(acct.pk.String()) - if err == nil { - acct.Lock() - acct.balanceRound, acct.balance = paramsResp.LastRound, newBalance - acct.Unlock() - } - } - } - - // wait for the next round. - waitForNextRoundOrSleep(client, 200*time.Millisecond) - } -} - -func (pps *WorkerState) getSuggestedParams() v1.TransactionParams { - pps.muSuggestedParams.Lock() - defer pps.muSuggestedParams.Unlock() - return pps.suggestedParams -} +var errNotOptedIn = errors.New("not opted in") -func (pps *WorkerState) constructTxn(from, to string, fee, amt, aidx uint64, client libgoal.Client) (txn transactions.Transaction, sender string, err error) { - cfg := pps.cfg - cinfo := pps.cinfo - sender = from +func (pps *WorkerState) constructTxn(from, to string, fee uint64, client *libgoal.Client) (txn transactions.Transaction, sender string, update txnUpdate, err error) { var noteField []byte const pingpongTag = "pingpong" const tagLen = len(pingpongTag) // if random note flag set, then append a random number of additional bytes - if cfg.RandomNote { + if pps.cfg.RandomNote { const maxNoteFieldLen = 1024 noteLength := tagLen + int(rand.Uint32())%(maxNoteFieldLen-tagLen) noteField = make([]byte, noteLength) @@ -1015,83 +873,38 @@ func (pps *WorkerState) constructTxn(from, to string, fee, amt, aidx uint64, cli // if random lease flag set, fill the lease field with random bytes var lease [32]byte - if cfg.RandomLease { + if pps.cfg.RandomLease { crypto.RandBytes(lease[:]) } - if cfg.NumApp > 0 { // Construct app transaction - // select opted-in accounts for Txn.Accounts field - var accounts []string - assetOptIns := cinfo.OptIns[aidx] - if len(assetOptIns) > 0 { - indices := rand.Perm(len(assetOptIns)) - limit := 5 - if len(indices) < limit { - limit = len(indices) - } - for i := 0; i < limit; i++ { - idx := indices[i] - accounts = append(accounts, assetOptIns[idx]) - } - if cinfo.AssetParams[aidx].Creator == from { - // if the application was created by the "from" account, then we don't need to worry about it being opted-in. - } else { - fromIsOptedIn := false - for i := 0; i < len(assetOptIns); i++ { - if assetOptIns[i] == from { - fromIsOptedIn = true - break - } - } - if !fromIsOptedIn { - sender = accounts[0] - from = sender - } - } - accounts = accounts[1:] - } - txn, err = client.MakeUnsignedAppNoOpTx(aidx, nil, accounts, nil, nil) - if err != nil { - return - } - txn.Note = noteField[:] - txn.Lease = lease - txn, err = client.FillUnsignedTxTemplate(from, 0, 0, cfg.MaxFee, txn) - if !cfg.Quiet { - _, _ = fmt.Fprintf(os.Stdout, "Calling app %d : %s\n", aidx, from) - } - } else if cfg.NumAsset > 0 { // Construct asset transaction - // select a pair of random opted-in accounts by aidx - // use them as from/to addresses - if from != to { - if len(cinfo.OptIns[aidx]) > 0 { - indices := rand.Perm(len(cinfo.OptIns[aidx])) - from = cinfo.OptIns[aidx][indices[0]] - to = cinfo.OptIns[aidx][indices[1]] - sender = from - } else { - err = fmt.Errorf("asset %d has not been opted in by any account", aidx) - _, _ = fmt.Fprintf(os.Stdout, "error constructing transaction - %v\n", err) - return - } - } - txn, err = client.MakeUnsignedAssetSendTx(aidx, amt, to, "", "") - if err != nil { - _, _ = fmt.Fprintf(os.Stdout, "error making unsigned asset send tx %v\n", err) - return + // weighted random selection of traffic type + // TODO: construct*Txn() have the same signature, make this data structures and loop over them? + totalWeight := pps.cfg.WeightPayment + pps.cfg.WeightAsset + pps.cfg.WeightApp + target := rand.Float64() * totalWeight + if target < pps.cfg.WeightAsset && pps.cfg.NumAsset > 0 { + txn, sender, update, err = pps.constructAssetTxn(from, to, fee, client, noteField, lease) + if err != errNotOptedIn { + goto weightdone } - txn.Note = noteField[:] - txn.Lease = lease - txn, err = client.FillUnsignedTxTemplate(sender, 0, 0, cfg.MaxFee, txn) - if !cfg.Quiet { - _, _ = fmt.Fprintf(os.Stdout, "Sending %d asset %d: %s -> %s\n", amt, aidx, sender, to) + } + target -= pps.cfg.WeightAsset + if target < pps.cfg.WeightApp && pps.cfg.NumApp > 0 { + txn, sender, update, err = pps.constructAppTxn(from, to, fee, client, noteField, lease) + if err != errNotOptedIn { + goto weightdone } - } else { - txn, err = pps.constructPayment(from, to, fee, amt, noteField, "", lease) - if !cfg.Quiet { - _, _ = fmt.Fprintf(os.Stdout, "Sending %d : %s -> %s\n", amt, from, to) + } + target -= pps.cfg.WeightApp + if target < pps.cfg.WeightNFTCreation && pps.cfg.NftAsaPerSecond > 0 { + txn, sender, update, err = pps.constructNFTGenTxn(from, to, fee, client, noteField, lease) + if err != errNotOptedIn { + goto weightdone } } + // TODO: other traffic types here + // fallback on payment + txn, sender, update, err = pps.constructPaymentTxn(from, to, fee, client, noteField, lease) +weightdone: if err != nil { _, _ = fmt.Fprintf(os.Stdout, "error constructing transaction %v\n", err) @@ -1100,8 +913,8 @@ func (pps *WorkerState) constructTxn(from, to string, fee, amt, aidx uint64, cli // adjust transaction duration for 5 rounds. That would prevent it from getting stuck in the transaction pool for too long. txn.LastValid = txn.FirstValid + 5 - // if cfg.MaxFee == 0, automatically adjust the fee amount to required min fee - if cfg.MaxFee == 0 { + // if pps.cfg.MaxFee == 0, automatically adjust the fee amount to required min fee + if pps.cfg.MaxFee == 0 { var suggestedFee uint64 suggestedFee, err = client.SuggestedFee() if err != nil { @@ -1115,85 +928,313 @@ func (pps *WorkerState) constructTxn(from, to string, fee, amt, aidx uint64, cli return } -// ConstructPayment builds a payment transaction to be signed -// If the fee is 0, the function will use the suggested one form the network -// Although firstValid and lastValid come pre-computed in a normal flow, -// additional validation is done by computeValidityRounds: -// if the lastValid is 0, firstValid + maxTxnLifetime will be used -// if the firstValid is 0, lastRound + 1 will be used -func (pps *WorkerState) constructPayment(from, to string, fee, amount uint64, note []byte, closeTo string, lease [32]byte) (transactions.Transaction, error) { - fromAddr, err := basics.UnmarshalChecksumAddress(from) - if err != nil { - return transactions.Transaction{}, err +type txnUpdate interface { + apply(pps *WorkerState) +} + +func (pps *WorkerState) constructPaymentTxn(from, to string, fee uint64, client *libgoal.Client, noteField []byte, lease [32]byte) (txn transactions.Transaction, sender string, update txnUpdate, err error) { + amt := pps.cfg.MaxAmt + if pps.cfg.RandomizeAmt { + amt = uint64(rand.Int63n(int64(pps.cfg.MaxAmt-1))) + 1 + } + txn, err = client.ConstructPayment(from, to, fee, amt, noteField, "", lease, 0, 0) + if !pps.cfg.Quiet { + _, _ = fmt.Fprintf(os.Stdout, "Sending %d : %s -> %s\n", amt, from, to) + } + update = &paymentUpdate{ + from: from, + to: to, + amt: amt, + fee: fee, } + return txn, from, update, err +} + +type paymentUpdate struct { + from string + to string + amt uint64 + fee uint64 +} - var toAddr basics.Address - if to != "" { - toAddr, err = basics.UnmarshalChecksumAddress(to) +func (au *paymentUpdate) apply(pps *WorkerState) { + pps.accounts[au.from].balance -= (au.fee + au.amt) + pps.accounts[au.to].balance += au.amt +} + +// return true with probability 1/i +func pReplace(i int) bool { + if i <= 1 { + return true + } + return rand.Intn(i) == 0 +} + +func (pps *WorkerState) constructAssetTxn(from, toUnused string, fee uint64, client *libgoal.Client, noteField []byte, lease [32]byte) (txn transactions.Transaction, sender string, update txnUpdate, err error) { + // select a pair of random opted-in accounts by aidx + // use them as from/to addresses + amt := uint64(1) + aidx := pps.randAssetID() + if aidx == 0 { + err = fmt.Errorf("no known assets") + return + } + if len(pps.cinfo.OptIns[aidx]) == 0 { + // Opt-in another + // TODO: continue opt-in up to some amount? gradually? + txn, err = pps.appOptIn(from, aidx, client) if err != nil { - return transactions.Transaction{}, err + return + } + update = &appOptInUpdate{ + addr: from, + aidx: aidx, } + return txn, from, update, nil } - // Get current round, protocol, genesis ID - var params v1.TransactionParams - for params.LastRound == 0 { - params = pps.getSuggestedParams() - } - - cp, ok := config.Consensus[protocol.ConsensusVersion(params.ConsensusVersion)] - if !ok { - return transactions.Transaction{}, fmt.Errorf("ConstructPayment: unknown consensus protocol %s", params.ConsensusVersion) - } - fv := params.LastRound + 1 - lv := fv + cp.MaxTxnLife - 1 - - tx := transactions.Transaction{ - Type: protocol.PaymentTx, - Header: transactions.Header{ - Sender: fromAddr, - Fee: basics.MicroAlgos{Raw: fee}, - FirstValid: basics.Round(fv), - LastValid: basics.Round(lv), - Lease: lease, - Note: note, - }, - PaymentTxnFields: transactions.PaymentTxnFields{ - Receiver: toAddr, - Amount: basics.MicroAlgos{Raw: amount}, - }, - } - - // If requesting closing, put it in the transaction. The protocol might - // not support it, but in that case, better to fail the transaction, - // because the user explicitly asked for it, and it's not supported. - if closeTo != "" { - closeToAddr, err := basics.UnmarshalChecksumAddress(closeTo) - if err != nil { - return transactions.Transaction{}, err + optInsForAsset := pps.cinfo.OptIns[aidx] + + var richest *pingPongAccount + var richestv uint64 + var fromAcct *pingPongAccount + var toAcct *pingPongAccount + for i, addr := range optInsForAsset { + acct := pps.accounts[addr] + if acct.holdings[aidx] > richestv { + richestv = acct.holdings[aidx] + richest = acct + continue + } + if (acct.holdings[aidx] > 1000) && (fromAcct == nil || pReplace(i)) { + fromAcct = acct + continue + } + if toAcct == nil || pReplace(i) { + toAcct = acct + continue } + } + if richest == nil { + err = fmt.Errorf("don't know any account holding asset %d", aidx) + return + } + if fromAcct == nil { + fromAcct = richest + } + if toAcct == nil { + toAcct = fromAcct + } + + to := toAcct.pk.String() + from = fromAcct.pk.String() + sender = from + if to != from { + if toAcct.holdings[aidx] < 1000 && fromAcct.holdings[aidx] > 11000 { + amt = 10000 + } + } + txn, err = client.MakeUnsignedAssetSendTx(aidx, amt, to, "", "") + if err != nil { + _, _ = fmt.Fprintf(os.Stdout, "error making unsigned asset send tx %v\n", err) + return + } + txn.Note = noteField[:] + txn.Lease = lease + txn, err = client.FillUnsignedTxTemplate(sender, 0, 0, fee, txn) + if !pps.cfg.Quiet { + _, _ = fmt.Fprintf(os.Stdout, "Sending %d asset %d: %s -> %s\n", amt, aidx, sender, to) + } + update = &assetUpdate{ + from: from, + to: to, + aidx: aidx, + amt: amt, + fee: fee, + } + return txn, sender, update, err +} + +type appOptInUpdate struct { + addr string + aidx uint64 +} + +func (au *appOptInUpdate) apply(pps *WorkerState) { + pps.accounts[au.addr].holdings[au.aidx] = 0 + pps.cinfo.OptIns[au.aidx] = uniqueAppend(pps.cinfo.OptIns[au.aidx], au.addr) +} + +type nopUpdate struct { +} + +func (au *nopUpdate) apply(pps *WorkerState) { +} + +var nopUpdateSingleton = &nopUpdate{} + +type assetUpdate struct { + from string + to string + aidx uint64 + amt uint64 + fee uint64 +} - tx.PaymentTxnFields.CloseRemainderTo = closeToAddr +func (au *assetUpdate) apply(pps *WorkerState) { + pps.accounts[au.from].balance -= au.fee + pps.accounts[au.from].holdings[au.aidx] -= au.amt + to := pps.accounts[au.to] + if to.holdings == nil { + to.holdings = make(map[uint64]uint64) } + to.holdings[au.aidx] += au.amt +} + +func (pps *WorkerState) constructAppTxn(from, to string, fee uint64, client *libgoal.Client, noteField []byte, lease [32]byte) (txn transactions.Transaction, sender string, update txnUpdate, err error) { + // select opted-in accounts for Txn.Accounts field + var accounts []string + aidx := pps.randAppID() + if aidx == 0 { + err = fmt.Errorf("no known apps") + return + } + appOptIns := pps.cinfo.OptIns[aidx] + sender = from + if len(appOptIns) > 0 { + indices := rand.Perm(len(appOptIns)) + limit := 5 + if len(indices) < limit { + limit = len(indices) + } + for i := 0; i < limit; i++ { + idx := indices[i] + accounts = append(accounts, appOptIns[idx]) + } + if pps.cinfo.AppParams[aidx].Creator == from { + // if the application was created by the "from" account, then we don't need to worry about it being opted-in. + } else { + fromIsOptedIn := false + for i := 0; i < len(appOptIns); i++ { + if appOptIns[i] == from { + fromIsOptedIn = true + break + } + } + if !fromIsOptedIn { + sender = accounts[0] + from = sender + } + } + accounts = accounts[1:] + } + txn, err = client.MakeUnsignedAppNoOpTx(aidx, nil, accounts, nil, nil) + if err != nil { + return + } + txn.Note = noteField[:] + txn.Lease = lease + txn, err = client.FillUnsignedTxTemplate(from, 0, 0, fee, txn) + if !pps.cfg.Quiet { + _, _ = fmt.Fprintf(os.Stdout, "Calling app %d : %s\n", aidx, from) + } + update = &appUpdate{ + from: from, + fee: fee, + } + return txn, sender, update, err +} + +type appUpdate struct { + from string + fee uint64 +} - tx.Header.GenesisID = params.GenesisID +func (au *appUpdate) apply(pps *WorkerState) { + pps.accounts[au.from].balance -= au.fee +} + +func (pps *WorkerState) constructNFTGenTxn(from, to string, fee uint64, client *libgoal.Client, noteField []byte, lease [32]byte) (txn transactions.Transaction, sender string, update txnUpdate, err error) { + if (len(pps.nftHolders) == 0) || ((float64(int(pps.cfg.NftAsaAccountInFlight)-len(pps.nftHolders)) / float64(pps.cfg.NftAsaAccountInFlight)) >= rand.Float64()) { + var addr string - // Check if the protocol supports genesis hash - if cp.SupportGenesisHash { - copy(tx.Header.GenesisHash[:], params.GenesisHash) + var seed [32]byte + crypto.RandBytes(seed[:]) + privateKey := crypto.GenerateSignatureSecrets(seed) + publicKey := basics.Address(privateKey.SignatureVerifier) + + pps.accounts[publicKey.String()] = &pingPongAccount{ + sk: privateKey, + pk: publicKey, + } + addr = publicKey.String() + + fmt.Printf("new NFT holder %s\n", addr) + var proto config.ConsensusParams + proto, err = getProto(client) + if err != nil { + return + } + // enough for the per-asa minbalance and more than enough for the txns to create them + amount := proto.MinBalance * uint64(pps.cfg.NftAsaPerAccount+1) * 2 + pps.nftHolders[addr] = 0 + srcAcct := pps.acct(pps.cfg.SrcAccount) + sender = srcAcct.pk.String() + txn, err = client.ConstructPayment(sender, to, fee, amount, noteField, "", [32]byte{}, 0, 0) + update = &paymentUpdate{ + from: from, + to: to, + fee: fee, + amt: amount, + } + return txn, sender, update, err } + // pick a random sender from nft holder sub accounts + pick := rand.Intn(len(pps.nftHolders)) + pos := 0 + var senderNftCount int + for addr, nftCount := range pps.nftHolders { + sender = addr + senderNftCount = nftCount + if pos == pick { + break + } + pos++ - // Default to the suggested fee, if the caller didn't supply it - // Fee is tricky, should taken care last. We encode the final transaction to get the size post signing and encoding - // Then, we multiply it by the suggested fee per byte. - if fee == 0 { - tx.Fee = basics.MulAIntSaturate(basics.MicroAlgos{Raw: params.Fee}, tx.EstimateEncodedSize()) } - if tx.Fee.Raw < cp.MinTxnFee { - tx.Fee.Raw = cp.MinTxnFee + var meta [32]byte + rand.Read(meta[:]) + assetName := pps.nftSpamAssetName() + const totalSupply = 1 + txn, err = client.MakeUnsignedAssetCreateTx(totalSupply, false, sender, sender, sender, sender, "ping", assetName, "", meta[:], 0) + if err != nil { + fmt.Printf("Cannot make asset create txn with meta %v\n", meta) + return + } + txn, err = client.FillUnsignedTxTemplate(sender, 0, 0, fee, txn) + if err != nil { + fmt.Printf("Cannot fill asset creation txn\n") + return + } + if senderNftCount+1 >= int(pps.cfg.NftAsaPerAccount) { + delete(pps.nftHolders, sender) + } else { + pps.nftHolders[sender] = senderNftCount + 1 + } + update = &nftgenUpdate{ + from: from, + fee: fee, } + return txn, sender, update, err +} - return tx, nil +type nftgenUpdate struct { + from string + fee uint64 +} + +func (au *nftgenUpdate) apply(pps *WorkerState) { + pps.accounts[au.from].balance -= au.fee } func signTxn(signer *pingPongAccount, txn transactions.Transaction, cfg PpConfig) (stxn transactions.SignedTxn, err error) { @@ -1203,7 +1244,7 @@ func signTxn(signer *pingPongAccount, txn transactions.Transaction, cfg PpConfig if cfg.Rekey { stxn, err = txn.Sign(signer.sk), nil - } else if len(cfg.Program) > 0 { + } else if len(cfg.Program) > 0 && rand.Float64() < cfg.ProgramProbability { // If there's a program, sign it and use that in a lsig progb := logic.Program(cfg.Program) psig = signer.sk.Sign(&progb) @@ -1220,53 +1261,3 @@ func signTxn(signer *pingPongAccount, txn transactions.Transaction, cfg PpConfig } return } - -type timeCount struct { - when time.Time - count int -} - -type throttler struct { - times []timeCount - - next int - - // target x per-second - xps float64 - - // rough proportional + integral control - iterm float64 -} - -func newThrottler(windowSize int, targetPerSecond float64) *throttler { - return &throttler{times: make([]timeCount, windowSize), xps: targetPerSecond, iterm: 0.0} -} - -func (t *throttler) maybeSleep(count int) { - now := time.Now() - t.times[t.next].when = now - t.times[t.next].count = count - nn := (t.next + 1) % len(t.times) - t.next = nn - if t.times[nn].when.IsZero() { - return - } - dt := now.Sub(t.times[nn].when) - countsum := 0 - for i, tc := range t.times { - if i != nn { - countsum += tc.count - } - } - rate := float64(countsum) / dt.Seconds() - if rate > t.xps { - // rate too high, slow down - desiredSeconds := float64(countsum) / t.xps - extraSeconds := desiredSeconds - dt.Seconds() - t.iterm += 0.1 * extraSeconds / float64(len(t.times)) - util.NanoSleep(time.Duration(1000000000.0 * (extraSeconds + t.iterm) / float64(len(t.times)))) - - } else { - t.iterm *= 0.95 - } -} diff --git a/test/commandandcontrol/cc_agent/component/pingPongComponent.go b/test/commandandcontrol/cc_agent/component/pingPongComponent.go index 674c476f91..5bb4be8907 100644 --- a/test/commandandcontrol/cc_agent/component/pingPongComponent.go +++ b/test/commandandcontrol/cc_agent/component/pingPongComponent.go @@ -124,7 +124,7 @@ func (componentInstance *PingPongComponentInstance) startPingPong(cfg *pingpong. // Initialize accounts if necessary, this may take several attempts while previous transactions to settle for i := 0; i < 10; i++ { - err = pps.PrepareAccounts(ac) + err = pps.PrepareAccounts(&ac) if err == nil { break } else { @@ -143,7 +143,7 @@ func (componentInstance *PingPongComponentInstance) startPingPong(cfg *pingpong. componentInstance.ctx, componentInstance.cancelFunc = context.WithCancel(context.Background()) // Kick off the real processing - go pps.RunPingPong(componentInstance.ctx, ac) + go pps.RunPingPong(componentInstance.ctx, &ac) return } diff --git a/test/scripts/tps.py b/test/scripts/tps.py index 834cdbb7a4..103f83a7e6 100644 --- a/test/scripts/tps.py +++ b/test/scripts/tps.py @@ -37,14 +37,13 @@ def algod_client_for_dir(algorand_data, headers=None): def get_blockinfo_tps(algod, rounds=10): status = algod.status() - rounds = 10 ba = msgpack.loads(algod.block_info(status['last-round']-rounds, response_format='msgpack'), strict_map_key=False) bb = msgpack.loads(algod.block_info(status['last-round'], response_format='msgpack'), strict_map_key=False) ra = ba['block']['rnd'] rb = bb['block']['rnd'] assert(rb - ra == rounds) - tca = ba['block']['tc'] - tcb = bb['block']['tc'] + tca = ba['block'].get('tc',0) + tcb = bb['block'].get('tc',0) tsa = ba['block']['ts'] tsb = bb['block']['ts'] dt = tsb-tsa @@ -54,11 +53,57 @@ def get_blockinfo_tps(algod, rounds=10): logger.debug('(b[%d].TxnCounter %d) - (b[%d].TxnCounter %d) = %d txns', ra, tca, rb, tcb, dtxn) return tps +def mins(a,b): + if a is None: + return b + if b is None: + return a + return min(a,b) + +def maxs(a,b): + if a is None: + return b + if b is None: + return a + return max(a,b) + +def get_blockinfo_tps_with_types(algod, rounds=10, adir=''): + status = algod.status() + lastround = status['last-round'] + cround = lastround - rounds + bytxtype = {} + mintime = None + maxtime = None + mintc = None + maxtc = 0 + while cround <= lastround: + ba = msgpack.loads(algod.block_info(cround, response_format='msgpack'), strict_map_key=False) + #logger.debug('block keys %s', sorted(ba['block'].keys())) + mintime = mins(mintime, ba['block']['ts']) + maxtime = maxs(maxtime, ba['block']['ts']) + mintc = mins(mintc, ba['block'].get('tc')) + maxtc = maxs(maxtc, ba['block'].get('tc',0)) + txns = ba['block'].get('txns',[]) + for stxib in txns: + #logger.debug('txn keys %s', sorted(stxib['txn'].keys())) + tt = stxib['txn']['type'] + bytxtype[tt] = bytxtype.get(tt, 0) + 1 + cround += 1 + summary = [(count, tt) for tt,count in bytxtype.items()] + summary.sort(reverse=True) + print(summary) + dt = maxtime-mintime + dtxn = maxtc-mintc + logger.debug('%s ts=[%d..%d] (%ds), tc=[%d..%d] (%d txn)', adir, mintime, maxtime, dt, mintc, maxtc, dtxn) + tps = dtxn/dt + return tps + def main(): ap = argparse.ArgumentParser() ap.add_argument('data_dirs', nargs='*', help='list paths to algorand datadirs to grab heap profile from') ap.add_argument('-d', dest='algorand_data') - ap.add_argument('-r', '--rounds', type=int, help='number of rounds to calculate over') + ap.add_argument('-T', '--types', default=False, action='store_true', help='show txn types counts within round range') + ap.add_argument('-r', '--rounds', type=int, default=10, help='number of rounds to calculate over') ap.add_argument('--verbose', default=False, action='store_true') args = ap.parse_args() @@ -70,11 +115,21 @@ def main(): datadirs = args.data_dirs if args.algorand_data: datadirs = datadirs + [args.algorand_data] + if not datadirs: + ad = os.getenv('ALGORAND_DATA') + if ad: + datadirs.append(ad) + if not datadirs: + sys.stderr.write('no data dirs specified (positional file, -d AD, $ALGORAND_DATA)') + sys.exit(1) for adir in datadirs: algod = algod_client_for_dir(adir) - tps = get_blockinfo_tps(algod, rounds=args.rounds) - print('{:5.1f}\t{}'.format(tps, adir)) + if args.types: + tps = get_blockinfo_tps_with_types(algod, rounds=args.rounds) + else: + tps = get_blockinfo_tps(algod, rounds=args.rounds) + print('{:5.1f} TPS\t{}'.format(tps, adir)) return 0 if __name__ == '__main__':