Skip to content

Commit

Permalink
[POAE7-1598] [Velox2Substrait] function convertor using yaml format (o…
Browse files Browse the repository at this point in the history
…ap-project#37)

* init commit of function registry

* code refactor

* code structure re-org

* partial implementation for function lookup and function collector

* complete the function signature lookup development

* parse substrait extension yaml files as SubstraitExtension

* parse substrait unknown type

* parse substrait unknown type

* parse substrait unknown type

* add if/switch test case

* revert change for FilterProjectTest.cpp

* revert CMakeLists.txt

* typo

* code refactor  for function lookup

* remove virtual

* move definition of SubstraitFunction outside of SubstraitExtension.h

* revert typeUtils

* add comments

* code style tidy

* optimize code structure of  SubstraitType.h

* add test case for type lookup

* code style fix

* consolidate code of function variant decode & code refactor

* revert change of CMakeLists.txt

* use VELOX_NYI to throw exception instead of using std runtime directly

* revert CMakeLists.txt

* use VELOX_NYI to throw exception instead of using std runtime directly

* add lookup scalar function and aggregate function by provided reference and extension

* support lookup function with signature and function mappings

* revert format change for CMakeLists.txt

* revert format change for CMakeLists.txt

* rollback change of velox/CMakeLists.txt

* comments provided for SubstraitFunctionMappings.h
use ASSERT_EQ instead of ASSERT_TRUE for value comparison

* some code style fix and class renaming

* provide the default implementation for SubstraitFunctionMappings.h

* code  refactor to make code more clean

* support signatures with different wildcard types

* support wildcard function variant with different types
  • Loading branch information
chaojun-zhang authored and ZJie1 committed Oct 18, 2022
1 parent 133bd58 commit fc28645
Show file tree
Hide file tree
Showing 51 changed files with 4,887 additions and 168 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@
[submodule "third_party/xsimd"]
path = third_party/xsimd
url = https://github.com/xtensor-stack/xsimd.git
[submodule "third_party/yaml-cpp"]
path = third_party/yaml-cpp
url = https://github.com/jbeder/yaml-cpp.git
3 changes: 3 additions & 0 deletions third_party/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ if(NOT VELOX_DISABLE_GOOGLETEST)
endif()

add_subdirectory(xsimd)
set(YAML_CPP_BUILD_TESTS OFF CACHE BOOL "Enable testing")
include_directories(yaml-cpp/include)
add_subdirectory(yaml-cpp)

if(VELOX_ENABLE_ARROW)
find_package(Thrift)
Expand Down
1 change: 1 addition & 0 deletions third_party/yaml-cpp
Submodule yaml-cpp added at c90c08
2 changes: 1 addition & 1 deletion velox/substrait/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ add_library(velox_substrait_plan_converter ${SRCS})
target_include_directories(velox_substrait_plan_converter
PUBLIC ${PROTO_OUTPUT_DIR})
target_link_libraries(velox_substrait_plan_converter velox_connector
velox_dwio_dwrf_common)
velox_dwio_dwrf_common yaml-cpp)

if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
Expand Down
338 changes: 338 additions & 0 deletions velox/substrait/SubstraitExtension.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,338 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <yaml-cpp/yaml.h>
#include <sstream>

#include "velox/substrait/SubstraitExtension.h"
#include "velox/substrait/SubstraitSignature.h"

