Skip to content

Commit

Permalink
Merge pull request #885 from AntelopeIO/read_only_time_report
Browse files Browse the repository at this point in the history
[4.0] Add time summary logging for transient trx processing and read-only threads
  • Loading branch information
linh2931 authored Mar 24, 2023
2 parents 59cf021 + 44b56b3 commit f07e688
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 22 deletions.
10 changes: 7 additions & 3 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2680,7 +2680,7 @@ struct controller_impl {
// only called from non-main threads (read-only trx execution threads)
// when producer_plugin starts them
void init_thread_local_data() {
EOS_ASSERT( !is_main_thread(), misc_exception, "init_thread_local_data called on the main thread");
EOS_ASSERT( !is_on_main_thread(), misc_exception, "init_thread_local_data called on the main thread");
#ifdef EOSIO_EOS_VM_OC_RUNTIME_ENABLED
if ( is_eos_vm_oc_enabled() )
// EOSVMOC needs further initialization of its thread local data
Expand All @@ -2691,10 +2691,10 @@ struct controller_impl {
wasmif_thread_local = std::make_unique<wasm_interface>( conf.wasm_runtime, conf.eosvmoc_tierup, db, conf.state_dir, conf.eosvmoc_config, !conf.profile_accounts.empty());
}

bool is_main_thread() { return main_thread_id == std::this_thread::get_id(); };
bool is_on_main_thread() { return main_thread_id == std::this_thread::get_id(); };

wasm_interface& get_wasm_interface() {
if ( is_main_thread()
if ( is_on_main_thread()
#ifdef EOSIO_EOS_VM_OC_RUNTIME_ENABLED
|| is_eos_vm_oc_enabled()
#endif
Expand Down Expand Up @@ -3686,6 +3686,10 @@ void controller::init_thread_local_data() {
my->init_thread_local_data();
}

bool controller::is_on_main_thread() const {
return my->is_on_main_thread();
}

/// Protocol feature activation handlers:

template<>
Expand Down
1 change: 1 addition & 0 deletions libraries/chain/include/eosio/chain/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ namespace eosio { namespace chain {
void set_db_read_only_mode();
void unset_db_read_only_mode();
void init_thread_local_data();
bool is_on_main_thread() const;

private:
friend class apply_context;
Expand Down
74 changes: 55 additions & 19 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,39 +256,53 @@ struct block_time_tracker {
block_idle_time += idle;
}

void add_fail_time( const fc::microseconds& fail_time ) {
trx_fail_time += fail_time;
++trx_fail_num;
void add_fail_time( const fc::microseconds& fail_time, bool is_transient ) {
if( is_transient ) {
// transient time includes both success and fail time
transient_trx_time += fail_time;
++transient_trx_num;
} else {
trx_fail_time += fail_time;
++trx_fail_num;
}
}

void add_success_time( const fc::microseconds& time ) {
trx_success_time += time;
++trx_success_num;
void add_success_time( const fc::microseconds& time, bool is_transient ) {
if( is_transient ) {
transient_trx_time += time;
++transient_trx_num;
} else {
trx_success_time += time;
++trx_success_num;
}
}

void report( const fc::time_point& idle_trx_time, uint32_t block_num ) {
if( _log.is_enabled( fc::log_level::debug ) ) {
auto now = fc::time_point::now();
add_idle_time( now - idle_trx_time );
fc_dlog( _log, "Block #${n} trx idle: ${i}us out of ${t}us, success: ${sn}, ${s}us, fail: ${fn}, ${f}us, other: ${o}us",
fc_dlog( _log, "Block #${n} trx idle: ${i}us out of ${t}us, success: ${sn}, ${s}us, fail: ${fn}, ${f}us, transient: ${tn}, ${t}us, other: ${o}us",
("n", block_num)
("i", block_idle_time)("t", now - clear_time)("sn", trx_success_num)("s", trx_success_time)
("fn", trx_fail_num)("f", trx_fail_time)
("o", (now - clear_time) - block_idle_time - trx_success_time - trx_fail_time) );
("tn", transient_trx_num)("t", transient_trx_time)
("o", (now - clear_time) - block_idle_time - trx_success_time - trx_fail_time - transient_trx_time) );
}
}

void clear() {
block_idle_time = trx_fail_time = trx_success_time = fc::microseconds{};
trx_fail_num = trx_success_num = 0;
block_idle_time = trx_fail_time = trx_success_time = transient_trx_time = fc::microseconds{};
trx_fail_num = trx_success_num = transient_trx_num = 0;
clear_time = fc::time_point::now();
}

fc::microseconds block_idle_time;
uint32_t trx_success_num = 0;
uint32_t trx_fail_num = 0;
uint32_t transient_trx_num = 0;
fc::microseconds trx_success_time;
fc::microseconds trx_fail_time;
fc::microseconds transient_trx_time;
fc::time_point clear_time{fc::time_point::now()};
};

Expand Down Expand Up @@ -429,11 +443,13 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
fc::microseconds _ro_read_window_time_us{ 60000 };
static constexpr fc::microseconds _ro_read_window_minimum_time_us{ 10000 };
fc::microseconds _ro_read_window_effective_time_us{ 0 }; // calculated during option initialization
std::atomic<int64_t> _ro_all_threads_exec_time_us; // total time spent by all threads executing transactions. use atomic for simplicity and performance
fc::time_point _ro_read_window_start_time;
boost::asio::deadline_timer _ro_write_window_timer;
boost::asio::deadline_timer _ro_read_window_timer;
fc::microseconds _ro_max_trx_time_us{ 0 }; // calculated during option initialization
ro_trx_queue_t _ro_trx_queue;
std::atomic<uint32_t> _ro_num_active_trx_exec_tasks{ 0 };
std::atomic<uint32_t> _ro_num_active_trx_exec_tasks{ 0 };
std::vector<std::future<bool>> _ro_trx_exec_tasks_fut;

void start_write_window();
Expand Down Expand Up @@ -688,7 +704,8 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
self->log_trx_results( trx, nullptr, ex, 0, start, is_transient );
next( std::move(ex) );
self->_idle_trx_time = fc::time_point::now();
self->_time_tracker.add_fail_time(self->_idle_trx_time - start);
auto dur = self->_idle_trx_time - start;
self->_time_tracker.add_fail_time(dur, is_transient);
};
try {
auto result = future.get();
Expand Down Expand Up @@ -2214,7 +2231,7 @@ producer_plugin_impl::push_transaction( const fc::time_point& block_deadline,
log_trx_results( trx, except_ptr );
next( except_ptr );
}
_time_tracker.add_fail_time(fc::time_point::now() - start);
_time_tracker.add_fail_time(fc::time_point::now() - start, trx->is_transient());
return push_result{.failed = true};
}

Expand Down Expand Up @@ -2261,8 +2278,10 @@ producer_plugin_impl::handle_push_result( const transaction_metadata_ptr& trx,
auto end = fc::time_point::now();
push_result pr;
if( trace->except ) {
if ( !trx->is_read_only() )
_time_tracker.add_fail_time(end - start);
if ( chain.is_on_main_thread() ) {
auto dur = end - start;
_time_tracker.add_fail_time(dur, trx->is_transient());
}
if( exception_is_exhausted( *trace->except ) ) {
if( _pending_block_mode == pending_block_mode::producing ) {
fc_dlog(_trx_failed_trace_log, "[TRX_TRACE] Block ${block_num} for producer ${prod} COULD NOT FIT, tx: ${txid} RETRYING ",
Expand Down Expand Up @@ -2302,8 +2321,10 @@ producer_plugin_impl::handle_push_result( const transaction_metadata_ptr& trx,
} else {
fc_tlog( _log, "Subjective bill for success ${a}: ${b} elapsed ${t}us, time ${r}us",
("a",first_auth)("b",sub_bill)("t",trace->elapsed)("r", end - start));
if ( !trx->is_read_only() )
_time_tracker.add_success_time(end - start);
if ( chain.is_on_main_thread() ) {
auto dur = end - start;
_time_tracker.add_success_time(dur, trx->is_transient());
}
log_trx_results( trx, trace, start );
// if producing then trx is in objective cpu account billing
if (!disable_subjective_enforcement && _pending_block_mode != pending_block_mode::producing) {
Expand Down Expand Up @@ -2446,7 +2467,7 @@ void producer_plugin_impl::process_scheduled_and_incoming_trxs( const fc::time_p
auto trace = chain.push_scheduled_transaction(trx_id, deadline, max_trx_time, 0, false);
auto end = fc::time_point::now();
if (trace->except) {
_time_tracker.add_fail_time(end - start);
_time_tracker.add_fail_time(end - start, false); // delayed transaction cannot be transient
if (exception_is_exhausted(*trace->except)) {
if( block_is_exhausted() ) {
exhausted = true;
Expand All @@ -2466,7 +2487,7 @@ void producer_plugin_impl::process_scheduled_and_incoming_trxs( const fc::time_p
num_failed++;
}
} else {
_time_tracker.add_success_time(end - start);
_time_tracker.add_success_time(end - start, false); // delayed transaction cannot be transient
fc_dlog(_trx_successful_trace_log,
"[TRX_TRACE] Block ${block_num} for producer ${prod} is ACCEPTING scheduled tx: ${txid}, time: ${r}, auth: ${a}, cpu: ${cpu}",
("block_num", chain.head_block_num() + 1)("prod", get_pending_block_producer())
Expand Down Expand Up @@ -2766,6 +2787,14 @@ void producer_plugin::log_failed_transaction(const transaction_id_type& trx_id,

// Called from app thread
void producer_plugin_impl::switch_to_write_window() {
if ( _log.is_enabled( fc::log_level::debug ) ) {
auto now = fc::time_point::now();
fc_dlog( _log, "Read-only threads ${n}, read window ${r}us, total all threads ${t}us",
("n", _ro_thread_pool_size)
("r", now - _ro_read_window_start_time)
("t", _ro_all_threads_exec_time_us.load()));
}

// this method can be called from multiple places. it is possible
// we are already in write window.
if ( app().executor().is_write_window() ) {
Expand Down Expand Up @@ -2819,6 +2848,8 @@ void producer_plugin_impl::switch_to_read_window() {
app().executor().set_to_read_window();
chain_plug->chain().set_db_read_only_mode();
_received_block = false;
_ro_read_window_start_time = fc::time_point::now();
_ro_all_threads_exec_time_us = 0;

// start a read-only transaction execution task in each thread in the thread pool
_ro_num_active_trx_exec_tasks = _ro_thread_pool_size;
Expand Down Expand Up @@ -2933,7 +2964,12 @@ bool producer_plugin_impl::push_read_only_transaction(
auto remaining_time_in_read_window_us = _ro_read_window_time_us - time_used_in_read_window_us;
// Ensure the trx to finish by the end of read-window.
auto window_deadline = std::min( start + remaining_time_in_read_window_us, block_deadline );

auto trace = chain.push_transaction( trx, window_deadline, _ro_max_trx_time_us, 0, false, 0 );
if ( _log.is_enabled( fc::log_level::debug ) ) {
auto dur = fc::time_point::now() - start;
_ro_all_threads_exec_time_us += dur.count();
}
auto pr = handle_push_result(trx, next, start, chain, trace, true /*return_failure_trace*/, true /*disable_subjective_enforcement*/, {} /*first_auth*/, 0 /*sub_bill*/, 0 /*prev_billed_cpu_time_us*/);
// If a transaction was exhausted, that indicates we are close to
// the end of read window. Retry in next round.
Expand Down

0 comments on commit f07e688

Please sign in to comment.