Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLASH-175] eliminate assumption of existence of tidb_rowid #35

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 94 additions & 48 deletions dbms/src/DataStreams/RangesFilterBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,72 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}

/// Trims `block` down to the rows whose handle value falls inside `ranges`,
/// which this function treats as the half-open interval
/// [ranges.first, ranges.second).
///
/// @param block   Block to filter; its columns are modified in place when only
///                part of the rows survive.
/// @param column  The block's handle column, already downcast by the caller to
///                its concrete integer column type. May be null if that
///                downcast failed.
/// @param (unnamed ELEM_T)  Carries only the element type for template
///                deduction; its value is never read.
/// @return `block` untouched when every row is in range or the block has no
///         rows, an empty clone when no row is in range, otherwise `block`
///         with the out-of-range rows removed from every column.
/// @throws Exception (LOGICAL_ERROR) when `column` is null.
///
/// NOTE(review): the binary searches below presume the handle column is sorted
/// ascending — confirm against the upstream stream's ordering guarantee.
template<typename T, typename ELEM_T>
Block RangesFilterBlockInputStream::readProcess(Block & block, T column, ELEM_T)
{
    // The caller selects the column class from the type's family name; if the
    // actual column object is of another class the downcast yields null.
    // Fail loudly instead of dereferencing it.
    if (!column)
        throw Exception("RangesFilterBlockInputStream: handle column does not match its declared integer type.",
            ErrorCodes::LOGICAL_ERROR);

    const size_t rows = block.rows();
    // Nothing to filter, and `rows - 1` below would wrap around.
    if (rows == 0)
        return block;

    const auto & data = column->getData();

    const auto handle_begin = column->getElement(0);
    const auto handle_end = column->getElement(rows - 1);

    constexpr HandleID min = std::numeric_limits<HandleID>::min();
    constexpr HandleID max = std::numeric_limits<HandleID>::max();

    // Remap the HandleID extreme values to the extremes of the column's
    // element type so they stay meaningful for narrower or unsigned handle
    // columns.
    HandleID range_start = ranges.first;
    HandleID range_end = ranges.second;
    if (range_start == min)
        range_start = std::numeric_limits<ELEM_T>::min();
    if (range_end == max)
        range_end = std::numeric_limits<ELEM_T>::max();

    // NOTE(review): only the two extreme values are remapped; any other bound
    // that does not fit ELEM_T is truncated by the casts below — confirm that
    // callers never pass such ranges for narrow handle types.
    const ELEM_T start_bound = static_cast<ELEM_T>(range_start);
    const ELEM_T end_bound = static_cast<ELEM_T>(range_end);

    // The block lies entirely outside the range.
    if (handle_begin >= end_bound || start_bound > handle_end)
        return block.cloneEmpty();

    if (handle_begin >= start_bound)
    {
        // The block lies entirely inside the range.
        if (handle_end < end_bound)
            return block;

        // Only the tail is out of range: drop it from every column in place.
        const size_t pos = std::lower_bound(data.cbegin(), data.cend(), range_end) - data.cbegin();
        const size_t pop_num = rows - pos;
        for (size_t i = 0; i < block.columns(); i++)
        {
            ColumnWithTypeAndName & ori_column = block.getByPosition(i);
            MutableColumnPtr mutable_holder = (*std::move(ori_column.column)).mutate();
            mutable_holder->popBack(pop_num);
            ori_column.column = std::move(mutable_holder);
        }
    }
    else
    {
        // The head (and possibly the tail) is out of range: copy the
        // surviving slice of every column into a fresh column.
        const size_t pos_begin = std::lower_bound(data.cbegin(), data.cend(), range_start) - data.cbegin();
        size_t pos_end = rows;
        if (handle_end >= end_bound)
            pos_end = std::lower_bound(data.cbegin(), data.cend(), range_end) - data.cbegin();

        // `<=` rather than `==` so inverted bounds cannot make the unsigned
        // subtraction wrap around.
        if (pos_end <= pos_begin)
            return block.cloneEmpty();

        const size_t len = pos_end - pos_begin;
        for (size_t i = 0; i < block.columns(); i++)
        {
            ColumnWithTypeAndName & ori_column = block.getByPosition(i);
            auto new_column = ori_column.column->cloneEmpty();
            new_column->insertRangeFrom(*ori_column.column, pos_begin, len);
            ori_column.column = std::move(new_column);
        }
    }

    return block;
}

