Skip to content

Commit

Permalink
fix(transaction): Add special barrier for blocking tx
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Oleshko <[email protected]>
  • Loading branch information
dranikpg committed Jan 31, 2024
1 parent 90a9f05 commit 396ca3d
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 48 deletions.
102 changes: 60 additions & 42 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,40 @@ uint32_t Transaction::PhasedBarrier::DEBUG_Count() const {
return count_.load(memory_order_relaxed);
}

bool Transaction::SingleClaimBarrier::IsClaimed() const {
return claimed_.load(memory_order_relaxed);
}

bool Transaction::SingleClaimBarrier::TryClaim() {
return !claimed_.exchange(true, memory_order_relaxed); // false means first means success
}

void Transaction::SingleClaimBarrier::Release() {
DCHECK(claimed_.load(memory_order_relaxed));
released_.store(true, memory_order_relaxed);
ec_.notify(); // release
}

cv_status Transaction::SingleClaimBarrier::Wait(time_point tp) {
auto cb = [this] { return released_.load(memory_order_acquire); };

if (tp != time_point::max()) {
cv_status status = ec_.await_until(cb, tp);

if (status == cv_status::no_timeout) // We finished without a timeout due to a release
return cv_status::no_timeout;

if (!TryClaim()) // If we can't claim the barrier after a timeout, someone is modifying us
return Wait(time_point::max()); // wait for the modification to finish

Release(); // Purely a formal release
return cv_status::timeout;
}

ec_.await(cb);
return cv_status::no_timeout;
}

