Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pre-handle SSTFiles to DTFiles when applying snapshots (#1439) #1867

Merged
merged 9 commits into from
May 18, 2021
Merged
10 changes: 6 additions & 4 deletions .github/pull_request_template.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ Tests <!-- At least one of them must be included. -->

Side effects

# - Performance regression
# - Consumes more CPU
# - Consumes more MEM
# - Breaking backward compatibility
<!--
- Performance regression
- Consumes more CPU
- Consumes more MEM
- Breaking backward compatibility
-->

### Release note <!-- bugfixes or new feature need a release note -->

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Common/ErrorCodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ namespace ErrorCodes
extern const int PAGE_SIZE_NOT_MATCH = 9006;
extern const int ILLFORMED_PAGE_NAME = 9007;
extern const int ILLFORMAT_RAFT_ROW = 9008;
extern const int REGION_DATA_SCHEMA_UPDATED = 9009;

extern const int LOCK_EXCEPTION = 10000;
extern const int VERSION_ERROR = 10001;
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(exception_before_mpp_register_tunnel_for_root_mpp_task) \
M(exception_before_mpp_root_task_run) \
M(exception_during_mpp_root_task_run) \
M(exception_during_write_to_storage)
M(exception_during_write_to_storage) \
M(force_set_sst_to_dtfile_block_size) \
M(force_set_sst_decode_rand)

#define APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M) \
M(pause_after_learner_read) \
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ namespace DB
M(tiflash_storage_write_amplification, "The data write amplification in storage engine", Gauge) \
M(tiflash_storage_read_tasks_count, "Total number of storage engine read tasks", Counter) \
M(tiflash_storage_command_count, "Total number of storage's command, such as delete range / shutdown /startup", Counter, \
F(type_delete_range, {"type", "delete_range"})) \
F(type_delete_range, {"type", "delete_range"}), F(type_ingest, {"type", "ingest"})) \
M(tiflash_storage_subtask_count, "Total number of storage's sub task", Counter, F(type_delta_merge, {"type", "delta_merge"}), \
F(type_delta_merge_fg, {"type", "delta_merge_fg"}), F(type_delta_merge_bg_gc, {"type", "delta_merge_bg_gc"}), \
F(type_delta_compact, {"type", "delta_compact"}), F(type_delta_flush, {"type", "delta_flush"}), \
Expand All @@ -101,11 +101,13 @@ namespace DB
F(type_place_index_update, {{"type", "place_index_update"}}, ExpBuckets{0.0005, 2, 20})) \
M(tiflash_storage_throughput_bytes, "Calculate the throughput of tasks of storage in bytes", Gauge, /**/ \
F(type_write, {"type", "write"}), /**/ \
F(type_ingest, {"type", "ingest"}), /**/ \
F(type_delta_merge, {"type", "delta_merge"}), /**/ \
F(type_split, {"type", "split"}), /**/ \
F(type_merge, {"type", "merge"})) /**/ \
M(tiflash_storage_throughput_rows, "Calculate the throughput of tasks of storage in rows", Gauge, /**/ \
F(type_write, {"type", "write"}), /**/ \
F(type_ingest, {"type", "ingest"}), /**/ \
F(type_delta_merge, {"type", "delta_merge"}), /**/ \
F(type_split, {"type", "split"}), /**/ \
F(type_merge, {"type", "merge"})) /**/ \
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/IBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class IBlockInputStream : private boost::noncopyable

protected:
BlockInputStreams children;
std::shared_mutex children_mutex;
mutable std::shared_mutex children_mutex;

private:
TableStructureReadLocks table_locks;
Expand Down
5 changes: 0 additions & 5 deletions dbms/src/DataTypes/tests/gtest_data_type_get_common_type.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,6 @@

#include <sstream>

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wsign-compare"
#include <gtest/gtest.h>
#pragma GCC diagnostic pop