Block RangesFilterBlockInputStream::readImpl()
{
while (true)
Expand All @@ -22,60 +88,40 @@ Block RangesFilterBlockInputStream::readImpl()
throw Exception("RangesFilterBlockInputStream: block without _tidb_rowid.", ErrorCodes::LOGICAL_ERROR);

const ColumnWithTypeAndName & handle_column = block.getByName(handle_col_name);
const ColumnInt64 * column = typeid_cast<const ColumnInt64 *>(handle_column.column.get());
if (!column)
std::string handle_column_type_name(handle_column.type->getFamilyName());
if (handle_column_type_name == "Int64")
{
throw Exception("RangesFilterBlockInputStream: _tidb_rowid column should be type ColumnInt64.", ErrorCodes::LOGICAL_ERROR);
}

size_t rows = block.rows();

auto handle_begin = column->getElement(0);
auto handle_end = column->getElement(rows - 1);

if (handle_begin >= ranges.second || ranges.first > handle_end)
continue;

if (handle_begin >= ranges.first)
block = readProcess(block, typeid_cast<const ColumnInt64 *>(handle_column.column.get()), (ColumnInt64::value_type)0);
} else if (handle_column_type_name == "Int32")
{
if (handle_end < ranges.second)
{
return block;
}
else
{
size_t pos
= std::lower_bound(column->getData().cbegin(), column->getData().cend(), ranges.second) - column->getData().cbegin();
size_t pop_num = rows - pos;
for (size_t i = 0; i < block.columns(); i++)
{
ColumnWithTypeAndName & ori_column = block.getByPosition(i);
MutableColumnPtr mutable_holder = (*std::move(ori_column.column)).mutate();
mutable_holder->popBack(pop_num);
ori_column.column = std::move(mutable_holder);
}
}
block = readProcess(block, typeid_cast<const ColumnInt32 *>(handle_column.column.get()), (ColumnInt32::value_type)0);
} else if (handle_column_type_name == "Int16")
{
block = readProcess(block, typeid_cast<const ColumnInt16 *>(handle_column.column.get()), (ColumnInt16::value_type)0);
} else if (handle_column_type_name == "Int8")
{
block = readProcess(block, typeid_cast<const ColumnInt8 *>(handle_column.column.get()), (ColumnInt8::value_type)0);
} else if (handle_column_type_name == "UInt64")
{
block = readProcess(block, typeid_cast<const ColumnUInt64 *>(handle_column.column.get()), (ColumnUInt64::value_type)0);
} else if (handle_column_type_name == "UInt32")
{
block = readProcess(block, typeid_cast<const ColumnUInt32 *>(handle_column.column.get()), (ColumnUInt32::value_type)0);
} else if (handle_column_type_name == "UInt16")
{
block = readProcess(block, typeid_cast<const ColumnUInt16 *>(handle_column.column.get()), (ColumnUInt16::value_type)0);
} else if (handle_column_type_name == "UInt8")
{
block = readProcess(block, typeid_cast<const ColumnUInt8 *>(handle_column.column.get()), (ColumnUInt8::value_type)0);
}
else
{
size_t pos_begin
= std::lower_bound(column->getData().cbegin(), column->getData().cend(), ranges.first) - column->getData().cbegin();
size_t pos_end = rows;
if (handle_end >= ranges.second)
pos_end = std::lower_bound(column->getData().cbegin(), column->getData().cend(), ranges.second) - column->getData().cbegin();

size_t len = pos_end - pos_begin;
if (!len)
continue;
for (size_t i = 0; i < block.columns(); i++)
{
ColumnWithTypeAndName & ori_column = block.getByPosition(i);
auto new_column = ori_column.column->cloneEmpty();
new_column->insertRangeFrom(*ori_column.column, pos_begin, len);
ori_column.column = std::move(new_column);
}
throw Exception("RangesFilterBlockInputStream: handle column should be type ColumnInt64, ColumnInt32, ColumnInt16 or ColumnInt8.");
}
if (block.rows() == 0)
{
continue;
}

return block;
}
}
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/DataStreams/RangesFilterBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ class RangesFilterBlockInputStream : public IProfilingBlockInputStream
const HandleRange ranges;
const String handle_col_name;
Logger * log = &Logger::get("RangesFilterBlockInputStream");

template<typename T, typename ELEM_T>
Block readProcess(Block & block, T column, ELEM_T elem);
};

} // namespace DB
26 changes: 24 additions & 2 deletions dbms/src/Debug/MockTiDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void MockTiDB::dropTable(const String & database_name, const String & table_name
tables_by_name.erase(it_by_name);
}

