diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index 45017c2769..6a8f15ce79 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -557,9 +557,8 @@ struct implicit_production_pause_vote_tracker { class producer_plugin_impl : public std::enable_shared_from_this { public: producer_plugin_impl() - : _timer(app().make_timer()) - , _transaction_ack_channel(app().get_channel()) - , _ro_timer(app().make_timer()) {} + : _transaction_ack_channel(app().get_channel()) + {} void schedule_production_loop(); void schedule_maybe_produce_block(bool exhausted); @@ -683,11 +682,13 @@ class producer_plugin_impl : public std::enable_shared_from_this _timer_thread; + boost::asio::deadline_timer _timer{_timer_thread.get_executor()}; + using signature_provider_type = signature_provider_plugin::signature_provider_type; std::map _signature_providers; chain::bls_pub_priv_key_map_t _finalizer_keys; // public, private std::set _producers; - boost::asio::deadline_timer _timer; block_timing_util::producer_watermarks _producer_watermarks; pending_block_mode _pending_block_mode = pending_block_mode::speculating; unapplied_transaction_queue _unapplied_transactions; @@ -800,7 +801,7 @@ class producer_plugin_impl : public std::enable_shared_from_thischain(); @@ -2808,16 +2815,17 @@ void producer_plugin_impl::schedule_maybe_produce_block(bool exhausted) { ("num", chain.head().block_num() + 1)("desc", block_is_exhausted() ? "Exhausted" : "Deadline exceeded")); } - _timer.async_wait(app().executor().wrap(priority::high, exec_queue::read_write, - [&chain, this, cid = ++_timer_corelation_id](const boost::system::error_code& ec) { - if (ec != boost::asio::error::operation_aborted && cid == _timer_corelation_id) { + _timer.async_wait([&chain, this, cid = ++_timer_corelation_id](const boost::system::error_code& ec) { + if (ec != boost::asio::error::operation_aborted && cid == _timer_corelation_id) { + app().executor().post(priority::high, exec_queue::read_write, [&chain, this]() { // pending_block_state expected, but can't assert inside async_wait auto block_num = chain.is_building_block() ? chain.head().block_num() + 1 : 0; fc_dlog(_log, "Produce block timer for ${num} running at ${time}", ("num", block_num)("time", fc::time_point::now())); auto res = maybe_produce_block(); fc_dlog(_log, "Producing Block #${num} returned: ${res}", ("num", block_num)("res", res)); - } - })); + }); + } + }); } void producer_plugin_impl::schedule_delayed_production_loop(const std::weak_ptr& weak_this, @@ -2826,12 +2834,13 @@ void producer_plugin_impl::schedule_delayed_production_loop(const std::weak_ptr< fc_dlog(_log, "Scheduling Speculative/Production Change at ${time}", ("time", wake_up_time)); static const boost::posix_time::ptime epoch(boost::gregorian::date(1970, 1, 1)); _timer.expires_at(epoch + boost::posix_time::microseconds(wake_up_time->time_since_epoch().count())); - _timer.async_wait(app().executor().wrap(priority::high, exec_queue::read_write, - [this, cid = ++_timer_corelation_id](const boost::system::error_code& ec) { - if (ec != boost::asio::error::operation_aborted && cid == _timer_corelation_id) { + _timer.async_wait([this, cid = ++_timer_corelation_id](const boost::system::error_code& ec) { + if (ec != boost::asio::error::operation_aborted && cid == _timer_corelation_id) { + app().executor().post(priority::high, exec_queue::read_write, [this]() { schedule_production_loop(); - } - })); + }); + } + }); } else { fc_dlog(_log, "Not Scheduling Speculative/Production, no local producers had valid wake up times"); } @@ -2971,13 +2980,14 @@ void producer_plugin_impl::start_write_window() { _ro_window_deadline = now + _ro_write_window_time_us; // not allowed on block producers, so no need to limit to block deadline auto expire_time = boost::posix_time::microseconds(_ro_write_window_time_us.count()); _ro_timer.expires_from_now(expire_time); - _ro_timer.async_wait(app().executor().wrap( // stay on app thread - priority::high, exec_queue::read_write, // placed in read_write so only called from main thread - [this](const boost::system::error_code& ec) { - if (ec != boost::asio::error::operation_aborted) { - switch_to_read_window(); - } - })); + _ro_timer.async_wait([this](const boost::system::error_code& ec) { + if (ec != boost::asio::error::operation_aborted) { + app().executor().post(priority::high, exec_queue::read_write, // placed in read_write so only called from main thread + [this]() { + switch_to_read_window(); + }); + } + }); } // Called only from app thread @@ -3017,8 +3027,8 @@ void producer_plugin_impl::switch_to_read_window() { auto expire_time = boost::posix_time::microseconds(_ro_read_window_time_us.count()); _ro_timer.expires_from_now(expire_time); // Needs to be on read_only because that is what is being processed until switch_to_write_window(). - _ro_timer.async_wait( - app().executor().wrap(priority::high, exec_queue::read_only, [this](const boost::system::error_code& ec) { + _ro_timer.async_wait([this](const boost::system::error_code& ec) { + app().executor().post(priority::high, exec_queue::read_only, [this, ec]() { if (ec != boost::asio::error::operation_aborted) { // use future to make sure all read-only tasks finished before switching to write window for (auto& task : _ro_exec_tasks_fut) { @@ -3030,7 +3040,8 @@ void producer_plugin_impl::switch_to_read_window() { } else { _ro_exec_tasks_fut.clear(); } - })); + }); + }); } // Called from a read only thread. Run in parallel with app and other read only threads