Skip to content

Commit

Permalink
[#24958] DocDB: Fix pg_locks test TestWaitStartTimeIsConsistentAcross…
Browse files Browse the repository at this point in the history
…WaiterReEntries

Summary:
Test `PgGetLockStatusTest.TestWaitStartTimeIsConsistentAcrossWaiterReEntries` relied on sleeps in the test to assert that wait start time of waiter requests remains the same upon re-entries of the request to the wait-queue. The same rpc request could re-enter the wait-queue in the following ways:
1. Waiter is resumed, re-does conflict resolution, finds new blocker(s), enters the queue again
2. Waiter request times out in the wait-queue (30s default), callback is invoked with Timeout status, the host tserver retries the rpc if it is well within the deadline, and the request re-enters the wait-queue.

The revision re-constructs the test by introducing a sync point for better synchronization. Additionally, it has one other change - previously, the wait start time during conflict resolution was being set prior to the request entering the wait-queue. The diff modifies this by setting the wait start time post the request enters the wait-queue (this is consistent with the logic that we have in `TryPreWait`). Note that the wait start time field is only used for incrementing the metric `tablet::TabletEventStats::kTotalWaitQueueTime`.
Jira: DB-14094

Test Plan:
Jenkins

./yb_build.sh fastdebug --cxx-test='TEST_F(PgGetLockStatusTest, TestWaitStartTimeIsConsistentAcrossWaiterReEntries) {'  -n 50 --tp 1

Reviewers: pjain, patnaik.balivada

Reviewed By: patnaik.balivada

Subscribers: yql, ybase

Differential Revision: https://phorge.dev.yugabyte.com/D40062
  • Loading branch information
basavaraj29 committed Nov 20, 2024
1 parent b426a72 commit 5cd3998
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 8 deletions.
8 changes: 5 additions & 3 deletions src/yb/docdb/conflict_resolution.cc
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,7 @@ class WaitOnConflictResolver : public ConflictResolver {
if (!wait_start_time_.Initialized()) {
wait_start_time_ = MonoTime::Now();
}
DEBUG_ONLY_TEST_SYNC_POINT("ConflictResolver::MaybeSetWaitStartTime");
}

void TryPreWait() {
Expand All @@ -705,13 +706,14 @@ class WaitOnConflictResolver : public ConflictResolver {
VTRACE(3, "Waiting on $0 transactions after $1 tries.",
conflict_data_->NumActiveTransactions(), wait_for_iters_);

MaybeSetWaitStartTime();
return wait_queue_->WaitOn(
RETURN_NOT_OK(wait_queue_->WaitOn(
context_->transaction_id(), context_->subtransaction_id(), lock_batch_,
ConsumeTransactionDataAndReset(), status_tablet_id_, serial_no_,
context_->GetTxnStartUs(), request_start_us_, request_id_, deadline_,
std::bind(&WaitOnConflictResolver::GetLockStatusInfo, shared_from(this)),
std::bind(&WaitOnConflictResolver::WaitingDone, shared_from(this), _1, _2));
std::bind(&WaitOnConflictResolver::WaitingDone, shared_from(this), _1, _2)));
MaybeSetWaitStartTime();
return Status::OK();
}

// Note: we must pass in shared_this to keep the WaitOnConflictResolver alive until the wait queue
Expand Down
27 changes: 22 additions & 5 deletions src/yb/yql/pgwrapper/pg_get_lock_status-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -779,10 +779,10 @@ TEST_F(PgGetLockStatusTest, TestPgLocksWhileDMLsInProgress) {
ASSERT_OK(status_future.get());
}

#ifndef NDEBUG
TEST_F(PgGetLockStatusTest, TestWaitStartTimeIsConsistentAcrossWaiterReEntries) {
constexpr int kMinTxnAgeMs = 1;
constexpr auto waiter_refresh_secs = 2;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_refresh_waiter_timeout_ms) = waiter_refresh_secs * 1000;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_refresh_waiter_timeout_ms) = 5 * 1000 * kTimeMultiplier;

auto conn = ASSERT_RESULT(Connect());
ASSERT_OK(conn.ExecuteFormat("SET yb_locks_min_txn_age='$0ms'", kMinTxnAgeMs));
Expand All @@ -791,6 +791,15 @@ TEST_F(PgGetLockStatusTest, TestWaitStartTimeIsConsistentAcrossWaiterReEntries)
auto init_stamp = ASSERT_RESULT(conn.FetchRow<MonoDelta>(
"SELECT DISTINCT(waitend) FROM pg_locks WHERE granted"));

std::atomic<uint> total_num_waiting_requests{0};
yb::SyncPoint::GetInstance()->SetCallBack(
"ConflictResolver::MaybeSetWaitStartTime",
[&](void* arg) {
total_num_waiting_requests++;
});
yb::SyncPoint::GetInstance()->ClearTrace();
yb::SyncPoint::GetInstance()->EnableProcessing();

auto status_future_write_req = std::async(std::launch::async, [&]() -> Status {
auto conn = VERIFY_RESULT(Connect());
RETURN_NOT_OK(conn.StartTransaction(IsolationLevel::SNAPSHOT_ISOLATION));
Expand All @@ -806,8 +815,12 @@ TEST_F(PgGetLockStatusTest, TestWaitStartTimeIsConsistentAcrossWaiterReEntries)
return Status::OK();
});


SleepFor(2ms * FLAGS_heartbeat_interval_ms * kTimeMultiplier);
// The above two requests (read & write) should have been blocked by session_1 and should have
// entered the wait-queue, resulting in total_num_waiting_requests >= 2.
ASSERT_OK(WaitFor(
[&] { return total_num_waiting_requests >= 2; }, 10s * kTimeMultiplier,
"Long wait for waiting requests to enter the wait-queue"));
auto now_stamp = ASSERT_RESULT(conn.FetchRow<MonoDelta>("SELECT NOW()"));

auto wait_start_time_1_query =
Expand All @@ -823,8 +836,11 @@ TEST_F(PgGetLockStatusTest, TestWaitStartTimeIsConsistentAcrossWaiterReEntries)
ASSERT_RESULT(conn.FetchRow<MonoDelta>(wait_start_time_2_query));
ASSERT_LE(init_stamp, wait_start_time_2);
ASSERT_GE(now_stamp, wait_start_time_2);

SleepFor(2 * waiter_refresh_secs * 1s * kTimeMultiplier);
// Wait for the 2 waiter requests in the wait-queue to timeout, and re-enter the queue, resulting
// in total_num_waiting_requests >= 4.
ASSERT_OK(WaitFor(
[&] { return total_num_waiting_requests >= 4; }, 10s * kTimeMultiplier,
"Long wait for waiting requests to re-enter the wait-queue"));
ASSERT_EQ(wait_start_time_1,
ASSERT_RESULT(conn.FetchRow<MonoDelta>(wait_start_time_1_query)));
ASSERT_EQ(wait_start_time_2,
Expand All @@ -833,6 +849,7 @@ TEST_F(PgGetLockStatusTest, TestWaitStartTimeIsConsistentAcrossWaiterReEntries)
ASSERT_OK(status_future_write_req.get());
ASSERT_OK(status_future_read_req.get());
}
#endif // NDEBUG

TEST_F(PgGetLockStatusTest, TestLockStatusRespHasHostNodeSet) {
constexpr int kMinTxnAgeMs = 1;
Expand Down

0 comments on commit 5cd3998

Please sign in to comment.