Skip to content

Commit

Permalink
Merge branch 'clean_functionBinaryArithmetic' of https://github.com/ywqzzy/tics into clean_functionBinaryArithmetic
Browse files Browse the repository at this point in the history
  • Loading branch information
ywqzzy committed Jan 13, 2022
2 parents 730f9e8 + 6492da6 commit 2ec0991
Show file tree
Hide file tree
Showing 42 changed files with 599 additions and 557 deletions.
7 changes: 7 additions & 0 deletions cmake/find_xxhash.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,12 @@ if(NOT BUILD_SHARED_LIBS)
endif()
add_subdirectory(
${ClickHouse_SOURCE_DIR}/contrib/xxHash/cmake_unofficial EXCLUDE_FROM_ALL)
# On x86-64, wrap xxHash in its runtime-dispatch translation unit so the
# fastest available SIMD implementation is picked at run time; elsewhere the
# plain library is used. Consumers link ${TIFLASH_XXHASH_LIBRARY} either way.
if(ARCH_AMD64)
# xxh_x86dispatch.c supplies the XXH3_*_dispatch entry points and forwards
# to the underlying xxHash implementation.
add_library(xxhash_dispatch STATIC ${ClickHouse_SOURCE_DIR}/contrib/xxHash/xxh_x86dispatch.c)
target_link_libraries(xxhash_dispatch PUBLIC xxHash::xxhash)
set(TIFLASH_XXHASH_LIBRARY xxhash_dispatch)
else()
# Non-x86 targets have no dispatcher; link the plain xxHash target directly.
set(TIFLASH_XXHASH_LIBRARY xxHash::xxhash)
endif()
set(XXHASH_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/xxHash)

2 changes: 1 addition & 1 deletion dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ target_include_directories (clickhouse_common_io BEFORE PUBLIC ${DOUBLE_CONVERSI
# also for copy_headers.sh:
target_include_directories (clickhouse_common_io BEFORE PRIVATE ${COMMON_INCLUDE_DIR})
# https://cmake.org/pipermail/cmake/2016-May/063400.html
target_link_libraries (clickhouse_common_io PUBLIC xxHash::xxhash)
target_link_libraries (clickhouse_common_io PUBLIC ${TIFLASH_XXHASH_LIBRARY})

if (ENABLE_TESTS)
include (${ClickHouse_SOURCE_DIR}/cmake/find_gtest.cmake)
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/Common/Checksum.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
#include <Poco/Base64Decoder.h>
#include <Poco/Base64Encoder.h>
#include <common/crc64.h>
#ifdef __x86_64__
#include <xxh_x86dispatch.h>
#else
#include <xxh3.h>
#endif
#include <zlib.h>

#include <cstddef>
Expand Down Expand Up @@ -100,7 +104,11 @@ class XXH3
/// Fold `length` bytes at `src` into the running checksum and account the
/// digested bytes in profiling counters.
/// NOTE: this is not the XXH3 streaming API — each call performs a one-shot
/// XXH3-64 hash of the chunk seeded with the previous `state`, i.e. a
/// chained construction over successive chunks.
void update(const void * src, size_t length)
{
ProfileEvents::increment(ProfileEvents::ChecksumDigestBytes, length);
#ifdef __x86_64__ // dispatched version can utilize hardware resource
state = XXH3_64bits_withSeed_dispatch(src, length, state);
#else // use inlined version
state = XXH_INLINE_XXH3_64bits_withSeed(src, length, state);
#endif
}
[[nodiscard]] HashType checksum() const { return state; }

Expand Down
13 changes: 12 additions & 1 deletion dbms/src/Common/ThreadManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,23 @@ class RawThreadManager : public ThreadManager
}

// Block until every spawned worker thread has finished, then drop the
// (now joined) thread handles so a later destructor run is a no-op.
void wait() override
{
waitAndClear();
}

// Safety net: join any workers still running so no std::thread outlives
// the manager. NOTE(review): std::thread::join can throw std::system_error
// from a destructor (which would terminate); presumably workers are always
// joinable here — confirm against callers.
~RawThreadManager()
{
waitAndClear();
}

protected:
void waitAndClear()
{
for (auto & worker : workers)
worker.join();
workers.clear();
}

protected:
std::vector<std::thread> workers;
};

Expand Down
23 changes: 13 additions & 10 deletions dbms/src/DataStreams/InputStreamFromASTInsertQuery.cpp
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
#include <Parsers/ASTInsertQuery.h>
#include <Interpreters/Context.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/ReadBufferFromMemory.h>
#include <DataStreams/BlockIO.h>
#include <DataStreams/InputStreamFromASTInsertQuery.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/ReadBufferFromMemory.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTInsertQuery.h>


namespace DB
{

namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int LOGICAL_ERROR;
}


InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
const ASTPtr & ast, ReadBuffer & input_buffer_tail_part, const BlockIO & streams, Context & context)
const ASTPtr & ast,
ReadBuffer & input_buffer_tail_part,
const BlockIO & streams,
Context & context)
{
const ASTInsertQuery * ast_insert_query = dynamic_cast<const ASTInsertQuery *>(ast.get());

Expand All @@ -30,7 +32,8 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
/// Data could be in parsed (ast_insert_query.data) and in not parsed yet (input_buffer_tail_part) part of query.

input_buffer_ast_part = std::make_unique<ReadBufferFromMemory>(
ast_insert_query->data, ast_insert_query->data ? ast_insert_query->end - ast_insert_query->data : 0);
ast_insert_query->data,
ast_insert_query->data ? ast_insert_query->end - ast_insert_query->data : 0);

