diff --git a/README.md b/README.md
index d21007e46aba..d140ac77de0a 100644
--- a/README.md
+++ b/README.md
@@ -44,6 +44,15 @@ Not all the operators and functions are added. Our initial plan is to pass TPCH
 Operator stat info is pretty useful to understand Spark's execution status. With this design we can only collect info for transform operator which is a combination of operators. We need to find ways to send native operators' stat info to Spark driver.
 
+Memory management is an essential feature in Spark, and it is even more important for a Spark native runtime. Unlike in the JVM, not every OOM error can be caught, which leads to ugly segmentation faults in unexpected places such as JNI calls. We need to register each (large) memory allocation with the executor's memory management service so that Spark can control the memory used by each task. We implemented this in Gazelle-plugin and will port the functionality to Gazelle-jni, but we still need to define a set of callback APIs that the native library can invoke.
+
+Spark's RDD cache is columnar-batch based. In Gazelle-plugin, we cache the Arrow record batch directly without any memcpy. We can build the same functionality in Gazelle-jni.
+
+PySpark support needs to be ported as well. If the input data format is Arrow, we can send the data to PySpark directly, with no memcpy.
+
+UDF support: we need to create an interface that takes a columnar batch as input, so customers can port their current UDFs to be columnar-batch based. If the interface is Arrow record batch based, users can implement their UDFs with Arrow's Java or C++ API, debug them without Spark, and then register them with Spark (a sketch follows this file's diff).
+
+Ideally, if every native library can return Arrow record batches, we can reuse many features in Spark's JVM. Spark already has an Apache Arrow dependency, so we could make the Arrow format Spark's basic columnar format. The problem is that a native library may not be 100% compatible with the Arrow format; in that case a conversion between its native format and Arrow is needed, and it is usually not cheap.
 
 # Contact
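As a rough illustration of the Arrow-record-batch-based UDF idea described above: the snippet below is a minimal sketch, not part of this patch, and the names (`MyColumnarUdf`, `RunWithoutSpark`, the add-1.0 example logic) are hypothetical. It only shows that a columnar UDF written against Arrow's C++ API can be built and debugged as a standalone program, without Spark:

```cpp
// Hypothetical columnar UDF: RecordBatch in, RecordBatch out. Illustrative only;
// the actual Gazelle-jni registration interface is still to be defined.
#include <arrow/api.h>
#include <arrow/compute/api.h>

#include <iostream>
#include <memory>

arrow::Result<std::shared_ptr<arrow::RecordBatch>> MyColumnarUdf(
    const std::shared_ptr<arrow::RecordBatch>& batch) {
  // Example logic: add 1.0 to the first (double) column.
  ARROW_ASSIGN_OR_RAISE(
      auto res, arrow::compute::CallFunction(
                    "add", {batch->column(0), arrow::MakeScalar(1.0)}));
  auto schema = arrow::schema({arrow::field("res", arrow::float64())});
  return arrow::RecordBatch::Make(schema, batch->num_rows(), {res.make_array()});
}

arrow::Status RunWithoutSpark() {
  // Build a tiny input batch locally so the UDF can be unit-tested in isolation.
  arrow::DoubleBuilder builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({1.0, 2.0, 3.0}));
  std::shared_ptr<arrow::Array> col;
  ARROW_RETURN_NOT_OK(builder.Finish(&col));
  auto batch = arrow::RecordBatch::Make(
      arrow::schema({arrow::field("c0", arrow::float64())}), col->length(), {col});
  ARROW_ASSIGN_OR_RAISE(auto out, MyColumnarUdf(batch));
  std::cout << out->ToString() << std::endl;
  return arrow::Status::OK();
}

int main() { return RunWithoutSpark().ok() ? 0 : 1; }
```

Once a UDF works in isolation like this, the remaining design question is only the registration API on the Spark side, which this patch does not define yet.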
diff --git a/SECURITY.md b/SECURITY.md
new file mode 100644
index 000000000000..eb482d90983d
--- /dev/null
+++ b/SECURITY.md
@@ -0,0 +1,12 @@
+# Security Policy
+
+## Report a Vulnerability
+
+Please report security issues or vulnerabilities to the [Intel® Security Center].
+
+For more information on how Intel® works to resolve security issues, see
+[Vulnerability Handling Guidelines].
+
+[Intel® Security Center]:https://www.intel.com/security
+
+[Vulnerability Handling Guidelines]:https://www.intel.com/content/www/us/en/security-center/vulnerability-handling-guidelines.html
diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala
index ab47dd8cd93e..ae3295ca8364 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/columnar/ColumnarGuardRule.scala
@@ -283,6 +283,10 @@ case class TransformGuardRule() extends Rule[SparkPlan] {
       // insertRowGuardRecursive(plan)
       case plan if !tryConvertToTransformer(plan) =>
         insertRowGuard(plan)
+      // FIXME: A tmp workaround for single Agg
+      case a: HashAggregateExec
+          if !a.child.isInstanceOf[ProjectExec] && !a.child.isInstanceOf[FilterExec] =>
+        insertRowGuard(a)
       case p: BroadcastQueryStageExec =>
         p
       case other =>
diff --git a/native-sql-engine/cpp/src/CMakeLists.txt b/native-sql-engine/cpp/src/CMakeLists.txt
index 40022a93b240..2a51616c6608 100644
--- a/native-sql-engine/cpp/src/CMakeLists.txt
+++ b/native-sql-engine/cpp/src/CMakeLists.txt
@@ -47,7 +47,8 @@ set(VELOX_DWIO_DWRF_READER_LIB_PATH "${VELOX_BUILD_PATH}/dwio/dwrf/reader/libvel
 set(VELOX_EXTERNAL_DUCKDB_LIB_PATH "${VELOX_BUILD_PATH}/external/duckdb/libduckdb.a")
 set(VELOX_DUCKDB_PARSER_LIB_PATH "${VELOX_BUILD_PATH}/duckdb/conversion/libvelox_duckdb_parser.a")
 set(VELOX_DUCKDB_CONVERSION_LIB_PATH "${VELOX_BUILD_PATH}/duckdb/conversion/libvelox_duckdb_conversion.a")
-set(VELOX_FUNCTIONS_PRESTOSQL_LIB_PATH "${VELOX_BUILD_PATH}/functions/prestosql/libvelox_functions_prestosql.a")
+set(VELOX_FUNCTIONS_PRESTOSQL_LIB_PATH "${VELOX_BUILD_PATH}/functions/prestosql/registration/libvelox_functions_prestosql.a")
+set(VELOX_FUNCTIONS_PRESTOSQL_IMPL_LIB_PATH "${VELOX_BUILD_PATH}/functions/prestosql/libvelox_functions_prestosql_impl.a")
 set(VELOX_FUNCTIONS_PRESTOSQL_JSON_LIB_PATH "${VELOX_BUILD_PATH}/functions/prestosql/json/libvelox_functions_json.a")
 set(VELOX_FUNCTIONS_PRESTOSQL_HYPERLOGLOG_LIB_PATH "${VELOX_BUILD_PATH}/functions/prestosql/hyperloglog/libvelox_functions_hyperloglog.a")
 set(VELOX_FUNCTIONS_PRESTOSQL_AGG_LIB_PATH "${VELOX_BUILD_PATH}/functions/prestosql/aggregates/libvelox_aggregates.a")
@@ -55,6 +56,7 @@ set(VELOX_FUNCTIONS_LIB_PATH "${VELOX_BUILD_PATH}/functions/lib/libvelox_functio
 set(VELOX_TYPE_TZ_LIB_PATH "${VELOX_BUILD_PATH}/type/tz/libvelox_type_tz.a")
 set(VELOX_EXTERNAL_MD5_LIB_PATH "${VELOX_BUILD_PATH}/external/md5/libmd5.a")
 set(VELOX_EXPRESSION_LIB_PATH "${VELOX_BUILD_PATH}/expression/libvelox_expression.a")
+set(VELOX_BUFFER_LIB_PATH "${VELOX_BUILD_PATH}/buffer/libvelox_buffer.a")
 set(FOLLY_LIB_PATH "/usr/local/lib/libfolly.a")
 set(IBERTY_LIB_PATH "/usr/lib64/libiberty.a")
 set(GLOG_LIB_PATH "/usr/local/lib64/libglog.so")
@@ -477,6 +479,7 @@
   add_library(facebook::velox::duckdb::parser STATIC IMPORTED)
   add_library(facebook::velox::duckdb::conversion STATIC IMPORTED)
   add_library(facebook::velox::functions::prestosql STATIC IMPORTED)
+  add_library(facebook::velox::functions::prestosql::impl STATIC IMPORTED)
   add_library(facebook::velox::functions::json STATIC IMPORTED)
   add_library(facebook::velox::functions::hyperloglog STATIC IMPORTED)
   add_library(facebook::velox::functions::prestosql::agg STATIC IMPORTED)
@@ -484,6 +487,7 @@
   add_library(facebook::velox::type::tz STATIC IMPORTED)
   add_library(facebook::velox::external::md5 STATIC IMPORTED)
   add_library(facebook::velox::expression STATIC IMPORTED)
+  add_library(facebook::velox::buffer STATIC IMPORTED)
   add_library(folly STATIC IMPORTED)
   add_library(iberty STATIC IMPORTED)
   add_library(doubleconversion SHARED IMPORTED)
@@ -623,6 +627,10 @@ macro(build_velox_exec)
                         PROPERTIES IMPORTED_LOCATION "${VELOX_FUNCTIONS_PRESTOSQL_LIB_PATH}"
                                    INTERFACE_INCLUDE_DIRECTORIES
                                    "${BINARY_RELEASE_DIR}/include")
+  set_target_properties(facebook::velox::functions::prestosql::impl
+                        PROPERTIES IMPORTED_LOCATION "${VELOX_FUNCTIONS_PRESTOSQL_IMPL_LIB_PATH}"
+                                   INTERFACE_INCLUDE_DIRECTORIES
+                                   "${BINARY_RELEASE_DIR}/include")
   set_target_properties(facebook::velox::functions::json
                         PROPERTIES IMPORTED_LOCATION "${VELOX_FUNCTIONS_PRESTOSQL_JSON_LIB_PATH}"
                                    INTERFACE_INCLUDE_DIRECTORIES
@@ -651,8 +659,11 @@ macro(build_velox_exec)
                         PROPERTIES IMPORTED_LOCATION "${VELOX_EXPRESSION_LIB_PATH}"
                                    INTERFACE_INCLUDE_DIRECTORIES
                                    "${BINARY_RELEASE_DIR}/include")
+  set_target_properties(facebook::velox::buffer
+                        PROPERTIES IMPORTED_LOCATION "${VELOX_BUFFER_LIB_PATH}"
+                                   INTERFACE_INCLUDE_DIRECTORIES
+                                   "${BINARY_RELEASE_DIR}/include")
-
 
   target_link_libraries(spark_columnar_jni
                         LINK_PUBLIC facebook::velox::functions::prestosql::agg
                         LINK_PUBLIC facebook::velox::exec
@@ -660,10 +671,10 @@
                         LINK_PUBLIC facebook::velox::connector::hive::part
                         LINK_PUBLIC facebook::velox::dwio::dwrf::proto
                         LINK_PUBLIC facebook::velox::functions::prestosql
+                        LINK_PUBLIC facebook::velox::functions::prestosql::impl
                         LINK_PUBLIC facebook::velox::functions::json
                         LINK_PUBLIC facebook::velox::functions::hyperloglog
                         LINK_PUBLIC facebook::velox::functions::lib
-                        LINK_PUBLIC facebook::velox::expression
                         LINK_PUBLIC facebook::velox::vector
                         LINK_PUBLIC facebook::velox::exec::test::util
                         LINK_PUBLIC facebook::velox::parse::parser
@@ -673,6 +684,7 @@
                         LINK_PUBLIC facebook::velox::common::process
                         LINK_PUBLIC facebook::velox::common::base::exception
                         LINK_PUBLIC facebook::velox::connector::hive
+                        LINK_PUBLIC facebook::velox::expression
                         LINK_PUBLIC facebook::velox::dwio::dwrf::writer
                         LINK_PUBLIC facebook::velox::dwio::dwrf::reader
                         LINK_PUBLIC facebook::velox::dwio::dwrf::common
@@ -688,6 +700,7 @@
                         LINK_PUBLIC facebook::velox::core::config
                         LINK_PUBLIC facebook::velox::type::tz
                         LINK_PUBLIC facebook::velox::external::md5
+                        LINK_PUBLIC facebook::velox::buffer
                         LINK_PUBLIC gtest
                         LINK_PUBLIC folly
                         LINK_PUBLIC iberty
diff --git a/native-sql-engine/cpp/src/jni/jni_common.h b/native-sql-engine/cpp/src/jni/jni_common.h
index f276eb9ca7a4..f8c2ec933627 100644
--- a/native-sql-engine/cpp/src/jni/jni_common.h
+++ b/native-sql-engine/cpp/src/jni/jni_common.h
@@ -345,10 +345,8 @@ arrow::Status ParseSubstraitPlan(
     env->ReleaseByteArrayElements(exprs_arr, exprs_bytes, JNI_ABORT);
     return arrow::Status::UnknownError("Unable to parse");
   }
-  std::cout << "Start to parse to plan" << std::endl;
   auto parser = std::make_shared<SubstraitParser>();
   parser->ParsePlan(ws_plan);
-  std::cout << "Finish parsing to plan" << std::endl;
   *out_iter = parser->getResIter();
   return arrow::Status::OK();
 }
diff --git a/native-sql-engine/cpp/src/jni/jni_wrapper.cc b/native-sql-engine/cpp/src/jni/jni_wrapper.cc
index b54a9e8802dd..23c910c1373a 100644
--- a/native-sql-engine/cpp/src/jni/jni_wrapper.cc
+++ b/native-sql-engine/cpp/src/jni/jni_wrapper.cc
@@ -353,7 +353,7 @@ Java_com_intel_oap_vectorized_ExpressionEvaluatorJniWrapper_nativeCreateKernelWi
   // Get the ws iter
   gandiva::ExpressionVector ws_expr_vector;
   gandiva::FieldVector ws_ret_types;
-  std::cout << "start to parse" << std::endl;
+  // std::cout << "start to parse" << std::endl;
   std::shared_ptr> res_iter;
   msg = ParseSubstraitPlan(env, ws_exprs_arr, &ws_expr_vector, &ws_ret_types, &res_iter);
   if (!msg.ok()) {
@@ -384,6 +384,7 @@ JNIEXPORT jobject JNICALL Java_com_intel_oap_vectorized_BatchIterator_nativeNext
   std::shared_ptr<arrow::RecordBatch> out;
   if (!iter->HasNext()) return nullptr;
   JniAssertOkOrThrow(iter->Next(&out), "nativeNext: get Next() failed");
+  // arrow::PrettyPrint(*out.get(), 2, &std::cout);
   jbyteArray serialized_record_batch =
       JniGetOrThrow(ToBytes(env, out), "Error deserializing message");
   return serialized_record_batch;
diff --git a/native-sql-engine/cpp/src/proto/substrait_utils.cc b/native-sql-engine/cpp/src/proto/substrait_utils.cc
index 366a22b7f0f1..197621afd2d1 100644
--- a/native-sql-engine/cpp/src/proto/substrait_utils.cc
+++ b/native-sql-engine/cpp/src/proto/substrait_utils.cc
@@ -17,6 +17,12 @@
 #include "substrait_utils.h"
 
+#include
+#include
+#include
+#include
+#include
+
 namespace substrait = io::substrait;
 
 using namespace facebook::velox;
 using namespace facebook::velox::exec;
@@ -25,7 +31,7 @@ using namespace facebook::velox::connector;
 using namespace facebook::velox::dwio::common;
 
 SubstraitParser::SubstraitParser() {
-  std::cout << "construct SubstraitParser" << std::endl;
+  // std::cout << "construct SubstraitParser" << std::endl;
   if (!initialized) {
     initialized = true;
     // Setup
@@ -39,8 +45,7 @@ SubstraitParser::SubstraitParser() {
     registerConnector(hiveConnector);
     dwrf::registerDwrfReaderFactory();
     // Register Velox functions
-    functions::registerFunctions();
-    functions::registerVectorFunctions();
+    functions::prestosql::registerAllFunctions();
     aggregate::registerSumAggregate("sum");
   }
 }
@@ -57,7 +62,7 @@ void SubstraitParser::ParseLiteral(const substrait::Expression::Literal& slit) {
   switch (slit.literal_type_case()) {
     case substrait::Expression_Literal::LiteralTypeCase::kFp64: {
       double val = slit.fp64();
-      std::cout << "double lit: " << val << std::endl;
+      // std::cout << "double lit: " << val << std::endl;
       break;
     }
     case substrait::Expression_Literal::LiteralTypeCase::kBoolean: {
@@ -77,7 +82,7 @@
   }
   auto function_id = sfunc.id().id();
   auto function_name = FindFunction(function_id);
-  std::cout << "function_name: " << function_name << std::endl;
+  // std::cout << "function_name: " << function_name << std::endl;
   auto out_type = sfunc.output_type();
   ParseType(out_type);
 }
@@ -87,7 +92,7 @@ void SubstraitParser::ParseReferenceSegment(const ::substrait::ReferenceSegment&
     case substrait::ReferenceSegment::ReferenceTypeCase::kStructField: {
       auto sfield = sref.struct_field();
       auto field_id = sfield.field();
-      std::cout << "field_id: " << field_id << std::endl;
+      // std::cout << "field_id: " << field_id << std::endl;
       break;
     }
     default:
@@ -104,7 +109,7 @@ void SubstraitParser::ParseFieldReference(const substrait::FieldReference& sfiel
       break;
     }
     case substrait::FieldReference::ReferenceTypeCase::kMaskedReference: {
-      std::cout << "not supported" << std::endl;
+      // std::cout << "not supported" << std::endl;
       break;
     }
     default:
@@ -204,7 +209,7 @@ void SubstraitParser::ParseAggregateRel(const substrait::AggregateRel& sagg,
   for (auto& grouping : groupings) {
     auto grouping_fields = grouping.input_fields();
     for (auto& grouping_field : grouping_fields) {
- std::cout << "Agg grouping_field: " << grouping_field << std::endl; + // std::cout << "Agg grouping_field: " << grouping_field << std::endl; } } // Parse measures @@ -220,7 +225,7 @@ void SubstraitParser::ParseAggregateRel(const substrait::AggregateRel& sagg, break; } auto function_id = aggFunction.id().id(); - std::cout << "Agg Function id: " << function_id << std::endl; + // std::cout << "Agg Function id: " << function_id << std::endl; auto args = aggFunction.args(); for (auto arg : args) { ParseExpression(arg); @@ -228,7 +233,7 @@ void SubstraitParser::ParseAggregateRel(const substrait::AggregateRel& sagg, } auto agg_phase = sagg.phase(); // Parse Input and Output types - std::cout << "Agg input and output:" << std::endl; + // std::cout << "Agg input and output:" << std::endl; for (auto& stype : sagg.input_types()) { ParseType(stype); } @@ -305,7 +310,7 @@ void SubstraitParser::ParseReadRel(const substrait::ReadRel& sread, velox_type_list.push_back(GetVeloxType(sub_type->type)); } auto& sfilter = sread.filter(); - std::cout << "filter pushdown: " << std::endl; + // std::cout << "filter pushdown: " << std::endl; ParseExpression(sfilter); hive::SubfieldFilters filters; filters[common::Subfield(col_name_list[3])] = std::make_unique( @@ -356,7 +361,7 @@ void SubstraitParser::ParsePlan(const substrait::Plan& splan) { auto id = sfmap.function_id().id(); auto name = sfmap.name(); functions_map_[id] = name; - std::cout << "Function id: " << id << ", name: " << name << std::endl; + // std::cout << "Function id: " << id << ", name: " << name << std::endl; } for (auto& srel : splan.relations()) { ParseRel(srel); @@ -421,29 +426,76 @@ class SubstraitParser::WholeStageResultIterator } bool HasNext() override { - std::vector result; - addSplits_(cursor_->task().get()); - while (cursor_->moveNext()) { - result.push_back(cursor_->current()); - addSplits_(cursor_->task().get()); + if (!may_has_next_) { + return false; } - int64_t num_rows = 0; - for (auto outputVector : result) { - // std::cout << "out size: " << outputVector->size() << std::endl; - num_rows += outputVector->size(); - for (size_t i = 0; i < outputVector->size(); ++i) { - std::cout << outputVector->toString(i) << std::endl; + if (num_rows_ > 0) { + return true; + } else { + addSplits_(cursor_->task().get()); + if (cursor_->moveNext()) { + result_ = cursor_->current(); + num_rows_ += result_->size(); + return true; + } else { + may_has_next_ = false; + return false; } } - std::cout << "num_rows: " << num_rows << std::endl; - return false; + } + + arrow::Status CopyBuffer(const uint8_t* from, uint8_t* to, int64_t copy_bytes) { + // ARROW_ASSIGN_OR_RAISE(*out, AllocateBuffer(size * length, memory_pool_)); + // uint8_t* buffer_data = (*out)->mutable_data(); + std::memcpy(to, from, copy_bytes); + // double val = *(double*)buffer_data; + // std::cout << "buffler val: " << val << std::endl; + return arrow::Status::OK(); } arrow::Status Next(std::shared_ptr* out) override { + // FIXME: only one-col case is considered + auto col_num = 1; + std::vector> out_arrays; + for (int idx = 0; idx < col_num; idx++) { + arrow::ArrayData out_data; + out_data.type = arrow::float64(); + out_data.buffers.resize(2); + out_data.length = num_rows_; + auto vec = result_->childAt(idx)->as>(); + uint64_t array_null_count = 0; + std::optional null_count = vec->getNullCount(); + std::shared_ptr val_buffer = nullptr; + if (null_count) { + int32_t vec_null_count = *null_count; + array_null_count += vec_null_count; + const uint64_t* rawNulls = vec->rawNulls(); + // 
FIXME: set BitMap + } + out_data.null_count = array_null_count; + uint8_t* raw_result = vec->mutableRawValues(); + auto bytes = sizeof(double); + auto data_buffer = std::make_shared(raw_result, bytes * num_rows_); + out_data.buffers[0] = val_buffer; + out_data.buffers[1] = data_buffer; + std::shared_ptr out_array = + MakeArray(std::make_shared(std::move(out_data))); + out_arrays.push_back(out_array); + // int ref_count = vec->mutableValues(0)->refCount(); + } + // auto typed_array = std::dynamic_pointer_cast(out_arrays[0]); + // for (int i = 0; i < typed_array->length(); i++) { + // std::cout << "array val: " << typed_array->GetView(i) << std::endl; + // } + std::vector> ret_types = { + arrow::field("res", arrow::float64())}; + *out = arrow::RecordBatch::Make(arrow::schema(ret_types), num_rows_, out_arrays); + num_rows_ = 0; return arrow::Status::OK(); } private: + arrow::MemoryPool* memory_pool_ = arrow::default_memory_pool(); std::shared_ptr plan_builder_; std::unique_ptr cursor_; std::vector splits_; @@ -454,4 +506,9 @@ class SubstraitParser::WholeStageResultIterator std::vector paths_; std::vector starts_; std::vector lengths_; + // FIXME: use the setted one + uint64_t batch_size_ = 10000; + uint64_t num_rows_ = 0; + bool may_has_next_ = true; + RowVectorPtr result_; }; diff --git a/native-sql-engine/cpp/src/proto/substrait_utils.h b/native-sql-engine/cpp/src/proto/substrait_utils.h index d4fd23325a23..bd69062b1b28 100644 --- a/native-sql-engine/cpp/src/proto/substrait_utils.h +++ b/native-sql-engine/cpp/src/proto/substrait_utils.h @@ -25,16 +25,15 @@ #include "selection.pb.h" #include "type.pb.h" #include "type_expressions.pb.h" - +#include "velox/buffer/Buffer.h" #include "velox/common/file/FileSystems.h" #include "velox/connectors/hive/HiveConnector.h" #include "velox/dwio/common/Options.h" #include "velox/dwio/dwrf/reader/DwrfReader.h" #include "velox/exec/tests/utils/HiveConnectorTestBase.h" #include "velox/exec/tests/utils/PlanBuilder.h" -#include "velox/functions/prestosql/SimpleFunctions.h" -#include "velox/functions/prestosql/VectorFunctions.h" #include "velox/functions/prestosql/aggregates/SumAggregate.h" +#include "velox/functions/prestosql/registration/RegistrationFunctions.h" using namespace facebook::velox; using namespace facebook::velox::exec;
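For readers following the new `Next()` implementation in `substrait_utils.cc` above, here is a standalone sketch of the same buffer-wrapping idea; it is not part of this patch. An existing `double` buffer is wrapped into `arrow::ArrayData` and exposed as an `arrow::RecordBatch` without copying; the `std::vector<double>` stands in for the Velox `FlatVector`'s raw values, and, as in the real iterator, the wrapped memory must outlive the Arrow batch:

```cpp
#include <arrow/api.h>

#include <cstdint>
#include <iostream>
#include <memory>
#include <vector>

int main() {
  // Stand-in for the Velox vector's raw values; Arrow will only borrow this memory.
  std::vector<double> values = {1.5, 2.5, 3.5};
  const int64_t num_rows = static_cast<int64_t>(values.size());

  // Zero-copy wrap, mirroring std::make_shared<arrow::Buffer>(raw_result, ...) in Next().
  auto data_buffer = std::make_shared<arrow::Buffer>(
      reinterpret_cast<const uint8_t*>(values.data()),
      num_rows * static_cast<int64_t>(sizeof(double)));

  arrow::ArrayData out_data;
  out_data.type = arrow::float64();
  out_data.length = num_rows;
  out_data.null_count = 0;
  out_data.buffers = {nullptr /* no validity bitmap */, data_buffer};

  auto array = arrow::MakeArray(std::make_shared<arrow::ArrayData>(std::move(out_data)));
  auto batch = arrow::RecordBatch::Make(
      arrow::schema({arrow::field("res", arrow::float64())}), num_rows, {array});
  std::cout << batch->ToString() << std::endl;
  return 0;
}
```

The null-count handling and validity-bitmap construction that the patch marks as `FIXME` would slot in where the `nullptr` validity buffer is passed here.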