Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[5.0 -> main] PH: Reliability improvements #1852

Merged
merged 12 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tests/PerformanceHarness/log_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class TpsTestConfig:
numTrxGensUsed: int = 0
targetTpsPerGenList: List[int] = field(default_factory=list)
quiet: bool = False
printMissingTransactions: bool=False
printMissingTransactions: bool=True

@dataclass
class stats():
Expand Down
8 changes: 4 additions & 4 deletions tests/PerformanceHarness/performance_test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ class PtbConfig:
quiet: bool=False
delPerfLogs: bool=False
expectedTransactionsSent: int = field(default_factory=int, init=False)
printMissingTransactions: bool=False
printMissingTransactions: bool=True
userTrxDataFile: Path=None
endpointMode: str="p2p"
apiEndpoint: str=None
Expand Down Expand Up @@ -491,12 +491,12 @@ def configureConnections():
completedRun = True

# Get stats after transaction generation stops
endBlock = self.waitForEmptyBlocks(self.validationNode, self.emptyBlockGoal)
trxSent = {}
scrapeTrxGenTrxSentDataLogs(trxSent, self.trxGenLogDirPath, self.ptbConfig.quiet)
if len(trxSent) != self.ptbConfig.expectedTransactionsSent:
print(f"ERROR: Transactions generated: {len(trxSent)} does not match the expected number of transactions: {self.ptbConfig.expectedTransactionsSent}")
blocksToWait = 2 * self.ptbConfig.testTrxGenDurationSec + 10
trxNotFound = self.validationNode.waitForTransactionsInBlockRange(trxSent, self.data.startBlock, blocksToWait)
trxNotFound = self.validationNode.waitForTransactionsInBlockRange(trxSent, self.data.startBlock, endBlock)
self.data.ceaseBlock = self.validationNode.getHeadBlockNum()

