Skip to content

Commit

Permalink
Integrate #904 to prevent exit from read window until all threads are…
Browse files Browse the repository at this point in the history
… idle.
  • Loading branch information
heifner committed Mar 28, 2023
1 parent 8eaa42e commit fa702aa
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 10 deletions.
8 changes: 4 additions & 4 deletions libraries/custom_appbase/include/eosio/chain/application.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,12 @@ class two_queue_executor {
return !read_only_queue_.empty() || !read_write_queue_.empty();
} else {
// When in read window, multiple threads including main app thread are accessing two_queue_executor, locking required
return read_only_queue_.execute_highest_locked();
return read_only_queue_.execute_highest_locked(false);
}
}

bool execute_highest_read_only() {
return read_only_queue_.execute_highest_locked();
return read_only_queue_.execute_highest_locked(true);
}

template <typename Function>
Expand All @@ -85,9 +85,9 @@ class two_queue_executor {
read_write_queue_.clear();
}

void set_to_read_window() {
void set_to_read_window(uint32_t num_threads, std::function<bool()> should_exit) {
exec_window_ = exec_window::read;
read_only_queue_.enable_locking();
read_only_queue_.enable_locking(num_threads, std::move(should_exit));
}

void set_to_write_window() {
Expand Down
33 changes: 31 additions & 2 deletions libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once
#include <boost/asio.hpp>

#include <condition_variable>
#include <mutex>
#include <queue>

Expand All @@ -12,12 +13,15 @@ class exec_pri_queue : public boost::asio::execution_context
{
public:

void enable_locking() {
void enable_locking(uint32_t num_threads, std::function<bool()> should_exit) {
lock_enabled_ = true;
max_waiting_ = num_threads;
should_exit_ = std::move(should_exit);
}

void disable_locking() {
lock_enabled_ = false;
should_exit_ = [](){ assert(false); return true; }; // should not be called when locking is disabled
}

// called from appbase::application_base::exec poll_one() or run_one()
Expand All @@ -28,6 +32,8 @@ class exec_pri_queue : public boost::asio::execution_context
if (lock_enabled_) {
std::lock_guard g( mtx_ );
handlers_.push( std::move( handler ) );
if (num_waiting_)
cond_.notify_one();
} else {
handlers_.push( std::move( handler ) );
}
Expand Down Expand Up @@ -60,8 +66,26 @@ class exec_pri_queue : public boost::asio::execution_context

public:

bool execute_highest_locked() {
bool execute_highest_locked(bool should_block) {
std::unique_lock g(mtx_);
if (should_block) {
++num_waiting_;
cond_.wait(g, [this](){
bool exit = exiting_blocking_ || should_exit_();
bool empty = handlers_.empty();
if (empty || exit) {
if (((empty && num_waiting_ == max_waiting_) || exit) && !exiting_blocking_) {
cond_.notify_all();
exiting_blocking_ = true;
}
return exit || exiting_blocking_; // same as calling should_exit(), but faster
}
return true;
});
--num_waiting_;
if (exiting_blocking_ || should_exit_())
return false;
}
if( handlers_.empty() )
return false;
auto t = pop();
Expand Down Expand Up @@ -196,6 +220,11 @@ class exec_pri_queue : public boost::asio::execution_context

bool lock_enabled_ = false;
std::mutex mtx_;
std::condition_variable cond_;
uint32_t num_waiting_{0};
uint32_t max_waiting_{0};
bool exiting_blocking_{false};
std::function<bool()> should_exit_; // called holding mtx_
using prio_queue = std::priority_queue<std::unique_ptr<queued_handler_base>, std::deque<std::unique_ptr<queued_handler_base>>, deref_less>;
prio_queue handlers_;
};
Expand Down
4 changes: 2 additions & 2 deletions libraries/custom_appbase/tests/custom_appbase_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ BOOST_AUTO_TEST_CASE( execute_from_read_queue ) {
auto app_thread = start_app_thread(app);

// set to run functions from read_only queue only
app->executor().set_to_read_window();
app->executor().set_to_read_window(1, [](){return false;});

// post functions
std::map<int, int> rslts {};
Expand Down Expand Up @@ -115,7 +115,7 @@ BOOST_AUTO_TEST_CASE( execute_from_empty_read_queue ) {
auto app_thread = start_app_thread(app);

// set to run functions from read_only queue only
app->executor().set_to_read_window();
app->executor().set_to_read_window(1, [](){return false;});

// post functions
std::map<int, int> rslts {};
Expand Down
12 changes: 10 additions & 2 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2853,6 +2853,8 @@ void producer_plugin_impl::start_write_window() {
_ro_in_read_only_mode = false;
_idle_trx_time = fc::time_point::now();

wlog("starting write window");

_ro_window_deadline = _idle_trx_time + _ro_write_window_time_us; // not allowed on block producers, so no need to limit to block deadline
auto expire_time = boost::posix_time::microseconds(_ro_write_window_time_us.count());
_ro_timer.expires_from_now( expire_time );
Expand Down Expand Up @@ -2881,9 +2883,15 @@ void producer_plugin_impl::switch_to_read_window() {
return;
}

wlog("starting read window");

auto& chain = chain_plug->chain();
uint32_t pending_block_num = chain.head_block_num() + 1;
app().executor().set_to_read_window();
_ro_window_deadline = fc::time_point::now() + _ro_read_window_effective_time_us;
app().executor().set_to_read_window(_ro_thread_pool_size,
[received_block=&_received_block, pending_block_num, ro_window_deadline=_ro_window_deadline]() {
return fc::time_point::now() >= ro_window_deadline || (received_block->load() >= pending_block_num); // should_exit()
});
chain.set_db_read_only_mode();
_ro_in_read_only_mode = true;
_ro_read_window_start_time = fc::time_point::now();
Expand All @@ -2892,7 +2900,6 @@ void producer_plugin_impl::switch_to_read_window() {

// start a read-only transaction execution task in each thread in the thread pool
_ro_num_active_exec_tasks = _ro_thread_pool_size;
_ro_window_deadline = fc::time_point::now() + _ro_read_window_effective_time_us;
for (auto i = 0; i < _ro_thread_pool_size; ++i ) {
_ro_exec_tasks_fut.emplace_back( post_async_task( _ro_thread_pool.get_executor(), [self = this, pending_block_num] () {
return self->read_only_execution_task(pending_block_num);
Expand Down Expand Up @@ -3010,6 +3017,7 @@ bool producer_plugin_impl::push_read_only_transaction(transaction_metadata_ptr t

// Ensure the trx to finish by the end of read-window.
auto trace = chain.push_transaction( trx, _ro_window_deadline, _ro_max_trx_time_us, 0, false, 0 );
_ro_all_threads_exec_time_us += (fc::time_point::now() - start).count();
auto pr = handle_push_result(trx, next, start, chain, trace, true /*return_failure_trace*/, true /*disable_subjective_enforcement*/, {} /*first_auth*/, 0 /*sub_bill*/, 0 /*prev_billed_cpu_time_us*/);
// If a transaction was exhausted, that indicates we are close to
// the end of read window. Retry in next round.
Expand Down

0 comments on commit fa702aa

Please sign in to comment.