Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compat serverless FAP features including shard_ver, txn ref and enc key #9434

Merged
merged 7 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 46 additions & 19 deletions dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.h>
#include <Storages/KVStore/MultiRaft/Disagg/FastAddPeerCache.h>
#include <Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.h>
#include <Storages/KVStore/MultiRaft/Disagg/ServerlessUtils.h>
#include <Storages/KVStore/Region.h>
#include <Storages/KVStore/TMTContext.h>
#include <Storages/KVStore/Utils/AsyncTasks.h>
Expand Down Expand Up @@ -55,17 +56,36 @@ extern const char force_fap_worker_throw[];
extern const char force_set_fap_candidate_store_id[];
} // namespace FailPoints

FastAddPeerRes genFastAddPeerRes(FastAddPeerStatus status, std::string && apply_str, std::string && region_str)
FastAddPeerRes genFastAddPeerRes(
FastAddPeerStatus status,
std::string && apply_str,
std::string && region_str,
uint64_t shard_ver,
std::string && inner_key_str,
std::string && enc_key_str,
std::string && txn_file_ref_str)
{
auto * apply = RawCppString::New(apply_str);
auto * region = RawCppString::New(region_str);
auto * inner_key = RawCppString::New(inner_key_str);
auto * enc_key = RawCppString::New(enc_key_str);
auto * txn_file_ref = RawCppString::New(txn_file_ref_str);
return FastAddPeerRes{
.status = status,
.apply_state = CppStrWithView{.inner = GenRawCppPtr(apply, RawCppPtrTypeImpl::String), .view = BaseBuffView{apply->data(), apply->size()}},
.region = CppStrWithView{.inner = GenRawCppPtr(region, RawCppPtrTypeImpl::String), .view = BaseBuffView{region->data(), region->size()}},
.shard_ver = shard_ver,
.inner_key = CppStrWithView{.inner = GenRawCppPtr(inner_key, RawCppPtrTypeImpl::String), .view = BaseBuffView{inner_key->data(), inner_key->size()}},
.enc_key = CppStrWithView{.inner = GenRawCppPtr(enc_key, RawCppPtrTypeImpl::String), .view = BaseBuffView{enc_key->data(), enc_key->size()}},
.txn_file_ref = CppStrWithView{.inner = GenRawCppPtr(txn_file_ref, RawCppPtrTypeImpl::String), .view = BaseBuffView{txn_file_ref->data(), txn_file_ref->size()}},
};
}

FastAddPeerRes genFastAddPeerResFail(FastAddPeerStatus status)
{
return genFastAddPeerRes(status, "", "", 0, "", "", "");
}

std::vector<StoreID> getCandidateStoreIDsForRegion(TMTContext & tmt_context, UInt64 region_id, UInt64 current_store_id)
{
fiu_do_on(FailPoints::force_set_fap_candidate_store_id, { return {1234}; });
Expand Down Expand Up @@ -208,7 +228,7 @@ std::variant<CheckpointRegionInfoAndData, FastAddPeerRes> FastAddPeerImplSelect(
{
LOG_DEBUG(log, "No suitable candidate peer for region_id={}", region_id);
GET_METRIC(tiflash_fap_task_result, type_failed_no_suitable).Increment();
return genFastAddPeerRes(FastAddPeerStatus::NoSuitable, "", "");
return genFastAddPeerResFail(FastAddPeerStatus::NoSuitable);
}
LOG_DEBUG(log, "Begin to select checkpoint for region_id={}", region_id);

Expand Down Expand Up @@ -269,7 +289,7 @@ std::variant<CheckpointRegionInfoAndData, FastAddPeerRes> FastAddPeerImplSelect(
region_id,
new_peer_id);
GET_METRIC(tiflash_fap_task_result, type_failed_timeout).Increment();
return genFastAddPeerRes(FastAddPeerStatus::NoSuitable, "", "");
return genFastAddPeerResFail(FastAddPeerStatus::NoSuitable);
}
SYNC_FOR("in_FastAddPeerImplSelect::before_sleep");
if (cancel_handle->blockedWaitFor(std::chrono::milliseconds(1000)))
Expand All @@ -280,7 +300,7 @@ std::variant<CheckpointRegionInfoAndData, FastAddPeerRes> FastAddPeerImplSelect(
fap_ctx->tasks_trace->leakingDiscardTask(region_id);
// We immediately increase this metrics when cancel, since a canceled task may not be fetched.
GET_METRIC(tiflash_fap_task_result, type_failed_cancel).Increment();
return genFastAddPeerRes(FastAddPeerStatus::Canceled, "", "");
return genFastAddPeerResFail(FastAddPeerStatus::Canceled);
}
}
}
Expand Down Expand Up @@ -317,7 +337,7 @@ FastAddPeerRes FastAddPeerImplWrite(
region_id,
keyspace_id,
table_id);
return genFastAddPeerRes(FastAddPeerStatus::BadData, "", "");
return genFastAddPeerResFail(FastAddPeerStatus::BadData);
}
UNUSED(schema_snap);
RUNTIME_CHECK_MSG(storage->engineType() == TiDB::StorageEngine::DT, "ingest into unsupported storage engine");
Expand All @@ -338,7 +358,7 @@ FastAddPeerRes FastAddPeerImplWrite(
table_id);
fap_ctx->cleanTask(tmt, proxy_helper, region_id, CheckpointIngestInfo::CleanReason::TiFlashCancel);
GET_METRIC(tiflash_fap_task_result, type_failed_cancel).Increment();
return genFastAddPeerRes(FastAddPeerStatus::Canceled, "", "");
return genFastAddPeerResFail(FastAddPeerStatus::Canceled);
}