return PerformanceTestBasic.PtbTpsTestResult(completedRun=completedRun, numGeneratorsUsed=tpsTrxGensConfig.numGenerators,
Expand Down Expand Up @@ -733,7 +733,7 @@ def _createBaseArgumentParser(defEndpointApiDef: str, defProdNodeCnt: int, defVa
ptbBaseParserGroup.add_argument("--save-state", help=argparse.SUPPRESS if suppressHelp else "Whether to save node state. (Warning: large disk usage)", action='store_true')
ptbBaseParserGroup.add_argument("--quiet", help=argparse.SUPPRESS if suppressHelp else "Whether to quiet printing intermediate results and reports to stdout", action='store_true')
ptbBaseParserGroup.add_argument("--prods-enable-trace-api", help=argparse.SUPPRESS if suppressHelp else "Determines whether producer nodes should have eosio::trace_api_plugin enabled", action='store_true')
ptbBaseParserGroup.add_argument("--print-missing-transactions", help=argparse.SUPPRESS if suppressHelp else "Toggles if missing transactions are be printed upon test completion.", action='store_true')
ptbBaseParserGroup.add_argument("--print-missing-transactions", type=bool, help=argparse.SUPPRESS if suppressHelp else "Print missing transactions upon test completion.", default=True)
ptbBaseParserGroup.add_argument("--account-name", type=str, help=argparse.SUPPRESS if suppressHelp else "Name of the account to create and assign a contract to", default="eosio")
ptbBaseParserGroup.add_argument("--contract-dir", type=str, help=argparse.SUPPRESS if suppressHelp else "Path to contract dir", default="unittests/contracts/eosio.system")
ptbBaseParserGroup.add_argument("--wasm-file", type=str, help=argparse.SUPPRESS if suppressHelp else "WASM file name for contract", default="eosio.system.wasm")
Expand Down
5 changes: 5 additions & 0 deletions tests/TestHarness/Cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,11 @@ def connectGroup(group, producerNodes, bridgeNodes) :
if "--plugin eosio::history_api_plugin" in args:
argsArr.append("--is-nodeos-v2")
break

# Handle common case of specifying no block offset for older versions
if "v2" in self.nodeosVers or "v3" in self.nodeosVers or "v4" in self.nodeosVers:
argsArr = list(map(lambda st: str.replace(st, "--produce-block-offset-ms 0", "--last-block-time-offset-us 0 --last-block-cpu-effort-percent 100"), argsArr))

Cluster.__LauncherCmdArr = argsArr.copy()

launcher = cluster_generator(argsArr)
Expand Down
9 changes: 4 additions & 5 deletions tests/TestHarness/Node.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,19 +161,18 @@ def checkBlockForTransactions(self, transIds, blockNum):
transIds.pop(self.fetchTransactionFromTrace(trx))
return transIds

def waitForTransactionsInBlockRange(self, transIds, startBlock=2, maxFutureBlocks=0):
def waitForTransactionsInBlockRange(self, transIds, startBlock, endBlock):
nextBlockToProcess = startBlock
overallEndBlock = startBlock + maxFutureBlocks
while len(transIds) > 0:
currentLoopEndBlock = self.getHeadBlockNum()
if currentLoopEndBlock > overallEndBlock:
currentLoopEndBlock = overallEndBlock
if currentLoopEndBlock > endBlock:
currentLoopEndBlock = endBlock
for blockNum in range(nextBlockToProcess, currentLoopEndBlock + 1):
transIds = self.checkBlockForTransactions(transIds, blockNum)
if len(transIds) == 0:
return transIds
nextBlockToProcess = currentLoopEndBlock + 1
if currentLoopEndBlock == overallEndBlock:
if currentLoopEndBlock == endBlock:
Utils.Print("ERROR: Transactions were missing upon expiration of waitOnblockTransactions")
break
self.waitForHeadToAdvance()
Expand Down
26 changes: 21 additions & 5 deletions tests/trx_generator/trx_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,31 @@ namespace eosio::testing {
}

/// Drain outstanding writes before the connection is torn down.
///
/// send_transaction() increments _sent when it queues a write and the queued
/// handler increments _sent_callback_num once the write has completed, so the
/// two counters are equal exactly when nothing is left in flight. This polls
/// them once per second for at most 30 seconds and logs an error if writes are
/// still outstanding after that.
///
/// The socket is not closed here: closing it before the queued writes have run
/// would make those writes fail.
void p2p_connection::disconnect() {
   constexpr int max_wait_sec = 30;
   int           waited       = 0;

   uint64_t sent  = _sent.load();
   uint64_t acked = _sent_callback_num.load();
   while (sent != acked && waited < max_wait_sec) {
      ilog("disconnect waiting on ack - sent ${s} | acked ${a} | waited ${w}",
           ("s", sent)("a", acked)("w", waited));
      sleep(1);
      ++waited;
      sent  = _sent.load();
      acked = _sent_callback_num.load();
   }

   // Decide on the counters, not on the elapsed time: the last ack may arrive
   // during the final sleep, in which case the wait succeeded even though the
   // full timeout was used.
   if (sent != acked) {
      elog("disconnect failed to receive all acks in time - sent ${s} | acked ${a} | waited ${w}",
           ("s", sent)("a", acked)("w", waited));
   }
}

/// Queue a packed transaction for delivery over the p2p socket.
///
/// The transaction is serialised on the calling thread; the socket write and
/// the acknowledgement run later in a handler posted to _strand, so each
/// transaction is written and acknowledged exactly once.
///
/// @param trx transaction to send; only its serialised buffer and id are kept
///            after this call returns.
void p2p_connection::send_transaction(const chain::packed_transaction& trx) {
   send_buffer_type msg = create_send_buffer(trx);

   // Count the transaction before it is queued so disconnect() can tell that a
   // write is still pending.
   ++_sent;

   // The buffer is moved into the handler and the id is copied, so the handler
   // does not refer back to `trx` once this function has returned.
   _strand.post( [this, msg{std::move(msg)}, id{trx.id()}]() {
      boost::asio::write(_p2p_socket, boost::asio::buffer(*msg));
      trx_acknowledged(id, fc::time_point::min()); //using min to identify ack time as not applicable for p2p
      ++_sent_callback_num;
   } );
}

acked_trx_trace_info p2p_connection::get_acked_trx_trace_info(const eosio::chain::transaction_id_type& trx_id) {
Expand Down
13 changes: 10 additions & 3 deletions tests/trx_generator/trx_provider.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

#include<eosio/chain/transaction.hpp>
#include<eosio/chain/block.hpp>
#include<boost/asio/ip/tcp.hpp>
#include<fc/network/message_buffer.hpp>
#include<eosio/chain/thread_utils.hpp>

#include<boost/asio/ip/tcp.hpp>
#include<boost/asio/strand.hpp>

#include<chrono>
#include<thread>
#include<variant>
Expand Down Expand Up @@ -104,10 +106,15 @@ namespace eosio::testing {

struct p2p_connection : public provider_connection {
boost::asio::ip::tcp::socket _p2p_socket;
boost::asio::io_context::strand _strand;
std::atomic<uint64_t> _sent_callback_num{0};
std::atomic<uint64_t> _sent{0};


explicit p2p_connection(const provider_base_config& provider_config)
: provider_connection(provider_config)
, _p2p_socket(_connection_thread_pool.get_executor()) {}
, _p2p_socket(_connection_thread_pool.get_executor())
, _strand(_connection_thread_pool.get_executor()){}

void send_transaction(const chain::packed_transaction& trx) final;

Expand Down