Skip to content

Commit

Permalink
support tiflash_fastscan variable from tidb, remove fast mode in tabl…
Browse files Browse the repository at this point in the history
…e_info, and change name fast mode to fast scan (#5589)

ref #5252
  • Loading branch information
hongyunyan authored Aug 30, 2022
1 parent 6056cc0 commit e5ff89c
Show file tree
Hide file tree
Showing 32 changed files with 267 additions and 352 deletions.
2 changes: 1 addition & 1 deletion contrib/tipb
2 changes: 0 additions & 2 deletions dbms/src/Debug/DBGInvoker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,6 @@ DBGInvoker::DBGInvoker()
regSchemafulFunc("query_mapped", dbgFuncQueryMapped);
regSchemalessFunc("get_tiflash_replica_count", dbgFuncGetTiflashReplicaCount);
regSchemalessFunc("get_partition_tables_tiflash_replica_count", dbgFuncGetPartitionTablesTiflashReplicaCount);
regSchemalessFunc("get_tiflash_mode", dbgFuncGetTiflashMode);
regSchemalessFunc("get_partition_tables_tiflash_mode", dbgFuncGetPartitionTablesTiflashMode);

regSchemalessFunc("search_log_for_key", dbgFuncSearchLogForKey);
regSchemalessFunc("tidb_dag", dbgFuncTiDBQueryFromNaturalDag);
Expand Down
53 changes: 0 additions & 53 deletions dbms/src/Debug/dbgFuncSchemaName.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,57 +180,4 @@ void dbgFuncGetPartitionTablesTiflashReplicaCount(Context & context, const ASTs

output(fmt_buf.toString());
}

void dbgFuncGetTiflashMode(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
if (args.empty() || args.size() != 2)
throw Exception("Args not matched, should be: database-name[, table-name]", ErrorCodes::BAD_ARGUMENTS);

const String & database_name = typeid_cast<const ASTIdentifier &>(*args[0]).name;
FmtBuffer fmt_buf;

const String & table_name = typeid_cast<const ASTIdentifier &>(*args[1]).name;
auto mapped = mappedTable(context, database_name, table_name);
auto storage = context.getTable(mapped->first, mapped->second);
auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
if (!managed_storage)
throw Exception(database_name + "." + table_name + " is not ManageableStorage", ErrorCodes::BAD_ARGUMENTS);

fmt_buf.append((TiFlashModeToString(managed_storage->getTableInfo().tiflash_mode)));

output(fmt_buf.toString());
}

void dbgFuncGetPartitionTablesTiflashMode(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
if (args.empty() || args.size() != 2)
throw Exception("Args not matched, should be: database-name[, table-name]", ErrorCodes::BAD_ARGUMENTS);

const String & database_name = typeid_cast<const ASTIdentifier &>(*args[0]).name;
FmtBuffer fmt_buf;

const String & table_name = typeid_cast<const ASTIdentifier &>(*args[1]).name;
auto mapped = mappedTable(context, database_name, table_name);
auto storage = context.getTable(mapped->first, mapped->second);
auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
if (!managed_storage)
throw Exception(database_name + "." + table_name + " is not ManageableStorage", ErrorCodes::BAD_ARGUMENTS);

auto table_info = managed_storage->getTableInfo();

if (!table_info.isLogicalPartitionTable())
throw Exception(database_name + "." + table_name + " is not logical partition table", ErrorCodes::BAD_ARGUMENTS);

SchemaNameMapper name_mapper;
for (const auto & part_def : table_info.partition.definitions)
{
auto paritition_table_info = table_info.producePartitionTableInfo(part_def.id, name_mapper);
auto partition_storage = context.getTMTContext().getStorages().get(paritition_table_info->id);
fmt_buf.append((TiFlashModeToString(partition_storage->getTableInfo().tiflash_mode)));
fmt_buf.append("/");
}

output(fmt_buf.toString());
}

} // namespace DB
10 changes: 0 additions & 10 deletions dbms/src/Debug/dbgFuncSchemaName.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,4 @@ void dbgFuncGetTiflashReplicaCount(Context & context, const ASTs & args, DBGInvo
// ./storage-client.sh "DBGInvoke get_partition_tables_tiflash_replica_count(db_name, table_name)"
void dbgFuncGetPartitionTablesTiflashReplicaCount(Context & context, const ASTs & args, DBGInvoker::Printer output);

// Get table's tiflash mode with mapped table name
// Usage:
// ./storage-client.sh "DBGInvoke get_tiflash_mode(db_name, table_name)"
void dbgFuncGetTiflashMode(Context & context, const ASTs & args, DBGInvoker::Printer output);

// Get the logical table's partition tables' tiflash replica counts with mapped table name
// Usage:
// ./storage-client.sh "DBGInvoke get_partition_tables_tiflash_mode(db_name, table_name)"
void dbgFuncGetPartitionTablesTiflashMode(Context & context, const ASTs & args, DBGInvoker::Printer output);

} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ std::unordered_map<TableID, SelectQueryInfo> DAGStorageInterpreter::generateSele
context.getTimezoneInfo());
query_info.req_id = fmt::format("{} Table<{}>", log->identifier(), table_id);
query_info.keep_order = table_scan.keepOrder();
query_info.is_fast_scan = table_scan.isFastScan();
return query_info;
};
if (table_scan.isPartitionTableScan())
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/TiDBTableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ TiDBTableScan::TiDBTableScan(
// Only No-partition table need keep order when tablescan executor required keep order.
// If keep_order is not set, keep order for safety.
, keep_order(!is_partition_table_scan && (table_scan->tbl_scan().keep_order() || !table_scan->tbl_scan().has_keep_order()))
, is_fast_scan(table_scan->tbl_scan().is_fast_scan())
{
if (is_partition_table_scan)
{
Expand Down Expand Up @@ -73,6 +74,7 @@ void TiDBTableScan::constructTableScanForRemoteRead(tipb::TableScan * tipb_table
tipb_table_scan->set_next_read_engine(tipb::EngineType::Local);
for (auto id : partition_table_scan.primary_prefix_column_ids())
tipb_table_scan->add_primary_prefix_column_ids(id);
tipb_table_scan->set_is_fast_scan(partition_table_scan.is_fast_scan());
}
else
{
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Flash/Coprocessor/TiDBTableScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ class TiDBTableScan
return keep_order;
}

bool isFastScan() const
{
return is_fast_scan;
}

private:
const tipb::Executor * table_scan;
String executor_id;
Expand All @@ -71,6 +76,7 @@ class TiDBTableScan
std::vector<Int64> physical_table_ids;
Int64 logical_table_id;
bool keep_order;
bool is_fast_scan;
};

} // namespace DB
10 changes: 5 additions & 5 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1242,7 +1242,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
const RSOperatorPtr & filter,
const String & tracing_id,
bool keep_order,
bool is_fast_mode,
bool is_fast_scan,
size_t expected_block_size,
const SegmentIdSet & read_segments,
size_t extra_table_id_index)
Expand Down Expand Up @@ -1277,8 +1277,8 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
filter,
max_version,
expected_block_size,
/* is_raw = */ is_fast_mode,
/* do_delete_mark_filter_for_raw = */ is_fast_mode,
/* is_raw = */ is_fast_scan,
/* do_delete_mark_filter_for_raw = */ is_fast_scan,
std::move(tasks),
after_segment_read);