TableID MockTiDB::newTable(const String & database_name, const String & table_name, const ColumnsDescription & columns)
TableID MockTiDB::newTable(const String & database_name, const String & table_name, const ColumnsDescription & columns, const String & primary_key)
{
std::lock_guard lock(tables_mutex);

Expand All @@ -78,6 +78,7 @@ TableID MockTiDB::newTable(const String & database_name, const String & table_na
}

TableInfo table_info;
bool pk_is_handle = false;

if (databases.find(database_name) != databases.end())
{
Expand All @@ -95,6 +96,26 @@ TableID MockTiDB::newTable(const String & database_name, const String & table_na
column_info.id = (i++);
column_info.name = column.name;
const IDataType *nested_type = column.type.get();
if (column.name == primary_key)
{
if (pk_is_handle)
{
throw Exception("primary key can only be one column");
}
column_info.setPriKeyFlag();
std::string handle_column_type_name = std::string(column.type->getFamilyName());
if ((handle_column_type_name == "Int64")
|| (handle_column_type_name == "Int32")
|| (handle_column_type_name == "Int16")
|| (handle_column_type_name == "Int8")
|| (handle_column_type_name == "UInt64")
|| (handle_column_type_name == "UInt32")
|| (handle_column_type_name == "UInt16")
|| (handle_column_type_name == "UInt8"))
{
pk_is_handle = true;
}
}
if (!column.type->isNullable())
{
column_info.setNotNullFlag();
Expand Down Expand Up @@ -127,7 +148,8 @@ TableID MockTiDB::newTable(const String & database_name, const String & table_na
table_info.columns.emplace_back(column_info);
}

table_info.pk_is_handle = false;
table_info.pk_is_handle = pk_is_handle;

table_info.comment = "Mocked.";

auto table = std::make_shared<Table>(database_name, table_name, std::move(table_info));
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Debug/MockTiDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class MockTiDB : public ext::singleton<MockTiDB>
public:
String getSchemaJson(TableID table_id);

TableID newTable(const String & database_name, const String & table_name, const ColumnsDescription & columns);
TableID newTable(const String & database_name, const String & table_name, const ColumnsDescription & columns, const String & primary_key="");

void dropTable(const String & database_name, const String & table_name);

Expand Down
14 changes: 10 additions & 4 deletions dbms/src/Debug/dbgFuncMockTiDBData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,22 @@ void dbgFuncSetFlushThreshold(Context & context, const ASTs & args, DBGInvoker::

void dbgInsertRow(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
if (args.size() < 4)
throw Exception("Args not matched, should be: database-name, table-name, region-id, handle-id, values", ErrorCodes::BAD_ARGUMENTS);
if (args.size() < 3)
throw Exception("Args not matched, should be: database-name, table-name, region-id[, handle-id], values", ErrorCodes::BAD_ARGUMENTS);

const String & database_name = typeid_cast<const ASTIdentifier &>(*args[0]).name;
const String & table_name = typeid_cast<const ASTIdentifier &>(*args[1]).name;
RegionID region_id = (RegionID)safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*args[2]).value);
HandleID handle_id = (HandleID)safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*args[3]).value);
HandleID handle_id = 0;

MockTiDB::TablePtr table = MockTiDB::instance().getTableByName(database_name, table_name);
RegionBench::insert(table->table_info, region_id, handle_id, args.begin() + 4, args.end(), context);
int value_offset = 3;
if (!table->table_info.pk_is_handle)
{
value_offset = 4;
handle_id = (HandleID)safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*args[3]).value);
}
RegionBench::insert(table->table_info, region_id, handle_id, args.begin() + value_offset, args.end(), context);

std::stringstream ss;
ss << "wrote one row to " << database_name << "." + table_name << " region #" << region_id;
Expand Down
12 changes: 9 additions & 3 deletions dbms/src/Debug/dbgFuncMockTiDBTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,20 @@ void dbgFuncMockSchemaSyncer(Context & context, const ASTs & args, DBGInvoker::P

void dbgFuncMockTiDBTable(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
if (args.size() != 3)
throw Exception("Args not matched, should be: database-name, table-name, schema-string", ErrorCodes::BAD_ARGUMENTS);
if (args.size() < 3)
throw Exception("Args not matched, should be: database-name, table-name, schema-string[, primary-key]", ErrorCodes::BAD_ARGUMENTS);

const String & database_name = typeid_cast<const ASTIdentifier &>(*args[0]).name;
const String & table_name = typeid_cast<const ASTIdentifier &>(*args[1]).name;

auto schema_str = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[2]).value);
String primary_key = "";

