fix(transaction): Add special barrier for blocking tx #2512

Merged
merged 3 commits into from
Feb 1, 2024
146 changes: 82 additions & 64 deletions src/server/transaction.cc
@@ -82,6 +82,14 @@ void RecordTxScheduleFastStats(const Transaction* tx, bool was_ooo, bool was_inl
ss->stats.tx_width_freq_arr[0]++;
}

std::ostream& operator<<(std::ostream& os, Transaction::time_point tp) {
using namespace chrono;
if (tp == Transaction::time_point::max())
return os << "inf";
size_t ms = duration_cast<milliseconds>(tp - Transaction::time_point::clock::now()).count();
return os << ms << "ms";
}
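The stream operator above stores the remaining time in a `size_t`, so a deadline that has already passed would print as a very large number. A minimal sketch of the same formatting with a signed count follows. `FormatRemaining`, the `Clock` alias, and the explicit `now` parameter are hypothetical names introduced for illustration, and `steady_clock` is an assumption about what `Transaction::time_point` is based on.

```cpp
#include <chrono>
#include <cstdint>
#include <sstream>
#include <string>

using Clock = std::chrono::steady_clock;  // assumed clock, not taken from the diff

// Hypothetical helper: render the time remaining until `tp`, measured from `now`.
// Taking `now` as a parameter keeps the function deterministic and easy to test.
std::string FormatRemaining(Clock::time_point tp, Clock::time_point now) {
  using namespace std::chrono;
  if (tp == Clock::time_point::max())
    return "inf";

  // Signed count: a deadline in the past yields a negative number of milliseconds
  // instead of wrapping around as it would in an unsigned type.
  int64_t ms = duration_cast<milliseconds>(tp - now).count();

  std::ostringstream os;
  os << ms << "ms";
  return os.str();
}
```

Whether a negative value or a clamped `0ms` is more useful in a debug log is a matter of taste; the sketch only shows the signed variant.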

} // namespace

IntentLock::Mode Transaction::LockMode() const {
@@ -116,6 +124,41 @@ uint32_t Transaction::PhasedBarrier::DEBUG_Count() const {
return count_.load(memory_order_relaxed);
}

bool Transaction::BatonBarrierrier::IsClaimed() const {
return claimed_.load(memory_order_relaxed);
}

bool Transaction::BatonBarrierrier::TryClaim() {
return !claimed_.exchange(true, memory_order_relaxed); // false means first means success
Collaborator:

why relaxed?

Collaborator:

I am trying to understand why relaxed is enough here. I do not see any obvious reason why it would not be, and that makes me nervous. Usually it should not be like this.

Contributor Author:

  1. It was relaxed to begin with 😆
  2. Currently the code touches only shard-local state, or, for expire/cancel, runs on the coordinator thread
  3. If we acquire, what is our corresponding release pair? We had an acquire/release for the hop that prepared suspension; since then we did no writes to our local state, so there is nothing to sync
  4. I can make it acquire though, because I assume it might have a real impact on the conclusion writes

Will think about it

}
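One part of the question in the thread above can be checked in isolation: the exclusivity of the claim does not depend on the memory order. `exchange` is an atomic read-modify-write, so exactly one caller observes the old `false` value even with `memory_order_relaxed`. The ordering argument only decides whether writes to other data are visible to the winner. The sketch below uses hypothetical names (`ClaimFlag`, `CountWinners`) and plain `std::thread`, not the project's fibers.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Hypothetical stand-in for the claim flag discussed above.
struct ClaimFlag {
  std::atomic_bool claimed{false};

  // Returns true for exactly one caller: the one whose exchange saw `false`.
  bool TryClaim() {
    return !claimed.exchange(true, std::memory_order_relaxed);
  }
};

// Race `num_threads` threads on a single flag and count how many claims succeed.
int CountWinners(int num_threads) {
  ClaimFlag flag;
  std::atomic_int winners{0};

  std::vector<std::thread> threads;
  for (int i = 0; i < num_threads; ++i) {
    threads.emplace_back([&] {
      if (flag.TryClaim())
        winners.fetch_add(1, std::memory_order_relaxed);
    });
  }
  for (auto& t : threads)
    t.join();

  return winners.load();
}
```

`CountWinners(n)` returns 1 for any positive `n`. This says nothing about whether the winner sees data written by other threads before the claim, which is the part of the question that the acquire/release discussion is about.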

void Transaction::BatonBarrierrier::Close() {
DCHECK(claimed_.load(memory_order_relaxed));
closed_.store(true, memory_order_relaxed);
ec_.notify(); // release
}

cv_status Transaction::BatonBarrierrier::Wait(time_point tp) {
auto cb = [this] { return closed_.load(memory_order_acquire); };

if (tp != time_point::max()) {
// Wait until timepoint and return immediately if we finished without a timeout
if (ec_.await_until(cb, tp) == cv_status::no_timeout)
return cv_status::no_timeout;

// We timed out and claimed the barrier, so no one will be able to claim it anymore
if (TryClaim()) {
closed_.store(true, memory_order_relaxed); // Purely formal
return cv_status::timeout;
}

// fallthrough: otherwise a modification is in progress, wait for it below
}

ec_.await(cb);
return cv_status::no_timeout;
}
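The claim/close/wait protocol above can be sketched with standard library primitives only. This is an illustration under assumptions, not the project's implementation: `SingleClaimBarrier` is a hypothetical name, and a `std::mutex` plus `std::condition_variable` stand in for `EventCount`, whose exact interface is not shown in the diff.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical stand-in for the barrier in the diff, built on standard primitives.
class SingleClaimBarrier {
 public:
  using time_point = std::chrono::steady_clock::time_point;

  // Peek only: the answer may be stale by the time the caller acts on it.
  bool IsClaimed() const { return claimed_.load(std::memory_order_relaxed); }

  // Exactly one caller ever gets `true`.
  bool TryClaim() { return !claimed_.exchange(true, std::memory_order_relaxed); }

  // Called by the claimer once it has finished modifying the guarded object.
  void Close() {
    assert(IsClaimed());
    {
      std::lock_guard<std::mutex> lk(mu_);
      closed_ = true;
    }
    cv_.notify_all();
  }

  // Waits until the barrier is closed, or until `tp` unless it is time_point::max().
  std::cv_status Wait(time_point tp) {
    std::unique_lock<std::mutex> lk(mu_);
    auto is_closed = [this] { return closed_; };

    if (tp != time_point::max()) {
      if (cv_.wait_until(lk, tp, is_closed))
        return std::cv_status::no_timeout;

      // Timed out. If we win the claim, no notifier can start a modification anymore.
      if (TryClaim()) {
        closed_ = true;
        return std::cv_status::timeout;
      }
      // Otherwise a notifier claimed first and will close soon: wait for it below.
    }

    cv_.wait(lk, is_closed);
    return std::cv_status::no_timeout;
  }

 private:
  std::atomic_bool claimed_{false};
  bool closed_ = false;  // guarded by mu_
  std::mutex mu_;
  std::condition_variable cv_;
};
```

The fallthrough after a failed `TryClaim()` is the important case: a timeout that loses the race is reported as `no_timeout`, because the waker's modification is already in progress and must be allowed to finish.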

/**
* @brief Construct a new Transaction:: Transaction object
*
@@ -1201,42 +1244,24 @@ size_t Transaction::ReverseArgIndex(ShardId shard_id, size_t arg_index) const {

OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_provider,
KeyReadyChecker krc) {
DVLOG(2) << "WaitOnWatch " << DebugId();
using namespace chrono;
DCHECK(!blocking_barrier_.IsClaimed()); // Blocking barrier can't be re-used

// Register keys on active shards blocking controllers and mark shard state as suspended.
auto cb = [&](Transaction* t, EngineShard* shard) {
auto keys = wkeys_provider(t, shard);
return t->WatchInShard(keys, shard, krc);
};

Execute(std::move(cb), true);

coordinator_state_ |= COORD_BLOCKED;

auto wake_cb = [this] {
return (coordinator_state_ & COORD_CANCELLED) ||
wakeup_requested_.load(memory_order_relaxed) > 0;
};

auto* stats = ServerState::tl_connection_stats();
++stats->num_blocked_clients;
DVLOG(1) << "WaitOnWatch wait for " << tp << " " << DebugId();

if (DCHECK_IS_ON()) {
int64_t ms = -1;
if (tp != time_point::max())
ms = duration_cast<milliseconds>(tp - time_point::clock::now()).count();
DVLOG(1) << "WaitOnWatch TimeWait for " << ms << " ms " << DebugId();
}

cv_status status = cv_status::no_timeout;
if (tp == time_point::max()) {
blocking_ec_.await(std::move(wake_cb));
} else {
status = blocking_ec_.await_until(std::move(wake_cb), tp);
}
// Wait for the blocking barrier to be closed.
// Note: It might return immediately if another thread already notified us.
cv_status status = blocking_barrier_.Wait(tp);

DVLOG(1) << "WaitOnWatch done " << int(status) << " " << DebugId();

--stats->num_blocked_clients;

OpStatus result = OpStatus::OK;
@@ -1246,38 +1271,32 @@ OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_p
result = local_result_;
}

// If we don't follow up with an "action" hop, we must clean up manually on all shards.
if (result != OpStatus::OK)
ExpireBlocking(wkeys_provider);

coordinator_state_ &= ~COORD_BLOCKED;
return result;
}

// Runs only in the shard thread.
OpStatus Transaction::WatchInShard(ArgSlice keys, EngineShard* shard, KeyReadyChecker krc) {
ShardId idx = SidToId(shard->shard_id());
auto& sd = shard_data_[SidToId(shard->shard_id())];

auto& sd = shard_data_[idx];
CHECK_EQ(0, sd.local_mask & SUSPENDED_Q);

auto* bc = shard->EnsureBlockingController();
bc->AddWatched(keys, std::move(krc), this);

sd.local_mask |= SUSPENDED_Q;
sd.local_mask &= ~OUT_OF_ORDER;
DVLOG(2) << "AddWatched " << DebugId() << " local_mask:" << sd.local_mask
<< ", first_key:" << keys.front();

shard->EnsureBlockingController()->AddWatched(keys, std::move(krc), this);
DVLOG(2) << "WatchInShard " << DebugId() << ", first_key:" << keys.front();

return OpStatus::OK;
}

void Transaction::ExpireShardCb(ArgSlice wkeys, EngineShard* shard) {
// Blocking transactions don't release keys when suspending, release them now.
auto lock_args = GetLockArgs(shard->shard_id());
shard->db_slice().Release(LockMode(), lock_args);

unsigned sd_idx = SidToId(shard->shard_id());
auto& sd = shard_data_[sd_idx];
sd.local_mask |= EXPIRED_Q;
auto& sd = shard_data_[SidToId(shard->shard_id())];
sd.local_mask &= ~KEYLOCK_ACQUIRED;

shard->blocking_controller()->FinalizeWatched(wkeys, this);
@@ -1357,40 +1376,32 @@ bool Transaction::IsGlobal() const {
// Returns true if the transaction has changed its state from suspended to awakened,
// false, otherwise.
bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid, string_view key) {
unsigned idx = SidToId(sid);
auto& sd = shard_data_[idx];
unsigned local_mask = sd.local_mask;

if (local_mask & Transaction::EXPIRED_Q) {
return false;
}

// Wake a transaction only once on the first notify.
// We don't care about preserving the strict order with multiple operations running on blocking
// keys in parallel, because the internal order is not observable from outside either way.
if (wakeup_requested_.fetch_add(1, memory_order_relaxed) > 0)
if (!blocking_barrier_.TryClaim())
return false;

DVLOG(1) << "NotifySuspended " << DebugId() << ", local_mask:" << local_mask << " by commited_id "
<< committed_txid;
auto& sd = shard_data_[SidToId(sid)];

// local_mask could be awaked (i.e. not suspended) if the transaction has been
// awakened by another key or awakened by the same key multiple times.
if (local_mask & SUSPENDED_Q) {
DCHECK_EQ(0u, local_mask & AWAKED_Q);
DVLOG(1) << "NotifySuspended " << DebugId() << ", local_mask:" << sd.local_mask
<< " by commited_id " << committed_txid;

sd.local_mask &= ~SUSPENDED_Q;
sd.local_mask |= AWAKED_Q;
// We're the first and only to wake this transaction, expect the shard to be suspended
CHECK(sd.local_mask & SUSPENDED_Q);
CHECK_EQ(sd.local_mask & AWAKED_Q, 0);

// Find index of awakened key.
auto args = GetShardArgs(sid);
auto it =
find_if(args.begin(), args.end(), [key](auto arg) { return facade::ToSV(arg) == key; });
DCHECK(it != args.end());
sd.wake_key_pos = it - args.begin();
}
// Find index of awakened key
auto args = GetShardArgs(sid);
auto it = find_if(args.begin(), args.end(), [key](auto arg) { return facade::ToSV(arg) == key; });
CHECK(it != args.end());

// Change state to awaked and store index of awakened key
sd.local_mask &= ~SUSPENDED_Q;
sd.local_mask |= AWAKED_Q;
sd.wake_key_pos = it - args.begin();

blocking_ec_.notify();
blocking_barrier_.Close();
return true;
}
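The state change performed by the winning notifier (clear `SUSPENDED_Q`, set `AWAKED_Q`, record the position of the key that caused the wakeup) can be sketched on its own. The two flag values are the ones listed in `transaction.h`; `ShardState` and `MarkAwaked` are hypothetical, trimmed-down names, and the sketch returns `false` where the code above uses `CHECK`.

```cpp
#include <algorithm>
#include <cstddef>
#include <string_view>
#include <vector>

// Flag values mirror the LocalMask bits shown in transaction.h.
enum LocalMask : unsigned {
  SUSPENDED_Q = 1 << 4,
  AWAKED_Q = 1 << 5,
};

// Hypothetical, trimmed-down per-shard state.
struct ShardState {
  unsigned local_mask = 0;
  size_t wake_key_pos = 0;
};

// Moves the shard from suspended to awaked and records which key woke it.
// Returns false if the shard is not suspended or `key` is not one of its arguments.
bool MarkAwaked(ShardState& sd, const std::vector<std::string_view>& args,
                std::string_view key) {
  if (!(sd.local_mask & SUSPENDED_Q) || (sd.local_mask & AWAKED_Q))
    return false;

  // Find the index of the awakened key among this shard's arguments.
  auto it = std::find(args.begin(), args.end(), key);
  if (it == args.end())
    return false;

  sd.local_mask &= ~SUSPENDED_Q;
  sd.local_mask |= AWAKED_Q;
  sd.wake_key_pos = static_cast<size_t>(it - args.begin());
  return true;
}
```

In the diff this transition runs only after a successful `TryClaim()`, which is why the new code can assert the suspended state instead of testing for it.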

@@ -1470,7 +1481,10 @@ void Transaction::FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt
}

void Transaction::CancelBlocking(std::function<OpStatus(ArgSlice)> status_cb) {
if ((coordinator_state_ & COORD_BLOCKED) == 0)
// We're on the owning thread of this transaction, so we can safely access its data below.
// We still need to claim the blocking barrier, but as this function is often called blindly, we
// want to check first if it makes sense to even proceed.
if (blocking_barrier_.IsClaimed())
return;

OpStatus status = OpStatus::CANCELLED;
@@ -1486,9 +1500,13 @@ void Transaction::CancelBlocking(std::function<OpStatus(ArgSlice)> status_cb) {
if (status == OpStatus::OK)
return;

// Check if someone else is about to wake us up
if (!blocking_barrier_.TryClaim())
return;

coordinator_state_ |= COORD_CANCELLED;
local_result_ = status;
blocking_ec_.notify();
blocking_barrier_.Close();
}
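The cancel path above follows a peek, check, then claim order: a cheap `IsClaimed()` test filters out blind calls, and the real `TryClaim()` happens only once the caller knows it has something to write. A reduced sketch of that ordering, with a competing notify path, might look as follows. `Waiter`, `Notify`, `Cancel`, and the string results are hypothetical and only illustrate who is allowed to write the result.

```cpp
#include <atomic>
#include <string>

// Hypothetical waiter: `result` may be written only by whoever wins the claim.
struct Waiter {
  std::atomic_bool claimed{false};
  std::string result = "pending";

  bool IsClaimed() const { return claimed.load(std::memory_order_relaxed); }
  bool TryClaim() { return !claimed.exchange(true, std::memory_order_relaxed); }
};

// Wake path: claim first, then write.
bool Notify(Waiter& w) {
  if (!w.TryClaim())
    return false;
  w.result = "awaked";
  return true;
}

// Cancel path: peek, decide whether cancelling makes sense at all, then claim.
// `wants_cancel` stands in for the status callback returning a non-OK status.
bool Cancel(Waiter& w, bool wants_cancel) {
  if (w.IsClaimed())  // already woken or expired, nothing to do
    return false;
  if (!wants_cancel)  // the callback decided not to cancel
    return false;
  if (!w.TryClaim())  // someone else claimed between the peek and now
    return false;
  w.result = "cancelled";
  return true;
}
```

The peek is an optimization only. Correctness rests on the second check, because the barrier can be claimed by another thread at any point between `IsClaimed()` and `TryClaim()`.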

OpResult<KeyIndex> DetermineKeys(const CommandId* cid, CmdArgList args) {
30 changes: 24 additions & 6 deletions src/server/transaction.h
@@ -162,8 +162,7 @@ class Transaction {
KEYLOCK_ACQUIRED = 1 << 3, // Whether its key locks are acquired
SUSPENDED_Q = 1 << 4, // Whether is suspended (by WatchInShard())
AWAKED_Q = 1 << 5, // Whether it was awakened (by NotifySuspended())
EXPIRED_Q = 1 << 6, // Whether it timed out and should be dropped
UNLOCK_MULTI = 1 << 7, // Whether this shard executed UnlockMultiShardCb
UNLOCK_MULTI = 1 << 6, // Whether this shard executed UnlockMultiShardCb
};

public:
@@ -412,8 +411,7 @@ class Transaction {
enum CoordinatorState : uint8_t {
COORD_SCHED = 1,
COORD_CONCLUDING = 1 << 1, // Whether it's the last hop of a transaction
COORD_BLOCKED = 1 << 2,
COORD_CANCELLED = 1 << 3,
COORD_CANCELLED = 1 << 2,
};

// Auxiliary structure used during initialization
@@ -443,6 +441,25 @@ class Transaction {
EventCount ec_{};
};

// "Single claim - single modification" barrier. Multiple threads might try to claim it, only one
// will succeed and will be allowed to modify the guarded object until it closes the barrier.
// A closed barrier can't be claimed again or re-used in any way.
class BatonBarrierrier {
public:
bool IsClaimed() const; // Return if barrier is claimed, only for peeking
bool TryClaim(); // Return if the barrier was claimed successfully
void Close(); // Close barrier after it was claimed

// Wait for barrier until time_point, or indefinitely if time_point::max() was passed.
// After Wait returns, the barrier is guaranteed to be closed, including expiration.
std::cv_status Wait(time_point);

private:
std::atomic_bool claimed_{false};
std::atomic_bool closed_{false};
EventCount ec_{};
};

private:
// Init basic fields and reset re-usable.
void InitBase(DbIndex dbid, CmdArgList args);
@@ -484,6 +501,7 @@ class Transaction {
// Optimized version of RunInShard for single shard uncontended cases.
RunnableResult RunQuickie(EngineShard* shard);

// Set ARMED flags, start run barrier and submit poll tasks. Doesn't wait for the run barrier
void ExecuteAsync();

// Adds itself to watched queue in the shard. Must run in that shard thread.
@@ -592,8 +610,8 @@ class Transaction {
ShardId unique_shard_id_{kInvalidSid}; // Set if unique_shard_cnt_ = 1
UniqueSlotChecker unique_slot_checker_;

std::atomic_uint32_t wakeup_requested_{0}; // incremented when blocking transaction gets notified
EventCount blocking_ec_; // to wait for wakeup_requested > 0 (or cancelled)
// Barrier for waking blocking transactions that ensures exclusivity of waking operation.
BatonBarrierrier blocking_barrier_{};
Collaborator:

something is wrong here. Also maybe "baton_barrier_" ?

Contributor Author:

Baton barrier doesn't indicate where it's used; it's like calling an atomic variable just atomic.

blocking_ec just became blocking_barrier... how else can we call blocking transactions? Suspension_barrier, watch_barrier, notify_barrier?


// Transaction coordinator state, written and read by coordinator thread.
uint8_t coordinator_state_ = 0;