diff --git a/cmake/find_xxhash.cmake b/cmake/find_xxhash.cmake index d11c54ea831..2b4ad1afede 100644 --- a/cmake/find_xxhash.cmake +++ b/cmake/find_xxhash.cmake @@ -5,5 +5,12 @@ if(NOT BUILD_SHARED_LIBS) endif() add_subdirectory( ${ClickHouse_SOURCE_DIR}/contrib/xxHash/cmake_unofficial EXCLUDE_FROM_ALL) +if(ARCH_AMD64) + add_library(xxhash_dispatch STATIC ${ClickHouse_SOURCE_DIR}/contrib/xxHash/xxh_x86dispatch.c) + target_link_libraries(xxhash_dispatch PUBLIC xxHash::xxhash) + set(TIFLASH_XXHASH_LIBRARY xxhash_dispatch) +else() + set(TIFLASH_XXHASH_LIBRARY xxHash::xxhash) +endif() set(XXHASH_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/xxHash) diff --git a/contrib/xxHash b/contrib/xxHash index 0e492170469..94e5f23e736 160000 --- a/contrib/xxHash +++ b/contrib/xxHash @@ -1 +1 @@ -Subproject commit 0e49217046917180b9118a2d89aceaa76a65cf51 +Subproject commit 94e5f23e736f2bb67ebdf90727353e65344f9fc0 diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index 2285bcbaa60..5c240785586 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -246,7 +246,7 @@ target_include_directories (clickhouse_common_io BEFORE PUBLIC ${DOUBLE_CONVERSI # also for copy_headers.sh: target_include_directories (clickhouse_common_io BEFORE PRIVATE ${COMMON_INCLUDE_DIR}) # https://cmake.org/pipermail/cmake/2016-May/063400.html -target_link_libraries (clickhouse_common_io PUBLIC xxHash::xxhash) +target_link_libraries (clickhouse_common_io PUBLIC ${TIFLASH_XXHASH_LIBRARY}) if (ENABLE_TESTS) include (${ClickHouse_SOURCE_DIR}/cmake/find_gtest.cmake) diff --git a/dbms/src/Common/Checksum.h b/dbms/src/Common/Checksum.h index 6f5046332d0..35233c5cfd2 100644 --- a/dbms/src/Common/Checksum.h +++ b/dbms/src/Common/Checksum.h @@ -4,7 +4,11 @@ #include #include #include +#ifdef __x86_64__ +#include +#else #include +#endif #include #include @@ -100,7 +104,11 @@ class XXH3 void update(const void * src, size_t length) { ProfileEvents::increment(ProfileEvents::ChecksumDigestBytes, length); +#ifdef __x86_64__ // dispatched version can utilize hardware resource + state = XXH3_64bits_withSeed_dispatch(src, length, state); +#else // use inlined version state = XXH_INLINE_XXH3_64bits_withSeed(src, length, state); +#endif } [[nodiscard]] HashType checksum() const { return state; } diff --git a/dbms/src/Common/ThreadManager.cpp b/dbms/src/Common/ThreadManager.cpp index 188494954e2..7073ed728e4 100644 --- a/dbms/src/Common/ThreadManager.cpp +++ b/dbms/src/Common/ThreadManager.cpp @@ -58,12 +58,23 @@ class RawThreadManager : public ThreadManager } void wait() override + { + waitAndClear(); + } + + ~RawThreadManager() + { + waitAndClear(); + } + +protected: + void waitAndClear() { for (auto & worker : workers) worker.join(); + workers.clear(); } -protected: std::vector workers; }; diff --git a/dbms/src/DataStreams/InputStreamFromASTInsertQuery.cpp b/dbms/src/DataStreams/InputStreamFromASTInsertQuery.cpp index 0e4f876925d..cde812174fb 100644 --- a/dbms/src/DataStreams/InputStreamFromASTInsertQuery.cpp +++ b/dbms/src/DataStreams/InputStreamFromASTInsertQuery.cpp @@ -1,22 +1,24 @@ -#include -#include -#include -#include #include #include +#include +#include +#include +#include namespace DB { - namespace ErrorCodes { - extern const int LOGICAL_ERROR; +extern const int LOGICAL_ERROR; } InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery( - const ASTPtr & ast, ReadBuffer & input_buffer_tail_part, const BlockIO & streams, Context & context) + const ASTPtr & ast, + ReadBuffer & input_buffer_tail_part, + const BlockIO & streams, + 
Context & context) { const ASTInsertQuery * ast_insert_query = dynamic_cast(ast.get()); @@ -30,7 +32,8 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery( /// Data could be in parsed (ast_insert_query.data) and in not parsed yet (input_buffer_tail_part) part of query. input_buffer_ast_part = std::make_unique( - ast_insert_query->data, ast_insert_query->data ? ast_insert_query->end - ast_insert_query->data : 0); + ast_insert_query->data, + ast_insert_query->data ? ast_insert_query->end - ast_insert_query->data : 0); ConcatReadBuffer::ReadBuffers buffers; if (ast_insert_query->data) @@ -43,7 +46,7 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery( input_buffer_contacenated = std::make_unique(buffers); - res_stream = context.getInputFormat(format, *input_buffer_contacenated, streams.out->getHeader(), context.getSettings().max_insert_block_size); + res_stream = context.getInputFormat(format, *input_buffer_contacenated, streams.out->getHeader(), context.getSettingsRef().max_insert_block_size); } -} +} // namespace DB diff --git a/dbms/src/DataStreams/tests/union_stream2.cpp b/dbms/src/DataStreams/tests/union_stream2.cpp index 785e48744a1..dde2827f121 100644 --- a/dbms/src/DataStreams/tests/union_stream2.cpp +++ b/dbms/src/DataStreams/tests/union_stream2.cpp @@ -1,21 +1,17 @@ -#include -#include - -#include - -#include -#include - -#include -#include #include #include +#include +#include #include - #include - +#include #include #include +#include +#include + +#include +#include using namespace DB; @@ -41,8 +37,8 @@ try for (size_t i = 0, size = streams.size(); i < size; ++i) streams[i] = std::make_shared(streams[i]); - BlockInputStreamPtr stream = std::make_shared>(streams, nullptr, settings.max_threads); - stream = std::make_shared(stream, 10, 0); + BlockInputStreamPtr stream = std::make_shared>(streams, nullptr, settings.max_threads, nullptr); + stream = std::make_shared(stream, 10, 0, nullptr); WriteBufferFromFileDescriptor wb(STDERR_FILENO); Block sample = table->getSampleBlock(); @@ -55,8 +51,8 @@ try catch (const Exception & e) { std::cerr << e.what() << ", " << e.displayText() << std::endl - << std::endl - << "Stack trace:" << std::endl - << e.getStackTrace().toString(); + << std::endl + << "Stack trace:" << std::endl + << e.getStackTrace().toString(); return 1; } diff --git a/dbms/src/Debug/dbgFuncCoprocessor.cpp b/dbms/src/Debug/dbgFuncCoprocessor.cpp index e62302228a3..8ea1adb6879 100644 --- a/dbms/src/Debug/dbgFuncCoprocessor.cpp +++ b/dbms/src/Debug/dbgFuncCoprocessor.cpp @@ -326,7 +326,7 @@ BlockInputStreamPtr executeQuery(Context & context, RegionID region_id, const DA = std::make_shared( std::make_shared(context.getTMTContext().getKVCluster(), context.getTMTContext().getMPPTaskManager(), - context.getSettings().enable_local_tunnel), + context.getSettingsRef().enable_local_tunnel), tipb_exchange_receiver, root_tm, 10, diff --git a/dbms/src/Flash/Coprocessor/DAGDriver.cpp b/dbms/src/Flash/Coprocessor/DAGDriver.cpp index df1e215aec6..245ca1ca4d4 100644 --- a/dbms/src/Flash/Coprocessor/DAGDriver.cpp +++ b/dbms/src/Flash/Coprocessor/DAGDriver.cpp @@ -6,7 +6,6 @@ #include #include #include -#include #include #include #include @@ -93,7 +92,7 @@ try { std::unique_ptr response_writer = std::make_unique( dag_response, - context.getSettings().dag_records_per_chunk, + context.getSettingsRef().dag_records_per_chunk, dag_context); dag_output_stream = std::make_shared(streams.in->getHeader(), std::move(response_writer)); copyData(*streams.in, *dag_output_stream); @@ 
-121,8 +120,8 @@ try std::vector(), collators, tipb::ExchangeType::PassThrough, - context.getSettings().dag_records_per_chunk, - context.getSettings().batch_send_min_limit, + context.getSettingsRef().dag_records_per_chunk, + context.getSettingsRef().batch_send_min_limit, true, dag_context); dag_output_stream = std::make_shared(streams.in->getHeader(), std::move(response_writer)); diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlock.h b/dbms/src/Flash/Coprocessor/DAGQueryBlock.h index f8fd4ad31da..b4b191c2cb0 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlock.h +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlock.h @@ -60,10 +60,6 @@ class DAGQueryBlock void fillOutputFieldTypes(); void collectAllPossibleChildrenJoinSubqueryAlias(std::unordered_map> & result); bool isRootQueryBlock() const { return id == 1; }; - bool isRemoteQuery() const - { - return source->tp() == tipb::ExecType::TypeTableScan && source->tbl_scan().next_read_engine() != tipb::EngineType::Local; - } }; } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index fd4e33953a1..ae69bb55aed 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -264,6 +264,8 @@ void DAGQueryBlockInterpreter::executeTS(const tipb::TableScan & ts, DAGPipeline { throw TiFlashException("Dag Request does not have region to read. ", Errors::Coprocessor::BadRequest); } + if (ts.next_read_engine() != tipb::EngineType::Local) + throw TiFlashException("Unsupported remote query.", Errors::Coprocessor::BadRequest); DAGStorageInterpreter storage_interpreter(context, query_block, ts, conditions, max_streams); storage_interpreter.execute(pipeline); @@ -700,6 +702,8 @@ void DAGQueryBlockInterpreter::executeAggregation( settings.aggregation_memory_efficient_merge_threads ? static_cast(settings.aggregation_memory_efficient_merge_threads) : static_cast(settings.max_threads), taskLogger()); pipeline.streams.resize(1); + // should record for agg before restore concurrency. See #3804. 
+ recordProfileStreams(pipeline, query_block.aggregation_name); restorePipelineConcurrency(pipeline); } else @@ -718,6 +722,7 @@ void DAGQueryBlockInterpreter::executeAggregation( context.getFileProvider(), true, taskLogger()); + recordProfileStreams(pipeline, query_block.aggregation_name); } // add cast } @@ -764,134 +769,9 @@ void DAGQueryBlockInterpreter::executeOrder(DAGPipeline & pipeline, const std::v void DAGQueryBlockInterpreter::recordProfileStreams(DAGPipeline & pipeline, const String & key) { - dagContext().getProfileStreamsMap()[key].qb_id = query_block.id; - for (auto & stream : pipeline.streams) - { - dagContext().getProfileStreamsMap()[key].input_streams.push_back(stream); - } - for (auto & stream : pipeline.streams_with_non_joined_data) - dagContext().getProfileStreamsMap()[key].input_streams.push_back(stream); -} - -void copyExecutorTreeWithLocalTableScan( - tipb::DAGRequest & dag_req, - const tipb::Executor * root, - const tipb::DAGRequest & org_req) -{ - const tipb::Executor * current = root; - auto * exec = dag_req.mutable_root_executor(); - while (current->tp() != tipb::ExecType::TypeTableScan) - { - exec->set_tp(current->tp()); - exec->set_executor_id(current->executor_id()); - if (current->tp() == tipb::ExecType::TypeSelection) - { - auto * sel = exec->mutable_selection(); - for (auto const & condition : current->selection().conditions()) - { - auto * tmp = sel->add_conditions(); - tmp->CopyFrom(condition); - } - exec = sel->mutable_child(); - current = ¤t->selection().child(); - } - else if (current->tp() == tipb::ExecType::TypeAggregation || current->tp() == tipb::ExecType::TypeStreamAgg) - { - auto * agg = exec->mutable_aggregation(); - for (auto const & expr : current->aggregation().agg_func()) - { - auto * tmp = agg->add_agg_func(); - tmp->CopyFrom(expr); - } - for (auto const & expr : current->aggregation().group_by()) - { - auto * tmp = agg->add_group_by(); - tmp->CopyFrom(expr); - } - agg->set_streamed(current->aggregation().streamed()); - exec = agg->mutable_child(); - current = ¤t->aggregation().child(); - } - else if (current->tp() == tipb::ExecType::TypeLimit) - { - auto * limit = exec->mutable_limit(); - limit->set_limit(current->limit().limit()); - exec = limit->mutable_child(); - current = ¤t->limit().child(); - } - else if (current->tp() == tipb::ExecType::TypeTopN) - { - auto * topn = exec->mutable_topn(); - topn->set_limit(current->topn().limit()); - for (auto const & expr : current->topn().order_by()) - { - auto * tmp = topn->add_order_by(); - tmp->CopyFrom(expr); - } - exec = topn->mutable_child(); - current = ¤t->topn().child(); - } - else - { - throw TiFlashException("Not supported yet", Errors::Coprocessor::Unimplemented); - } - } - - if (current->tp() != tipb::ExecType::TypeTableScan) - throw TiFlashException("Only support copy from table scan sourced query block", Errors::Coprocessor::Internal); - exec->set_tp(tipb::ExecType::TypeTableScan); - exec->set_executor_id(current->executor_id()); - auto * new_ts = new tipb::TableScan(current->tbl_scan()); - new_ts->set_next_read_engine(tipb::EngineType::Local); - exec->set_allocated_tbl_scan(new_ts); - - /// force the encode type to be TypeCHBlock, so the receiver side does not need to handle the timezone related issues - dag_req.set_encode_type(tipb::EncodeType::TypeCHBlock); - dag_req.set_force_encode_type(true); - if (org_req.has_time_zone_name() && !org_req.time_zone_name().empty()) - dag_req.set_time_zone_name(org_req.time_zone_name()); - else if (org_req.has_time_zone_offset()) - 
dag_req.set_time_zone_offset(org_req.time_zone_offset()); -} - -void DAGQueryBlockInterpreter::executeRemoteQuery(DAGPipeline & pipeline) -{ - // remote query containing agg/limit/topN can not running - // in parellel, but current remote query is running in - // parellel, so just disable this corner case. - if (query_block.aggregation || query_block.limitOrTopN) - throw TiFlashException("Remote query containing agg or limit or topN is not supported", Errors::Coprocessor::BadRequest); - const auto & ts = query_block.source->tbl_scan(); - std::vector cop_key_ranges; - cop_key_ranges.reserve(ts.ranges_size()); - for (const auto & range : ts.ranges()) - { - cop_key_ranges.emplace_back(range.low(), range.high()); - } - sort(cop_key_ranges.begin(), cop_key_ranges.end()); - - ::tipb::DAGRequest dag_req; - - copyExecutorTreeWithLocalTableScan(dag_req, query_block.root, *dagContext().dag_request); - DAGSchema schema; - ColumnsWithTypeAndName columns; - BoolVec is_ts_column; - std::vector source_columns; - for (int i = 0; i < static_cast(query_block.output_field_types.size()); i++) - { - dag_req.add_output_offsets(i); - ColumnInfo info = TiDB::fieldTypeToColumnInfo(query_block.output_field_types[i]); - String col_name = query_block.qb_column_prefix + "col_" + std::to_string(i); - schema.push_back(std::make_pair(col_name, info)); - is_ts_column.push_back(query_block.output_field_types[i].tp() == TiDB::TypeTimestamp); - source_columns.emplace_back(col_name, getDataTypeByFieldTypeForComputingLayer(query_block.output_field_types[i])); - final_project.emplace_back(col_name, ""); - } - - dag_req.set_collect_execution_summaries(dagContext().collect_execution_summaries); - executeRemoteQueryImpl(pipeline, cop_key_ranges, dag_req, schema); - - analyzer = std::make_unique(std::move(source_columns), context); + auto & profile_streams_info = dagContext().getProfileStreamsMap()[key]; + profile_streams_info.qb_id = query_block.id; + pipeline.transform([&profile_streams_info](auto & stream) { profile_streams_info.input_streams.push_back(stream); }); } void DAGQueryBlockInterpreter::executeRemoteQueryImpl( @@ -1042,11 +922,6 @@ void DAGQueryBlockInterpreter::executeExtraCastAndSelection( // like final_project.emplace_back(col.name, query_block.qb_column_prefix + col.name); void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline) { - if (query_block.isRemoteQuery()) - { - executeRemoteQuery(pipeline); - return; - } SubqueryForSet right_query; if (query_block.source->tp() == tipb::ExecType::TypeJoin) { @@ -1110,7 +985,6 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline) { // execute aggregation executeAggregation(pipeline, res.before_aggregation, res.aggregation_keys, res.aggregation_collators, res.aggregate_descriptions); - recordProfileStreams(pipeline, query_block.aggregation_name); } if (res.before_having) @@ -1216,8 +1090,8 @@ void DAGQueryBlockInterpreter::executeExchangeSender(DAGPipeline & pipeline) partition_col_id, collators, exchange_sender.tp(), - context.getSettings().dag_records_per_chunk, - context.getSettings().batch_send_min_limit, + context.getSettingsRef().dag_records_per_chunk, + context.getSettingsRef().batch_send_min_limit, stream_id++ == 0, /// only one stream needs to sending execution summaries for the last response dagContext()); stream = std::make_shared(stream, std::move(response_writer), taskLogger()); diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h index b86f84c2af8..cb4e11cde6b 100644 
--- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h @@ -39,7 +39,6 @@ class DAGQueryBlockInterpreter BlockInputStreams execute(); private: - void executeRemoteQuery(DAGPipeline & pipeline); void executeImpl(DAGPipeline & pipeline); void executeTS(const tipb::TableScan & ts, DAGPipeline & pipeline); void executeJoin(const tipb::Join & join, DAGPipeline & pipeline, SubqueryForSet & right_query); diff --git a/dbms/src/Flash/Coprocessor/DAGStringConverter.cpp b/dbms/src/Flash/Coprocessor/DAGStringConverter.cpp deleted file mode 100644 index bca45e1899c..00000000000 --- a/dbms/src/Flash/Coprocessor/DAGStringConverter.cpp +++ /dev/null @@ -1,236 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace DB -{ -namespace ErrorCodes -{ -extern const int UNKNOWN_TABLE; -extern const int COP_BAD_DAG_REQUEST; -extern const int NOT_IMPLEMENTED; -} // namespace ErrorCodes - -void DAGStringConverter::buildTSString(const tipb::TableScan & ts, std::stringstream & ss) -{ - TableID table_id; - if (ts.has_table_id()) - { - table_id = ts.table_id(); - } - else - { - // do not have table id - throw TiFlashException("Table id not specified in table scan executor", Errors::Coprocessor::BadRequest); - } - auto & tmt_ctx = context.getTMTContext(); - auto storage = tmt_ctx.getStorages().get(table_id); - if (storage == nullptr) - { - throw TiFlashException("Table " + std::to_string(table_id) + " doesn't exist.", Errors::Coprocessor::BadRequest); - } - - const auto managed_storage = std::dynamic_pointer_cast(storage); - if (!managed_storage) - { - throw TiFlashException("Only Manageable table is supported in DAG request", Errors::Coprocessor::BadRequest); - } - - if (ts.columns_size() == 0) - { - // no column selected, must be something wrong - throw TiFlashException("No column is selected in table scan executor", Errors::Coprocessor::BadRequest); - } - for (const tipb::ColumnInfo & ci : ts.columns()) - { - ColumnID cid = ci.column_id(); - if (cid == -1) - { - // Column ID -1 returns the handle column - auto pk_handle_col = storage->getTableInfo().getPKHandleColumn(); - auto pair = storage->getColumns().getPhysical( - pk_handle_col.has_value() ? pk_handle_col->get().name : MutableSupport::tidb_pk_column_name); - columns_from_ts.push_back(pair); - continue; - } - auto name = managed_storage->getTableInfo().getColumnName(cid); - auto pair = managed_storage->getColumns().getPhysical(name); - columns_from_ts.push_back(pair); - } - ss << "FROM " << storage->getDatabaseName() << "." 
<< storage->getTableName() << " "; -} - -void DAGStringConverter::buildSelString(const tipb::Selection & sel, std::stringstream & ss) -{ - bool first = true; - for (const tipb::Expr & expr : sel.conditions()) - { - auto s = exprToString(expr, getCurrentColumns()); - if (first) - { - ss << "WHERE "; - first = false; - } - else - { - ss << "AND "; - } - ss << s << " "; - } -} - -void DAGStringConverter::buildLimitString(const tipb::Limit & limit, std::stringstream & ss) -{ - ss << "LIMIT " << limit.limit() << " "; -} - -void DAGStringConverter::buildProjString(const tipb::Projection & proj, std::stringstream & ss) -{ - ss << "PROJECTION "; - bool first = true; - for (auto & expr : proj.exprs()) - { - if (first) - first = false; - else - ss << ", "; - auto name = exprToString(expr, getCurrentColumns()); - ss << name; - } -} - -void DAGStringConverter::buildAggString(const tipb::Aggregation & agg, std::stringstream & ss) -{ - for (auto & agg_func : agg.agg_func()) - { - if (!agg_func.has_field_type()) - throw TiFlashException("Agg func without field type", Errors::Coprocessor::BadRequest); - columns_from_agg.emplace_back(exprToString(agg_func, getCurrentColumns()), getDataTypeByFieldTypeForComputingLayer(agg_func.field_type())); - } - if (agg.group_by_size() != 0) - { - ss << "GROUP BY "; - bool first = true; - for (auto & group_by : agg.group_by()) - { - if (first) - first = false; - else - ss << ", "; - auto name = exprToString(group_by, getCurrentColumns()); - ss << name; - if (!group_by.has_field_type()) - throw TiFlashException("group by expr without field type", Errors::Coprocessor::BadRequest); - columns_from_agg.emplace_back(name, getDataTypeByFieldTypeForComputingLayer(group_by.field_type())); - } - } - afterAgg = true; -} -void DAGStringConverter::buildTopNString(const tipb::TopN & topN, std::stringstream & ss) -{ - ss << "ORDER BY "; - bool first = true; - for (auto & order_by_item : topN.order_by()) - { - if (first) - first = false; - else - ss << ", "; - ss << exprToString(order_by_item.expr(), getCurrentColumns()) << " "; - ss << (order_by_item.desc() ? 
"DESC" : "ASC"); - } - ss << " LIMIT " << topN.limit() << " "; -} - -//todo return the error message -void DAGStringConverter::buildString(const tipb::Executor & executor, std::stringstream & ss) -{ - switch (executor.tp()) - { - case tipb::ExecType::TypeTableScan: - return buildTSString(executor.tbl_scan(), ss); - case tipb::ExecType::TypeJoin: - case tipb::ExecType::TypeIndexScan: - // index scan not supported - throw TiFlashException("IndexScan is not supported", Errors::Coprocessor::Unimplemented); - case tipb::ExecType::TypeSelection: - return buildSelString(executor.selection(), ss); - case tipb::ExecType::TypeAggregation: - // stream agg is not supported, treated as normal agg - case tipb::ExecType::TypeStreamAgg: - return buildAggString(executor.aggregation(), ss); - case tipb::ExecType::TypeTopN: - return buildTopNString(executor.topn(), ss); - case tipb::ExecType::TypeLimit: - return buildLimitString(executor.limit(), ss); - case tipb::ExecType::TypeProjection: - return buildProjString(executor.projection(), ss); - case tipb::ExecType::TypeExchangeSender: - case tipb::ExecType::TypeExchangeReceiver: - throw TiFlashException("Mpp executor is not supported", Errors::Coprocessor::Unimplemented); - case tipb::ExecType::TypeKill: - throw TiFlashException("Kill executor is not supported", Errors::Coprocessor::Unimplemented); - } -} - -bool isProject(const tipb::Executor &) -{ - // currently, project is not pushed so always return false - return false; -} -DAGStringConverter::DAGStringConverter(Context & context_, const tipb::DAGRequest & dag_request_) - : context(context_) - , dag_request(dag_request_) -{ - afterAgg = false; -} - -String DAGStringConverter::buildSqlString() -{ - std::stringstream query_buf; - std::stringstream project; - for (const tipb::Executor & executor : dag_request.executors()) - { - buildString(executor, query_buf); - } - if (!isProject(dag_request.executors(dag_request.executors_size() - 1))) - { - //append final project - project << "SELECT "; - bool first = true; - auto current_columns = getCurrentColumns(); - std::vector output_index; - if (afterAgg) - for (UInt64 i = 0; i < current_columns.size(); i++) - output_index.push_back(i); - else - for (UInt64 index : dag_request.output_offsets()) - output_index.push_back(index); - - for (UInt64 index : output_index) - { - if (first) - { - first = false; - } - else - { - project << ", "; - } - project << current_columns[index].name; - } - project << " "; - } - return project.str() + query_buf.str(); -} - -} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGStringConverter.h b/dbms/src/Flash/Coprocessor/DAGStringConverter.h deleted file mode 100644 index d686faa43e3..00000000000 --- a/dbms/src/Flash/Coprocessor/DAGStringConverter.h +++ /dev/null @@ -1,50 +0,0 @@ -#pragma once - -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wunused-parameter" -#include -#include -#pragma GCC diagnostic pop - -#include - -namespace DB -{ -class Context; - -class DAGStringConverter -{ -public: - DAGStringConverter(Context & context_, const tipb::DAGRequest & dag_request_); - - ~DAGStringConverter() = default; - - String buildSqlString(); - - const std::vector & getCurrentColumns() - { - if (afterAgg) - { - return columns_from_agg; - } - return columns_from_ts; - } - -protected: - void buildTSString(const tipb::TableScan & ts, std::stringstream & ss); - void buildSelString(const tipb::Selection & sel, std::stringstream & ss); - void buildLimitString(const tipb::Limit & limit, std::stringstream & ss); - void 
buildProjString(const tipb::Projection & proj, std::stringstream & ss); - void buildAggString(const tipb::Aggregation & agg, std::stringstream & ss); - void buildTopNString(const tipb::TopN & topN, std::stringstream & ss); - void buildString(const tipb::Executor & executor, std::stringstream & ss); - -protected: - Context & context; - const tipb::DAGRequest & dag_request; - std::vector columns_from_ts; - std::vector columns_from_agg; - bool afterAgg; -}; - -} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp index 2bba6851f8e..4918fb03d7f 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp @@ -1,7 +1,6 @@ #include #include #include -#include #include #include #include @@ -60,7 +59,7 @@ void InterpreterDAG::initMPPExchangeReceiver(const DAGQueryBlock & dag_query_blo std::make_shared( context.getTMTContext().getKVCluster(), context.getTMTContext().getMPPTaskManager(), - context.getSettings().enable_local_tunnel), + context.getSettingsRef().enable_local_tunnel), dag_query_block.source->exchange_receiver(), dagContext().getMPPTaskMeta(), max_streams, diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index fb8f430c68d..d276aa39045 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -237,8 +237,8 @@ void MPPTask::prepare(const mpp::DispatchTaskRequest & task_request) mpp::TaskMeta task_meta; if (!task_meta.ParseFromString(exchange_sender.encoded_task_meta(i))) throw TiFlashException("Failed to decode task meta info in ExchangeSender", Errors::Coprocessor::BadRequest); - bool is_local = context->getSettings().enable_local_tunnel && meta.address() == task_meta.address(); - MPPTunnelPtr tunnel = std::make_shared(task_meta, task_request.meta(), timeout, task_cancelled_callback, context->getSettings().max_threads, is_local, log); + bool is_local = context->getSettingsRef().enable_local_tunnel && meta.address() == task_meta.address(); + MPPTunnelPtr tunnel = std::make_shared(task_meta, task_request.meta(), timeout, task_cancelled_callback, context->getSettingsRef().max_threads, is_local, log); LOG_FMT_DEBUG(log, "begin to register the tunnel {}", tunnel->id()); registerTunnel(MPPTaskId{task_meta.start_ts(), task_meta.task_id()}, tunnel); tunnel_set->addTunnel(tunnel); diff --git a/dbms/src/Flash/Mpp/MPPTunnelSet.cpp b/dbms/src/Flash/Mpp/MPPTunnelSet.cpp index bc9a63760a1..11d4eb93ff3 100644 --- a/dbms/src/Flash/Mpp/MPPTunnelSet.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnelSet.cpp @@ -13,6 +13,14 @@ inline mpp::MPPDataPacket serializeToPacket(const tipb::SelectResponse & respons throw Exception(fmt::format("Fail to serialize response, response size: {}", response.ByteSizeLong())); return packet; } + +void checkPacketSize(size_t size) +{ + static constexpr size_t max_packet_size = 1u << 31; + if (size >= max_packet_size) + throw Exception(fmt::format("Packet is too large to send, size : {}", size)); +} + } // namespace template @@ -51,6 +59,7 @@ void MPPTunnelSetBase::write(tipb::SelectResponse & response) template void MPPTunnelSetBase::write(mpp::MPPDataPacket & packet) { + checkPacketSize(packet.ByteSizeLong()); tunnels[0]->write(packet); auto tunnels_size = tunnels.size(); if (tunnels_size > 1) @@ -78,6 +87,7 @@ void MPPTunnelSetBase::write(tipb::SelectResponse & response, int16_t pa template void MPPTunnelSetBase::write(mpp::MPPDataPacket & packet, int16_t partition_id) { + checkPacketSize(packet.ByteSizeLong()); if 
(partition_id != 0 && !packet.data().empty()) packet.mutable_data()->clear(); diff --git a/dbms/src/Functions/FunctionsComparison.h b/dbms/src/Functions/FunctionsComparison.h index 64bd0d44928..1fc2f090371 100644 --- a/dbms/src/Functions/FunctionsComparison.h +++ b/dbms/src/Functions/FunctionsComparison.h @@ -973,7 +973,7 @@ class FunctionComparison : public IFunction return true; else throw Exception( - fmt::format("Illegal column {} of second argument of function ", col_right_untyped->getName(), getName()), + fmt::format("Illegal column {} of second argument of function {}", col_right_untyped->getName(), getName()), ErrorCodes::ILLEGAL_COLUMN); } @@ -1683,7 +1683,7 @@ class FunctionStrcmp : public FunctionComparison if (!success) { throw Exception( - fmt::format("Function {} executed on invalid arguments [col_left={}][col_right={}]", getName(), col_left_untyped->getName(), col_right_untyped->getName())); + fmt::format("Function {} executed on invalid arguments [col_left={}] [col_right={}]", getName(), col_left_untyped->getName(), col_right_untyped->getName())); } } diff --git a/dbms/src/Functions/FunctionsConditional.h b/dbms/src/Functions/FunctionsConditional.h index 5bb1f9e609a..a80cdac0ef3 100644 --- a/dbms/src/Functions/FunctionsConditional.h +++ b/dbms/src/Functions/FunctionsConditional.h @@ -368,7 +368,7 @@ class FunctionIf : public IFunction return true; else throw Exception( - fmt::format("Illegal column {} of third argument of function {}", block.getByPosition(arguments[2]).column->getName(), getName()), + fmt::format("Illegal column {} of third argument of function {}", block.getByPosition(arguments[2]).column->getName(), getName()), ErrorCodes::ILLEGAL_COLUMN); } else if (col_arr_left && col_arr_left_elems) @@ -814,7 +814,7 @@ class FunctionIf : public IFunction } else throw Exception( - fmt::format("Illegal column {} of first argument of function {}. Must be ColumnUInt8 or ColumnConstUInt8." + arg_cond.column->getName(), getName()), + fmt::format("Illegal column {} of first argument of function {}. Must be ColumnUInt8 or ColumnConstUInt8.", arg_cond.column->getName(), getName()), ErrorCodes::ILLEGAL_COLUMN); return true; } diff --git a/dbms/src/Functions/FunctionsDateTime.h b/dbms/src/Functions/FunctionsDateTime.h index f4ea634a3a8..e1cc5879453 100644 --- a/dbms/src/Functions/FunctionsDateTime.h +++ b/dbms/src/Functions/FunctionsDateTime.h @@ -1323,7 +1323,7 @@ struct DateTimeAddIntervalImpl else { throw Exception( - fmt::format("Illegal column {} of first argument of function ", block.getByPosition(arguments[0]).column->getName(), Transform::name), + fmt::format("Illegal column {} of first argument of function {}", block.getByPosition(arguments[0]).column->getName(), Transform::name), ErrorCodes::ILLEGAL_COLUMN); } } @@ -1426,7 +1426,7 @@ struct DateTimeAddIntervalImplgetName(), Transform::name), + fmt::format("Illegal column {} of first argument of function {}", block.getByPosition(arguments[0]).column->getName(), Transform::name), ErrorCodes::ILLEGAL_COLUMN); } } @@ -2419,7 +2419,7 @@ class FunctionMyTimeZoneConvertByOffset : public IFunction if (!checkDataType(arguments[0].type.get())) throw Exception( - fmt::format("Illegal type {} of first argument of function {}.Should be MyDateTime", arguments[0].type->getName(), getName()), + fmt::format("Illegal type {} of first argument of function {}. 
Should be MyDateTime", arguments[0].type->getName(), getName()), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); if (!arguments[1].type->isInteger()) throw Exception( diff --git a/dbms/src/Functions/FunctionsMath.h b/dbms/src/Functions/FunctionsMath.h index abb8bb7e862..d6330deb120 100644 --- a/dbms/src/Functions/FunctionsMath.h +++ b/dbms/src/Functions/FunctionsMath.h @@ -225,6 +225,7 @@ struct UnaryFunctionVectorized #else #define UnaryFunctionVectorized UnaryFunctionPlain +#define UnaryFunctionNullableVectorized UnaryFunctionNullablePlain #endif @@ -592,6 +593,16 @@ bool log2args(double b, double e, double & result) return false; } +bool sqrtNullable(double x, double & result) +{ + if (x < 0) + { + return true; + } + result = sqrt(x); + return false; +} + double sign(double x) { return (x > 0) - (x < 0); @@ -703,7 +714,7 @@ using FunctionExp2 = FunctionMathUnaryFloat64>; using FunctionExp10 = FunctionMathUnaryFloat64>; using FunctionLog10 = FunctionMathUnaryFloat64>; -using FunctionSqrt = FunctionMathUnaryFloat64>; +using FunctionSqrt = FunctionMathUnaryFloat64Nullable>; using FunctionCbrt = FunctionMathUnaryFloat64 +#include +#include +#include +namespace DB +{ +namespace tests +{ +class StringCharLength : public FunctionTest +{ +protected: + static ColumnWithTypeAndName toNullableVec(const std::vector> & v) + { + return createColumn>(v); + } + + static ColumnWithTypeAndName toNullableVec(const std::vector> & v) + { + return createColumn>(v); + } + + static ColumnWithTypeAndName toVec(const std::vector> & v) + { + std::vector strings; + strings.reserve(v.size()); + for (std::optional s : v) + { + strings.push_back(s.value()); + } + return createColumn(strings); + } + + static ColumnWithTypeAndName toVec(const std::vector> & v) + { + std::vector ints; + ints.reserve(v.size()); + for (std::optional i : v) + { + ints.push_back(i.value()); + } + return createColumn(ints); + } + + static ColumnWithTypeAndName toConst(const String & s) + { + return createConstColumn(1, s); + } + + static ColumnWithTypeAndName toConst(const UInt64 i) + { + return createConstColumn(1, i); + } +}; + +TEST_F(StringCharLength, charLengthVector) +{ + std::vector> candidate_strings = {"", "a", "do you know the length?", "你知道字符串的长度吗?", "你知道字符串的 length 吗??"}; + std::vector> expect = {0, 1, 23, 11, 18}; + ASSERT_COLUMN_EQ( + toNullableVec(expect), + executeFunction( + "lengthUTF8", + toNullableVec(candidate_strings))); + + ASSERT_COLUMN_EQ( + toVec(expect), + executeFunction( + "lengthUTF8", + toVec(candidate_strings))); + + std::vector> candidate_strings_null = {{}}; + std::vector> expect_null = {{}}; + ASSERT_COLUMN_EQ( + toNullableVec(expect_null), + executeFunction( + "lengthUTF8", + toNullableVec(candidate_strings_null))); +} + +TEST_F(StringCharLength, charLengthConst) +{ + ASSERT_COLUMN_EQ( + toConst(0), + executeFunction( + "lengthUTF8", + toConst(""))); + + ASSERT_COLUMN_EQ( + toConst(23), + executeFunction( + "lengthUTF8", + toConst("do you know the length?"))); + + ASSERT_COLUMN_EQ( + toConst(11), + executeFunction( + "lengthUTF8", + toConst("你知道字符串的长度吗?"))); + + ASSERT_COLUMN_EQ( + toConst(18), + executeFunction( + "lengthUTF8", + toConst("你知道字符串的 length 吗??"))); +} + +} // namespace tests +} // namespace DB diff --git a/dbms/src/Functions/tests/gtest_strings_search.cpp b/dbms/src/Functions/tests/gtest_strings_search.cpp index e176a700867..3f0f482083b 100644 --- a/dbms/src/Functions/tests/gtest_strings_search.cpp +++ b/dbms/src/Functions/tests/gtest_strings_search.cpp @@ -9,6 +9,57 @@ namespace tests { class 
StringMatch : public FunctionTest { +protected: + const String longStr = "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzab" + "cdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdef" + "ghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijkl" + "mnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqr" + "stuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvw" + "xyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz"; + + const String longPattern = "abcdefghijklmnopqrstuvwxyz_bcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz%abcdefghijklmnopqrstuvwxyz"; + + static ColumnWithTypeAndName toNullableVec(const std::vector> & v) + { + return createColumn>(v); + } + + static ColumnWithTypeAndName toNullableVec(const std::vector> & v) + { + return createColumn>(v); + } + + static ColumnWithTypeAndName toVec(const std::vector> & v) + { + std::vector strings; + strings.reserve(v.size()); + for (std::optional s : v) + { + strings.push_back(s.value()); + } + return createColumn(strings); + } + + static ColumnWithTypeAndName toVec(const std::vector> & v) + { + std::vector ints; + ints.reserve(v.size()); + for (std::optional i : v) + { + ints.push_back(i.value()); + } + return createColumn(ints); + } + + static ColumnWithTypeAndName toConst(const String & s) + { + return createConstColumn(1, s); + } + + static ColumnWithTypeAndName toConst(const UInt8 i) + { + return createConstColumn(1, i); + } }; TEST_F(StringMatch, Like3ArgsVectorWithVector) @@ -139,6 +190,155 @@ try } } CATCH -} // namespace tests +TEST_F(StringMatch, LikeVectorWithVector) +{ + std::vector> haystack = {"我爱tiflash", "我爱tiflash", "", "a", "", "a", "a", "a", "ab", "ab", "a%", "aaaa", "aaaa", "aabaababaabbab", "a", "abab", "abab", "abcdefghijklmn", "a", longStr}; + std::vector> needle = {"我_tif%", "%爱ti%", "", "a", "", "%", "a%", "%a", "a%", "ab", "ab", "a%", "aaab%", "aab%a%aab%b", "_", "_b__", "_b_", "a%", "abcdefghijklmn%", longPattern}; + std::vector> expect = {1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 1, 1, 1, 0, 1, 0, 1}; + ASSERT_COLUMN_EQ( + toNullableVec(expect), + executeFunction( + "like", + toNullableVec(haystack), + toNullableVec(needle))); + + ASSERT_COLUMN_EQ( + toVec(expect), + executeFunction( + "like", + toVec(haystack), + toVec(needle))); + + std::vector> haystackNull = {{}, "a"}; + std::vector> needleNull = {"我_tif%", {}}; + std::vector> expectNull = {{}, {}}; + ASSERT_COLUMN_EQ( + toNullableVec(expectNull), + executeFunction( + "like", + toNullableVec(haystackNull), + toNullableVec(needleNull))); +} + +TEST_F(StringMatch, LikeConstWithVector) +{ + std::vector> needle = {"", "a", "", "%", "a%", "%a", "a%", "ab", "ab", "a%", "aaab%", "aab%a%aab%b", "_", "_b__", "_b_", longPattern}; + std::vector> expect = {0, 0, 0, 1, 1, 1, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0}; + std::vector> expect1 = {0, 0, 0, 1, 1, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 1}; + 
ASSERT_COLUMN_EQ( + toNullableVec(expect), + executeFunction( + "like", + toConst("abcaba"), + toNullableVec(needle))); + + ASSERT_COLUMN_EQ( + toVec(expect), + executeFunction( + "like", + toConst("abcaba"), + toVec(needle))); + + ASSERT_COLUMN_EQ( + toVec(expect1), + executeFunction( + "like", + toConst(longStr), + toVec(needle))); + + std::vector> needleNull = {{}}; + std::vector> expectNull = {{}}; + ASSERT_COLUMN_EQ( + toNullableVec(expectNull), + executeFunction( + "like", + toConst("abc"), + toNullableVec(needleNull))); +} + +TEST_F(StringMatch, LikeVectorWithConst) +{ + std::vector> haystack = {"我爱tiflash", "", "a", "", "a", "a", "a", "ab", "ab", "a%", "aaaa", "aaaa", "aabaababaabbab", "a", "abab", "abab", longStr}; + std::vector> expect = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 0}; + std::vector> expect1 = {1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; + std::vector> expect2 = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; + std::vector> expect3 = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}; + ASSERT_COLUMN_EQ( + toNullableVec(expect), + executeFunction( + "like", + toNullableVec(haystack), + toConst("%aa%"))); + + ASSERT_COLUMN_EQ( + toVec(expect), + executeFunction( + "like", + toVec(haystack), + toConst("%aa%"))); + + ASSERT_COLUMN_EQ( + toVec(expect1), + executeFunction( + "like", + toVec(haystack), + toConst("%爱tif%"))); + + ASSERT_COLUMN_EQ( + toVec(expect2), + executeFunction( + "like", + toVec(haystack), + toConst("%不爱tif%"))); + + ASSERT_COLUMN_EQ( + toVec(expect3), + executeFunction( + "like", + toVec(haystack), + toConst(longPattern))); + + std::vector> haystackNull = {{}}; + std::vector> expectNull = {{}}; + ASSERT_COLUMN_EQ( + toNullableVec(expectNull), + executeFunction( + "like", + toNullableVec(haystackNull), + toConst("abc"))); +} + +TEST_F(StringMatch, LikeConstWithConst) +{ + ASSERT_COLUMN_EQ( + toConst(1), + executeFunction( + "like", + toConst("resaasfe"), + toConst("%aa%"))); + + ASSERT_COLUMN_EQ( + toConst(0), + executeFunction( + "like", + toConst("abcde"), + toConst("%aa%"))); + + ASSERT_COLUMN_EQ( + toConst(1), + executeFunction( + "like", + toConst("我爱tiflash"), + toConst("%爱tif%"))); + + ASSERT_COLUMN_EQ( + toConst(0), + executeFunction( + "like", + toConst("我爱tiflash"), + toConst("%不爱tif%"))); +} + +} // namespace tests } // namespace DB diff --git a/dbms/src/IO/ChecksumBuffer.h b/dbms/src/IO/ChecksumBuffer.h index a0814968385..2f5bdc3634e 100644 --- a/dbms/src/IO/ChecksumBuffer.h +++ b/dbms/src/IO/ChecksumBuffer.h @@ -21,10 +21,10 @@ extern const Event Seek; namespace DB { -/* +/** * A frame consists of a header and a body that conforms the following structure: * - * + * \code * --------------------------------- * | > header | * | - bytes | @@ -35,7 +35,7 @@ namespace DB * | ... | * | ... | * --------------------------------- - * + * \endcode * * When writing a frame, we maintain the buffer than is of the exact size of the data part. * Whenever the buffer is full, we digest the whole buffer and update the header info, write back @@ -46,6 +46,8 @@ namespace DB * The `FramedChecksumWriteBuffer` should be used directly on the file; the stream's ending has no * special mark: that is it ends when the file reaches EOF mark. * + * To keep `PositionInFile` information and make sure the whole file is seekable by offset, one should + * never invoke `sync/next` by hand unless one knows that it is at the end of frame. 
*/ @@ -54,10 +56,20 @@ class FramedChecksumWriteBuffer : public WriteBufferFromFileDescriptor { private: WritableFilePtr out; - size_t current_frame = 0; + size_t materialized_bytes = 0; + size_t frame_count = 0; const size_t frame_size; +#ifndef NDEBUG + bool has_incomplete_frame = false; +#endif void nextImpl() override { +#ifndef NDEBUG + if (offset() != this->working_buffer.size()) + { + has_incomplete_frame = true; + } +#endif size_t len = this->offset(); auto & frame = reinterpret_cast &>( *(this->working_buffer.begin() - sizeof(ChecksumFrame))); // align should not fail @@ -88,17 +100,36 @@ class FramedChecksumWriteBuffer : public WriteBufferFromFileDescriptor } } iter += count; + materialized_bytes += count; expected -= count; } ProfileEvents::increment(ProfileEvents::ChecksumBufferWriteBytes, len + sizeof(ChecksumFrame)); - - current_frame++; + frame_count++; } off_t doSeek(off_t, int) override { throw Exception("framed file is not seekable in writing mode"); } - off_t getPositionInFile() override { return current_frame * frame_size + offset(); } + // For checksum buffer, this is the **faked** file size without checksum header. + // Statistics will be inaccurate after `sync/next` operation in the middle of a frame because it will + // generate a frame without a full length. + off_t getPositionInFile() override + { +#ifndef NDEBUG + assert(has_incomplete_frame == false); +#endif + return frame_count * frame_size + offset(); + } + + // For checksum buffer, this is the real bytes to be materialized to disk. + // We normally have `materialized bytes != position in file` in the sense that, + // materialized bytes are referring to the real files on disk whereas position + // in file are to make the underlying checksum implementation opaque to above layers + // so that above buffers can do seek/read without knowing the existence of frame headers + off_t getMaterializedBytes() override + { + return materialized_bytes + ((offset() != 0) ? 
(sizeof(ChecksumFrame) + offset()) : 0); + } public: explicit FramedChecksumWriteBuffer(WritableFilePtr out_, size_t block_size_ = TIFLASH_DEFAULT_CHECKSUM_FRAME_SIZE) @@ -175,7 +206,7 @@ class FramedChecksumReadBuffer : public ReadBufferFromFileDescriptor auto & frame = reinterpret_cast &>( *(this->working_buffer.begin() - sizeof(ChecksumFrame))); // align should not fail - auto readHeader = [&]() -> bool { + auto read_header = [&]() -> bool { auto header_length = expectRead(working_buffer.begin() - sizeof(ChecksumFrame), sizeof(ChecksumFrame)); if (header_length == 0) return false; @@ -189,7 +220,7 @@ class FramedChecksumReadBuffer : public ReadBufferFromFileDescriptor return true; }; - auto readBody = [&]() { + auto read_body = [&]() { auto body_length = expectRead(buffer, frame.bytes); if (unlikely(body_length != frame.bytes)) { @@ -214,14 +245,14 @@ class FramedChecksumReadBuffer : public ReadBufferFromFileDescriptor while (size >= frame_size) { // read the header to our own memory area - // if readHeader returns false, then we are at the end of file - if (!readHeader()) + // if read_header returns false, then we are at the end of file + if (!read_header()) { return expected - size; } // read the body - readBody(); + read_body(); // check body if (!skip_checksum) diff --git a/dbms/src/IO/WriteBufferFromFileBase.h b/dbms/src/IO/WriteBufferFromFileBase.h index 9e4b980f03e..6f1a2fc6080 100644 --- a/dbms/src/IO/WriteBufferFromFileBase.h +++ b/dbms/src/IO/WriteBufferFromFileBase.h @@ -17,6 +17,10 @@ class WriteBufferFromFileBase : public BufferWithOwnMemory off_t seek(off_t off, int whence = SEEK_SET); void truncate(off_t length = 0); virtual off_t getPositionInFile() = 0; + virtual off_t getMaterializedBytes() + { + return getPositionInFile(); + } virtual void sync() = 0; virtual std::string getFileName() const = 0; virtual int getFD() const = 0; diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.cpp b/dbms/src/Interpreters/ExpressionAnalyzer.cpp index e50069a075a..17a519b1258 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.cpp +++ b/dbms/src/Interpreters/ExpressionAnalyzer.cpp @@ -137,7 +137,7 @@ ExpressionAnalyzer::ExpressionAnalyzer( const SubqueriesForSets & subqueries_for_set_) : ast(ast_) , context(context_) - , settings(context.getSettings()) + , settings(context.getSettingsRef()) , subquery_depth(subquery_depth_) , source_columns(source_columns_) , required_result_columns(required_result_columns_.begin(), required_result_columns_.end()) diff --git a/dbms/src/Server/DTTool/DTTool.cpp b/dbms/src/Server/DTTool/DTTool.cpp index ed12ad0b97f..a74dd599854 100644 --- a/dbms/src/Server/DTTool/DTTool.cpp +++ b/dbms/src/Server/DTTool/DTTool.cpp @@ -12,9 +12,9 @@ static constexpr char MAIN_HELP[] = "Usage: dttool [args]\n" "Available Subcommands:\n" " help Print help message and exit.\n" - " migrate Migrate dmfile version.\n" - " inspect Inspect dmfile info.\n" - " bench Benchmark dmfile IO performance."; + " migrate Migrate dtfile version.\n" + " inspect Inspect dtfile info.\n" + " bench Benchmark dtfile IO performance."; // clang-format on extern "C" { diff --git a/dbms/src/Server/DTTool/DTToolBench.cpp b/dbms/src/Server/DTTool/DTToolBench.cpp index 7e10cba50c9..cbf86407f2d 100644 --- a/dbms/src/Server/DTTool/DTToolBench.cpp +++ b/dbms/src/Server/DTTool/DTToolBench.cpp @@ -30,7 +30,7 @@ static constexpr char BENCH_HELP[] = "Usage: bench [args]\n" "Available Arguments:\n" " --help Print help message and exit.\n" - " --version DMFile version. 
[default: 2] [available: 1, 2]\n" + " --version DTFile version. [default: 2] [available: 1, 2]\n" " --algorithm Checksum algorithm. [default: xxh3] [available: xxh3, city128, crc32, crc64, none]\n" " --frame Checksum frame length. [default: " TO_STRING(TIFLASH_DEFAULT_CHECKSUM_FRAME_SIZE) "]\n" " --column Column number. [default: 100]\n" @@ -215,7 +215,7 @@ int benchEntry(const std::vector & opts) auto version = vm["version"].as(); if (version < 1 || version > 2) { - std::cerr << "invalid dmfile version: " << version << std::endl; + std::cerr << "invalid dtfile version: " << version << std::endl; return -EINVAL; } auto algorithm_ = vm["algorithm"].as(); diff --git a/dbms/src/Server/DTTool/DTToolInspect.cpp b/dbms/src/Server/DTTool/DTToolInspect.cpp index 16e8968766e..a9af278a032 100644 --- a/dbms/src/Server/DTTool/DTToolInspect.cpp +++ b/dbms/src/Server/DTTool/DTToolInspect.cpp @@ -16,9 +16,9 @@ static constexpr char INSPECT_HELP[] = "Usage: inspect [args]\n" "Available Arguments:\n" " --help Print help message and exit.\n" - " --config-file Tiflash config file.\n" + " --config-file TiFlash config file.\n" " --check Iterate data files to check integrity.\n" - " --file-id Target DMFile ID.\n" + " --file-id Target DTFile ID.\n" " --imitative Use imitative context instead. (encryption is not supported in this mode)\n" " --workdir Target directory."; @@ -179,7 +179,6 @@ int inspectEntry(const std::vector & opts, RaftStoreFFIFunc ffi_fun auto workdir = vm["workdir"].as(); auto file_id = vm["file-id"].as(); - auto config_file = vm["config-file"].as(); auto args = InspectArgs{check, file_id, workdir}; if (imitative) @@ -189,6 +188,7 @@ int inspectEntry(const std::vector & opts, RaftStoreFFIFunc ffi_fun } else { + auto config_file = vm["config-file"].as(); CLIService service(inspectServiceMain, args, config_file, ffi_function); return service.run({""}); } diff --git a/dbms/src/Server/DTTool/DTToolMigrate.cpp b/dbms/src/Server/DTTool/DTToolMigrate.cpp index b812f848aa5..28c9586807f 100644 --- a/dbms/src/Server/DTTool/DTToolMigrate.cpp +++ b/dbms/src/Server/DTTool/DTToolMigrate.cpp @@ -40,7 +40,7 @@ static constexpr char MIGRATE_HELP[] = "Usage: migrate [args]\n" "Available Arguments:\n" " --help Print help message and exit.\n" - " --version Target dmfile version. [default: 2] [available: 1, 2]\n" + " --version Target dtfile version. [default: 2] [available: 1, 2]\n" " --algorithm Checksum algorithm. [default: xxh3] [available: xxh3, city128, crc32, crc64, none]\n" " --frame Checksum frame length. [default: " TO_STRING(TIFLASH_DEFAULT_CHECKSUM_FRAME_SIZE) "]\n" " --compression Compression method. 
[default: lz4] [available: lz4, lz4hc, zstd, none]\n" @@ -189,10 +189,10 @@ int migrateServiceMain(DB::Context & context, const MigrateArgs & args) keeper.setStorageVersion(DB::STORAGE_FORMAT_V2); break; default: - throw DB::Exception(fmt::format("invalid dmfile version: {}", args.version)); + throw DB::Exception(fmt::format("invalid dtfile version: {}", args.version)); } - LOG_FMT_INFO(logger, "creating new dmfile"); + LOG_FMT_INFO(logger, "creating new dtfile"); auto new_file = DB::DM::DMFile::create(args.file_id, keeper.migration_temp_dir.path(), false, std::move(option)); LOG_FMT_INFO(logger, "creating input stream"); @@ -212,7 +212,7 @@ int migrateServiceMain(DB::Context & context, const MigrateArgs & args) auto stat_iter = src_file->pack_stats.begin(); auto properties_iter = src_file->pack_properties.property().begin(); size_t counter = 0; - // iterate all blocks and rewrite them to new dmfile + // iterate all blocks and rewrite them to new dtfile while (auto block = input_stream->read()) { LOG_FMT_INFO(logger, "migrating block {} ( size: {} )", counter++, block.bytes()); @@ -296,7 +296,7 @@ int migrateEntry(const std::vector & opts, RaftStoreFFIFunc ffi_fun args.version = vm["version"].as(); if (args.version < 1 || args.version > 2) { - std::cerr << "invalid dmfile version: " << args.version << std::endl; + std::cerr << "invalid dtfile version: " << args.version << std::endl; return -EINVAL; } args.no_keep = no_keep; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp index 79e1bc1bf8e..f1a8b64d3af 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp @@ -2,6 +2,13 @@ #include #include +#ifndef NDEBUG +#include +#include +#include +#endif + + namespace DB { namespace DM @@ -274,7 +281,17 @@ void DMFileWriter::writeColumn(ColId col_id, const IDataType & type, const IColu void DMFileWriter::finalizeColumn(ColId col_id, DataTypePtr type) { size_t bytes_written = 0; - +#ifndef NDEBUG + auto examine_buffer_size = [](auto & buf, auto & fp) { + if (!fp.isEncryptionEnabled()) + { + auto fd = buf.getFD(); + struct stat file_stat = {}; + ::fstat(fd, &file_stat); + assert(buf.getMaterializedBytes() == file_stat.st_size); + } + }; +#endif if (options.flags.isSingleFile()) { auto callback = [&](const IDataType::SubstreamPath & substream) { @@ -312,6 +329,10 @@ void DMFileWriter::finalizeColumn(ColId col_id, DataTypePtr type) const auto stream_name = DMFile::getFileNameBase(col_id, substream); auto & stream = column_streams.at(stream_name); stream->flush(); +#ifndef NDEBUG + examine_buffer_size(*stream->mark_file, *this->file_provider); + examine_buffer_size(*stream->plain_file, *this->file_provider); +#endif bytes_written += stream->getWrittenBytes(); if (stream->minmaxes) @@ -326,7 +347,7 @@ void DMFileWriter::finalizeColumn(ColId col_id, DataTypePtr type) write_limiter); stream->minmaxes->write(*type, buf); buf.sync(); - bytes_written += buf.getPositionInFile(); + bytes_written += buf.getMaterializedBytes(); } else { @@ -339,7 +360,10 @@ void DMFileWriter::finalizeColumn(ColId col_id, DataTypePtr type) dmfile->configuration->getChecksumFrameLength()); stream->minmaxes->write(*type, *buf); buf->sync(); - bytes_written += buf->getPositionInFile(); + bytes_written += buf->getMaterializedBytes(); +#ifndef NDEBUG + examine_buffer_size(*buf, *this->file_provider); +#endif } } }; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h 
b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h index 49c20f7ca70..a4a5c481eb5 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h @@ -82,7 +82,7 @@ class DMFileWriter // Get written bytes of `plain_file` && `mark_file`. Should be called after `flush`. // Note that this class don't take responsible for serializing `minmaxes`, // bytes of `minmaxes` won't be counted in this method. - size_t getWrittenBytes() { return plain_file->getPositionInFile() + mark_file->getPositionInFile(); } + size_t getWrittenBytes() const { return plain_file->getMaterializedBytes() + mark_file->getMaterializedBytes(); } // compressed_buf -> plain_file WriteBufferFromFileBasePtr plain_file; diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index afa8fcf1a6d..8f2b867c929 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -108,37 +108,12 @@ void PageDirectory::apply(PageEntriesEdit && edit) std::unique_lock write_lock(table_rw_mutex); // TODO: It is totally serialized, make it a pipeline UInt64 last_sequence = sequence.load(); - // stage 1, get the entry to be ref auto snap = createSnapshot(); - for (auto & r : edit.getRecords()) - { - // Set the version of inserted entries - r.version = PageVersionType(last_sequence + 1); - - if (r.type != WriteBatch::WriteType::REF) - { - continue; - } - auto iter = mvcc_table_directory.find(r.ori_page_id); - if (iter == mvcc_table_directory.end()) - { - throw Exception(fmt::format("Trying to add ref from {} to non-exist {} with sequence={}", r.page_id, r.ori_page_id, last_sequence + 1), ErrorCodes::LOGICAL_ERROR); - } - if (auto entry = iter->second->getEntry(last_sequence); entry) - { - // copy the entry to be ref - r.entry = *entry; - } - else - { - throw Exception(fmt::format("Trying to add ref from {} to non-exist {} with sequence={}", r.page_id, r.ori_page_id, last_sequence + 1), ErrorCodes::LOGICAL_ERROR); - } - } - // stage 2, persisted the changes to WAL + // stage 1, persisted the changes to WAL // wal.apply(edit); - // stage 3, create entry version list for pageId. nothing need to be rollback + // stage 2, create entry version list for pageId. nothing need to be rollback std::unordered_map> updating_locks; std::vector updating_pages; updating_pages.reserve(edit.size()); @@ -163,7 +138,7 @@ void PageDirectory::apply(PageEntriesEdit && edit) updating_pages.emplace_back(iter->second); } - // stage 4, there are no rollback since we already persist `edit` to WAL, just ignore error if any + // stage 3, there are no rollback since we already persist `edit` to WAL, just ignore error if any const auto & records = edit.getRecords(); for (size_t idx = 0; idx < records.size(); ++idx) { @@ -173,11 +148,31 @@ void PageDirectory::apply(PageEntriesEdit && edit) case WriteBatch::WriteType::PUT: [[fallthrough]]; case WriteBatch::WriteType::UPSERT: - [[fallthrough]]; + updating_pages[idx]->createNewVersion(last_sequence + 1, r.entry); + break; case WriteBatch::WriteType::REF: { - // Put/upsert/ref all should append a new version for this page - updating_pages[idx]->createNewVersion(last_sequence + 1, r.entry); + // We can't handle `REF` before other writes, because `PUT` and `REF` + // maybe in the same WriteBatch. + // Also we can't throw an exception if we can't find the origin page_id, + // because WAL have already applied the change and there is no + // mechanism to roll back changes in the WAL. 
diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp
index afa8fcf1a6d..8f2b867c929 100644
--- a/dbms/src/Storages/Page/V3/PageDirectory.cpp
+++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp
@@ -108,37 +108,12 @@ void PageDirectory::apply(PageEntriesEdit && edit)
     std::unique_lock write_lock(table_rw_mutex); // TODO: It is totally serialized, make it a pipeline
     UInt64 last_sequence = sequence.load();
 
-    // stage 1, get the entry to be ref
     auto snap = createSnapshot();
-    for (auto & r : edit.getRecords())
-    {
-        // Set the version of inserted entries
-        r.version = PageVersionType(last_sequence + 1);
-
-        if (r.type != WriteBatch::WriteType::REF)
-        {
-            continue;
-        }
-        auto iter = mvcc_table_directory.find(r.ori_page_id);
-        if (iter == mvcc_table_directory.end())
-        {
-            throw Exception(fmt::format("Trying to add ref from {} to non-exist {} with sequence={}", r.page_id, r.ori_page_id, last_sequence + 1), ErrorCodes::LOGICAL_ERROR);
-        }
-        if (auto entry = iter->second->getEntry(last_sequence); entry)
-        {
-            // copy the entry to be ref
-            r.entry = *entry;
-        }
-        else
-        {
-            throw Exception(fmt::format("Trying to add ref from {} to non-exist {} with sequence={}", r.page_id, r.ori_page_id, last_sequence + 1), ErrorCodes::LOGICAL_ERROR);
-        }
-    }
 
-    // stage 2, persisted the changes to WAL
+    // stage 1, persist the changes to the WAL
     // wal.apply(edit);
 
-    // stage 3, create entry version list for pageId. nothing need to be rollback
+    // stage 2, create the entry version list for each page id; nothing needs to be rolled back
     std::unordered_map> updating_locks;
     std::vector updating_pages;
     updating_pages.reserve(edit.size());
@@ -163,7 +138,7 @@ void PageDirectory::apply(PageEntriesEdit && edit)
         updating_pages.emplace_back(iter->second);
     }
 
-    // stage 4, there are no rollback since we already persist `edit` to WAL, just ignore error if any
+    // stage 3, there is no rollback since we have already persisted `edit` to the WAL; just ignore errors if any
     const auto & records = edit.getRecords();
     for (size_t idx = 0; idx < records.size(); ++idx)
     {
@@ -173,11 +148,31 @@ void PageDirectory::apply(PageEntriesEdit && edit)
         case WriteBatch::WriteType::PUT:
             [[fallthrough]];
         case WriteBatch::WriteType::UPSERT:
-            [[fallthrough]];
+            updating_pages[idx]->createNewVersion(last_sequence + 1, r.entry);
+            break;
         case WriteBatch::WriteType::REF:
         {
-            // Put/upsert/ref all should append a new version for this page
-            updating_pages[idx]->createNewVersion(last_sequence + 1, r.entry);
+            // We can't handle `REF` before other writes, because `PUT` and `REF`
+            // may be in the same WriteBatch.
+            // Also we can't throw an exception if we can't find the origin page_id,
+            // because the WAL has already applied the change and there is no
+            // mechanism to roll back changes in the WAL.
+            auto iter = mvcc_table_directory.find(r.ori_page_id);
+            if (iter == mvcc_table_directory.end())
+            {
+                LOG_FMT_WARNING(log, "Trying to add ref from {} to non-existent {} with sequence={}", r.page_id, r.ori_page_id, last_sequence + 1);
+                break;
+            }
+
+            if (auto entry = iter->second->getEntry(last_sequence); entry)
+            {
+                // copy the entry to be ref
+                updating_pages[idx]->createNewVersion(last_sequence + 1, *entry);
+            }
+            else
+            {
+                LOG_FMT_WARNING(log, "Trying to add ref from {} to non-existent {} with sequence={}", r.page_id, r.ori_page_id, last_sequence + 1);
+            }
             break;
         }
         case WriteBatch::WriteType::DEL:
diff --git a/dbms/src/Storages/Page/V3/PageEntriesEdit.h b/dbms/src/Storages/Page/V3/PageEntriesEdit.h
index 0ef63a29e0b..e1d601a634d 100644
--- a/dbms/src/Storages/Page/V3/PageEntriesEdit.h
+++ b/dbms/src/Storages/Page/V3/PageEntriesEdit.h
@@ -96,7 +96,6 @@ class PageEntriesEdit
         PageId page_id;
         PageId ori_page_id;
         PageEntryV3 entry;
-        PageVersionType version;
     };
     using EditRecords = std::vector;
diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp
index d039adcdf23..94a8099bcdc 100644
--- a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp
+++ b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp
@@ -608,6 +608,7 @@ try
 {
     PageEntryV3 entry1{.file_id = 1, .size = 1024, .offset = 0x123, .checksum = 0x4567};
     PageEntryV3 entry2{.file_id = 2, .size = 1024, .offset = 0x123, .checksum = 0x4567};
+    PageEntryV3 entry3{.file_id = 3, .size = 1024, .offset = 0x123, .checksum = 0x4567};
     {
         PageEntriesEdit edit;
         edit.put(1, entry1);
@@ -615,14 +616,17 @@ try
         dir.apply(std::move(edit));
     }
 
-    { // Ref 3 -> 999
+    { // Ref 4 -> 999
         PageEntriesEdit edit;
-        edit.ref(3, 999);
-        ASSERT_THROW({ dir.apply(std::move(edit)); }, DB::Exception);
+        edit.put(3, entry3);
+        edit.ref(4, 999);
+        dir.apply(std::move(edit));
     }
 
     auto snap1 = dir.createSnapshot();
+    EXPECT_ENTRY_EQ(entry1, dir, 1, snap1);
     EXPECT_ENTRY_EQ(entry2, dir, 2, snap1);
-    EXPECT_ENTRY_NOT_EXIST(dir, 3, snap1);
+    EXPECT_ENTRY_EQ(entry3, dir, 3, snap1);
+    EXPECT_ENTRY_NOT_EXIST(dir, 4, snap1);
     // TODO: restore, invalid ref page is filtered
 }
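A condensed model of the control flow the PageDirectory.cpp hunk establishes: the edit goes to the WAL first, then the in-memory MVCC map is updated, with REF resolved in record order and dangling refs downgraded to warnings. Types and names below (Entry, Record, applySketch, MvccDirectory) are illustrative only, not the real TiFlash API:

#include <cstdint>
#include <cstdio>
#include <map>
#include <vector>

struct Entry
{
    uint64_t file_id = 0;
};

struct Record
{
    enum Type { PUT, REF } type;
    uint64_t page_id = 0;
    uint64_t ori_page_id = 0; // only meaningful for REF
    Entry entry; // only meaningful for PUT
};

// page_id -> (version -> entry); a toy stand-in for mvcc_table_directory
using MvccDirectory = std::map<uint64_t, std::map<uint64_t, Entry>>;

void applySketch(MvccDirectory & dir, const std::vector<Record> & records, uint64_t last_sequence)
{
    // stage 1: wal.apply(records) would run here; past this point, no rollback.
    // stage 2: update the in-memory versions, ignoring errors.
    for (const auto & r : records)
    {
        switch (r.type)
        {
        case Record::PUT:
            dir[r.page_id][last_sequence + 1] = r.entry;
            break;
        case Record::REF:
        {
            auto iter = dir.find(r.ori_page_id);
            if (iter == dir.end() || iter->second.empty())
            {
                // dangling ref: warn and continue, mirroring LOG_FMT_WARNING
                std::fprintf(stderr, "ref %llu -> non-existent %llu, skipped\n",
                             (unsigned long long)r.page_id, (unsigned long long)r.ori_page_id);
                break;
            }
            // copy the newest entry of the referenced page (the real code
            // resolves visibility against a sequence number instead)
            dir[r.page_id][last_sequence + 1] = iter->second.rbegin()->second;
            break;
        }
        }
    }
}

Handling REF in the same pass as PUT is what lets `edit.put(3, entry3); edit.ref(4, 999);` in the updated test apply without throwing: page 3 is materialized first, and the dangling ref to 999 is merely skipped, so page 4 is never created.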
diff --git a/dbms/src/Storages/PathPool.cpp b/dbms/src/Storages/PathPool.cpp
index ad9b6c183d1..187e550dcd4 100644
--- a/dbms/src/Storages/PathPool.cpp
+++ b/dbms/src/Storages/PathPool.cpp
@@ -391,6 +391,59 @@ void StableDiskDelegator::addDTFile(UInt64 file_id, size_t file_size, std::strin
         Errors::DeltaTree::Internal);
     pool.dt_file_path_map.emplace(file_id, index);
     pool.main_path_infos[index].file_size_map.emplace(file_id, file_size);
+
+#ifndef NDEBUG
+    try
+    {
+        auto dmf_path = fmt::format("{}/stable/dmf_{}", path, file_id);
+        Poco::File dmf_file = {dmf_path};
+        if (dmf_file.isFile())
+        {
+            LOG_FMT_DEBUG(
+                pool.log,
+                "added new dtfile. [id={}] [path={}] [real_size={}] [reported_size={}]",
+                file_id,
+                path,
+                dmf_file.getSize(),
+                file_size);
+        }
+        else
+        {
+            size_t size_sum = 0;
+            auto get_folder_size = [](const Poco::File & target, size_t & counter) -> void {
+                auto get_folder_size_impl = [](const Poco::File & inner_target, size_t & inner_counter, auto & self) -> void {
+                    std::vector<Poco::File> files;
+                    inner_target.list(files);
+                    for (auto & i : files)
+                    {
+                        if (i.isFile())
+                        {
+                            inner_counter += i.getSize();
+                        }
+                        else
+                        {
+                            self(i, inner_counter, self);
+                        }
+                    }
+                };
+                get_folder_size_impl(target, counter, get_folder_size_impl);
+            };
+            get_folder_size(dmf_file, size_sum);
+            LOG_FMT_DEBUG(
+                pool.log,
+                "added new dtfile. [id={}] [path={}] [real_size={}] [reported_size={}]",
+                file_id,
+                path,
+                size_sum,
+                file_size);
+        }
+    }
+    catch (const Poco::Exception & exp)
+    {
+        LOG_FMT_WARNING(pool.log, "failed to get real size info for dtfile. [id={}] [path={}] [err={}]", file_id, path, exp.displayText());
+    }
+#endif
+
     // update global used size
     pool.global_capacity->addUsedSize(path, file_size);
 }
diff --git a/dbms/src/Storages/StorageLog.cpp b/dbms/src/Storages/StorageLog.cpp
index d023ab7067a..0f884626407 100644
--- a/dbms/src/Storages/StorageLog.cpp
+++ b/dbms/src/Storages/StorageLog.cpp
@@ -551,7 +551,7 @@ void registerStorageLog(StorageFactory & factory)
             args.data_path,
             args.table_name,
             args.columns,
-            args.context.getSettings().max_compress_block_size);
+            args.context.getSettingsRef().max_compress_block_size);
     });
 }
diff --git a/dbms/src/Storages/StorageStripeLog.cpp b/dbms/src/Storages/StorageStripeLog.cpp
index de1a09d87eb..a58f8530ab1 100644
--- a/dbms/src/Storages/StorageStripeLog.cpp
+++ b/dbms/src/Storages/StorageStripeLog.cpp
@@ -300,7 +300,7 @@ void registerStorageStripeLog(StorageFactory & factory)
             args.table_name,
             args.columns,
             args.attach,
-            args.context.getSettings().max_compress_block_size);
+            args.context.getSettingsRef().max_compress_block_size);
     });
 }
diff --git a/dbms/src/Storages/StorageTinyLog.cpp b/dbms/src/Storages/StorageTinyLog.cpp
index 5562291665c..6aef31c265c 100644
--- a/dbms/src/Storages/StorageTinyLog.cpp
+++ b/dbms/src/Storages/StorageTinyLog.cpp
@@ -386,7 +386,7 @@ void registerStorageTinyLog(StorageFactory & factory)
             args.table_name,
             args.columns,
             args.attach,
-            args.context.getSettings().max_compress_block_size);
+            args.context.getSettingsRef().max_compress_block_size);
     });
 }
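The PathPool.cpp debug block walks a DTFile directory with a self-referencing generic lambda, since a lambda cannot name itself for recursion. The same pattern extracted as a standalone helper; folderSizeSketch is a hypothetical name, and only the Poco::File::list/isFile/getSize calls already used in the diff are assumed:

#include <Poco/File.h>

#include <cstddef>
#include <vector>

// Sketch: sum the sizes of all regular files under `root`, recursing into
// subdirectories. The generic lambda receives itself as `self` because it
// has no name of its own to recurse through.
inline size_t folderSizeSketch(const Poco::File & root)
{
    size_t total = 0;
    auto walk = [](const Poco::File & dir, size_t & acc, auto & self) -> void {
        std::vector<Poco::File> children;
        dir.list(children);
        for (auto & child : children)
        {
            if (child.isFile())
                acc += child.getSize();
            else
                self(child, acc, self);
        }
    };
    walk(root, total, walk);
    return total;
}

Passing the lambda to itself avoids a std::function indirection and works in C++14 and later.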
diff --git a/dbms/src/TableFunctions/TableFunctionFactory.cpp b/dbms/src/TableFunctions/TableFunctionFactory.cpp
index ce50f0ba966..ca4c32469df 100644
--- a/dbms/src/TableFunctions/TableFunctionFactory.cpp
+++ b/dbms/src/TableFunctions/TableFunctionFactory.cpp
@@ -24,7 +24,7 @@ TableFunctionPtr TableFunctionFactory::get(
     const std::string & name,
     const Context & context) const
 {
-    if (context.getSettings().readonly == 1) /** For example, for readonly = 2 - allowed. */
+    if (context.getSettingsRef().readonly == 1) /** For example, for readonly = 2 - allowed. */
         throw Exception("Table functions are forbidden in readonly mode", ErrorCodes::READONLY);
 
     auto it = functions.find(name);
diff --git a/tests/fullstack-test/expr/sqrt.test b/tests/fullstack-test/expr/sqrt.test
index 847e29069a2..2cf39bbe07c 100644
--- a/tests/fullstack-test/expr/sqrt.test
+++ b/tests/fullstack-test/expr/sqrt.test
@@ -6,11 +6,18 @@ mysql> insert into test.t values(1.21, 1.44, 0.25, 4)
 
 func> wait_table test t
 
-mysql> select /*+ AGG_TO_COP(), READ_FROM_STORAGE(TIFLASH[t]) */ sum(sqrt(ld)), sum(sqrt(bd)), sum(sqrt(f)), sum(sqrt(i)) from test.t;
+mysql> set @@tidb_enforce_mpp=on; select /*+ AGG_TO_COP(), READ_FROM_STORAGE(TIFLASH[t]) */ sum(sqrt(ld)), sum(sqrt(bd)), sum(sqrt(f)), sum(sqrt(i)) from test.t;
 +---------------+---------------+--------------+--------------+
 | sum(sqrt(ld)) | sum(sqrt(bd)) | sum(sqrt(f)) | sum(sqrt(i)) |
 +---------------+---------------+--------------+--------------+
 |           1.1 |           1.2 |          0.5 |            2 |
 +---------------+---------------+--------------+--------------+
 
-mysql> drop table if exists test.t
\ No newline at end of file
+mysql> set @@tidb_enforce_mpp=on; select /*+ READ_FROM_STORAGE(TIFLASH[t]) */ sqrt(f * -1.0) as v from test.t;
++------+
+| v    |
++------+
+| NULL |
++------+
+
+mysql> drop table if exists test.t
diff --git a/tests/fullstack-test/expr/strcmp.test b/tests/fullstack-test/expr/strcmp.test
index 660785d250c..b90d241b50a 100644
--- a/tests/fullstack-test/expr/strcmp.test
+++ b/tests/fullstack-test/expr/strcmp.test
@@ -4,16 +4,6 @@ mysql> alter table test.cmp set tiflash replica 1
 
 func> wait_table test cmp
 
-mysql> set @@tidb_enforce_mpp = 1; explain select strcmp(a, b) from test.cmp;
-+---------------------------+----------+--------------+---------------+------------------------------------------+
-| id                        | estRows  | task         | access object | operator info                            |
-+---------------------------+----------+--------------+---------------+------------------------------------------+
-| TableReader_11            | 10000.00 | root         |               | data:ExchangeSender_10                   |
-| └─ExchangeSender_10       | 10000.00 | cop[tiflash] |               | ExchangeType: PassThrough                |
-|   └─Projection_4          | 10000.00 | cop[tiflash] |               | strcmp(test.cmp.a, test.cmp.b)->Column#4 |
-|     └─TableFullScan_9     | 10000.00 | cop[tiflash] |               | keep order:false, stats:pseudo           |
-+---------------------------+----------+--------------+---------------+------------------------------------------+
-
 mysql> set @@tidb_enforce_mpp = 1; select strcmp(a, b) from test.cmp;
 +--------------+
 | strcmp(a, b) |