ASTPtr columns_ast;
if (args.size() == 4) {
primary_key = typeid_cast<const ASTIdentifier &>(*args[3]).name;
}

ParserColumnDeclarationList schema_parser;
Tokens tokens(schema_str.data(), schema_str.data() + schema_str.length());
TokenIterator pos(tokens);
Expand All @@ -63,7 +69,7 @@ void dbgFuncMockTiDBTable(Context & context, const ASTs & args, DBGInvoker::Prin
throw Exception("Invalid TiDB table schema", ErrorCodes::LOGICAL_ERROR);
ColumnsDescription columns = InterpreterCreateQuery::getColumnsDescription(typeid_cast<const ASTExpressionList &>(*columns_ast), context);

TableID table_id = MockTiDB::instance().newTable(database_name, table_name, columns);
TableID table_id = MockTiDB::instance().newTable(database_name, table_name, columns, primary_key);

std::stringstream ss;
ss << "mock table #" << table_id;
Expand Down
18 changes: 16 additions & 2 deletions dbms/src/Debug/dbgTools.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,24 @@ void insert(const TiDB::TableInfo & table_info, RegionID region_id, HandleID han
{
std::vector<Field> fields;
ASTs::const_iterator it;
int i = 0;
while ((it = begin++) != end)
{
auto field = typeid_cast<const ASTLiteral *>((*it).get())->value;
fields.emplace_back(field);
if (table_info.pk_is_handle && table_info.columns[i].hasPriKeyFlag())
{
handle_id = getFieldValue<Int64>(field);
} else
{
fields.emplace_back(field);
}
i++;
}
if (fields.size() != table_info.columns.size())
if (!((table_info.pk_is_handle && (fields.size() == table_info.columns.size() - 1))
|| (fields.size() == table_info.columns.size())))
{
throw Exception("Number of insert values and columns do not match.", ErrorCodes::LOGICAL_ERROR);
}

TiKVKey key = RecordKVFormat::genKey(table_info.id, handle_id);
TiKVValue value = RecordKVFormat::EncodeRow(table_info, fields);
Expand Down Expand Up @@ -234,6 +245,9 @@ struct BatchCtrl
for (size_t i = 0; i < table_info.columns.size(); i++)
{
const TiDB::ColumnInfo & column = table_info.columns[i];
if (table_info.pk_is_handle && column.hasPriKeyFlag()) {
continue;
}
EncodeDatum(ss, TiDB::CodecFlagInt, column.id);
EncodeDatum(ss, column.getCodecFlag(), magic_num);
}
Expand Down
26 changes: 25 additions & 1 deletion dbms/src/Storages/Transaction/RegionBlockReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ Block RegionBlockRead(const TiDB::TableInfo & table_info, const ColumnsDescripti
row.reserve(table_info.columns.size() * 2);
for (const TiDB::ColumnInfo & column : table_info.columns)
{
if (table_info.pk_is_handle && column.hasPriKeyFlag())
{
continue;
}
row.push_back(Field(column.id));
row.push_back(MockDecodeRow(column.getCodecFlag()));
}
Expand Down Expand Up @@ -150,11 +154,31 @@ Block RegionBlockRead(const TiDB::TableInfo & table_info, const ColumnsDescripti
Field date_field(static_cast<Int64>(date));
it->second.first->insert(date_field);
}
} else if (col_id.get<Int64>() == handle_id) {
throw Exception("handle column is not expected to be encoded in value");
} else {
it->second.first->insert(row[i + 1]);
}
}
column_map[handle_id].first->insert(Field(handle));
// TODO: use InvalidHandleID
std::string handle_column_type_name(column_map[handle_id].second.type->getFamilyName());
if (handle_column_type_name == "Int64"
|| handle_column_type_name == "Int32"
|| handle_column_type_name == "Int16"
|| handle_column_type_name == "Int8") {
column_map[handle_id].first->insert(Field((Int64)handle));
} else if (handle_column_type_name == "UInt64"
|| handle_column_type_name == "UInt32"
|| handle_column_type_name == "UInt16"
|| handle_column_type_name == "UInt8")
{
column_map[handle_id].first->insert(Field(handle));
} else
{
throw Exception("handle column type must be one of int type");
}


}

next_table_id = scanner->hasNext();
Expand Down
Loading