Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(transaction): Introduce RunCallback #2760

Merged
merged 1 commit into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 58 additions & 45 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <absl/strings/match.h>

#include "base/logging.h"
#include "glog/logging.h"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's clangd 🙄 Will remove it

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how the fuck did you succeed to use clangd? It does not work for me due to confusion with stdlib++ implementations

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@romange I use clangd as well? What are you using? Isn't this the goto language server?

#include "server/blocking_controller.h"
#include "server/command_registry.h"
#include "server/db_slice.h"
Expand Down Expand Up @@ -573,61 +574,21 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {

bool was_suspended = sd.local_mask & SUSPENDED_Q;
bool awaked_prerun = sd.local_mask & AWAKED_Q;
bool is_concluding = coordinator_state_ & COORD_CONCLUDING;

IntentLock::Mode mode = LockMode();

DCHECK(IsGlobal() || (sd.local_mask & KEYLOCK_ACQUIRED) || (multi_ && multi_->mode == GLOBAL));
DCHECK(!txq_ooo || (sd.local_mask & OUT_OF_ORDER));

/*************************************************************************/
// Actually running the callback.
// If you change the logic here, also please change the logic
RunnableResult result;
try {
// if a transaction is suspended, we still run it because of brpoplpush/blmove case
// that needs to run lpush on its suspended shard.
result = (*cb_ptr_)(this, shard);

if (unique_shard_cnt_ == 1) {
cb_ptr_ = nullptr; // We can do it because only a single thread runs the callback.
local_result_ = result;
} else {
if (result == OpStatus::OUT_OF_MEMORY) {
absl::base_internal::SpinLockHolder lk{&local_result_mu_};
CHECK(local_result_ == OpStatus::OK || local_result_ == OpStatus::OUT_OF_MEMORY);
local_result_ = result;
} else {
CHECK_EQ(OpStatus::OK, result);
}
}
} catch (std::bad_alloc&) {
LOG_FIRST_N(ERROR, 16) << " out of memory"; // TODO: to log at most once per sec.
absl::base_internal::SpinLockHolder lk{&local_result_mu_};
local_result_ = OpStatus::OUT_OF_MEMORY;
} catch (std::exception& e) {
LOG(FATAL) << "Unexpected exception " << e.what();
}
RunCallback(shard);

/*************************************************************************/
// at least the coordinator thread owns the reference.
DCHECK_GE(GetUseCount(), 1u);

shard->db_slice().OnCbFinish();

// Handle result flags to alter behaviour.
if (result.flags & RunnableResult::AVOID_CONCLUDING) {
// Multi shard callbacks should either all or none choose to conclude. Because they can't
// communicate, the must know their decision ahead, consequently there is no point in using this
// flag.
CHECK_EQ(unique_shard_cnt_, 1u);
DCHECK(is_concluding || multi_->concluding);
is_concluding = false;
}

// Log to jounrnal only once the command finished running
if (is_concluding || (multi_ && multi_->concluding))
LogAutoJournalOnShard(shard, result);
bool is_concluding = coordinator_state_ & COORD_CONCLUDING;

// If we're the head of tx queue (txq_ooo is false), we remove ourselves upon first invocation
// and successive hops are run by continuation_trans_ in engine shard.
Expand Down Expand Up @@ -692,6 +653,54 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
return !is_concluding;
}

void Transaction::RunCallback(EngineShard* shard) {
DCHECK_EQ(EngineShard::tlocal(), shard);

// Actually running the callback.
// If you change the logic here, also please change the logic
RunnableResult result;
try {
// if a transaction is suspended, we still run it because of brpoplpush/blmove case
// that needs to run lpush on its suspended shard.
result = (*cb_ptr_)(this, shard);

if (unique_shard_cnt_ == 1) {
cb_ptr_ = nullptr; // We can do it because only a single thread runs the callback.
local_result_ = result;
} else {
if (result == OpStatus::OUT_OF_MEMORY) {
absl::base_internal::SpinLockHolder lk{&local_result_mu_};
CHECK(local_result_ == OpStatus::OK || local_result_ == OpStatus::OUT_OF_MEMORY);
local_result_ = result;
} else {
CHECK_EQ(OpStatus::OK, result);
}
}
} catch (std::bad_alloc&) {
LOG_FIRST_N(ERROR, 16) << " out of memory"; // TODO: to log at most once per sec.
absl::base_internal::SpinLockHolder lk{&local_result_mu_};
local_result_ = OpStatus::OUT_OF_MEMORY;
} catch (std::exception& e) {
LOG(FATAL) << "Unexpected exception " << e.what();
}

shard->db_slice().OnCbFinish();

// Handle result flags to alter behaviour.
if (result.flags & RunnableResult::AVOID_CONCLUDING) {
// Multi shard callbacks should either all or none choose to conclude. Because they can't
// communicate, the must know their decision ahead, consequently there is no point in using this
// flag.
CHECK_EQ(unique_shard_cnt_, 1u);
DCHECK((coordinator_state_ & COORD_CONCLUDING) || multi_->concluding);
coordinator_state_ &= ~COORD_CONCLUDING; // safe because single shard
}

// Log to jounrnal only once the command finished running
if ((coordinator_state_ & COORD_CONCLUDING) || (multi_ && multi_->concluding))
LogAutoJournalOnShard(shard, result);
}

// TODO: For multi-transactions we should be able to deduce mode() at run-time based
// on the context. For regular multi-transactions we can actually inspect all commands.
// For eval-like transactions - we can decide based on the command flavor (EVAL/EVALRO) or
Expand Down Expand Up @@ -941,9 +950,7 @@ void Transaction::ExecuteAsync() {
poll_flags.set(i, true);
});

auto* ss = ServerState::tlocal();
if (unique_shard_cnt_ == 1 && ss->thread_index() == unique_shard_id_ &&
ss->AllowInlineScheduling()) {
if (CanRunInlined()) {
DVLOG(1) << "Short-circuit ExecuteAsync " << DebugId();
EngineShard::tlocal()->PollExecution("exec_cb", this);
return;
Expand Down Expand Up @@ -1597,6 +1604,12 @@ void Transaction::CancelBlocking(std::function<OpStatus(ArgSlice)> status_cb) {
blocking_barrier_.Close();
}

bool Transaction::CanRunInlined() const {
auto* ss = ServerState::tlocal();
return unique_shard_cnt_ == 1 && unique_shard_id_ == ss->thread_index() &&
ss->AllowInlineScheduling();
}

OpResult<KeyIndex> DetermineKeys(const CommandId* cid, CmdArgList args) {
KeyIndex key_index;

Expand Down
6 changes: 6 additions & 0 deletions src/server/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,9 @@ class Transaction {
// Finish hop, decrement run barrier
void FinishHop();

// Run actual callback on shard, store result if single shard or OOM was catched
void RunCallback(EngineShard* shard);

// Adds itself to watched queue in the shard. Must run in that shard thread.
OpStatus WatchInShard(ArgSlice keys, EngineShard* shard, KeyReadyChecker krc);

Expand Down Expand Up @@ -553,6 +556,9 @@ class Transaction {
// Should be called immediately after the last hop.
void LogAutoJournalOnShard(EngineShard* shard, RunnableResult shard_result);

// Whether the callback can be run directly on this thread without dispatching on the shard queue
bool CanRunInlined() const;

uint32_t GetUseCount() const {
return use_count_.load(std::memory_order_relaxed);
}
Expand Down
Loading