Skip to content

Commit

Permalink
first
Browse files Browse the repository at this point in the history
Signed-off-by: Calvin Neo <[email protected]>
  • Loading branch information
CalvinNeo committed Sep 24, 2024
1 parent 258aec3 commit bed9de6
Show file tree
Hide file tree
Showing 2 changed files with 221 additions and 221 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -284,158 +284,158 @@ class DeltaMergeStoreTestFastAddPeer
constexpr static const char * TRACING_NAME = "DeltaMergeStoreTestFastAddPeer";
};

TEST_P(DeltaMergeStoreTestFastAddPeer, SimpleWriteReadAfterRestoreFromCheckPoint)
try
{
UInt64 write_store_id = current_store_id + 1;
resetStoreId(write_store_id);
{
auto table_column_defines = DMTestEnv::getDefaultColumns();

store = reload(table_column_defines);
}

const size_t num_rows_write = 128;
// write DMFile
{
Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false);
store->write(*db_context, db_context->getSettingsRef(), block);
store->flushCache(*db_context, RowKeyRange::newAll(false, 1), true);
store->mergeDeltaAll(*db_context);
}

// Write ColumnFileTiny
{
Block block = DMTestEnv::prepareSimpleWriteBlock(num_rows_write, num_rows_write + num_rows_write, false);
store->write(*db_context, db_context->getSettingsRef(), block);
store->flushCache(*db_context, RowKeyRange::newAll(false, 1), true);
}

// write ColumnFileDeleteRange
{
HandleRange handle_range(0, num_rows_write / 2);
store->deleteRange(*db_context, db_context->getSettingsRef(), RowKeyRange::fromHandleRange(handle_range));
store->flushCache(*db_context, RowKeyRange::newAll(false, 1), true);
}

// write ColumnFileBig
{
Block block = DMTestEnv::prepareSimpleWriteBlock(
num_rows_write + num_rows_write,
num_rows_write + 2 * num_rows_write,
false);
auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef());
auto [range, file_ids] = genDMFile(*dm_context, block);
{
// Mock DMFiles are uploaded to S3 in SSTFilesToDTFilesOutputStream
auto remote_store = db_context->getSharedContextDisagg()->remote_data_store;
ASSERT_NE(remote_store, nullptr);
auto delegator = dm_context->path_pool->getStableDiskDelegator();
for (const auto & file_id : file_ids)
{
auto dm_file = DMFile::restore(
db_context->getFileProvider(),
file_id.id,
file_id.id,
delegator.getDTFilePath(file_id.id),
DMFileMeta::ReadMode::all());
remote_store->putDMFile(
dm_file,
S3::DMFileOID{
.store_id = write_store_id,
.keyspace_id = keyspace_id,
.table_id = store->physical_table_id,
.file_id = file_id.id,
},
true);
}
}
store->ingestFiles(dm_context, range, file_ids, false);
store->flushCache(*db_context, RowKeyRange::newAll(false, 1), true);
}

dumpCheckpoint(write_store_id);

clearData();

verifyRows(RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()), 0);

setStorageFormat(restore_format_version);

const auto manifest_key = S3::S3Filename::newCheckpointManifest(write_store_id, upload_sequence).toFullKey();
auto checkpoint_info = std::make_shared<CheckpointInfo>();
checkpoint_info->remote_store_id = write_store_id;
checkpoint_info->region_id = 1000;
checkpoint_info->checkpoint_data_holder = buildParsedCheckpointData(*db_context, manifest_key, /*dir_seq*/ 100);
checkpoint_info->temp_ps = checkpoint_info->checkpoint_data_holder->getUniversalPageStorage();
resetStoreId(current_store_id);
{
auto table_column_defines = DMTestEnv::getDefaultColumns();

store = reload(table_column_defines);
}

