Skip to content

Commit

Permalink
chore: introduce a secondary TaskQueue for shards (#3508)
Browse files Browse the repository at this point in the history
Also allow the TaskQueue to support multiple consumer fibers.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange authored Aug 14, 2024
1 parent 5cfe415 commit fa0913e
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 27 deletions.
27 changes: 27 additions & 0 deletions src/core/task_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,35 @@

#include "core/task_queue.h"

#include <absl/strings/str_cat.h>

#include "base/logging.h"

using namespace std;
namespace dfly {

__thread unsigned TaskQueue::blocked_submitters_ = 0;

TaskQueue::TaskQueue(unsigned queue_size, unsigned start_size, unsigned pool_max_size)
: queue_(queue_size), consumer_fibers_(start_size), pool_max_size_(pool_max_size) {
CHECK_GT(start_size, 0u);
CHECK_LE(start_size, pool_max_size);
}

void TaskQueue::Start(std::string_view base_name) {
for (size_t i = 0; i < consumer_fibers_.size(); ++i) {
auto& fb = consumer_fibers_[i];
CHECK(!fb.IsJoinable());

string name = absl::StrCat(base_name, "/", i);
fb = util::fb2::Fiber(name, [this] { queue_.Run(); });
}
}

void TaskQueue::Shutdown() {
queue_.Shutdown();
for (auto& fb : consumer_fibers_)
fb.JoinIfNeeded();
}

} // namespace dfly
17 changes: 7 additions & 10 deletions src/core/task_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ namespace dfly {
*/
class TaskQueue {
public:
explicit TaskQueue(unsigned queue_size = 128) : queue_(queue_size) {
}
// TODO: to add a mechanism to moderate pool size. Currently it's static with pool_start_size.
TaskQueue(unsigned queue_size, unsigned pool_start_size, unsigned pool_max_size);

template <typename F> bool TryAdd(F&& f) {
return queue_.TryAdd(std::forward<F>(f));
Expand Down Expand Up @@ -51,26 +51,23 @@ class TaskQueue {
* @brief Start running consumer loop in the caller thread by spawning fibers.
* Returns immediately.
*/
void Start(std::string_view base_name) {
consumer_fiber_ = util::fb2::Fiber(base_name, [this] { queue_.Run(); });
}
void Start(std::string_view base_name);

/**
* @brief Notifies Run() function to empty the queue and to exit and waits for the consumer
* fiber to finish.
*/
void Shutdown() {
queue_.Shutdown();
consumer_fiber_.JoinIfNeeded();
}
void Shutdown();

static unsigned blocked_submitters() {
return blocked_submitters_;
}

private:
util::fb2::FiberQueue queue_;
util::fb2::Fiber consumer_fiber_;
std::vector<util::fb2::Fiber> consumer_fibers_;
unsigned pool_max_size_;

static __thread unsigned blocked_submitters_;
};

Expand Down
6 changes: 4 additions & 2 deletions src/server/engine_shard.cc
Original file line number Diff line number Diff line change
Expand Up @@ -371,14 +371,16 @@ uint32_t EngineShard::DefragTask() {
}

EngineShard::EngineShard(util::ProactorBase* pb, mi_heap_t* heap)
: queue_(kQueueLen),
: queue_(kQueueLen, 1, 1),
queue2_(kQueueLen / 2, 2, 2),
txq_([](const Transaction* t) { return t->txid(); }),
mi_resource_(heap),
shard_id_(pb->GetPoolIndex()) {
tmp_str1 = sdsempty();

defrag_task_ = pb->AddOnIdleTask([this]() { return DefragTask(); });
queue_.Start(absl::StrCat("shard_queue_", shard_id()));
queue2_.Start(absl::StrCat("l2_queue_", shard_id()));
}

EngineShard::~EngineShard() {
Expand All @@ -389,7 +391,7 @@ void EngineShard::Shutdown() {
DVLOG(1) << "EngineShard::Shutdown";

queue_.Shutdown();

queue2_.Shutdown();
DCHECK(!fiber_periodic_.IsJoinable());

ProactorBase::me()->RemoveOnIdleTask(defrag_task_);
Expand Down
6 changes: 5 additions & 1 deletion src/server/engine_shard.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ class EngineShard {
return &queue_;
}

TaskQueue* GetSecondaryQueue() {
return &queue2_;
}

// Processes TxQueue, blocked transactions or any other execution state related to that
// shard. Tries executing the passed transaction if possible (does not guarantee though).
void PollExecution(const char* context, Transaction* trans);
Expand Down Expand Up @@ -223,7 +227,7 @@ class EngineShard {
// return true if we did not complete the shard scan
bool DoDefrag();

TaskQueue queue_;
TaskQueue queue_, queue2_;

TxQueue txq_;
MiMemoryResource mi_resource_;
Expand Down
12 changes: 6 additions & 6 deletions src/server/engine_shard_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,19 @@ EngineShardSet* shard_set = nullptr;

void EngineShardSet::Init(uint32_t sz, std::function<void()> shard_handler) {
CHECK_EQ(0u, size());
shard_queue_.resize(sz);

shards_.reset(new EngineShard*[sz]);
size_ = sz;
size_t max_shard_file_size = GetTieredFileLimit(sz);
pp_->AwaitFiberOnAll([&](uint32_t index, ProactorBase* pb) {
if (index < shard_queue_.size()) {
pp_->AwaitFiberOnAll([this](uint32_t index, ProactorBase* pb) {
if (index < size_) {
InitThreadLocal(pb);
}
});

namespaces.Init();

pp_->AwaitFiberOnAll([&](uint32_t index, ProactorBase* pb) {
if (index < shard_queue_.size()) {
if (index < size_) {
auto* shard = EngineShard::tlocal();
shard->InitTieredStorage(pb, max_shard_file_size);

Expand Down Expand Up @@ -144,7 +144,7 @@ void EngineShardSet::Shutdown() {
void EngineShardSet::InitThreadLocal(ProactorBase* pb) {
EngineShard::InitThreadLocal(pb);
EngineShard* es = EngineShard::tlocal();
shard_queue_[es->shard_id()] = es->GetFiberQueue();
shards_[es->shard_id()] = es;
}

void EngineShardSet::TEST_EnableCacheMode() {
Expand Down
19 changes: 12 additions & 7 deletions src/server/engine_shard_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class EngineShardSet {
}

uint32_t size() const {
return uint32_t(shard_queue_.size());
return size_;
}

util::ProactorPool* pool() {
Expand All @@ -63,13 +63,17 @@ class EngineShardSet {

// Uses a shard queue to dispatch. Callback runs in a dedicated fiber.
template <typename F> auto Await(ShardId sid, F&& f) {
return shard_queue_[sid]->Await(std::forward<F>(f));
return shards_[sid]->GetFiberQueue()->Await(std::forward<F>(f));
}

// Uses a shard queue to dispatch. Callback runs in a dedicated fiber.
template <typename F> auto Add(ShardId sid, F&& f) {
assert(sid < shard_queue_.size());
return shard_queue_[sid]->Add(std::forward<F>(f));
assert(sid < size_);
return shards_[sid]->GetFiberQueue()->Add(std::forward<F>(f));
}

template <typename F> auto AddL2(ShardId sid, F&& f) {
return shards_[sid]->GetSecondaryQueue()->Add(std::forward<F>(f));
}

// Runs a brief function on all shards. Waits for it to complete.
Expand All @@ -94,8 +98,8 @@ class EngineShardSet {
// The functions running inside the shard queue run atomically (sequentially)
// with respect each other on the same shard.
template <typename U> void AwaitRunningOnShardQueue(U&& func) {
util::fb2::BlockingCounter bc(shard_queue_.size());
for (size_t i = 0; i < shard_queue_.size(); ++i) {
util::fb2::BlockingCounter bc(size_);
for (size_t i = 0; i < size_; ++i) {
Add(i, [&func, bc]() mutable {
func(EngineShard::tlocal());
bc->Dec();
Expand All @@ -112,7 +116,8 @@ class EngineShardSet {
void InitThreadLocal(util::ProactorBase* pb);

util::ProactorPool* pp_;
std::vector<TaskQueue*> shard_queue_;
std::unique_ptr<EngineShard*[]> shards_;
uint32_t size_ = 0;
};

template <typename U, typename P>
Expand Down

0 comments on commit fa0913e

Please sign in to comment.