Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/pingcap/tiflash into refi…
Browse files Browse the repository at this point in the history
…ne_finegrainedshuffle_ut
  • Loading branch information
ywqzzy committed Dec 28, 2022
2 parents 240cba9 + 156e216 commit fbb127c
Show file tree
Hide file tree
Showing 28 changed files with 1,488 additions and 650 deletions.
7 changes: 7 additions & 0 deletions dbms/src/Common/OptimizedRegularExpression.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <Columns/ColumnString.h>
#include <Common/config.h>
#include <common/StringRef.h>
#include <common/types.h>
Expand Down Expand Up @@ -117,6 +118,7 @@ class OptimizedRegularExpressionImpl

/// Regexp INSTR entry point. `pos` is a 1-based position counted in UTF-8 code points and is
/// range-checked against the subject; `ret_op` must be 0 or 1; an `occur` below 1 is treated as 1.
Int64 instr(const char * subject, size_t subject_size, Int64 pos, Int64 occur, Int64 ret_op);
/// Regexp SUBSTR entry point. `pos` is range-checked the same way as for instr and an `occur`
/// below 1 is treated as 1. The matched piece is returned wrapped in an optional.
std::optional<StringRef> substr(const char * subject, size_t subject_size, Int64 pos, Int64 occur);
/// Regexp REPLACE entry point. Writes the replaced string plus a terminating zero byte into
/// `res_data` starting at `res_offset`, and advances `res_offset` past the terminator.
/// `occur == 0` replaces every match, a positive `occur` replaces only that occurrence,
/// and a negative `occur` is treated as 1.
/// NOTE(review): the implementation grows `res_data` by the number of bytes it writes, so it
/// appears to assume `res_offset == res_data.size()` on entry — confirm with callers.
void replace(const char * subject, size_t subject_size, DB::ColumnString::Chars_t & res_data, DB::ColumnString::Offset & res_offset, const StringRef & repl, Int64 pos, Int64 occur);

private:
Int64 processInstrEmptyStringExpr(const char * expr, size_t expr_size, size_t byte_pos, Int64 occur);
Expand All @@ -125,6 +127,11 @@ class OptimizedRegularExpressionImpl
std::optional<StringRef> processSubstrEmptyStringExpr(const char * expr, size_t expr_size, size_t byte_pos, Int64 occur);
std::optional<StringRef> substrImpl(const char * subject, size_t subject_size, Int64 byte_pos, Int64 occur);

/// Handles replace() for a zero-length subject.
void processReplaceEmptyStringExpr(const char * subject, size_t subject_size, DB::ColumnString::Chars_t & res_data, DB::ColumnString::Offset & res_offset, const StringRef & repl, Int64 byte_pos, Int64 occur);
/// Dispatches to replaceAllImpl when `occur == 0`, otherwise to replaceOneImpl.
void replaceImpl(const char * subject, size_t subject_size, DB::ColumnString::Chars_t & res_data, DB::ColumnString::Offset & res_offset, const StringRef & repl, Int64 byte_pos, Int64 occur);
/// Replaces only the `occur`-th match found at or after the 1-based byte position `byte_pos`.
void replaceOneImpl(const char * subject, size_t subject_size, DB::ColumnString::Chars_t & res_data, DB::ColumnString::Offset & res_offset, const StringRef & repl, Int64 byte_pos, Int64 occur);
/// Replaces every match found at or after the 1-based byte position `byte_pos`.
void replaceAllImpl(const char * subject, size_t subject_size, DB::ColumnString::Chars_t & res_data, DB::ColumnString::Offset & res_offset, const StringRef & repl, Int64 byte_pos);

bool is_trivial;
bool required_substring_is_prefix;
bool is_case_insensitive;
Expand Down
171 changes: 162 additions & 9 deletions dbms/src/Common/OptimizedRegularExpression.inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <common/defines.h>
#include <common/types.h>

#include <cstring>
#include <iostream>
#include <optional>

Expand Down Expand Up @@ -499,22 +500,67 @@ std::optional<StringRef> OptimizedRegularExpressionImpl<thread_safe>::processSub
return std::optional<StringRef>(StringRef(matched_str.data(), matched_str.size()));
}

