Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FAP: compute RU and match index and term when handling snapshot #8716

Merged
merged 15 commits into from
Jan 29, 2024
7 changes: 7 additions & 0 deletions dbms/src/Common/TiFlashMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ void TiFlashMetrics::addReplicaSyncRU(UInt32 keyspace_id, UInt64 ru)
counter->Increment(ru);
}

UInt64 TiFlashMetrics::queryReplicaSyncRU(UInt32 keyspace_id)
{
std::unique_lock lock(replica_sync_ru_mtx);
auto * counter = getReplicaSyncRUCounter(keyspace_id, lock);
return counter->Value();
}

prometheus::Counter * TiFlashMetrics::getReplicaSyncRUCounter(UInt32 keyspace_id, std::unique_lock<std::mutex> &)
{
auto itr = registered_keyspace_sync_replica_ru.find(keyspace_id);
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -1078,6 +1078,7 @@ class TiFlashMetrics
static TiFlashMetrics & instance();

void addReplicaSyncRU(UInt32 keyspace_id, UInt64 ru);
UInt64 queryReplicaSyncRU(UInt32 keyspace_id);
JinheLin marked this conversation as resolved.
Show resolved Hide resolved

private:
TiFlashMetrics();
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -357,12 +357,12 @@ class DeltaMergeStore : private boost::noncopyable
return buildSegmentsFromCheckpointInfo(dm_context, range, checkpoint_info);
}

