Skip to content

Commit

Permalink
[GLUTEN-6666][VL] Use custom SparkExprToSubfieldFilterParser (#6754)
Browse files Browse the repository at this point in the history
Removes the filter-separation code from Gluten. With a custom filter parser registered, Gluten can use the filter extraction provided by Velox.
  • Loading branch information
rui-mo authored Nov 13, 2024
1 parent 61033b7 commit d6326f0
Show file tree
Hide file tree
Showing 8 changed files with 151 additions and 1,520 deletions.
1 change: 1 addition & 0 deletions cpp/velox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ set(VELOX_SRCS
memory/VeloxMemoryManager.cc
operators/functions/RegistrationAllFunctions.cc
operators/functions/RowConstructorWithNull.cc
operators/functions/SparkExprToSubfieldFilterParser.cc
operators/serializer/VeloxColumnarToRowConverter.cc
operators/serializer/VeloxColumnarBatchSerializer.cc
operators/serializer/VeloxRowToColumnarConverter.cc
Expand Down
3 changes: 3 additions & 0 deletions cpp/velox/compute/VeloxBackend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "compute/VeloxRuntime.h"
#include "config/VeloxConfig.h"
#include "jni/JniFileSystem.h"
#include "operators/functions/SparkExprToSubfieldFilterParser.h"
#include "udf/UdfLoader.h"
#include "utils/Exception.h"
#include "velox/common/caching/SsdCache.h"
Expand Down Expand Up @@ -155,6 +156,8 @@ void VeloxBackend::init(const std::unordered_map<std::string, std::string>& conf
velox::parquet::registerParquetReaderFactory();
velox::parquet::registerParquetWriterFactory();
velox::orc::registerOrcReaderFactory();
velox::exec::ExprToSubfieldFilterParser::registerParserFactory(
[]() { return std::make_shared<SparkExprToSubfieldFilterParser>(); });

// Register Velox functions
registerAllFunctions();
Expand Down
103 changes: 103 additions & 0 deletions cpp/velox/operators/functions/SparkExprToSubfieldFilterParser.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "operators/functions/SparkExprToSubfieldFilterParser.h"

namespace gluten {

using namespace facebook::velox;

/// Tries to express 'field' as a subfield that a filter can be pushed down on.
///
/// Accepted shapes:
///  - a FieldAccessTypedExpr without inputs: 'subfield' receives a single-element path holding
///    the field name;
///  - an InputTypedExpr: 'subfield' is constructed from an empty path.
/// Everything else is rejected, including DereferenceTypedExpr and a FieldAccessTypedExpr that
/// has inputs, because filter push-down on nested fields is disabled.
///
/// @param field Expression to inspect.
/// @param subfield Output; assigned only when the function returns true.
/// @return true if 'subfield' was assigned, false otherwise.
bool SparkExprToSubfieldFilterParser::toSparkSubfield(const core::ITypedExpr* field, common::Subfield& subfield) {
  std::vector<std::unique_ptr<common::Subfield::PathElement>> elements;

  if (const auto* column = dynamic_cast<const core::FieldAccessTypedExpr*>(field)) {
    // A field access that carries an input is treated as a nested access and is not pushed down.
    if (!column->inputs().empty()) {
      return false;
    }
    elements.push_back(std::make_unique<common::Subfield::NestedField>(column->name()));
  } else if (dynamic_cast<const core::DereferenceTypedExpr*>(field) != nullptr) {
    // Dereference expressions always address nested fields.
    return false;
  } else if (dynamic_cast<const core::InputTypedExpr*>(field) == nullptr) {
    // Neither a column reference nor the input row itself.
    return false;
  }

  // The path has at most one element, so no reordering is needed before building the subfield.
  subfield = common::Subfield(std::move(elements));
  return true;
}

/// Converts a leaf Spark predicate call into a subfield filter.
///
/// Recognized function names are "equalto", "lessthanorequal", "lessthan", "greaterthanorequal",
/// "greaterthan", "in", "isnull" and "isnotnull". The first input of the call must resolve to a
/// top-level column through toSparkSubfield(); the second input (for every function except the
/// two null checks) is handed to the matching make*Filter helper together with 'evaluator'.
///
/// @param call Predicate call to convert.
/// @param subfield Output; set by toSparkSubfield() to the column the filter applies to.
/// @param evaluator Forwarded to the make*Filter helpers.
/// @param negated When true, the filter for the logical negation of 'call' is produced
///        (e.g. NOT(a < b) becomes a >= b, NOT(isnull) becomes isNotNull).
/// @return The filter, or nullptr when the call is not supported, has too few inputs, or its
///         first input is not a pushable column.
std::unique_ptr<common::Filter> SparkExprToSubfieldFilterParser::leafCallToSubfieldFilter(
    const core::CallTypedExpr& call,
    common::Subfield& subfield,
    core::ExpressionEvaluator* evaluator,
    bool negated) {
  const auto& inputs = call.inputs();
  if (inputs.empty()) {
    return nullptr;
  }

  // Resolves the left operand of a one-argument predicate into 'subfield'.
  const auto bindUnary = [&]() { return toSparkSubfield(inputs[0].get(), subfield); };
  // Same as above, but first makes sure a right-hand operand exists so that inputs[1] is never
  // read out of bounds for a malformed call.
  const auto bindBinary = [&]() { return inputs.size() >= 2 && toSparkSubfield(inputs[0].get(), subfield); };

  const auto& name = call.name();

  if (name == "equalto") {
    if (bindBinary()) {
      return negated ? makeNotEqualFilter(inputs[1], evaluator) : makeEqualFilter(inputs[1], evaluator);
    }
  } else if (name == "lessthanorequal") {
    if (bindBinary()) {
      return negated ? makeGreaterThanFilter(inputs[1], evaluator) : makeLessThanOrEqualFilter(inputs[1], evaluator);
    }
  } else if (name == "lessthan") {
    if (bindBinary()) {
      return negated ? makeGreaterThanOrEqualFilter(inputs[1], evaluator) : makeLessThanFilter(inputs[1], evaluator);
    }
  } else if (name == "greaterthanorequal") {
    if (bindBinary()) {
      return negated ? makeLessThanFilter(inputs[1], evaluator) : makeGreaterThanOrEqualFilter(inputs[1], evaluator);
    }
  } else if (name == "greaterthan") {
    if (bindBinary()) {
      return negated ? makeLessThanOrEqualFilter(inputs[1], evaluator) : makeGreaterThanFilter(inputs[1], evaluator);
    }
  } else if (name == "in") {
    if (bindBinary()) {
      return makeInFilter(inputs[1], evaluator, negated);
    }
  } else if (name == "isnull") {
    if (bindUnary()) {
      // Separate returns: isNull() and isNotNull() yield different pointer types.
      if (negated) {
        return exec::isNotNull();
      }
      return exec::isNull();
    }
  } else if (name == "isnotnull") {
    if (bindUnary()) {
      if (negated) {
        return exec::isNull();
      }
      return exec::isNotNull();
    }
  }

  // Unsupported function, missing operand, or left operand is not a top-level column.
  return nullptr;
}
} // namespace gluten
37 changes: 37 additions & 0 deletions cpp/velox/operators/functions/SparkExprToSubfieldFilterParser.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <memory>

#include "velox/expression/ExprToSubfieldFilter.h"

namespace gluten {

/// Parses Spark expression into subfield filter. Differences from Presto's parser include:
/// 1) Some Spark functions are registered under different names.
/// 2) The supported functions vary.
/// 3) Filter push-down on nested fields is disabled.
class SparkExprToSubfieldFilterParser : public facebook::velox::exec::ExprToSubfieldFilterParser {
 public:
  /// Converts a leaf predicate call into a filter on a single column.
  ///
  /// @param call Predicate call to convert.
  /// @param subfield Output; the column the returned filter applies to.
  /// @param evaluator Expression evaluator made available to the conversion.
  /// @param negated Whether the filter should represent the negation of 'call'.
  /// @return The filter, or nullptr when 'call' cannot be converted.
  std::unique_ptr<facebook::velox::common::Filter> leafCallToSubfieldFilter(
      const facebook::velox::core::CallTypedExpr& call,
      facebook::velox::common::Subfield& subfield,
      facebook::velox::core::ExpressionEvaluator* evaluator,
      bool negated) override;

 private:
  // Compared to the upstream 'toSubfield', the push-down of filter on nested field is disabled.
  // Returns true and assigns 'subfield' when 'field' can be used for push-down; returns false
  // otherwise.
  bool toSparkSubfield(const facebook::velox::core::ITypedExpr* field, facebook::velox::common::Subfield& subfield);
};
} // namespace gluten
Loading

0 comments on commit d6326f0

Please sign in to comment.