auto segments = store->buildSegmentsFromCheckpointInfo(
*db_context,
GeneralCancelHandle::genNotCanceled(),
db_context->getSettingsRef(),
RowKeyRange::newAll(false, 1),
checkpoint_info);
auto start = RecordKVFormat::genKey(table_id, 0);
auto end = RecordKVFormat::genKey(table_id, 10);
RegionPtr dummy_region = tests::makeRegion(checkpoint_info->region_id, start, end, nullptr);
store->ingestSegmentsFromCheckpointInfo(
*db_context,
db_context->getSettingsRef(),
RowKeyRange::newAll(false, 1),
std::make_shared<CheckpointIngestInfo>(
db_context->getTMTContext(),
checkpoint_info->region_id,
2333,
checkpoint_info->remote_store_id,
dummy_region,
std::move(segments),
0));

// check data file lock exists
{
const auto data_key = S3::S3Filename::newCheckpointData(write_store_id, upload_sequence, 0).toFullKey();
const auto data_key_view = S3::S3FilenameView::fromKey(data_key);
const auto lock_prefix = data_key_view.getLockPrefix();
auto client = S3::ClientFactory::instance().sharedTiFlashClient();
std::set<String> lock_keys;
S3::listPrefix(*client, lock_prefix, [&](const Aws::S3::Model::Object & object) {
const auto & lock_key = object.GetKey();
// also store the object.GetLastModified() for removing
// outdated manifest objects
lock_keys.emplace(lock_key);
return DB::S3::PageResult{.num_keys = 1, .more = true};
});
// 2 lock files, 1 from write store, 1 from current store
ASSERT_EQ(lock_keys.size(), 2);
bool current_store_lock_exist = false;
for (const auto & lock_key : lock_keys)
{
auto lock_key_view = S3::S3FilenameView::fromKey(lock_key);
ASSERT_TRUE(lock_key_view.isLockFile());
auto lock_info = lock_key_view.getLockInfo();
if (lock_info.store_id == current_store_id)
current_store_lock_exist = true;
}
ASSERT_TRUE(current_store_lock_exist);
}

verifyRows(
RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()),
num_rows_write / 2 + 2 * num_rows_write);

reload();

verifyRows(
RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()),
num_rows_write / 2 + 2 * num_rows_write);
}
CATCH
// TEST_P(DeltaMergeStoreTestFastAddPeer, SimpleWriteReadAfterRestoreFromCheckPoint)
// try
// {
// UInt64 write_store_id = current_store_id + 1;
// resetStoreId(write_store_id);
// {
// auto table_column_defines = DMTestEnv::getDefaultColumns();

// store = reload(table_column_defines);
// }

// const size_t num_rows_write = 128;
// // write DMFile
// {
// Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false);
// store->write(*db_context, db_context->getSettingsRef(), block);
// store->flushCache(*db_context, RowKeyRange::newAll(false, 1), true);
// store->mergeDeltaAll(*db_context);
// }

// // Write ColumnFileTiny
// {
// Block block = DMTestEnv::prepareSimpleWriteBlock(num_rows_write, num_rows_write + num_rows_write, false);
// store->write(*db_context, db_context->getSettingsRef(), block);
// store->flushCache(*db_context, RowKeyRange::newAll(false, 1), true);
// }

// // write ColumnFileDeleteRange
// {
// HandleRange handle_range(0, num_rows_write / 2);
// store->deleteRange(*db_context, db_context->getSettingsRef(), RowKeyRange::fromHandleRange(handle_range));
// store->flushCache(*db_context, RowKeyRange::newAll(false, 1), true);
// }

// // write ColumnFileBig
// {
// Block block = DMTestEnv::prepareSimpleWriteBlock(
// num_rows_write + num_rows_write,
// num_rows_write + 2 * num_rows_write,
// false);
// auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef());
// auto [range, file_ids] = genDMFile(*dm_context, block);
// {
// // Mock DMFiles are uploaded to S3 in SSTFilesToDTFilesOutputStream
// auto remote_store = db_context->getSharedContextDisagg()->remote_data_store;
// ASSERT_NE(remote_store, nullptr);
// auto delegator = dm_context->path_pool->getStableDiskDelegator();
// for (const auto & file_id : file_ids)
// {
// auto dm_file = DMFile::restore(
// db_context->getFileProvider(),
// file_id.id,
// file_id.id,
// delegator.getDTFilePath(file_id.id),
// DMFileMeta::ReadMode::all());
// remote_store->putDMFile(
// dm_file,
// S3::DMFileOID{
// .store_id = write_store_id,
// .keyspace_id = keyspace_id,
// .table_id = store->physical_table_id,
// .file_id = file_id.id,
// },
// true);
// }
// }
// store->ingestFiles(dm_context, range, file_ids, false);
// store->flushCache(*db_context, RowKeyRange::newAll(false, 1), true);
// }