namespace DB
{
namespace tests
Expand Down
6 changes: 4 additions & 2 deletions dbms/src/Debug/DBGInvoker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,10 @@ DBGInvoker::DBGInvoker()

regSchemalessFunc("region_snapshot", MockRaftCommand::dbgFuncRegionSnapshot);
regSchemalessFunc("region_snapshot_data", MockRaftCommand::dbgFuncRegionSnapshotWithData);
regSchemalessFunc("region_snapshot_pre_handle_block", MockRaftCommand::dbgFuncRegionSnapshotPreHandleBlock);
regSchemalessFunc("region_snapshot_apply_block", MockRaftCommand::dbgFuncRegionSnapshotApplyBlock);
regSchemalessFunc("region_snapshot_pre_handle_block", /**/ MockRaftCommand::dbgFuncRegionSnapshotPreHandleBlock);
regSchemalessFunc("region_snapshot_apply_block", /* */ MockRaftCommand::dbgFuncRegionSnapshotApplyBlock);
regSchemalessFunc("region_snapshot_pre_handle_file", /* */ MockRaftCommand::dbgFuncRegionSnapshotPreHandleDTFiles);
regSchemalessFunc("region_snapshot_apply_file", /* */ MockRaftCommand::dbgFuncRegionSnapshotApplyDTFiles);
regSchemalessFunc("region_ingest_sst", MockRaftCommand::dbgFuncIngestSST);

regSchemalessFunc("init_fail_point", DbgFailPointFunc::dbgInitFailPoint);
Expand Down
7 changes: 6 additions & 1 deletion dbms/src/Debug/dbgFuncCoprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ static const String MPP_QUERY = "mpp_query";
static const String USE_BROADCAST_JOIN = "use_broadcast_join";
static const String MPP_PARTITION_NUM = "mpp_partition_num";
static const String MPP_TIMEOUT = "mpp_timeout";
static const String LOCAL_HOST = "127.0.0.1:3930";
static String LOCAL_HOST = "127.0.0.1:3930";

namespace Debug
{
void setServiceAddr(const std::string & addr) { LOCAL_HOST = addr; }
} // namespace Debug

struct DAGProperties
{
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Debug/dbgFuncCoprocessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,9 @@ BlockInputStreamPtr dbgFuncTiDBQuery(Context & context, const ASTs & args);
// ./storages-client.sh "DBGInvoke mock_dag(query, region_id[, start_ts])"
BlockInputStreamPtr dbgFuncMockTiDBQuery(Context & context, const ASTs & args);

namespace Debug
{
void setServiceAddr(const std::string & addr);
}

} // namespace DB
10 changes: 10 additions & 0 deletions dbms/src/Debug/dbgFuncMockRaftCommand.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ struct MockRaftCommand
// Usage:
// ./storages-client.sh "DBGInvoke region_snapshot_apply_block(region_id)"
static void dbgFuncRegionSnapshotApplyBlock(Context & context, const ASTs & args, DBGInvoker::Printer output);

// Simulate a region pre-handle snapshot data to DTFiles
// Usage:
// ./storage-client.sh "DBGInvoke region_snapshot_pre_handle_file(database_name, table_name, region_id, start, end, schema_string, pk_name[, test-fields=1, cfs="write,default"])"
static void dbgFuncRegionSnapshotPreHandleDTFiles(Context & context, const ASTs & args, DBGInvoker::Printer output);

// Apply snapshot for a region. (apply a pre-handle snapshot)
// Usage:
// ./storages-client.sh "DBGInvoke region_snapshot_apply_file(region_id)"
static void dbgFuncRegionSnapshotApplyDTFiles(Context & context, const ASTs & args, DBGInvoker::Printer output);
};

} // namespace DB
127 changes: 123 additions & 4 deletions dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ namespace DB

namespace FailPoints
{
extern const char force_set_prehandle_dtfile_block_size[];
}
extern const char force_set_sst_to_dtfile_block_size[];
extern const char force_set_sst_decode_rand[];
} // namespace FailPoints

namespace ErrorCodes
{
Expand Down Expand Up @@ -143,7 +144,7 @@ void MockRaftCommand::dbgFuncRegionSnapshotWithData(Context & context, const AST

// Mock to apply a snapshot with data in `region`
auto & tmt = context.getTMTContext();
context.getTMTContext().getKVStore()->checkAndApplySnapshot(region, tmt);
context.getTMTContext().getKVStore()->checkAndApplySnapshot<RegionPtrWithBlock>(region, tmt);
std::stringstream ss;
ss << "put region #" << region_id << ", range" << range_string << " to table #" << table_id << " with " << cnt << " records";
output(ss.str());
Expand Down Expand Up @@ -402,6 +403,7 @@ void MockRaftCommand::dbgFuncIngestSST(Context & context, const ASTs & args, DBG
auto & kvstore = tmt.getKVStore();
auto region = kvstore->getRegion(region_id);

FailPointHelper::enableFailPoint(FailPoints::force_set_sst_decode_rand);
// Register some mock SST reading methods so that we can decode data in `MockSSTReader::MockSSTData`
RegionMockTest mock_test(kvstore, region);

Expand Down Expand Up @@ -517,12 +519,129 @@ void MockRaftCommand::dbgFuncRegionSnapshotApplyBlock(Context & context, const A
RegionID region_id = (RegionID)safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*args.front()).value);
auto [region, block_cache] = GLOBAL_REGION_MAP.popRegionCache("__snap_" + std::to_string(region_id));
auto & tmt = context.getTMTContext();
context.getTMTContext().getKVStore()->checkAndApplySnapshot({region, std::move(block_cache)}, tmt);
context.getTMTContext().getKVStore()->checkAndApplySnapshot<RegionPtrWithBlock>({region, std::move(block_cache)}, tmt);

std::stringstream ss;
ss << "success apply " << region->id() << " with block cache";
output(ss.str());
}


/// Mock to pre-decode snapshot to DTFile(s) then apply

// Simulate a region pre-handle snapshot data to DTFiles
// ./storage-client.sh "DBGInvoke region_snapshot_pre_handle_file(database_name, table_name, region_id, start, end, schema_string, pk_name[, test-fields=1, cfs="write,default"])"
void MockRaftCommand::dbgFuncRegionSnapshotPreHandleDTFiles(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
if (args.size() < 7 || args.size() > 9)
throw Exception("Args not matched, should be: database_name, table_name, region_id, start, end, schema_string, pk_name"
" [, test-fields, cfs=\"write,default\"]",
ErrorCodes::BAD_ARGUMENTS);

const String & database_name = typeid_cast<const ASTIdentifier &>(*args[0]).name;
const String & table_name = typeid_cast<const ASTIdentifier &>(*args[1]).name;
RegionID region_id = (RegionID)safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*args[2]).value);
RegionID start_handle = (RegionID)safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*args[3]).value);
RegionID end_handle = (RegionID)safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*args[4]).value);

