Skip to content

Commit

Permalink
Refactor: offchain worker (#1866)
Browse files Browse the repository at this point in the history
* refactor: offchain worker
* fix: mock object leaks

Signed-off-by: Dmitriy Khaustov aka xDimon <[email protected]>
  • Loading branch information
xDimon authored Nov 15, 2023
1 parent bd93022 commit 053b5c0
Show file tree
Hide file tree
Showing 16 changed files with 72 additions and 101 deletions.
11 changes: 3 additions & 8 deletions core/api/service/mmr/rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,9 @@ namespace kagome::api {
// TODO(turuslan): simplify offchain
::libp2p::common::MovableFinalAction remove(
[&] { offchain_worker_pool_.get()->removeWorker(); });
outcome::result<decltype(remove)> result{std::move(remove)};
if (auto r = block_tree_.get()->getBlockHeader(at)) {
offchain_worker_pool_.get()->addWorker(
offchain_worker_factory_.get()->make(executor_.get(), r.value()));
return result;
} else {
return decltype(result){r.error()};
}
offchain_worker_pool_.get()->addWorker(
offchain_worker_factory_.get()->make());
return remove;
}

void MmrRpc::registerHandlers() {
Expand Down
1 change: 1 addition & 0 deletions core/injector/application_injector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
#include "offchain/impl/offchain_worker_factory_impl.hpp"
#include "offchain/impl/offchain_worker_impl.hpp"
#include "offchain/impl/offchain_worker_pool_impl.hpp"
#include "offchain/impl/runner.hpp"
#include "outcome/outcome.hpp"
#include "parachain/approval/approval_distribution.hpp"
#include "parachain/availability/bitfield/store_impl.hpp"
Expand Down
10 changes: 1 addition & 9 deletions core/offchain/impl/offchain_worker_factory_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ namespace kagome::offchain {
OffchainWorkerFactoryImpl::OffchainWorkerFactoryImpl(
const application::AppConfiguration &app_config,
std::shared_ptr<clock::SystemClock> clock,
std::shared_ptr<crypto::Hasher> hasher,
std::shared_ptr<storage::SpacedStorage> storage,
std::shared_ptr<crypto::CSPRNG> random_generator,
std::shared_ptr<api::AuthorApi> author_api,
Expand All @@ -23,35 +22,28 @@ namespace kagome::offchain {
std::shared_ptr<offchain::OffchainWorkerPool> offchain_worker_pool)
: app_config_(app_config),
clock_(std::move(clock)),
hasher_(std::move(hasher)),
storage_(std::move(storage)),
random_generator_(std::move(random_generator)),
author_api_(std::move(author_api)),
current_peer_info_(current_peer_info),
persistent_storage_(std::move(persistent_storage)),
offchain_worker_pool_(std::move(offchain_worker_pool)) {
BOOST_ASSERT(clock_);
BOOST_ASSERT(hasher_);
BOOST_ASSERT(storage_);
BOOST_ASSERT(random_generator_);
BOOST_ASSERT(author_api_);
BOOST_ASSERT(persistent_storage_);
BOOST_ASSERT(offchain_worker_pool_);
}

std::shared_ptr<OffchainWorker> OffchainWorkerFactoryImpl::make(
std::shared_ptr<runtime::Executor> executor,
const primitives::BlockHeader &header) {
std::shared_ptr<OffchainWorker> OffchainWorkerFactoryImpl::make() {
return std::make_shared<OffchainWorkerImpl>(app_config_,
clock_,
hasher_,
storage_,
random_generator_,
author_api_,
current_peer_info_,
persistent_storage_,
executor,
header,
offchain_worker_pool_);
}

Expand Down
7 changes: 2 additions & 5 deletions core/offchain/impl/offchain_worker_factory_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include <libp2p/host/host.hpp>

#include "clock/clock.hpp"
#include "crypto/random_generator.hpp"
#include "network/types/own_peer_info.hpp"
#include "offchain/offchain_persistent_storage.hpp"
Expand All @@ -35,22 +36,18 @@ namespace kagome::offchain {
OffchainWorkerFactoryImpl(
const application::AppConfiguration &app_config,
std::shared_ptr<clock::SystemClock> clock,
std::shared_ptr<crypto::Hasher> hasher,
std::shared_ptr<storage::SpacedStorage> storage,
std::shared_ptr<crypto::CSPRNG> random_generator,
std::shared_ptr<api::AuthorApi> author_api,
const network::OwnPeerInfo &current_peer_info,
std::shared_ptr<offchain::OffchainPersistentStorage> persistent_storage,
std::shared_ptr<offchain::OffchainWorkerPool> offchain_worker_pool);

std::shared_ptr<OffchainWorker> make(
std::shared_ptr<runtime::Executor> executor,
const primitives::BlockHeader &header) override;
std::shared_ptr<OffchainWorker> make() override;

private:
const application::AppConfiguration &app_config_;
std::shared_ptr<clock::SystemClock> clock_;
std::shared_ptr<crypto::Hasher> hasher_;
std::shared_ptr<storage::SpacedStorage> storage_;
std::shared_ptr<crypto::CSPRNG> random_generator_;
std::shared_ptr<api::AuthorApi> author_api_;
Expand Down
38 changes: 9 additions & 29 deletions core/offchain/impl/offchain_worker_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,74 +22,55 @@

namespace kagome::offchain {

size_t OffchainWorkerImpl::ocw_counter_ = 0;

OffchainWorkerImpl::OffchainWorkerImpl(
const application::AppConfiguration &app_config,
std::shared_ptr<clock::SystemClock> clock,
std::shared_ptr<crypto::Hasher> hasher,
std::shared_ptr<storage::SpacedStorage> storage,
std::shared_ptr<crypto::CSPRNG> random_generator,
std::shared_ptr<api::AuthorApi> author_api,
const network::OwnPeerInfo &current_peer_info,
std::shared_ptr<OffchainPersistentStorage> persistent_storage,
std::shared_ptr<runtime::Executor> executor,
const primitives::BlockHeader &header,
std::shared_ptr<OffchainWorkerPool> ocw_pool)
: app_config_(app_config),
clock_(std::move(clock)),
hasher_(std::move(hasher)),
random_generator_(std::move(random_generator)),
author_api_(std::move(author_api)),
current_peer_info_(current_peer_info),
persistent_storage_(std::move(persistent_storage)),
executor_(std::move(executor)),
header_(header),
ocw_pool_(std::move(ocw_pool)),
log_(log::createLogger(
"OffchainWorker#" + std::to_string(header_.number), "offchain")) {
"OffchainWorker#" + std::to_string(++ocw_counter_), "offchain")) {
BOOST_ASSERT(clock_);
BOOST_ASSERT(hasher_);
BOOST_ASSERT(storage);
BOOST_ASSERT(random_generator_);
BOOST_ASSERT(author_api_);
BOOST_ASSERT(persistent_storage_);
BOOST_ASSERT(executor_);
BOOST_ASSERT(ocw_pool_);

const_cast<primitives::BlockInfo &>(block_) = header_.blockInfo();

local_storage_ =
std::make_shared<OffchainLocalStorageImpl>(std::move(storage));
}

outcome::result<void> OffchainWorkerImpl::run() {
void OffchainWorkerImpl::run(std::function<void()> &&func,
std::string label) {
BOOST_ASSERT(not ocw_pool_->getWorker());

::libp2p::common::FinalAction at_end(
[prev_thread_name = soralog::util::getThreadName()] {
soralog::util::setThreadName(prev_thread_name);
});

soralog::util::setThreadName("ocw.#" + std::to_string(block_.number));
soralog::util::setThreadName("ocw." + label);

ocw_pool_->addWorker(shared_from_this());
::libp2p::common::FinalAction remove([&] { ocw_pool_->removeWorker(); });

SL_TRACE(log_, "Offchain worker is started for block {}", block_);

auto res = runtime::callOffchainWorkerApi(*executor_, block_.hash, header_);

if (res.has_error()) {
SL_ERROR(log_,
"Can't execute offchain worker for block {}: {}",
block_,
res.error());
return res.error();
}
SL_TRACE(log_, "Offchain worker with label {} is started", label);

SL_DEBUG(
log_, "Offchain worker is successfully executed for block {}", block_);
func();

return outcome::success();
SL_TRACE(log_, "Offchain worker with label {} is finished", label);
}

bool OffchainWorkerImpl::isValidator() const {
Expand Down Expand Up @@ -329,7 +310,6 @@ namespace kagome::offchain {
// issue: https://github.com/soramitsu/kagome/issues/998
throw std::runtime_error(
"This method of OffchainWorkerImpl is not implemented yet");
return;
}

} // namespace kagome::offchain
10 changes: 2 additions & 8 deletions core/offchain/impl/offchain_worker_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,14 @@ namespace kagome::offchain {
OffchainWorkerImpl(
const application::AppConfiguration &app_config,
std::shared_ptr<clock::SystemClock> clock,
std::shared_ptr<crypto::Hasher> hasher,
std::shared_ptr<storage::SpacedStorage> storage,
std::shared_ptr<crypto::CSPRNG> random_generator,
std::shared_ptr<api::AuthorApi> author_api,
const network::OwnPeerInfo &current_peer_info,
std::shared_ptr<OffchainPersistentStorage> persistent_storage,
std::shared_ptr<runtime::Executor> executor,
const primitives::BlockHeader &header,
std::shared_ptr<OffchainWorkerPool> ocw_pool);

outcome::result<void> run() override;
void run(std::function<void()> &&func, std::string label) override;

bool isValidator() const override;

Expand Down Expand Up @@ -118,18 +115,15 @@ namespace kagome::offchain {

const application::AppConfiguration &app_config_;
std::shared_ptr<clock::SystemClock> clock_;
std::shared_ptr<crypto::Hasher> hasher_;
std::shared_ptr<crypto::CSPRNG> random_generator_;
std::shared_ptr<api::AuthorApi> author_api_;
const network::OwnPeerInfo &current_peer_info_;
std::shared_ptr<offchain::OffchainPersistentStorage> persistent_storage_;
std::shared_ptr<offchain::OffchainLocalStorage> local_storage_;
std::shared_ptr<runtime::Executor> executor_;
const primitives::BlockHeader header_;
const primitives::BlockInfo block_;
std::shared_ptr<OffchainWorkerPool> ocw_pool_;
log::Logger log_;

static size_t ocw_counter_;
int16_t request_id_ = 0;
std::map<RequestId, std::shared_ptr<HttpRequest>> active_http_requests_;
};
Expand Down
10 changes: 9 additions & 1 deletion core/offchain/impl/runner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
#include "utils/thread_pool.hpp"

namespace kagome::offchain {
constexpr size_t kMaxThreads = 3;
constexpr size_t kMaxTasks = 1000;

/**
* Enqueue at most `max_tasks_` to run on number of `threads_`.
* Old tasks do not run and are removed when queue is full.
Expand All @@ -22,12 +25,17 @@ namespace kagome::offchain {
public:
using Task = std::function<void()>;

Runner(size_t threads, size_t max_tasks)
Runner(size_t threads = kMaxThreads, size_t max_tasks = kMaxTasks)
: threads_{threads},
free_threads_{threads},
max_tasks_{max_tasks},
thread_pool_{"ocw", threads_} {}

struct Inject {
explicit Inject() = default;
};
Runner(Inject, ...) : Runner(kMaxThreads, kMaxTasks) {}

void run(Task &&task) {
std::unique_lock lock{mutex_};
if (tasks_.size() >= max_tasks_) {
Expand Down
2 changes: 1 addition & 1 deletion core/offchain/offchain_worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ namespace kagome::offchain {
public:
virtual ~OffchainWorker() = default;

virtual outcome::result<void> run() = 0;
virtual void run(std::function<void()> &&func, std::string label) = 0;

// ------------------------- Off-Chain API methods -------------------------

Expand Down
7 changes: 1 addition & 6 deletions core/offchain/offchain_worker_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,13 @@

#include "offchain/offchain_worker.hpp"

#include "primitives/block_header.hpp"
#include "runtime/executor.hpp"

namespace kagome::offchain {

class OffchainWorkerFactory {
public:
virtual ~OffchainWorkerFactory() = default;

virtual std::shared_ptr<OffchainWorker> make(
std::shared_ptr<runtime::Executor> executor,
const primitives::BlockHeader &header) = 0;
virtual std::shared_ptr<OffchainWorker> make() = 0;
};

} // namespace kagome::offchain
3 changes: 1 addition & 2 deletions core/parachain/pvf/precheck.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,7 @@ namespace kagome::parachain {
signer->validatorIndex(),
};
OUTCOME_TRY(signature, signer->signRaw(statement.signable()));
offchain_worker_pool_->addWorker(
offchain_worker_factory_->make(executor_, header));
offchain_worker_pool_->addWorker(offchain_worker_factory_->make());
::libp2p::common::FinalAction remove(
[&] { offchain_worker_pool_->removeWorker(); });
OUTCOME_TRY(parachain_api_->submit_pvf_check_statement(
Expand Down
38 changes: 24 additions & 14 deletions core/runtime/runtime_api/impl/offchain_worker_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,24 @@
#include "runtime/runtime_api/impl/offchain_worker_api.hpp"

#include "application/app_configuration.hpp"
#include "log/logger.hpp"
#include "offchain/impl/runner.hpp"
#include "offchain/offchain_worker_factory.hpp"
#include "runtime/executor.hpp"

namespace kagome::runtime {
constexpr size_t kMaxThreads = 3;
constexpr size_t kMaxTasks = 1000;

outcome::result<void> callOffchainWorkerApi(
Executor &executor,
const primitives::BlockHash &block,
const primitives::BlockHeader &header) {
return executor.callAt<void>(
block, "OffchainWorkerApi_offchain_worker", header);
}

OffchainWorkerApiImpl::OffchainWorkerApiImpl(
const application::AppConfiguration &app_config,
std::shared_ptr<offchain::OffchainWorkerFactory> ocw_factory,
std::shared_ptr<offchain::Runner> runner,
std::shared_ptr<Executor> executor)
: app_config_(app_config),
ocw_factory_(std::move(ocw_factory)),
runner_{std::make_shared<offchain::Runner>(kMaxThreads, kMaxTasks)},
runner_(std::move(runner)),
executor_(std::move(executor)) {
BOOST_ASSERT(ocw_factory_);
BOOST_ASSERT(runner_);
BOOST_ASSERT(executor_);
}

Expand All @@ -53,9 +46,26 @@ namespace kagome::runtime {
}
}

runner_->run([worker = ocw_factory_->make(executor_, header)] {
std::ignore = worker->run();
});
auto label = fmt::format("#{}", block);

auto func = [block = std::move(block),
header = std::move(header),
executor = executor_] {
auto res = executor->callAt<void>(
block, "OffchainWorkerApi_offchain_worker", header);

if (res.has_error()) {
auto log = log::createLogger("OffchainWorkerApi", "offchain");
SL_ERROR(log,
"Can't execute offchain worker for block {}: {}",
block,
res.error());
}
};

runner_->run([worker = ocw_factory_->make(),
label = std::move(label),
func = std::move(func)] { worker->run(func, label); });

return outcome::success();
}
Expand Down
6 changes: 1 addition & 5 deletions core/runtime/runtime_api/impl/offchain_worker_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,12 @@ namespace kagome::runtime {

class Executor;

outcome::result<void> callOffchainWorkerApi(
Executor &executor,
const primitives::BlockHash &block,
const primitives::BlockHeader &header);

class OffchainWorkerApiImpl final : public OffchainWorkerApi {
public:
OffchainWorkerApiImpl(
const application::AppConfiguration &app_config,
std::shared_ptr<offchain::OffchainWorkerFactory> ocw_factory,
std::shared_ptr<offchain::Runner> runner,
std::shared_ptr<Executor> executor);

outcome::result<void> offchain_worker(
Expand Down
Loading

0 comments on commit 053b5c0

Please sign in to comment.