Skip to content

Commit

Permalink
fix: Add selective non-atomic squash dispatch (#1641)
Browse files Browse the repository at this point in the history
fix: Add selecting non-atomic squash dispatch

Signed-off-by: Vladislav Oleshko <[email protected]>
  • Loading branch information
dranikpg authored Aug 6, 2023
1 parent 25c3449 commit 6faa530
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 8 deletions.
18 changes: 14 additions & 4 deletions src/server/engine_shard_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -279,11 +279,17 @@ class EngineShardSet {
RunBriefInParallel(std::forward<U>(func), [](auto i) { return true; });
}

// Runs a brief function on selected shard thread. Waits for it to complete.
// Runs a brief function on selected shards. Waits for it to complete.
// `func` must not preempt.
template <typename U, typename P> void RunBriefInParallel(U&& func, P&& pred) const;

template <typename U> void RunBlockingInParallel(U&& func);
// Runs a possibly blocking function on all shards. Waits for it to complete.
template <typename U> void RunBlockingInParallel(U&& func) {
RunBlockingInParallel(std::forward<U>(func), [](auto i) { return true; });
}

// Runs a possibly blocking function on selected shards. Waits for it to complete.
template <typename U, typename P> void RunBlockingInParallel(U&& func, P&& pred);

// Runs func on all shards via the same shard queue that's been used by transactions framework.
// The functions running inside the shard queue run atomically (sequentially)
Expand Down Expand Up @@ -329,14 +335,18 @@ void EngineShardSet::RunBriefInParallel(U&& func, P&& pred) const {
bc.Wait();
}

template <typename U> void EngineShardSet::RunBlockingInParallel(U&& func) {
BlockingCounter bc{size()};
template <typename U, typename P> void EngineShardSet::RunBlockingInParallel(U&& func, P&& pred) {
BlockingCounter bc{0};
static_assert(std::is_invocable_v<U, EngineShard*>,
"Argument must be invocable EngineShard* as argument.");
static_assert(std::is_void_v<std::invoke_result_t<U, EngineShard*>>,
"Callable must not have a return value!");

for (uint32_t i = 0; i < size(); ++i) {
if (!pred(i))
continue;

bc.Add(1);
util::ProactorBase* dest = pp_->at(i);

// the "Dispatch" call spawns a fiber underneath.
Expand Down
6 changes: 2 additions & 4 deletions src/server/multi_command_squasher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,8 @@ bool MultiCommandSquasher::ExecuteSquashed() {
tx->PrepareSquashedMultiHop(base_cid_, cb);
tx->ScheduleSingleHop([this](auto* tx, auto* es) { return SquashedHopCb(tx, es); });
} else {
shard_set->RunBlockingInParallel([this, tx](auto* es) {
if (!sharded_[es->shard_id()].cmds.empty())
SquashedHopCb(tx, es);
});
shard_set->RunBlockingInParallel([this, tx](auto* es) { SquashedHopCb(tx, es); },
[this](auto sid) { return !sharded_[sid].cmds.empty(); });
}

bool aborted = false;
Expand Down

0 comments on commit 6faa530

Please sign in to comment.