const String schema_str = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[5]).value);
String handle_pk_name = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[6]).value);

UInt64 test_fields = 1;
if (args.size() > 7)
test_fields = (UInt64)safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*args[7]).value);
std::unordered_set<ColumnFamilyType> cfs;
{
String cfs_str = "write,default";
if (args.size() > 8)
cfs_str = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[8]).value);
if (cfs_str.find("write") != std::string::npos)
cfs.insert(ColumnFamilyType::Write);
if (cfs_str.find("default") != std::string::npos)
cfs.insert(ColumnFamilyType::Default);
}

// Parse a TableInfo from `schema_str` to generate data with this schema
TiDB::TableInfoPtr mocked_table_info;
{
ASTPtr columns_ast;
ParserColumnDeclarationList schema_parser;
Tokens tokens(schema_str.data(), schema_str.data() + schema_str.length());
TokenIterator pos(tokens);
Expected expected;
if (!schema_parser.parse(pos, columns_ast, expected))
throw Exception("Invalid TiDB table schema", ErrorCodes::LOGICAL_ERROR);
ColumnsDescription columns
= InterpreterCreateQuery::getColumnsDescription(typeid_cast<const ASTExpressionList &>(*columns_ast), context);
mocked_table_info = MockTiDB::parseColumns(table_name, columns, handle_pk_name, "dt");
}

MockTiDB::TablePtr table = MockTiDB::instance().getTableByName(database_name, table_name);
const auto & table_info = RegionBench::getTableInfo(context, database_name, table_name);
if (table_info.is_common_handle)
throw Exception("Mocking pre handle SST files to DTFiles to a common handle table is not supported", ErrorCodes::LOGICAL_ERROR);

// Mock SST data for handle [start, end)
const auto region_name = "__snap_snap_" + std::to_string(region_id);
GenMockSSTData(*mocked_table_info, table->id(), region_name, start_handle, end_handle, test_fields, cfs);

auto & tmt = context.getTMTContext();
auto & kvstore = tmt.getKVStore();
auto old_region = kvstore->getRegion(region_id);

// We may call this function mutiple time to mock some situation, try to reuse the region in `GLOBAL_REGION_MAP`
// so that we can collect uncommitted data.
UInt64 index = MockTiKV::instance().getRaftIndex(region_id) + 1;
RegionPtr new_region = RegionBench::createRegion(table->id(), region_id, start_handle, end_handle, index);

// Register some mock SST reading methods so that we can decode data in `MockSSTReader::MockSSTData`
RegionMockTest mock_test(kvstore, new_region);

std::vector<SSTView> sst_views;
{
if (cfs.count(ColumnFamilyType::Write) > 0)
sst_views.push_back(SSTView{
ColumnFamilyType::Write,
BaseBuffView{region_name.data(), region_name.length()},
});
if (cfs.count(ColumnFamilyType::Default) > 0)
sst_views.push_back(SSTView{
ColumnFamilyType::Default,
BaseBuffView{region_name.data(), region_name.length()},
});
}

// set block size so that we can test for schema-sync while decoding dt files
FailPointHelper::enableFailPoint(FailPoints::force_set_sst_to_dtfile_block_size);