Expand Down Expand Up @@ -1308,8 +1308,8 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
filter,
max_version,
expected_block_size,
/* is_raw_ */ is_fast_mode,
/* do_delete_mark_filter_for_raw_ */ is_fast_mode,
/* is_raw_= */ is_fast_scan,
/* do_delete_mark_filter_for_raw_= */ is_fast_scan,
extra_table_id_index,
physical_table_id,
req_info);
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,9 @@ class DeltaMergeStore : private boost::noncopyable


/// Read rows in two modes:
/// when is_fast_mode == false, we are in normal mode. Thus we will read rows with MVCC filtering, del mark !=0 filter and sorted merge
/// when is_fast_mode == true, we are in fast mode. Thus we will read rows without MVCC and sorted merge
/// `sorted_ranges` should be already sorted and merged
/// when is_fast_scan == false, we will read rows with MVCC filtering, del mark !=0 filter and sorted merge.
/// when is_fast_scan == true, we will read rows without MVCC and sorted merge.
/// `sorted_ranges` should be already sorted and merged.
BlockInputStreams read(const Context & db_context,
const DB::Settings & db_settings,
const ColumnDefines & columns_to_read,
Expand All @@ -371,7 +371,7 @@ class DeltaMergeStore : private boost::noncopyable
const RSOperatorPtr & filter,
const String & tracing_id,
bool keep_order,
bool is_fast_mode = false, // set true when read in fast mode
bool is_fast_scan = false,
size_t expected_block_size = DEFAULT_BLOCK_SIZE,
const SegmentIdSet & read_segments = {},
size_t extra_table_id_index = InvalidColumnID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(const DMFilePtr &
is_common_handle,
enable_handle_clean_read,
enable_del_clean_read,
is_fast_mode,
is_fast_scan,
max_data_version,
std::move(pack_filter),
mark_cache,
Expand Down
22 changes: 11 additions & 11 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,20 +83,19 @@ class DMFileBlockInputStreamBuilder
// **** filters **** //

// Only set enable_handle_clean_read_ param to true when
// in normal mode:
// 1. There is no delta.
// 2. You don't need pk, version and delete_tag columns
// in fast mode:
// 1. You don't need pk columns
// If you have no idea what it means, then simply set it to false.
// Only set is_fast_mode_ param to true when read in fast mode.
// Only set enable_del_clean_read_ param to true when you don't need del columns in fast mode.
// in normal mode (is_fast_scan_ == false):
// 1. There is no delta.
// 2. You don't need pk, version and delete_tag columns
// in fast scan mode (is_fast_scan_ == true):
// 1. You don't need pk columns
// If you have no idea what it means, then simply set it to false.
// Only set enable_del_clean_read_ param to true when you don't need del columns in fast scan.
// `max_data_version_` is the MVCC filter version for reading. Used by clean read check
DMFileBlockInputStreamBuilder & enableCleanRead(bool enable_handle_clean_read_, bool is_fast_mode_, bool enable_del_clean_read_, UInt64 max_data_version_)
DMFileBlockInputStreamBuilder & enableCleanRead(bool enable_handle_clean_read_, bool is_fast_scan_, bool enable_del_clean_read_, UInt64 max_data_version_)
{
enable_handle_clean_read = enable_handle_clean_read_;
enable_del_clean_read = enable_del_clean_read_;
is_fast_mode = is_fast_mode_;
is_fast_scan = is_fast_scan_;
max_data_version = max_data_version_;
return *this;
}
Expand Down Expand Up @@ -159,8 +158,9 @@ class DMFileBlockInputStreamBuilder
FileProviderPtr file_provider;

// clean read

bool enable_handle_clean_read = false;
bool is_fast_mode = false;
bool is_fast_scan = false;
bool enable_del_clean_read = false;
UInt64 max_data_version = std::numeric_limits<UInt64>::max();
// Rough set filter
Expand Down
11 changes: 6 additions & 5 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ DMFileReader::DMFileReader(
// clean read
bool enable_handle_clean_read_,
bool enable_del_clean_read_,
bool is_fast_mode_,
bool is_fast_scan_,
UInt64 max_read_version_,
// filters
DMFilePackFilter && pack_filter_,
Expand All @@ -233,7 +233,7 @@ DMFileReader::DMFileReader(
, single_file_mode(dmfile_->isSingleFileMode())
, enable_handle_clean_read(enable_handle_clean_read_)
, enable_del_clean_read(enable_del_clean_read_)
, is_fast_mode(is_fast_mode_)
, is_fast_scan(is_fast_scan_)
, max_read_version(max_read_version_)
, pack_filter(std::move(pack_filter_))
, skip_packs_by_column(read_columns.size(), 0)
Expand Down Expand Up @@ -364,10 +364,11 @@ Block DMFileReader::read()
}

// TODO: this will need better algorithm: we should separate those packs which can and can not do clean read.
bool do_clean_read_on_normal_mode = enable_handle_clean_read && expected_handle_res == All && not_clean_rows == 0 && (!is_fast_mode);
bool do_clean_read_on_normal_mode = enable_handle_clean_read && expected_handle_res == All && not_clean_rows == 0 && (!is_fast_scan);

bool do_clean_read_on_handle_on_fast_mode = enable_handle_clean_read && is_fast_scan && expected_handle_res == All;
bool do_clean_read_on_del_on_fast_mode = enable_del_clean_read && is_fast_scan && deleted_rows == 0;

bool do_clean_read_on_handle_on_fast_mode = enable_handle_clean_read && is_fast_mode && expected_handle_res == All;
bool do_clean_read_on_del_on_fast_mode = enable_del_clean_read && is_fast_mode && deleted_rows == 0;

if (do_clean_read_on_normal_mode)
{
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class DMFileReader
// If you have no idea what it means, then simply set it to false.
bool enable_handle_clean_read_,
bool enable_del_clean_read_,
bool is_fast_mode_,
bool is_fast_scan_,
// The the MVCC filter version. Used by clean read check.
UInt64 max_read_version_,
// filters
Expand Down Expand Up @@ -148,7 +148,8 @@ class DMFileReader
// if we don't need del column, we will try to do clean read on del_column(enable_del_clean_read is true).
const bool enable_handle_clean_read;
const bool enable_del_clean_read;
const bool is_fast_mode;
const bool is_fast_scan;

const UInt64 max_read_version;

/// Filters
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,8 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context,
/// But this way seems not to be robustness enough, maybe we need another flag?
auto new_columns_to_read = std::make_shared<ColumnDefines>();

// new_columns_to_read need at most columns_to_read.size() + 2, due to may extra insert into the handle column and del_mark column.
new_columns_to_read->reserve(columns_to_read.size() + 2);

new_columns_to_read->push_back(getExtraHandleColumnDefine(is_common_handle));
if (filter_delete_mark)
Expand Down Expand Up @@ -578,7 +580,7 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_context,
std::numeric_limits<UInt64>::max(),
expected_block_size,
/* enable_handle_clean_read */ enable_handle_clean_read,
/* is_fast_mode */ filter_delete_mark,
/* is_fast_scan */ filter_delete_mark,
/* enable_del_clean_read */ enable_del_clean_read);

BlockInputStreamPtr delta_stream = std::make_shared<DeltaValueInputStream>(dm_context, segment_snap->delta, new_columns_to_read, this->rowkey_range);
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/DeltaMerge/StableValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,17 +330,17 @@ StableValueSpace::Snapshot::getInputStream(
UInt64 max_data_version,
size_t expected_block_size,
bool enable_handle_clean_read,
bool is_fast_mode,
bool is_fast_scan,
bool enable_del_clean_read)
{
LOG_FMT_DEBUG(log, "max_data_version: {}, enable_handle_clean_read: {}, is_fast_mode: {}, enable_del_clean_read: {}", max_data_version, enable_handle_clean_read, is_fast_mode, enable_del_clean_read);
LOG_FMT_DEBUG(log, "max_data_version: {}, enable_handle_clean_read: {}, is_fast_mode: {}, enable_del_clean_read: {}", max_data_version, enable_handle_clean_read, is_fast_scan, enable_del_clean_read);
SkippableBlockInputStreams streams;

for (size_t i = 0; i < stable->files.size(); i++)
{
DMFileBlockInputStreamBuilder builder(context.db_context);
builder
.enableCleanRead(enable_handle_clean_read, is_fast_mode, enable_del_clean_read, max_data_version)
.enableCleanRead(enable_handle_clean_read, is_fast_scan, enable_del_clean_read, max_data_version)
.setRSOperator(filter)
.setColumnCache(column_caches[i])
.setTracingID(context.tracing_id)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/StableValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ class StableValueSpace : public std::enable_shared_from_this<StableValueSpace>
UInt64 max_data_version,
size_t expected_block_size,
bool enable_handle_clean_read,
bool is_fast_mode = false,
bool is_fast_scan = false,
bool enable_del_clean_read = false);

RowsAndBytes getApproxRowsAndBytes(const DMContext & context, const RowKeyRange & range) const;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ target_link_libraries(dm_test_storage_delta_merge
add_executable(dm_test_delta_merge_store EXCLUDE_FROM_ALL gtest_dm_delta_merge_store.cpp)
target_link_libraries(dm_test_delta_merge_store dbms gtest_main clickhouse_functions)

add_executable(dm_test_delta_merge_store_for_fast_mode EXCLUDE_FROM_ALL gtest_dm_delta_merge_store_for_fast_mode.cpp)
target_link_libraries(dm_test_delta_merge_store_for_fast_mode dbms gtest_main clickhouse_functions)
add_executable(dm_test_delta_merge_store_for_fast_scan EXCLUDE_FROM_ALL gtest_dm_delta_merge_store_for_fast_scan.cpp)
target_link_libraries(dm_test_delta_merge_store_for_fast_scan dbms gtest_main clickhouse_functions)

add_executable(dm_test_segment EXCLUDE_FROM_ALL gtest_dm_segment.cpp)
target_link_libraries(dm_test_segment dbms gtest_main clickhouse_functions)
Expand Down
Loading

0 comments on commit e5ff89c

Please sign in to comment.