static inline void checkInstrArgs(Int64 utf8_total_len, size_t subject_size, Int64 pos, Int64 ret_op)
/// Produces the replace() result for a zero-length subject.
///
/// Writes either `repl` or nothing into `res_data` at `res_offset`, always followed by a
/// terminating zero byte, and advances `res_offset` past the terminator.
/// The caller passes its `pos` argument as `byte_pos` here.
template <bool thread_safe>
void OptimizedRegularExpressionImpl<thread_safe>::processReplaceEmptyStringExpr(const char * subject, size_t subject_size, DB::ColumnString::Chars_t & res_data, DB::ColumnString::Offset & res_offset, const StringRef & repl, Int64 byte_pos, Int64 occur)
{
// NOTE(review): the next two lines use `ret_op`, `pos` and `utf8_total_len`, none of which are
// parameters of this function; they look like removed-side lines of the diff (old checkInstrArgs
// body) rather than part of this definition — confirm against the real file.
RUNTIME_CHECK_MSG(!(ret_op != 0 && ret_op != 1), "Incorrect argument to regexp function: return_option must be 1 or 0");
RUNTIME_CHECK_MSG(!(pos <= 0 || (pos > utf8_total_len && subject_size != 0)), "Index out of bounds in regular function.");
// An empty subject can only be matched once and only at position 1; anything else yields
// an empty result (just the terminator).
if (occur > 1 || byte_pos != 1)
{
res_data.resize(res_data.size() + 1);
res_data[res_offset++] = '\0';
return;
}

StringPieceType expr_sp(subject, subject_size);
StringPieceType matched_str;
bool success = RegexType::FindAndConsume(&expr_sp, *re2, &matched_str);
if (!success)
{
// No match: reserve room for the terminator only.
res_data.resize(res_data.size() + 1);
}
else
{
// The pattern matches the empty subject: the whole result is the replacement text.
res_data.resize(res_data.size() + repl.size + 1);
memcpy(&res_data[res_offset], repl.data, repl.size);
res_offset += repl.size;
}

res_data[res_offset++] = '\0';
}

static inline void checkSubstrArgs(Int64 utf8_total_len, size_t subject_size, Int64 pos)
namespace FunctionsRegexp
{
/// Validates the `pos` argument of the regexp functions.
/// The check fails when `pos` is not positive, or when it is larger than `utf8_total_len`
/// while the subject is non-empty; for an empty subject only `pos > 0` is required.
inline void checkArgPos(Int64 utf8_total_len, size_t subject_size, Int64 pos)
{
RUNTIME_CHECK_MSG(!(pos <= 0 || (pos > utf8_total_len && subject_size != 0)), "Index out of bounds in regular function.");
}

static inline void makeOccurValid(Int64 & occur)
/// Validates the arguments of instr(): `ret_op` must be 0 or 1, and `pos` must pass checkArgPos.
inline void checkArgsInstr(Int64 utf8_total_len, size_t subject_size, Int64 pos, Int64 ret_op)
{
RUNTIME_CHECK_MSG(!(ret_op != 0 && ret_op != 1), "Incorrect argument to regexp function: return_option must be 1 or 0");
checkArgPos(utf8_total_len, subject_size, pos);
}

/// Validates the arguments of substr(); only `pos` needs checking.
inline void checkArgsSubstr(Int64 utf8_total_len, size_t subject_size, Int64 pos)
{
    return checkArgPos(utf8_total_len, subject_size, pos);
}

/// Validates the arguments of replace(); only `pos` needs checking.
inline void checkArgsReplace(Int64 utf8_total_len, size_t subject_size, Int64 pos)
{
    return checkArgPos(utf8_total_len, subject_size, pos);
}

/// Clamps `occur` in place so that it is never smaller than 1.
inline void makeOccurValid(Int64 & occur)
{
    if (occur < 1)
        occur = 1;
}

/// Normalizes `occur` in place for replace(): negative values become 1,
/// while 0 and positive values are left untouched.
inline void makeReplaceOccurValid(Int64 & occur)
{
    if (occur < 0)
        occur = 1;
}
} // namespace FunctionsRegexp

template <bool thread_safe>
Int64 OptimizedRegularExpressionImpl<thread_safe>::instrImpl(const char * subject, size_t subject_size, Int64 byte_pos, Int64 occur, Int64 ret_op)
{
Expand Down Expand Up @@ -557,13 +603,95 @@ std::optional<StringRef> OptimizedRegularExpressionImpl<thread_safe>::substrImpl
return std::optional<StringRef>(StringRef(matched_str.data(), matched_str.size()));
}

