Commit f8a8e76

fix: fixes and comments
Signed-off-by: Vladislav Oleshko <[email protected]>
dranikpg committed Feb 1, 2024
1 parent 396ca3d commit f8a8e76
Showing 2 changed files with 47 additions and 46 deletions.
78 changes: 39 additions & 39 deletions src/server/transaction.cc
@@ -82,6 +82,14 @@ void RecordTxScheduleFastStats(const Transaction* tx, bool was_ooo, bool was_inl
   ss->stats.tx_width_freq_arr[0]++;
 }
 
+std::ostream& operator<<(std::ostream& os, Transaction::time_point tp) {
+  using namespace chrono;
+  if (tp == Transaction::time_point::max())
+    return os << "inf";
+  size_t ms = duration_cast<milliseconds>(tp - Transaction::time_point::clock::now()).count();
+  return os << ms << "ms";
+}
+
 }  // namespace
 
 IntentLock::Mode Transaction::LockMode() const {
@@ -124,26 +132,27 @@ bool Transaction::SingleClaimBarrier::TryClaim() {
   return !claimed_.exchange(true, memory_order_relaxed);  // false means first means success
 }
 
-void Transaction::SingleClaimBarrier::Release() {
+void Transaction::SingleClaimBarrier::Close() {
   DCHECK(claimed_.load(memory_order_relaxed));
-  released_.store(true, memory_order_relaxed);
+  closed_.store(true, memory_order_relaxed);
   ec_.notify();  // release
 }
 
 cv_status Transaction::SingleClaimBarrier::Wait(time_point tp) {
-  auto cb = [this] { return released_.load(memory_order_acquire); };
+  auto cb = [this] { return closed_.load(memory_order_acquire); };
 
   if (tp != time_point::max()) {
-    cv_status status = ec_.await_until(cb, tp);
-
-    if (status == cv_status::no_timeout)  // We finished without a timeout due to a release
+    // Wait until timepoint and return immediately if we finished without a timeout
+    if (ec_.await_until(cb, tp) == cv_status::no_timeout)
       return cv_status::no_timeout;
 
-    if (!TryClaim())  // If we can't claim the barrier after a timeout, someone is modifying us
-      return Wait(time_point::max());  // wait for the modification to finish
+    // We timed out and claimed the barrier, so no one will be able to claim it anymore
+    if (TryClaim()) {
+      closed_.store(true, memory_order_relaxed);  // Purely formal
+      return cv_status::timeout;
+    }
 
-    Release();  // Purely a formal release
-    return cv_status::timeout;
+    // fallthrough: otherwise a modification is in progress, wait for it below
   }
 
   ec_.await(cb);
@@ -1235,30 +1244,24 @@ size_t Transaction::ReverseArgIndex(ShardId shard_id, size_t arg_index) const {
 
 OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_provider,
                                   KeyReadyChecker krc) {
-  DVLOG(2) << "WaitOnWatch " << DebugId();
-  using namespace chrono;
   DCHECK(!blocking_barrier_.IsClaimed());  // Blocking barrier can't be re-used
 
+  // Register keys on active shards blocking controllers and mark shard state as suspended.
   auto cb = [&](Transaction* t, EngineShard* shard) {
     auto keys = wkeys_provider(t, shard);
     return t->WatchInShard(keys, shard, krc);
   };
 
   Execute(std::move(cb), true);
 
   auto* stats = ServerState::tl_connection_stats();
   ++stats->num_blocked_clients;
+  DVLOG(1) << "WaitOnWatch wait for " << tp << " " << DebugId();
 
-  if (DCHECK_IS_ON()) {
-    int64_t ms = -1;
-    if (tp != time_point::max())
-      ms = duration_cast<milliseconds>(tp - time_point::clock::now()).count();
-    DVLOG(1) << "WaitOnWatch TimeWait for " << ms << " ms " << DebugId();
-  }
-
+  // Wait for the blocking barrier to be closed.
+  // Note: It might return immediately if another thread already notified us.
   cv_status status = blocking_barrier_.Wait(tp);
 
-  DVLOG(1) << "WaitOnWatch done " << int(status) << " " << DebugId();
-
   --stats->num_blocked_clients;
 
   OpStatus result = OpStatus::OK;
@@ -1268,36 +1271,32 @@ OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_p
     result = local_result_;
   }
 
+  // If we don't follow up with an "action" hop, we must clean up manually on all shards.
   if (result != OpStatus::OK)
     ExpireBlocking(wkeys_provider);
 
   return result;
 }
 
 // Runs only in the shard thread.
 OpStatus Transaction::WatchInShard(ArgSlice keys, EngineShard* shard, KeyReadyChecker krc) {
-  ShardId idx = SidToId(shard->shard_id());
-
-  auto& sd = shard_data_[idx];
+  auto& sd = shard_data_[SidToId(shard->shard_id())];
   CHECK_EQ(0, sd.local_mask & SUSPENDED_Q);
 
+  auto* bc = shard->EnsureBlockingController();
+  bc->AddWatched(keys, std::move(krc), this);
+
   sd.local_mask |= SUSPENDED_Q;
   sd.local_mask &= ~OUT_OF_ORDER;
-  DVLOG(2) << "AddWatched " << DebugId() << " local_mask:" << sd.local_mask
-           << ", first_key:" << keys.front();
-
-  shard->EnsureBlockingController()->AddWatched(keys, std::move(krc), this);
+  DVLOG(2) << "WatchInShard " << DebugId() << ", first_key:" << keys.front();
 
   return OpStatus::OK;
 }
 
 void Transaction::ExpireShardCb(ArgSlice wkeys, EngineShard* shard) {
   // Blocking transactions don't release keys when suspending, release them now.
   auto lock_args = GetLockArgs(shard->shard_id());
   shard->db_slice().Release(LockMode(), lock_args);
 
-  unsigned sd_idx = SidToId(shard->shard_id());
-  auto& sd = shard_data_[sd_idx];
+  auto& sd = shard_data_[SidToId(shard->shard_id())];
   sd.local_mask &= ~KEYLOCK_ACQUIRED;
 
   shard->blocking_controller()->FinalizeWatched(wkeys, this);
@@ -1392,16 +1391,17 @@ bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid, string_view
   CHECK(sd.local_mask & SUSPENDED_Q);
   CHECK_EQ(sd.local_mask & AWAKED_Q, 0);
 
-  sd.local_mask &= ~SUSPENDED_Q;
-  sd.local_mask |= AWAKED_Q;
-
-  // Find index of awakened key.
+  // Find index of awakened key
   auto args = GetShardArgs(sid);
   auto it = find_if(args.begin(), args.end(), [key](auto arg) { return facade::ToSV(arg) == key; });
-  DCHECK(it != args.end());
+  CHECK(it != args.end());
 
+  // Change state to awaked and store index of awakened key
+  sd.local_mask &= ~SUSPENDED_Q;
+  sd.local_mask |= AWAKED_Q;
   sd.wake_key_pos = it - args.begin();
 
-  blocking_barrier_.Release();
+  blocking_barrier_.Close();
   return true;
 }

@@ -1506,7 +1506,7 @@ void Transaction::CancelBlocking(std::function<OpStatus(ArgSlice)> status_cb) {
 
   coordinator_state_ |= COORD_CANCELLED;
   local_result_ = status;
-  blocking_barrier_.Release();
+  blocking_barrier_.Close();
 }
 
 OpResult<KeyIndex> DetermineKeys(const CommandId* cid, CmdArgList args) {
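
The subtle part of the new Wait() logic above is the race between a timed-out waiter and a concurrent notifier: whoever claims the barrier first decides its fate, so a waiter that times out but loses the claim must keep waiting for the winner's Close(). Below is a minimal, self-contained sketch of that protocol. It substitutes std::mutex and std::condition_variable for helio's EventCount, so the class SingleClaimBarrierSketch and all of its details are illustrative assumptions, not Dragonfly's actual implementation.

// Sketch of the "single claim - single modification" protocol, assuming only
// the C++17 standard library (EventCount replaced by mutex + condvar).
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

class SingleClaimBarrierSketch {
 public:
  // First caller wins the claim; everyone else gets false.
  bool TryClaim() {
    return !claimed_.exchange(true, std::memory_order_relaxed);
  }

  // Only the claimer may close; closing wakes all waiters.
  void Close() {
    {
      std::lock_guard lk(m_);
      closed_ = true;
    }
    cv_.notify_all();
  }

  // Mirrors the commit's logic: on timeout, try to claim the barrier.
  // Success means no notifier is active, so the timeout is final; failure
  // means a notifier is mid-modification and we must wait for its Close().
  std::cv_status Wait(std::chrono::steady_clock::time_point tp) {
    std::unique_lock lk(m_);
    if (cv_.wait_until(lk, tp, [this] { return closed_; }))
      return std::cv_status::no_timeout;

    if (TryClaim()) {
      closed_ = true;  // purely formal, like the original
      return std::cv_status::timeout;
    }

    cv_.wait(lk, [this] { return closed_; });  // modification in progress
    return std::cv_status::no_timeout;
  }

 private:
  std::atomic_bool claimed_{false};
  bool closed_ = false;  // guarded by m_
  std::mutex m_;
  std::condition_variable cv_;
};

int main() {
  SingleClaimBarrierSketch barrier;
  std::thread notifier([&] {
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    if (barrier.TryClaim())  // acts like NotifySuspended() claiming the tx
      barrier.Close();
  });
  auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(1);
  auto status = barrier.Wait(deadline);
  std::cout << (status == std::cv_status::timeout ? "timeout" : "notified") << "\n";
  notifier.join();
}

In this toy run the notifier wins the claim long before the one-second deadline, so the program prints "notified"; shrinking the deadline below 50ms would exercise the timeout path instead.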
15 changes: 8 additions & 7 deletions src/server/transaction.h
@@ -442,21 +442,21 @@ class Transaction {
   };
 
   // "Single claim - single modification" barrier. Multiple threads might try to claim it, only one
-  // will succeed and will be allowed to modify the barrier until it releases it.
+  // will succeed and will be allowed to modify the guarded object until it closes the barrier.
+  // A closed barrier can't be claimed again or re-used in any way.
   class SingleClaimBarrier {
   public:
-    bool IsClaimed() const;  // Return if barrier was claimed, only for peeking
+    bool IsClaimed() const;  // Return if barrier is claimed, only for peeking
     bool TryClaim();         // Return if the barrier was claimed successfully
-    void Release();          // Release barrier after it was claimed
+    void Close();            // Close barrier after it was claimed
 
-    // Wait for barrier until time_point, indefinitely if time_point::max() was passed.
-    // Expiration plays by the same rules as all other threads, it will try to claim the barrier or
-    // wait for an ongoing modification to release.
+    // Wait for barrier until time_point, or indefinitely if time_point::max() was passed.
+    // After Wait returns, the barrier is guaranteed to be closed, including expiration.
     std::cv_status Wait(time_point);
 
   private:
     std::atomic_bool claimed_{false};
-    std::atomic_bool released_{false};
+    std::atomic_bool closed_{false};
     EventCount ec_{};
   };

@@ -501,6 +501,7 @@ class Transaction {
   // Optimized version of RunInShard for single shard uncontended cases.
   RunnableResult RunQuickie(EngineShard* shard);
 
+  // Set ARMED flags, start run barrier and submit poll tasks. Doesn't wait for the run barrier
   void ExecuteAsync();
 
   // Adds itself to watched queue in the shard. Must run in that shard thread.
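
The header comment above states that multiple threads might try to claim the barrier but only one will succeed. That guarantee comes from the atomic exchange idiom in TryClaim(): exchange(true) returns the previous value, which is false for exactly one caller. A minimal standalone demonstration of the idiom, using only the standard library (not code from this repository):

// Eight threads race to claim; exactly one observes exchange() == false.
#include <atomic>
#include <iostream>
#include <thread>
#include <vector>

int main() {
  std::atomic_bool claimed{false};
  std::atomic_int winners{0};
  std::vector<std::thread> threads;
  for (int i = 0; i < 8; ++i)
    threads.emplace_back([&] {
      // Same idiom as TryClaim(): false from exchange() means we were first.
      if (!claimed.exchange(true, std::memory_order_relaxed))
        winners.fetch_add(1);
    });
  for (auto& t : threads) t.join();
  std::cout << "winners: " << winners.load() << "\n";  // always prints 1
}

However many threads race, exchange(true) returns false for exactly one of them, so the losers can rely on the winner to eventually close the barrier.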
