Skip to content

Commit

Permalink
refactor: Subscription Manager uses async framework (#1605)
Browse files Browse the repository at this point in the history
Fix #1209
  • Loading branch information
cindyyan317 authored Aug 16, 2024
1 parent 5332d3e commit 4cbd3f5
Show file tree
Hide file tree
Showing 33 changed files with 397 additions and 489 deletions.
3 changes: 0 additions & 3 deletions src/etl/ETLService.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,6 @@
struct AccountTransactionsData;
struct NFTTransactionsData;
struct NFTsData;
namespace feed {
class SubscriptionManager;
} // namespace feed

/**
* @brief This namespace contains everything to do with the ETL and ETL sources.
Expand Down
68 changes: 32 additions & 36 deletions src/feed/SubscriptionManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
#include "feed/impl/LedgerFeed.hpp"
#include "feed/impl/ProposedTransactionFeed.hpp"
#include "feed/impl/TransactionFeed.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/async/context/BasicExecutionContext.hpp"
#include "util/log/Logger.hpp"

#include <boost/asio/executor_work_guard.hpp>
Expand All @@ -40,10 +42,8 @@
#include <xrpl/protocol/LedgerHeader.h>

#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <thread>
#include <vector>

/**
Expand All @@ -57,9 +57,8 @@ namespace feed {
* @brief A subscription manager is responsible for managing the subscriptions and publishing the feeds
*/
class SubscriptionManager : public SubscriptionManagerInterface {
std::reference_wrapper<boost::asio::io_context> ioContext_;
std::shared_ptr<data::BackendInterface const> backend_;

util::async::AnyExecutionContext ctx_;
impl::ForwardFeed manifestFeed_;
impl::ForwardFeed validationsFeed_;
impl::LedgerFeed ledgerFeed_;
Expand All @@ -71,24 +70,31 @@ class SubscriptionManager : public SubscriptionManagerInterface {
/**
* @brief Construct a new Subscription Manager object
*
* @param ioContext The io context to use
* @param executor The executor to use to publish the feeds
* @param backend The backend to use
*/
SubscriptionManager(
boost::asio::io_context& ioContext,
std::shared_ptr<data::BackendInterface const> const& backend
)
: ioContext_(ioContext)
, backend_(backend)
, manifestFeed_(ioContext, "manifest")
, validationsFeed_(ioContext, "validations")
, ledgerFeed_(ioContext)
, bookChangesFeed_(ioContext)
, transactionFeed_(ioContext)
, proposedTransactionFeed_(ioContext)
template <class ExecutorCtx>
SubscriptionManager(ExecutorCtx& executor, std::shared_ptr<data::BackendInterface const> const& backend)
: backend_(backend)
, ctx_(executor)
, manifestFeed_(ctx_, "manifest")
, validationsFeed_(ctx_, "validations")
, ledgerFeed_(ctx_)
, bookChangesFeed_(ctx_)
, transactionFeed_(ctx_)
, proposedTransactionFeed_(ctx_)
{
}

/**
* @brief Destructor of the SubscriptionManager object. It will block until all running jobs finished.
*/
~SubscriptionManager() override
{
ctx_.stop();
ctx_.join();
}

/**
* @brief Subscribe to the book changes feed.
* @param subscriber
Expand Down Expand Up @@ -286,16 +292,15 @@ class SubscriptionManager : public SubscriptionManagerInterface {
};

/**
* @brief The help class to run the subscription manager. The container of io_context which is used to publish the
* feeds.
* @brief The help class to run the subscription manager. The container of PoolExecutionContext which is used to publish
* the feeds.
*/
class SubscriptionManagerRunner {
boost::asio::io_context ioContext_;
std::uint64_t workersNum_;
using ActualExecutionCtx = util::async::PoolExecutionContext;
ActualExecutionCtx ctx_;
std::shared_ptr<SubscriptionManager> subscriptionManager_;
util::Logger logger_{"Subscriptions"};
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_ =
boost::asio::make_work_guard(ioContext_);
std::vector<std::thread> workers_;

public:
/**
Expand All @@ -305,13 +310,11 @@ class SubscriptionManagerRunner {
* @param backend The backend to use
*/
SubscriptionManagerRunner(util::Config const& config, std::shared_ptr<data::BackendInterface> const& backend)
: subscriptionManager_(std::make_shared<SubscriptionManager>(ioContext_, backend))
: workersNum_(config.valueOr<std::uint64_t>("subscription_workers", 1))
, ctx_(workersNum_)
, subscriptionManager_(std::make_shared<SubscriptionManager>(ctx_, backend))
{
auto numThreads = config.valueOr<uint64_t>("subscription_workers", 1);
LOG(logger_.info()) << "Starting subscription manager with " << numThreads << " workers";
workers_.reserve(numThreads);
for (auto i = numThreads; i > 0; --i)
workers_.emplace_back([&] { ioContext_.run(); });
LOG(logger_.info()) << "Starting subscription manager with " << workersNum_ << " workers";
}

/**
Expand All @@ -324,12 +327,5 @@ class SubscriptionManagerRunner {
{
return subscriptionManager_;
}

~SubscriptionManagerRunner()
{
work_.reset();
for (auto& worker : workers_)
worker.join();
}
};
} // namespace feed
3 changes: 2 additions & 1 deletion src/feed/impl/BookChangesFeed.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "data/Types.hpp"
#include "feed/impl/SingleFeedBase.hpp"
#include "rpc/BookChangesHelper.hpp"
#include "util/async/AnyExecutionContext.hpp"

#include <boost/asio/io_context.hpp>
#include <boost/json/serialize.hpp>
Expand All @@ -37,7 +38,7 @@ namespace feed::impl {
* '0A5010342D8AAFABDCA58A68F6F588E1C6E58C21B63ED6CA8DB2478F58F3ECD5', 'ledger_time': 756395682, 'changes': []}
*/
struct BookChangesFeed : public SingleFeedBase {
BookChangesFeed(boost::asio::io_context& ioContext) : SingleFeedBase(ioContext, "book_changes")
BookChangesFeed(util::async::AnyExecutionContext& executionCtx) : SingleFeedBase(executionCtx, "book_changes")
{
}

Expand Down
5 changes: 3 additions & 2 deletions src/feed/impl/LedgerFeed.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "data/BackendInterface.hpp"
#include "feed/Types.hpp"
#include "feed/impl/SingleFeedBase.hpp"
#include "util/async/AnyExecutionContext.hpp"

#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>
Expand All @@ -46,9 +47,9 @@ class LedgerFeed : public SingleFeedBase {
public:
/**
* @brief Construct a new Ledger Feed object
* @param ioContext The actual publish will be called in the strand of this.
* @param executionCtx The actual publish will be called in the strand of this.
*/
LedgerFeed(boost::asio::io_context& ioContext) : SingleFeedBase(ioContext, "ledger")
LedgerFeed(util::async::AnyExecutionContext& executionCtx) : SingleFeedBase(executionCtx, "ledger")
{
}

Expand Down
23 changes: 12 additions & 11 deletions src/feed/impl/ProposedTransactionFeed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,17 +101,18 @@ ProposedTransactionFeed::pub(boost::json::object const& receivedTxJson)
auto const accounts = rpc::getAccountsFromTransaction(transaction);
auto affectedAccounts = std::unordered_set<ripple::AccountID>(accounts.cbegin(), accounts.cend());

boost::asio::post(strand_, [this, pubMsg = std::move(pubMsg), affectedAccounts = std::move(affectedAccounts)]() {
notified_.clear();
signal_.emit(pubMsg);
// Prevent the same connection from receiving the same message twice if it is subscribed to multiple accounts
// However, if the same connection subscribe both stream and account, it will still receive the message twice.
// notified_ can be cleared before signal_ emit to improve this, but let's keep it as is for now, since rippled
// acts like this.
notified_.clear();
for (auto const& account : affectedAccounts)
accountSignal_.emit(account, pubMsg);
});
[[maybe_unused]] auto task =
strand_.execute([this, pubMsg = std::move(pubMsg), affectedAccounts = std::move(affectedAccounts)]() {
notified_.clear();
signal_.emit(pubMsg);
// Prevent the same connection from receiving the same message twice if it is subscribed to multiple
// accounts However, if the same connection subscribe both stream and account, it will still receive the
// message twice. notified_ can be cleared before signal_ emit to improve this, but let's keep it as is for
// now, since rippled acts like this.
notified_.clear();
for (auto const& account : affectedAccounts)
accountSignal_.emit(account, pubMsg);
});
}

std::uint64_t
Expand Down
10 changes: 6 additions & 4 deletions src/feed/impl/ProposedTransactionFeed.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include "feed/impl/TrackableSignal.hpp"
#include "feed/impl/TrackableSignalMap.hpp"
#include "feed/impl/Util.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/async/AnyStrand.hpp"
#include "util/log/Logger.hpp"
#include "util/prometheus/Gauge.hpp"

Expand Down Expand Up @@ -51,7 +53,7 @@ class ProposedTransactionFeed {

std::unordered_set<SubscriberPtr>
notified_; // Used by slots to prevent double notifications if tx contains multiple subscribed accounts
boost::asio::strand<boost::asio::io_context::executor_type> strand_;
util::async::AnyStrand strand_;
std::reference_wrapper<util::prometheus::GaugeInt> subAllCount_;
std::reference_wrapper<util::prometheus::GaugeInt> subAccountCount_;

Expand All @@ -61,10 +63,10 @@ class ProposedTransactionFeed {
public:
/**
* @brief Construct a Proposed Transaction Feed object.
* @param ioContext The actual publish will be called in the strand of this.
* @param executionCtx The actual publish will be called in the strand of this.
*/
ProposedTransactionFeed(boost::asio::io_context& ioContext)
: strand_(boost::asio::make_strand(ioContext))
ProposedTransactionFeed(util::async::AnyExecutionContext& executionCtx)
: strand_(executionCtx.makeStrand())
, subAllCount_(getSubscriptionsGaugeInt("tx_proposed"))
, subAccountCount_(getSubscriptionsGaugeInt("account_proposed"))

Expand Down
9 changes: 5 additions & 4 deletions src/feed/impl/SingleFeedBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "feed/Types.hpp"
#include "feed/impl/TrackableSignal.hpp"
#include "feed/impl/Util.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/log/Logger.hpp"

#include <boost/asio/io_context.hpp>
Expand All @@ -35,8 +36,8 @@

namespace feed::impl {

SingleFeedBase::SingleFeedBase(boost::asio::io_context& ioContext, std::string const& name)
: strand_(boost::asio::make_strand(ioContext)), subCount_(getSubscriptionsGaugeInt(name)), name_(name)
SingleFeedBase::SingleFeedBase(util::async::AnyExecutionContext& executionCtx, std::string const& name)
: strand_(executionCtx.makeStrand()), subCount_(getSubscriptionsGaugeInt(name)), name_(name)
{
}

Expand Down Expand Up @@ -67,8 +68,8 @@ SingleFeedBase::unsub(SubscriberSharedPtr const& subscriber)
void
SingleFeedBase::pub(std::string msg) const
{
boost::asio::post(strand_, [this, msg = std::move(msg)]() mutable {
auto const msgPtr = std::make_shared<std::string>(std::move(msg));
[[maybe_unused]] auto task = strand_.execute([this, msg = std::move(msg)]() {
auto const msgPtr = std::make_shared<std::string>(msg);
signal_.emit(msgPtr);
});
}
Expand Down
8 changes: 5 additions & 3 deletions src/feed/impl/SingleFeedBase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

#include "feed/Types.hpp"
#include "feed/impl/TrackableSignal.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/async/AnyStrand.hpp"
#include "util/log/Logger.hpp"
#include "util/prometheus/Gauge.hpp"

Expand All @@ -38,7 +40,7 @@ namespace feed::impl {
* @brief Base class for single feed.
*/
class SingleFeedBase {
boost::asio::strand<boost::asio::io_context::executor_type> strand_;
util::async::AnyStrand strand_;
std::reference_wrapper<util::prometheus::GaugeInt> subCount_;
TrackableSignal<Subscriber, std::shared_ptr<std::string> const&> signal_;
util::Logger logger_{"Subscriptions"};
Expand All @@ -47,10 +49,10 @@ class SingleFeedBase {
public:
/**
* @brief Construct a new Single Feed Base object
* @param ioContext The actual publish will be called in the strand of this.
* @param executionCtx The actual publish will be called in the strand of this.
* @param name The promethues counter name of the feed.
*/
SingleFeedBase(boost::asio::io_context& ioContext, std::string const& name);
SingleFeedBase(util::async::AnyExecutionContext& executionCtx, std::string const& name);

/**
* @brief Subscribe the feed.
Expand Down
49 changes: 23 additions & 26 deletions src/feed/impl/TransactionFeed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,33 +276,30 @@ TransactionFeed::pub(
}
}

boost::asio::post(
strand_,
[this,
allVersionsMsgs = std::move(allVersionsMsgs),
affectedAccounts = std::move(affectedAccounts),
affectedBooks = std::move(affectedBooks)]() {
notified_.clear();
signal_.emit(allVersionsMsgs);
// clear the notified set. If the same connection subscribes both transactions + proposed_transactions,
// rippled SENDS the same message twice
notified_.clear();
txProposedsignal_.emit(allVersionsMsgs);
notified_.clear();
// check duplicate for account and proposed_account, this prevents sending the same message multiple times
// if it affects multiple accounts watched by the same connection
for (auto const& account : affectedAccounts) {
accountSignal_.emit(account, allVersionsMsgs);
accountProposedSignal_.emit(account, allVersionsMsgs);
}
notified_.clear();
// check duplicate for books, this prevents sending the same message multiple times if it affects multiple
// books watched by the same connection
for (auto const& book : affectedBooks) {
bookSignal_.emit(book, allVersionsMsgs);
}
[[maybe_unused]] auto task = strand_.execute([this,
allVersionsMsgs = std::move(allVersionsMsgs),
affectedAccounts = std::move(affectedAccounts),
affectedBooks = std::move(affectedBooks)]() {
notified_.clear();
signal_.emit(allVersionsMsgs);
// clear the notified set. If the same connection subscribes both transactions + proposed_transactions,
// rippled SENDS the same message twice
notified_.clear();
txProposedsignal_.emit(allVersionsMsgs);
notified_.clear();
// check duplicate for account and proposed_account, this prevents sending the same message multiple times
// if it affects multiple accounts watched by the same connection
for (auto const& account : affectedAccounts) {
accountSignal_.emit(account, allVersionsMsgs);
accountProposedSignal_.emit(account, allVersionsMsgs);
}
notified_.clear();
// check duplicate for books, this prevents sending the same message multiple times if it affects multiple
// books watched by the same connection
for (auto const& book : affectedBooks) {
bookSignal_.emit(book, allVersionsMsgs);
}
);
});
}

void
Expand Down
10 changes: 6 additions & 4 deletions src/feed/impl/TransactionFeed.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include "feed/impl/TrackableSignal.hpp"
#include "feed/impl/TrackableSignalMap.hpp"
#include "feed/impl/Util.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/async/AnyStrand.hpp"
#include "util/log/Logger.hpp"
#include "util/prometheus/Gauge.hpp"

Expand Down Expand Up @@ -63,7 +65,7 @@ class TransactionFeed {

util::Logger logger_{"Subscriptions"};

boost::asio::strand<boost::asio::io_context::executor_type> strand_;
util::async::AnyStrand strand_;
std::reference_wrapper<util::prometheus::GaugeInt> subAllCount_;
std::reference_wrapper<util::prometheus::GaugeInt> subAccountCount_;
std::reference_wrapper<util::prometheus::GaugeInt> subBookCount_;
Expand All @@ -82,10 +84,10 @@ class TransactionFeed {
public:
/**
* @brief Construct a new Transaction Feed object.
* @param ioContext The actual publish will be called in the strand of this.
* @param executionCtx The actual publish will be called in the strand of this.
*/
TransactionFeed(boost::asio::io_context& ioContext)
: strand_(boost::asio::make_strand(ioContext))
TransactionFeed(util::async::AnyExecutionContext& executionCtx)
: strand_(executionCtx.makeStrand())
, subAllCount_(getSubscriptionsGaugeInt("tx"))
, subAccountCount_(getSubscriptionsGaugeInt("account"))
, subBookCount_(getSubscriptionsGaugeInt("book"))
Expand Down
3 changes: 0 additions & 3 deletions src/rpc/RPCEngine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@
#include <utility>

// forward declarations
namespace feed {
class SubscriptionManager;
} // namespace feed
namespace etl {
class LoadBalancer;
class ETLService;
Expand Down
Loading

0 comments on commit 4cbd3f5

Please sign in to comment.