/// Replaces every match of the regex found at or after `byte_pos` with `repl`.
///
/// The result is written into `res_data` starting at `res_offset`, followed by a terminating
/// zero byte; `res_offset` is advanced past the terminator.
/// `byte_pos` is a 1-based byte position inside `subject`. Bytes before it are not searched,
/// but they are still copied to the output unchanged.
template <bool thread_safe>
void OptimizedRegularExpressionImpl<thread_safe>::replaceAllImpl(const char * subject, size_t subject_size, DB::ColumnString::Chars_t & res_data, DB::ColumnString::Offset & res_offset, const StringRef & repl, Int64 byte_pos)
{
    const size_t byte_offset = byte_pos - 1; // This is an offset in bytes, not in UTF-8 code points
    StringPieceType expr_sp(subject + byte_offset, subject_size - byte_offset);
    StringPieceType matched_str;
    size_t prior_offset = 0; // Offset of the first byte of `subject` that has not been emitted yet

    // NOTE(review): if the pattern can match the empty string, FindAndConsume presumably
    // succeeds without advancing `expr_sp`, which would make this loop spin forever — confirm
    // and decide how empty matches should be handled.
    while (RegexType::FindAndConsume(&expr_sp, *re2, &matched_str))
    {
        // A pointer difference is an integer, so it needs static_cast: reinterpret_cast between
        // integer types only compiles when source and target are the very same type.
        const auto skipped_byte_size = static_cast<size_t>(matched_str.data() - (subject + prior_offset));

        // Grow once for both the untouched bytes and the replacement.
        res_data.resize(res_data.size() + skipped_byte_size + repl.size);

        memcpy(&res_data[res_offset], subject + prior_offset, skipped_byte_size); // Copy the skipped bytes
        res_offset += skipped_byte_size;

        memcpy(&res_data[res_offset], repl.data, repl.size); // Replace the matched string
        res_offset += repl.size;

        // FindAndConsume moved `expr_sp` past the match; continue emitting from there.
        prior_offset = expr_sp.data() - subject;
    }

    const size_t suffix_byte_size = subject_size - prior_offset;
    res_data.resize(res_data.size() + suffix_byte_size + 1);
    memcpy(&res_data[res_offset], subject + prior_offset, suffix_byte_size); // Copy suffix string
    res_offset += suffix_byte_size;
    res_data[res_offset++] = 0;
}

/// Replaces only the `occur`-th match of the regex found at or after `byte_pos` with `repl`.
///
/// The result is written into `res_data` starting at `res_offset`, followed by a terminating
/// zero byte; `res_offset` is advanced past the terminator.
/// `byte_pos` is a 1-based byte position inside `subject`. If fewer than `occur` matches
/// exist, `subject` is copied to the output unchanged.
template <bool thread_safe>
void OptimizedRegularExpressionImpl<thread_safe>::replaceOneImpl(const char * subject, size_t subject_size, DB::ColumnString::Chars_t & res_data, DB::ColumnString::Offset & res_offset, const StringRef & repl, Int64 byte_pos, Int64 occur)
{
    const size_t byte_offset = byte_pos - 1; // This is an offset in bytes, not in UTF-8 code points
    StringPieceType expr_sp(subject + byte_offset, subject_size - byte_offset);
    StringPieceType matched_str;

    // Walk forward match by match until the requested occurrence is in `matched_str`.
    while (occur > 0)
    {
        bool success = RegexType::FindAndConsume(&expr_sp, *re2, &matched_str);
        if (!success)
        {
            // The requested occurrence does not exist: emit the subject as is.
            res_data.resize(res_data.size() + subject_size + 1);
            memcpy(&res_data[res_offset], subject, subject_size);
            res_offset += subject_size;
            res_data[res_offset++] = 0;
            return;
        }

        --occur;
    }

    // A pointer difference is an integer, so it needs static_cast: reinterpret_cast between
    // integer types only compiles when source and target are the very same type.
    const auto prefix_byte_size = static_cast<size_t>(matched_str.data() - subject);
    const size_t suffix_byte_size = subject_size - prefix_byte_size - matched_str.size();
    const char * suffix_str = subject + prefix_byte_size + matched_str.size();

    // Grow once for prefix + replacement + suffix + terminator.
    res_data.resize(res_data.size() + prefix_byte_size + repl.size + suffix_byte_size + 1);

    memcpy(&res_data[res_offset], subject, prefix_byte_size); // Copy prefix string
    res_offset += prefix_byte_size;

    memcpy(&res_data[res_offset], repl.data, repl.size); // Replace the matched string
    res_offset += repl.size;

    memcpy(&res_data[res_offset], suffix_str, suffix_byte_size); // Copy suffix string
    res_offset += suffix_byte_size;

    res_data[res_offset++] = 0;
}

