Skip to content

Commit

Permalink
fix(server): Fix transaction index + shard_data multi re-use (#958)
Browse files Browse the repository at this point in the history
  • Loading branch information
dranikpg authored Mar 19, 2023
1 parent 9f0e269 commit 623c5a8
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 14 deletions.
29 changes: 18 additions & 11 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -236,15 +236,15 @@ void Transaction::InitByKeys(KeyIndex key_index) {
DCHECK_LT(key_index.start, args.size());

bool needs_reverse_mapping = cid_->opt_mask() & CO::REVERSE_MAPPING;
bool single_key = key_index.HasSingleKey() && !IsAtomicMulti();
bool single_key = key_index.HasSingleKey();

if (single_key) {
if (single_key && !IsAtomicMulti()) {
DCHECK_GT(key_index.step, 0u);

// We don't have to split the arguments by shards, so we can copy them directly.
StoreKeysInArgs(key_index, needs_reverse_mapping);

shard_data_.resize(1);
shard_data_.resize(IsMulti() ? shard_set->size() : 1);
shard_data_.front().local_mask |= ACTIVE;

unique_shard_cnt_ = 1;
Expand Down Expand Up @@ -274,7 +274,7 @@ void Transaction::InitByKeys(KeyIndex key_index) {
// Compress shard data, if we occupy only one shard.
if (unique_shard_cnt_ == 1) {
PerShardData* sd;
if (IsAtomicMulti()) {
if (IsMulti()) {
sd = &shard_data_[unique_shard_id_];
} else {
shard_data_.resize(1);
Expand Down Expand Up @@ -380,7 +380,11 @@ void Transaction::MultiSwitchCmd(const CommandId* cid) {
cb_ = nullptr;

if (multi_->mode == NON_ATOMIC) {
shard_data_.resize(0);
for (auto& sd : shard_data_) {
sd.arg_count = sd.arg_start = sd.local_mask = 0;
sd.pq_pos = TxQueue::kEnd;
DCHECK_EQ(sd.is_armed.load(memory_order_relaxed), false);
}
txid_ = 0;
coordinator_state_ = 0;
}
Expand Down Expand Up @@ -647,10 +651,11 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
// and directly dispatch the task to its destination shard.
bool schedule_fast = (unique_shard_cnt_ == 1) && !IsGlobal() && !IsAtomicMulti();
if (schedule_fast) {
DCHECK_EQ(1u, shard_data_.size());
DCHECK_NE(unique_shard_id_, kInvalidSid);
DCHECK(shard_data_.size() == 1 || multi_->mode == NON_ATOMIC);

// IsArmedInShard() first checks run_count_ before shard_data, so use release ordering.
shard_data_[0].is_armed.store(true, memory_order_relaxed);
shard_data_[SidToId(unique_shard_id_)].is_armed.store(true, memory_order_relaxed);
run_count_.store(1, memory_order_release);

time_now_ms_ = GetCurrentTimeMs();
Expand Down Expand Up @@ -842,12 +847,13 @@ void Transaction::ExecuteAsync() {

void Transaction::RunQuickie(EngineShard* shard) {
DCHECK(!IsAtomicMulti());
DCHECK_EQ(1u, shard_data_.size());
DCHECK(shard_data_.size() == 1u || multi_->mode == NON_ATOMIC);
DCHECK_NE(unique_shard_id_, kInvalidSid);
DCHECK_EQ(0u, txid_);

shard->IncQuickRun();

auto& sd = shard_data_[0];
auto& sd = shard_data_[SidToId(unique_shard_id_)];
DCHECK_EQ(0, sd.local_mask & (KEYLOCK_ACQUIRED | OUT_OF_ORDER));

DVLOG(1) << "RunQuickSingle " << DebugId() << " " << shard->shard_id() << " " << args_[0];
Expand Down Expand Up @@ -910,12 +916,13 @@ KeyLockArgs Transaction::GetLockArgs(ShardId sid) const {
bool Transaction::ScheduleUniqueShard(EngineShard* shard) {
DCHECK(!IsAtomicMulti());
DCHECK_EQ(0u, txid_);
DCHECK_EQ(1u, shard_data_.size());
DCHECK(shard_data_.size() == 1u || multi_->mode == NON_ATOMIC);
DCHECK_NE(unique_shard_id_, kInvalidSid);

auto mode = Mode();
auto lock_args = GetLockArgs(shard->shard_id());

auto& sd = shard_data_.front();
auto& sd = shard_data_[SidToId(unique_shard_id_)];
DCHECK_EQ(TxQueue::kEnd, sd.pq_pos);

// Fast path - for uncontended keys, just run the callback.
Expand Down
11 changes: 8 additions & 3 deletions src/server/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ class Transaction {

// Accessed within shard thread.
// Bitmask of LocalState enums.
uint16_t local_mask{0};
uint16_t local_mask = 0;

// Needed to rollback inconsistent schedulings or remove OOO transactions from
// tx queue.
Expand Down Expand Up @@ -452,6 +452,11 @@ class Transaction {
// multiple threads access this array to synchronize between themselves using
// PerShardData.state, it can be tricky. The complication comes from multi_ transactions where
// scheduled transaction is accessed between operations as well.

// Stores per-shard data.
// For non-multi transactions, it can be of size one in case only one shard is active
// (unique_shard_cnt_ = 1).
// Never access directly with index, always use SidToId.
absl::InlinedVector<PerShardData, 4> shard_data_; // length = shard_count

// Stores arguments of the transaction (i.e. keys + values) partitioned by shards.
Expand Down Expand Up @@ -479,8 +484,8 @@ class Transaction {
std::atomic_uint32_t use_count_{0}, run_count_{0}, seqlock_{0};

// unique_shard_cnt_ and unique_shard_id_ are accessed only by coordinator thread.
uint32_t unique_shard_cnt_{0}; // number of unique shards span by args_
ShardId unique_shard_id_{kInvalidSid};
uint32_t unique_shard_cnt_{0}; // Number of unique shards active
ShardId unique_shard_id_{kInvalidSid}; // Set if unique_shard_cnt_ = 1

util::fibers_ext::EventCount blocking_ec_; // Used to wake blocking transactions.
util::fibers_ext::EventCount run_ec_; // Used to wait for shard callbacks
Expand Down

0 comments on commit 623c5a8

Please sign in to comment.