From 817dc99cfd5845b0fc2cfabde847f9dc44b4cd3a Mon Sep 17 00:00:00 2001 From: Iuliia Sidorina Date: Sat, 7 Dec 2024 10:43:44 +0100 Subject: [PATCH] fix(kqp): allow null keys for stream join with secondary index (#12363) --- ydb/core/kqp/common/kqp_yql.cpp | 80 +++++++++++++++++++ ydb/core/kqp/common/kqp_yql.h | 28 ++++++- .../kqp/executer_actor/kqp_tasks_graph.cpp | 1 + ydb/core/kqp/expr_nodes/kqp_expr_nodes.json | 6 +- ydb/core/kqp/host/kqp_type_ann.cpp | 32 +++----- .../kqp/opt/logical/kqp_opt_log_extract.cpp | 2 +- .../kqp/opt/logical/kqp_opt_log_indexes.cpp | 26 +++--- ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp | 23 ++++-- .../kqp/opt/logical/kqp_opt_log_ranges.cpp | 4 +- .../logical/kqp_opt_log_ranges_predext.cpp | 8 +- .../opt/physical/kqp_opt_phy_build_stage.cpp | 10 ++- .../kqp/query_compiler/kqp_query_compiler.cpp | 31 ++++--- .../kqp/runtime/kqp_stream_lookup_worker.cpp | 13 ++- ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp | 5 -- ydb/core/protos/kqp.proto | 1 + ydb/core/protos/kqp_physical.proto | 1 + 16 files changed, 194 insertions(+), 77 deletions(-) diff --git a/ydb/core/kqp/common/kqp_yql.cpp b/ydb/core/kqp/common/kqp_yql.cpp index 3c311a2baf56..f6a0456c226d 100644 --- a/ydb/core/kqp/common/kqp_yql.cpp +++ b/ydb/core/kqp/common/kqp_yql.cpp @@ -484,4 +484,84 @@ TAutoPtr GetDqIntegrationPeepholeTransformer(bool beforeDqTra return dqIntegrationPeepholePipeline.Build(); } +NNodes::TCoNameValueTupleList TKqpStreamLookupSettings::BuildNode(TExprContext& ctx, TPositionHandle pos) const { + TVector settings; + + auto strategyTypeToString = [](const EStreamLookupStrategyType& type) { + switch (type) { + case EStreamLookupStrategyType::Unspecified: + break; + case EStreamLookupStrategyType::LookupRows: + return LookupStrategyName; + case EStreamLookupStrategyType::LookupJoinRows: + return LookupJoinStrategyName; + case EStreamLookupStrategyType::LookupSemiJoinRows: + return LookupSemiJoinStrategyName; + } + + YQL_ENSURE(false, "Unspecified stream lookup startegy type: " << type); + }; + + settings.emplace_back( + Build(ctx, pos) + .Name().Build(StrategySettingName) + .Value().Build(strategyTypeToString(Strategy)) + .Done() + ); + + if (AllowNullKeys) { + settings.emplace_back( + Build(ctx, pos) + .Name().Build(AllowNullKeysSettingName) + .Done() + ); + } + + return Build(ctx, pos) + .Add(settings) + .Done(); +} + +TKqpStreamLookupSettings TKqpStreamLookupSettings::Parse(const NNodes::TCoNameValueTupleList& list) { + TKqpStreamLookupSettings settings; + + auto getLookupStrategyType = [](const TStringBuf& type) { + if (type == LookupStrategyName) { + return EStreamLookupStrategyType::LookupRows; + } else if (type == LookupJoinStrategyName) { + return EStreamLookupStrategyType::LookupJoinRows; + } else if (type == LookupSemiJoinStrategyName) { + return EStreamLookupStrategyType::LookupSemiJoinRows; + } else { + YQL_ENSURE(false, "Unknown stream lookup startegy type: " << type); + } + }; + + for (const auto& tuple : list) { + auto name = tuple.Name().Value(); + if (name == StrategySettingName) { + YQL_ENSURE(tuple.Value().Maybe()); + settings.Strategy = getLookupStrategyType(tuple.Value().Cast().Value()); + } else if (name == AllowNullKeysSettingName) { + settings.AllowNullKeys = true; + } else { + YQL_ENSURE(false, "Unknown KqpStreamLookup setting name '" << name << "'"); + } + } + + return settings; +} + +TKqpStreamLookupSettings TKqpStreamLookupSettings::Parse(const NNodes::TKqlStreamLookupTable& node) { + return TKqpStreamLookupSettings::Parse(node.Settings()); +} + +TKqpStreamLookupSettings TKqpStreamLookupSettings::Parse(const NNodes::TKqlStreamLookupIndex& node) { + return TKqpStreamLookupSettings::Parse(node.Settings()); +} + +TKqpStreamLookupSettings TKqpStreamLookupSettings::Parse(const NNodes::TKqpCnStreamLookup& node) { + return TKqpStreamLookupSettings::Parse(node.Settings()); +} + } // namespace NYql diff --git a/ydb/core/kqp/common/kqp_yql.h b/ydb/core/kqp/common/kqp_yql.h index 37b7dd8555ed..7613ef942832 100644 --- a/ydb/core/kqp/common/kqp_yql.h +++ b/ydb/core/kqp/common/kqp_yql.h @@ -46,9 +46,31 @@ struct TKqpPhyTxSettings { constexpr TStringBuf KqpReadRangesSourceName = "KqpReadRangesSource"; constexpr TStringBuf KqpTableSinkName = "KqpTableSinkName"; -static constexpr std::string_view TKqpStreamLookupStrategyName = "LookupRows"sv; -static constexpr std::string_view TKqpStreamLookupJoinStrategyName = "LookupJoinRows"sv; -static constexpr std::string_view TKqpStreamLookupSemiJoinStrategyName = "LookupSemiJoinRows"sv; +enum class EStreamLookupStrategyType { + Unspecified, + LookupRows, + LookupJoinRows, + LookupSemiJoinRows, +}; + +struct TKqpStreamLookupSettings { + static constexpr TStringBuf StrategySettingName = "Strategy"; + static constexpr TStringBuf AllowNullKeysSettingName = "AllowNullKeys"; + + // stream lookup strategy types + static constexpr std::string_view LookupStrategyName = "LookupRows"sv; + static constexpr std::string_view LookupJoinStrategyName = "LookupJoinRows"sv; + static constexpr std::string_view LookupSemiJoinStrategyName = "LookupSemiJoinRows"sv; + + bool AllowNullKeys = false; + EStreamLookupStrategyType Strategy = EStreamLookupStrategyType::Unspecified; + + NNodes::TCoNameValueTupleList BuildNode(TExprContext& ctx, TPositionHandle pos) const; + static TKqpStreamLookupSettings Parse(const NNodes::TKqlStreamLookupTable& node); + static TKqpStreamLookupSettings Parse(const NNodes::TKqlStreamLookupIndex& node); + static TKqpStreamLookupSettings Parse(const NNodes::TKqpCnStreamLookup& node); + static TKqpStreamLookupSettings Parse(const NNodes::TCoNameValueTupleList& node); +}; struct TKqpReadTableSettings { static constexpr TStringBuf SkipNullKeysSettingName = "SkipNullKeys"; diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp index ec171c1c0b5f..c03601569927 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp @@ -420,6 +420,7 @@ void BuildStreamLookupChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf settings->SetLookupStrategy(streamLookup.GetLookupStrategy()); settings->SetKeepRowsOrder(streamLookup.GetKeepRowsOrder()); + settings->SetAllowNullKeys(streamLookup.GetAllowNullKeys()); TTransform streamLookupTransform; streamLookupTransform.Type = "StreamLookupInputTransformer"; diff --git a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json index b8f78803d853..d998c0540f8d 100644 --- a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json +++ b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json @@ -186,7 +186,7 @@ "Base": "TKqlLookupTableBase", "Match": {"Type": "Callable", "Name": "KqlStreamLookupTable"}, "Children": [ - {"Index": 3, "Name": "LookupStrategy", "Type": "TCoAtom"} + {"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList"} ] }, { @@ -211,7 +211,7 @@ "Base": "TKqlLookupIndexBase", "Match": {"Type": "Callable", "Name": "KqlStreamLookupIndex"}, "Children": [ - {"Index": 4, "Name": "LookupStrategy", "Type": "TCoAtom"} + {"Index": 4, "Name": "Settings", "Type": "TCoNameValueTupleList"} ] }, { @@ -467,7 +467,7 @@ {"Index": 1, "Name": "Table", "Type": "TKqpTable"}, {"Index": 2, "Name": "Columns", "Type": "TCoAtomList"}, {"Index": 3, "Name": "InputType", "Type": "TExprBase"}, - {"Index": 4, "Name": "LookupStrategy", "Type": "TCoAtom"} + {"Index": 4, "Name": "Settings", "Type": "TCoNameValueTupleList"} ] }, { diff --git a/ydb/core/kqp/host/kqp_type_ann.cpp b/ydb/core/kqp/host/kqp_type_ann.cpp index 6f674b8522b1..c560104ab571 100644 --- a/ydb/core/kqp/host/kqp_type_ann.cpp +++ b/ydb/core/kqp/host/kqp_type_ann.cpp @@ -501,14 +501,11 @@ TStatus AnnotateLookupTable(const TExprNode::TPtr& node, TExprContext& ctx, cons const TStructExprType* structType = nullptr; if (isStreamLookup) { - auto lookupStrategy = node->Child(TKqlStreamLookupTable::Match(node.Get()) ? - TKqlStreamLookupTable::idx_LookupStrategy : TKqlStreamLookupIndex::idx_LookupStrategy); - if (!EnsureAtom(*lookupStrategy, ctx)) { - return TStatus::Error; - } - - if (lookupStrategy->Content() == TKqpStreamLookupJoinStrategyName - || lookupStrategy->Content() == TKqpStreamLookupSemiJoinStrategyName) { + TCoNameValueTupleList settingsNode{node->ChildPtr(TKqlStreamLookupTable::Match(node.Get()) ? + TKqlStreamLookupTable::idx_Settings : TKqlStreamLookupIndex::idx_Settings)}; + auto settings = TKqpStreamLookupSettings::Parse(settingsNode); + if (settings.Strategy == EStreamLookupStrategyType::LookupJoinRows + || settings.Strategy == EStreamLookupStrategyType::LookupSemiJoinRows) { if (!EnsureTupleType(node->Pos(), *lookupType, ctx)) { return TStatus::Error; @@ -1649,13 +1646,6 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext } TCoAtomList columns{node->ChildPtr(TKqpCnStreamLookup::idx_Columns)}; - - if (!EnsureAtom(*node->Child(TKqpCnStreamLookup::idx_LookupStrategy), ctx)) { - return TStatus::Error; - } - - TCoAtom lookupStrategy(node->Child(TKqpCnStreamLookup::idx_LookupStrategy)); - auto inputTypeNode = node->Child(TKqpCnStreamLookup::idx_InputType); if (!EnsureType(*inputTypeNode, ctx)) { @@ -1670,7 +1660,9 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext YQL_ENSURE(inputItemType); - if (lookupStrategy.Value() == TKqpStreamLookupStrategyName) { + TCoNameValueTupleList settingsNode{node->ChildPtr(TKqpCnStreamLookup::idx_Settings)}; + auto settings = TKqpStreamLookupSettings::Parse(settingsNode); + if (settings.Strategy == EStreamLookupStrategyType::LookupRows) { if (!EnsureStructType(node->Pos(), *inputItemType, ctx)) { return TStatus::Error; } @@ -1689,8 +1681,8 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext node->SetTypeAnn(ctx.MakeType(rowType)); - } else if (lookupStrategy.Value() == TKqpStreamLookupJoinStrategyName - || lookupStrategy.Value() == TKqpStreamLookupSemiJoinStrategyName) { + } else if (settings.Strategy == EStreamLookupStrategyType::LookupJoinRows + || settings.Strategy == EStreamLookupStrategyType::LookupSemiJoinRows) { if (!EnsureTupleType(node->Pos(), *inputItemType, ctx)) { return TStatus::Error; @@ -1736,8 +1728,8 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext node->SetTypeAnn(ctx.MakeType(outputItemType)); } else { - ctx.AddError(TIssue(ctx.GetPosition(node->Child(TKqpCnStreamLookup::idx_LookupStrategy)->Pos()), - TStringBuilder() << "Unexpected lookup strategy: " << lookupStrategy.Value())); + ctx.AddError(TIssue(ctx.GetPosition(node->Child(TKqpCnStreamLookup::idx_Settings)->Pos()), + TStringBuilder() << "Unexpected lookup strategy: " << settings.Strategy)); return TStatus::Error; } diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp index 3e16026ac034..2507d3b31c86 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp @@ -235,7 +235,7 @@ TExprBase KqpApplyExtractMembersToLookupTable(TExprBase node, TExprContext& ctx, .Table(streamLookup.Table()) .LookupKeys(streamLookup.LookupKeys()) .Columns(usedColumns.Cast()) - .LookupStrategy(streamLookup.LookupStrategy()) + .Settings(streamLookup.Settings()) .Done(); } diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp index d0655c90255e..ef465f206fef 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp @@ -317,11 +317,13 @@ TExprBase DoRewriteIndexRead(const TReadMatch& read, TExprContext& ctx, } if (useStreamLookup) { + TKqpStreamLookupSettings settings; + settings.Strategy = EStreamLookupStrategyType::LookupRows; return Build(ctx, read.Pos()) .Table(read.Table()) .LookupKeys(readIndexTable.Ptr()) .Columns(read.Columns()) - .LookupStrategy().Build(TKqpStreamLookupStrategyName) + .Settings(settings.BuildNode(ctx, read.Pos())) .Done(); } else { return Build(ctx, read.Pos()) @@ -362,11 +364,13 @@ TExprBase KqpRewriteLookupIndex(const TExprBase& node, TExprContext& ctx, const if (!needDataRead) { if (kqpCtx.Config->EnableKqpDataQueryStreamLookup) { + TKqpStreamLookupSettings settings; + settings.Strategy = EStreamLookupStrategyType::LookupRows; return Build(ctx, node.Pos()) .Table(BuildTableMeta(*indexMeta, node.Pos(), ctx)) .LookupKeys(lookupIndex.LookupKeys()) .Columns(lookupIndex.Columns()) - .LookupStrategy().Build(TKqpStreamLookupStrategyName) + .Settings(settings.BuildNode(ctx, node.Pos())) .Done(); } @@ -380,18 +384,20 @@ TExprBase KqpRewriteLookupIndex(const TExprBase& node, TExprContext& ctx, const auto keyColumnsList = BuildKeyColumnsList(tableDesc, node.Pos(), ctx); if (kqpCtx.Config->EnableKqpDataQueryStreamLookup) { + TKqpStreamLookupSettings settings; + settings.Strategy = EStreamLookupStrategyType::LookupRows; TExprBase lookupIndexTable = Build(ctx, node.Pos()) .Table(BuildTableMeta(*indexMeta, node.Pos(), ctx)) .LookupKeys(lookupIndex.LookupKeys()) .Columns(keyColumnsList) - .LookupStrategy().Build(TKqpStreamLookupStrategyName) + .Settings(settings.BuildNode(ctx, node.Pos())) .Done(); return Build(ctx, node.Pos()) .Table(lookupIndex.Table()) .LookupKeys(lookupIndexTable.Ptr()) .Columns(lookupIndex.Columns()) - .LookupStrategy().Build(TKqpStreamLookupStrategyName) + .Settings(settings.BuildNode(ctx, node.Pos())) .Done(); } @@ -417,6 +423,8 @@ TExprBase KqpRewriteStreamLookupIndex(const TExprBase& node, TExprContext& ctx, } auto streamLookupIndex = node.Maybe().Cast(); + auto settings = TKqpStreamLookupSettings::Parse(streamLookupIndex); + settings.AllowNullKeys = true; const auto& tableDesc = GetTableData(*kqpCtx.Tables, kqpCtx.Cluster, streamLookupIndex.Table().Path()); const auto& [indexMeta, _] = tableDesc.Metadata->GetIndexMetadata(streamLookupIndex.Index().StringValue()); @@ -427,7 +435,7 @@ TExprBase KqpRewriteStreamLookupIndex(const TExprBase& node, TExprContext& ctx, .Table(BuildTableMeta(*indexMeta, node.Pos(), ctx)) .LookupKeys(streamLookupIndex.LookupKeys()) .Columns(streamLookupIndex.Columns()) - .LookupStrategy().Build(streamLookupIndex.LookupStrategy()) + .Settings(settings.BuildNode(ctx, node.Pos())) .Done(); } @@ -437,13 +445,11 @@ TExprBase KqpRewriteStreamLookupIndex(const TExprBase& node, TExprContext& ctx, .Table(BuildTableMeta(*indexMeta, node.Pos(), ctx)) .LookupKeys(streamLookupIndex.LookupKeys()) .Columns(keyColumnsList) - .LookupStrategy().Build(streamLookupIndex.LookupStrategy()) + .Settings(settings.BuildNode(ctx, node.Pos())) .Done(); TMaybeNode lookupKeys; - YQL_ENSURE(streamLookupIndex.LookupStrategy().Maybe()); - TString lookupStrategy = streamLookupIndex.LookupStrategy().Maybe().Cast().StringValue(); - if (lookupStrategy == TKqpStreamLookupJoinStrategyName || lookupStrategy == TKqpStreamLookupSemiJoinStrategyName) { + if (settings.Strategy == EStreamLookupStrategyType::LookupJoinRows || settings.Strategy == EStreamLookupStrategyType::LookupSemiJoinRows) { // Result type of lookupIndexTable: list>>, // expected input type for main table stream join: list, left_row>>, // so we should transform list>> to list, left_row>> @@ -471,7 +477,7 @@ TExprBase KqpRewriteStreamLookupIndex(const TExprBase& node, TExprContext& ctx, .Table(streamLookupIndex.Table()) .LookupKeys(lookupKeys.Cast()) .Columns(streamLookupIndex.Columns()) - .LookupStrategy().Build(streamLookupIndex.LookupStrategy()) + .Settings(settings.BuildNode(ctx, node.Pos())) .Done(); } diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp index 40f0f27285c5..21d2134f6c1f 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp @@ -187,6 +187,8 @@ TExprBase BuildLookupIndex(TExprContext& ctx, const TPositionHandle pos, { if (kqpCtx.IsScanQuery()) { YQL_ENSURE(kqpCtx.Config->EnableKqpScanQueryStreamIdxLookupJoin, "Stream lookup is not enabled for index lookup join"); + TKqpStreamLookupSettings settings; + settings.Strategy = EStreamLookupStrategyType::LookupRows; return Build(ctx, pos) .Table(table) .LookupKeys() @@ -198,7 +200,7 @@ TExprBase BuildLookupIndex(TExprContext& ctx, const TPositionHandle pos, .Columns(columns) .Index() .Build(indexName) - .LookupStrategy().Build(TKqpStreamLookupStrategyName) + .Settings(settings.BuildNode(ctx, pos)) .Done(); } @@ -222,6 +224,8 @@ TExprBase BuildLookupTable(TExprContext& ctx, const TPositionHandle pos, { if (kqpCtx.IsScanQuery()) { YQL_ENSURE(kqpCtx.Config->EnableKqpScanQueryStreamIdxLookupJoin, "Stream lookup is not enabled for index lookup join"); + TKqpStreamLookupSettings settings; + settings.Strategy = EStreamLookupStrategyType::LookupRows; return Build(ctx, pos) .Table(table) .LookupKeys() @@ -231,11 +235,13 @@ TExprBase BuildLookupTable(TExprContext& ctx, const TPositionHandle pos, .Build() .Build() .Columns(columns) - .LookupStrategy().Build(TKqpStreamLookupStrategyName) + .Settings(settings.BuildNode(ctx, pos)) .Done(); } if (kqpCtx.Config->EnableKqpDataQueryStreamLookup) { + TKqpStreamLookupSettings settings; + settings.Strategy = EStreamLookupStrategyType::LookupRows; return Build(ctx, pos) .Table(table) .LookupKeys() @@ -245,7 +251,7 @@ TExprBase BuildLookupTable(TExprContext& ctx, const TPositionHandle pos, .Build() .Build() .Columns(columns) - .LookupStrategy().Build(TKqpStreamLookupStrategyName) + .Settings(settings.BuildNode(ctx, pos)) .Done(); } @@ -397,9 +403,10 @@ TMaybeNode BuildKqpStreamIndexLookupJoin( } } - auto strategy = join.JoinType().Value() == "LeftSemi" - ? TKqpStreamLookupSemiJoinStrategyName - : TKqpStreamLookupJoinStrategyName; + TKqpStreamLookupSettings settings; + settings.Strategy = join.JoinType().Value() == "LeftSemi" + ? EStreamLookupStrategyType::LookupSemiJoinRows + : EStreamLookupStrategyType::LookupJoinRows; TMaybeNode lookupJoin; if (indexName) { @@ -408,14 +415,14 @@ TMaybeNode BuildKqpStreamIndexLookupJoin( .LookupKeys(leftInput) .Columns(lookupColumns.Cast()) .Index().Build(indexName) - .LookupStrategy().Build(strategy) + .Settings(settings.BuildNode(ctx, join.Pos())) .Done(); } else { lookupJoin = Build(ctx, join.Pos()) .Table(rightLookup.MainTable) .LookupKeys(leftInput) .Columns(lookupColumns.Cast()) - .LookupStrategy().Build(strategy) + .Settings(settings.BuildNode(ctx, join.Pos())) .Done(); } diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp index f62d79049895..069bacafc65d 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges.cpp @@ -155,11 +155,13 @@ TExprBase KqpRewriteLookupTable(const TExprBase& node, TExprContext& ctx, const return node; } + TKqpStreamLookupSettings settings; + settings.Strategy = EStreamLookupStrategyType::LookupRows; return Build(ctx, lookup.Pos()) .Table(lookup.Table()) .LookupKeys(lookup.LookupKeys()) .Columns(lookup.Columns()) - .LookupStrategy().Build(TKqpStreamLookupStrategyName) + .Settings(settings.BuildNode(ctx, lookup.Pos())) .Done(); } diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp index 897d88e7a52b..c655760ce415 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp @@ -282,13 +282,15 @@ TExprBase KqpPushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx if (indexName) { if (kqpCtx.IsScanQuery()) { if (kqpCtx.Config->EnableKqpScanQueryStreamLookup) { + TKqpStreamLookupSettings settings; + settings.Strategy = EStreamLookupStrategyType::LookupRows; result = Build(ctx, node.Pos()) .Table(read.Table()) .Columns(read.Columns()) .LookupKeys(keys) .Index(indexName.Cast()) .LookupKeys(keys) - .LookupStrategy().Build(TKqpStreamLookupStrategyName) + .Settings(settings.BuildNode(ctx, node.Pos())) .Done(); } } else { @@ -302,11 +304,13 @@ TExprBase KqpPushExtractedPredicateToReadTable(TExprBase node, TExprContext& ctx } else { if (kqpCtx.IsScanQuery()) { if (kqpCtx.Config->EnableKqpScanQueryStreamLookup) { + TKqpStreamLookupSettings settings; + settings.Strategy = EStreamLookupStrategyType::LookupRows; result = Build(ctx, node.Pos()) .Table(read.Table()) .Columns(read.Columns()) .LookupKeys(keys) - .LookupStrategy().Build(TKqpStreamLookupStrategyName) + .Settings(settings.BuildNode(ctx, node.Pos())) .Done(); } } else { diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp index 9d466d637653..8b09b77ca742 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp @@ -604,6 +604,8 @@ NYql::NNodes::TExprBase KqpRewriteLookupTablePhy(NYql::NNodes::TExprBase node, N YQL_ENSURE(lookupKeys.Maybe(), "Expected list iterator as LookupKeys, but got: " << KqpExprToPrettyString(lookupKeys, ctx)); + TKqpStreamLookupSettings settings; + settings.Strategy = EStreamLookupStrategyType::LookupRows; TNodeOnNodeOwnedMap replaceMap; TVector newInputs; TVector newArgs; @@ -640,7 +642,7 @@ NYql::NNodes::TExprBase KqpRewriteLookupTablePhy(NYql::NNodes::TExprBase node, N .Table(lookupTable.Table()) .Columns(lookupTable.Columns()) .InputType(ExpandType(node.Pos(), *keysPrecompute.Ref().GetTypeAnn(), ctx)) - .LookupStrategy().Build(TKqpStreamLookupStrategyName) + .Settings(settings.BuildNode(ctx, node.Pos())) .Done(); newInputs.emplace_back(std::move(cnStreamLookup)); @@ -675,7 +677,7 @@ NYql::NNodes::TExprBase KqpRewriteLookupTablePhy(NYql::NNodes::TExprBase node, N .Table(lookupTable.Table()) .Columns(lookupTable.Columns()) .InputType(ExpandType(node.Pos(), *lookupKeysList.Ref().GetTypeAnn(), ctx)) - .LookupStrategy().Build(TKqpStreamLookupStrategyName) + .Settings(settings.BuildNode(ctx, node.Pos())) .Done(); newInputs.emplace_back(std::move(cnStreamLookup)); @@ -727,7 +729,7 @@ NYql::NNodes::TExprBase KqpBuildStreamLookupTableStages(NYql::NNodes::TExprBase .Table(lookup.Table()) .Columns(lookup.Columns()) .InputType(ExpandType(lookup.Pos(), *lookup.LookupKeys().Ref().GetTypeAnn(), ctx)) - .LookupStrategy(lookup.LookupStrategy()) + .Settings(lookup.Settings()) .Done(); } else if (lookup.LookupKeys().Maybe()) { @@ -738,7 +740,7 @@ NYql::NNodes::TExprBase KqpBuildStreamLookupTableStages(NYql::NNodes::TExprBase .Table(lookup.Table()) .Columns(lookup.Columns()) .InputType(ExpandType(lookup.Pos(), *output.Ref().GetTypeAnn(), ctx)) - .LookupStrategy(lookup.LookupStrategy()) + .Settings(lookup.Settings()) .Done(); } else { return node; diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index cc69404c02ca..675ceb754f32 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -86,20 +86,19 @@ NKqpProto::TKqpPhyInternalBinding::EType GetPhyInternalBindingType(const std::st return bindingType; } -NKqpProto::EStreamLookupStrategy GetStreamLookupStrategy(const std::string_view strategy) { - NKqpProto::EStreamLookupStrategy lookupStrategy = NKqpProto::EStreamLookupStrategy::UNSPECIFIED; - - if (strategy == "LookupRows"sv) { - lookupStrategy = NKqpProto::EStreamLookupStrategy::LOOKUP; - } else if (strategy == "LookupJoinRows"sv) { - lookupStrategy = NKqpProto::EStreamLookupStrategy::JOIN; - } else if (strategy == "LookupSemiJoinRows"sv) { - lookupStrategy = NKqpProto::EStreamLookupStrategy::SEMI_JOIN; +NKqpProto::EStreamLookupStrategy GetStreamLookupStrategy(EStreamLookupStrategyType strategy) { + switch (strategy) { + case EStreamLookupStrategyType::Unspecified: + break; + case EStreamLookupStrategyType::LookupRows: + return NKqpProto::EStreamLookupStrategy::LOOKUP; + case EStreamLookupStrategyType::LookupJoinRows: + return NKqpProto::EStreamLookupStrategy::JOIN; + case EStreamLookupStrategyType::LookupSemiJoinRows: + return NKqpProto::EStreamLookupStrategy::SEMI_JOIN; } - YQL_ENSURE(lookupStrategy != NKqpProto::EStreamLookupStrategy::UNSPECIFIED, - "Unexpected stream lookup strategy: " << strategy); - return lookupStrategy; + YQL_ENSURE(false, "Unspecified stream lookup strategy: " << strategy); } void FillTableId(const TKqpTable& table, NKqpProto::TKqpPhyTableId& tableProto) { @@ -1380,9 +1379,9 @@ class TKqpQueryCompiler : public IKqpQueryCompiler { const auto resultItemType = resultType->Cast()->GetItemType(); streamLookupProto.SetResultType(NMiniKQL::SerializeNode(CompileType(pgmBuilder, *resultItemType), TypeEnv)); - YQL_ENSURE(streamLookup.LookupStrategy().Maybe()); - TString lookupStrategy = streamLookup.LookupStrategy().Maybe().Cast().StringValue(); - streamLookupProto.SetLookupStrategy(GetStreamLookupStrategy(lookupStrategy)); + auto settings = TKqpStreamLookupSettings::Parse(streamLookup); + streamLookupProto.SetLookupStrategy(GetStreamLookupStrategy(settings.Strategy)); + streamLookupProto.SetAllowNullKeys(settings.AllowNullKeys); streamLookupProto.SetKeepRowsOrder(Config->OrderPreservingLookupJoinEnabled()); switch (streamLookupProto.GetLookupStrategy()) { @@ -1442,7 +1441,7 @@ class TKqpQueryCompiler : public IKqpQueryCompiler { break; } default: - YQL_ENSURE(false, "Unexpected lookup strategy for stream lookup: " << lookupStrategy); + YQL_ENSURE(false, "Unexpected lookup strategy for stream lookup: " << settings.Strategy); } return; diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp index c523314384e8..78dc47933b73 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp @@ -599,18 +599,23 @@ class TKqpJoinRows : public TKqpStreamLookupWorker { break; } - auto hasNulls = [](const TOwnedCellVec& cellVec) { + auto isKeyAllowed = [&](const TOwnedCellVec& cellVec) { + if (Settings.HasAllowNullKeys() && Settings.GetAllowNullKeys()) { + return true; + } + + // otherwise we can't use nulls as lookup keys for (const auto& cell : cellVec) { if (cell.IsNull()) { - return true; + return false; } } - return false; + return true; }; UnprocessedRows.pop_front(); - if (!hasNulls(joinKey)) { // don't use nulls as lookup keys, because null != null + if (isKeyAllowed(joinKey)) { std::vector > partitions; if (joinKey.size() < KeyColumns.size()) { // build prefix range [[key_prefix, NULL, ..., NULL], [key_prefix, +inf, ..., +inf]) diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp index 8693c77452a2..5cf7ac6701c8 100644 --- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp +++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp @@ -4523,11 +4523,6 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda setting.SetValue("1"); auto serverSettings = TKikimrSettings() .SetKqpSettings({setting}); - - NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(false); - serverSettings.SetAppConfig(appConfig); - TKikimrRunner kikimr(serverSettings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index 2ce495241b68..c34ebabaee22 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -749,6 +749,7 @@ message TKqpStreamLookupSettings { optional bool AllowInconsistentReads = 9 [default = false]; optional uint32 LockNodeId = 10; optional bool KeepRowsOrder = 11; + optional bool AllowNullKeys = 12; } message TKqpSequencerSettings { diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index 1b7ceecd3c5f..ed21394f534e 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -288,6 +288,7 @@ message TKqpPhyCnStreamLookup { bytes ResultType = 5; EStreamLookupStrategy LookupStrategy = 6; bool KeepRowsOrder = 7; + bool AllowNullKeys = 8; } message TKqpPhyCnSequencer {