/// Picks the replace strategy: an `occur` of 0 means "replace every match",
/// any other value means "replace just that occurrence".
template <bool thread_safe>
void OptimizedRegularExpressionImpl<thread_safe>::replaceImpl(const char * subject, size_t subject_size, DB::ColumnString::Chars_t & res_data, DB::ColumnString::Offset & res_offset, const StringRef & repl, Int64 byte_pos, Int64 occur)
{
    if (occur != 0)
    {
        replaceOneImpl(subject, subject_size, res_data, res_offset, repl, byte_pos, occur);
        return;
    }

    replaceAllImpl(subject, subject_size, res_data, res_offset, repl, byte_pos);
}

template <bool thread_safe>
Int64 OptimizedRegularExpressionImpl<thread_safe>::instr(const char * subject, size_t subject_size, Int64 pos, Int64 occur, Int64 ret_op)
{
Int64 utf8_total_len = DB::UTF8::countCodePoints(reinterpret_cast<const UInt8 *>(subject), subject_size);
;
checkInstrArgs(utf8_total_len, subject_size, pos, ret_op);
makeOccurValid(occur);
FunctionsRegexp::checkArgsInstr(utf8_total_len, subject_size, pos, ret_op);
FunctionsRegexp::makeOccurValid(occur);

if (unlikely(subject_size == 0))
return processInstrEmptyStringExpr(subject, subject_size, pos, occur);
Expand All @@ -576,8 +704,8 @@ template <bool thread_safe>
std::optional<StringRef> OptimizedRegularExpressionImpl<thread_safe>::substr(const char * subject, size_t subject_size, Int64 pos, Int64 occur)
{
Int64 utf8_total_len = DB::UTF8::countCodePoints(reinterpret_cast<const UInt8 *>(subject), subject_size);
checkSubstrArgs(utf8_total_len, subject_size, pos);
makeOccurValid(occur);
FunctionsRegexp::checkArgsSubstr(utf8_total_len, subject_size, pos);
FunctionsRegexp::makeOccurValid(occur);

if (unlikely(subject_size == 0))
return processSubstrEmptyStringExpr(subject, subject_size, pos, occur);
Expand All @@ -586,5 +714,30 @@ std::optional<StringRef> OptimizedRegularExpressionImpl<thread_safe>::substr(con
return substrImpl(subject, subject_size, byte_pos, occur);
}

/// Public entry point of regexp replace.
///
/// Checks `pos` (a 1-based position counted in UTF-8 code points) against the subject,
/// normalizes `occur` (negative becomes 1, 0 means "all matches"), converts `pos` into a
/// byte position and hands over to the matching implementation. An empty subject is
/// served by a dedicated helper.
template <bool thread_safe>
void OptimizedRegularExpressionImpl<thread_safe>::replace(
    const char * subject,
    size_t subject_size,
    DB::ColumnString::Chars_t & res_data,
    DB::ColumnString::Offset & res_offset,
    const StringRef & repl,
    Int64 pos,
    Int64 occur)
{
    const auto * subject_bytes = reinterpret_cast<const UInt8 *>(subject);

    Int64 code_point_count = DB::UTF8::countCodePoints(subject_bytes, subject_size);
    FunctionsRegexp::checkArgsReplace(code_point_count, subject_size, pos);
    FunctionsRegexp::makeReplaceOccurValid(occur);

    if (unlikely(subject_size == 0))
    {
        // Nothing to convert for an empty subject: `pos` is forwarded as is.
        processReplaceEmptyStringExpr(subject, subject_size, res_data, res_offset, repl, pos, occur);
    }
    else
    {
        size_t start_byte_pos = DB::UTF8::utf8Pos2bytePos(subject_bytes, pos);
        replaceImpl(subject, subject_size, res_data, res_offset, repl, start_byte_pos, occur);
    }
}

#undef MIN_LENGTH_FOR_STRSTR
#undef MAX_SUBPATTERNS
39 changes: 39 additions & 0 deletions dbms/src/Core/Block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,45 @@ static ReturnType checkBlockStructure(const Block & lhs, const Block & rhs, cons
return ReturnType(true);
}

/// Concatenates the rows of all `blocks` into a single block.
///
/// Returns an empty Block for an empty input and moves the only block out when there is
/// exactly one. Otherwise the columns of the first block are turned into mutable columns
/// and the rows of every following non-empty block are appended to them column by column.
/// NOTE(review): columns are paired by position and no structure check is done here, so all
/// blocks are presumably expected to share the first block's structure — confirm with callers.
Block mergeBlocks(Blocks && blocks)
{
    if (blocks.empty())
    {
        return {};
    }

    if (blocks.size() == 1)
    {
        return std::move(blocks[0]);
    }

    auto & first_block = blocks[0];
    size_t result_rows = 0;
    for (const auto & block : blocks)
    {
        result_rows += block.rows();
    }

    MutableColumns dst_columns(first_block.columns());

    for (size_t i = 0; i < first_block.columns(); ++i)
    {
        // Take over the first block's column as the destination and size it for the final row count.
        dst_columns[i] = (*std::move(first_block.getByPosition(i).column)).mutate();
        dst_columns[i]->reserve(result_rows);
    }

    for (size_t i = 1; i < blocks.size(); ++i)
    {
        // The branch hint has to wrap the whole comparison, not just the row count.
        if (likely(blocks[i].rows() > 0))
        {
            for (size_t column = 0; column < blocks[i].columns(); ++column)
            {
                dst_columns[column]->insertRangeFrom(*blocks[i].getByPosition(column).column, 0, blocks[i].rows());
            }
        }
    }
    return first_block.cloneWithColumns(std::move(dst_columns));
}

bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs)
{
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Core/Block.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ class Block
*/
void updateHash(SipHash & hash) const;


private:
void eraseImpl(size_t position);
void initializeIndexByName();
Expand All @@ -157,6 +158,7 @@ class Block
using Blocks = std::vector<Block>;
using BlocksList = std::list<Block>;

/// Merges `blocks` into one block by appending the rows of every block after the first to the
/// first block's columns. An empty input yields an empty Block; a single block is moved out as is.
Block mergeBlocks(Blocks && blocks);

/// Compare number of columns, data types, column types, column names, and values of constant columns.
bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs);
Expand Down
21 changes: 19 additions & 2 deletions dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

#include <DataStreams/HashJoinProbeBlockInputStream.h>
#include <Interpreters/ExpressionActions.h>

namespace DB
{
Expand All @@ -25,6 +24,7 @@ HashJoinProbeBlockInputStream::HashJoinProbeBlockInputStream(
: log(Logger::get(req_id))
, join(join_)
, probe_process_info(max_block_size)
, squashing_transform(max_block_size)
{
children.push_back(input);

Expand Down Expand Up @@ -66,18 +66,35 @@ Block HashJoinProbeBlockInputStream::getHeader() const
}

/// Returns the next squashed output block of the probe stage.
/// Join results are fed into the squashing transform until it stops asking for more,
/// then whatever it has assembled is handed out.
Block HashJoinProbeBlockInputStream::readImpl()
{
    // Nothing left to produce once the join has finished.
    if (squashing_transform.isJoinFinished())
        return Block{};

    while (true)
    {
        if (!squashing_transform.needAppendBlock())
            break;

        Block joined_block = getOutputBlock();
        squashing_transform.appendBlock(joined_block);
    }

    return squashing_transform.getFinalOutputBlock();
}

/// Produces one joined block.
/// When the current probe block has been fully joined, a new block is pulled from the child
/// stream first; an empty block from the child is passed straight through to the caller.
Block HashJoinProbeBlockInputStream::getOutputBlock()
{
    // The current probe block still has rows to join: keep working on it.
    if (!probe_process_info.all_rows_joined_finish)
        return join->joinBlock(probe_process_info);

    Block next_probe_block = children.back()->read();
    if (!next_probe_block)
        return next_probe_block;

    join->checkTypes(next_probe_block);
    probe_process_info.resetBlock(std::move(next_probe_block));

    return join->joinBlock(probe_process_info);
}


} // namespace DB
4 changes: 3 additions & 1 deletion dbms/src/DataStreams/HashJoinProbeBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
#pragma once

#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/SquashingHashJoinBlockTransform.h>
#include <Interpreters/Join.h>

namespace DB
{


/** Executes a certain expression over the block.
* Basically the same as ExpressionBlockInputStream,
* but requires that there must be a join probe action in the Expression.
Expand All @@ -47,11 +47,13 @@ class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream

protected:
Block readImpl() override;
Block getOutputBlock();

private:
const LoggerPtr log;
JoinPtr join;
ProbeProcessInfo probe_process_info;
SquashingHashJoinBlockTransform squashing_transform;
};

} // namespace DB
Loading

0 comments on commit fbb127c

Please sign in to comment.