From 6680d3e14ee876b4c2fca2e396821923cd473bd3 Mon Sep 17 00:00:00 2001 From: Redouane Lakrache Date: Sat, 11 Jan 2025 00:36:35 +0100 Subject: [PATCH] [LoadTest] Revamp load test suite (#1002) ## Summary This pull request includes several updates to the load-testing configuration and test files, as well as improvements to the relay stress test suite. The most important changes include updating funding account addresses, modifying gateway configurations, and refactoring various test functions for better accuracy. ## Type of change Select one or more from the following: - [ ] New feature, functionality or library - [ ] Consensus breaking; add the `consensus-breaking` label if so. See #791 for details - [x] Bug fix - [ ] Code health or cleanup - [ ] Documentation - [ ] Other (specify) ## Testing - [x] **Unit Tests**: `make go_develop_and_test` - [x] **LocalNet E2E Tests**: `make test_e2e` - [ ] **DevNet E2E Tests**: Add the `devnet-test-e2e` label to the PR. ## Sanity Checklist - [x] I have tested my changes using the available tooling - [ ] I have commented my code - [x] I have performed a self-review of my own code; both comments & source code - [ ] I create and reference any new tickets, if applicable - [ ] I have left TODOs throughout the codebase, if applicable --- Tiltfile | 3 + .../config/load_test_manifest_reader.go | 10 +- load-testing/loadtest_manifest_example.yaml | 8 +- load-testing/loadtest_manifest_localnet.yaml | 14 +- ...est_manifest_localnet_single_supplier.yaml | 14 +- load-testing/tests/relays_stress.feature | 10 +- .../tests/relays_stress_helpers_test.go | 586 +++++++++++------- ... => relays_stress_single_supplier.feature} | 13 +- load-testing/tests/relays_stress_test.go | 180 ++---- makefiles/tests.mk | 4 +- pkg/client/interface.go | 5 +- pkg/client/query/appquerier.go | 10 + testutil/delays/waitall.go | 23 + testutil/events/filter.go | 24 + testutil/keeper/tokenomics.go | 5 + x/proof/types/application_query_client.go | 6 + x/proof/types/expected_keepers.go | 1 + x/tokenomics/keeper/settle_pending_claims.go | 1 + x/tokenomics/types/expected_keepers.go | 1 + 19 files changed, 553 insertions(+), 365 deletions(-) rename load-testing/tests/{relays_stress_single_suppier.feature => relays_stress_single_supplier.feature} (51%) create mode 100644 testutil/delays/waitall.go diff --git a/Tiltfile b/Tiltfile index 4c1d74b1c..b765db324 100644 --- a/Tiltfile +++ b/Tiltfile @@ -320,6 +320,9 @@ for x in range(localnet_config["path_gateways"]["count"]): "--set=metrics.serviceMonitor.enabled=" + str(localnet_config["observability"]["enabled"]), "--set=path.mountConfigMaps[0].name=path-config-" + str(actor_number), "--set=path.mountConfigMaps[0].mountPath=/app/config/", + "--set=fullnameOverride=path" + str(actor_number), + "--set=nameOverride=path" + str(actor_number), + "--set=global.serviceAccount.name=path" + str(actor_number), ] if localnet_config["path_local_repo"]["enabled"]: diff --git a/load-testing/config/load_test_manifest_reader.go b/load-testing/config/load_test_manifest_reader.go index db20b21fc..81b94a630 100644 --- a/load-testing/config/load_test_manifest_reader.go +++ b/load-testing/config/load_test_manifest_reader.go @@ -22,7 +22,7 @@ type LoadTestManifestYAML struct { // IsEphemeralChain is a flag that indicates whether the test is expected to be // run on LocalNet or long-living remote chain (i.e. TestNet/DevNet). IsEphemeralChain bool `yaml:"is_ephemeral_chain"` - TestNetNode string `yaml:"testnet_node"` + RPCNode string `yaml:"rpc_node"` ServiceId string `yaml:"service_id"` Suppliers []ProvisionedActorConfig `yaml:"suppliers"` Gateways []ProvisionedActorConfig `yaml:"gateways"` @@ -67,6 +67,10 @@ func validatedEphemeralChainManifest(manifest *LoadTestManifestYAML) (*LoadTestM return nil, ErrEphemeralChainLoadTestInvalidManifest.Wrap("empty funding account address") } + if len(manifest.RPCNode) == 0 { + return nil, ErrEphemeralChainLoadTestInvalidManifest.Wrap("empty rpc node url") + } + for _, gateway := range manifest.Gateways { if len(gateway.Address) == 0 { return nil, ErrEphemeralChainLoadTestInvalidManifest.Wrap("empty gateway address") @@ -107,8 +111,8 @@ func validatedNonEphemeralChainManifest(manifest *LoadTestManifestYAML) (*LoadTe return nil, ErrNonEphemeralChainLoadTestInvalidManifest.Wrap("suppliers entry forbidden") } - if len(manifest.TestNetNode) == 0 { - return nil, ErrNonEphemeralChainLoadTestInvalidManifest.Wrap("empty testnet node url") + if len(manifest.RPCNode) == 0 { + return nil, ErrNonEphemeralChainLoadTestInvalidManifest.Wrap("empty rpc node url") } if len(manifest.ServiceId) == 0 { diff --git a/load-testing/loadtest_manifest_example.yaml b/load-testing/loadtest_manifest_example.yaml index 840fafdab..47c33f07c 100644 --- a/load-testing/loadtest_manifest_example.yaml +++ b/load-testing/loadtest_manifest_example.yaml @@ -2,16 +2,16 @@ # It is intended to target a remote environment, such as a devnet or testnet. is_ephemeral_chain: false -# testnet_node is the URL of the node that the load test will use to query the +# rpc_node is the URL of the RPC node that the load test will use to query the # chain and submit transactions. -testnet_node: https://devnet-sophon-validator-rpc.poktroll.com +rpc_node: https://devnet-sophon-validator-rpc.poktroll.com # The service ID to request relays from. service_id: "anvil" # The address of the account that will be used to fund the application accounts -# so that they can stake on the network. -funding_account_address: pokt1awtlw5sjmw2f5lgj8ekdkaqezphgz88rdk93sk # address for faucet account +# so that they can stake on the local network. +funding_account_address: pokt1eeeksh2tvkh7wzmfrljnhw4wrhs55lcuvmekkw # address for faucet account # In non-ephemeral chains, the gateways are identified by their address. gateways: diff --git a/load-testing/loadtest_manifest_localnet.yaml b/load-testing/loadtest_manifest_localnet.yaml index 763771576..b2afa88f8 100644 --- a/load-testing/loadtest_manifest_localnet.yaml +++ b/load-testing/loadtest_manifest_localnet.yaml @@ -3,12 +3,16 @@ is_ephemeral_chain: true # This should be `true` for LocalNet as it is an ephemeral network +# rpc_node is the URL of the RPC node that the load test will use to query the +# chain and submit transactions. +rpc_node: http://localhost:26657 + # The service ID to use for the load test. service_id: anvil # The address of the account that will be used to fund the application, -# gateway and supplier accounts so that they can stake on the network. -funding_account_address: pokt1awtlw5sjmw2f5lgj8ekdkaqezphgz88rdk93sk # address for faucet account +# gateway and supplier accounts so that they can stake on the local network. +funding_account_address: pokt1eeeksh2tvkh7wzmfrljnhw4wrhs55lcuvmekkw # address for faucet account # List of pre-provisioned suppliers used for load testing. # These suppliers will be progressively staked during the load test, according @@ -48,12 +52,12 @@ gateways: # Gateway 1; http://localhost:10350/r/gateway1/overview - address: pokt15vzxjqklzjtlz7lahe8z2dfe9nm5vxwwmscne4 - exposed_url: http://anvil.localhost/v1:3000 # The gateway url that the user sends relays to (e.g. curl) + exposed_url: http://localhost:3000/v1/ # The gateway url that the user sends relays to (e.g. curl) # Gateway 2; http://localhost:10350/r/gateway2/overview - address: pokt15w3fhfyc0lttv7r585e2ncpf6t2kl9uh8rsnyz - exposed_url: http://anvil.localhost/v1:3001 + exposed_url: http://localhost:3001/v1/ # Gateway 3; http://localhost:10350/r/gateway3/overview - address: pokt1zhmkkd0rh788mc9prfq0m2h88t9ge0j83gnxya - exposed_url: http://anvil.localhost/v1:3002 + exposed_url: http://localhost:3002/v1/ diff --git a/load-testing/loadtest_manifest_localnet_single_supplier.yaml b/load-testing/loadtest_manifest_localnet_single_supplier.yaml index c455eaa8f..3e2112cea 100644 --- a/load-testing/loadtest_manifest_localnet_single_supplier.yaml +++ b/load-testing/loadtest_manifest_localnet_single_supplier.yaml @@ -3,12 +3,16 @@ is_ephemeral_chain: true # This should be `true` for LocalNet as it is an ephemeral network +# rpc_node is the URL of the RPC node that the load test will use to query the +# chain and submit transactions. +rpc_node: http://localhost:26657 + # The service ID to use for the load test. service_id: anvil # The address of the account that will be used to fund the application, -# gateway and supplier accounts so that they can stake on the network. -funding_account_address: pokt1awtlw5sjmw2f5lgj8ekdkaqezphgz88rdk93sk # address for faucet account +# gateway and supplier accounts so that they can stake on the local network. +funding_account_address: pokt1eeeksh2tvkh7wzmfrljnhw4wrhs55lcuvmekkw # address for faucet account # List of pre-provisioned suppliers used for load testing. # These suppliers will be progressively staked during the load test, according @@ -40,12 +44,12 @@ gateways: # Gateway 1; http://localhost:10350/r/gateway1/overview - address: pokt15vzxjqklzjtlz7lahe8z2dfe9nm5vxwwmscne4 - exposed_url: http://anvil.localhost/v1:3000 # The gateway url that the user sends relays to (e.g. curl) + exposed_url: http://localhost:3000/v1/ # The gateway url that the user sends relays to (e.g. curl) # Gateway 2; http://localhost:10350/r/gateway2/overview - address: pokt15w3fhfyc0lttv7r585e2ncpf6t2kl9uh8rsnyz - exposed_url: http://anvil.localhost/v1:3001 + exposed_url: http://localhost:3001/v1/ # Gateway 3; http://localhost:10350/r/gateway3/overview - address: pokt1zhmkkd0rh788mc9prfq0m2h88t9ge0j83gnxya - exposed_url: http://anvil.localhost/v1:3002 + exposed_url: http://localhost:3002/v1/ diff --git a/load-testing/tests/relays_stress.feature b/load-testing/tests/relays_stress.feature index 5aa3f63a0..d6981c95e 100644 --- a/load-testing/tests/relays_stress.feature +++ b/load-testing/tests/relays_stress.feature @@ -14,4 +14,12 @@ Feature: Loading gateway server with relays | gateway | 1 | 10 | 3 | | supplier | 1 | 10 | 3 | When a load of concurrent relay requests are sent from the applications - Then the correct pairs count of claim and proof messages should be committed on-chain \ No newline at end of file + Then the number of failed relay requests is "0" + # TODO_FOLLOWUP(@red-0ne): Implement the following steps + # Then "0" over servicing events are observed + # And "0" slashing events are observed + # And "0" expired claim events are observed + # And there is as many reimbursement requests as the number of settled claims + # And the number of claims submitted and claims settled is the same + # And the number of proofs submitted and proofs required is the same + # And the actors onchain balances are as expected \ No newline at end of file diff --git a/load-testing/tests/relays_stress_helpers_test.go b/load-testing/tests/relays_stress_helpers_test.go index aa55dcf49..88a5cae30 100644 --- a/load-testing/tests/relays_stress_helpers_test.go +++ b/load-testing/tests/relays_stress_helpers_test.go @@ -5,10 +5,11 @@ package tests import ( "context" "fmt" + "io" "net/http" - "net/url" "os" "path/filepath" + "slices" "strings" "sync" "testing" @@ -17,6 +18,9 @@ import ( "cosmossdk.io/depinject" "cosmossdk.io/math" "github.com/cometbft/cometbft/abci/types" + "github.com/cometbft/cometbft/libs/json" + cmtcoretypes "github.com/cometbft/cometbft/rpc/core/types" + rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" sdkclient "github.com/cosmos/cosmos-sdk/client" codectypes "github.com/cosmos/cosmos-sdk/codec/types" "github.com/cosmos/cosmos-sdk/crypto/keys/secp256k1" @@ -25,23 +29,26 @@ import ( "github.com/cosmos/cosmos-sdk/x/authz" banktypes "github.com/cosmos/cosmos-sdk/x/bank/types" govtypes "github.com/cosmos/cosmos-sdk/x/gov/types" + "github.com/cosmos/gogoproto/proto" "github.com/regen-network/gocuke" "github.com/stretchr/testify/require" + "google.golang.org/grpc" "github.com/pokt-network/poktroll/load-testing/config" "github.com/pokt-network/poktroll/pkg/client" - "github.com/pokt-network/poktroll/pkg/client/events" "github.com/pokt-network/poktroll/pkg/client/query" - "github.com/pokt-network/poktroll/pkg/client/tx" "github.com/pokt-network/poktroll/pkg/observable/channel" "github.com/pokt-network/poktroll/pkg/sync2" - testsession "github.com/pokt-network/poktroll/testutil/session" + testdelays "github.com/pokt-network/poktroll/testutil/delays" + "github.com/pokt-network/poktroll/testutil/events" "github.com/pokt-network/poktroll/testutil/testclient" - "github.com/pokt-network/poktroll/testutil/testclient/testeventsquery" + "github.com/pokt-network/poktroll/testutil/testclient/testblock" apptypes "github.com/pokt-network/poktroll/x/application/types" gatewaytypes "github.com/pokt-network/poktroll/x/gateway/types" + prooftypes "github.com/pokt-network/poktroll/x/proof/types" sharedtypes "github.com/pokt-network/poktroll/x/shared/types" suppliertypes "github.com/pokt-network/poktroll/x/supplier/types" + tokenomicstypes "github.com/pokt-network/poktroll/x/tokenomics/types" ) // actorLoadTestIncrementPlans is a struct that holds the parameters for incrementing @@ -75,29 +82,53 @@ type actorLoadTestIncrementPlan struct { maxActorCount int64 } -// setupTxEventListeners sets up the transaction event listeners to observe the -// transactions committed on-chain. -func (s *relaysSuite) setupTxEventListeners() { - eventsQueryClient := testeventsquery.NewLocalnetClient(s.TestingT.(*testing.T)) +// setupEventListeners sets up the event listeners for the relays suite. +// It listens to both tx and block events to keep track of the events that are happening +// onchain. +func (s *relaysSuite) setupEventListeners(rpcNode string) { + // Set up the blockClient that will be notifying the suite about the committed blocks. + eventsObs, eventsObsCh := channel.NewObservable[[]types.Event]() + s.committedEventsObs = eventsObs + + extractBlockEvents := func(ctx context.Context, block client.Block) { + // Query the block results endpoint for each observed block to get the tx and block events. + // Ref: https://docs.cometbft.com/main/rpc/#/Info/block_results + blockResultsUrl := fmt.Sprintf("%s/block_results?height=%d", rpcNode, block.Height()) + blockResultsResp, err := http.DefaultClient.Get(blockResultsUrl) + require.NoError(s, err) - deps := depinject.Supply(eventsQueryClient) - eventsReplayClient, err := events.NewEventsReplayClient( - s.ctx, - deps, - newTxEventSubscriptionQuery, - tx.UnmarshalTxResult, - eventsReplayClientBufferSize, - ) - require.NoError(s, err) + defer blockResultsResp.Body.Close() + + blockResultsRespBz, err := io.ReadAll(blockResultsResp.Body) + require.NoError(s, err) + + var rpcResponse rpctypes.RPCResponse + err = json.Unmarshal(blockResultsRespBz, &rpcResponse) + require.NoError(s, err) + + var blockResults cmtcoretypes.ResultBlockResults + err = json.Unmarshal(rpcResponse.Result, &blockResults) + require.NoError(s, err) - // Map the eventsReplayClient.EventsSequence which is a replay observable - // to a regular observable to avoid replaying txResults from old blocks. - s.newTxEventsObs = channel.Map( + numEvents := len(blockResults.TxsResults) + len(blockResults.FinalizeBlockEvents) + events := make([]types.Event, 0, numEvents) + + // Flatten all tx result events and block event results into one slice. + for _, txResult := range blockResults.TxsResults { + events = append(events, txResult.Events...) + } + + events = append(events, blockResults.FinalizeBlockEvents...) + + s.latestBlock = block + eventsObsCh <- events + } + + s.blockClient = testblock.NewLocalnetClient(s.ctx, s.TestingT.(*testing.T)) + channel.ForEach( s.ctx, - eventsReplayClient.EventsSequence(s.ctx), - func(ctx context.Context, txResult *types.TxResult) (*types.TxResult, bool) { - return txResult, false - }, + s.blockClient.CommittedBlocksSequence(s.ctx), + extractBlockEvents, ) } @@ -173,9 +204,9 @@ func (s *relaysSuite) mapSessionInfoForLoadTestDurationFn( sessionInfo := &sessionInfoNotif{ blockHeight: blockHeight, - sessionNumber: testsession.GetSessionNumberWithDefaultParams(blockHeight), - sessionStartBlockHeight: testsession.GetSessionStartHeightWithDefaultParams(blockHeight), - sessionEndBlockHeight: testsession.GetSessionEndHeightWithDefaultParams(blockHeight), + sessionNumber: sharedtypes.GetSessionNumber(s.sharedParams, blockHeight), + sessionStartBlockHeight: sharedtypes.GetSessionStartHeight(s.sharedParams, blockHeight), + sessionEndBlockHeight: sharedtypes.GetSessionEndHeight(s.sharedParams, blockHeight), } infoLogger := logger.Info(). @@ -231,10 +262,12 @@ func (s *relaysSuite) mapSessionInfoForLoadTestDurationFn( testProgressBlocksRelativeToTestStartHeight, s.relayLoadDurationBlocks, ) - if sessionInfo.blockHeight == sessionInfo.sessionEndBlockHeight { - newSessionsCount := len(s.activeApplications) * len(s.activeSuppliers) - s.expectedClaimsAndProofsCount = s.expectedClaimsAndProofsCount + newSessionsCount - } + logger.Info().Msgf( + "Relays sent: %d; Success: %d; Failed: %d", + s.numRelaysSent.Load(), + s.successfulRelays.Load(), + s.failedRelays.Load(), + ) // If the current block is the start of any new session, activate the prepared // actors to be used in the current session. @@ -457,16 +490,17 @@ func (s *relaysSuite) mapSessionInfoWhenStakingNewSuppliersAndGatewaysFn() chann // For each notification received, it waits for the new actors' staking/funding // txs to be committed before sending staking & delegation txs for new applications. func (s *relaysSuite) mapStakingInfoWhenStakingAndDelegatingNewApps( - _ context.Context, + ctx context.Context, notif *stakingInfoNotif, ) (*stakingInfoNotif, bool) { // Ensure that new gateways and suppliers are staked. // Ensure that new applications are funded and have an account entry on-chain // so that they can stake and delegate in the next block. - txResults := s.waitForTxsToBeCommitted() - s.ensureFundedActors(txResults, notif.newApps) - s.ensureStakedActors(txResults, EventActionMsgStakeGateway, notif.newGateways) - s.ensureStakedActors(txResults, EventActionMsgStakeSupplier, notif.newSuppliers) + testdelays.WaitAll( + func() { s.ensureStakedActors(ctx, notif.newSuppliers) }, + func() { s.ensureStakedActors(ctx, notif.newGateways) }, + func() { s.ensureFundedActors(ctx, notif.newApps) }, + ) // Update the list of staked suppliers. s.activeSuppliers = append(s.activeSuppliers, notif.newSuppliers...) @@ -627,11 +661,13 @@ func (s *relaysSuite) createApplicationAccount( // cost, and the block duration. func (s *relaysSuite) getAppFundingAmount(currentBlockHeight int64) sdk.Coin { currentTestDuration := s.testStartHeight + s.relayLoadDurationBlocks - currentBlockHeight + // Compute the cost of all relays throughout the test duration. + totalRelayCostDuringTestUPOKT := s.relayRatePerApp * s.relayCoinAmountCost * currentTestDuration * blockDurationSec // Multiply by 2 to make sure the application does not run out of funds // based on the number of relays it needs to send. Theoretically, `+1` should // be enough, but probabilistic and time based mechanisms make it hard // to predict exactly. - appFundingAmount := s.relayRatePerApp * s.relayCoinAmountCost * currentTestDuration * blockDuration * 2 + appFundingAmount := math.Max(totalRelayCostDuringTestUPOKT, s.appParams.MinStake.Amount.Int64()*2) return sdk.NewCoin("upokt", math.NewInt(appFundingAmount)) } @@ -724,7 +760,7 @@ func (plan *actorLoadTestIncrementPlan) shouldIncrementActorCount( return false } - initialSessionNumber := testsession.GetSessionNumberWithDefaultParams(startBlockHeight) + initialSessionNumber := sharedtypes.GetSessionNumber(sharedParams, startBlockHeight) actorSessionIncRate := plan.blocksPerIncrement / int64(sharedParams.GetNumBlocksPerSession()) nextSessionNumber := sessionInfo.sessionNumber + 1 - initialSessionNumber isSessionStartHeight := sessionInfo.blockHeight == sessionInfo.sessionStartBlockHeight @@ -750,7 +786,7 @@ func (plan *actorLoadTestIncrementPlan) shouldIncrementSupplierCount( return false } - initialSessionNumber := testsession.GetSessionNumberWithDefaultParams(startBlockHeight) + initialSessionNumber := sharedtypes.GetSessionNumber(sharedParams, startBlockHeight) supplierSessionIncRate := plan.blocksPerIncrement / int64(sharedParams.GetNumBlocksPerSession()) nextSessionNumber := sessionInfo.sessionNumber + 1 - initialSessionNumber isSessionEndHeight := sessionInfo.blockHeight == sessionInfo.sessionEndBlockHeight @@ -798,6 +834,9 @@ func (s *relaysSuite) addPendingStakeSupplierMsg(supplier *accountInfo) { RpcType: sharedtypes.RPCType_JSON_RPC, }, }, + RevShare: []*sharedtypes.ServiceRevenueShare{ + {Address: supplier.address, RevSharePercentage: 100}, + }, }, }, )) @@ -943,7 +982,9 @@ func (s *relaysSuite) sendPendingMsgsTx(actor *accountInfo) { err := txBuilder.SetMsgs(actor.pendingMsgs...) require.NoError(s, err) - txBuilder.SetTimeoutHeight(uint64(s.latestBlock.Height() + 1)) + // Set the transaction timeout height to 2 blocks beyond the current block height. + // This ensures the transaction won't be rejected if the next block commit is imminent. + txBuilder.SetTimeoutHeight(uint64(s.latestBlock.Height() + 2)) txBuilder.SetGasLimit(690000042) accAddress := sdk.MustAccAddressFromBech32(actor.address) @@ -973,33 +1014,6 @@ func (s *relaysSuite) sendPendingMsgsTx(actor *accountInfo) { }() } -// waitForTxsToBeCommitted waits for transactions to be observed on-chain. -// It is used to ensure that the transactions are committed before taking -// dependent actions. -func (s *relaysSuite) waitForTxsToBeCommitted() (txResults []*types.TxResult) { - ctx, cancel := context.WithCancel(s.ctx) - defer cancel() - - ch := s.newTxEventsObs.Subscribe(ctx).Ch() - for { - txResult := <-ch - txResults = append(txResults, txResult) - - // The number of transactions to be observed is not available in the TxResult - // event, so this number is taken from the last block event. - // The block received from s.latestBlock may be the previous one, it is - // necessary to wait until the block matching the txResult height is received - // in order to get the right number of transaction events to collect. - numTxs := s.waitUntilLatestBlockHeightEquals(txResult.Height) - - // If all transactions are observed, break the loop. - if len(txResults) == numTxs { - break - } - } - return txResults -} - // waitUntilLatestBlockHeightEquals blocks until s.latestBlock.Height() equals the targetHeight. // NB: s.latestBlock is updated asynchronously via a subscription to the block client observable. func (s *relaysSuite) waitUntilLatestBlockHeightEquals(targetHeight int64) int { @@ -1034,161 +1048,215 @@ func (s *relaysSuite) sendRelay(iteration uint64, relayPayload string) (appAddre gateway := s.activeGateways[iteration%uint64(len(s.activeGateways))] application := s.activeApplications[iteration%uint64(len(s.activeApplications))] - gatewayUrl, err := url.Parse(s.gatewayUrls[gateway.address]) - require.NoError(s, err) - - // Include the application address in the query to the gateway. - query := gatewayUrl.Query() - query.Add("applicationAddr", application.address) - query.Add("relayCount", fmt.Sprintf("%d", iteration)) - gatewayUrl.RawQuery = query.Encode() - - // Use the pre-defined service ID that all application and suppliers are staking for. - gatewayUrl.Path = testedServiceId - // TODO_MAINNET: Capture the relay response to check for failing relays. // Send the relay request within a goroutine to avoid blocking the test batches // when suppliers or gateways are unresponsive. - go func(gwURL, payload string) { - _, err = http.DefaultClient.Post( - gwURL, - "application/json", - strings.NewReader(payload), - ) + sendRelayRequest := func(gatewayURL, appAddr, payload string) { + req, err := http.NewRequest("POST", gatewayURL, strings.NewReader(payload)) + + // TODO_TECHDEBT(red-0ne): Use 'app-address' instead of 'X-App-Address' once PATH Gateway + // deprecates the X-App-Address header. + // This is needed by the PATH Gateway's trusted mode to identify the application + // that is sending the relay request. + req.Header.Add("X-App-Address", appAddr) + req.Header.Add("target-service-id", "anvil") + res, err := http.DefaultClient.Do(req) require.NoError(s, err) - }(gatewayUrl.String(), relayPayload) + + if res.StatusCode == http.StatusOK { + s.successfulRelays.Add(1) + } else { + s.failedRelays.Add(1) + } + } + + gatewayURL := s.gatewayUrls[gateway.address] + go sendRelayRequest(gatewayURL, application.address, relayPayload) return application.address, gateway.address } // ensureFundedActors checks if the actors are funded by observing the transfer events // in the transactions results. -func (s *relaysSuite) ensureFundedActors( - txResults []*types.TxResult, - actors []*accountInfo, -) { - for _, actor := range actors { - actorFunded := false - for _, txResult := range txResults { - for _, event := range txResult.Result.Events { - // Skip non-relevant events. - if event.Type != "transfer" { - continue - } - - attrs := event.Attributes - // Check if the actor is the recipient of the transfer event. - if actorFunded = hasEventAttr(attrs, "recipient", actor.address); actorFunded { - break - } +func (s *relaysSuite) ensureFundedActors(ctx context.Context, actors []*accountInfo) { + if len(actors) == 0 { + s.Logf("No actors to fund") + return + } + + fundedActors := make(map[string]struct{}) + actorsAddrs := make([]string, len(actors)) + for i, actor := range actors { + actorsAddrs[i] = actor.address + } + + // Add 1 second to the block duration to make sure the deadline is after the next block. + deadline := time.Now().Add(time.Second * time.Duration(blockDurationSec+1)) + ctx, cancel := context.WithDeadline(ctx, deadline) + channel.ForEach(ctx, s.committedEventsObs, func(ctx context.Context, events []types.Event) { + for _, event := range events { + // In the context of ensuring the actors are funded, only the transfer events + // are relevant; filtering out the other events. + if event.GetType() != "transfer" { + continue + } + + attrs := event.GetAttributes() + // Check if the actor is the recipient of the transfer event. + fundedActorAddr, ok := getEventAttr(attrs, "recipient") + if !ok { + continue } - // If the actor is funded, no need to check the other transactions. - if actorFunded { - break + if !slices.Contains(actorsAddrs, fundedActorAddr) { + continue } + + fundedActors[fundedActorAddr] = struct{}{} } - // If no transfer event is found for the actor, the test is canceled. - if !actorFunded { - s.logAndAbortTest(txResults, "actor not funded") - return + // Cancel this scope once all expected actors are successfully funded before + // the deadline was reached. + if allActorsFunded(actors, fundedActors) { + cancel() } + }) + + <-ctx.Done() + if !allActorsFunded(actors, fundedActors) { + s.logAndAbortTest("at least one actor was not funded successfully") } } +// allActorsFunded checks if all the expected actors are funded. +// An error is returned if any (at least one) of the expected actors was not funded. +func allActorsFunded(expectedActors []*accountInfo, fundedActors map[string]struct{}) bool { + for _, actor := range expectedActors { + if _, ok := fundedActors[actor.address]; !ok { + return false + } + } + + return true +} + // ensureStakedActors checks if the actors are staked by observing the message events // in the transactions results. func (s *relaysSuite) ensureStakedActors( - txResults []*types.TxResult, - msg string, + ctx context.Context, actors []*accountInfo, ) { - for _, actor := range actors { - actorStaked := false - for _, txResult := range txResults { - for _, event := range txResult.Result.Events { - // Skip non-relevant events. - if event.Type != "message" { - continue - } - - attrs := event.Attributes - // Check if the actor is the sender of the message event. - if hasEventAttr(attrs, "action", msg) && hasEventAttr(attrs, "sender", actor.address) { - actorStaked = true - break - } - } + if len(actors) == 0 { + return + } - // If the actor is staked, no need to check the other transactions. - if actorStaked { - break + stakedActors := make(map[string]struct{}) + + // Add 1 second to the block duration to make sure the deadline is after the next block. + deadline := time.Now().Add(time.Second * time.Duration(blockDurationSec+1)) + ctx, cancel := context.WithDeadline(ctx, deadline) + typedEventsObs := events.AbciEventsToTypedEvents(ctx, s.committedEventsObs) + channel.ForEach(ctx, typedEventsObs, func(ctx context.Context, blockEvents []proto.Message) { + for _, event := range blockEvents { + switch e := event.(type) { + case *suppliertypes.EventSupplierStaked: + stakedActors[e.Supplier.GetOperatorAddress()] = struct{}{} + case *gatewaytypes.EventGatewayStaked: + stakedActors[e.Gateway.GetAddress()] = struct{}{} + case *apptypes.EventApplicationStaked: + stakedActors[e.Application.GetAddress()] = struct{}{} } } - // If no message event is found for the actor, log the transaction results - // and cancel the test. - if !actorStaked { - s.logAndAbortTest(txResults, fmt.Sprintf("actor not staked: %s", actor.address)) - return + // Cancel this scope once all expected actors are successfully staked before + // the deadline was reached. + if allActorsStaked(actors, stakedActors) { + cancel() + } + }) + + <-ctx.Done() + if !allActorsStaked(actors, stakedActors) { + s.logAndAbortTest("at least one actor was not staked successfully") + return + } +} + +// allActorsStaked checks if all the expected actors are staked. +// An error is returned if any of the expected actors was not staked. +func allActorsStaked(expectedActors []*accountInfo, stakedActors map[string]struct{}) bool { + for _, actor := range expectedActors { + if _, ok := stakedActors[actor.address]; !ok { + return false } } + + return true } // ensureDelegatedActors checks if the actors are delegated by observing the // delegation events in the transactions results. func (s *relaysSuite) ensureDelegatedApps( - txResults []*types.TxResult, + ctx context.Context, applications, gateways []*accountInfo, ) { - for _, application := range applications { - numDelegatees := 0 - for _, txResult := range txResults { - for _, event := range txResult.Result.Events { - // Skip non-EventDelegation events. - if event.Type != EventTypeRedelegation { - continue - } - - attrs := event.Attributes - appAddr := fmt.Sprintf("%q", application.address) - // Skip the event if the application is not the delegator. - if !hasEventAttr(attrs, "app_address", appAddr) { - break - } - - // Check if the application is delegated to each of the gateways. - for _, gateway := range gateways { - gwAddr := fmt.Sprintf("%q", gateway.address) - if hasEventAttr(attrs, "gateway_address", gwAddr) { - numDelegatees++ - break - } - } + if len(applications) == 0 || len(gateways) == 0 { + return + } + + appsToGateways := make(map[string][]string) + + deadline := time.Now().Add(time.Second * time.Duration(blockDurationSec+1)) + ctx, cancel := context.WithDeadline(ctx, deadline) + typedEventsObs := events.AbciEventsToTypedEvents(ctx, s.committedEventsObs) + channel.ForEach(ctx, typedEventsObs, func(ctx context.Context, blockEvents []proto.Message) { + for _, event := range blockEvents { + redelegationEvent, ok := event.(*apptypes.EventRedelegation) + if ok { + app := redelegationEvent.GetApplication() + appsToGateways[app.GetAddress()] = app.GetDelegateeGatewayAddresses() } } - // If the number of delegatees is not equal to the number of gateways, - // the test is canceled. - if numDelegatees != len(gateways) { - s.logAndAbortTest(txResults, "applications not delegated to all gateways") - return + // Cancel this scope once all expected applications are successfully delegated + // to the expected gateways before the deadline was reached. + if allAppsDelegatedToAllGateways(applications, gateways, appsToGateways) { + cancel() } + }) + + <-ctx.Done() + if !allAppsDelegatedToAllGateways(applications, gateways, appsToGateways) { + s.logAndAbortTest("applications not delegated to all gateways") + return } } +// allAppsDelegatedToAllGateways checks if all applications are delegated to all gateways. +func allAppsDelegatedToAllGateways( + applications, gateways []*accountInfo, + appsToGateways map[string][]string, +) bool { + for _, app := range applications { + if _, ok := appsToGateways[app.address]; !ok { + return false + } + + for _, gateway := range gateways { + if !slices.Contains(appsToGateways[app.address], gateway.address) { + return false + } + } + } + + return true +} + // getRelayCost fetches the relay cost from the tokenomics module. func (s *relaysSuite) getRelayCost() int64 { - // Set up the tokenomics client. - flagSet := testclient.NewLocalnetFlagSet(s) - clientCtx := testclient.NewLocalnetClientCtx(s, flagSet) - sharedClient := sharedtypes.NewQueryClient(clientCtx) - - res, err := sharedClient.Params(s.ctx, &sharedtypes.QueryParamsRequest{}) - require.NoError(s, err) + relayCost := s.testedService.ComputeUnitsPerRelay * s.sharedParams.ComputeUnitsToTokensMultiplier - return int64(res.Params.ComputeUnitsToTokensMultiplier) + return int64(relayCost) } // getProvisionedActorsCurrentStakedAmount fetches the current stake amount of @@ -1242,15 +1310,15 @@ func (s *relaysSuite) activatePreparedActors(notif *sessionInfoNotif) { } } -// hasEventAttr checks if the event attributes contain a given key-value pair. -func hasEventAttr(attributes []types.EventAttribute, key, value string) bool { +// getEventAttr returns the event attribute value corresponding to the provided key. +func getEventAttr(attributes []types.EventAttribute, key string) (value string, found bool) { for _, attribute := range attributes { - if attribute.Key == key && attribute.Value == value { - return true + if attribute.Key == key { + return attribute.Value, true } } - return false + return "", false } // sendAdjustMaxDelegationsParamTx sends a transaction to adjust the max_delegated_gateways @@ -1258,20 +1326,17 @@ func hasEventAttr(attributes []types.EventAttribute, key, value string) bool { func (s *relaysSuite) sendAdjustMaxDelegationsParamTx(maxGateways int64) { authority := authtypes.NewModuleAddress(govtypes.ModuleName).String() - appMsgUpdateParams := &apptypes.MsgUpdateParams{ + appMsgUpdateMaxDelegatedGatewaysParam := &apptypes.MsgUpdateParam{ Authority: authority, - Params: apptypes.Params{ - // Set the max_delegated_gateways parameter to the number of gateways - // that are currently used in the test. - MaxDelegatedGateways: uint64(maxGateways), - }, + Name: "max_delegated_gateways", + AsType: &apptypes.MsgUpdateParam_AsUint64{AsUint64: uint64(maxGateways)}, } - appMsgUpdateParamsAny, err := codectypes.NewAnyWithValue(appMsgUpdateParams) + appMsgUpdateParamAny, err := codectypes.NewAnyWithValue(appMsgUpdateMaxDelegatedGatewaysParam) require.NoError(s, err) authzExecMsg := &authz.MsgExec{ Grantee: s.fundingAccountInfo.address, - Msgs: []*codectypes.Any{appMsgUpdateParamsAny}, + Msgs: []*codectypes.Any{appMsgUpdateParamAny}, } s.fundingAccountInfo.addPendingMsg(authzExecMsg) @@ -1334,27 +1399,15 @@ func (s *relaysSuite) parseActorLoadTestIncrementPlans( return actorPlans } -// countClaimAndProofs asynchronously counts the number of claim and proof messages -// in the observed transaction events. -func (s *relaysSuite) countClaimAndProofs() { +// forEachSettlement asynchronously captures the settlement events and processes them. +func (s *relaysSuite) forEachSettlement(ctx context.Context) { + typedEventsObs := events.AbciEventsToTypedEvents(ctx, s.committedEventsObs) channel.ForEach( s.ctx, - s.newTxEventsObs, - func(ctx context.Context, txEvent *types.TxResult) { - for _, event := range txEvent.Result.Events { - if event.Type != "message" { - continue - } - - if hasEventAttr(event.Attributes, "action", EventActionMsgCreateClaim) { - s.currentClaimCount++ - } - - if hasEventAttr(event.Attributes, "action", EventActionMsgSubmitProof) { - s.currentProofCount++ - } - - } + typedEventsObs, + func(_ context.Context, _ []proto.Message) { + // TODO_FOLLOWUP(@red-0ne): Capture all settlement related events and use + // them to calculate the expected actor balances. }, ) } @@ -1379,19 +1432,107 @@ func (s *relaysSuite) querySharedParams(queryNodeRPCURL string) { s.sharedParams = sharedParams } +// queryAppParams queries the current on-chain application module parameters for use +// over the duration of the test. +func (s *relaysSuite) queryAppParams(queryNodeRPCURL string) { + s.Helper() + + deps := depinject.Supply(s.txContext.GetClientCtx()) + + blockQueryClient, err := sdkclient.NewClientFromNode(queryNodeRPCURL) + require.NoError(s, err) + deps = depinject.Configs(deps, depinject.Supply(blockQueryClient)) + + appQueryclient, err := query.NewApplicationQuerier(deps) + require.NoError(s, err) + + appParams, err := appQueryclient.GetParams(s.ctx) + require.NoError(s, err) + + s.appParams = appParams +} + +// queryProofParams queries the current on-chain proof module parameters for use +// over the duration of the test. +func (s *relaysSuite) queryProofParams(queryNodeRPCURL string) { + s.Helper() + + deps := depinject.Supply(s.txContext.GetClientCtx()) + + blockQueryClient, err := sdkclient.NewClientFromNode(queryNodeRPCURL) + require.NoError(s, err) + deps = depinject.Configs(deps, depinject.Supply(blockQueryClient)) + + proofQueryclient, err := query.NewProofQuerier(deps) + require.NoError(s, err) + + params, err := proofQueryclient.GetParams(s.ctx) + require.NoError(s, err) + + // The proofQueryclient#GetParams returns an Params interface to avoid a circular + // dependency between the proof module and the query module, so it needs to be casted + // to the actual prooftypes.Params type. + proofParams, ok := params.(*prooftypes.Params) + require.True(s, ok) + + s.proofParams = proofParams +} + +// queryTokenomicsParams queries the current on-chain tokenomics module parameters for use +// over the duration of the test. +func (s *relaysSuite) queryTokenomicsParams(queryNodeRPCURL string) { + s.Helper() + + deps := depinject.Supply(s.txContext.GetClientCtx()) + + blockQueryClient, err := sdkclient.NewClientFromNode(queryNodeRPCURL) + require.NoError(s, err) + deps = depinject.Configs(deps, depinject.Supply(blockQueryClient)) + + // TODO_TECHDEBT(red-0ne): Use tokenomics client querier instead of the grpc client + // once implemented. + var clientConn *grpc.ClientConn + err = depinject.Inject(deps, clientConn) + require.NoError(s, err) + + tokenomicsQuerier := tokenomicstypes.NewQueryClient(clientConn) + res, err := tokenomicsQuerier.Params(s.ctx, &tokenomicstypes.QueryParamsRequest{}) + require.NoError(s, err) + + s.tokenomicsParams = &res.Params +} + +// queryTestedService queries the current service being tested. +func (s *relaysSuite) queryTestedService(queryNodeRPCURL string) { + s.Helper() + + deps := depinject.Supply(s.txContext.GetClientCtx()) + + blockQueryClient, err := sdkclient.NewClientFromNode(queryNodeRPCURL) + require.NoError(s, err) + deps = depinject.Configs(deps, depinject.Supply(blockQueryClient)) + + serviceQueryclient, err := query.NewServiceQuerier(deps) + require.NoError(s, err) + + service, err := serviceQueryclient.GetService(s.ctx, "anvil") + require.NoError(s, err) + + s.testedService = &service +} + // forEachStakedAndDelegatedAppPrepareApp is a ForEachFn that waits for txs which // were broadcast in previous pipeline stages have been committed. It ensures that // new applications were successfully staked and all application actors are delegated // to all gateways. Then it adds the new application actors to the prepared set, to // be activated & used in the next session. -func (s *relaysSuite) forEachStakedAndDelegatedAppPrepareApp(_ context.Context, notif *stakingInfoNotif) { - // Wait for the next block to commit staking and delegation transactions - // and be able to send relay requests evenly distributed across all gateways. - txResults := s.waitForTxsToBeCommitted() - s.ensureStakedActors(txResults, EventActionMsgStakeApplication, notif.newApps) - s.ensureDelegatedApps(txResults, s.activeApplications, notif.newGateways) - s.ensureDelegatedApps(txResults, notif.newApps, notif.newGateways) - s.ensureDelegatedApps(txResults, notif.newApps, s.activeGateways) +func (s *relaysSuite) forEachStakedAndDelegatedAppPrepareApp(ctx context.Context, notif *stakingInfoNotif) { + testdelays.WaitAll( + func() { s.ensureStakedActors(ctx, notif.newApps) }, + func() { s.ensureDelegatedApps(ctx, s.activeApplications, notif.newGateways) }, + func() { s.ensureDelegatedApps(ctx, notif.newApps, notif.newGateways) }, + func() { s.ensureDelegatedApps(ctx, notif.newApps, s.activeGateways) }, + ) // Add the new applications to the list of prepared applications to be activated in // the next session. @@ -1415,9 +1556,9 @@ func (s *relaysSuite) forEachRelayBatchSendBatch(_ context.Context, relayBatchIn relayInterval := time.Second / time.Duration(relaysPerSec) batchWaitGroup := new(sync.WaitGroup) - batchWaitGroup.Add(relaysPerSec * int(blockDuration)) + batchWaitGroup.Add(relaysPerSec * int(blockDurationSec)) - for i := 0; i < relaysPerSec*int(blockDuration); i++ { + for i := 0; i < relaysPerSec*int(blockDurationSec); i++ { iterationTime := relayBatchInfo.nextBatchTime.Add(time.Duration(i+1) * relayInterval) batchLimiter.Go(s.ctx, func() { @@ -1453,17 +1594,12 @@ func (s *relaysSuite) forEachRelayBatchSendBatch(_ context.Context, relayBatchIn batchWaitGroup.Wait() } -func (s *relaysSuite) logAndAbortTest(txResults []*types.TxResult, errorMsg string) { - for _, txResult := range txResults { - if txResult.Result.Log != "" { - logger.Error().Msgf("tx result log: %s", txResult.Result.Log) - } - } +func (s *relaysSuite) logAndAbortTest(errorMsg string) { s.cancelCtx() s.Fatal(errorMsg) } -// populateWithKnownApplications creates a list of gateways based on the gatewayUrls +// populateWithKnownGateways creates a list of gateways based on the gatewayUrls // provided in the test manifest. It is used in non-ephemeral chain tests where the // gateways are not under the test's control and are expected to be already staked. func (s *relaysSuite) populateWithKnownGateways() (gateways []*accountInfo) { diff --git a/load-testing/tests/relays_stress_single_suppier.feature b/load-testing/tests/relays_stress_single_supplier.feature similarity index 51% rename from load-testing/tests/relays_stress_single_suppier.feature rename to load-testing/tests/relays_stress_single_supplier.feature index 253b82615..34d51a6dd 100644 --- a/load-testing/tests/relays_stress_single_suppier.feature +++ b/load-testing/tests/relays_stress_single_supplier.feature @@ -11,7 +11,16 @@ Feature: Loading gateway server with relays And more actors are staked as follows: | actor | actor inc amount | blocks per inc | max actors | | application | 4 | 10 | 12 | - | gateway | 1 | 10 | 3 | + | gateway | 1 | 10 | 1 | | supplier | 1 | 10 | 1 | When a load of concurrent relay requests are sent from the applications - Then the correct pairs count of claim and proof messages should be committed on-chain \ No newline at end of file + Then the number of failed relay requests is "0" + # TODO_FOLLOWUP(@red-0ne): Implement the following steps + # Then "0" over servicing events are observed + # And "0" slashing events are observed + # And "0" expired claim events are observed + # And there are as many reimbursement requests as the number of settled claims + # And the number of claims submitted and claims settled is the same + # And the number of proofs submitted and proofs required is the same + # And the actors onchain balances are as expected + # TODO_CONSIDERATION: Revisit for additional interesting test cases. \ No newline at end of file diff --git a/load-testing/tests/relays_stress_test.go b/load-testing/tests/relays_stress_test.go index ae7f07a12..7a455f896 100644 --- a/load-testing/tests/relays_stress_test.go +++ b/load-testing/tests/relays_stress_test.go @@ -23,23 +23,11 @@ import ( "github.com/pokt-network/poktroll/pkg/observable" "github.com/pokt-network/poktroll/pkg/observable/channel" "github.com/pokt-network/poktroll/testutil/testclient" - "github.com/pokt-network/poktroll/testutil/testclient/testblock" "github.com/pokt-network/poktroll/testutil/testclient/testtx" + apptypes "github.com/pokt-network/poktroll/x/application/types" + prooftypes "github.com/pokt-network/poktroll/x/proof/types" sharedtypes "github.com/pokt-network/poktroll/x/shared/types" -) - -// The following constants are used to identify the different types of transactions, -// once committed, which are expected to be observed on-chain during the test. -// NB: The TxResult Events' #Type values are not prefixed with a slash, -// unlike TxResult Events' "action" attribute value. -const ( - EventActionMsgStakeApplication = "/poktroll.application.MsgStakeApplication" - EventActionMsgStakeGateway = "/poktroll.gateway.MsgStakeGateway" - EventActionMsgStakeSupplier = "/poktroll.supplier.MsgStakeSupplier" - EventActionMsgCreateClaim = "/poktroll.proof.MsgCreateClaim" - EventActionMsgSubmitProof = "/poktroll.proof.MsgSubmitProof" - EventActionAppMsgUpdateParams = "/poktroll.application.MsgUpdateParams" - EventTypeRedelegation = "poktroll.application.EventRedelegation" + tokenomicstypes "github.com/pokt-network/poktroll/x/tokenomics/types" ) // The following constants define the expected ordering of the actors when @@ -77,8 +65,6 @@ var ( // maxConcurrentRequestLimit is the maximum number of concurrent requests that can be made. // By default, it is set to the number of logical CPUs available to the process. maxConcurrentRequestLimit = runtime.GOMAXPROCS(0) - // fundingAccountAddress is the address of the account used to fund other accounts. - fundingAccountAddress string // supplierStakeAmount is the amount of tokens to stake by suppliers. supplierStakeAmount sdk.Coin // gatewayStakeAmount is the amount of tokens to stake by gateways. @@ -86,15 +72,9 @@ var ( // testedServiceId is the service ID for that all applications and suppliers will // be using in this test. testedServiceId string - // blockDuration is the duration of a block in seconds. + // blockDurationSec is the duration of a block in seconds. // NB: This value SHOULD be equal to `timeout_propose` in `config.yml`. - blockDuration = int64(2) - // newTxEventSubscriptionQuery is the format string which yields a subscription - // query to listen for on-chain Tx events. - newTxEventSubscriptionQuery = "tm.event='Tx'" - // eventsReplayClientBufferSize is the buffer size for the events replay client - // for the subscriptions above. - eventsReplayClientBufferSize = 100 + blockDurationSec = int64(2) // relayPayloadFmt is the JSON-RPC request relayPayloadFmt to send a relay request. relayPayloadFmt = `{"jsonrpc":"2.0","method":"%s","params":[],"id":%d}` // relayRequestMethod is the method of the JSON-RPC request to be relayed. @@ -127,15 +107,17 @@ type relaysSuite struct { // batchInfoObs is the observable mapping session information to batch information. // It is used to determine when to send a batch of relay requests to the network. batchInfoObs observable.Observable[*relayBatchInfoNotif] - // newTxEventsObs is the observable that notifies the test suite of new - // transactions committed on-chain. - // It is used to check the results of the transactions sent by the test suite. - newTxEventsObs observable.Observable[*types.TxResult] // txContext is the transaction context used to sign and send transactions. txContext client.TxContext - // sharedParams is the shared on-chain parameters used in the test. + + // Protocol governance params used in the test. // It is queried at the beginning of the test. - sharedParams *sharedtypes.Params + sharedParams *sharedtypes.Params + appParams *apptypes.Params + proofParams *prooftypes.Params + tokenomicsParams *tokenomicstypes.Params + + testedService *sharedtypes.Service // numRelaysSent is the number of relay requests sent during the test. numRelaysSent atomic.Uint64 @@ -213,17 +195,17 @@ type relaysSuite struct { // ready to handle relay requests. activeSuppliers []*accountInfo - // Number of claims and proofs observed on-chain during the test. - currentProofCount int - currentClaimCount int - - // expectedClaimsAndProofsCount is the expected number of claims and proofs - // to be committed on-chain during the test. - expectedClaimsAndProofsCount int - // isEphemeralChain is a flag that indicates whether the test is expected to be // run on ephemeral chain setups like localnet or long living ones (i.e. Test/DevNet). isEphemeralChain bool + + // committedEventsObs is the observable that maps committed blocks to on-chain events. + committedEventsObs observable.Observable[[]types.Event] + + // successfulRelays is the number of relay requests that returned 200 status code. + successfulRelays atomic.Uint64 + // failedRelays is the number of relay requests that returned non-200 status code. + failedRelays atomic.Uint64 } // accountInfo contains the account info needed to build and send transactions. @@ -270,8 +252,8 @@ func TestLoadRelays(t *testing.T) { gocuke.NewRunner(t, &relaysSuite{}).Path(filepath.Join(".", "relays_stress.feature")).Run() } -func TestLoadRelaysSingleSupplier(t *testing.T) { - gocuke.NewRunner(t, &relaysSuite{}).Path(filepath.Join(".", "relays_stress_single_suppier.feature")).Run() +func TestSingleSupplierLoadRelays(t *testing.T) { + gocuke.NewRunner(t, &relaysSuite{}).Path(filepath.Join(".", "relays_stress_single_supplier.feature")).Run() } func (s *relaysSuite) LocalnetIsRunning() { @@ -318,9 +300,9 @@ func (s *relaysSuite) LocalnetIsRunning() { // CometLocalWebsocketURL to the TestNetNode URL. These variables are used // by the testtx txClient to send transactions to the network. if !s.isEphemeralChain { - testclient.CometLocalTCPURL = loadTestParams.TestNetNode + testclient.CometLocalTCPURL = loadTestParams.RPCNode - webSocketURL, err := url.Parse(loadTestParams.TestNetNode) + webSocketURL, err := url.Parse(loadTestParams.RPCNode) require.NoError(s, err) // TestNet nodes may be exposed over HTTPS, so adjust the scheme accordingly. @@ -332,37 +314,31 @@ func (s *relaysSuite) LocalnetIsRunning() { testclient.CometLocalWebsocketURL = webSocketURL.String() + "/websocket" // Update the block duration when running the test on a non-ephemeral chain. - // TODO_TECHDEBT: Get the block duration value from the chain or the manifest. - blockDuration = 60 + // TODO_TECHDEBT: Get the block duration value from the chain. + blockDurationSec = 60 } - // Set up the blockClient that will be notifying the suite about the committed blocks. - s.blockClient = testblock.NewLocalnetClient(s.ctx, s.TestingT.(*testing.T)) - channel.ForEach( - s.ctx, - s.blockClient.CommittedBlocksSequence(s.ctx), - func(ctx context.Context, block client.Block) { - s.latestBlock = block - }, - ) - // Setup the txContext that will be used to send transactions to the network. s.txContext = testtx.NewLocalnetContext(s.TestingT.(*testing.T)) - // Get the relay cost from the tokenomics module. - s.relayCoinAmountCost = s.getRelayCost() - - // Setup the tx listener for on-chain events to check and assert on transactions results. - s.setupTxEventListeners() + // Setup the event listener for on-chain events to check and assert on transactions + // and finalized blocks results. + s.setupEventListeners(loadTestParams.RPCNode) // Initialize the funding account. s.initFundingAccount(loadTestParams.FundingAccountAddress) - // Initialize the on-chain claims and proofs counter. - s.countClaimAndProofs() + // Initialize the on-chain settlement events listener. + s.forEachSettlement(s.ctx) - // Query for the current shared on-chain params. - s.querySharedParams(loadTestParams.TestNetNode) + // Query for the current network on-chain params. + s.querySharedParams(loadTestParams.RPCNode) + s.queryAppParams(loadTestParams.RPCNode) + s.queryProofParams(loadTestParams.RPCNode) + s.queryTestedService(loadTestParams.RPCNode) + + // Get the relay cost from the tokenomics module. + s.relayCoinAmountCost = s.getRelayCost() // Some suppliers may already be staked at genesis, ensure that staking during // this test succeeds by increasing the sake amount. @@ -402,62 +378,50 @@ func (s *relaysSuite) MoreActorsAreStakedAsFollows(table gocuke.DataTable) { // increment the actor count to the maximum. s.relayLoadDurationBlocks = s.plans.maxActorBlocksToFinalIncrementEnd() - if s.isEphemeralChain { - // Adjust the max delegations parameter to the max gateways to permit all - // applications to delegate to all gateways. - // This is to ensure that requests are distributed evenly across all gateways - // at any given time. - s.sendAdjustMaxDelegationsParamTx(s.plans.gateways.maxActorCount) - s.waitForTxsToBeCommitted() - s.ensureUpdatedMaxDelegations(s.plans.gateways.maxActorCount) - } - // Fund all the provisioned suppliers and gateways since their addresses are // known and they are not created on the fly, while funding only the initially // created applications. fundedSuppliers, fundedGateways, fundedApplications := s.sendFundAvailableActorsTx() // Funding messages are sent in a single transaction by the funding account, // only one transaction is expected to be committed. - txResults := s.waitForTxsToBeCommitted() - s.ensureFundedActors(txResults, fundedSuppliers) - s.ensureFundedActors(txResults, fundedGateways) - s.ensureFundedActors(txResults, fundedApplications) + fundedActors := append(fundedSuppliers, fundedGateways...) + fundedActors = append(fundedActors, fundedApplications...) + s.ensureFundedActors(s.ctx, fundedActors) logger.Info().Msg("Actors funded") // The initial actors are the first actors to stake. - suppliers := fundedSuppliers[:s.supplierInitialCount] - gateways := fundedGateways[:s.gatewayInitialCount] - applications := fundedApplications[:s.appInitialCount] + stakedSuppliers := fundedSuppliers[:s.supplierInitialCount] + stakedGateways := fundedGateways[:s.gatewayInitialCount] + stakedApplications := fundedApplications[:s.appInitialCount] + + stakedActors := append(stakedSuppliers, stakedGateways...) + stakedActors = append(stakedActors, stakedApplications...) - s.sendInitialActorsStakeMsgs(suppliers, gateways, applications) - txResults = s.waitForTxsToBeCommitted() - s.ensureStakedActors(txResults, EventActionMsgStakeSupplier, suppliers) - s.ensureStakedActors(txResults, EventActionMsgStakeGateway, gateways) - s.ensureStakedActors(txResults, EventActionMsgStakeApplication, applications) + s.sendInitialActorsStakeMsgs(stakedSuppliers, stakedGateways, stakedApplications) + s.ensureStakedActors(s.ctx, stakedActors) logger.Info().Msg("Actors staked") // Update the list of staked suppliers. - s.activeSuppliers = append(s.activeSuppliers, suppliers...) + s.activeSuppliers = append(s.activeSuppliers, stakedSuppliers...) // In the case of non-ephemeral chain load tests, the available gateways are // not incrementally staked, but are already staked and delegated to, add all // of them to the list of active gateways at the beginning of the test. if !s.isEphemeralChain { - gateways = s.populateWithKnownGateways() + stakedGateways = s.populateWithKnownGateways() } // Delegate the initial applications to the initial gateways - s.sendDelegateInitialAppsTxs(applications, gateways) - txResults = s.waitForTxsToBeCommitted() - s.ensureDelegatedApps(txResults, applications, gateways) + s.sendDelegateInitialAppsTxs(stakedApplications, stakedGateways) + s.ensureDelegatedApps(s.ctx, stakedApplications, stakedGateways) logger.Info().Msg("Apps delegated") // Applications and gateways are now ready and will be active in the next session. - s.preparedApplications = append(s.preparedApplications, applications...) - s.preparedGateways = append(s.preparedGateways, gateways...) + s.preparedApplications = append(s.preparedApplications, stakedApplications...) + s.preparedGateways = append(s.preparedGateways, stakedGateways...) // relayBatchInfoObs maps session information to batch information used to schedule // the relay requests to be sent on the current block. @@ -508,29 +472,11 @@ func (s *relaysSuite) ALoadOfConcurrentRelayRequestsAreSentFromTheApplications() // Block the feature step until the test is done. <-s.ctx.Done() } -func (s *relaysSuite) TheCorrectPairsCountOfClaimAndProofMessagesShouldBeCommittedOnchain() { - logger.Info(). - Int("claims", s.currentClaimCount). - Int("proofs", s.currentProofCount). - Msg("Claims and proofs count") - - require.Equal(s, - s.currentClaimCount, - s.currentProofCount, - "claims and proofs count mismatch", - ) - // TODO_TECHDEBT: The current counting mechanism for the expected claims and proofs - // is not accurate. The expected claims and proofs count should be calculated based - // on a function of(time_per_block, num_blocks_per_session) -> num_claims_and_proofs. - // The reason (time_per_block) is one of the parameters is because claims and proofs - // are removed from the on-chain state after sessions are settled, only leaving - // events behind. The final solution needs to either account for this timing - // carefully (based on sessions that have passed), or be event driven using - // a replay client of on-chain messages and/or events. - //require.Equal(s, - // s.expectedClaimsAndProofsCount, - // s.currentProofCount, - // "unexpected claims and proofs count", - //) +func (s *relaysSuite) TheNumberOfFailedRelayRequestsIs(expectedFailedRelays string) { + expectedFailedRelaysCount, err := strconv.ParseUint(expectedFailedRelays, 10, 64) + require.NoError(s, err) + + require.EqualValues(s, expectedFailedRelaysCount, s.failedRelays.Load()) + require.EqualValues(s, s.numRelaysSent.Load(), s.successfulRelays.Load()) } diff --git a/makefiles/tests.mk b/makefiles/tests.mk index 7bb292e3d..b1962c845 100644 --- a/makefiles/tests.mk +++ b/makefiles/tests.mk @@ -53,13 +53,13 @@ test_load_relays_stress_custom: ## Run the stress test for E2E relays using cust .PHONY: test_load_relays_stress_localnet test_load_relays_stress_localnet: test_e2e_env warn_message_local_stress_test ## Run the stress test for E2E relays on LocalNet. go test -v -count=1 ./load-testing/tests/... \ - -tags=load,test -run LoadRelays --log-level=debug --timeout=30m \ + -tags=load,test -run TestLoadRelays --log-level=debug --timeout=30m \ --manifest ./load-testing/loadtest_manifest_localnet.yaml .PHONY: test_load_relays_stress_localnet_single_supplier test_load_relays_stress_localnet_single_supplier: test_e2e_env warn_message_local_stress_test ## Run the stress test for E2E relays on LocalNet using exclusively one supplier. go test -v -count=1 ./load-testing/tests/... \ - -tags=load,test -run TestLoadRelaysSingleSupplier --log-level=debug --timeout=30m \ + -tags=load,test -run TestSingleSupplierLoadRelays --log-level=debug --timeout=30m \ --manifest ./load-testing/loadtest_manifest_localnet_single_supplier.yaml .PHONY: test_verbose diff --git a/pkg/client/interface.go b/pkg/client/interface.go index f98c39fef..953b6b47a 100644 --- a/pkg/client/interface.go +++ b/pkg/client/interface.go @@ -279,6 +279,9 @@ type ApplicationQueryClient interface { // GetAllApplications queries all on-chain applications GetAllApplications(ctx context.Context) ([]apptypes.Application, error) + + // GetParams queries the chain for the application module parameters. + GetParams(ctx context.Context) (*apptypes.Params, error) } // SupplierQueryClient defines an interface that enables the querying of the @@ -349,7 +352,7 @@ type ProofParams interface { // ProofQueryClient defines an interface that enables the querying of the // on-chain proof module params. type ProofQueryClient interface { - // GetParams queries the chain for the current shared module parameters. + // GetParams queries the chain for the current proof module parameters. GetParams(ctx context.Context) (ProofParams, error) } diff --git a/pkg/client/query/appquerier.go b/pkg/client/query/appquerier.go index 9477c35f9..9150af87c 100644 --- a/pkg/client/query/appquerier.go +++ b/pkg/client/query/appquerier.go @@ -62,3 +62,13 @@ func (aq *appQuerier) GetAllApplications(ctx context.Context) ([]apptypes.Applic } return res.Applications, nil } + +// GetParams returns the application module parameters +func (aq *appQuerier) GetParams(ctx context.Context) (*apptypes.Params, error) { + req := apptypes.QueryParamsRequest{} + res, err := aq.applicationQuerier.Params(ctx, &req) + if err != nil { + return nil, err + } + return &res.Params, nil +} diff --git a/testutil/delays/waitall.go b/testutil/delays/waitall.go new file mode 100644 index 000000000..30ae68287 --- /dev/null +++ b/testutil/delays/waitall.go @@ -0,0 +1,23 @@ +package testdelays + +import "sync" + +// WaitAll waits for all the provided functions to complete. +// It is used to wait for multiple goroutines to complete before proceeding. +func WaitAll(waitFuncs ...func()) { + if len(waitFuncs) == 0 { + return + } + + var wg sync.WaitGroup + wg.Add(len(waitFuncs)) + + for _, fn := range waitFuncs { + go func(f func()) { + defer wg.Done() + f() + }(fn) + } + + wg.Wait() +} diff --git a/testutil/events/filter.go b/testutil/events/filter.go index ced4617af..5f16aa1ea 100644 --- a/testutil/events/filter.go +++ b/testutil/events/filter.go @@ -1,6 +1,7 @@ package events import ( + "context" "strconv" "strings" "testing" @@ -8,6 +9,8 @@ import ( abci "github.com/cometbft/cometbft/abci/types" cosmostypes "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/gogoproto/proto" + "github.com/pokt-network/poktroll/pkg/observable" + "github.com/pokt-network/poktroll/pkg/observable/channel" "github.com/stretchr/testify/require" ) @@ -74,3 +77,24 @@ func NewEventTypeMatchFn(matchEventType string) func(*cosmostypes.Event) bool { return strings.Trim(event.Type, "/") == strings.Trim(matchEventType, "/") } } + +// AbciEventsToTypedEvents converts the abci events to typed events. +func AbciEventsToTypedEvents( + ctx context.Context, + abciEventObs observable.Observable[[]abci.Event], +) observable.Observable[[]proto.Message] { + return channel.Map(ctx, abciEventObs, func(ctx context.Context, events []abci.Event) ([]proto.Message, bool) { + var typedEvents []proto.Message + for _, event := range events { + // TODO_TECHDEBT: Filter out events by event.Type before parsing them. + typedEvent, err := cosmostypes.ParseTypedEvent(event) + if err != nil { + continue + } + + typedEvents = append(typedEvents, typedEvent) + } + + return typedEvents, false + }) +} diff --git a/testutil/keeper/tokenomics.go b/testutil/keeper/tokenomics.go index e4cef7328..409a92f4e 100644 --- a/testutil/keeper/tokenomics.go +++ b/testutil/keeper/tokenomics.go @@ -182,6 +182,11 @@ func TokenomicsKeeperWithActorAddrs(t testing.TB) ( Return(nil). AnyTimes() + mockApplicationKeeper.EXPECT(). + GetParams(gomock.Any()). + Return(apptypes.Params{}). + AnyTimes() + // Mock the supplier keeper. mockSupplierKeeper := mocks.NewMockSupplierKeeper(ctrl) // Mock SetSupplier. diff --git a/x/proof/types/application_query_client.go b/x/proof/types/application_query_client.go index 1cd887314..c9599f2df 100644 --- a/x/proof/types/application_query_client.go +++ b/x/proof/types/application_query_client.go @@ -42,3 +42,9 @@ func (appQueryClient *AppKeeperQueryClient) GetApplication( func (appQueryClient *AppKeeperQueryClient) GetAllApplications(ctx context.Context) ([]apptypes.Application, error) { return appQueryClient.keeper.GetAllApplications(ctx), nil } + +// GetParams returns the application module parameters. +func (appQueryClient *AppKeeperQueryClient) GetParams(ctx context.Context) (*apptypes.Params, error) { + params := appQueryClient.keeper.GetParams(ctx) + return ¶ms, nil +} diff --git a/x/proof/types/expected_keepers.go b/x/proof/types/expected_keepers.go index 9d1fd765e..e2981ce1a 100644 --- a/x/proof/types/expected_keepers.go +++ b/x/proof/types/expected_keepers.go @@ -49,6 +49,7 @@ type ApplicationKeeper interface { GetApplication(ctx context.Context, address string) (app apptypes.Application, found bool) GetAllApplications(ctx context.Context) []apptypes.Application SetApplication(context.Context, apptypes.Application) + GetParams(ctx context.Context) (params apptypes.Params) } // SharedKeeper defines the expected interface needed to retrieve shared information. diff --git a/x/tokenomics/keeper/settle_pending_claims.go b/x/tokenomics/keeper/settle_pending_claims.go index fa1995de2..6ff10b702 100644 --- a/x/tokenomics/keeper/settle_pending_claims.go +++ b/x/tokenomics/keeper/settle_pending_claims.go @@ -233,6 +233,7 @@ func (k Keeper) SettlePendingClaims(ctx cosmostypes.Context) ( NumEstimatedComputeUnits: numEstimatedComputeUnits, ClaimedUpokt: &claimeduPOKT, ProofRequirement: proofRequirement, + SettlementResult: *ClaimSettlementResult, } if err = ctx.EventManager().EmitTypedEvent(&claimSettledEvent); err != nil { diff --git a/x/tokenomics/types/expected_keepers.go b/x/tokenomics/types/expected_keepers.go index c7c0de0c5..9d555f4e2 100644 --- a/x/tokenomics/types/expected_keepers.go +++ b/x/tokenomics/types/expected_keepers.go @@ -46,6 +46,7 @@ type ApplicationKeeper interface { GetAllApplications(ctx context.Context) []apptypes.Application UnbondApplication(ctx context.Context, app *apptypes.Application) error EndBlockerUnbondApplications(ctx context.Context) error + GetParams(ctx context.Context) (params apptypes.Params) } type ProofKeeper interface {