diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index b3f305ae754..3aa77d7b64c 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -1218,8 +1218,8 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max auto snaps = context.getSharedContextDisagg()->wn_snapshot_manager; const auto & snap_id = *dag_context.getDisaggTaskId(); auto timeout_s = context.getSettingsRef().disagg_task_snapshot_timeout; - auto expired_at = Clock::now() + std::chrono::seconds(timeout_s); - bool register_snapshot_ok = snaps->registerSnapshot(snap_id, disaggregated_snap, expired_at); + bool register_snapshot_ok + = snaps->registerSnapshot(snap_id, disaggregated_snap, std::chrono::seconds(timeout_s)); RUNTIME_CHECK_MSG(register_snapshot_ok, "Disaggregated task has been registered, snap_id={}", snap_id); } @@ -1275,8 +1275,8 @@ void DAGStorageInterpreter::buildLocalExec( auto snaps = context.getSharedContextDisagg()->wn_snapshot_manager; const auto & snap_id = *dag_context.getDisaggTaskId(); auto timeout_s = context.getSettingsRef().disagg_task_snapshot_timeout; - auto expired_at = Clock::now() + std::chrono::seconds(timeout_s); - bool register_snapshot_ok = snaps->registerSnapshot(snap_id, disaggregated_snap, expired_at); + bool register_snapshot_ok + = snaps->registerSnapshot(snap_id, disaggregated_snap, std::chrono::seconds(timeout_s)); RUNTIME_CHECK_MSG(register_snapshot_ok, "Disaggregated task has been registered, snap_id={}", snap_id); } diff --git a/dbms/src/Flash/FlashService.cpp b/dbms/src/Flash/FlashService.cpp index 60f44c4f8b5..30e7bea9fa0 100644 --- a/dbms/src/Flash/FlashService.cpp +++ b/dbms/src/Flash/FlashService.cpp @@ -1110,7 +1110,7 @@ grpc::Status FlashService::FetchDisaggPages( try { - auto snap = snaps->getSnapshot(task_id); + auto snap = snaps->getSnapshot(task_id, /*refresh_expiration*/ true); RUNTIME_CHECK_MSG(snap != nullptr, "Can not find disaggregated task, task_id={}", task_id); auto task = snap->popSegTask(request->table_id(), request->segment_id()); RUNTIME_CHECK(task.isValid(), task.err_msg); diff --git a/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.cpp b/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.cpp index 063df3feacb..c7f5e6da16a 100644 --- a/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.cpp +++ b/dbms/src/Storages/DeltaMerge/Index/VectorIndexHNSW/Index.cpp @@ -343,7 +343,7 @@ std::vector VectorIndexHNSWViewer::searchWithDi { const auto rowid = result[i].member.key; if (valid_rows[rowid]) - search_results.emplace_back(rowid, result[i].distance); + search_results.push_back({rowid, result[i].distance}); } return search_results; } diff --git a/dbms/src/Storages/DeltaMerge/Remote/WNDisaggSnapshotManager.cpp b/dbms/src/Storages/DeltaMerge/Remote/WNDisaggSnapshotManager.cpp index be3d322f860..9f23c9fc7ab 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/WNDisaggSnapshotManager.cpp +++ b/dbms/src/Storages/DeltaMerge/Remote/WNDisaggSnapshotManager.cpp @@ -52,13 +52,13 @@ void WNDisaggSnapshotManager::clearExpiredSnapshots() snapshots.withExclusive([&](auto & snapshots) { for (auto iter = snapshots.begin(); iter != snapshots.end(); /*empty*/) { - if (iter->second.expired_at < now) + if (iter->second->expired_at < now) { LOG_INFO( log, "Remove expired Disaggregated Snapshot, task_id={} expired_at={:%Y-%m-%d %H:%M:%S}", iter->first, - iter->second.expired_at); + iter->second->expired_at); iter = snapshots.erase(iter); } else diff --git a/dbms/src/Storages/DeltaMerge/Remote/WNDisaggSnapshotManager.h b/dbms/src/Storages/DeltaMerge/Remote/WNDisaggSnapshotManager.h index 96f19e2dedb..f192529f970 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/WNDisaggSnapshotManager.h +++ b/dbms/src/Storages/DeltaMerge/Remote/WNDisaggSnapshotManager.h @@ -26,7 +26,6 @@ #include #include - namespace DB::DM::Remote { /** @@ -39,9 +38,26 @@ class WNDisaggSnapshotManager struct SnapshotWithExpireTime { DisaggReadSnapshotPtr snap; + std::chrono::seconds refresh_duration; + std::mutex mtx; Timepoint expired_at; + + SnapshotWithExpireTime(DisaggReadSnapshotPtr snap_, std::chrono::seconds refresh_duration_) + : snap(std::move(snap_)) + , refresh_duration(refresh_duration_) + { + refreshExpiredTime(); + } + + void refreshExpiredTime() + { + std::lock_guard lock(mtx); + expired_at = Clock::now() + refresh_duration; + } }; + using SnapshotWithExpireTimePtr = std::unique_ptr; + public: explicit WNDisaggSnapshotManager(BackgroundProcessingPool & bg_pool); @@ -50,23 +66,27 @@ class WNDisaggSnapshotManager bool registerSnapshot( const DisaggTaskId & task_id, const DisaggReadSnapshotPtr & snap, - const Timepoint & expired_at) + std::chrono::seconds refresh_duration) { return snapshots.withExclusive([&](auto & snapshots) { LOG_INFO(log, "Register Disaggregated Snapshot, task_id={}", task_id); // Since EstablishDisagg may be retried, there may be existing snapshot. // We replace these existing snapshot using a new one. - snapshots[task_id] = SnapshotWithExpireTime{.snap = snap, .expired_at = expired_at}; + snapshots[task_id] = std::make_unique(snap, refresh_duration); return true; }); } - DisaggReadSnapshotPtr getSnapshot(const DisaggTaskId & task_id) const + DisaggReadSnapshotPtr getSnapshot(const DisaggTaskId & task_id, bool refresh_expiration = false) const { return snapshots.withShared([&](auto & snapshots) { if (auto iter = snapshots.find(task_id); iter != snapshots.end()) - return iter->second.snap; + { + if (refresh_expiration) + iter->second->refreshExpiredTime(); + return iter->second->snap; + } return DisaggReadSnapshotPtr{nullptr}; }); } @@ -92,7 +112,7 @@ class WNDisaggSnapshotManager void clearExpiredSnapshots(); private: - SharedMutexProtected> snapshots; + SharedMutexProtected> snapshots; BackgroundProcessingPool & pool; BackgroundProcessingPool::TaskHandle handle;