diff --git a/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp b/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp index f415cb7a69b..557cf7aac12 100644 --- a/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp +++ b/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp @@ -436,25 +436,31 @@ RegionTable::ResolveLocksAndWriteRegionRes RegionTable::resolveLocksAndWriteRegi region_data_lock); } +// Note that there is a chance that the table has been totally removed from TiKV +// and TiFlash cannot get the IStorage instance. +// - Check whether the StorageDeltaMerge is nullptr or not before accessing it. +// - You should hold the `TableLockHolder` while you are accessing the IStorage instance. std::tuple, DecodingStorageSchemaSnapshotConstPtr> // -AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt) +AtomicGetStorageSchema(RegionID region_id, KeyspaceID keyspace_id, TableID table_id, TMTContext & tmt) { TableLockHolder drop_lock = nullptr; std::shared_ptr dm_storage; DecodingStorageSchemaSnapshotConstPtr schema_snapshot; - auto keyspace_id = region->getKeyspaceID(); - auto table_id = region->getMappedTableID(); - LOG_DEBUG(Logger::get(__PRETTY_FUNCTION__), "Get schema, keyspace={} table_id={}", keyspace_id, table_id); - auto context = tmt.getContext(); + LOG_DEBUG( + Logger::get("AtomicGetStorageSchema"), + "Get schema, region_id={} keyspace={} table_id={}", + region_id, + keyspace_id, + table_id); + auto & context = tmt.getContext(); const auto atomic_get = [&](bool force_decode) -> bool { auto storage = tmt.getStorages().get(keyspace_id, table_id); if (storage == nullptr) { - if (!force_decode) - return false; - if (storage == nullptr) // Table must have just been GC-ed - return true; + // If force_decode == false, then we should sync the schema and check again. + // Else force_decode == true and the storage instance still does not exist, so the table must have just been GC-ed. + return force_decode; } // Get a structure read lock. 
It will throw exception if the table has been dropped, // the caller should handle this situation. @@ -470,22 +476,19 @@ AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt) { GET_METRIC(tiflash_schema_trigger_count, type_raft_decode).Increment(); Stopwatch watch; - tmt.getSchemaSyncerManager()->syncTableSchema(context, keyspace_id, table_id); + auto sync_schema_ok = tmt.getSchemaSyncerManager()->syncTableSchema(context, keyspace_id, table_id); auto schema_sync_cost = watch.elapsedMilliseconds(); LOG_INFO( Logger::get("AtomicGetStorageSchema"), - "sync schema cost {} ms, keyspace={} table_id={}", + "sync schema cost {} ms, sync_schema_ok={} region_id={} keyspace={} table_id={}", schema_sync_cost, + sync_schema_ok, + region_id, keyspace_id, table_id); - if (!atomic_get(true)) - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "AtomicGetStorageSchema failed, region={} keyspace={} table_id={}", - region->toString(), - keyspace_id, - table_id); + // try get with `force_decode == true` + atomic_get(true); } return {std::move(drop_lock), std::move(dm_storage), std::move(schema_snapshot)}; @@ -498,8 +501,10 @@ static Block sortColumnsBySchemaSnap(Block && ori, const DM::ColumnDefines & sch if (ori.columns() != schema.size()) { throw Exception( - "Try to sortColumnsBySchemaSnap with different column size [block_columns=" + DB::toString(ori.columns()) - + "] [schema_columns=" + DB::toString(schema.size()) + "]"); + ErrorCodes::LOGICAL_ERROR, + "Try to sortColumnsBySchemaSnap with different column size [block_columns={}] [schema_columns={}]", + ori.columns(), + schema.size()); } #endif diff --git a/dbms/src/Storages/KVStore/Decode/PartitionStreams.h b/dbms/src/Storages/KVStore/Decode/PartitionStreams.h index 934ab282198..e1c6f341346 100644 --- a/dbms/src/Storages/KVStore/Decode/PartitionStreams.h +++ b/dbms/src/Storages/KVStore/Decode/PartitionStreams.h @@ -35,7 +35,7 @@ void RemoveRegionCommitCache( bool lock_region = true); std::tuple, 
DecodingStorageSchemaSnapshotConstPtr> // -AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt); +AtomicGetStorageSchema(RegionID region_id, KeyspaceID keyspace_id, TableID table_id, TMTContext & tmt); Block GenRegionBlockDataWithSchema( const RegionPtr & region, // diff --git a/dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp b/dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp index 521b94fa5b2..b00d82aa9a8 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp @@ -190,7 +190,7 @@ void KVStore::onSnapshot( LOG_INFO( log, "clear old range before apply snapshot, region_id={} old_range={} new_range={} " - "keyspace_id={} table_id={}", + "keyspace={} table_id={}", region_id, old_key_range.toDebugString(), new_key_range.toDebugString(), diff --git a/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.cpp b/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.cpp index fb7df02a75d..362d46d2c39 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.cpp @@ -219,7 +219,7 @@ void CheckpointIngestInfo::deleteWrittenData(TMTContext & tmt, RegionPtr region, { LOG_INFO( log, - "No found storage in clean stale FAP data region_id={} keyspace_id={} table_id={}", + "No found storage in clean stale FAP data region_id={} keyspace={} table_id={}", region_id, keyspace_id, table_id); @@ -241,12 +241,7 @@ void CheckpointIngestInfo::deleteWrittenData(TMTContext & tmt, RegionPtr region, }); wn_ps->write(std::move(wb)); - LOG_INFO( - log, - "Finish clean stale FAP data region_id={} keyspace_id={} table_id={}", - region_id, - keyspace_id, - table_id); + LOG_INFO(log, "Finish clean stale FAP data region_id={} keyspace={} table_id={}", region_id, keyspace_id, table_id); } bool CheckpointIngestInfo::cleanOnSuccess(TMTContext & tmt, UInt64 region_id) diff --git 
a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp index c5875dd328b..09698b07829 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp @@ -194,6 +194,8 @@ std::variant FastAddPeerImplSelect( Stopwatch watch; std::unordered_map checked_seq_map; auto fap_ctx = tmt.getContext().getSharedContextDisagg()->fap_context; + RUNTIME_CHECK(fap_ctx != nullptr); + RUNTIME_CHECK(fap_ctx->tasks_trace != nullptr); auto cancel_handle = fap_ctx->tasks_trace->getCancelHandleFromExecutor(region_id); // Get candidate stores. @@ -307,7 +309,17 @@ FastAddPeerRes FastAddPeerImplWrite( auto keyspace_id = region->getKeyspaceID(); auto table_id = region->getMappedTableID(); - const auto [table_drop_lock, storage, schema_snap] = AtomicGetStorageSchema(region, tmt); + const auto [table_drop_lock, storage, schema_snap] = AtomicGetStorageSchema(region_id, keyspace_id, table_id, tmt); + if (!storage) + { + LOG_WARNING( + log, + "FAP failed because the table can not be found, region_id={} keyspace={} table_id={}", + region_id, + keyspace_id, + table_id); + return genFastAddPeerRes(FastAddPeerStatus::BadData, "", ""); + } UNUSED(schema_snap); RUNTIME_CHECK_MSG(storage->engineType() == TiDB::StorageEngine::DT, "ingest into unsupported storage engine"); auto dm_storage = std::dynamic_pointer_cast(storage); @@ -321,7 +333,7 @@ FastAddPeerRes FastAddPeerImplWrite( { LOG_INFO( log, - "FAP is canceled before write, region_id={} keyspace_id={} table_id={}", + "FAP is canceled before write, region_id={} keyspace={} table_id={}", region_id, keyspace_id, table_id); @@ -361,7 +373,7 @@ FastAddPeerRes FastAddPeerImplWrite( { LOG_INFO( log, - "FAP is canceled after write segments, region_id={} keyspace_id={} table_id={}", + "FAP is canceled after write segments, region_id={} keyspace={} table_id={}", region_id, keyspace_id, table_id); @@ -374,6 +386,7 @@ 
FastAddPeerRes FastAddPeerImplWrite( // Currently, FAP only handle when the peer is newly created in this store. // TODO(fap) However, Move this to `ApplyFapSnapshot` and clean stale data, if FAP can later handle all snapshots. UniversalWriteBatch wb; + RUNTIME_CHECK(checkpoint_info->temp_ps != nullptr); RaftDataReader raft_data_reader(*(checkpoint_info->temp_ps)); raft_data_reader.traverseRemoteRaftLogForRegion( region_id, @@ -388,13 +401,14 @@ FastAddPeerRes FastAddPeerImplWrite( }); GET_METRIC(tiflash_fap_task_duration_seconds, type_write_stage_raft).Observe(watch.elapsedSecondsFromLastTime()); auto wn_ps = tmt.getContext().getWriteNodePageStorage(); + RUNTIME_CHECK(wn_ps != nullptr); wn_ps->write(std::move(wb)); SYNC_FOR("in_FastAddPeerImplWrite::after_write_raft_log"); if (cancel_handle->isCanceled()) { LOG_INFO( log, - "FAP is canceled after write raft log, region_id={} keyspace_id={} table_id={}", + "FAP is canceled after write raft log, region_id={} keyspace={} table_id={}", region_id, keyspace_id, table_id); @@ -402,12 +416,7 @@ FastAddPeerRes FastAddPeerImplWrite( GET_METRIC(tiflash_fap_task_result, type_failed_cancel).Increment(); return genFastAddPeerRes(FastAddPeerStatus::Canceled, "", ""); } - LOG_DEBUG( - log, - "Finish write FAP snapshot, region_id={} keyspace_id={} table_id={}", - region_id, - keyspace_id, - table_id); + LOG_DEBUG(log, "Finish write FAP snapshot, region_id={} keyspace={} table_id={}", region_id, keyspace_id, table_id); return genFastAddPeerRes( FastAddPeerStatus::Ok, apply_state.SerializeAsString(), diff --git a/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp b/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp index 4684ff208a1..67ecc3319a4 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp @@ -682,7 +682,7 @@ PrehandleResult KVStore::preHandleSSTsToDTFiles( }); PrehandleResult prehandle_result; - TableID physical_table_id = 
InvalidTableID; + TableID physical_table_id = new_region->getMappedTableID(); auto region_id = new_region->id(); auto prehandle_task = prehandling_trace.registerTask(region_id); @@ -694,7 +694,8 @@ PrehandleResult KVStore::preHandleSSTsToDTFiles( { // Get storage schema atomically, will do schema sync if the storage does not exists. // Will return the storage even if it is tombstone. - const auto [table_drop_lock, storage, schema_snap] = AtomicGetStorageSchema(new_region, tmt); + const auto [table_drop_lock, storage, schema_snap] + = AtomicGetStorageSchema(region_id, keyspace_id, physical_table_id, tmt); if (unlikely(storage == nullptr)) { // The storage must be physically dropped, throw exception and do cleanup. @@ -711,7 +712,6 @@ PrehandleResult KVStore::preHandleSSTsToDTFiles( /* ignore_cache= */ false, context.getSettingsRef().safe_point_update_interval_seconds); } - physical_table_id = storage->getTableInfo().id; auto opt = DM::SSTFilesToBlockInputStreamOpts{ .log_prefix = fmt::format("keyspace={} table_id={}", keyspace_id, physical_table_id), diff --git a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp index 4f317a59bb2..1a15c2bd625 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp @@ -928,6 +928,5 @@ try } CATCH - } // namespace tests } // namespace DB