Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KVStore: Fallback elegantly when getting table error in FAP #9253

Merged
merged 12 commits into from
Jul 26, 2024
38 changes: 23 additions & 15 deletions dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,10 @@ RegionTable::ResolveLocksAndWriteRegionRes RegionTable::resolveLocksAndWriteRegi
region_data_lock);
}

// Note that there is a chance that the table has been totally removed from TiKV
// and TiFlash can not get the IStorage instance.
// - Check whether the StorageDeltaMerge is nullptr or not before accessing it.
// - You should hold the `TableLockHolder` while you are accessing the IStorage instance.
std::tuple<TableLockHolder, std::shared_ptr<StorageDeltaMerge>, DecodingStorageSchemaSnapshotConstPtr> //
AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt)
{
Expand All @@ -514,16 +518,21 @@ AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt)

auto keyspace_id = region->getKeyspaceID();
auto table_id = region->getMappedTableID();
LOG_DEBUG(Logger::get(__PRETTY_FUNCTION__), "Get schema, keyspace={} table_id={}", keyspace_id, table_id);
LOG_DEBUG(
Logger::get("AtomicGetStorageSchema"),
"Get schema, keyspace={} table_id={} region_id={}",
keyspace_id,
table_id,
region->id());
auto & context = tmt.getContext();
const auto atomic_get = [&](bool force_decode) -> bool {
auto storage = tmt.getStorages().get(keyspace_id, table_id);
if (storage == nullptr)
{
if (!force_decode)
return false;
if (storage == nullptr) // Table must have just been GC-ed
return true;
// Otherwise force_decode == true and the storage instance still does not exist, so the table must have just been GC-ed
return true;
}
// Get a structure read lock. It will throw an exception if the table has been dropped;
// the caller should handle this situation.
Expand All @@ -539,22 +548,19 @@ AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt)
{
GET_METRIC(tiflash_schema_trigger_count, type_raft_decode).Increment();
Stopwatch watch;
tmt.getSchemaSyncerManager()->syncTableSchema(context, keyspace_id, table_id);
auto sync_schema_ok = tmt.getSchemaSyncerManager()->syncTableSchema(context, keyspace_id, table_id);
auto schema_sync_cost = watch.elapsedMilliseconds();
LOG_INFO(
Logger::get("AtomicGetStorageSchema"),
"sync schema cost {} ms, keyspace={} table_id={}",
"sync schema cost {} ms, sync_schema_ok={} keyspace={} table_id={} region_id={}",
schema_sync_cost,
sync_schema_ok,
keyspace_id,
table_id);
table_id,
region->id());

if (!atomic_get(true))
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"AtomicGetStorageSchema failed, region={} keyspace={} table_id={}",
region->toString(),
keyspace_id,
table_id);
// try get with `force_decode == true`
atomic_get(true);
}

return {std::move(drop_lock), std::move(dm_storage), std::move(schema_snapshot)};
Expand All @@ -567,8 +573,10 @@ static Block sortColumnsBySchemaSnap(Block && ori, const DM::ColumnDefines & sch
if (ori.columns() != schema.size())
{
throw Exception(
"Try to sortColumnsBySchemaSnap with different column size [block_columns=" + DB::toString(ori.columns())
+ "] [schema_columns=" + DB::toString(schema.size()) + "]");
ErrorCodes::LOGICAL_ERROR,
"Try to sortColumnsBySchemaSnap with different column size [block_columns={}] [schema_columns={}]",
ori.columns(),
schema.size());
}
#endif