// dumpCheckpoint(write_store_id);

// clearData();

// verifyRows(RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()), 0);

// setStorageFormat(restore_format_version);

// const auto manifest_key = S3::S3Filename::newCheckpointManifest(write_store_id, upload_sequence).toFullKey();
// auto checkpoint_info = std::make_shared<CheckpointInfo>();
// checkpoint_info->remote_store_id = write_store_id;
// checkpoint_info->region_id = 1000;
// checkpoint_info->checkpoint_data_holder = buildParsedCheckpointData(*db_context, manifest_key, /*dir_seq*/ 100);
// checkpoint_info->temp_ps = checkpoint_info->checkpoint_data_holder->getUniversalPageStorage();
// resetStoreId(current_store_id);
// {
// auto table_column_defines = DMTestEnv::getDefaultColumns();

// store = reload(table_column_defines);
// }

// auto segments = store->buildSegmentsFromCheckpointInfo(
// *db_context,
// GeneralCancelHandle::genNotCanceled(),
// db_context->getSettingsRef(),
// RowKeyRange::newAll(false, 1),
// checkpoint_info);
// auto start = RecordKVFormat::genKey(table_id, 0);
// auto end = RecordKVFormat::genKey(table_id, 10);
// RegionPtr dummy_region = tests::makeRegion(checkpoint_info->region_id, start, end, nullptr);
// store->ingestSegmentsFromCheckpointInfo(
// *db_context,
// db_context->getSettingsRef(),
// RowKeyRange::newAll(false, 1),
// std::make_shared<CheckpointIngestInfo>(
// db_context->getTMTContext(),
// checkpoint_info->region_id,
// 2333,
// checkpoint_info->remote_store_id,
// dummy_region,
// std::move(segments),
// 0));

// // check data file lock exists
// {
// const auto data_key = S3::S3Filename::newCheckpointData(write_store_id, upload_sequence, 0).toFullKey();
// const auto data_key_view = S3::S3FilenameView::fromKey(data_key);
// const auto lock_prefix = data_key_view.getLockPrefix();
// auto client = S3::ClientFactory::instance().sharedTiFlashClient();
// std::set<String> lock_keys;
// S3::listPrefix(*client, lock_prefix, [&](const Aws::S3::Model::Object & object) {
// const auto & lock_key = object.GetKey();
// // also store the object.GetLastModified() for removing
// // outdated manifest objects
// lock_keys.emplace(lock_key);
// return DB::S3::PageResult{.num_keys = 1, .more = true};
// });
// // 2 lock files, 1 from write store, 1 from current store
// ASSERT_EQ(lock_keys.size(), 2);
// bool current_store_lock_exist = false;
// for (const auto & lock_key : lock_keys)
// {
// auto lock_key_view = S3::S3FilenameView::fromKey(lock_key);
// ASSERT_TRUE(lock_key_view.isLockFile());
// auto lock_info = lock_key_view.getLockInfo();
// if (lock_info.store_id == current_store_id)
// current_store_lock_exist = true;
// }
// ASSERT_TRUE(current_store_lock_exist);
// }

// verifyRows(
// RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()),
// num_rows_write / 2 + 2 * num_rows_write);

// reload();

// verifyRows(
// RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()),
// num_rows_write / 2 + 2 * num_rows_write);
// }
// CATCH

TEST_P(DeltaMergeStoreTestFastAddPeer, SimpleWriteReadAfterRestoreFromCheckPointWithSplit)
try
Expand Down
Loading

0 comments on commit bed9de6

Please sign in to comment.