DM::Segments segments;
Expand Down Expand Up @@ -378,7 +398,7 @@ FastAddPeerRes FastAddPeerImplWrite(
table_id);
fap_ctx->cleanTask(tmt, proxy_helper, region_id, CheckpointIngestInfo::CleanReason::TiFlashCancel);
GET_METRIC(tiflash_fap_task_result, type_failed_cancel).Increment();
return genFastAddPeerRes(FastAddPeerStatus::Canceled, "", "");
return genFastAddPeerResFail(FastAddPeerStatus::Canceled);
}

// Write raft log to uni ps, we do this here because we store raft log seperately.
Expand Down Expand Up @@ -413,13 +433,20 @@ FastAddPeerRes FastAddPeerImplWrite(
table_id);
fap_ctx->cleanTask(tmt, proxy_helper, region_id, CheckpointIngestInfo::CleanReason::TiFlashCancel);
GET_METRIC(tiflash_fap_task_result, type_failed_cancel).Increment();
return genFastAddPeerRes(FastAddPeerStatus::Canceled, "", "");
return genFastAddPeerResFail(FastAddPeerStatus::Canceled);
}
LOG_DEBUG(log, "Finish write FAP snapshot, region_id={} keyspace={} table_id={}", region_id, keyspace_id, table_id);
// Return a FastAddPeerRes with region meta under the serverless branch to generate a fap snapshot
// with correct meta
auto tmp_ps = checkpoint_info->checkpoint_data_holder->getUniversalPageStorage();
return genFastAddPeerRes(
FastAddPeerStatus::Ok,
apply_state.SerializeAsString(),
region_state.region().SerializeAsString());
region_state.region().SerializeAsString(),
getShardVer(tmp_ps, region_id),
getCompactibleInnerKey(tmp_ps, keyspace_id, region_id),
getCompactibleEncKey(tmp_ps, keyspace_id, region_id),
getTxnFileRef(tmp_ps, region_id));
}

// This function executes FAP phase 1 from a thread in a dedicated pool.
Expand Down Expand Up @@ -477,7 +504,7 @@ FastAddPeerRes FastAddPeerImpl(
GET_METRIC(tiflash_fap_task_result, type_failed_baddata).Increment();
// The task could stuck in AsyncTasks as Finished till fetched by resolveFapSnapshotState,
// since a FastAddPeerStatus::BadData result will lead to a fallback in Proxy.
return genFastAddPeerRes(FastAddPeerStatus::BadData, "", "");
return genFastAddPeerResFail(FastAddPeerStatus::BadData);
}
catch (...)
{
Expand All @@ -490,7 +517,7 @@ FastAddPeerRes FastAddPeerImpl(
GET_METRIC(tiflash_fap_task_result, type_failed_baddata).Increment();
// The task could stuck in AsyncTasks as Finished till fetched by resolveFapSnapshotState.
// since a FastAddPeerStatus::BadData result will lead to a fallback in Proxy.
return genFastAddPeerRes(FastAddPeerStatus::BadData, "", "");
return genFastAddPeerResFail(FastAddPeerStatus::BadData);
}
}