Expand Down
14 changes: 14 additions & 0 deletions dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ std::variant<CheckpointRegionInfoAndData, FastAddPeerRes> FastAddPeerImplSelect(
Stopwatch watch;
std::unordered_map<StoreID, UInt64> checked_seq_map;
auto fap_ctx = tmt.getContext().getSharedContextDisagg()->fap_context;
RUNTIME_CHECK(fap_ctx != nullptr);
RUNTIME_CHECK(fap_ctx->tasks_trace != nullptr);
auto cancel_handle = fap_ctx->tasks_trace->getCancelHandleFromExecutor(region_id);

// Get candidate stores.
Expand Down Expand Up @@ -307,6 +309,16 @@ FastAddPeerRes FastAddPeerImplWrite(
auto keyspace_id = region->getKeyspaceID();
auto table_id = region->getMappedTableID();
const auto [table_drop_lock, storage, schema_snap] = AtomicGetStorageSchema(region, tmt);
if (!storage)
{
LOG_WARNING(
log,
"FAP failed because the table can not be found, region_id={} keyspace_id={} table_id={}",
region_id,
keyspace_id,
table_id);
return genFastAddPeerRes(FastAddPeerStatus::BadData, "", "");
}
UNUSED(schema_snap);
RUNTIME_CHECK_MSG(storage->engineType() == TiDB::StorageEngine::DT, "ingest into unsupported storage engine");
auto dm_storage = std::dynamic_pointer_cast<StorageDeltaMerge>(storage);
Expand Down Expand Up @@ -373,6 +385,7 @@ FastAddPeerRes FastAddPeerImplWrite(
// Currently, FAP only handles the case where the peer is newly created in this store.
// TODO(fap): However, move this to `ApplyFapSnapshot` and clean stale data if FAP can later handle all snapshots.
UniversalWriteBatch wb;
RUNTIME_CHECK(checkpoint_info->temp_ps != nullptr);
RaftDataReader raft_data_reader(*(checkpoint_info->temp_ps));
raft_data_reader.traverseRemoteRaftLogForRegion(
region_id,
Expand All @@ -387,6 +400,7 @@ FastAddPeerRes FastAddPeerImplWrite(
});
GET_METRIC(tiflash_fap_task_duration_seconds, type_write_stage_raft).Observe(watch.elapsedSecondsFromLastTime());
auto wn_ps = tmt.getContext().getWriteNodePageStorage();
RUNTIME_CHECK(wn_ps != nullptr);
wn_ps->write(std::move(wb));
SYNC_FOR("in_FastAddPeerImplWrite::after_write_raft_log");
if (cancel_handle->isCanceled())
Expand Down
28 changes: 28 additions & 0 deletions dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,34 @@ try
}
CATCH

// Calls `FastAddPeerImplWrite` after the region's mapped table has been removed from the
// storages held by the TMTContext. The value returned by `FastAddPeerImplWrite` is
// discarded; this test body contains no explicit assertion on it.
TEST_F(RegionKVStoreTestFAP, TableNotFound)
try
{
CheckpointRegionInfoAndData mock_data = prepareForRestart(FAPTestOpt{});
// The region is the element at index 1 of the prepared checkpoint data.
RegionPtr kv_region = std::get<1>(mock_data);

auto & global_context = TiFlashTestEnv::getGlobalContext();
auto & tmt = global_context.getTMTContext();
uint64_t region_id = 1;

auto keyspace_id = kv_region->getKeyspaceID();
auto table_id = kv_region->getMappedTableID();
auto fap_context = global_context.getSharedContextDisagg()->fap_context;

// `exe_lock` holds `exe_mut` until the end of this test body. The task registered below
// locks the same mutex, so it cannot get past its `scoped_lock` while `exe_lock` is held.
std::mutex exe_mut;
std::unique_lock exe_lock(exe_mut);
fap_context->tasks_trace->addTask(region_id, [&]() {
// Keep the task in `tasks_trace` to prevent it from being canceled.
std::scoped_lock wait_exe_lock(exe_mut);
return genFastAddPeerRes(FastAddPeerStatus::NoSuitable, "", "");
});

// Remove the table's storage instance so that it can no longer be looked up
// by (keyspace_id, table_id) in `storages`.
auto & storages = tmt.getStorages();
storages.remove(keyspace_id, table_id);
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
// Will generate and persist some information in local ps, which will not be uploaded.
JaySon-Huang marked this conversation as resolved.
Show resolved Hide resolved
FastAddPeerImplWrite(global_context.getTMTContext(), proxy_helper.get(), region_id, 2333, std::move(mock_data), 0);
JaySon-Huang marked this conversation as resolved.
Show resolved Hide resolved
}
CATCH

} // namespace tests
} // namespace DB