namespace YAML {

using namespace facebook::velox::substrait;

/// Fills 'function' (return type and argument list) from the YAML node of a
/// single function implementation.
///
/// @param node YAML mapping describing one implementation; it must carry a
/// scalar 'return' entry and may carry an 'args' sequence.
/// @param function Variant that receives the decoded return type and
/// arguments. Arguments are appended to whatever 'function.arguments' already
/// holds.
/// @return true when 'return' is present and scalar, false otherwise (in which
/// case 'function' is left untouched).
///
/// NOTE(review): the convert<> specializations used through as<T>() below are
/// defined after this helper in this file; this presumably relies on template
/// instantiation being deferred, consider moving the helper below them.
static bool decodeFunctionVariant(
    const Node& node,
    SubstraitFunctionVariant& function) {
  const auto returnType = node["return"];
  if (!returnType || !returnType.IsScalar()) {
    return false;
  }

  // The return type can be a multi-line expression; the resulting type is
  // written on its last line. Only non-empty lines are remembered so that an
  // expression ending with a line break (e.g. a YAML '|' block scalar) does
  // not decode as an empty type string.
  const auto returnExpr = returnType.as<std::string>();
  std::stringstream ss(returnExpr);
  std::string lastReturnType;
  std::string line;
  while (std::getline(ss, line, '\n')) {
    if (!line.empty()) {
      lastReturnType = line;
    }
  }
  function.returnType = SubstraitType::decode(lastReturnType);

  const auto args = node["args"];
  if (!args || !args.IsSequence()) {
    // An implementation without arguments is still a valid variant.
    return true;
  }

  // The kind of each argument is inferred from the key it carries:
  // 'options' -> enum argument, 'value' -> value argument, anything else is
  // decoded as a type argument.
  for (const auto& arg : args) {
    if (arg["options"]) {
      function.arguments.emplace_back(std::make_shared<SubstraitEnumArgument>(
          arg.as<SubstraitEnumArgument>()));
    } else if (arg["value"]) {
      function.arguments.emplace_back(std::make_shared<SubstraitValueArgument>(
          arg.as<SubstraitValueArgument>()));
    } else {
      function.arguments.emplace_back(std::make_shared<SubstraitTypeArgument>(
          arg.as<SubstraitTypeArgument>()));
    }
  }
  return true;
}

template <>
struct convert<SubstraitEnumArgument> {
static bool decode(const Node& node, SubstraitEnumArgument& argument) {
// 'options' is required property
auto& options = node["options"];
if (options && options.IsSequence()) {
auto& required = node["required"];
argument.required = required && required.as<bool>();
return true;
} else {
return false;
}
}
};

template <>
struct convert<SubstraitValueArgument> {
static bool decode(const Node& node, SubstraitValueArgument& argument) {
auto& value = node["value"];
if (value && value.IsScalar()) {
auto valueType = value.as<std::string>();
argument.type = SubstraitType::decode(valueType);
return true;
}
return false;
}
};

template <>
struct convert<SubstraitTypeArgument> {
static bool decode(const Node& node, SubstraitTypeArgument& argument) {
// no properties need to populate for type argument, just return true if
// 'type' element exists.
return node["type"];
}
};

/// YAML decoder for one implementation of a scalar function.
template <>
struct convert<SubstraitScalarFunctionVariant> {
  /// Delegates to decodeFunctionVariant, which fills the return type and the
  /// arguments of 'variant'; returns that helper's result.
  static bool decode(const Node& node, SubstraitScalarFunctionVariant& variant) {
    const bool decoded = decodeFunctionVariant(node, variant);
    return decoded;
  }
};

/// YAML decoder for one implementation of an aggregate function.
template <>
struct convert<SubstraitAggregateFunctionVariant> {
  /// Delegates to decodeFunctionVariant, which fills the return type and the
  /// arguments of 'variant'; returns that helper's result.
  static bool decode(
      const Node& node,
      SubstraitAggregateFunctionVariant& variant) {
    const bool decoded = decodeFunctionVariant(node, variant);
    return decoded;
  }
};

template <>
struct convert<SubstraitScalarFunction> {
static bool decode(const Node& node, SubstraitScalarFunction& function) {
auto& name = node["name"];
if (name && name.IsScalar()) {
function.name = name.as<std::string>();
auto& impls = node["impls"];
if (impls && impls.IsSequence() && impls.size() > 0) {
for (auto& impl : impls) {
auto scalarFunctionVariant =
impl.as<SubstraitScalarFunctionVariant>();
scalarFunctionVariant.name = function.name;
function.impls.emplace_back(
std::make_shared<SubstraitScalarFunctionVariant>(
scalarFunctionVariant));
}
}
return true;
}
return false;
}
};

template <>
struct convert<SubstraitAggregateFunction> {
static bool decode(const Node& node, SubstraitAggregateFunction& function) {
auto& name = node["name"];
if (name && name.IsScalar()) {
function.name = name.as<std::string>();
auto& impls = node["impls"];
if (impls && impls.IsSequence() && impls.size() > 0) {
for (auto& impl : impls) {
auto aggregateFunctionVariant =
impl.as<SubstraitAggregateFunctionVariant>();
aggregateFunctionVariant.name = function.name;
function.impls.emplace_back(
std::make_shared<SubstraitAggregateFunctionVariant>(
aggregateFunctionVariant));
}
}
return true;
}
return false;
}
};

template <>
struct convert<facebook::velox::substrait::SubstraitTypeAnchor> {
static bool decode(
const Node& node,
facebook::velox::substrait::SubstraitTypeAnchor& typeAnchor) {
auto& name = node["name"];
if (name && name.IsScalar()) {
typeAnchor.name = name.as<std::string>();
return true;
}
return false;
}
};

template <>
struct convert<facebook::velox::substrait::SubstraitExtension> {
static bool decode(
const Node& node,
facebook::velox::substrait::SubstraitExtension& extension) {
auto& scalarFunctions = node["scalar_functions"];
auto& aggregateFunctions = node["aggregate_functions"];
const bool scalarFunctionsExists =
scalarFunctions && scalarFunctions.IsSequence();
const bool aggregateFunctionsExists =
aggregateFunctions && aggregateFunctions.IsSequence();
if (!scalarFunctionsExists && !aggregateFunctionsExists) {
return false;
}

if (scalarFunctionsExists) {
for (auto& scalarFunctionNode : scalarFunctions) {
const auto& scalarFunction =
scalarFunctionNode.as<SubstraitScalarFunction>();
for (auto& scalaFunctionVariant : scalarFunction.impls) {
extension.scalarFunctionVariants.emplace_back(scalaFunctionVariant);
}
}
}

if (aggregateFunctionsExists) {
for (auto& aggregateFunctionNode : aggregateFunctions) {
const auto& aggregateFunction =
aggregateFunctionNode.as<SubstraitAggregateFunction>();
for (auto& aggregateFunctionVariant : aggregateFunction.impls) {
extension.aggregateFunctionVariants.emplace_back(
aggregateFunctionVariant);
}
}
}

auto& types = node["types"];
if (types && types.IsSequence()) {
for (auto& type : types) {
auto typeAnchor = type.as<SubstraitTypeAnchor>();
extension.types.emplace_back(
std::make_shared<SubstraitTypeAnchor>(typeAnchor));
}
}

return true;
}
};

} // namespace YAML

