From 3bb3cb99b3038b940d5989e68407bed9e56c64e1 Mon Sep 17 00:00:00 2001 From: FelixYBW Date: Wed, 8 Dec 2021 22:12:34 +0800 Subject: [PATCH 1/7] update readme --- README.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/README.md b/README.md index d21007e46aba..4946c82f4233 100644 --- a/README.md +++ b/README.md @@ -44,6 +44,15 @@ Not all the operators and functions are added. Our initial plan is to pass TPCH Operator stat info is pretty useful to understand Spark's execution status. With this design we can only collect info for transform operator which is a combination of operators. We need to find ways to send native operators' stat info to Spark driver. +Memory management is an essential feature in Spark. It requires each (large) memory allocation need to register in executor's memory management service. We implemented this in Gazelle-plugin. We will port the function to Gazelle-jni but need to define a set of APIs the native library can call back. + +Spark's RDD cache is columnar batch based. In Gazelle-plugin, we use the arrow record batch directly without any memcpy. We can build the same functionality in Gazelle-JNI. + +pyspark support needs to be ported as well. If input data format is Arrow, we can send the data to pyspark directly. No memcpy + +UDF support. We need to create the interface which use columnar batch as inptu. So customer can port their current UDF to columnar batch based. If it's Arrow record batch based, user can utilize Arrow's JAVA or C++ API to implement their UDF, debug without Spark, then register to Spark. + +Ideally if all native library can return arrow record batch, we can share much features in Spark's JVM. Spark already have Apache arrow dependency, we can make arrow format as Spark's basic columnar format. The problem is that native library may not be 100% compitable with Arrow format, then there will be a transform between their native format and Arrow, usually it's not cheap. # Contact From d78c22ef62867497af20c62fb016bcb448989c6b Mon Sep 17 00:00:00 2001 From: FelixYBW Date: Thu, 9 Dec 2021 13:28:06 +0800 Subject: [PATCH 2/7] update readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 4946c82f4233..d140ac77de0a 100644 --- a/README.md +++ b/README.md @@ -44,7 +44,7 @@ Not all the operators and functions are added. Our initial plan is to pass TPCH Operator stat info is pretty useful to understand Spark's execution status. With this design we can only collect info for transform operator which is a combination of operators. We need to find ways to send native operators' stat info to Spark driver. -Memory management is an essential feature in Spark. It requires each (large) memory allocation need to register in executor's memory management service. We implemented this in Gazelle-plugin. We will port the function to Gazelle-jni but need to define a set of APIs the native library can call back. +Memory management is an essential feature in Spark. It's even more important to Spark Native runtime. Unlike JVM, not all OOM error can be captured which leads to ugly segment fault error in strange places like JNI call. We need to register each (large) memory allocation in executor's memory management service. Spark can control the memory used in each task. We implemented this in Gazelle-plugin. We will port the function to Gazelle-jni but need to define a set of APIs the native library can call back. Spark's RDD cache is columnar batch based. In Gazelle-plugin, we use the arrow record batch directly without any memcpy. We can build the same functionality in Gazelle-JNI. From c6d2399f0ba868f34ef458c082f569e9cd5a121a Mon Sep 17 00:00:00 2001 From: weiting-chen Date: Tue, 14 Dec 2021 09:45:28 +0800 Subject: [PATCH 3/7] Update SECURITY.md --- SECURITY.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) create mode 100644 SECURITY.md diff --git a/SECURITY.md b/SECURITY.md new file mode 100644 index 000000000000..eb482d90983d --- /dev/null +++ b/SECURITY.md @@ -0,0 +1,12 @@ +# Security Policy + +## Report a Vulnerability + +Please report security issues or vulnerabilities to the [Intel® Security Center]. + +For more information on how Intel® works to resolve security issues, see +[Vulnerability Handling Guidelines]. + +[Intel® Security Center]:https://www.intel.com/security + +[Vulnerability Handling Guidelines]:https://www.intel.com/content/www/us/en/security-center/vulnerability-handling-guidelines.html From 45b888514569b0ceb102cc1d7731fa66d25aa504 Mon Sep 17 00:00:00 2001 From: Rui Mo Date: Thu, 9 Dec 2021 13:28:59 +0800 Subject: [PATCH 4/7] format conversion --- .../columnar/ColumnarGuardRule.scala | 4 + native-sql-engine/cpp/src/jni/jni_common.h | 2 - native-sql-engine/cpp/src/jni/jni_wrapper.cc | 2 +- .../cpp/src/proto/substrait_utils.cc | 119 ++++++++++++++---- .../cpp/src/proto/substrait_utils.h | 1 - 5 files changed, 101 insertions(+), 27 deletions(-) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala index ab47dd8cd93e..ae3295ca8364 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala @@ -283,6 +283,10 @@ case class TransformGuardRule() extends Rule[SparkPlan] { // insertRowGuardRecursive(plan) case plan if !tryConvertToTransformer(plan) => insertRowGuard(plan) + // FIXME: A tmp workaround for single Agg + case a: HashAggregateExec + if !a.child.isInstanceOf[ProjectExec] && !a.child.isInstanceOf[FilterExec] => + insertRowGuard(a) case p: BroadcastQueryStageExec => p case other => diff --git a/native-sql-engine/cpp/src/jni/jni_common.h b/native-sql-engine/cpp/src/jni/jni_common.h index f276eb9ca7a4..f8c2ec933627 100644 --- a/native-sql-engine/cpp/src/jni/jni_common.h +++ b/native-sql-engine/cpp/src/jni/jni_common.h @@ -345,10 +345,8 @@ arrow::Status ParseSubstraitPlan( env->ReleaseByteArrayElements(exprs_arr, exprs_bytes, JNI_ABORT); return arrow::Status::UnknownError("Unable to parse"); } - std::cout << "Start to parse to plan" << std::endl; auto parser = std::make_shared(); parser->ParsePlan(ws_plan); - std::cout << "Finish parsing to plan" << std::endl; *out_iter = parser->getResIter(); return arrow::Status::OK(); } diff --git a/native-sql-engine/cpp/src/jni/jni_wrapper.cc b/native-sql-engine/cpp/src/jni/jni_wrapper.cc index b54a9e8802dd..75a0c59000ed 100644 --- a/native-sql-engine/cpp/src/jni/jni_wrapper.cc +++ b/native-sql-engine/cpp/src/jni/jni_wrapper.cc @@ -353,7 +353,7 @@ Java_com_intel_oap_vectorized_ExpressionEvaluatorJniWrapper_nativeCreateKernelWi // Get the ws iter gandiva::ExpressionVector ws_expr_vector; gandiva::FieldVector ws_ret_types; - std::cout << "start to parse" << std::endl; + // std::cout << "start to parse" << std::endl; std::shared_ptr> res_iter; msg = ParseSubstraitPlan(env, ws_exprs_arr, &ws_expr_vector, &ws_ret_types, &res_iter); if (!msg.ok()) { diff --git a/native-sql-engine/cpp/src/proto/substrait_utils.cc b/native-sql-engine/cpp/src/proto/substrait_utils.cc index 366a22b7f0f1..ab751289b794 100644 --- a/native-sql-engine/cpp/src/proto/substrait_utils.cc +++ b/native-sql-engine/cpp/src/proto/substrait_utils.cc @@ -17,6 +17,12 @@ #include "substrait_utils.h" +#include +#include +#include +#include +#include + namespace substrait = io::substrait; using namespace facebook::velox; using namespace facebook::velox::exec; @@ -25,7 +31,7 @@ using namespace facebook::velox::connector; using namespace facebook::velox::dwio::common; SubstraitParser::SubstraitParser() { - std::cout << "construct SubstraitParser" << std::endl; + // std::cout << "construct SubstraitParser" << std::endl; if (!initialized) { initialized = true; // Setup @@ -57,7 +63,7 @@ void SubstraitParser::ParseLiteral(const substrait::Expression::Literal& slit) { switch (slit.literal_type_case()) { case substrait::Expression_Literal::LiteralTypeCase::kFp64: { double val = slit.fp64(); - std::cout << "double lit: " << val << std::endl; + // std::cout << "double lit: " << val << std::endl; break; } case substrait::Expression_Literal::LiteralTypeCase::kBoolean: { @@ -77,7 +83,7 @@ void SubstraitParser::ParseScalarFunction( } auto function_id = sfunc.id().id(); auto function_name = FindFunction(function_id); - std::cout << "function_name: " << function_name << std::endl; + // std::cout << "function_name: " << function_name << std::endl; auto out_type = sfunc.output_type(); ParseType(out_type); } @@ -87,7 +93,7 @@ void SubstraitParser::ParseReferenceSegment(const ::substrait::ReferenceSegment& case substrait::ReferenceSegment::ReferenceTypeCase::kStructField: { auto sfield = sref.struct_field(); auto field_id = sfield.field(); - std::cout << "field_id: " << field_id << std::endl; + // std::cout << "field_id: " << field_id << std::endl; break; } default: @@ -104,7 +110,7 @@ void SubstraitParser::ParseFieldReference(const substrait::FieldReference& sfiel break; } case substrait::FieldReference::ReferenceTypeCase::kMaskedReference: { - std::cout << "not supported" << std::endl; + // std::cout << "not supported" << std::endl; break; } default: @@ -204,7 +210,7 @@ void SubstraitParser::ParseAggregateRel(const substrait::AggregateRel& sagg, for (auto& grouping : groupings) { auto grouping_fields = grouping.input_fields(); for (auto& grouping_field : grouping_fields) { - std::cout << "Agg grouping_field: " << grouping_field << std::endl; + // std::cout << "Agg grouping_field: " << grouping_field << std::endl; } } // Parse measures @@ -220,7 +226,7 @@ void SubstraitParser::ParseAggregateRel(const substrait::AggregateRel& sagg, break; } auto function_id = aggFunction.id().id(); - std::cout << "Agg Function id: " << function_id << std::endl; + // std::cout << "Agg Function id: " << function_id << std::endl; auto args = aggFunction.args(); for (auto arg : args) { ParseExpression(arg); @@ -228,7 +234,7 @@ void SubstraitParser::ParseAggregateRel(const substrait::AggregateRel& sagg, } auto agg_phase = sagg.phase(); // Parse Input and Output types - std::cout << "Agg input and output:" << std::endl; + // std::cout << "Agg input and output:" << std::endl; for (auto& stype : sagg.input_types()) { ParseType(stype); } @@ -305,7 +311,7 @@ void SubstraitParser::ParseReadRel(const substrait::ReadRel& sread, velox_type_list.push_back(GetVeloxType(sub_type->type)); } auto& sfilter = sread.filter(); - std::cout << "filter pushdown: " << std::endl; + // std::cout << "filter pushdown: " << std::endl; ParseExpression(sfilter); hive::SubfieldFilters filters; filters[common::Subfield(col_name_list[3])] = std::make_unique( @@ -356,7 +362,7 @@ void SubstraitParser::ParsePlan(const substrait::Plan& splan) { auto id = sfmap.function_id().id(); auto name = sfmap.name(); functions_map_[id] = name; - std::cout << "Function id: " << id << ", name: " << name << std::endl; + // std::cout << "Function id: " << id << ", name: " << name << std::endl; } for (auto& srel : splan.relations()) { ParseRel(srel); @@ -421,29 +427,91 @@ class SubstraitParser::WholeStageResultIterator } bool HasNext() override { - std::vector result; - addSplits_(cursor_->task().get()); - while (cursor_->moveNext()) { - result.push_back(cursor_->current()); - addSplits_(cursor_->task().get()); + if (!may_has_next_) { + return false; } - int64_t num_rows = 0; - for (auto outputVector : result) { - // std::cout << "out size: " << outputVector->size() << std::endl; - num_rows += outputVector->size(); - for (size_t i = 0; i < outputVector->size(); ++i) { - std::cout << outputVector->toString(i) << std::endl; + if (num_rows_ > 0) { + return true; + } else { + addSplits_(cursor_->task().get()); + if (cursor_->moveNext()) { + auto current_vec = cursor_->current(); + result_.push_back(current_vec); + num_rows_ += current_vec->size(); + return true; + } else { + may_has_next_ = false; + return false; } } - std::cout << "num_rows: " << num_rows << std::endl; - return false; + } + + arrow::Status CopyBuffer(const uint8_t* from, int64_t length, + std::shared_ptr* out) { + ARROW_ASSIGN_OR_RAISE(*out, AllocateBuffer(8 * length, memory_pool_)); + uint8_t* buffer_data = (*out)->mutable_data(); + std::memcpy(buffer_data, from, 8 * length); + // double val = *(double*)buffer_data; + // std::cout << "buffler val: " << val << std::endl; + return arrow::Status::OK(); } arrow::Status Next(std::shared_ptr* out) override { + addSplits_(cursor_->task().get()); + while (num_rows_ < batch_size_ && cursor_->moveNext()) { + auto current_vec = cursor_->current(); + result_.push_back(current_vec); + num_rows_ += current_vec->size(); + // FIXME: need to control the batch size to be exactly the same as the setted one. + if (num_rows_ < batch_size_) { + addSplits_(cursor_->task().get()); + } + } + // Fake RowVector to Arrow RecordBatch + std::vector> out_arrays; + for (auto row_vec : result_) { + // FIXME: only one-col case is considered + auto vec_type = row_vec->childAt(0)->type(); + auto vec = row_vec->childAt(0)->as>(); + const uint8_t* raw_result = vec->rawValues(); + std::optional null_count = vec->getNullCount(); + if (null_count) { + int32_t vec_null_count = *null_count; + const uint64_t* rawNulls = vec->rawNulls(); + // FIXME: handle BitMap + } + int32_t vec_length = vec->size(); + + arrow::ArrayData out_data; + out_data.buffers.resize(2); + out_data.length = vec_length; + out_data.type = arrow::float64(); + out_data.null_count = 0; + // out_data.child_data.resize(1); + // for (auto& data : out_data.child_data) { + // data = std::make_shared(); + // } + CopyBuffer(raw_result, vec_length, &out_data.buffers[1]); + std::shared_ptr out_array = + MakeArray(std::make_shared(std::move(out_data))); + // auto typed_array = + // std::dynamic_pointer_cast(out_array); + // for (int i = 0; i < typed_array->length(); i++) { + // double val = typed_array->GetView(i); + // std::cout << "val: " << val << std::endl; + // } + out_arrays.push_back(out_array); + } + auto batch_len = (num_rows_ > batch_size_) ? batch_size_ : num_rows_; + std::vector> ret_types = { + arrow::field("res", arrow::float64())}; + *out = arrow::RecordBatch::Make(arrow::schema(ret_types), batch_len, out_arrays); + num_rows_ = (num_rows_ > batch_size_) ? (num_rows_ - batch_size_) : (num_rows_ - batch_len); return arrow::Status::OK(); } private: + arrow::MemoryPool* memory_pool_ = arrow::default_memory_pool(); std::shared_ptr plan_builder_; std::unique_ptr cursor_; std::vector splits_; @@ -454,4 +522,9 @@ class SubstraitParser::WholeStageResultIterator std::vector paths_; std::vector starts_; std::vector lengths_; + // FIXME: use the setted one + uint64_t batch_size_ = 10000; + uint64_t num_rows_ = 0; + bool may_has_next_ = true; + std::vector result_; }; diff --git a/native-sql-engine/cpp/src/proto/substrait_utils.h b/native-sql-engine/cpp/src/proto/substrait_utils.h index d4fd23325a23..5b2f9587529a 100644 --- a/native-sql-engine/cpp/src/proto/substrait_utils.h +++ b/native-sql-engine/cpp/src/proto/substrait_utils.h @@ -25,7 +25,6 @@ #include "selection.pb.h" #include "type.pb.h" #include "type_expressions.pb.h" - #include "velox/common/file/FileSystems.h" #include "velox/connectors/hive/HiveConnector.h" #include "velox/dwio/common/Options.h" From de54c7e51c028cf982b303d05685e3f27cfd7050 Mon Sep 17 00:00:00 2001 From: Rui Mo Date: Fri, 10 Dec 2021 18:35:35 +0800 Subject: [PATCH 5/7] support more general cases when converting data format --- .../cpp/src/proto/substrait_utils.cc | 117 ++++++++++++------ 1 file changed, 81 insertions(+), 36 deletions(-) diff --git a/native-sql-engine/cpp/src/proto/substrait_utils.cc b/native-sql-engine/cpp/src/proto/substrait_utils.cc index ab751289b794..fbd69109f3ec 100644 --- a/native-sql-engine/cpp/src/proto/substrait_utils.cc +++ b/native-sql-engine/cpp/src/proto/substrait_utils.cc @@ -446,11 +446,10 @@ class SubstraitParser::WholeStageResultIterator } } - arrow::Status CopyBuffer(const uint8_t* from, int64_t length, - std::shared_ptr* out) { - ARROW_ASSIGN_OR_RAISE(*out, AllocateBuffer(8 * length, memory_pool_)); - uint8_t* buffer_data = (*out)->mutable_data(); - std::memcpy(buffer_data, from, 8 * length); + arrow::Status CopyBuffer(const uint8_t* from, uint8_t* to, int64_t copy_bytes) { + // ARROW_ASSIGN_OR_RAISE(*out, AllocateBuffer(size * length, memory_pool_)); + // uint8_t* buffer_data = (*out)->mutable_data(); + std::memcpy(to, from, copy_bytes); // double val = *(double*)buffer_data; // std::cout << "buffler val: " << val << std::endl; return arrow::Status::OK(); @@ -462,51 +461,95 @@ class SubstraitParser::WholeStageResultIterator auto current_vec = cursor_->current(); result_.push_back(current_vec); num_rows_ += current_vec->size(); - // FIXME: need to control the batch size to be exactly the same as the setted one. + // If num_rows_ > batch_size_, the last RowVector needs to be sliced. + // In this way, the batch size can be exactly the same as the setting. if (num_rows_ < batch_size_) { addSplits_(cursor_->task().get()); } } - // Fake RowVector to Arrow RecordBatch + // Convert RowVector to Arrow RecordBatch + auto batch_len = (num_rows_ > batch_size_) ? batch_size_ : num_rows_; + // FIXME: only one-col case is considered + auto col_num = 1; std::vector> out_arrays; - for (auto row_vec : result_) { - // FIXME: only one-col case is considered - auto vec_type = row_vec->childAt(0)->type(); - auto vec = row_vec->childAt(0)->as>(); - const uint8_t* raw_result = vec->rawValues(); - std::optional null_count = vec->getNullCount(); - if (null_count) { - int32_t vec_null_count = *null_count; - const uint64_t* rawNulls = vec->rawNulls(); - // FIXME: handle BitMap - } - int32_t vec_length = vec->size(); - + for (int idx = 0; idx < col_num; idx++) { arrow::ArrayData out_data; - out_data.buffers.resize(2); - out_data.length = vec_length; out_data.type = arrow::float64(); - out_data.null_count = 0; - // out_data.child_data.resize(1); - // for (auto& data : out_data.child_data) { - // data = std::make_shared(); - // } - CopyBuffer(raw_result, vec_length, &out_data.buffers[1]); + out_data.buffers.resize(2); + out_data.length = batch_len; + auto bytes = sizeof(double); + ARROW_ASSIGN_OR_RAISE(out_data.buffers[1], + AllocateBuffer(bytes * batch_len, memory_pool_)); + // FIXME: allocate null bitmap + uint64_t array_null_count = 0; + uint64_t current_len = 0; + uint8_t* buffer_data = (out_data.buffers[1])->mutable_data(); + if (in_offset_ > 0 && last_rv_) { + // Last RV has unconsumed rows, need to consume them first + auto vec = last_rv_->childAt(idx)->as>(); + std::optional null_count = vec->getNullCount(); + if (null_count) { + int32_t vec_null_count = *null_count; + array_null_count += vec_null_count; + const uint64_t* rawNulls = vec->rawNulls(); + // FIXME: copy BitMap + } + const uint8_t* raw_result = vec->rawValues(); + int32_t vec_length = vec->size() - in_offset_; + if (current_len + vec_length <= batch_size_) { + CopyBuffer(raw_result + bytes * in_offset_, buffer_data + bytes * current_len, + bytes * vec_length); + current_len += vec_length; + // Last RV is totally consumed. + last_rv_ = nullptr; + in_offset_ = 0; + } else { + // Only part of this RowVector will be copied. + auto needed_length = batch_size_ - current_len; + CopyBuffer(raw_result + bytes * in_offset_, buffer_data + bytes * current_len, + bytes * needed_length); + in_offset_ += needed_length; + current_len += needed_length; + } + } + for (auto row_vec : result_) { + // FIXME: use vec_type to infer the used types + auto vec_type = row_vec->childAt(idx)->type(); + auto vec = row_vec->childAt(idx)->as>(); + std::optional null_count = vec->getNullCount(); + if (null_count) { + int32_t vec_null_count = *null_count; + array_null_count += vec_null_count; + const uint64_t* rawNulls = vec->rawNulls(); + // FIXME: copy BitMap + } + const uint8_t* raw_result = vec->rawValues(); + int32_t vec_length = vec->size(); + if (current_len + vec_length <= batch_size_) { + CopyBuffer(raw_result, buffer_data + bytes * current_len, bytes * vec_length); + current_len += vec_length; + } else { + // Only part of this RowVector will be copied. + auto needed_length = batch_size_ - current_len; + CopyBuffer(raw_result, buffer_data + bytes * current_len, + bytes * needed_length); + in_offset_ = needed_length; + current_len += needed_length; + last_rv_ = row_vec; + } + } + // FIXME: array_null_count can be wrong when the slicing of a RowVector is + // required. + out_data.null_count = array_null_count; std::shared_ptr out_array = MakeArray(std::make_shared(std::move(out_data))); - // auto typed_array = - // std::dynamic_pointer_cast(out_array); - // for (int i = 0; i < typed_array->length(); i++) { - // double val = typed_array->GetView(i); - // std::cout << "val: " << val << std::endl; - // } out_arrays.push_back(out_array); } - auto batch_len = (num_rows_ > batch_size_) ? batch_size_ : num_rows_; std::vector> ret_types = { arrow::field("res", arrow::float64())}; *out = arrow::RecordBatch::Make(arrow::schema(ret_types), batch_len, out_arrays); - num_rows_ = (num_rows_ > batch_size_) ? (num_rows_ - batch_size_) : (num_rows_ - batch_len); + num_rows_ = + (num_rows_ > batch_size_) ? (num_rows_ - batch_size_) : (num_rows_ - batch_len); return arrow::Status::OK(); } @@ -527,4 +570,6 @@ class SubstraitParser::WholeStageResultIterator uint64_t num_rows_ = 0; bool may_has_next_ = true; std::vector result_; + RowVectorPtr last_rv_ = nullptr; + uint64_t in_offset_ = 0; }; From 18a9935d576a2971fc70b1d0938e58bda098f0ab Mon Sep 17 00:00:00 2001 From: Rui Mo Date: Mon, 13 Dec 2021 16:18:38 +0800 Subject: [PATCH 6/7] rebase --- native-sql-engine/cpp/src/CMakeLists.txt | 11 +++++++++-- native-sql-engine/cpp/src/proto/substrait_utils.cc | 3 +-- native-sql-engine/cpp/src/proto/substrait_utils.h | 3 +-- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/native-sql-engine/cpp/src/CMakeLists.txt b/native-sql-engine/cpp/src/CMakeLists.txt index 40022a93b240..ba9a15e1e76c 100644 --- a/native-sql-engine/cpp/src/CMakeLists.txt +++ b/native-sql-engine/cpp/src/CMakeLists.txt @@ -47,7 +47,8 @@ set(VELOX_DWIO_DWRF_READER_LIB_PATH "${VELOX_BUILD_PATH}/dwio/dwrf/reader/libvel set(VELOX_EXTERNAL_DUCKDB_LIB_PATH "${VELOX_BUILD_PATH}/external/duckdb/libduckdb.a") set(VELOX_DUCKDB_PARSER_LIB_PATH "${VELOX_BUILD_PATH}/duckdb/conversion/libvelox_duckdb_parser.a") set(VELOX_DUCKDB_CONVERSION_LIB_PATH "${VELOX_BUILD_PATH}/duckdb/conversion/libvelox_duckdb_conversion.a") -set(VELOX_FUNCTIONS_PRESTOSQL_LIB_PATH "${VELOX_BUILD_PATH}/functions/prestosql/libvelox_functions_prestosql.a") +set(VELOX_FUNCTIONS_PRESTOSQL_LIB_PATH "${VELOX_BUILD_PATH}/functions/prestosql/registration/libvelox_functions_prestosql.a") +set(VELOX_FUNCTIONS_PRESTOSQL_IMPL_LIB_PATH "${VELOX_BUILD_PATH}/functions/prestosql/libvelox_functions_prestosql_impl.a") set(VELOX_FUNCTIONS_PRESTOSQL_JSON_LIB_PATH "${VELOX_BUILD_PATH}/functions/prestosql/json/libvelox_functions_json.a") set(VELOX_FUNCTIONS_PRESTOSQL_HYPERLOGLOG_LIB_PATH "${VELOX_BUILD_PATH}/functions/prestosql/hyperloglog/libvelox_functions_hyperloglog.a") set(VELOX_FUNCTIONS_PRESTOSQL_AGG_LIB_PATH "${VELOX_BUILD_PATH}/functions/prestosql/aggregates/libvelox_aggregates.a") @@ -477,6 +478,7 @@ macro(build_velox_exec) add_library(facebook::velox::duckdb::parser STATIC IMPORTED) add_library(facebook::velox::duckdb::conversion STATIC IMPORTED) add_library(facebook::velox::functions::prestosql STATIC IMPORTED) + add_library(facebook::velox::functions::prestosql::impl STATIC IMPORTED) add_library(facebook::velox::functions::json STATIC IMPORTED) add_library(facebook::velox::functions::hyperloglog STATIC IMPORTED) add_library(facebook::velox::functions::prestosql::agg STATIC IMPORTED) @@ -623,6 +625,10 @@ macro(build_velox_exec) PROPERTIES IMPORTED_LOCATION "${VELOX_FUNCTIONS_PRESTOSQL_LIB_PATH}" INTERFACE_INCLUDE_DIRECTORIES "${BINARY_RELEASE_DIR}/include") + set_target_properties(facebook::velox::functions::prestosql::impl + PROPERTIES IMPORTED_LOCATION "${VELOX_FUNCTIONS_PRESTOSQL_IMPL_LIB_PATH}" + INTERFACE_INCLUDE_DIRECTORIES + "${BINARY_RELEASE_DIR}/include") set_target_properties(facebook::velox::functions::json PROPERTIES IMPORTED_LOCATION "${VELOX_FUNCTIONS_PRESTOSQL_JSON_LIB_PATH}" INTERFACE_INCLUDE_DIRECTORIES @@ -660,10 +666,10 @@ macro(build_velox_exec) LINK_PUBLIC facebook::velox::connector::hive::part LINK_PUBLIC facebook::velox::dwio::dwrf::proto LINK_PUBLIC facebook::velox::functions::prestosql + LINK_PUBLIC facebook::velox::functions::prestosql::impl LINK_PUBLIC facebook::velox::functions::json LINK_PUBLIC facebook::velox::functions::hyperloglog LINK_PUBLIC facebook::velox::functions::lib - LINK_PUBLIC facebook::velox::expression LINK_PUBLIC facebook::velox::vector LINK_PUBLIC facebook::velox::exec::test::util LINK_PUBLIC facebook::velox::parse::parser @@ -673,6 +679,7 @@ macro(build_velox_exec) LINK_PUBLIC facebook::velox::common::process LINK_PUBLIC facebook::velox::common::base::exception LINK_PUBLIC facebook::velox::connector::hive + LINK_PUBLIC facebook::velox::expression LINK_PUBLIC facebook::velox::dwio::dwrf::writer LINK_PUBLIC facebook::velox::dwio::dwrf::reader LINK_PUBLIC facebook::velox::dwio::dwrf::common diff --git a/native-sql-engine/cpp/src/proto/substrait_utils.cc b/native-sql-engine/cpp/src/proto/substrait_utils.cc index fbd69109f3ec..838dee9b8aac 100644 --- a/native-sql-engine/cpp/src/proto/substrait_utils.cc +++ b/native-sql-engine/cpp/src/proto/substrait_utils.cc @@ -45,8 +45,7 @@ SubstraitParser::SubstraitParser() { registerConnector(hiveConnector); dwrf::registerDwrfReaderFactory(); // Register Velox functions - functions::registerFunctions(); - functions::registerVectorFunctions(); + functions::prestosql::registerAllFunctions(); aggregate::registerSumAggregate("sum"); } } diff --git a/native-sql-engine/cpp/src/proto/substrait_utils.h b/native-sql-engine/cpp/src/proto/substrait_utils.h index 5b2f9587529a..40bd43b3fd83 100644 --- a/native-sql-engine/cpp/src/proto/substrait_utils.h +++ b/native-sql-engine/cpp/src/proto/substrait_utils.h @@ -31,8 +31,7 @@ #include "velox/dwio/dwrf/reader/DwrfReader.h" #include "velox/exec/tests/utils/HiveConnectorTestBase.h" #include "velox/exec/tests/utils/PlanBuilder.h" -#include "velox/functions/prestosql/SimpleFunctions.h" -#include "velox/functions/prestosql/VectorFunctions.h" +#include "velox/functions/prestosql/registration/RegistrationFunctions.h" #include "velox/functions/prestosql/aggregates/SumAggregate.h" using namespace facebook::velox; From e9febfda682c04947f281ed65a155b68e7192fa6 Mon Sep 17 00:00:00 2001 From: Rui Mo Date: Mon, 13 Dec 2021 20:18:40 +0800 Subject: [PATCH 7/7] remove memcopy --- native-sql-engine/cpp/src/CMakeLists.txt | 8 +- native-sql-engine/cpp/src/jni/jni_wrapper.cc | 1 + .../cpp/src/proto/substrait_utils.cc | 108 ++++-------------- .../cpp/src/proto/substrait_utils.h | 3 +- 4 files changed, 34 insertions(+), 86 deletions(-) diff --git a/native-sql-engine/cpp/src/CMakeLists.txt b/native-sql-engine/cpp/src/CMakeLists.txt index ba9a15e1e76c..2a51616c6608 100644 --- a/native-sql-engine/cpp/src/CMakeLists.txt +++ b/native-sql-engine/cpp/src/CMakeLists.txt @@ -56,6 +56,7 @@ set(VELOX_FUNCTIONS_LIB_PATH "${VELOX_BUILD_PATH}/functions/lib/libvelox_functio set(VELOX_TYPE_TZ_LIB_PATH "${VELOX_BUILD_PATH}/type/tz/libvelox_type_tz.a") set(VELOX_EXTERNAL_MD5_LIB_PATH "${VELOX_BUILD_PATH}/external/md5/libmd5.a") set(VELOX_EXPRESSION_LIB_PATH "${VELOX_BUILD_PATH}/expression/libvelox_expression.a") +set(VELOX_BUFFER_LIB_PATH "${VELOX_BUILD_PATH}/buffer/libvelox_buffer.a") set(FOLLY_LIB_PATH "/usr/local/lib/libfolly.a") set(IBERTY_LIB_PATH "/usr/lib64/libiberty.a") set(GLOG_LIB_PATH "/usr/local/lib64/libglog.so") @@ -486,6 +487,7 @@ macro(build_velox_exec) add_library(facebook::velox::type::tz STATIC IMPORTED) add_library(facebook::velox::external::md5 STATIC IMPORTED) add_library(facebook::velox::expression STATIC IMPORTED) + add_library(facebook::velox::buffer STATIC IMPORTED) add_library(folly STATIC IMPORTED) add_library(iberty STATIC IMPORTED) add_library(doubleconversion SHARED IMPORTED) @@ -657,8 +659,11 @@ macro(build_velox_exec) PROPERTIES IMPORTED_LOCATION "${VELOX_EXPRESSION_LIB_PATH}" INTERFACE_INCLUDE_DIRECTORIES "${BINARY_RELEASE_DIR}/include") + set_target_properties(facebook::velox::buffer + PROPERTIES IMPORTED_LOCATION "${VELOX_BUFFER_LIB_PATH}" + INTERFACE_INCLUDE_DIRECTORIES + "${BINARY_RELEASE_DIR}/include") - target_link_libraries(spark_columnar_jni LINK_PUBLIC facebook::velox::functions::prestosql::agg LINK_PUBLIC facebook::velox::exec @@ -695,6 +700,7 @@ macro(build_velox_exec) LINK_PUBLIC facebook::velox::core::config LINK_PUBLIC facebook::velox::type::tz LINK_PUBLIC facebook::velox::external::md5 + LINK_PUBLIC facebook::velox::buffer LINK_PUBLIC gtest LINK_PUBLIC folly LINK_PUBLIC iberty diff --git a/native-sql-engine/cpp/src/jni/jni_wrapper.cc b/native-sql-engine/cpp/src/jni/jni_wrapper.cc index 75a0c59000ed..23c910c1373a 100644 --- a/native-sql-engine/cpp/src/jni/jni_wrapper.cc +++ b/native-sql-engine/cpp/src/jni/jni_wrapper.cc @@ -384,6 +384,7 @@ JNIEXPORT jobject JNICALL Java_com_intel_oap_vectorized_BatchIterator_nativeNext std::shared_ptr out; if (!iter->HasNext()) return nullptr; JniAssertOkOrThrow(iter->Next(&out), "nativeNext: get Next() failed"); + // arrow::PrettyPrint(*out.get(), 2, &std::cout); jbyteArray serialized_record_batch = JniGetOrThrow(ToBytes(env, out), "Error deserializing message"); return serialized_record_batch; diff --git a/native-sql-engine/cpp/src/proto/substrait_utils.cc b/native-sql-engine/cpp/src/proto/substrait_utils.cc index 838dee9b8aac..197621afd2d1 100644 --- a/native-sql-engine/cpp/src/proto/substrait_utils.cc +++ b/native-sql-engine/cpp/src/proto/substrait_utils.cc @@ -434,9 +434,8 @@ class SubstraitParser::WholeStageResultIterator } else { addSplits_(cursor_->task().get()); if (cursor_->moveNext()) { - auto current_vec = cursor_->current(); - result_.push_back(current_vec); - num_rows_ += current_vec->size(); + result_ = cursor_->current(); + num_rows_ += result_->size(); return true; } else { may_has_next_ = false; @@ -455,19 +454,6 @@ class SubstraitParser::WholeStageResultIterator } arrow::Status Next(std::shared_ptr* out) override { - addSplits_(cursor_->task().get()); - while (num_rows_ < batch_size_ && cursor_->moveNext()) { - auto current_vec = cursor_->current(); - result_.push_back(current_vec); - num_rows_ += current_vec->size(); - // If num_rows_ > batch_size_, the last RowVector needs to be sliced. - // In this way, the batch size can be exactly the same as the setting. - if (num_rows_ < batch_size_) { - addSplits_(cursor_->task().get()); - } - } - // Convert RowVector to Arrow RecordBatch - auto batch_len = (num_rows_ > batch_size_) ? batch_size_ : num_rows_; // FIXME: only one-col case is considered auto col_num = 1; std::vector> out_arrays; @@ -475,80 +461,36 @@ class SubstraitParser::WholeStageResultIterator arrow::ArrayData out_data; out_data.type = arrow::float64(); out_data.buffers.resize(2); - out_data.length = batch_len; - auto bytes = sizeof(double); - ARROW_ASSIGN_OR_RAISE(out_data.buffers[1], - AllocateBuffer(bytes * batch_len, memory_pool_)); - // FIXME: allocate null bitmap + out_data.length = num_rows_; + auto vec = result_->childAt(idx)->as>(); uint64_t array_null_count = 0; - uint64_t current_len = 0; - uint8_t* buffer_data = (out_data.buffers[1])->mutable_data(); - if (in_offset_ > 0 && last_rv_) { - // Last RV has unconsumed rows, need to consume them first - auto vec = last_rv_->childAt(idx)->as>(); - std::optional null_count = vec->getNullCount(); - if (null_count) { - int32_t vec_null_count = *null_count; - array_null_count += vec_null_count; - const uint64_t* rawNulls = vec->rawNulls(); - // FIXME: copy BitMap - } - const uint8_t* raw_result = vec->rawValues(); - int32_t vec_length = vec->size() - in_offset_; - if (current_len + vec_length <= batch_size_) { - CopyBuffer(raw_result + bytes * in_offset_, buffer_data + bytes * current_len, - bytes * vec_length); - current_len += vec_length; - // Last RV is totally consumed. - last_rv_ = nullptr; - in_offset_ = 0; - } else { - // Only part of this RowVector will be copied. - auto needed_length = batch_size_ - current_len; - CopyBuffer(raw_result + bytes * in_offset_, buffer_data + bytes * current_len, - bytes * needed_length); - in_offset_ += needed_length; - current_len += needed_length; - } + std::optional null_count = vec->getNullCount(); + std::shared_ptr val_buffer = nullptr; + if (null_count) { + int32_t vec_null_count = *null_count; + array_null_count += vec_null_count; + const uint64_t* rawNulls = vec->rawNulls(); + // FIXME: set BitMap } - for (auto row_vec : result_) { - // FIXME: use vec_type to infer the used types - auto vec_type = row_vec->childAt(idx)->type(); - auto vec = row_vec->childAt(idx)->as>(); - std::optional null_count = vec->getNullCount(); - if (null_count) { - int32_t vec_null_count = *null_count; - array_null_count += vec_null_count; - const uint64_t* rawNulls = vec->rawNulls(); - // FIXME: copy BitMap - } - const uint8_t* raw_result = vec->rawValues(); - int32_t vec_length = vec->size(); - if (current_len + vec_length <= batch_size_) { - CopyBuffer(raw_result, buffer_data + bytes * current_len, bytes * vec_length); - current_len += vec_length; - } else { - // Only part of this RowVector will be copied. - auto needed_length = batch_size_ - current_len; - CopyBuffer(raw_result, buffer_data + bytes * current_len, - bytes * needed_length); - in_offset_ = needed_length; - current_len += needed_length; - last_rv_ = row_vec; - } - } - // FIXME: array_null_count can be wrong when the slicing of a RowVector is - // required. out_data.null_count = array_null_count; + uint8_t* raw_result = vec->mutableRawValues(); + auto bytes = sizeof(double); + auto data_buffer = std::make_shared(raw_result, bytes * num_rows_); + out_data.buffers[0] = val_buffer; + out_data.buffers[1] = data_buffer; std::shared_ptr out_array = MakeArray(std::make_shared(std::move(out_data))); out_arrays.push_back(out_array); + // int ref_count = vec->mutableValues(0)->refCount(); } + // auto typed_array = std::dynamic_pointer_cast(out_arrays[0]); + // for (int i = 0; i < typed_array->length(); i++) { + // std::cout << "array val: " << typed_array->GetView(i) << std::endl; + // } std::vector> ret_types = { arrow::field("res", arrow::float64())}; - *out = arrow::RecordBatch::Make(arrow::schema(ret_types), batch_len, out_arrays); - num_rows_ = - (num_rows_ > batch_size_) ? (num_rows_ - batch_size_) : (num_rows_ - batch_len); + *out = arrow::RecordBatch::Make(arrow::schema(ret_types), num_rows_, out_arrays); + num_rows_ = 0; return arrow::Status::OK(); } @@ -568,7 +510,5 @@ class SubstraitParser::WholeStageResultIterator uint64_t batch_size_ = 10000; uint64_t num_rows_ = 0; bool may_has_next_ = true; - std::vector result_; - RowVectorPtr last_rv_ = nullptr; - uint64_t in_offset_ = 0; + RowVectorPtr result_; }; diff --git a/native-sql-engine/cpp/src/proto/substrait_utils.h b/native-sql-engine/cpp/src/proto/substrait_utils.h index 40bd43b3fd83..bd69062b1b28 100644 --- a/native-sql-engine/cpp/src/proto/substrait_utils.h +++ b/native-sql-engine/cpp/src/proto/substrait_utils.h @@ -25,14 +25,15 @@ #include "selection.pb.h" #include "type.pb.h" #include "type_expressions.pb.h" +#include "velox/buffer/Buffer.h" #include "velox/common/file/FileSystems.h" #include "velox/connectors/hive/HiveConnector.h" #include "velox/dwio/common/Options.h" #include "velox/dwio/dwrf/reader/DwrfReader.h" #include "velox/exec/tests/utils/HiveConnectorTestBase.h" #include "velox/exec/tests/utils/PlanBuilder.h" -#include "velox/functions/prestosql/registration/RegistrationFunctions.h" #include "velox/functions/prestosql/aggregates/SumAggregate.h" +#include "velox/functions/prestosql/registration/RegistrationFunctions.h" using namespace facebook::velox; using namespace facebook::velox::exec;