Skip to content

Commit

Permalink
YQ-3894 RD supported pushdown for SafeCast, ToBytes, FlatMap (ydb-pla…
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Dec 2, 2024
1 parent 91ebb71 commit 965ba57
Show file tree
Hide file tree
Showing 12 changed files with 836 additions and 656 deletions.
1,067 changes: 507 additions & 560 deletions ydb/library/yql/providers/common/pushdown/collection.cpp

Large diffs are not rendered by default.

10 changes: 7 additions & 3 deletions ydb/library/yql/providers/common/pushdown/physical_opt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ TPredicateNode SplitForPartialPushdown(const NPushdown::TPredicateNode& predicat

}

TMaybeNode<TCoLambda> MakePushdownPredicate(const TCoLambda& lambda, TExprContext& ctx, const TPositionHandle& pos, const TSettings& settings) {
NPushdown::TPredicateNode MakePushdownNode(const NNodes::TCoLambda& lambda, TExprContext& ctx, const TPositionHandle& pos, const TSettings& settings) {
auto lambdaArg = lambda.Args().Arg(0).Ptr();

YQL_LOG(TRACE) << "Push filter. Initial filter lambda: " << NCommon::ExprToPrettyString(ctx, lambda.Ref());
Expand All @@ -54,7 +54,11 @@ TMaybeNode<TCoLambda> MakePushdownPredicate(const TCoLambda& lambda, TExprContex
NPushdown::CollectPredicates(optionalIf.Predicate(), predicateTree, TExprBase(lambdaArg), TExprBase(lambdaArg), settings);
YQL_ENSURE(predicateTree.IsValid(), "Collected filter predicates are invalid");

NPushdown::TPredicateNode predicateToPush = SplitForPartialPushdown(predicateTree, ctx, pos, settings);
return SplitForPartialPushdown(predicateTree, ctx, pos, settings);
}

TMaybeNode<TCoLambda> MakePushdownPredicate(const TCoLambda& lambda, TExprContext& ctx, const TPositionHandle& pos, const TSettings& settings) {
NPushdown::TPredicateNode predicateToPush = MakePushdownNode(lambda, ctx, pos, settings);
if (!predicateToPush.IsValid()) {
return {};
}
Expand All @@ -64,7 +68,7 @@ TMaybeNode<TCoLambda> MakePushdownPredicate(const TCoLambda& lambda, TExprContex
.Args({"filter_row"})
.Body<TExprApplier>()
.Apply(predicateToPush.ExprNode.Cast())
.With(TExprBase(lambdaArg), "filter_row")
.With(lambda.Args().Arg(0), "filter_row")
.Build()
.Done();
// clang-format on
Expand Down
4 changes: 4 additions & 0 deletions ydb/library/yql/providers/common/pushdown/physical_opt.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
#pragma once

#include "predicate_node.h"

#include <ydb/library/yql/ast/yql_expr.h>
#include <ydb/library/yql/ast/yql_pos_handle.h>
#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
#include <ydb/library/yql/core/expr_nodes_gen/yql_expr_nodes_gen.h>

namespace NYql::NPushdown {

// Builds the predicate node to push down for the filter `lambda`.
// The visible part of the implementation collects the filter predicates of the
// lambda and returns the result of SplitForPartialPushdown for them under `settings`.
// NOTE(review): callers check the result with IsValid()/IsEmpty() before using it.
NPushdown::TPredicateNode MakePushdownNode(const NNodes::TCoLambda& lambda, TExprContext& ctx, const TPositionHandle& pos, const TSettings& settings);
// Lambda-producing wrapper over MakePushdownNode.
// Returns an empty maybe-node when the node built by MakePushdownNode is not valid;
// otherwise returns a new lambda with a single "filter_row" argument whose body is
// the pushed-down predicate expression.
NNodes::TMaybeNode<NNodes::TCoLambda> MakePushdownPredicate(const NNodes::TCoLambda& lambda, TExprContext& ctx, const TPositionHandle& pos, const TSettings& settings);

} // namespace NYql::NPushdown
10 changes: 10 additions & 0 deletions ydb/library/yql/providers/common/pushdown/predicate_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,16 @@ bool TPredicateNode::IsValid() const {
return res && ExprNode.IsValid();
}

// Reports whether this node carries no useful predicate.
// Returns true when there is no expression, when the node is not valid,
// or when the expression is a Bool literal equal to "true".
// Any other expression (including a Bool literal other than "true") yields false.
bool TPredicateNode::IsEmpty() const {
    const bool hasUsableExpr = ExprNode && IsValid();
    if (!hasUsableExpr) {
        return true;
    }

    const auto boolLiteral = ExprNode.Maybe<NNodes::TCoBool>();
    if (!boolLiteral) {
        // Not a Bool literal: a real predicate is present.
        return false;
    }

    return TStringBuf(boolLiteral.Cast().Literal()) == "true"sv;
}

void TPredicateNode::SetPredicates(const std::vector<TPredicateNode>& predicates, TExprContext& ctx, TPositionHandle pos, EBoolOp op) {
auto predicatesSize = predicates.size();
if (predicatesSize == 0) {
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/common/pushdown/predicate_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ struct TPredicateNode {
~TPredicateNode();

bool IsValid() const;
bool IsEmpty() const;
void SetPredicates(const std::vector<TPredicateNode>& predicates, TExprContext& ctx, TPositionHandle pos, EBoolOp op);

NNodes::TMaybeNode<NNodes::TExprBase> ExprNode;
Expand Down
4 changes: 3 additions & 1 deletion ydb/library/yql/providers/common/pushdown/settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ struct TSettings {
// May be partially pushdowned as:
// $A OR $C
// In case of unsupported / complicated expressions $B and $D
SplitOrOperator = 1 << 22
SplitOrOperator = 1 << 22,
ToBytesFromStringExpressions = 1 << 23, // ToBytes(string like)
FlatMapOverOptionals = 1 << 24 // FlatMap(Optional<T>, Lambda (T) -> Optional<U>)
};

explicit TSettings(NLog::EComponent logComponent)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,12 @@ message TExpression {
TExpression else_expression = 3;
}

// CAST($value AS $type)
message TCast {
// Expression whose result is being cast ($value above)
TExpression value = 1;
// Target type of the cast ($type above)
Ydb.Type type = 2;
}

message TNull {
}

Expand All @@ -368,6 +374,8 @@ message TExpression {
TCoalesce coalesce = 5;

TIf if = 6;

TCast cast = 7;
}
}

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace NYql::NConnector::NApi {
namespace NYql {

// NOTE(review): presumably reports whether `lambda` is a filter that does not
// restrict anything — confirm against the definition, which is not in this header.
bool IsEmptyFilterPredicate(const NNodes::TCoLambda& lambda);
// Serializes the predicate expression `predicateBody`, written in terms of the row
// argument `predicateArgument`, into `*proto`.
// The bool result is the success flag: the caller visible in this commit treats
// `false` as failure and reports the text accumulated in `err` as a warning.
bool SerializeFilterPredicate(const NNodes::TExprBase& predicateBody, const NNodes::TCoArgument& predicateArgument, NConnector::NApi::TPredicate* proto, TStringBuilder& err);
// Overload that takes the whole predicate lambda instead of a separate body and argument.
bool SerializeFilterPredicate(const NNodes::TCoLambda& predicate, NConnector::NApi::TPredicate* proto, TStringBuilder& err);
// NOTE(review): presumably renders `predicate` as the text of a WHERE clause — confirm against the definition.
TString FormatWhere(const NConnector::NApi::TPredicate& predicate);
} // namespace NYql
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ namespace NYql::NGenericPushDown {
case NYql::NConnector::NApi::TExpression::kNull:
case NYql::NConnector::NApi::TExpression::kCoalesce:
case NYql::NConnector::NApi::TExpression::kIf:
case NYql::NConnector::NApi::TExpression::kCast:
case NYql::NConnector::NApi::TExpression::PAYLOAD_NOT_SET:
return false;
}
Expand All @@ -70,6 +71,7 @@ namespace NYql::NGenericPushDown {
case NYql::NConnector::NApi::TExpression::kNull:
case NYql::NConnector::NApi::TExpression::kCoalesce:
case NYql::NConnector::NApi::TExpression::kIf:
case NYql::NConnector::NApi::TExpression::kCast:
case NYql::NConnector::NApi::TExpression::PAYLOAD_NOT_SET:
return false;
}
Expand Down Expand Up @@ -281,6 +283,7 @@ namespace NYql::NGenericPushDown {
case NYql::NConnector::NApi::TExpression::kNull:
case NYql::NConnector::NApi::TExpression::kCoalesce:
case NYql::NConnector::NApi::TExpression::kIf:
case NYql::NConnector::NApi::TExpression::kCast:
case NYql::NConnector::NApi::TExpression::PAYLOAD_NOT_SET:
return Triple::Unknown;
}
Expand Down
14 changes: 5 additions & 9 deletions ydb/library/yql/providers/pq/provider/yql_pq_logical_opt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ namespace {
// Operator features
EFlag::ExpressionAsPredicate | EFlag::ArithmeticalExpressions | EFlag::ImplicitConversionToInt64 |
EFlag::StringTypes | EFlag::LikeOperator | EFlag::DoNotCheckCompareArgumentsTypes | EFlag::InOperator |
EFlag::IsDistinctOperator | EFlag::JustPassthroughOperators | DivisionExpressions |
EFlag::IsDistinctOperator | EFlag::JustPassthroughOperators | DivisionExpressions | EFlag::CastExpression |
EFlag::ToBytesFromStringExpressions | EFlag::FlatMapOverOptionals |

// Split features
EFlag::SplitOrOperator
Expand Down Expand Up @@ -267,19 +268,14 @@ class TPqLogicalOptProposalTransformer : public TOptimizeTransformerBase {
return node;
}

auto newFilterLambda = MakePushdownPredicate(flatmap.Lambda(), ctx, node.Pos(), TPushdownSettings());
if (!newFilterLambda) {
return node;
}

auto predicate = newFilterLambda.Cast();
if (NYql::IsEmptyFilterPredicate(predicate)) {
NPushdown::TPredicateNode predicate = MakePushdownNode(flatmap.Lambda(), ctx, node.Pos(), TPushdownSettings());
if (predicate.IsEmpty()) {
return node;
}

TStringBuilder err;
NYql::NConnector::NApi::TPredicate predicateProto;
if (!NYql::SerializeFilterPredicate(predicate, &predicateProto, err)) {
if (!NYql::SerializeFilterPredicate(predicate.ExprNode.Cast(), flatmap.Lambda().Args().Arg(0), &predicateProto, err)) {
ctx.AddWarning(TIssue(ctx.GetPosition(node.Pos()), "Failed to serialize filter predicate for source: " + err));
return node;
}
Expand Down
20 changes: 14 additions & 6 deletions ydb/tests/fq/yds/test_row_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,10 @@ def test_filters_non_optional_field(self, kikimr, client):
sql = Rf'''
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
SELECT Cast(time as String) FROM {YDS_CONNECTION}.`{self.input_topic}`
WITH (format=json_each_row, SCHEMA (time UInt64 NOT NULL, data String NOT NULL, event String NOT NULL)) WHERE '''
WITH (format=json_each_row, SCHEMA (time UInt64 NOT NULL, data String NOT NULL, event String NOT NULL, nested Json NOT NULL)) WHERE '''
data = [
'{"time": 101, "data": "hello1", "event": "event1"}',
'{"time": 102, "data": "hello2", "event": "event2"}']
'{"time": 101, "data": "hello1", "event": "event1", "nested": {"xyz": "key"}}',
'{"time": 102, "data": "hello2", "event": "event2", "nested": ["abc", "key"]}']
filter = "time > 101;"
expected = ['102']
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE (`time` > 101)')
Expand All @@ -330,6 +330,10 @@ def test_filters_non_optional_field(self, kikimr, client):
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE ((`event` IS DISTINCT FROM `data`) AND (`event` IN (\\"1\\"')
filter = ' IF(event = "event2", event IS DISTINCT FROM data, FALSE)'
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE IF((`event` = \\"event2\\"), (`event` IS DISTINCT FROM `data`), FALSE)')
filter = ' nested REGEXP ".*abc.*"'
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE (CAST(`nested` AS String) REGEXP \\".*abc.*\\")')
filter = ' CAST(nested AS String) REGEXP ".*abc.*"'
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE (CAST(`nested` AS String) REGEXP \\".*abc.*\\")')

@yq_v1
def test_filters_optional_field(self, kikimr, client):
Expand All @@ -341,10 +345,10 @@ def test_filters_optional_field(self, kikimr, client):
sql = Rf'''
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
SELECT Cast(time as String) FROM {YDS_CONNECTION}.`{self.input_topic}`
WITH (format=json_each_row, SCHEMA (time UInt64 NOT NULL, data String, event String, flag Bool, field1 UInt8, field2 Int64)) WHERE '''
WITH (format=json_each_row, SCHEMA (time UInt64 NOT NULL, data String, event String, flag Bool, field1 UInt8, field2 Int64, nested Json)) WHERE '''
data = [
'{"time": 101, "data": "hello1", "event": "event1", "flag": false, "field1": 5, "field2": 5}',
'{"time": 102, "data": "hello2", "event": "event2", "flag": true, "field1": 5, "field2": 1005}']
'{"time": 101, "data": "hello1", "event": "event1", "flag": false, "field1": 5, "field2": 5, "nested": {"xyz": "key"}}',
'{"time": 102, "data": "hello2", "event": "event2", "flag": true, "field1": 5, "field2": 1005, "nested": ["abc", "key"]}']
expected = ['102']
filter = 'data = "hello2"'
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE (`data` = \\"hello2\\")')
Expand Down Expand Up @@ -380,6 +384,10 @@ def test_filters_optional_field(self, kikimr, client):
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE (NOT (COALESCE(`event`, \\"\\") REGEXP \\"e.*e.*t1\\"))')
filter = " event ?? '' REGEXP data ?? '' OR time = 102"
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE ((COALESCE(`event`, \\"\\") REGEXP COALESCE(`data`, \\"\\")) OR (`time` = 102))')
filter = ' nested REGEXP ".*abc.*"'
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE (IF((`nested` IS NOT NULL), CAST(`nested` AS String), NULL) REGEXP \\".*abc.*\\")')
filter = ' CAST(nested AS String) REGEXP ".*abc.*"'
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE (CAST(`nested` AS String?) REGEXP \\".*abc.*\\")')

@yq_v1
def test_filter_missing_fields(self, kikimr, client):
Expand Down

0 comments on commit 965ba57

Please sign in to comment.