[GJ-4] make TPC-H Q6 runnable with Velox's calculation #5

Merged 7 commits on Dec 15, 2021
Changes from all commits
9 changes: 9 additions & 0 deletions README.md
@@ -44,6 +44,15 @@ Not all the operators and functions are added. Our initial plan is to pass TPCH

Operator stat info is pretty useful for understanding Spark's execution status. With this design we can only collect stats for the transform operator, which is a combination of operators. We need to find a way to send each native operator's stat info back to the Spark driver.
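As a rough illustration, the sketch below shows one possible shape for a per-operator stats record on the native side; the struct, its fields, and the collection hook are hypothetical and not part of this PR:

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical per-operator metrics record; a real one would carry whatever
// stats Spark's UI expects (rows, batches, time, memory).
struct NativeOperatorMetrics {
  std::string operator_id;        // e.g. "FilterProject_1" (illustrative)
  int64_t output_rows = 0;
  int64_t output_batches = 0;
  int64_t wall_time_ns = 0;
  int64_t peak_memory_bytes = 0;
};

// Hypothetical hook: after a task finishes, collect one record per native
// operator so the JNI layer can serialize them back to the Spark driver.
std::vector<NativeOperatorMetrics> CollectPipelineMetrics() {
  // Placeholder: a real implementation would walk the finished native task
  // and copy each operator's runtime stats.
  return {};
}
```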

Memory management is an essential feature in Spark, and it's even more important to a Spark native runtime. Unlike in the JVM, not all OOM errors can be captured, which leads to ugly segmentation faults in strange places such as JNI calls. We need to register each (large) memory allocation with the executor's memory management service so that Spark can control the memory used by each task. We implemented this in Gazelle-plugin. We will port the functionality to Gazelle-JNI, but we need to define a set of APIs the native library can call back into.
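A minimal sketch of such a callback API, assuming the listener is backed by a JNI call into Spark's task memory manager; the names below are illustrative, not an existing interface in this repository:

```cpp
#include <cstdint>
#include <cstdlib>
#include <new>

// Hypothetical callback the native library invokes around each (large)
// allocation, so Spark's memory accounting stays authoritative.
class ReservationListener {
 public:
  virtual ~ReservationListener() = default;
  // Ask Spark for `size` bytes before allocating natively. Throwing here
  // lets the JVM raise a normal OOM instead of a native segfault later.
  virtual void Reserve(int64_t size) = 0;
  // Return `size` bytes to Spark's accounting after freeing natively.
  virtual void Unreserve(int64_t size) = 0;
};

void* TrackedAllocate(ReservationListener& listener, int64_t size) {
  listener.Reserve(size);  // may throw if the task is over its quota
  void* buf = std::malloc(size);
  if (buf == nullptr) {
    listener.Unreserve(size);
    throw std::bad_alloc();
  }
  return buf;
}
```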

Spark's RDD cache is columnar-batch based. In Gazelle-plugin, we use the Arrow record batch directly without any memcpy. We can build the same functionality in Gazelle-JNI.

PySpark support needs to be ported as well. If the input data format is Arrow, we can send the data to PySpark directly, with no memcpy.

UDF support. We need to create an interface that takes columnar batches as input, so customers can port their current UDFs to be columnar-batch based. If the interface is Arrow-record-batch based, users can utilize Arrow's Java or C++ API to implement their UDFs, debug them without Spark, then register them with Spark.
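A minimal sketch of what an Arrow-record-batch-based UDF could look like; the function below is hypothetical, and arrow::compute::Add merely stands in for real user logic. Because it depends only on Arrow, it can be unit-tested in plain C++ before being registered with Spark:

```cpp
#include <arrow/api.h>
#include <arrow/compute/api.h>

// Hypothetical columnar UDF: one Arrow record batch in, one array out.
arrow::Result<std::shared_ptr<arrow::Array>> PlusOneUdf(
    const arrow::RecordBatch& batch) {
  // Assumes column 0 is float64; a real UDF would validate the schema.
  ARROW_ASSIGN_OR_RAISE(
      arrow::Datum result,
      arrow::compute::Add(batch.column(0), arrow::Datum(1.0)));
  return result.make_array();
}
```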

Ideally, if every native library can return Arrow record batches, we can share many features in Spark's JVM. Spark already has an Apache Arrow dependency, so we can make the Arrow format Spark's basic columnar format. The problem is that a native library may not be 100% compatible with the Arrow format; in that case there will be a conversion between its native format and Arrow, and that conversion is usually not cheap.
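To make that cost concrete, here is a small sketch (illustrative helpers, not code from this PR) contrasting a copying conversion with a zero-copy wrap of an already Arrow-compatible buffer:

```cpp
#include <arrow/api.h>
#include <vector>

// A non-Arrow native layout has to be copied into Arrow memory.
arrow::Result<std::shared_ptr<arrow::Array>> CopyToArrow(
    const std::vector<double>& native_values) {
  arrow::DoubleBuilder builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues(native_values));  // copies values
  std::shared_ptr<arrow::Array> out;
  ARROW_RETURN_NOT_OK(builder.Finish(&out));
  return out;
}

// An Arrow-compatible buffer can be wrapped in place; Arrow only records a
// pointer, and the caller must keep `data` alive for the array's lifetime.
std::shared_ptr<arrow::Array> WrapAsArrow(const double* data, int64_t length) {
  auto buffer = std::make_shared<arrow::Buffer>(
      reinterpret_cast<const uint8_t*>(data), length * sizeof(double));
  auto array_data = arrow::ArrayData::Make(
      arrow::float64(), length, {nullptr, buffer});
  return arrow::MakeArray(array_data);
}
```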

# Contact

12 changes: 12 additions & 0 deletions SECURITY.md
@@ -0,0 +1,12 @@
# Security Policy

## Report a Vulnerability

Please report security issues or vulnerabilities to the [Intel® Security Center].

For more information on how Intel® works to resolve security issues, see
[Vulnerability Handling Guidelines].

[Intel® Security Center]:https://www.intel.com/security

[Vulnerability Handling Guidelines]:https://www.intel.com/content/www/us/en/security-center/vulnerability-handling-guidelines.html
@@ -283,6 +283,10 @@ case class TransformGuardRule() extends Rule[SparkPlan] {
// insertRowGuardRecursive(plan)
case plan if !tryConvertToTransformer(plan) =>
insertRowGuard(plan)
// FIXME: A tmp workaround for single Agg
case a: HashAggregateExec
if !a.child.isInstanceOf[ProjectExec] && !a.child.isInstanceOf[FilterExec] =>
insertRowGuard(a)
case p: BroadcastQueryStageExec =>
p
case other =>
19 changes: 16 additions & 3 deletions native-sql-engine/cpp/src/CMakeLists.txt
@@ -47,14 +47,16 @@ set(VELOX_DWIO_DWRF_READER_LIB_PATH "${VELOX_BUILD_PATH}/dwio/dwrf/reader/libvel
set(VELOX_EXTERNAL_DUCKDB_LIB_PATH "${VELOX_BUILD_PATH}/external/duckdb/libduckdb.a")
set(VELOX_DUCKDB_PARSER_LIB_PATH "${VELOX_BUILD_PATH}/duckdb/conversion/libvelox_duckdb_parser.a")
set(VELOX_DUCKDB_CONVERSION_LIB_PATH "${VELOX_BUILD_PATH}/duckdb/conversion/libvelox_duckdb_conversion.a")
set(VELOX_FUNCTIONS_PRESTOSQL_LIB_PATH "${VELOX_BUILD_PATH}/functions/prestosql/registration/libvelox_functions_prestosql.a")
set(VELOX_FUNCTIONS_PRESTOSQL_IMPL_LIB_PATH "${VELOX_BUILD_PATH}/functions/prestosql/libvelox_functions_prestosql_impl.a")
set(VELOX_FUNCTIONS_PRESTOSQL_JSON_LIB_PATH "${VELOX_BUILD_PATH}/functions/prestosql/json/libvelox_functions_json.a")
set(VELOX_FUNCTIONS_PRESTOSQL_HYPERLOGLOG_LIB_PATH "${VELOX_BUILD_PATH}/functions/prestosql/hyperloglog/libvelox_functions_hyperloglog.a")
set(VELOX_FUNCTIONS_PRESTOSQL_AGG_LIB_PATH "${VELOX_BUILD_PATH}/functions/prestosql/aggregates/libvelox_aggregates.a")
set(VELOX_FUNCTIONS_LIB_PATH "${VELOX_BUILD_PATH}/functions/lib/libvelox_functions_lib.a")
set(VELOX_TYPE_TZ_LIB_PATH "${VELOX_BUILD_PATH}/type/tz/libvelox_type_tz.a")
set(VELOX_EXTERNAL_MD5_LIB_PATH "${VELOX_BUILD_PATH}/external/md5/libmd5.a")
set(VELOX_EXPRESSION_LIB_PATH "${VELOX_BUILD_PATH}/expression/libvelox_expression.a")
set(VELOX_BUFFER_LIB_PATH "${VELOX_BUILD_PATH}/buffer/libvelox_buffer.a")
set(FOLLY_LIB_PATH "/usr/local/lib/libfolly.a")
set(IBERTY_LIB_PATH "/usr/lib64/libiberty.a")
set(GLOG_LIB_PATH "/usr/local/lib64/libglog.so")
@@ -477,13 +479,15 @@ macro(build_velox_exec)
add_library(facebook::velox::duckdb::parser STATIC IMPORTED)
add_library(facebook::velox::duckdb::conversion STATIC IMPORTED)
add_library(facebook::velox::functions::prestosql STATIC IMPORTED)
add_library(facebook::velox::functions::prestosql::impl STATIC IMPORTED)
add_library(facebook::velox::functions::json STATIC IMPORTED)
add_library(facebook::velox::functions::hyperloglog STATIC IMPORTED)
add_library(facebook::velox::functions::prestosql::agg STATIC IMPORTED)
add_library(facebook::velox::functions::lib STATIC IMPORTED)
add_library(facebook::velox::type::tz STATIC IMPORTED)
add_library(facebook::velox::external::md5 STATIC IMPORTED)
add_library(facebook::velox::expression STATIC IMPORTED)
add_library(facebook::velox::buffer STATIC IMPORTED)
add_library(folly STATIC IMPORTED)
add_library(iberty STATIC IMPORTED)
add_library(doubleconversion SHARED IMPORTED)
@@ -623,6 +627,10 @@ macro(build_velox_exec)
PROPERTIES IMPORTED_LOCATION "${VELOX_FUNCTIONS_PRESTOSQL_LIB_PATH}"
INTERFACE_INCLUDE_DIRECTORIES
"${BINARY_RELEASE_DIR}/include")
set_target_properties(facebook::velox::functions::prestosql::impl
PROPERTIES IMPORTED_LOCATION "${VELOX_FUNCTIONS_PRESTOSQL_IMPL_LIB_PATH}"
INTERFACE_INCLUDE_DIRECTORIES
"${BINARY_RELEASE_DIR}/include")
set_target_properties(facebook::velox::functions::json
PROPERTIES IMPORTED_LOCATION "${VELOX_FUNCTIONS_PRESTOSQL_JSON_LIB_PATH}"
INTERFACE_INCLUDE_DIRECTORIES
@@ -651,19 +659,22 @@
PROPERTIES IMPORTED_LOCATION "${VELOX_EXPRESSION_LIB_PATH}"
INTERFACE_INCLUDE_DIRECTORIES
"${BINARY_RELEASE_DIR}/include")
set_target_properties(facebook::velox::buffer
PROPERTIES IMPORTED_LOCATION "${VELOX_BUFFER_LIB_PATH}"
INTERFACE_INCLUDE_DIRECTORIES
"${BINARY_RELEASE_DIR}/include")


target_link_libraries(spark_columnar_jni
LINK_PUBLIC facebook::velox::functions::prestosql::agg
LINK_PUBLIC facebook::velox::exec
LINK_PUBLIC facebook::velox::connector
LINK_PUBLIC facebook::velox::connector::hive::part
LINK_PUBLIC facebook::velox::dwio::dwrf::proto
LINK_PUBLIC facebook::velox::functions::prestosql
LINK_PUBLIC facebook::velox::functions::prestosql::impl
LINK_PUBLIC facebook::velox::functions::json
LINK_PUBLIC facebook::velox::functions::hyperloglog
LINK_PUBLIC facebook::velox::functions::lib
LINK_PUBLIC facebook::velox::vector
LINK_PUBLIC facebook::velox::exec::test::util
LINK_PUBLIC facebook::velox::parse::parser
@@ -673,6 +684,7 @@
LINK_PUBLIC facebook::velox::common::process
LINK_PUBLIC facebook::velox::common::base::exception
LINK_PUBLIC facebook::velox::connector::hive
LINK_PUBLIC facebook::velox::expression
LINK_PUBLIC facebook::velox::dwio::dwrf::writer
LINK_PUBLIC facebook::velox::dwio::dwrf::reader
LINK_PUBLIC facebook::velox::dwio::dwrf::common
@@ -688,6 +700,7 @@
LINK_PUBLIC facebook::velox::core::config
LINK_PUBLIC facebook::velox::type::tz
LINK_PUBLIC facebook::velox::external::md5
LINK_PUBLIC facebook::velox::buffer
LINK_PUBLIC gtest
LINK_PUBLIC folly
LINK_PUBLIC iberty
2 changes: 0 additions & 2 deletions native-sql-engine/cpp/src/jni/jni_common.h
@@ -345,10 +345,8 @@ arrow::Status ParseSubstraitPlan(
env->ReleaseByteArrayElements(exprs_arr, exprs_bytes, JNI_ABORT);
return arrow::Status::UnknownError("Unable to parse");
}
std::cout << "Start to parse to plan" << std::endl;
auto parser = std::make_shared<SubstraitParser>();
parser->ParsePlan(ws_plan);
std::cout << "Finish parsing to plan" << std::endl;
*out_iter = parser->getResIter();
return arrow::Status::OK();
}
3 changes: 2 additions & 1 deletion native-sql-engine/cpp/src/jni/jni_wrapper.cc
@@ -353,7 +353,7 @@ Java_com_intel_oap_vectorized_ExpressionEvaluatorJniWrapper_nativeCreateKernelWi
// Get the ws iter
gandiva::ExpressionVector ws_expr_vector;
gandiva::FieldVector ws_ret_types;
std::cout << "start to parse" << std::endl;
// std::cout << "start to parse" << std::endl;
std::shared_ptr<ResultIterator<arrow::RecordBatch>> res_iter;
msg = ParseSubstraitPlan(env, ws_exprs_arr, &ws_expr_vector, &ws_ret_types, &res_iter);
if (!msg.ok()) {
@@ -384,6 +384,7 @@ JNIEXPORT jobject JNICALL Java_com_intel_oap_vectorized_BatchIterator_nativeNext
std::shared_ptr<arrow::RecordBatch> out;
if (!iter->HasNext()) return nullptr;
JniAssertOkOrThrow(iter->Next(&out), "nativeNext: get Next() failed");
// arrow::PrettyPrint(*out.get(), 2, &std::cout);
jbyteArray serialized_record_batch =
JniGetOrThrow(ToBytes(env, out), "Error serializing message");
return serialized_record_batch;
107 changes: 82 additions & 25 deletions native-sql-engine/cpp/src/proto/substrait_utils.cc
@@ -17,6 +17,12 @@

#include "substrait_utils.h"

#include <arrow/array/array_primitive.h>
#include <arrow/array/data.h>
#include <arrow/array/util.h>
#include <arrow/record_batch.h>
#include <arrow/type_fwd.h>

namespace substrait = io::substrait;
using namespace facebook::velox;
using namespace facebook::velox::exec;
@@ -25,7 +31,7 @@ using namespace facebook::velox::connector;
using namespace facebook::velox::dwio::common;

SubstraitParser::SubstraitParser() {
std::cout << "construct SubstraitParser" << std::endl;
// std::cout << "construct SubstraitParser" << std::endl;
if (!initialized) {
initialized = true;
// Setup
@@ -39,8 +45,7 @@ SubstraitParser::SubstraitParser() {
registerConnector(hiveConnector);
dwrf::registerDwrfReaderFactory();
// Register Velox functions
functions::prestosql::registerAllFunctions();
aggregate::registerSumAggregate<aggregate::SumAggregate>("sum");
}
}
@@ -57,7 +62,7 @@ void SubstraitParser::ParseLiteral(const substrait::Expression::Literal& slit) {
switch (slit.literal_type_case()) {
case substrait::Expression_Literal::LiteralTypeCase::kFp64: {
double val = slit.fp64();
std::cout << "double lit: " << val << std::endl;
// std::cout << "double lit: " << val << std::endl;
break;
}
case substrait::Expression_Literal::LiteralTypeCase::kBoolean: {
@@ -77,7 +82,7 @@ void SubstraitParser::ParseScalarFunction(
}
auto function_id = sfunc.id().id();
auto function_name = FindFunction(function_id);
std::cout << "function_name: " << function_name << std::endl;
// std::cout << "function_name: " << function_name << std::endl;
auto out_type = sfunc.output_type();
ParseType(out_type);
}
@@ -87,7 +92,7 @@ void SubstraitParser::ParseReferenceSegment(const ::substrait::ReferenceSegment&
case substrait::ReferenceSegment::ReferenceTypeCase::kStructField: {
auto sfield = sref.struct_field();
auto field_id = sfield.field();
std::cout << "field_id: " << field_id << std::endl;
// std::cout << "field_id: " << field_id << std::endl;
break;
}
default:
@@ -104,7 +109,7 @@ void SubstraitParser::ParseFieldReference(const substrait::FieldReference& sfiel
break;
}
case substrait::FieldReference::ReferenceTypeCase::kMaskedReference: {
std::cout << "not supported" << std::endl;
// std::cout << "not supported" << std::endl;
break;
}
default:
@@ -204,7 +209,7 @@ void SubstraitParser::ParseAggregateRel(const substrait::AggregateRel& sagg,
for (auto& grouping : groupings) {
auto grouping_fields = grouping.input_fields();
for (auto& grouping_field : grouping_fields) {
std::cout << "Agg grouping_field: " << grouping_field << std::endl;
// std::cout << "Agg grouping_field: " << grouping_field << std::endl;
}
}
// Parse measures
@@ -220,15 +225,15 @@
break;
}
auto function_id = aggFunction.id().id();
std::cout << "Agg Function id: " << function_id << std::endl;
// std::cout << "Agg Function id: " << function_id << std::endl;
auto args = aggFunction.args();
for (auto arg : args) {
ParseExpression(arg);
}
}
auto agg_phase = sagg.phase();
// Parse Input and Output types
std::cout << "Agg input and output:" << std::endl;
// std::cout << "Agg input and output:" << std::endl;
for (auto& stype : sagg.input_types()) {
ParseType(stype);
}
@@ -305,7 +310,7 @@ void SubstraitParser::ParseReadRel(const substrait::ReadRel& sread,
velox_type_list.push_back(GetVeloxType(sub_type->type));
}
auto& sfilter = sread.filter();
std::cout << "filter pushdown: " << std::endl;
// std::cout << "filter pushdown: " << std::endl;
ParseExpression(sfilter);
hive::SubfieldFilters filters;
filters[common::Subfield(col_name_list[3])] = std::make_unique<common::DoubleRange>(
@@ -356,7 +361,7 @@ void SubstraitParser::ParsePlan(const substrait::Plan& splan) {
auto id = sfmap.function_id().id();
auto name = sfmap.name();
functions_map_[id] = name;
std::cout << "Function id: " << id << ", name: " << name << std::endl;
// std::cout << "Function id: " << id << ", name: " << name << std::endl;
}
for (auto& srel : splan.relations()) {
ParseRel(srel);
@@ -421,29 +426,76 @@ class SubstraitParser::WholeStageResultIterator
}

  bool HasNext() override {
    if (!may_has_next_) {
      return false;
    }
    if (num_rows_ > 0) {
      // An unconsumed batch is already cached; see Next().
      return true;
    } else {
      addSplits_(cursor_->task().get());
      if (cursor_->moveNext()) {
        result_ = cursor_->current();
        num_rows_ += result_->size();
        return true;
      } else {
        may_has_next_ = false;
        return false;
      }
    }
  }

  arrow::Status CopyBuffer(const uint8_t* from, uint8_t* to, int64_t copy_bytes) {
    std::memcpy(to, from, copy_bytes);
    return arrow::Status::OK();
  }

  arrow::Status Next(std::shared_ptr<arrow::RecordBatch>* out) override {
    // FIXME: only the one-column case is handled so far.
    auto col_num = 1;
    std::vector<std::shared_ptr<arrow::Array>> out_arrays;
    for (int idx = 0; idx < col_num; idx++) {
      arrow::ArrayData out_data;
      out_data.type = arrow::float64();
      out_data.buffers.resize(2);
      out_data.length = num_rows_;
      auto vec = result_->childAt(idx)->as<FlatVector<double>>();
      uint64_t array_null_count = 0;
      std::optional<int32_t> null_count = vec->getNullCount();
      std::shared_ptr<arrow::Buffer> val_buffer = nullptr;
      if (null_count) {
        int32_t vec_null_count = *null_count;
        array_null_count += vec_null_count;
        const uint64_t* rawNulls = vec->rawNulls();
        // FIXME: build the validity bitmap from rawNulls.
      }
      out_data.null_count = array_null_count;
      // Wrap Velox's value buffer into an Arrow buffer without a copy.
      uint8_t* raw_result = vec->mutableRawValues<uint8_t>();
      auto bytes = sizeof(double);
      auto data_buffer = std::make_shared<arrow::Buffer>(raw_result, bytes * num_rows_);
      out_data.buffers[0] = val_buffer;
      out_data.buffers[1] = data_buffer;
      std::shared_ptr<arrow::Array> out_array =
          MakeArray(std::make_shared<arrow::ArrayData>(std::move(out_data)));
      out_arrays.push_back(out_array);
    }
    std::vector<std::shared_ptr<arrow::Field>> ret_types = {
        arrow::field("res", arrow::float64())};
    *out = arrow::RecordBatch::Make(arrow::schema(ret_types), num_rows_, out_arrays);
    // Mark the cached batch as consumed so HasNext() fetches the next one.
    num_rows_ = 0;
    return arrow::Status::OK();
  }

private:
arrow::MemoryPool* memory_pool_ = arrow::default_memory_pool();
std::shared_ptr<PlanBuilder> plan_builder_;
std::unique_ptr<TaskCursor> cursor_;
std::vector<exec::Split> splits_;
Expand All @@ -454,4 +506,9 @@ class SubstraitParser::WholeStageResultIterator
std::vector<std::string> paths_;
std::vector<u_int64_t> starts_;
std::vector<u_int64_t> lengths_;
// FIXME: use the configured batch size instead of this hard-coded default.
uint64_t batch_size_ = 10000;
uint64_t num_rows_ = 0;
bool may_has_next_ = true;
RowVectorPtr result_;
};
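For reference, a minimal sketch of how this iterator is driven from the JNI layer in this PR: ParseSubstraitPlan constructs the parser, getResIter hands back this iterator, and BatchIterator_nativeNext pulls one Arrow batch at a time (ws_plan is assumed to be an already-parsed substrait::Plan):

```cpp
// Sketch of the consuming loop, mirroring jni_common.h and jni_wrapper.cc.
void ConsumeWholeStage(const io::substrait::Plan& ws_plan) {
  auto parser = std::make_shared<SubstraitParser>();
  parser->ParsePlan(ws_plan);
  auto res_iter = parser->getResIter();
  std::shared_ptr<arrow::RecordBatch> batch;
  while (res_iter->HasNext()) {
    if (!res_iter->Next(&batch).ok()) break;  // one batch per cached result
    // `batch` wraps Velox's output buffers; serialize it to the JVM here.
  }
}
```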
5 changes: 2 additions & 3 deletions native-sql-engine/cpp/src/proto/substrait_utils.h
@@ -25,16 +25,15 @@
#include "selection.pb.h"
#include "type.pb.h"
#include "type_expressions.pb.h"

#include "velox/buffer/Buffer.h"
#include "velox/common/file/FileSystems.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/dwio/common/Options.h"
#include "velox/dwio/dwrf/reader/DwrfReader.h"
#include "velox/exec/tests/utils/HiveConnectorTestBase.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/functions/prestosql/SimpleFunctions.h"
#include "velox/functions/prestosql/VectorFunctions.h"
#include "velox/functions/prestosql/aggregates/SumAggregate.h"
#include "velox/functions/prestosql/registration/RegistrationFunctions.h"

using namespace facebook::velox;
using namespace facebook::velox::exec;