KVStore: Fallback elegantly when getting table error in FAP (pingcap#9253)

ref pingcap#8673

Co-authored-by: JaySon-Huang <[email protected]>
Co-authored-by: JaySon <[email protected]>
CalvinNeo and JaySon-Huang committed Jul 26, 2024
1 parent 0212ecb commit 5c231da
Showing 7 changed files with 51 additions and 43 deletions.
45 changes: 25 additions & 20 deletions dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp
@@ -436,25 +436,31 @@ RegionTable::ResolveLocksAndWriteRegionRes RegionTable::resolveLocksAndWriteRegi
region_data_lock);
}

+ // Note that there could be a chance that the table has been totally removed from TiKV
+ // and TiFlash can not get the IStorage instance.
+ // - Check whether the StorageDeltaMerge is nullptr before accessing it.
+ // - You should hold the `TableLockHolder` while accessing the IStorage instance.
std::tuple<TableLockHolder, std::shared_ptr<StorageDeltaMerge>, DecodingStorageSchemaSnapshotConstPtr> //
- AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt)
+ AtomicGetStorageSchema(RegionID region_id, KeyspaceID keyspace_id, TableID table_id, TMTContext & tmt)
{
TableLockHolder drop_lock = nullptr;
std::shared_ptr<StorageDeltaMerge> dm_storage;
DecodingStorageSchemaSnapshotConstPtr schema_snapshot;

- auto keyspace_id = region->getKeyspaceID();
- auto table_id = region->getMappedTableID();
- LOG_DEBUG(Logger::get(__PRETTY_FUNCTION__), "Get schema, keyspace={} table_id={}", keyspace_id, table_id);
- auto context = tmt.getContext();
+ LOG_DEBUG(
+ Logger::get("AtomicGetStorageSchema"),
+ "Get schema, region_id={} keyspace={} table_id={}",
+ region_id,
+ keyspace_id,
+ table_id);
+ auto & context = tmt.getContext();
const auto atomic_get = [&](bool force_decode) -> bool {
auto storage = tmt.getStorages().get(keyspace_id, table_id);
if (storage == nullptr)
{
- if (!force_decode)
- return false;
- if (storage == nullptr) // Table must have just been GC-ed
- return true;
+ // If force_decode == false, then we should sync schema and check again.
+ // Else force_decode == true and the storage instance still does not exist; the table must have just been GC-ed.
+ return force_decode;
}
// Get a structure read lock. It will throw an exception if the table has been dropped;
// the caller should handle this situation.
@@ -470,22 +476,19 @@ AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt)
{
GET_METRIC(tiflash_schema_trigger_count, type_raft_decode).Increment();
Stopwatch watch;
- tmt.getSchemaSyncerManager()->syncTableSchema(context, keyspace_id, table_id);
+ auto sync_schema_ok = tmt.getSchemaSyncerManager()->syncTableSchema(context, keyspace_id, table_id);
auto schema_sync_cost = watch.elapsedMilliseconds();
LOG_INFO(
Logger::get("AtomicGetStorageSchema"),
"sync schema cost {} ms, keyspace={} table_id={}",
"sync schema cost {} ms, sync_schema_ok={} region_id={} keyspace={} table_id={}",
schema_sync_cost,
+ sync_schema_ok,
+ region_id,
keyspace_id,
table_id);

- if (!atomic_get(true))
- throw Exception(
- ErrorCodes::LOGICAL_ERROR,
- "AtomicGetStorageSchema failed, region={} keyspace={} table_id={}",
- region->toString(),
- keyspace_id,
- table_id);
+ // try get with `force_decode == true`
+ atomic_get(true);
}

return {std::move(drop_lock), std::move(dm_storage), std::move(schema_snapshot)};
@@ -498,8 +501,10 @@ static Block sortColumnsBySchemaSnap(Block && ori, const DM::ColumnDefines & sch
if (ori.columns() != schema.size())
{
throw Exception(
"Try to sortColumnsBySchemaSnap with different column size [block_columns=" + DB::toString(ori.columns())
+ "] [schema_columns=" + DB::toString(schema.size()) + "]");
ErrorCodes::LOGICAL_ERROR,
"Try to sortColumnsBySchemaSnap with different column size [block_columns={}] [schema_columns={}]",
ori.columns(),
schema.size());
}
#endif

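With this change, AtomicGetStorageSchema reports a missing table by returning a null storage instead of throwing. A minimal caller-side sketch of the new contract (illustrative only; the onTableMissing helper is hypothetical, not part of this commit):

auto [drop_lock, storage, schema_snap] = AtomicGetStorageSchema(region_id, keyspace_id, table_id, tmt);
if (storage == nullptr)
{
    // The table has likely just been GC-ed from TiKV; fall back instead of throwing.
    return onTableMissing(region_id, keyspace_id, table_id); // hypothetical handler
}
// Access the IStorage instance only while `drop_lock` is held.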
2 changes: 1 addition & 1 deletion dbms/src/Storages/KVStore/Decode/PartitionStreams.h
@@ -35,7 +35,7 @@ void RemoveRegionCommitCache(
bool lock_region = true);

std::tuple<TableLockHolder, std::shared_ptr<StorageDeltaMerge>, DecodingStorageSchemaSnapshotConstPtr> //
- AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt);
+ AtomicGetStorageSchema(RegionID region_id, KeyspaceID keyspace_id, TableID table_id, TMTContext & tmt);

Block GenRegionBlockDataWithSchema(
const RegionPtr & region, //
2 changes: 1 addition & 1 deletion dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp
@@ -190,7 +190,7 @@ void KVStore::onSnapshot(
LOG_INFO(
log,
"clear old range before apply snapshot, region_id={} old_range={} new_range={} "
"keyspace_id={} table_id={}",
"keyspace={} table_id={}",
region_id,
old_key_range.toDebugString(),
new_key_range.toDebugString(),
9 changes: 2 additions & 7 deletions dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.cpp
@@ -219,7 +219,7 @@ void CheckpointIngestInfo::deleteWrittenData(TMTContext & tmt, RegionPtr region,
{
LOG_INFO(
log,
"No found storage in clean stale FAP data region_id={} keyspace_id={} table_id={}",
"No found storage in clean stale FAP data region_id={} keyspace={} table_id={}",
region_id,
keyspace_id,
table_id);
@@ -241,12 +241,7 @@
});
wn_ps->write(std::move(wb));

- LOG_INFO(
- log,
- "Finish clean stale FAP data region_id={} keyspace_id={} table_id={}",
- region_id,
- keyspace_id,
- table_id);
+ LOG_INFO(log, "Finish clean stale FAP data region_id={} keyspace={} table_id={}", region_id, keyspace_id, table_id);
}

bool CheckpointIngestInfo::cleanOnSuccess(TMTContext & tmt, UInt64 region_id)
29 changes: 19 additions & 10 deletions dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp
Original file line number Diff line number Diff line change
@@ -194,6 +194,8 @@ std::variant<CheckpointRegionInfoAndData, FastAddPeerRes> FastAddPeerImplSelect(
Stopwatch watch;
std::unordered_map<StoreID, UInt64> checked_seq_map;
auto fap_ctx = tmt.getContext().getSharedContextDisagg()->fap_context;
+ RUNTIME_CHECK(fap_ctx != nullptr);
+ RUNTIME_CHECK(fap_ctx->tasks_trace != nullptr);
auto cancel_handle = fap_ctx->tasks_trace->getCancelHandleFromExecutor(region_id);

// Get candidate stores.
@@ -307,7 +309,17 @@ FastAddPeerRes FastAddPeerImplWrite(

auto keyspace_id = region->getKeyspaceID();
auto table_id = region->getMappedTableID();
- const auto [table_drop_lock, storage, schema_snap] = AtomicGetStorageSchema(region, tmt);
+ const auto [table_drop_lock, storage, schema_snap] = AtomicGetStorageSchema(region_id, keyspace_id, table_id, tmt);
+ if (!storage)
+ {
+ LOG_WARNING(
+ log,
+ "FAP failed because the table can not be found, region_id={} keyspace={} table_id={}",
+ region_id,
+ keyspace_id,
+ table_id);
+ return genFastAddPeerRes(FastAddPeerStatus::BadData, "", "");
+ }
UNUSED(schema_snap);
RUNTIME_CHECK_MSG(storage->engineType() == TiDB::StorageEngine::DT, "ingest into unsupported storage engine");
auto dm_storage = std::dynamic_pointer_cast<StorageDeltaMerge>(storage);
@@ -321,7 +333,7 @@ FastAddPeerRes FastAddPeerImplWrite(
{
LOG_INFO(
log,
"FAP is canceled before write, region_id={} keyspace_id={} table_id={}",
"FAP is canceled before write, region_id={} keyspace={} table_id={}",
region_id,
keyspace_id,
table_id);
@@ -361,7 +373,7 @@
{
LOG_INFO(
log,
"FAP is canceled after write segments, region_id={} keyspace_id={} table_id={}",
"FAP is canceled after write segments, region_id={} keyspace={} table_id={}",
region_id,
keyspace_id,
table_id);
@@ -374,6 +386,7 @@
// Currently, FAP only handles the case when the peer is newly created in this store.
// TODO(fap) However, Move this to `ApplyFapSnapshot` and clean stale data, if FAP can later handle all snapshots.
UniversalWriteBatch wb;
+ RUNTIME_CHECK(checkpoint_info->temp_ps != nullptr);
RaftDataReader raft_data_reader(*(checkpoint_info->temp_ps));
raft_data_reader.traverseRemoteRaftLogForRegion(
region_id,
@@ -388,26 +401,22 @@
});
GET_METRIC(tiflash_fap_task_duration_seconds, type_write_stage_raft).Observe(watch.elapsedSecondsFromLastTime());
auto wn_ps = tmt.getContext().getWriteNodePageStorage();
+ RUNTIME_CHECK(wn_ps != nullptr);
wn_ps->write(std::move(wb));
SYNC_FOR("in_FastAddPeerImplWrite::after_write_raft_log");
if (cancel_handle->isCanceled())
{
LOG_INFO(
log,
"FAP is canceled after write raft log, region_id={} keyspace_id={} table_id={}",
"FAP is canceled after write raft log, region_id={} keyspace={} table_id={}",
region_id,
keyspace_id,
table_id);
fap_ctx->cleanTask(tmt, proxy_helper, region_id, CheckpointIngestInfo::CleanReason::TiFlashCancel);
GET_METRIC(tiflash_fap_task_result, type_failed_cancel).Increment();
return genFastAddPeerRes(FastAddPeerStatus::Canceled, "", "");
}
- LOG_DEBUG(
- log,
- "Finish write FAP snapshot, region_id={} keyspace_id={} table_id={}",
- region_id,
- keyspace_id,
- table_id);
+ LOG_DEBUG(log, "Finish write FAP snapshot, region_id={} keyspace={} table_id={}", region_id, keyspace_id, table_id);
return genFastAddPeerRes(
FastAddPeerStatus::Ok,
apply_state.SerializeAsString(),
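The net effect in FastAddPeerImplWrite: a table that disappeared between scheduling and execution now yields FastAddPeerStatus::BadData rather than an uncaught exception, so the peer can presumably still be brought up through the regular snapshot path. A condensed sketch of the guard (same names as the diff above; the surrounding segment- and raft-log-writing steps are elided):

const auto [table_drop_lock, storage, schema_snap]
    = AtomicGetStorageSchema(region_id, keyspace_id, table_id, tmt);
if (!storage)
    // Table dropped and GC-ed: give up on FAP with BadData instead of crashing.
    return genFastAddPeerRes(FastAddPeerStatus::BadData, "", "");
// ... continue writing segments and raft log while holding `table_drop_lock` ...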
6 changes: 3 additions & 3 deletions dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp
@@ -682,7 +682,7 @@ PrehandleResult KVStore::preHandleSSTsToDTFiles(
});

PrehandleResult prehandle_result;
- TableID physical_table_id = InvalidTableID;
+ TableID physical_table_id = new_region->getMappedTableID();

auto region_id = new_region->id();
auto prehandle_task = prehandling_trace.registerTask(region_id);
@@ -694,7 +694,8 @@
{
// Get storage schema atomically, will do schema sync if the storage does not exist.
// Will return the storage even if it is tombstone.
- const auto [table_drop_lock, storage, schema_snap] = AtomicGetStorageSchema(new_region, tmt);
+ const auto [table_drop_lock, storage, schema_snap]
+ = AtomicGetStorageSchema(region_id, keyspace_id, physical_table_id, tmt);
if (unlikely(storage == nullptr))
{
// The storage must be physically dropped, throw exception and do cleanup.
@@ -711,7 +712,6 @@
/* ignore_cache= */ false,
context.getSettingsRef().safe_point_update_interval_seconds);
}
- physical_table_id = storage->getTableInfo().id;

auto opt = DM::SSTFilesToBlockInputStreamOpts{
.log_prefix = fmt::format("keyspace={} table_id={}", keyspace_id, physical_table_id),
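Moving the physical_table_id initialization ahead of the storage lookup means the id now comes from the region's key-range mapping rather than from the IStorage instance, so it stays valid on the error path where storage is nullptr. A minimal sketch of why the ordering matters (the exception message is illustrative, not the code's actual text):

TableID physical_table_id = new_region->getMappedTableID(); // decoded from the region, no storage needed
const auto [table_drop_lock, storage, schema_snap]
    = AtomicGetStorageSchema(region_id, keyspace_id, physical_table_id, tmt);
if (unlikely(storage == nullptr))
    // Previously physical_table_id was still InvalidTableID here; now it can be reported.
    throw Exception(ErrorCodes::LOGICAL_ERROR, "table dropped during prehandle, table_id={}", physical_table_id);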
@@ -928,6 +928,5 @@ try
}
CATCH


} // namespace tests
} // namespace DB
