Skip to content

Commit

Permalink
GH-1039 Move producer_plugin timers to their own thread
Browse files Browse the repository at this point in the history
  • Loading branch information
heifner committed Nov 20, 2024
1 parent 276fd79 commit ca99a11
Showing 1 changed file with 43 additions and 32 deletions.
75 changes: 43 additions & 32 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -557,9 +557,8 @@ struct implicit_production_pause_vote_tracker {
class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin_impl> {
public:
producer_plugin_impl()
: _timer(app().make_timer<boost::asio::deadline_timer>())
, _transaction_ack_channel(app().get_channel<compat::channels::transaction_ack>())
, _ro_timer(app().make_timer<boost::asio::deadline_timer>()) {}
: _transaction_ack_channel(app().get_channel<compat::channels::transaction_ack>())
{}

void schedule_production_loop();
void schedule_maybe_produce_block(bool exhausted);
Expand Down Expand Up @@ -683,11 +682,13 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
bool _production_enabled = false;
bool _pause_production = false;

eosio::chain::named_thread_pool<struct prod> _timer_thread;
boost::asio::deadline_timer _timer{_timer_thread.get_executor()};

using signature_provider_type = signature_provider_plugin::signature_provider_type;
std::map<chain::public_key_type, signature_provider_type> _signature_providers;
chain::bls_pub_priv_key_map_t _finalizer_keys; // public, private
std::set<chain::account_name> _producers;
boost::asio::deadline_timer _timer;
block_timing_util::producer_watermarks _producer_watermarks;
pending_block_mode _pending_block_mode = pending_block_mode::speculating;
unapplied_transaction_queue _unapplied_transactions;
Expand Down Expand Up @@ -800,7 +801,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
// use atomic for simplicity and performance
fc::time_point _ro_read_window_start_time;
fc::time_point _ro_window_deadline; // only modified on app thread, read-window deadline or write-window deadline
boost::asio::deadline_timer _ro_timer; // only accessible from the main thread
boost::asio::deadline_timer _ro_timer{_timer_thread.get_executor()}; // only accessible from the main thread
fc::microseconds _ro_max_trx_time_us{0}; // calculated during option initialization
ro_trx_queue_t _ro_exhausted_trx_queue;
alignas(hardware_destructive_interference_sz)
Expand Down Expand Up @@ -1612,6 +1613,11 @@ void producer_plugin_impl::plugin_startup() {
start_write_window();
}

_timer_thread.start( 1, []( const fc::exception& e ) {
elog("Exception in producer timer thread, exiting: ${e}", ("e", e.to_detail_string()));
app().quit();
} );

schedule_production_loop();

dlog("producer plugin: plugin_startup() end");
Expand All @@ -1624,6 +1630,7 @@ void producer_plugin::plugin_startup() {
}

void producer_plugin_impl::plugin_shutdown() {
_timer_thread.stop();
_ro_thread_pool.stop();
// unapplied transaction queue holds lambdas that reference plugins
_unapplied_transactions.clear();
Expand Down Expand Up @@ -2742,13 +2749,13 @@ void producer_plugin_impl::schedule_production_loop() {
_timer.expires_from_now(boost::posix_time::microseconds(config::block_interval_us / 10));

// we failed to start a block, so try again later?
_timer.async_wait(
app().executor().wrap(priority::high, exec_queue::read_write,
[this, cid = ++_timer_corelation_id](const boost::system::error_code& ec) {
if (ec != boost::asio::error::operation_aborted && cid == _timer_corelation_id) {
schedule_production_loop();
}
}));
_timer.async_wait([this, cid = ++_timer_corelation_id](const boost::system::error_code& ec) {
if (ec != boost::asio::error::operation_aborted && cid == _timer_corelation_id) {
app().executor().post(priority::high, exec_queue::read_write, [this]() {
schedule_production_loop();
});
}
});
} else if (result == start_block_result::waiting_for_block) {
if (is_configured_producer() && !production_disabled_by_policy()) {
chain::controller& chain = chain_plug->chain();
Expand Down Expand Up @@ -2808,16 +2815,17 @@ void producer_plugin_impl::schedule_maybe_produce_block(bool exhausted) {
("num", chain.head().block_num() + 1)("desc", block_is_exhausted() ? "Exhausted" : "Deadline exceeded"));
}

_timer.async_wait(app().executor().wrap(priority::high, exec_queue::read_write,
[&chain, this, cid = ++_timer_corelation_id](const boost::system::error_code& ec) {
if (ec != boost::asio::error::operation_aborted && cid == _timer_corelation_id) {
_timer.async_wait([&chain, this, cid = ++_timer_corelation_id](const boost::system::error_code& ec) {
if (ec != boost::asio::error::operation_aborted && cid == _timer_corelation_id) {
app().executor().post(priority::high, exec_queue::read_write, [&chain, this]() {
// pending_block_state expected, but can't assert inside async_wait
auto block_num = chain.is_building_block() ? chain.head().block_num() + 1 : 0;
fc_dlog(_log, "Produce block timer for ${num} running at ${time}", ("num", block_num)("time", fc::time_point::now()));
auto res = maybe_produce_block();
fc_dlog(_log, "Producing Block #${num} returned: ${res}", ("num", block_num)("res", res));
}
}));
});
}
});
}

void producer_plugin_impl::schedule_delayed_production_loop(const std::weak_ptr<producer_plugin_impl>& weak_this,
Expand All @@ -2826,12 +2834,13 @@ void producer_plugin_impl::schedule_delayed_production_loop(const std::weak_ptr<
fc_dlog(_log, "Scheduling Speculative/Production Change at ${time}", ("time", wake_up_time));
static const boost::posix_time::ptime epoch(boost::gregorian::date(1970, 1, 1));
_timer.expires_at(epoch + boost::posix_time::microseconds(wake_up_time->time_since_epoch().count()));
_timer.async_wait(app().executor().wrap(priority::high, exec_queue::read_write,
[this, cid = ++_timer_corelation_id](const boost::system::error_code& ec) {
if (ec != boost::asio::error::operation_aborted && cid == _timer_corelation_id) {
_timer.async_wait([this, cid = ++_timer_corelation_id](const boost::system::error_code& ec) {
if (ec != boost::asio::error::operation_aborted && cid == _timer_corelation_id) {
app().executor().post(priority::high, exec_queue::read_write, [this]() {
schedule_production_loop();
}
}));
});
}
});
} else {
fc_dlog(_log, "Not Scheduling Speculative/Production, no local producers had valid wake up times");
}
Expand Down Expand Up @@ -2971,13 +2980,14 @@ void producer_plugin_impl::start_write_window() {
_ro_window_deadline = now + _ro_write_window_time_us; // not allowed on block producers, so no need to limit to block deadline
auto expire_time = boost::posix_time::microseconds(_ro_write_window_time_us.count());
_ro_timer.expires_from_now(expire_time);
_ro_timer.async_wait(app().executor().wrap( // stay on app thread
priority::high, exec_queue::read_write, // placed in read_write so only called from main thread
[this](const boost::system::error_code& ec) {
if (ec != boost::asio::error::operation_aborted) {
switch_to_read_window();
}
}));
_ro_timer.async_wait([this](const boost::system::error_code& ec) {
if (ec != boost::asio::error::operation_aborted) {
app().executor().post(priority::high, exec_queue::read_write, // placed in read_write so only called from main thread
[this]() {
switch_to_read_window();
});
}
});
}

// Called only from app thread
Expand Down Expand Up @@ -3017,8 +3027,8 @@ void producer_plugin_impl::switch_to_read_window() {
auto expire_time = boost::posix_time::microseconds(_ro_read_window_time_us.count());
_ro_timer.expires_from_now(expire_time);
// Needs to be on read_only because that is what is being processed until switch_to_write_window().
_ro_timer.async_wait(
app().executor().wrap(priority::high, exec_queue::read_only, [this](const boost::system::error_code& ec) {
_ro_timer.async_wait([this](const boost::system::error_code& ec) {
app().executor().post(priority::high, exec_queue::read_only, [this, ec]() {
if (ec != boost::asio::error::operation_aborted) {
// use future to make sure all read-only tasks finished before switching to write window
for (auto& task : _ro_exec_tasks_fut) {
Expand All @@ -3030,7 +3040,8 @@ void producer_plugin_impl::switch_to_read_window() {
} else {
_ro_exec_tasks_fut.clear();
}
}));
});
});
}

// Called from a read only thread. Run in parallel with app and other read only threads
Expand Down

0 comments on commit ca99a11

Please sign in to comment.