void ingestSegmentsFromCheckpointInfo(
UInt64 ingestSegmentsFromCheckpointInfo(
const DMContextPtr & dm_context,
const DM::RowKeyRange & range,
const CheckpointIngestInfoPtr & checkpoint_info);

void ingestSegmentsFromCheckpointInfo(
UInt64 ingestSegmentsFromCheckpointInfo(
const Context & db_context,
const DB::Settings & db_settings,
const DM::RowKeyRange & range,
Expand Down
19 changes: 14 additions & 5 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1183,7 +1183,7 @@ Segments DeltaMergeStore::buildSegmentsFromCheckpointInfo(
return {};
}

void DeltaMergeStore::ingestSegmentsFromCheckpointInfo(
UInt64 DeltaMergeStore::ingestSegmentsFromCheckpointInfo(
const DMContextPtr & dm_context,
const DM::RowKeyRange & range,
const CheckpointIngestInfoPtr & checkpoint_info)
Expand All @@ -1202,18 +1202,25 @@ void DeltaMergeStore::ingestSegmentsFromCheckpointInfo(
"Ingest checkpoint from remote meet empty range, ignore, store_id={} region_id={}",
checkpoint_info->getRemoteStoreId(),
checkpoint_info->regionId());
return;
return 0;
}

auto restored_segments = checkpoint_info->getRestoredSegments();
auto updated_segments = ingestSegmentsUsingSplit(dm_context, range, restored_segments);
auto estimated_bytes = 0;

for (const auto & segment : restored_segments)
{
estimated_bytes += segment->getEstimatedBytes();
}

LOG_INFO(
log,
"Ingest checkpoint from remote done, store_id={} region_id={} n_segments={}",
"Ingest checkpoint from remote done, store_id={} region_id={} n_segments={} est_bytes={}",
checkpoint_info->getRemoteStoreId(),
checkpoint_info->regionId(),
restored_segments.size());

restored_segments.size(),
estimated_bytes);

WriteBatches wbs{*dm_context->storage_pool};
for (auto & segment : restored_segments)
Expand All @@ -1228,6 +1235,8 @@ void DeltaMergeStore::ingestSegmentsFromCheckpointInfo(
// TODO(fap) This could be executed in a dedicated thread if it consumes too much time.
for (auto & segment : updated_segments)
checkSegmentUpdate(dm_context, segment, ThreadType::Write, InputType::RaftSSTAndSnap);

return estimated_bytes;
}

} // namespace DM
Expand Down
10 changes: 9 additions & 1 deletion dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,18 @@ void KVStore::onSnapshot(
}
else if constexpr (std::is_same_v<RegionPtrWrap, RegionPtrWithCheckpointInfo>)
{
dm_storage->ingestSegmentsFromCheckpointInfo(
auto ingested_bytes = dm_storage->ingestSegmentsFromCheckpointInfo(
new_key_range,
new_region_wrap.checkpoint_info,
context.getSettingsRef());
if (auto [count, is_syncing] = getTiFlashReplicaSyncInfo(dm_storage); is_syncing)
JinheLin marked this conversation as resolved.
Show resolved Hide resolved
{
// For write, 1 RU per KB. Reference: https://docs.pingcap.com/tidb/v7.0/tidb-resource-control
// Only calculate RU of one replica. So each replica reports 1/count consumptions.
TiFlashMetrics::instance().addReplicaSyncRU(
keyspace_id,
std::ceil(static_cast<double>(ingested_bytes) / 1024.0 / count));
}
}
else
{
Expand Down
24 changes: 23 additions & 1 deletion dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <Storages/KVStore/MultiRaft/Disagg/FastAddPeer.h>
#include <Storages/KVStore/MultiRaft/Disagg/FastAddPeerCache.h>
#include <Storages/KVStore/MultiRaft/Disagg/FastAddPeerContext.h>
#include <Storages/KVStore/Types.h>
#include <Storages/KVStore/Utils/AsyncTasks.h>
#include <Storages/KVStore/tests/kvstore_helper.h>
#include <Storages/Page/V3/Universal/UniversalPageStorage.h>
Expand Down Expand Up @@ -174,6 +175,7 @@ class RegionKVStoreTestFAP : public KVStoreTestBase

protected:
UInt64 upload_sequence = 1000;
UInt64 table_id;

private:
ContextPtr context;
Expand Down Expand Up @@ -353,7 +355,7 @@ CheckpointRegionInfoAndData RegionKVStoreTestFAP::prepareForRestart(FAPTestOpt o
global_context.getTMTContext().debugSetKVStore(kvstore);
auto page_storage = global_context.getWriteNodePageStorage();

TableID table_id = proxy_instance->bootstrapTable(global_context, kvs, global_context.getTMTContext());
table_id = proxy_instance->bootstrapTable(global_context, kvs, global_context.getTMTContext());
auto fap_context = global_context.getSharedContextDisagg()->fap_context;
proxy_instance->bootstrapWithRegion(kvs, global_context.getTMTContext(), region_id, std::nullopt);
auto proxy_helper = proxy_instance->generateProxyHelper();
Expand All @@ -365,6 +367,7 @@ CheckpointRegionInfoAndData RegionKVStoreTestFAP::prepareForRestart(FAPTestOpt o
UInt64 index = 0;
if (!opt.persist_empty_segment)
{
LOG_DEBUG(log, "Do write to the region");
auto k1 = RecordKVFormat::genKey(table_id, 1, 111);
auto && [value_write1, value_default1] = proxy_instance->generateTiKVKeyValue(111, 999);
UInt64 term = 0;
Expand Down Expand Up @@ -415,6 +418,15 @@ CheckpointRegionInfoAndData RegionKVStoreTestFAP::prepareForRestart(FAPTestOpt o
return mock_data;
}

// This function get tiflash replica count from local schema.
void setTiFlashReplicaSyncInfo(StorageDeltaMergePtr & dm_storage)
{
auto table_info = dm_storage->getTableInfo();
table_info.replica_info.count = 1;
table_info.replica_info.available = false;
dm_storage->setTableInfo(table_info);
}

// Test load from restart.
TEST_F(RegionKVStoreTestFAP, RestoreFromRestart1)
try
Expand All @@ -426,6 +438,13 @@ try
auto fap_context = global_context.getSharedContextDisagg()->fap_context;
uint64_t region_id = 1;

{
auto storage = global_context.getTMTContext().getStorages().get(NullspaceID, table_id);
auto dm_storage = std::dynamic_pointer_cast<StorageDeltaMerge>(storage);
ASSERT_TRUE(dm_storage != nullptr);
setTiFlashReplicaSyncInfo(dm_storage);
}

std::mutex exe_mut;
std::unique_lock exe_lock(exe_mut);
fap_context->tasks_trace->addTask(region_id, [&]() {
Expand All @@ -445,8 +464,11 @@ try
SCOPE_EXIT({ FailPointHelper::disableFailPoint("force_not_clean_fap_on_destroy"); });
kvstore->handleDestroy(region_id, global_context.getTMTContext());

auto prev_ru = TiFlashMetrics::instance().queryReplicaSyncRU(NullspaceID);
// After restart, continue the FAP from persisted checkpoint ingest info.
ApplyFapSnapshotImpl(global_context.getTMTContext(), proxy_helper.get(), region_id, 2333);
auto current_ru = TiFlashMetrics::instance().queryReplicaSyncRU(NullspaceID);
ASSERT_GT(current_ru, prev_ru);

{
auto keyspace_id = kv_region->getKeyspaceID();
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1186,7 +1186,7 @@ DM::Segments StorageDeltaMerge::buildSegmentsFromCheckpointInfo(
return getAndMaybeInitStore()->buildSegmentsFromCheckpointInfo(global_context, settings, range, checkpoint_info);
}

void StorageDeltaMerge::ingestSegmentsFromCheckpointInfo(
UInt64 StorageDeltaMerge::ingestSegmentsFromCheckpointInfo(
const DM::RowKeyRange & range,
const CheckpointIngestInfoPtr & checkpoint_info,
const Settings & settings)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/StorageDeltaMerge.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class StorageDeltaMerge
CheckpointInfoPtr checkpoint_info,
const Settings & settings);

void ingestSegmentsFromCheckpointInfo(
UInt64 ingestSegmentsFromCheckpointInfo(
const DM::RowKeyRange & range,
const CheckpointIngestInfoPtr & checkpoint_info,
const Settings & settings);
Expand Down