From f8a8e76b2d18b7c2ef88fcfe9f87c34318c0fdb2 Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Thu, 1 Feb 2024 11:14:06 +0300 Subject: [PATCH] fix: fixes and comments Signed-off-by: Vladislav Oleshko --- src/server/transaction.cc | 78 +++++++++++++++++++-------------------- src/server/transaction.h | 15 ++++---- 2 files changed, 47 insertions(+), 46 deletions(-) diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 6f9d97b73dde..c707820e7f4b 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -82,6 +82,14 @@ void RecordTxScheduleFastStats(const Transaction* tx, bool was_ooo, bool was_inl ss->stats.tx_width_freq_arr[0]++; } +std::ostream& operator<<(std::ostream& os, Transaction::time_point tp) { + using namespace chrono; + if (tp == Transaction::time_point::max()) + return os << "inf"; + size_t ms = duration_cast(tp - Transaction::time_point::clock::now()).count(); + return os << ms << "ms"; +} + } // namespace IntentLock::Mode Transaction::LockMode() const { @@ -124,26 +132,27 @@ bool Transaction::SingleClaimBarrier::TryClaim() { return !claimed_.exchange(true, memory_order_relaxed); // false means first means success } -void Transaction::SingleClaimBarrier::Release() { +void Transaction::SingleClaimBarrier::Close() { DCHECK(claimed_.load(memory_order_relaxed)); - released_.store(true, memory_order_relaxed); + closed_.store(true, memory_order_relaxed); ec_.notify(); // release } cv_status Transaction::SingleClaimBarrier::Wait(time_point tp) { - auto cb = [this] { return released_.load(memory_order_acquire); }; + auto cb = [this] { return closed_.load(memory_order_acquire); }; if (tp != time_point::max()) { - cv_status status = ec_.await_until(cb, tp); - - if (status == cv_status::no_timeout) // We finished without a timeout due to a release + // Wait until timepoint and return immediately if we finished without a timeout + if (ec_.await_until(cb, tp) == cv_status::no_timeout) return cv_status::no_timeout; - if (!TryClaim()) // If we can't claim the barrier after a timeout, someone is modifying us - return Wait(time_point::max()); // wait for the modification to finish + // We timed out and claimed the barrier, so no one will be able to claim it anymore + if (TryClaim()) { + closed_.store(true, memory_order_relaxed); // Purely formal + return cv_status::timeout; + } - Release(); // Purely a formal release - return cv_status::timeout; + // fallthrough: otherwise a modification is in progress, wait for it below } ec_.await(cb); @@ -1235,30 +1244,24 @@ size_t Transaction::ReverseArgIndex(ShardId shard_id, size_t arg_index) const { OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_provider, KeyReadyChecker krc) { - DVLOG(2) << "WaitOnWatch " << DebugId(); - using namespace chrono; + DCHECK(!blocking_barrier_.IsClaimed()); // Blocking barrier can't be re-used + // Register keys on active shards blocking controllers and mark shard state as suspended. auto cb = [&](Transaction* t, EngineShard* shard) { auto keys = wkeys_provider(t, shard); return t->WatchInShard(keys, shard, krc); }; - Execute(std::move(cb), true); auto* stats = ServerState::tl_connection_stats(); ++stats->num_blocked_clients; + DVLOG(1) << "WaitOnWatch wait for " << tp << " " << DebugId(); - if (DCHECK_IS_ON()) { - int64_t ms = -1; - if (tp != time_point::max()) - ms = duration_cast(tp - time_point::clock::now()).count(); - DVLOG(1) << "WaitOnWatch TimeWait for " << ms << " ms " << DebugId(); - } - + // Wait for the blocking barrier to be closed. + // Note: It might return immediately if another thread already notified us. cv_status status = blocking_barrier_.Wait(tp); DVLOG(1) << "WaitOnWatch done " << int(status) << " " << DebugId(); - --stats->num_blocked_clients; OpStatus result = OpStatus::OK; @@ -1268,36 +1271,32 @@ OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_p result = local_result_; } + // If we don't follow up with an "action" hop, we must clean up manually on all shards. if (result != OpStatus::OK) ExpireBlocking(wkeys_provider); return result; } -// Runs only in the shard thread. OpStatus Transaction::WatchInShard(ArgSlice keys, EngineShard* shard, KeyReadyChecker krc) { - ShardId idx = SidToId(shard->shard_id()); + auto& sd = shard_data_[SidToId(shard->shard_id())]; - auto& sd = shard_data_[idx]; CHECK_EQ(0, sd.local_mask & SUSPENDED_Q); - - auto* bc = shard->EnsureBlockingController(); - bc->AddWatched(keys, std::move(krc), this); - sd.local_mask |= SUSPENDED_Q; sd.local_mask &= ~OUT_OF_ORDER; - DVLOG(2) << "AddWatched " << DebugId() << " local_mask:" << sd.local_mask - << ", first_key:" << keys.front(); + + shard->EnsureBlockingController()->AddWatched(keys, std::move(krc), this); + DVLOG(2) << "WatchInShard " << DebugId() << ", first_key:" << keys.front(); return OpStatus::OK; } void Transaction::ExpireShardCb(ArgSlice wkeys, EngineShard* shard) { + // Blocking transactions don't release keys when suspending, release them now. auto lock_args = GetLockArgs(shard->shard_id()); shard->db_slice().Release(LockMode(), lock_args); - unsigned sd_idx = SidToId(shard->shard_id()); - auto& sd = shard_data_[sd_idx]; + auto& sd = shard_data_[SidToId(shard->shard_id())]; sd.local_mask &= ~KEYLOCK_ACQUIRED; shard->blocking_controller()->FinalizeWatched(wkeys, this); @@ -1392,16 +1391,17 @@ bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid, string_view CHECK(sd.local_mask & SUSPENDED_Q); CHECK_EQ(sd.local_mask & AWAKED_Q, 0); - sd.local_mask &= ~SUSPENDED_Q; - sd.local_mask |= AWAKED_Q; - - // Find index of awakened key. + // Find index of awakened key auto args = GetShardArgs(sid); auto it = find_if(args.begin(), args.end(), [key](auto arg) { return facade::ToSV(arg) == key; }); - DCHECK(it != args.end()); + CHECK(it != args.end()); + + // Change state to awaked and store index of awakened key + sd.local_mask &= ~SUSPENDED_Q; + sd.local_mask |= AWAKED_Q; sd.wake_key_pos = it - args.begin(); - blocking_barrier_.Release(); + blocking_barrier_.Close(); return true; } @@ -1506,7 +1506,7 @@ void Transaction::CancelBlocking(std::function status_cb) { coordinator_state_ |= COORD_CANCELLED; local_result_ = status; - blocking_barrier_.Release(); + blocking_barrier_.Close(); } OpResult DetermineKeys(const CommandId* cid, CmdArgList args) { diff --git a/src/server/transaction.h b/src/server/transaction.h index 3f852c9ce913..4300c6b57587 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -442,21 +442,21 @@ class Transaction { }; // "Single claim - single modification" barrier. Multiple threads might try to claim it, only one - // will succeed and will be allowed to modify the barrier until it releases it. + // will succeed and will be allowed to modify the guarded object until it closes the barrier. + // A closed barrier can't be claimed again or re-used in any way. class SingleClaimBarrier { public: - bool IsClaimed() const; // Return if barrier was claimed, only for peeking + bool IsClaimed() const; // Return if barrier is claimed, only for peeking bool TryClaim(); // Return if the barrier was claimed successfully - void Release(); // Release barrier after it was claimed + void Close(); // Close barrier after it was claimed - // Wait for barrier until time_point, indefinitely if time_point::max() was passed. - // Expiration plays by the same rules as all other threads, it will try to claim the barrier or - // wait for an ongoing modification to release. + // Wait for barrier until time_point, or indefinitely if time_point::max() was passed. + // After Wait returns, the barrier is guaranteed to be closed, including expiration. std::cv_status Wait(time_point); private: std::atomic_bool claimed_{false}; - std::atomic_bool released_{false}; + std::atomic_bool closed_{false}; EventCount ec_{}; }; @@ -501,6 +501,7 @@ class Transaction { // Optimized version of RunInShard for single shard uncontended cases. RunnableResult RunQuickie(EngineShard* shard); + // Set ARMED flags, start run barrier and submit poll tasks. Doesn't wait for the run barrier void ExecuteAsync(); // Adds itself to watched queue in the shard. Must run in that shard thread.