diff --git a/plugins/producer_plugin/include/eosio/producer_plugin/producer_plugin.hpp b/plugins/producer_plugin/include/eosio/producer_plugin/producer_plugin.hpp
index d59a7c4643..e3f7d940cf 100644
--- a/plugins/producer_plugin/include/eosio/producer_plugin/producer_plugin.hpp
+++ b/plugins/producer_plugin/include/eosio/producer_plugin/producer_plugin.hpp
@@ -204,9 +204,12 @@ class producer_plugin : public appbase::plugin {
       // thread-safe, called when a new block is received
       void received_block();
 
-      const std::set& producer_accounts() const;
+      const std::set& producer_accounts() const;
+      static void set_test_mode(bool m) { test_mode_ = m; }
 
    private:
+      inline static bool test_mode_{false}; // to be moved into appbase (application_base)
+
       std::shared_ptr my;
 };
diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp
index 641ec02f56..2e0a734b53 100644
--- a/plugins/producer_plugin/producer_plugin.cpp
+++ b/plugins/producer_plugin/producer_plugin.cpp
@@ -431,11 +431,87 @@ class producer_plugin_impl : public std::enable_shared_from_this
-   struct ro_trx_queue_t {
-      std::mutex mtx;
-      deque      queue;
+   class ro_trx_queue_t {
+   public:
+      void push_back(ro_trx_t&& trx) {
+         std::unique_lock g( mtx );
+         queue.push_back(std::move(trx));
+         if (num_waiting)
+            cond.notify_one();
+      }
+
+      void push_front(ro_trx_t&& trx) {
+         std::unique_lock g( mtx );
+         queue.push_front(std::move(trx));
+         if (num_waiting)
+            cond.notify_one();
+      }
+
+      bool empty() const {
+         std::lock_guard g( mtx );
+         return queue.empty();
+      }
+
+      // may wait if the queue is empty, and not all other threads are already waiting.
+      // returns true if a transaction was dequeued and should be executed, or false
+      // if conditions are met to stop processing transactions.
+      bool pop_front(ro_trx_t& trx) {
+         std::unique_lock g( mtx );
+
+         ++num_waiting;
+         cond.wait(g, [this]() {
+            bool _should_exit = should_exit();
+            bool _queue_empty = queue.empty();
+            if (_queue_empty || _should_exit) {
+               if (((_queue_empty && num_waiting == max_waiting) || _should_exit) && !exiting_read_window) {
+                  cond.notify_all();
+                  exiting_read_window = true;
+               }
+               return _should_exit || exiting_read_window; // same as calling should_exit(), but faster
+            }
+            return true;
+         });
+         --num_waiting;
+         if (should_exit())
+            return false;
+
+         trx = std::move(queue.front());
+         queue.pop_front();
+         return true;
+      }
+
+      // We exit the read window when either:
+      //   - all threads would be idle
+      //   - or the net_plugin received a block.
+      //   - or we have reached the read_window_deadline
+      void set_exit_criteria(uint32_t num_tasks, std::atomic* received_block, fc::time_point deadline) {
+         std::lock_guard g( mtx ); // not strictly necessary with current usage from single thread
+         assert(num_tasks > 0 && num_waiting == 0 && received_block != nullptr);
+         assert(received_block && *received_block == false);
+         max_waiting = num_tasks;
+         num_waiting = 0;
+         received_block_ptr = received_block;
+         read_window_deadline = deadline;
+         exiting_read_window = false;
+      }
+
+   private:
+      bool should_exit() {
+         return exiting_read_window || fc::time_point::now() >= read_window_deadline || *received_block_ptr;
+      }
+
+      mutable std::mutex mtx;
+      std::condition_variable cond;
+      deque queue;
+      uint32_t num_waiting{0};
+      uint32_t max_waiting{0};
+      bool exiting_read_window{false};
+      std::atomic* received_block_ptr{nullptr};
+      fc::time_point read_window_deadline;
    };
+
    uint16_t _ro_thread_pool_size{ 0 };
    static constexpr uint16_t _ro_max_eos_vm_oc_threads_allowed{ 8 }; // Due to uncertainty to get total virtual memory size on a 5-level paging system, set a hard limit
    named_thread_pool _ro_thread_pool;
@@ -455,7 +531,7 @@ class producer_plugin_impl : public std::enable_shared_from_this
+                                             & next, const fc::time_point& read_window_start_time);
@@ -659,8 +735,7 @@ class producer_plugin_impl : public std::enable_shared_from_this
-         std::unique_lock g( _ro_trx_queue.mtx );
-         _ro_trx_queue.queue.push_back({trx, std::move(next)});
+         _ro_trx_queue.push_back({trx, std::move(next)});
          return;
       }
