Skip to content

Commit

Permalink
Disagg: refresh expiration time of snapshot when calling getSnapshot (pingcap#9570)
Browse files Browse the repository at this point in the history

close pingcap#9567

Signed-off-by: Lloyd-Pottiger <[email protected]>

Co-authored-by: Lloyd-Pottiger <[email protected]>
Co-authored-by: Lloyd-Pottiger <[email protected]>
  • Loading branch information
3 people authored and yibin committed Nov 6, 2024
1 parent 422ad2c commit f579b3e
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 14 deletions.
8 changes: 4 additions & 4 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1218,8 +1218,8 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max
auto snaps = context.getSharedContextDisagg()->wn_snapshot_manager;
const auto & snap_id = *dag_context.getDisaggTaskId();
auto timeout_s = context.getSettingsRef().disagg_task_snapshot_timeout;
auto expired_at = Clock::now() + std::chrono::seconds(timeout_s);
bool register_snapshot_ok = snaps->registerSnapshot(snap_id, disaggregated_snap, expired_at);
bool register_snapshot_ok
= snaps->registerSnapshot(snap_id, disaggregated_snap, std::chrono::seconds(timeout_s));
RUNTIME_CHECK_MSG(register_snapshot_ok, "Disaggregated task has been registered, snap_id={}", snap_id);
}

Expand Down Expand Up @@ -1275,8 +1275,8 @@ void DAGStorageInterpreter::buildLocalExec(
auto snaps = context.getSharedContextDisagg()->wn_snapshot_manager;
const auto & snap_id = *dag_context.getDisaggTaskId();
auto timeout_s = context.getSettingsRef().disagg_task_snapshot_timeout;
auto expired_at = Clock::now() + std::chrono::seconds(timeout_s);
bool register_snapshot_ok = snaps->registerSnapshot(snap_id, disaggregated_snap, expired_at);
bool register_snapshot_ok
= snaps->registerSnapshot(snap_id, disaggregated_snap, std::chrono::seconds(timeout_s));
RUNTIME_CHECK_MSG(register_snapshot_ok, "Disaggregated task has been registered, snap_id={}", snap_id);
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1110,7 +1110,7 @@ grpc::Status FlashService::FetchDisaggPages(

try
{
auto snap = snaps->getSnapshot(task_id);
auto snap = snaps->getSnapshot(task_id, /*refresh_expiration*/ true);
RUNTIME_CHECK_MSG(snap != nullptr, "Can not find disaggregated task, task_id={}", task_id);
auto task = snap->popSegTask(request->table_id(), request->segment_id());
RUNTIME_CHECK(task.isValid(), task.err_msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ std::vector<VectorIndexViewer::SearchResult> VectorIndexHNSWViewer::searchWithDi
{
const auto rowid = result[i].member.key;
if (valid_rows[rowid])
search_results.emplace_back(rowid, result[i].distance);
search_results.push_back({rowid, result[i].distance});
}
return search_results;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ void WNDisaggSnapshotManager::clearExpiredSnapshots()
snapshots.withExclusive([&](auto & snapshots) {
for (auto iter = snapshots.begin(); iter != snapshots.end(); /*empty*/)
{
if (iter->second.expired_at < now)
if (iter->second->expired_at < now)
{
LOG_INFO(
log,
"Remove expired Disaggregated Snapshot, task_id={} expired_at={:%Y-%m-%d %H:%M:%S}",
iter->first,
iter->second.expired_at);
iter->second->expired_at);
iter = snapshots.erase(iter);
}
else
Expand Down
32 changes: 26 additions & 6 deletions dbms/src/Storages/DeltaMerge/Remote/WNDisaggSnapshotManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
#include <common/types.h>
#include <fmt/chrono.h>


namespace DB::DM::Remote
{
/**
Expand All @@ -39,9 +38,26 @@ class WNDisaggSnapshotManager
/// A registered snapshot paired with the time point it is compared against
/// when deciding whether the entry has expired.
///
/// The expiration time point is always derived as "current time + refresh_duration",
/// both on construction and on every call to `refreshExpiredTime()`.
struct SnapshotWithExpireTime
{
    /// The snapshot handed out to readers.
    DisaggReadSnapshotPtr snap;
    /// How far into the future the expiration is pushed on each refresh.
    std::chrono::seconds refresh_duration;
    /// Held while `expired_at` is being rewritten by `refreshExpiredTime()`.
    std::mutex mtx;
    /// Time point produced by the most recent refresh.
    Timepoint expired_at;

    SnapshotWithExpireTime(DisaggReadSnapshotPtr snap_, std::chrono::seconds refresh_duration_)
        : snap(std::move(snap_))
        , refresh_duration(refresh_duration_)
    {
        // The first expiration window starts at construction time.
        refreshExpiredTime();
    }

    /// Reset the expiration to `refresh_duration` from the current time.
    void refreshExpiredTime()
    {
        const std::lock_guard<std::mutex> guard(mtx);
        expired_at = Clock::now() + refresh_duration;
    }
};

using SnapshotWithExpireTimePtr = std::unique_ptr<SnapshotWithExpireTime>;

public:
explicit WNDisaggSnapshotManager(BackgroundProcessingPool & bg_pool);

Expand All @@ -50,23 +66,27 @@ class WNDisaggSnapshotManager
bool registerSnapshot(
const DisaggTaskId & task_id,
const DisaggReadSnapshotPtr & snap,
const Timepoint & expired_at)
std::chrono::seconds refresh_duration)
{
return snapshots.withExclusive([&](auto & snapshots) {
LOG_INFO(log, "Register Disaggregated Snapshot, task_id={}", task_id);

// Since EstablishDisagg may be retried, there may be existing snapshot.
// We replace these existing snapshot using a new one.
snapshots[task_id] = SnapshotWithExpireTime{.snap = snap, .expired_at = expired_at};
snapshots[task_id] = std::make_unique<SnapshotWithExpireTime>(snap, refresh_duration);
return true;
});
}

DisaggReadSnapshotPtr getSnapshot(const DisaggTaskId & task_id) const
DisaggReadSnapshotPtr getSnapshot(const DisaggTaskId & task_id, bool refresh_expiration = false) const
{
return snapshots.withShared([&](auto & snapshots) {
if (auto iter = snapshots.find(task_id); iter != snapshots.end())
return iter->second.snap;
{
if (refresh_expiration)
iter->second->refreshExpiredTime();
return iter->second->snap;
}
return DisaggReadSnapshotPtr{nullptr};
});
}
Expand All @@ -92,7 +112,7 @@ class WNDisaggSnapshotManager
void clearExpiredSnapshots();

private:
SharedMutexProtected<std::unordered_map<DisaggTaskId, SnapshotWithExpireTime>> snapshots;
SharedMutexProtected<std::unordered_map<DisaggTaskId, SnapshotWithExpireTimePtr>> snapshots;

BackgroundProcessingPool & pool;
BackgroundProcessingPool::TaskHandle handle;
Expand Down

0 comments on commit f579b3e

Please sign in to comment.