From 053b5c062131e56a84088c651a54a4633bceea5d Mon Sep 17 00:00:00 2001 From: Dmitriy Khaustov Date: Wed, 15 Nov 2023 15:36:08 +0300 Subject: [PATCH] Refactor: offchain worker (#1866) * refactor: offchain worker * fix: mock object leaks Signed-off-by: Dmitriy Khaustov aka xDimon --- core/api/service/mmr/rpc.cpp | 11 ++---- core/injector/application_injector.cpp | 1 + .../impl/offchain_worker_factory_impl.cpp | 10 +---- .../impl/offchain_worker_factory_impl.hpp | 7 +--- core/offchain/impl/offchain_worker_impl.cpp | 38 +++++-------------- core/offchain/impl/offchain_worker_impl.hpp | 10 +---- core/offchain/impl/runner.hpp | 10 ++++- core/offchain/offchain_worker.hpp | 2 +- core/offchain/offchain_worker_factory.hpp | 7 +--- core/parachain/pvf/precheck.cpp | 3 +- .../runtime_api/impl/offchain_worker_api.cpp | 38 ++++++++++++------- .../runtime_api/impl/offchain_worker_api.hpp | 6 +-- test/core/consensus/babe/babe_test.cpp | 10 +++-- .../timeline/block_executor_test.cpp | 10 +++-- .../core/offchain/offchain_worker_mock.hpp | 2 +- test/testutil/asio_wait.hpp | 8 ++-- 16 files changed, 72 insertions(+), 101 deletions(-) diff --git a/core/api/service/mmr/rpc.cpp b/core/api/service/mmr/rpc.cpp index e8f9726d0e..88a3319f1a 100644 --- a/core/api/service/mmr/rpc.cpp +++ b/core/api/service/mmr/rpc.cpp @@ -66,14 +66,9 @@ namespace kagome::api { // TODO(turuslan): simplify offchain ::libp2p::common::MovableFinalAction remove( [&] { offchain_worker_pool_.get()->removeWorker(); }); - outcome::result result{std::move(remove)}; - if (auto r = block_tree_.get()->getBlockHeader(at)) { - offchain_worker_pool_.get()->addWorker( - offchain_worker_factory_.get()->make(executor_.get(), r.value())); - return result; - } else { - return decltype(result){r.error()}; - } + offchain_worker_pool_.get()->addWorker( + offchain_worker_factory_.get()->make()); + return remove; } void MmrRpc::registerHandlers() { diff --git a/core/injector/application_injector.cpp b/core/injector/application_injector.cpp index 42fc184458..7f2f60bcd8 100644 --- a/core/injector/application_injector.cpp +++ b/core/injector/application_injector.cpp @@ -120,6 +120,7 @@ #include "offchain/impl/offchain_worker_factory_impl.hpp" #include "offchain/impl/offchain_worker_impl.hpp" #include "offchain/impl/offchain_worker_pool_impl.hpp" +#include "offchain/impl/runner.hpp" #include "outcome/outcome.hpp" #include "parachain/approval/approval_distribution.hpp" #include "parachain/availability/bitfield/store_impl.hpp" diff --git a/core/offchain/impl/offchain_worker_factory_impl.cpp b/core/offchain/impl/offchain_worker_factory_impl.cpp index 23fec3376e..a0d1456533 100644 --- a/core/offchain/impl/offchain_worker_factory_impl.cpp +++ b/core/offchain/impl/offchain_worker_factory_impl.cpp @@ -14,7 +14,6 @@ namespace kagome::offchain { OffchainWorkerFactoryImpl::OffchainWorkerFactoryImpl( const application::AppConfiguration &app_config, std::shared_ptr clock, - std::shared_ptr hasher, std::shared_ptr storage, std::shared_ptr random_generator, std::shared_ptr author_api, @@ -23,7 +22,6 @@ namespace kagome::offchain { std::shared_ptr offchain_worker_pool) : app_config_(app_config), clock_(std::move(clock)), - hasher_(std::move(hasher)), storage_(std::move(storage)), random_generator_(std::move(random_generator)), author_api_(std::move(author_api)), @@ -31,7 +29,6 @@ namespace kagome::offchain { persistent_storage_(std::move(persistent_storage)), offchain_worker_pool_(std::move(offchain_worker_pool)) { BOOST_ASSERT(clock_); - BOOST_ASSERT(hasher_); BOOST_ASSERT(storage_); BOOST_ASSERT(random_generator_); BOOST_ASSERT(author_api_); @@ -39,19 +36,14 @@ namespace kagome::offchain { BOOST_ASSERT(offchain_worker_pool_); } - std::shared_ptr OffchainWorkerFactoryImpl::make( - std::shared_ptr executor, - const primitives::BlockHeader &header) { + std::shared_ptr OffchainWorkerFactoryImpl::make() { return std::make_shared(app_config_, clock_, - hasher_, storage_, random_generator_, author_api_, current_peer_info_, persistent_storage_, - executor, - header, offchain_worker_pool_); } diff --git a/core/offchain/impl/offchain_worker_factory_impl.hpp b/core/offchain/impl/offchain_worker_factory_impl.hpp index 45855862bb..a09fd2be94 100644 --- a/core/offchain/impl/offchain_worker_factory_impl.hpp +++ b/core/offchain/impl/offchain_worker_factory_impl.hpp @@ -10,6 +10,7 @@ #include +#include "clock/clock.hpp" #include "crypto/random_generator.hpp" #include "network/types/own_peer_info.hpp" #include "offchain/offchain_persistent_storage.hpp" @@ -35,7 +36,6 @@ namespace kagome::offchain { OffchainWorkerFactoryImpl( const application::AppConfiguration &app_config, std::shared_ptr clock, - std::shared_ptr hasher, std::shared_ptr storage, std::shared_ptr random_generator, std::shared_ptr author_api, @@ -43,14 +43,11 @@ namespace kagome::offchain { std::shared_ptr persistent_storage, std::shared_ptr offchain_worker_pool); - std::shared_ptr make( - std::shared_ptr executor, - const primitives::BlockHeader &header) override; + std::shared_ptr make() override; private: const application::AppConfiguration &app_config_; std::shared_ptr clock_; - std::shared_ptr hasher_; std::shared_ptr storage_; std::shared_ptr random_generator_; std::shared_ptr author_api_; diff --git a/core/offchain/impl/offchain_worker_impl.cpp b/core/offchain/impl/offchain_worker_impl.cpp index faf783a014..45786a9e71 100644 --- a/core/offchain/impl/offchain_worker_impl.cpp +++ b/core/offchain/impl/offchain_worker_impl.cpp @@ -22,74 +22,55 @@ namespace kagome::offchain { + size_t OffchainWorkerImpl::ocw_counter_ = 0; + OffchainWorkerImpl::OffchainWorkerImpl( const application::AppConfiguration &app_config, std::shared_ptr clock, - std::shared_ptr hasher, std::shared_ptr storage, std::shared_ptr random_generator, std::shared_ptr author_api, const network::OwnPeerInfo ¤t_peer_info, std::shared_ptr persistent_storage, - std::shared_ptr executor, - const primitives::BlockHeader &header, std::shared_ptr ocw_pool) : app_config_(app_config), clock_(std::move(clock)), - hasher_(std::move(hasher)), random_generator_(std::move(random_generator)), author_api_(std::move(author_api)), current_peer_info_(current_peer_info), persistent_storage_(std::move(persistent_storage)), - executor_(std::move(executor)), - header_(header), ocw_pool_(std::move(ocw_pool)), log_(log::createLogger( - "OffchainWorker#" + std::to_string(header_.number), "offchain")) { + "OffchainWorker#" + std::to_string(++ocw_counter_), "offchain")) { BOOST_ASSERT(clock_); - BOOST_ASSERT(hasher_); BOOST_ASSERT(storage); BOOST_ASSERT(random_generator_); BOOST_ASSERT(author_api_); BOOST_ASSERT(persistent_storage_); - BOOST_ASSERT(executor_); BOOST_ASSERT(ocw_pool_); - const_cast(block_) = header_.blockInfo(); - local_storage_ = std::make_shared(std::move(storage)); } - outcome::result OffchainWorkerImpl::run() { + void OffchainWorkerImpl::run(std::function &&func, + std::string label) { BOOST_ASSERT(not ocw_pool_->getWorker()); ::libp2p::common::FinalAction at_end( [prev_thread_name = soralog::util::getThreadName()] { soralog::util::setThreadName(prev_thread_name); }); - - soralog::util::setThreadName("ocw.#" + std::to_string(block_.number)); + soralog::util::setThreadName("ocw." + label); ocw_pool_->addWorker(shared_from_this()); ::libp2p::common::FinalAction remove([&] { ocw_pool_->removeWorker(); }); - SL_TRACE(log_, "Offchain worker is started for block {}", block_); - - auto res = runtime::callOffchainWorkerApi(*executor_, block_.hash, header_); - - if (res.has_error()) { - SL_ERROR(log_, - "Can't execute offchain worker for block {}: {}", - block_, - res.error()); - return res.error(); - } + SL_TRACE(log_, "Offchain worker with label {} is started", label); - SL_DEBUG( - log_, "Offchain worker is successfully executed for block {}", block_); + func(); - return outcome::success(); + SL_TRACE(log_, "Offchain worker with label {} is finished", label); } bool OffchainWorkerImpl::isValidator() const { @@ -329,7 +310,6 @@ namespace kagome::offchain { // issue: https://github.com/soramitsu/kagome/issues/998 throw std::runtime_error( "This method of OffchainWorkerImpl is not implemented yet"); - return; } } // namespace kagome::offchain diff --git a/core/offchain/impl/offchain_worker_impl.hpp b/core/offchain/impl/offchain_worker_impl.hpp index 19b0d0304e..809fee851b 100644 --- a/core/offchain/impl/offchain_worker_impl.hpp +++ b/core/offchain/impl/offchain_worker_impl.hpp @@ -46,17 +46,14 @@ namespace kagome::offchain { OffchainWorkerImpl( const application::AppConfiguration &app_config, std::shared_ptr clock, - std::shared_ptr hasher, std::shared_ptr storage, std::shared_ptr random_generator, std::shared_ptr author_api, const network::OwnPeerInfo ¤t_peer_info, std::shared_ptr persistent_storage, - std::shared_ptr executor, - const primitives::BlockHeader &header, std::shared_ptr ocw_pool); - outcome::result run() override; + void run(std::function &&func, std::string label) override; bool isValidator() const override; @@ -118,18 +115,15 @@ namespace kagome::offchain { const application::AppConfiguration &app_config_; std::shared_ptr clock_; - std::shared_ptr hasher_; std::shared_ptr random_generator_; std::shared_ptr author_api_; const network::OwnPeerInfo ¤t_peer_info_; std::shared_ptr persistent_storage_; std::shared_ptr local_storage_; - std::shared_ptr executor_; - const primitives::BlockHeader header_; - const primitives::BlockInfo block_; std::shared_ptr ocw_pool_; log::Logger log_; + static size_t ocw_counter_; int16_t request_id_ = 0; std::map> active_http_requests_; }; diff --git a/core/offchain/impl/runner.hpp b/core/offchain/impl/runner.hpp index 832cb0a4a8..98492bab13 100644 --- a/core/offchain/impl/runner.hpp +++ b/core/offchain/impl/runner.hpp @@ -14,6 +14,9 @@ #include "utils/thread_pool.hpp" namespace kagome::offchain { + constexpr size_t kMaxThreads = 3; + constexpr size_t kMaxTasks = 1000; + /** * Enqueue at most `max_tasks_` to run on number of `threads_`. * Old tasks do not run and are removed when queue is full. @@ -22,12 +25,17 @@ namespace kagome::offchain { public: using Task = std::function; - Runner(size_t threads, size_t max_tasks) + Runner(size_t threads = kMaxThreads, size_t max_tasks = kMaxTasks) : threads_{threads}, free_threads_{threads}, max_tasks_{max_tasks}, thread_pool_{"ocw", threads_} {} + struct Inject { + explicit Inject() = default; + }; + Runner(Inject, ...) : Runner(kMaxThreads, kMaxTasks) {} + void run(Task &&task) { std::unique_lock lock{mutex_}; if (tasks_.size() >= max_tasks_) { diff --git a/core/offchain/offchain_worker.hpp b/core/offchain/offchain_worker.hpp index 9932ca32ab..2e771e6c10 100644 --- a/core/offchain/offchain_worker.hpp +++ b/core/offchain/offchain_worker.hpp @@ -32,7 +32,7 @@ namespace kagome::offchain { public: virtual ~OffchainWorker() = default; - virtual outcome::result run() = 0; + virtual void run(std::function &&func, std::string label) = 0; // ------------------------- Off-Chain API methods ------------------------- diff --git a/core/offchain/offchain_worker_factory.hpp b/core/offchain/offchain_worker_factory.hpp index 821e9fe4f1..6e0cbaaed4 100644 --- a/core/offchain/offchain_worker_factory.hpp +++ b/core/offchain/offchain_worker_factory.hpp @@ -8,18 +8,13 @@ #include "offchain/offchain_worker.hpp" -#include "primitives/block_header.hpp" -#include "runtime/executor.hpp" - namespace kagome::offchain { class OffchainWorkerFactory { public: virtual ~OffchainWorkerFactory() = default; - virtual std::shared_ptr make( - std::shared_ptr executor, - const primitives::BlockHeader &header) = 0; + virtual std::shared_ptr make() = 0; }; } // namespace kagome::offchain diff --git a/core/parachain/pvf/precheck.cpp b/core/parachain/pvf/precheck.cpp index 9b395d4140..f54a81f66d 100644 --- a/core/parachain/pvf/precheck.cpp +++ b/core/parachain/pvf/precheck.cpp @@ -119,8 +119,7 @@ namespace kagome::parachain { signer->validatorIndex(), }; OUTCOME_TRY(signature, signer->signRaw(statement.signable())); - offchain_worker_pool_->addWorker( - offchain_worker_factory_->make(executor_, header)); + offchain_worker_pool_->addWorker(offchain_worker_factory_->make()); ::libp2p::common::FinalAction remove( [&] { offchain_worker_pool_->removeWorker(); }); OUTCOME_TRY(parachain_api_->submit_pvf_check_statement( diff --git a/core/runtime/runtime_api/impl/offchain_worker_api.cpp b/core/runtime/runtime_api/impl/offchain_worker_api.cpp index 28fc1750f3..c9f9a913dd 100644 --- a/core/runtime/runtime_api/impl/offchain_worker_api.cpp +++ b/core/runtime/runtime_api/impl/offchain_worker_api.cpp @@ -7,31 +7,24 @@ #include "runtime/runtime_api/impl/offchain_worker_api.hpp" #include "application/app_configuration.hpp" +#include "log/logger.hpp" #include "offchain/impl/runner.hpp" #include "offchain/offchain_worker_factory.hpp" #include "runtime/executor.hpp" namespace kagome::runtime { - constexpr size_t kMaxThreads = 3; - constexpr size_t kMaxTasks = 1000; - - outcome::result callOffchainWorkerApi( - Executor &executor, - const primitives::BlockHash &block, - const primitives::BlockHeader &header) { - return executor.callAt( - block, "OffchainWorkerApi_offchain_worker", header); - } OffchainWorkerApiImpl::OffchainWorkerApiImpl( const application::AppConfiguration &app_config, std::shared_ptr ocw_factory, + std::shared_ptr runner, std::shared_ptr executor) : app_config_(app_config), ocw_factory_(std::move(ocw_factory)), - runner_{std::make_shared(kMaxThreads, kMaxTasks)}, + runner_(std::move(runner)), executor_(std::move(executor)) { BOOST_ASSERT(ocw_factory_); + BOOST_ASSERT(runner_); BOOST_ASSERT(executor_); } @@ -53,9 +46,26 @@ namespace kagome::runtime { } } - runner_->run([worker = ocw_factory_->make(executor_, header)] { - std::ignore = worker->run(); - }); + auto label = fmt::format("#{}", block); + + auto func = [block = std::move(block), + header = std::move(header), + executor = executor_] { + auto res = executor->callAt( + block, "OffchainWorkerApi_offchain_worker", header); + + if (res.has_error()) { + auto log = log::createLogger("OffchainWorkerApi", "offchain"); + SL_ERROR(log, + "Can't execute offchain worker for block {}: {}", + block, + res.error()); + } + }; + + runner_->run([worker = ocw_factory_->make(), + label = std::move(label), + func = std::move(func)] { worker->run(func, label); }); return outcome::success(); } diff --git a/core/runtime/runtime_api/impl/offchain_worker_api.hpp b/core/runtime/runtime_api/impl/offchain_worker_api.hpp index 1d82286cd7..943db015bf 100644 --- a/core/runtime/runtime_api/impl/offchain_worker_api.hpp +++ b/core/runtime/runtime_api/impl/offchain_worker_api.hpp @@ -20,16 +20,12 @@ namespace kagome::runtime { class Executor; - outcome::result callOffchainWorkerApi( - Executor &executor, - const primitives::BlockHash &block, - const primitives::BlockHeader &header); - class OffchainWorkerApiImpl final : public OffchainWorkerApi { public: OffchainWorkerApiImpl( const application::AppConfiguration &app_config, std::shared_ptr ocw_factory, + std::shared_ptr runner, std::shared_ptr executor); outcome::result offchain_worker( diff --git a/test/core/consensus/babe/babe_test.cpp b/test/core/consensus/babe/babe_test.cpp index 2d9ac86464..7859fff07c 100644 --- a/test/core/consensus/babe/babe_test.cpp +++ b/test/core/consensus/babe/babe_test.cpp @@ -191,6 +191,8 @@ class BabeTest : public testing::Test { ON_CALL(*offchain_worker_api, offchain_worker(_, _)) .WillByDefault(Return(outcome::success())); + thread_pool_ = std::make_shared("test", 1); + babe = std::make_shared(app_config, clock, block_tree, @@ -208,8 +210,8 @@ class BabeTest : public testing::Test { chain_sub_engine, announce_transmitter, offchain_worker_api, - thread_pool_, - thread_pool_.io_context()); + *thread_pool_, + thread_pool_->io_context()); } AppConfigurationMock app_config; @@ -229,7 +231,7 @@ class BabeTest : public testing::Test { std::shared_ptr chain_sub_engine; std::shared_ptr announce_transmitter; std::shared_ptr offchain_worker_api; - ThreadPool thread_pool_{"test", 1}; + std::shared_ptr thread_pool_; Duration slot_duration = 3s; EpochLength epoch_length = 20; @@ -366,5 +368,5 @@ TEST_F(BabeTest, SlotLeader) { ASSERT_OUTCOME_SUCCESS_TRY(babe->processSlot(slot, best_block_info)); - testutil::wait(*thread_pool_.io_context()); + testutil::wait(*thread_pool_->io_context()); } diff --git a/test/core/consensus/timeline/block_executor_test.cpp b/test/core/consensus/timeline/block_executor_test.cpp index 9627a6d1b8..6e3edf1ac0 100644 --- a/test/core/consensus/timeline/block_executor_test.cpp +++ b/test/core/consensus/timeline/block_executor_test.cpp @@ -142,10 +142,12 @@ class BlockExecutorTest : public testing::Test { testutil::sptr_to_lazy(slots_util_), hasher_); + thread_pool_ = std::make_shared("test", 1); + block_executor_ = std::make_shared(block_tree_, - thread_pool_, - thread_pool_.io_context(), + *thread_pool_, + thread_pool_->io_context(), core_, tx_pool_, hasher_, @@ -168,7 +170,7 @@ class BlockExecutorTest : public testing::Test { std::shared_ptr offchain_worker_api_; kagome::primitives::events::StorageSubscriptionEnginePtr storage_sub_engine_; kagome::primitives::events::ChainSubscriptionEnginePtr chain_sub_engine_; - ThreadPool thread_pool_{"test", 1}; + std::shared_ptr thread_pool_; std::shared_ptr block_executor_; }; @@ -249,5 +251,5 @@ TEST_F(BlockExecutorTest, JustificationFollowDigests) { justification, [](auto &&result) { ASSERT_OUTCOME_SUCCESS_TRY(result); }); - testutil::wait(*thread_pool_.io_context()); + testutil::wait(*thread_pool_->io_context()); } diff --git a/test/mock/core/offchain/offchain_worker_mock.hpp b/test/mock/core/offchain/offchain_worker_mock.hpp index 4c9fae4e57..df33ebc3fe 100644 --- a/test/mock/core/offchain/offchain_worker_mock.hpp +++ b/test/mock/core/offchain/offchain_worker_mock.hpp @@ -14,7 +14,7 @@ namespace kagome::offchain { class OffchainWorkerMock : public OffchainWorker { public: - MOCK_METHOD(outcome::result, run, (), (override)); + MOCK_METHOD(void, run, (std::function &&, std::string), (override)); MOCK_METHOD(bool, isValidator, (), (const, override)); diff --git a/test/testutil/asio_wait.hpp b/test/testutil/asio_wait.hpp index 300c55feb6..390e36bf72 100644 --- a/test/testutil/asio_wait.hpp +++ b/test/testutil/asio_wait.hpp @@ -8,7 +8,7 @@ #include -#include +#include namespace testutil { @@ -16,9 +16,9 @@ namespace testutil { * Wait for all queued tasks. */ void wait(boost::asio::io_context &io) { - std::barrier barrier(2); - io.post([&] { barrier.arrive_and_wait(); }); - barrier.arrive_and_wait(); + std::latch latch(2); + io.post([&] { latch.arrive_and_wait(); }); + latch.arrive_and_wait(); } } // namespace testutil