@@ -1093,8 +1168,9 @@ void producer_plugin::plugin_initialize(const boost::program_options::variables_
    my->_ro_thread_pool_size = options.at( "read-only-threads" ).as();
    // only initialize other read-only options when read-only thread pool is enabled
    if ( my->_ro_thread_pool_size > 0 ) {
-      EOS_ASSERT( my->_producers.empty(), plugin_config_exception, "--read-only-threads not allowed on producer node" );
-
+      if (!test_mode_)
+         EOS_ASSERT( my->_producers.empty(), plugin_config_exception, "--read-only-threads not allowed on producer node" );
+
 #ifdef EOSIO_EOS_VM_OC_RUNTIME_ENABLED
       if (chain.is_eos_vm_oc_enabled()) {
          // EOS VM OC requires 4.2TB Virtual for each executing thread. Make sure the memory
@@ -2840,7 +2916,7 @@ void producer_plugin_impl::switch_to_read_window() {
 
    // we are in write window, so no read-only trx threads are processing transactions.
    // _ro_trx_queue is not being accessed. No need to lock.
-   if ( _ro_trx_queue.queue.empty() ) { // no read-only trxs to process. stay in write window
+   if ( _ro_trx_queue.empty() ) { // no read-only trxs to process. stay in write window
       start_write_window(); // restart write window timer for next round
       return;
    }
@@ -2853,9 +2929,11 @@ void producer_plugin_impl::switch_to_read_window() {
 
    // start a read-only transaction execution task in each thread in the thread pool
    _ro_num_active_trx_exec_tasks = _ro_thread_pool_size;
+   auto start_time = fc::time_point::now();
+   _ro_trx_queue.set_exit_criteria(_ro_thread_pool_size, &_received_block, start_time + _ro_read_window_effective_time_us);
    for (auto i = 0; i < _ro_thread_pool_size; ++i ) {
-      _ro_trx_exec_tasks_fut.emplace_back( post_async_task( _ro_thread_pool.get_executor(), [self = this] () {
-         return self->read_only_trx_execution_task();
+      _ro_trx_exec_tasks_fut.emplace_back( post_async_task( _ro_thread_pool.get_executor(), [this, start_time] () {
+         return read_only_trx_execution_task(start_time);
       }) );
    }
@@ -2880,27 +2958,21 @@ void producer_plugin_impl::switch_to_read_window() {
 }
 
 // Called from a read only trx thread. Run in parallel with app and other read only trx threads
-bool producer_plugin_impl::read_only_trx_execution_task() {
-   auto start = fc::time_point::now();
-   auto read_window_deadline = start + _ro_read_window_effective_time_us;
+bool producer_plugin_impl::read_only_trx_execution_task(fc::time_point start) {
 
    // We have 4 ways to break out the while loop:
    // 1. pass read window deadline
    // 2. Net_plugin receives a block
    // 3. No more transactions in the read-only trx queue
    // 4. A transaction execution is exhaused
-   while ( fc::time_point::now() < read_window_deadline && !_received_block ) {
-      std::unique_lock lck( _ro_trx_queue.mtx );
-      if ( _ro_trx_queue.queue.empty() ) {
-         break;
-      }
-      auto trx = _ro_trx_queue.queue.front();
-      _ro_trx_queue.queue.pop_front();
-      lck.unlock();
-
+   ro_trx_t trx;
+   while ( _ro_trx_queue.pop_front(trx) ) {
+      // If the queue is empty, pop_front() waits on a condition variable, and returns false
+      // when and only when all tasks must exit (i.e. the queue is empty and all tasks are idle, or
+      // we have reached the end of the read window, or the net plugin received a block)
+      auto retry = process_read_only_transaction( trx.trx, trx.next, start );
       if ( retry ) {
-         lck.lock();
-         _ro_trx_queue.queue.push_front(trx);
+         _ro_trx_queue.push_front(std::move(trx));
          // Do not schedule new execution
          break;
       }
diff --git a/plugins/producer_plugin/test/test_read_only_trx.cpp b/plugins/producer_plugin/test/test_read_only_trx.cpp
index e004e7a640..5bee8335de 100644
--- a/plugins/producer_plugin/test/test_read_only_trx.cpp
+++ b/plugins/producer_plugin/test/test_read_only_trx.cpp
@@ -94,6 +94,7 @@ void test_trxs_common(std::vector& specific_args) {
 
    auto[prod_plug, chain_plug] = plugin_fut.get();
    auto chain_id = chain_plug->get_chain_id();
+   prod_plug->set_test_mode(true);
 
    std::atomic next_calls = 0;
    std::atomic num_posts = 0;
@@ -150,4 +151,27 @@ BOOST_AUTO_TEST_CASE(no_read_only_threads) {
    test_trxs_common(specific_args);
 }
 
+// test read-only trxs on 1 thread (with --read-only-threads)
+BOOST_AUTO_TEST_CASE(with_1_read_only_threads) {
+   std::vector specific_args = { "-p", "eosio", "-e",
+                                 "--read-only-threads=1",
+                                 "--max-transaction-time=10",
+                                 "--read-only-write-window-time-us=100000",
+                                 "--read-only-read-window-time-us=40000",
+                                 "--disable-subjective-billing=true" };
+   test_trxs_common(specific_args);
+}
+
+// test read-only trxs on 16 separate threads (with --read-only-threads)
+BOOST_AUTO_TEST_CASE(with_16_read_only_threads) {
+   std::vector specific_args = { "-p", "eosio", "-e",
+                                 "--read-only-threads=16",
"--max-transaction-time=10", + "--read-only-write-window-time-us=100000", + "--read-only-read-window-time-us=40000", + "--disable-subjective-billing=true" }; + test_trxs_common(specific_args); +} + + BOOST_AUTO_TEST_SUITE_END()