ConcatReadBuffer::ReadBuffers buffers;
if (ast_insert_query->data)
Expand All @@ -43,7 +46,7 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(

input_buffer_contacenated = std::make_unique<ConcatReadBuffer>(buffers);

res_stream = context.getInputFormat(format, *input_buffer_contacenated, streams.out->getHeader(), context.getSettings().max_insert_block_size);
res_stream = context.getInputFormat(format, *input_buffer_contacenated, streams.out->getHeader(), context.getSettingsRef().max_insert_block_size);
}

}
} // namespace DB
30 changes: 13 additions & 17 deletions dbms/src/DataStreams/tests/union_stream2.cpp
Original file line number Diff line number Diff line change
@@ -1,21 +1,17 @@
#include <iostream>
#include <iomanip>

#include <IO/WriteBufferFromFileDescriptor.h>

#include <Storages/System/StorageSystemNumbers.h>
#include <Storages/RegionQueryInfo.h>

#include <DataStreams/LimitBlockInputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <DataStreams/AsynchronousBlockInputStream.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/LimitBlockInputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <DataStreams/copyData.h>

#include <DataTypes/DataTypesNumber.h>

#include <IO/WriteBufferFromFileDescriptor.h>
#include <Interpreters/Context.h>
#include <Interpreters/loadMetadata.h>
#include <Storages/RegionQueryInfo.h>
#include <Storages/System/StorageSystemNumbers.h>

#include <iomanip>
#include <iostream>


using namespace DB;
Expand All @@ -41,8 +37,8 @@ try
for (size_t i = 0, size = streams.size(); i < size; ++i)
streams[i] = std::make_shared<AsynchronousBlockInputStream>(streams[i]);

BlockInputStreamPtr stream = std::make_shared<UnionBlockInputStream<>>(streams, nullptr, settings.max_threads);
stream = std::make_shared<LimitBlockInputStream>(stream, 10, 0);
BlockInputStreamPtr stream = std::make_shared<UnionBlockInputStream<>>(streams, nullptr, settings.max_threads, nullptr);
stream = std::make_shared<LimitBlockInputStream>(stream, 10, 0, nullptr);

WriteBufferFromFileDescriptor wb(STDERR_FILENO);
Block sample = table->getSampleBlock();
Expand All @@ -55,8 +51,8 @@ try
catch (const Exception & e)
{
std::cerr << e.what() << ", " << e.displayText() << std::endl
<< std::endl
<< "Stack trace:" << std::endl
<< e.getStackTrace().toString();
<< std::endl
<< "Stack trace:" << std::endl
<< e.getStackTrace().toString();
return 1;
}
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgFuncCoprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ BlockInputStreamPtr executeQuery(Context & context, RegionID region_id, const DA
= std::make_shared<ExchangeReceiver>(
std::make_shared<GRPCReceiverContext>(context.getTMTContext().getKVCluster(),
context.getTMTContext().getMPPTaskManager(),
context.getSettings().enable_local_tunnel),
context.getSettingsRef().enable_local_tunnel),
tipb_exchange_receiver,
root_tm,
10,
Expand Down
7 changes: 3 additions & 4 deletions dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#include <Flash/Coprocessor/DAGBlockOutputStream.h>
#include <Flash/Coprocessor/DAGDriver.h>
#include <Flash/Coprocessor/DAGQuerySource.h>
#include <Flash/Coprocessor/DAGStringConverter.h>
#include <Flash/Coprocessor/StreamWriter.h>
#include <Flash/Coprocessor/StreamingDAGResponseWriter.h>
#include <Flash/Coprocessor/UnaryDAGResponseWriter.h>
Expand Down Expand Up @@ -93,7 +92,7 @@ try
{
std::unique_ptr<DAGResponseWriter> response_writer = std::make_unique<UnaryDAGResponseWriter>(
dag_response,
context.getSettings().dag_records_per_chunk,
context.getSettingsRef().dag_records_per_chunk,
dag_context);
dag_output_stream = std::make_shared<DAGBlockOutputStream>(streams.in->getHeader(), std::move(response_writer));
copyData(*streams.in, *dag_output_stream);
Expand Down Expand Up @@ -121,8 +120,8 @@ try
std::vector<Int64>(),
collators,
tipb::ExchangeType::PassThrough,
context.getSettings().dag_records_per_chunk,
context.getSettings().batch_send_min_limit,
context.getSettingsRef().dag_records_per_chunk,
context.getSettingsRef().batch_send_min_limit,
true,
dag_context);
dag_output_stream = std::make_shared<DAGBlockOutputStream>(streams.in->getHeader(), std::move(response_writer));
Expand Down
4 changes: 0 additions & 4 deletions dbms/src/Flash/Coprocessor/DAGQueryBlock.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,6 @@ class DAGQueryBlock
void fillOutputFieldTypes();
void collectAllPossibleChildrenJoinSubqueryAlias(std::unordered_map<UInt32, std::vector<String>> & result);
// The root of the DAG query-block tree is identified by id == 1.
// (Dropped the stray trailing ';' after the function body — it was a
// redundant empty declaration flagged by -Wextra-semi.)
bool isRootQueryBlock() const { return id == 1; }
// True when the source executor is a table scan whose next read engine is
// not Local — i.e. the data must be fetched remotely rather than read from
// this node's own storage.
bool isRemoteQuery() const
{
return source->tp() == tipb::ExecType::TypeTableScan && source->tbl_scan().next_read_engine() != tipb::EngineType::Local;
}
};

} // namespace DB
Loading

0 comments on commit 2ec0991

Please sign in to comment.