From 295861adf1e71afd6c57ee0bce4a50db31e8ec00 Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Thu, 21 Mar 2024 21:15:17 +0300 Subject: [PATCH] chore(transaction): Introduce RunCallback Signed-off-by: Vladislav Oleshko --- src/server/transaction.cc | 102 ++++++++++++++++++++++---------------- src/server/transaction.h | 6 +++ 2 files changed, 64 insertions(+), 44 deletions(-) diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 57274438de6c..2be445561dc7 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -7,6 +7,7 @@ #include #include "base/logging.h" +#include "glog/logging.h" #include "server/blocking_controller.h" #include "server/command_registry.h" #include "server/db_slice.h" @@ -573,7 +574,6 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { bool was_suspended = sd.local_mask & SUSPENDED_Q; bool awaked_prerun = sd.local_mask & AWAKED_Q; - bool is_concluding = coordinator_state_ & COORD_CONCLUDING; IntentLock::Mode mode = LockMode(); @@ -581,53 +581,14 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { DCHECK(!txq_ooo || (sd.local_mask & OUT_OF_ORDER)); /*************************************************************************/ - // Actually running the callback. - // If you change the logic here, also please change the logic - RunnableResult result; - try { - // if a transaction is suspended, we still run it because of brpoplpush/blmove case - // that needs to run lpush on its suspended shard. - result = (*cb_ptr_)(this, shard); - if (unique_shard_cnt_ == 1) { - cb_ptr_ = nullptr; // We can do it because only a single thread runs the callback. 
- local_result_ = result; - } else { - if (result == OpStatus::OUT_OF_MEMORY) { - absl::base_internal::SpinLockHolder lk{&local_result_mu_}; - CHECK(local_result_ == OpStatus::OK || local_result_ == OpStatus::OUT_OF_MEMORY); - local_result_ = result; - } else { - CHECK_EQ(OpStatus::OK, result); - } - } - } catch (std::bad_alloc&) { - LOG_FIRST_N(ERROR, 16) << " out of memory"; // TODO: to log at most once per sec. - absl::base_internal::SpinLockHolder lk{&local_result_mu_}; - local_result_ = OpStatus::OUT_OF_MEMORY; - } catch (std::exception& e) { - LOG(FATAL) << "Unexpected exception " << e.what(); - } + RunCallback(shard); /*************************************************************************/ // at least the coordinator thread owns the reference. DCHECK_GE(GetUseCount(), 1u); - shard->db_slice().OnCbFinish(); - - // Handle result flags to alter behaviour. - if (result.flags & RunnableResult::AVOID_CONCLUDING) { - // Multi shard callbacks should either all or none choose to conclude. Because they can't - // communicate, the must know their decision ahead, consequently there is no point in using this - // flag. - CHECK_EQ(unique_shard_cnt_, 1u); - DCHECK(is_concluding || multi_->concluding); - is_concluding = false; - } - - // Log to jounrnal only once the command finished running - if (is_concluding || (multi_ && multi_->concluding)) - LogAutoJournalOnShard(shard, result); + bool is_concluding = coordinator_state_ & COORD_CONCLUDING; // If we're the head of tx queue (txq_ooo is false), we remove ourselves upon first invocation // and successive hops are run by continuation_trans_ in engine shard. @@ -692,6 +653,54 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { return !is_concluding; } +void Transaction::RunCallback(EngineShard* shard) { + DCHECK_EQ(EngineShard::tlocal(), shard); + + // Actually running the callback. 
+ // If you change the logic here, also please change the logic + RunnableResult result; + try { + // if a transaction is suspended, we still run it because of brpoplpush/blmove case + // that needs to run lpush on its suspended shard. + result = (*cb_ptr_)(this, shard); + + if (unique_shard_cnt_ == 1) { + cb_ptr_ = nullptr; // We can do it because only a single thread runs the callback. + local_result_ = result; + } else { + if (result == OpStatus::OUT_OF_MEMORY) { + absl::base_internal::SpinLockHolder lk{&local_result_mu_}; + CHECK(local_result_ == OpStatus::OK || local_result_ == OpStatus::OUT_OF_MEMORY); + local_result_ = result; + } else { + CHECK_EQ(OpStatus::OK, result); + } + } + } catch (std::bad_alloc&) { + LOG_FIRST_N(ERROR, 16) << " out of memory"; // TODO: to log at most once per sec. + absl::base_internal::SpinLockHolder lk{&local_result_mu_}; + local_result_ = OpStatus::OUT_OF_MEMORY; + } catch (std::exception& e) { + LOG(FATAL) << "Unexpected exception " << e.what(); + } + + shard->db_slice().OnCbFinish(); + + // Handle result flags to alter behaviour. + if (result.flags & RunnableResult::AVOID_CONCLUDING) { + // Multi shard callbacks should either all or none choose to conclude. Because they can't + // communicate, they must know their decision ahead, consequently there is no point in using + // this flag. + CHECK_EQ(unique_shard_cnt_, 1u); + DCHECK((coordinator_state_ & COORD_CONCLUDING) || multi_->concluding); + coordinator_state_ &= ~COORD_CONCLUDING; // safe because single shard + } + + // Log to journal only once the command finished running + if ((coordinator_state_ & COORD_CONCLUDING) || (multi_ && multi_->concluding)) + LogAutoJournalOnShard(shard, result); +} + // TODO: For multi-transactions we should be able to deduce mode() at run-time based // on the context. For regular multi-transactions we can actually inspect all commands. 
// For eval-like transactions - we can decide based on the command flavor (EVAL/EVALRO) or @@ -942,8 +951,7 @@ void Transaction::ExecuteAsync() { }); auto* ss = ServerState::tlocal(); - if (unique_shard_cnt_ == 1 && ss->thread_index() == unique_shard_id_ && - ss->AllowInlineScheduling()) { + if (CanRunInlined()) { DVLOG(1) << "Short-circuit ExecuteAsync " << DebugId(); EngineShard::tlocal()->PollExecution("exec_cb", this); return; @@ -1597,6 +1605,12 @@ void Transaction::CancelBlocking(std::function status_cb) { blocking_barrier_.Close(); } +bool Transaction::CanRunInlined() const { + auto* ss = ServerState::tlocal(); + return unique_shard_cnt_ == 1 && unique_shard_id_ == ss->thread_index() && + ss->AllowInlineScheduling(); +} + OpResult DetermineKeys(const CommandId* cid, CmdArgList args) { KeyIndex key_index; diff --git a/src/server/transaction.h b/src/server/transaction.h index f7114692a0ba..6aa9d0dbc75f 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -520,6 +520,9 @@ class Transaction { // Finish hop, decrement run barrier void FinishHop(); + // Run actual callback on shard, store result if single shard or OOM was caught + void RunCallback(EngineShard* shard); + // Adds itself to watched queue in the shard. Must run in that shard thread. OpStatus WatchInShard(ArgSlice keys, EngineShard* shard, KeyReadyChecker krc); @@ -553,6 +556,9 @@ class Transaction { // Should be called immediately after the last hop. void LogAutoJournalOnShard(EngineShard* shard, RunnableResult shard_result); + // Whether the callback can be run directly on this thread without dispatching on the shard queue + bool CanRunInlined() const; + uint32_t GetUseCount() const { return use_count_.load(std::memory_order_relaxed); }