Expand Down Expand Up @@ -650,12 +677,12 @@ FastAddPeerRes FastAddPeer(EngineStoreServerWrap * server, uint64_t region_id, u
{
auto log = Logger::get("FastAddPeer");
if (!server->tmt->getContext().getSharedContextDisagg()->isDisaggregatedStorageMode())
return genFastAddPeerRes(FastAddPeerStatus::OtherError, "", "");
return genFastAddPeerResFail(FastAddPeerStatus::OtherError);
auto fap_ctx = server->tmt->getContext().getSharedContextDisagg()->fap_context;
if (fap_ctx == nullptr)
{
LOG_WARNING(log, "FAP Context is not initialized. Should only enable FAP in DisaggregatedStorageMode.");
return genFastAddPeerRes(FastAddPeerStatus::NoSuitable, "", "");
return genFastAddPeerResFail(FastAddPeerStatus::NoSuitable);
}
if (!fap_ctx->tasks_trace->isScheduled(region_id))
{
Expand All @@ -682,7 +709,7 @@ FastAddPeerRes FastAddPeer(EngineStoreServerWrap * server, uint64_t region_id, u
new_peer_id);
// It is already canceled in queue.
GET_METRIC(tiflash_fap_task_result, type_failed_cancel).Increment();
return genFastAddPeerRes(FastAddPeerStatus::Canceled, "", "");
return genFastAddPeerResFail(FastAddPeerStatus::Canceled);
});
if (res)
{
Expand All @@ -699,7 +726,7 @@ FastAddPeerRes FastAddPeer(EngineStoreServerWrap * server, uint64_t region_id, u
new_peer_id,
region_id);
GET_METRIC(tiflash_fap_task_result, type_failed_other).Increment();
return genFastAddPeerRes(FastAddPeerStatus::OtherError, "", "");
return genFastAddPeerResFail(FastAddPeerStatus::OtherError);
}
}

Expand Down Expand Up @@ -747,10 +774,10 @@ FastAddPeerRes FastAddPeer(EngineStoreServerWrap * server, uint64_t region_id, u
}
GET_METRIC(tiflash_fap_task_state, type_blocking_cancel_stage).Decrement();
// Return Canceled because it is cancel from outside FAP worker.
return genFastAddPeerRes(FastAddPeerStatus::Canceled, "", "");
return genFastAddPeerResFail(FastAddPeerStatus::Canceled);
}
LOG_DEBUG(log, "Task is still pending new_peer_id={} region_id={}", new_peer_id, region_id);
return genFastAddPeerRes(FastAddPeerStatus::WaitForData, "", "");
return genFastAddPeerResFail(FastAddPeerStatus::WaitForData);
}
}
catch (const Exception & e)
Expand All @@ -762,7 +789,7 @@ FastAddPeerRes FastAddPeer(EngineStoreServerWrap * server, uint64_t region_id, u
region_id,
new_peer_id,
e.message()));
return genFastAddPeerRes(FastAddPeerStatus::OtherError, "", "");
return genFastAddPeerResFail(FastAddPeerStatus::OtherError);
}
catch (...)
{
Expand All @@ -772,7 +799,7 @@ FastAddPeerRes FastAddPeer(EngineStoreServerWrap * server, uint64_t region_id, u
"Failed when try to restore from checkpoint region_id={} new_peer_id={}",
region_id,
new_peer_id));
return genFastAddPeerRes(FastAddPeerStatus::OtherError, "", "");
return genFastAddPeerResFail(FastAddPeerStatus::OtherError);
}
}

Expand Down
10 changes: 9 additions & 1 deletion dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,15 @@ class Region;
using RegionPtr = std::shared_ptr<Region>;
using CheckpointRegionInfoAndData
= std::tuple<CheckpointInfoPtr, RegionPtr, raft_serverpb::RaftApplyState, raft_serverpb::RegionLocalState>;
FastAddPeerRes genFastAddPeerRes(FastAddPeerStatus status, std::string && apply_str, std::string && region_str);
FastAddPeerRes genFastAddPeerRes(
FastAddPeerStatus status,
std::string && apply_str,
std::string && region_str,
uint64_t shard_ver,
std::string && inner_key_str,
std::string && enc_key_str,
std::string && txn_file_ref_str);
FastAddPeerRes genFastAddPeerResFail(FastAddPeerStatus status);
std::variant<CheckpointRegionInfoAndData, FastAddPeerRes> FastAddPeerImplSelect(
TMTContext & tmt,
const TiFlashRaftProxyHelper * proxy_helper,
Expand Down
Loading