Skip to content

Commit

Permalink
[#22597] YSQL: Add Save/Restore state functionality to ConsistentRead…
Browse files Browse the repository at this point in the history
…Point

Summary:
In context of the https://phorge.dev.yugabyte.com/D32313 it is required to preserve and restore the state of the `ConsistentReadPoint`.
The `ConsistentReadPoint` objects are no copyable due to mutex. To preserve current state of the object new class `Momento` is introduced. This class contains all the fields, which is required to represent the sate of `ConsistentReadPoint`. All the fields in this class are private to avoid breaking of `ConsistentReadPoint` invariants in situation like this

```
auto momento = rp.GetMomento();
momento.read_time = ...;
rp.SetMomento(std::move(momento));
```

**Note:** The diff contains some formatting and include cleanup changes.
Jira: DB-11503

Test Plan: Jenkins

Reviewers: sergei, pjain

Reviewed By: sergei, pjain

Subscribers: ybase

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D35372
  • Loading branch information
d-uspenskiy committed Jun 11, 2024
1 parent 8dc7196 commit 0dcc22b
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 21 deletions.
34 changes: 24 additions & 10 deletions src/yb/common/consistent_read_point.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
//
#include "yb/common/consistent_read_point.h"

#include <mutex>
#include <type_traits>
#include <utility>

#include "yb/common/common.pb.h"

namespace yb {
Expand All @@ -33,11 +37,10 @@ void ConsistentReadPoint::SetReadTimeUnlocked(
restarts_.clear();
}

void ConsistentReadPoint::SetCurrentReadTimeUnlocked(const ClampUncertaintyWindow clamp) {
void ConsistentReadPoint::SetCurrentReadTimeUnlocked(ClampUncertaintyWindow clamp) {
SetReadTimeUnlocked(
clamp
? ReadHybridTime::SingleTime(clock_->Now())
: ReadHybridTime::FromHybridTimeRange(clock_->NowRange()));
clamp ? ReadHybridTime::SingleTime(clock_->Now())
: ReadHybridTime::FromHybridTimeRange(clock_->NowRange()));
}

void ConsistentReadPoint::SetReadTime(
Expand All @@ -46,7 +49,7 @@ void ConsistentReadPoint::SetReadTime(
SetReadTimeUnlocked(read_time, &local_limits);
}

void ConsistentReadPoint::SetCurrentReadTime(const ClampUncertaintyWindow clamp) {
void ConsistentReadPoint::SetCurrentReadTime(ClampUncertaintyWindow clamp) {
std::lock_guard lock(mutex_);
SetCurrentReadTimeUnlocked(clamp);
}
Expand All @@ -72,8 +75,6 @@ ReadHybridTime ConsistentReadPoint::GetReadTime(const TabletId& tablet) const {
const auto it = local_limits_.find(tablet);
if (it != local_limits_.end()) {
read_time.local_limit = it->second;
} else {
read_time.local_limit = read_time.local_limit;
}
}
return read_time;
Expand Down Expand Up @@ -145,7 +146,7 @@ void ConsistentReadPoint::PrepareChildTransactionData(ChildTransactionDataPB* da
read_time_.AddToPB(data);
auto& local_limits = *data->mutable_local_limits();
for (const auto& entry : local_limits_) {
typedef std::remove_reference<decltype(*local_limits.begin())>::type PairType;
using PairType = std::remove_reference_t<decltype(*local_limits.begin())>;
local_limits.insert(PairType(entry.first, entry.second.ToUint64()));
}
}
Expand All @@ -157,7 +158,7 @@ void ConsistentReadPoint::FinishChildTransactionResult(
result->set_restart_read_ht(restart_read_ht_.ToUint64());
auto& restarts = *result->mutable_read_restarts();
for (const auto& restart : restarts_) {
typedef std::remove_reference<decltype(*restarts.begin())>::type PairType;
using PairType = std::remove_reference_t<decltype(*restarts.begin())>;
restarts.insert(PairType(restart.first, restart.second.ToUint64()));
}
} else {
Expand Down Expand Up @@ -201,7 +202,7 @@ ReadHybridTime ConsistentReadPoint::GetReadTime() const {
return read_time_;
}

// NO_THREAD_SAFETY_ANALYSIS is required here because anylysis does not understand std::lock.
// NO_THREAD_SAFETY_ANALYSIS is required here because analysis does not understand std::lock.
void ConsistentReadPoint::MoveFrom(ConsistentReadPoint* rhs) NO_THREAD_SAFETY_ANALYSIS {
std::lock(mutex_, rhs->mutex_);
std::lock_guard lock1(mutex_, std::adopt_lock);
Expand All @@ -212,4 +213,17 @@ void ConsistentReadPoint::MoveFrom(ConsistentReadPoint* rhs) NO_THREAD_SAFETY_AN
restarts_ = std::move(rhs->restarts_);
}

ConsistentReadPoint::Momento ConsistentReadPoint::GetMomento() const {
std::lock_guard lock(mutex_);
return {read_time_, restart_read_ht_, local_limits_, restarts_};
}

void ConsistentReadPoint::SetMomento(ConsistentReadPoint::Momento&& momento) {
std::lock_guard lock(mutex_);
read_time_ = std::move(momento.read_time_);
restart_read_ht_ = std::move(momento.restart_read_ht_);
local_limits_ = std::move(momento.local_limits_);
restarts_ = std::move(momento.restarts_);
}

} // namespace yb
45 changes: 34 additions & 11 deletions src/yb/common/consistent_read_point.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,18 @@

#pragma once

#include <mutex>
#include <set>
#include <unordered_map>
#include <utility>

#include "yb/common/common_fwd.h"
#include "yb/common/entity_ids_types.h"
#include "yb/common/read_hybrid_time.h"

#include "yb/gutil/macros.h"
#include "yb/gutil/ref_counted.h"
#include "yb/gutil/thread_annotations.h"

#include "yb/util/locks.h"
#include "yb/util/strongly_typed_bool.h"

namespace yb {

Expand All @@ -34,7 +34,7 @@ YB_STRONGLY_TYPED_BOOL(HadReadTime);
class ConsistentReadPoint {
public:
// A map of tablet id to local limits.
typedef std::unordered_map<TabletId, HybridTime> HybridTimeMap;
using HybridTimeMap = std::unordered_map<TabletId, HybridTime>;

explicit ConsistentReadPoint(const scoped_refptr<ClockBase>& clock);

Expand All @@ -43,7 +43,7 @@ class ConsistentReadPoint {
// Set the current time as the read point.
// No uncertainty window when clamp is set.
void SetCurrentReadTime(
const ClampUncertaintyWindow clamp = ClampUncertaintyWindow::kFalse) EXCLUDES(mutex_);
ClampUncertaintyWindow clamp = ClampUncertaintyWindow::kFalse) EXCLUDES(mutex_);

// If read point is not set, use the current time as the read point and defer it to the global
// limit. If read point was already set, return error if it is not deferred.
Expand All @@ -52,18 +52,18 @@ class ConsistentReadPoint {
// Set the read point to the specified read time with local limits.
void SetReadTime(const ReadHybridTime& read_time, HybridTimeMap&& local_limits) EXCLUDES(mutex_);

ReadHybridTime GetReadTime() const;
[[nodiscard]] ReadHybridTime GetReadTime() const EXCLUDES(mutex_);

// Get the read time of this read point for a tablet.
ReadHybridTime GetReadTime(const TabletId& tablet) const EXCLUDES(mutex_);
[[nodiscard]] ReadHybridTime GetReadTime(const TabletId& tablet) const EXCLUDES(mutex_);

// Notify that a tablet requires restart. This method is thread-safe.
void RestartRequired(const TabletId& tablet, const ReadHybridTime& restart_time) EXCLUDES(mutex_);

void UpdateLocalLimit(const TabletId& tablet, HybridTime local_limit) EXCLUDES(mutex_);

// Does the current read require restart?
bool IsRestartRequired() const EXCLUDES(mutex_);
[[nodiscard]] bool IsRestartRequired() const EXCLUDES(mutex_);

// Restart read.
void Restart() EXCLUDES(mutex_);
Expand All @@ -75,7 +75,7 @@ class ConsistentReadPoint {
void UpdateClock(HybridTime propagated_hybrid_time) EXCLUDES(mutex_);

// Return the current time to propagate.
HybridTime Now() const EXCLUDES(mutex_);
[[nodiscard]] HybridTime Now() const EXCLUDES(mutex_);

// Prepare the read time and local limits in a child transaction.
void PrepareChildTransactionData(ChildTransactionDataPB* data) const EXCLUDES(mutex_);
Expand All @@ -90,16 +90,37 @@ class ConsistentReadPoint {
// Sets in transaction limit.
void SetInTxnLimit(HybridTime value) EXCLUDES(mutex_);

class Momento {
Momento(const ReadHybridTime& read_time,
HybridTime restart_read_ht,
const HybridTimeMap& local_limits,
const HybridTimeMap& restarts) :
read_time_(read_time),
restart_read_ht_(restart_read_ht),
local_limits_(local_limits),
restarts_(restarts) {}

ReadHybridTime read_time_;
HybridTime restart_read_ht_;
HybridTimeMap local_limits_;
HybridTimeMap restarts_;

friend class ConsistentReadPoint;
};

[[nodiscard]] Momento GetMomento() const EXCLUDES(mutex_);
void SetMomento(ConsistentReadPoint::Momento&& momento) EXCLUDES(mutex_);

private:
inline void SetReadTimeUnlocked(
const ReadHybridTime& read_time, HybridTimeMap* local_limits = nullptr) REQUIRES(mutex_);
void SetCurrentReadTimeUnlocked(
const ClampUncertaintyWindow clamp = ClampUncertaintyWindow::kFalse) REQUIRES(mutex_);
ClampUncertaintyWindow clamp = ClampUncertaintyWindow::kFalse) REQUIRES(mutex_);
void UpdateLimitsMapUnlocked(
const TabletId& tablet, const HybridTime& local_limit, HybridTimeMap* map) REQUIRES(mutex_);
void RestartRequiredUnlocked(const TabletId& tablet, const ReadHybridTime& restart_time)
REQUIRES(mutex_);
bool IsRestartRequiredUnlocked() const REQUIRES(mutex_);
[[nodiscard]] bool IsRestartRequiredUnlocked() const REQUIRES(mutex_);

const scoped_refptr<ClockBase> clock_;

Expand All @@ -115,6 +136,8 @@ class ConsistentReadPoint {
// Restarts that happen during a consistent read. Used to initialise local_limits for restarted
// read.
HybridTimeMap restarts_ GUARDED_BY(mutex_);

DISALLOW_COPY_AND_ASSIGN(ConsistentReadPoint);
};

} // namespace yb

0 comments on commit 0dcc22b

Please sign in to comment.