diff --git a/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp b/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp
index 9d39958e923..4497f1e9c49 100644
--- a/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp
+++ b/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp
@@ -505,25 +505,31 @@ RegionTable::ResolveLocksAndWriteRegionRes RegionTable::resolveLocksAndWriteRegi
         region_data_lock);
 }
 
+// Note that there is a chance that the table has been totally removed from TiKV
+// and TiFlash can not get the IStorage instance.
+// - Check whether the StorageDeltaMerge is nullptr before accessing it.
+// - You should hold the `TableLockHolder` while accessing the IStorage instance.
 std::tuple<TableLockHolder, std::shared_ptr<StorageDeltaMerge>, DecodingStorageSchemaSnapshotConstPtr> //
-AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt)
+AtomicGetStorageSchema(RegionID region_id, KeyspaceID keyspace_id, TableID table_id, TMTContext & tmt)
 {
     TableLockHolder drop_lock = nullptr;
     std::shared_ptr<StorageDeltaMerge> dm_storage;
     DecodingStorageSchemaSnapshotConstPtr schema_snapshot;
 
-    auto keyspace_id = region->getKeyspaceID();
-    auto table_id = region->getMappedTableID();
-    LOG_DEBUG(Logger::get(__PRETTY_FUNCTION__), "Get schema, keyspace={} table_id={}", keyspace_id, table_id);
+    LOG_DEBUG(
+        Logger::get("AtomicGetStorageSchema"),
+        "Get schema, region_id={} keyspace={} table_id={}",
+        region_id,
+        keyspace_id,
+        table_id);
     auto & context = tmt.getContext();
     const auto atomic_get = [&](bool force_decode) -> bool {
         auto storage = tmt.getStorages().get(keyspace_id, table_id);
         if (storage == nullptr)
         {
-            if (!force_decode)
-                return false;
-            if (storage == nullptr) // Table must have just been GC-ed
-                return true;
+            // If force_decode == false, the caller should sync schema and check again
+            // Else force_decode == true and the storage instance still does not exist, so the table must have just been GC-ed
+            return force_decode;
         }
         // Get a structure read lock. It will throw exception if the table has been dropped,
         // the caller should handle this situation.
@@ -539,22 +545,19 @@ AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt)
     {
         GET_METRIC(tiflash_schema_trigger_count, type_raft_decode).Increment();
         Stopwatch watch;
-        tmt.getSchemaSyncerManager()->syncTableSchema(context, keyspace_id, table_id);
+        auto sync_schema_ok = tmt.getSchemaSyncerManager()->syncTableSchema(context, keyspace_id, table_id);
         auto schema_sync_cost = watch.elapsedMilliseconds();
         LOG_INFO(
             Logger::get("AtomicGetStorageSchema"),
-            "sync schema cost {} ms, keyspace={} table_id={}",
+            "sync schema cost {} ms, sync_schema_ok={} region_id={} keyspace={} table_id={}",
             schema_sync_cost,
+            sync_schema_ok,
+            region_id,
             keyspace_id,
             table_id);
-        if (!atomic_get(true))
-            throw Exception(
-                ErrorCodes::LOGICAL_ERROR,
-                "AtomicGetStorageSchema failed, region={} keyspace={} table_id={}",
-                region->toString(),
-                keyspace_id,
-                table_id);
+        // try get with `force_decode == true`
+        atomic_get(true);
     }
 
     return {std::move(drop_lock), std::move(dm_storage), std::move(schema_snapshot)};
@@ -567,8 +570,10 @@ static Block sortColumnsBySchemaSnap(Block && ori, const DM::ColumnDefines & sch
     if (ori.columns() != schema.size())
    {
         throw Exception(
-            "Try to sortColumnsBySchemaSnap with different column size [block_columns=" + DB::toString(ori.columns())
-            + "] [schema_columns=" + DB::toString(schema.size()) + "]");
+            ErrorCodes::LOGICAL_ERROR,
+            "Try to sortColumnsBySchemaSnap with different column size [block_columns={}] [schema_columns={}]",
+            ori.columns(),
+            schema.size());
     }
 #endif
diff --git a/dbms/src/Storages/KVStore/Decode/PartitionStreams.h b/dbms/src/Storages/KVStore/Decode/PartitionStreams.h
index ee3c28a0a2b..ff380e1b417 100644
--- a/dbms/src/Storages/KVStore/Decode/PartitionStreams.h
+++ b/dbms/src/Storages/KVStore/Decode/PartitionStreams.h
@@ -37,7 +37,7 @@ void RemoveRegionCommitCache(
     bool lock_region = true);
 
 std::tuple<TableLockHolder, std::shared_ptr<StorageDeltaMerge>, DecodingStorageSchemaSnapshotConstPtr> //
-AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt);
+AtomicGetStorageSchema(RegionID region_id, KeyspaceID keyspace_id, TableID table_id, TMTContext & tmt);
 
 Block GenRegionBlockDataWithSchema(
     const RegionPtr & region, //
diff --git a/dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp b/dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp
index 7bb8d5e945a..0f1bc76b438 100644
--- a/dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp
+++ b/dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp
@@ -190,7 +190,7 @@ void KVStore::onSnapshot(
         LOG_INFO(
             log,
             "clear old range before apply snapshot, region_id={} old_range={} new_range={} "
-            "keyspace_id={} table_id={}",
+            "keyspace={} table_id={}",
             region_id,
             old_key_range.toDebugString(),
             new_key_range.toDebugString(),
diff --git a/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.cpp b/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.cpp
index c25fe5f9871..be220967915 100644
--- a/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.cpp
+++ b/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.cpp
@@ -217,7 +217,7 @@ void CheckpointIngestInfo::deleteWrittenData(TMTContext & tmt, RegionPtr region,
     {
         LOG_INFO(
             log,
-            "No found storage in clean stale FAP data region_id={} keyspace_id={} table_id={}",
+            "No found storage in clean stale FAP data region_id={} keyspace={} table_id={}",
             region_id,
             keyspace_id,
             table_id);
@@ -239,12 +239,7 @@ void CheckpointIngestInfo::deleteWrittenData(TMTContext & tmt, RegionPtr region,
     });
     wn_ps->write(std::move(wb));
 
-    LOG_INFO(
-        log,
-        "Finish clean stale FAP data region_id={} keyspace_id={} table_id={}",
-        region_id,
-        keyspace_id,
-        table_id);
+    LOG_INFO(log, "Finish clean stale FAP data region_id={} keyspace={} table_id={}", region_id, keyspace_id, table_id);
 }
 
 bool CheckpointIngestInfo::cleanOnSuccess(TMTContext & tmt, UInt64 region_id)
diff --git a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp
index 9a13986dea3..2b30f6b02ac 100644
--- a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp
+++ b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp
@@ -193,6 +193,8 @@ std::variant FastAddPeerImplSelect(
     Stopwatch watch;
     std::unordered_map checked_seq_map;
     auto fap_ctx = tmt.getContext().getSharedContextDisagg()->fap_context;
+    RUNTIME_CHECK(fap_ctx != nullptr);
+    RUNTIME_CHECK(fap_ctx->tasks_trace != nullptr);
     auto cancel_handle = fap_ctx->tasks_trace->getCancelHandleFromExecutor(region_id);
 
     // Get candidate stores.
@@ -306,7 +308,17 @@ FastAddPeerRes FastAddPeerImplWrite(
     auto keyspace_id = region->getKeyspaceID();
     auto table_id = region->getMappedTableID();
 
-    const auto [table_drop_lock, storage, schema_snap] = AtomicGetStorageSchema(region, tmt);
+    const auto [table_drop_lock, storage, schema_snap] = AtomicGetStorageSchema(region_id, keyspace_id, table_id, tmt);
+    if (!storage)
+    {
+        LOG_WARNING(
+            log,
+            "FAP failed because the table can not be found, region_id={} keyspace={} table_id={}",
+            region_id,
+            keyspace_id,
+            table_id);
+        return genFastAddPeerRes(FastAddPeerStatus::BadData, "", "");
+    }
     UNUSED(schema_snap);
     RUNTIME_CHECK_MSG(storage->engineType() == TiDB::StorageEngine::DT, "ingest into unsupported storage engine");
     auto dm_storage = std::dynamic_pointer_cast<StorageDeltaMerge>(storage);
@@ -320,7 +332,7 @@ FastAddPeerRes FastAddPeerImplWrite(
     {
         LOG_INFO(
             log,
-            "FAP is canceled before write, region_id={} keyspace_id={} table_id={}",
+            "FAP is canceled before write, region_id={} keyspace={} table_id={}",
             region_id,
             keyspace_id,
             table_id);
@@ -360,7 +372,7 @@ FastAddPeerRes FastAddPeerImplWrite(
     {
         LOG_INFO(
             log,
-            "FAP is canceled after write segments, region_id={} keyspace_id={} table_id={}",
+            "FAP is canceled after write segments, region_id={} keyspace={} table_id={}",
             region_id,
             keyspace_id,
             table_id);
@@ -373,6 +385,7 @@ FastAddPeerRes FastAddPeerImplWrite(
     // Currently, FAP only handle when the peer is newly created in this store.
     // TODO(fap) However, Move this to `ApplyFapSnapshot` and clean stale data, if FAP can later handle all snapshots.
     UniversalWriteBatch wb;
+    RUNTIME_CHECK(checkpoint_info->temp_ps != nullptr);
     RaftDataReader raft_data_reader(*(checkpoint_info->temp_ps));
     raft_data_reader.traverseRemoteRaftLogForRegion(
         region_id,
@@ -387,13 +400,14 @@ FastAddPeerRes FastAddPeerImplWrite(
         });
     GET_METRIC(tiflash_fap_task_duration_seconds, type_write_stage_raft).Observe(watch.elapsedSecondsFromLastTime());
     auto wn_ps = tmt.getContext().getWriteNodePageStorage();
+    RUNTIME_CHECK(wn_ps != nullptr);
     wn_ps->write(std::move(wb));
     SYNC_FOR("in_FastAddPeerImplWrite::after_write_raft_log");
     if (cancel_handle->isCanceled())
     {
         LOG_INFO(
             log,
-            "FAP is canceled after write raft log, region_id={} keyspace_id={} table_id={}",
+            "FAP is canceled after write raft log, region_id={} keyspace={} table_id={}",
             region_id,
             keyspace_id,
             table_id);
@@ -401,12 +415,7 @@ FastAddPeerRes FastAddPeerImplWrite(
         GET_METRIC(tiflash_fap_task_result, type_failed_cancel).Increment();
         return genFastAddPeerRes(FastAddPeerStatus::Canceled, "", "");
     }
-    LOG_DEBUG(
-        log,
-        "Finish write FAP snapshot, region_id={} keyspace_id={} table_id={}",
-        region_id,
-        keyspace_id,
-        table_id);
+    LOG_DEBUG(log, "Finish write FAP snapshot, region_id={} keyspace={} table_id={}", region_id, keyspace_id, table_id);
     return genFastAddPeerRes(
         FastAddPeerStatus::Ok,
         apply_state.SerializeAsString(),
diff --git a/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp b/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp
index ffaa688ea45..255064b8ff5 100644
--- a/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp
+++ b/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp
@@ -659,7 +659,7 @@ PrehandleResult KVStore::preHandleSSTsToDTFiles(
     });
 
     PrehandleResult prehandle_result;
-    TableID physical_table_id = InvalidTableID;
+    TableID physical_table_id = new_region->getMappedTableID();
 
     auto region_id = new_region->id();
     auto prehandle_task = prehandling_trace.registerTask(region_id);
@@ -671,7 +671,8 @@ PrehandleResult KVStore::preHandleSSTsToDTFiles(
     {
         // Get storage schema atomically, will do schema sync if the storage does not exists.
         // Will return the storage even if it is tombstone.
-        const auto [table_drop_lock, storage, schema_snap] = AtomicGetStorageSchema(new_region, tmt);
+        const auto [table_drop_lock, storage, schema_snap]
+            = AtomicGetStorageSchema(region_id, keyspace_id, physical_table_id, tmt);
         if (unlikely(storage == nullptr))
         {
             // The storage must be physically dropped, throw exception and do cleanup.
@@ -688,7 +689,6 @@ PrehandleResult KVStore::preHandleSSTsToDTFiles(
             /* ignore_cache= */ false,
             context.getSettingsRef().safe_point_update_interval_seconds);
     }
-    physical_table_id = storage->getTableInfo().id;
 
     auto opt = DM::SSTFilesToBlockInputStreamOpts{
         .log_prefix = fmt::format("keyspace={} table_id={}", keyspace_id, physical_table_id),
diff --git a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp
index 3717bea6ee3..15e2373a34f 100644
--- a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp
+++ b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp
@@ -925,6 +925,41 @@ try
 }
 CATCH
 
+TEST_F(RegionKVStoreTestFAP, TableNotFound)
+try
+{
+    CheckpointRegionInfoAndData mock_data = prepareForRestart(FAPTestOpt{});
+    RegionPtr kv_region = std::get<1>(mock_data);
+
+    auto & global_context = TiFlashTestEnv::getGlobalContext();
+    auto & tmt = global_context.getTMTContext();
+    uint64_t region_id = 1;
+
+    auto keyspace_id = kv_region->getKeyspaceID();
+    auto table_id = kv_region->getMappedTableID();
+    auto fap_context = global_context.getSharedContextDisagg()->fap_context;
+
+    std::mutex exe_mut;
+    std::unique_lock exe_lock(exe_mut);
+    fap_context->tasks_trace->addTask(region_id, [&]() {
+        // Keep the task in `tasks_trace` to prevent it from being canceled.
+        std::scoped_lock wait_exe_lock(exe_mut);
+        return genFastAddPeerRes(FastAddPeerStatus::NoSuitable, "", "");
+    });
+
+    // Mock that the storage instance has been dropped
+    auto & storages = tmt.getStorages();
+    storages.remove(keyspace_id, table_id);
+    auto res = FastAddPeerImplWrite(
+        global_context.getTMTContext(),
+        proxy_helper.get(),
+        region_id,
+        2333,
+        std::move(mock_data),
+        0);
+    ASSERT_EQ(res.status, FastAddPeerStatus::BadData);
+}
+CATCH
 
 } // namespace tests
 } // namespace DB
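
Reviewer note: below is a minimal, self-contained sketch of the caller contract this diff establishes for AtomicGetStorageSchema. All names here (MockLock, MockStorage, MockSnapshot, atomicGetStorageSchema, writeSnapshot) are hypothetical stand-ins rather than TiFlash's real classes; the point is only the shape of the pattern: the getter may now return a nullptr storage instead of throwing when the table has already been GC-ed, and callers such as FastAddPeerImplWrite are expected to check for nullptr and keep the returned lock alive while touching the storage.

#include <memory>
#include <tuple>

// Hypothetical stand-ins for TableLockHolder / StorageDeltaMerge / schema snapshot.
struct MockLock {};
struct MockStorage {};
struct MockSnapshot {};

using GetSchemaResult
    = std::tuple<std::shared_ptr<MockLock>, std::shared_ptr<MockStorage>, std::shared_ptr<MockSnapshot>>;

// May return a nullptr storage instead of throwing when the table has been dropped,
// even after a forced schema sync -- mirroring the new AtomicGetStorageSchema behavior.
GetSchemaResult atomicGetStorageSchema(bool table_exists)
{
    if (!table_exists)
        return {nullptr, nullptr, nullptr}; // table GC-ed, caller must handle nullptr
    return {std::make_shared<MockLock>(), std::make_shared<MockStorage>(), std::make_shared<MockSnapshot>()};
}

// Caller-side pattern, analogous to the new early-return in FastAddPeerImplWrite:
// bail out with an error status instead of asserting on a dropped table.
bool writeSnapshot(bool table_exists)
{
    const auto [drop_lock, storage, snap] = atomicGetStorageSchema(table_exists);
    if (!storage)
        return false; // report BadData / skip, rather than crashing
    // ... keep `drop_lock` alive while reading or writing through `storage` ...
    return true;
}

int main()
{
    return writeSnapshot(false) ? 1 : 0; // exercises the dropped-table path
}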