namespace facebook::velox::substrait {

namespace {

/// Returns the path of the "extensions/" directory that sits next to this
/// source file. The location is derived from the compile-time __FILE__ value,
/// so the result always ends with "/extensions/".
std::string getSubstraitExtensionAbsolutePath() {
  const std::string sourceFile(__FILE__);
  // Everything before the last '/' is the directory holding this file; when
  // there is no '/', the whole string is kept.
  const auto lastSlash = sourceFile.rfind('/');
  std::string extensionDir = sourceFile.substr(0, lastSlash);
  extensionDir += "/extensions/";
  return extensionDir;
}

} // namespace

/// Loads the default set of extension YAML files from the directory returned
/// by getSubstraitExtensionAbsolutePath() and merges them into one extension.
std::shared_ptr<SubstraitExtension> SubstraitExtension::loadExtension() {
  const std::vector<std::string> defaultExtensionFiles{
      "functions_aggregate_approx.yaml",
      "functions_aggregate_generic.yaml",
      "functions_arithmetic.yaml",
      "functions_arithmetic_decimal.yaml",
      "functions_boolean.yaml",
      "functions_comparison.yaml",
      "functions_datetime.yaml",
      "functions_logarithmic.yaml",
      "functions_rounding.yaml",
      "functions_string.yaml",
      "unknown.yaml",
  };
  return loadExtension(
      getSubstraitExtensionAbsolutePath(), defaultExtensionFiles);
}

/// Loads the given extension files from a common directory.
///
/// @param basePath Directory containing the extension files, with or without
/// trailing '/' characters. An empty or all-'/' path resolves the files
/// against "/".
/// @param extensionFiles File names relative to 'basePath'.
/// @return The extension merged from all files, as produced by the
/// single-argument loadExtension overload.
std::shared_ptr<SubstraitExtension> SubstraitExtension::loadExtension(
    const std::string& basePath,
    const std::vector<std::string>& extensionFiles) {
  // Strip trailing separators only. Cutting at the last '/' instead would drop
  // the final directory component of a path given without a trailing slash
  // (e.g. "/a/b/extensions" would become "/a/b").
  const auto lastNonSlash = basePath.find_last_not_of('/');
  const std::string directory = lastNonSlash == std::string::npos
      ? std::string()
      : basePath.substr(0, lastNonSlash + 1);

  std::vector<std::string> yamlExtensionFiles;
  yamlExtensionFiles.reserve(extensionFiles.size());
  for (const auto& extensionFile : extensionFiles) {
    yamlExtensionFiles.emplace_back(directory + "/" + extensionFile);
  }
  return loadExtension(yamlExtensionFiles);
}

std::shared_ptr<SubstraitExtension> SubstraitExtension::loadExtension(
const std::vector<std::string>& yamlExtensionFiles) {
SubstraitExtension mergedExtension;
for (const auto& extensionUri : yamlExtensionFiles) {
const auto& substraitExtension =
YAML::LoadFile(extensionUri).as<SubstraitExtension>();

for (auto& scalarFunctionVariant :
substraitExtension.scalarFunctionVariants) {
scalarFunctionVariant->uri = extensionUri;
mergedExtension.scalarFunctionVariants.emplace_back(
scalarFunctionVariant);
}

for (auto& aggregateFunctionVariant :
substraitExtension.aggregateFunctionVariants) {
aggregateFunctionVariant->uri = extensionUri;
mergedExtension.aggregateFunctionVariants.emplace_back(
aggregateFunctionVariant);
}

for (auto& type : substraitExtension.types) {
type->uri = extensionUri;
mergedExtension.types.emplace_back(type);
}
}
return std::make_shared<SubstraitExtension>(mergedExtension);
}

std::optional<SubstraitFunctionVariantPtr>
SubstraitExtension::lookupScalarFunction(const std::string& signature) const {
for (const auto& scalarFunctionVariant : scalarFunctionVariants) {
if (scalarFunctionVariant->signature() == signature) {
return std::make_optional(scalarFunctionVariant);
}
}
return std::nullopt;
}

std::optional<SubstraitFunctionVariantPtr>
SubstraitExtension::lookupAggregateFunction(
const std::string& signature) const {
for (const auto& aggregateFunctionVariant : aggregateFunctionVariants) {
if (aggregateFunctionVariant->signature() == signature) {
return std::make_optional(aggregateFunctionVariant);
}
}
return std::nullopt;
}

std::optional<SubstraitFunctionVariantPtr> SubstraitExtension::lookupFunction(
const std::string& signature) const {
const auto& function = this->lookupScalarFunction(signature);
if (!function.has_value()) {
return this->lookupAggregateFunction(signature);
}
return function;
}

/// Translates 'signature' with SubstraitSignature::signature using
/// 'functionMappings', then looks the translated signature up via the
/// single-argument lookupFunction overload.
std::optional<SubstraitFunctionVariantPtr> SubstraitExtension::lookupFunction(
    const SubstraitFunctionMappingsPtr& functionMappings,
    const std::string& signature) const {
  const auto& mappedSignature =
      SubstraitSignature::signature(signature, functionMappings);
  return lookupFunction(mappedSignature);
}

} // namespace facebook::velox::substrait
Loading

0 comments on commit fc28645

Please sign in to comment.