/**
* @brief Construct a new Transaction:: Transaction object
*
Expand Down Expand Up @@ -1211,13 +1245,6 @@ OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_p

Execute(std::move(cb), true);

coordinator_state_ |= COORD_BLOCKED;

auto wake_cb = [this] {
return (coordinator_state_ & COORD_CANCELLED) ||
wakeup_requested_.load(memory_order_relaxed) > 0;
};

auto* stats = ServerState::tl_connection_stats();
++stats->num_blocked_clients;

Expand All @@ -1228,12 +1255,7 @@ OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_p
DVLOG(1) << "WaitOnWatch TimeWait for " << ms << " ms " << DebugId();
}

cv_status status = cv_status::no_timeout;
if (tp == time_point::max()) {
blocking_ec_.await(std::move(wake_cb));
} else {
status = blocking_ec_.await_until(std::move(wake_cb), tp);
}
cv_status status = blocking_barrier_.Wait(tp);

DVLOG(1) << "WaitOnWatch done " << int(status) << " " << DebugId();

Expand All @@ -1249,7 +1271,6 @@ OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_p
if (result != OpStatus::OK)
ExpireBlocking(wkeys_provider);

coordinator_state_ &= ~COORD_BLOCKED;
return result;
}

Expand Down Expand Up @@ -1277,7 +1298,6 @@ void Transaction::ExpireShardCb(ArgSlice wkeys, EngineShard* shard) {

unsigned sd_idx = SidToId(shard->shard_id());
auto& sd = shard_data_[sd_idx];
sd.local_mask |= EXPIRED_Q;
sd.local_mask &= ~KEYLOCK_ACQUIRED;

shard->blocking_controller()->FinalizeWatched(wkeys, this);
Expand Down Expand Up @@ -1357,40 +1377,31 @@ bool Transaction::IsGlobal() const {
// Returns true if the transacton has changed its state from suspended to awakened,
// false, otherwise.
bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid, string_view key) {
unsigned idx = SidToId(sid);
auto& sd = shard_data_[idx];
unsigned local_mask = sd.local_mask;

if (local_mask & Transaction::EXPIRED_Q) {
return false;
}

// Wake a transaction only once on the first notify.
// We don't care about preserving the strict order with multiple operations running on blocking
// keys in parallel, because the internal order is not observable from outside either way.
if (wakeup_requested_.fetch_add(1, memory_order_relaxed) > 0)
if (!blocking_barrier_.TryClaim())
return false;

DVLOG(1) << "NotifySuspended " << DebugId() << ", local_mask:" << local_mask << " by commited_id "
<< committed_txid;
auto& sd = shard_data_[SidToId(sid)];

// local_mask could be awaked (i.e. not suspended) if the transaction has been
// awakened by another key or awakened by the same key multiple times.
if (local_mask & SUSPENDED_Q) {
DCHECK_EQ(0u, local_mask & AWAKED_Q);
DVLOG(1) << "NotifySuspended " << DebugId() << ", local_mask:" << sd.local_mask
<< " by commited_id " << committed_txid;

sd.local_mask &= ~SUSPENDED_Q;
sd.local_mask |= AWAKED_Q;
// We're the first and only to wake this transaction, expect the shard to be suspended
CHECK(sd.local_mask & SUSPENDED_Q);
CHECK_EQ(sd.local_mask & AWAKED_Q, 0);

// Find index of awakened key.
auto args = GetShardArgs(sid);
auto it =
find_if(args.begin(), args.end(), [key](auto arg) { return facade::ToSV(arg) == key; });
DCHECK(it != args.end());
sd.wake_key_pos = it - args.begin();
}
sd.local_mask &= ~SUSPENDED_Q;
sd.local_mask |= AWAKED_Q;

blocking_ec_.notify();
// Find index of awakened key.
auto args = GetShardArgs(sid);
auto it = find_if(args.begin(), args.end(), [key](auto arg) { return facade::ToSV(arg) == key; });
DCHECK(it != args.end());
sd.wake_key_pos = it - args.begin();

blocking_barrier_.Release();
return true;
}

Expand Down Expand Up @@ -1470,7 +1481,10 @@ void Transaction::FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt
}

void Transaction::CancelBlocking(std::function<OpStatus(ArgSlice)> status_cb) {
if ((coordinator_state_ & COORD_BLOCKED) == 0)
// We're on the owning thread of this transaction, so we can safely access it's data below.
// We still need to claim the blocking barrier, but as this function is often called blindly, we
// want to check first if it makes sense to even proceed.
if (blocking_barrier_.IsClaimed())
return;

OpStatus status = OpStatus::CANCELLED;
Expand All @@ -1486,9 +1500,13 @@ void Transaction::CancelBlocking(std::function<OpStatus(ArgSlice)> status_cb) {
if (status == OpStatus::OK)
return;

// Check if someone else is about to wake us up
if (!blocking_barrier_.TryClaim())
return;

coordinator_state_ |= COORD_CANCELLED;
local_result_ = status;
blocking_ec_.notify();
blocking_barrier_.Release();
}

OpResult<KeyIndex> DetermineKeys(const CommandId* cid, CmdArgList args) {
Expand Down
29 changes: 23 additions & 6 deletions src/server/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,7 @@ class Transaction {
KEYLOCK_ACQUIRED = 1 << 3, // Whether its key locks are acquired
SUSPENDED_Q = 1 << 4, // Whether is suspended (by WatchInShard())
AWAKED_Q = 1 << 5, // Whether it was awakened (by NotifySuspended())
EXPIRED_Q = 1 << 6, // Whether it timed out and should be dropped
UNLOCK_MULTI = 1 << 7, // Whether this shard executed UnlockMultiShardCb
UNLOCK_MULTI = 1 << 6, // Whether this shard executed UnlockMultiShardCb
};

public:
Expand Down Expand Up @@ -412,8 +411,7 @@ class Transaction {
enum CoordinatorState : uint8_t {
COORD_SCHED = 1,
COORD_CONCLUDING = 1 << 1, // Whether its the last hop of a transaction
COORD_BLOCKED = 1 << 2,
COORD_CANCELLED = 1 << 3,
COORD_CANCELLED = 1 << 2,
};

// Auxiliary structure used during initialization
Expand Down Expand Up @@ -443,6 +441,25 @@ class Transaction {
EventCount ec_{};
};

// "Single claim - single modification" barrier. Multiple threads might try to claim it, only one
// will succeed and will be allowed to modify the barrier until it releases it.
class SingleClaimBarrier {
public:
bool IsClaimed() const; // Return if barrier was claimed, only for peeking
bool TryClaim(); // Return if the barrier was claimed successfully
void Release(); // Release barrier after it was claimed

// Wait for barrier until time_point, indefinitely if time_point::max() was passed.
// Expiration plays by the same rules as all other threads, it will try to claim the barrier or
// wait for an ongoing modification to release.
std::cv_status Wait(time_point);

private:
std::atomic_bool claimed_{false};
std::atomic_bool released_{false};
EventCount ec_{};
};

private:
// Init basic fields and reset re-usable.
void InitBase(DbIndex dbid, CmdArgList args);
Expand Down Expand Up @@ -592,8 +609,8 @@ class Transaction {
ShardId unique_shard_id_{kInvalidSid}; // Set if unique_shard_cnt_ = 1
UniqueSlotChecker unique_slot_checker_;

std::atomic_uint32_t wakeup_requested_{0}; // incremented when blocking transaction gets notified
EventCount blocking_ec_; // to wait for wakeup_requested > 0 (or cancelled)
// Barrier for waking blocking transactions that ensures exclusivity of waking operation.
SingleClaimBarrier blocking_barrier_{};

// Transaction coordinator state, written and read by coordinator thread.
uint8_t coordinator_state_ = 0;
Expand Down

0 comments on commit 396ca3d

Please sign in to comment.