auto ingest_ids = kvstore->preHandleSnapshotToFiles(
new_region, SSTViewVec{sst_views.data(), sst_views.size()}, index, MockTiKV::instance().getRaftTerm(region_id), tmt);
GLOBAL_REGION_MAP.insertRegionSnap(region_name, {new_region, ingest_ids});

{
std::stringstream ss;
ss << "Generate " << ingest_ids.size() << " files for [region_id=" << region_id << "]";
output(ss.str());
}
}

// Apply snapshot for a region. (apply a pre-handle snapshot)
// ./storages-client.sh "DBGInvoke region_snapshot_apply_file(region_id)"
void MockRaftCommand::dbgFuncRegionSnapshotApplyDTFiles(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
if (args.size() != 1)
throw Exception("Args not matched, should be: region-id", ErrorCodes::BAD_ARGUMENTS);

RegionID region_id = (RegionID)safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*args.front()).value);
const auto region_name = "__snap_snap_" + std::to_string(region_id);
auto [new_region, ingest_ids] = GLOBAL_REGION_MAP.popRegionSnap(region_name);
auto & tmt = context.getTMTContext();
context.getTMTContext().getKVStore()->checkAndApplySnapshot<RegionPtrWithSnapshotFiles>(
RegionPtrWithSnapshotFiles{new_region, std::move(ingest_ids)}, tmt);

std::stringstream ss;
ss << "success apply region " << new_region->id() << " with dt files";
output(ss.str());
}

} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgFuncMockTiDBTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ struct MockTiDBTable

// Inject mocked TiDB table.
// Usage:
// ./storages-client.sh "DBGInvoke mock_tidb_table(database_name, table_name, 'col1 type1, col2 type2, ...' [, handle_pk_name, engine-type(tmt|dt)])"

// ./storages-client.sh "DBGInvoke mock_tidb_table(database_name, table_name, 'col1 type1, col2 type2, ...' [, handle_pk_name, engine-type(tmt|dt)])"
// engine: [tmt, dt], tmt by default
static void dbgFuncMockTiDBTable(Context & context, const ASTs & args, DBGInvoker::Printer output);

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Debug/dbgFuncRegion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ void dbgFuncPutRegion(Context & context, const ASTs & args, DBGInvoker::Printer

TMTContext & tmt = context.getTMTContext();
RegionPtr region = RegionBench::createRegion(table_info, region_id, start_keys, end_keys);
tmt.getKVStore()->onSnapshot(region, nullptr, 0, tmt);
tmt.getKVStore()->onSnapshot<RegionPtrWithBlock>(region, nullptr, 0, tmt);

std::stringstream ss;
ss << "put region #" << region_id << ", range" << RecordKVFormat::DecodedTiKVKeyRangeToDebugString(region->getRange()->rawKeys())
Expand All @@ -74,7 +74,7 @@ void dbgFuncPutRegion(Context & context, const ASTs & args, DBGInvoker::Printer

TMTContext & tmt = context.getTMTContext();
RegionPtr region = RegionBench::createRegion(table_id, region_id, start, end);
tmt.getKVStore()->onSnapshot(region, nullptr, 0, tmt);
tmt.getKVStore()->onSnapshot<RegionPtrWithBlock>(region, nullptr, 0, tmt);

std::stringstream ss;
ss << "put region #" << region_id << ", range[" << start << ", " << end << ")"
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgTools.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ void concurrentBatchInsert(const TiDB::TableInfo & table_info, Int64 concurrent_

Regions regions = createRegions(table_info.id, concurrent_num, key_num_each_region, handle_begin, curr_max_region_id + 1);
for (const RegionPtr & region : regions)
tmt.getKVStore()->onSnapshot(region, nullptr, 0, tmt);
tmt.getKVStore()->onSnapshot<RegionPtrWithBlock>(region, nullptr, 0, tmt);

std::list<std::thread> threads;
for (Int64 i = 0; i < concurrent_num; i++, handle_begin += key_num_each_region)
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Functions/tests/gtest_arithmetic_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <gtest/gtest.h>

#pragma GCC diagnostic pop

Expand Down
1 change: 0 additions & 1 deletion dbms/src/Functions/tests/gtest_datetime_extract.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wsign-compare"
#include <Poco/Types.h>
#include <gtest/gtest.h>

#pragma GCC diagnostic pop

Expand Down
1 change: 0 additions & 1 deletion dbms/src/Functions/tests/gtest_strings_pad.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wsign-compare"
#include <Poco/Types.h>
#include <gtest/gtest.h>

#pragma GCC diagnostic pop

Expand Down
1 change: 0 additions & 1 deletion dbms/src/Functions/tests/gtest_strings_trim.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wsign-compare"
#include <Poco/Types.h>
#include <gtest/gtest.h>

#pragma GCC diagnostic pop

Expand Down
Loading