Skip to content

Commit

Permalink
fix(kqp): allow null keys for stream join with secondary index (#12363)
Browse files Browse the repository at this point in the history
  • Loading branch information
ulya-sidorina authored Dec 7, 2024
1 parent 27dba0c commit 817dc99
Show file tree
Hide file tree
Showing 16 changed files with 194 additions and 77 deletions.
80 changes: 80 additions & 0 deletions ydb/core/kqp/common/kqp_yql.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -484,4 +484,84 @@ TAutoPtr<IGraphTransformer> GetDqIntegrationPeepholeTransformer(bool beforeDqTra
return dqIntegrationPeepholePipeline.Build();
}

NNodes::TCoNameValueTupleList TKqpStreamLookupSettings::BuildNode(TExprContext& ctx, TPositionHandle pos) const {
TVector<TCoNameValueTuple> settings;

auto strategyTypeToString = [](const EStreamLookupStrategyType& type) {
switch (type) {
case EStreamLookupStrategyType::Unspecified:
break;
case EStreamLookupStrategyType::LookupRows:
return LookupStrategyName;
case EStreamLookupStrategyType::LookupJoinRows:
return LookupJoinStrategyName;
case EStreamLookupStrategyType::LookupSemiJoinRows:
return LookupSemiJoinStrategyName;
}

YQL_ENSURE(false, "Unspecified stream lookup startegy type: " << type);
};

settings.emplace_back(
Build<TCoNameValueTuple>(ctx, pos)
.Name().Build(StrategySettingName)
.Value<TCoAtom>().Build(strategyTypeToString(Strategy))
.Done()
);

if (AllowNullKeys) {
settings.emplace_back(
Build<TCoNameValueTuple>(ctx, pos)
.Name().Build(AllowNullKeysSettingName)
.Done()
);
}

return Build<TCoNameValueTupleList>(ctx, pos)
.Add(settings)
.Done();
}

TKqpStreamLookupSettings TKqpStreamLookupSettings::Parse(const NNodes::TCoNameValueTupleList& list) {
TKqpStreamLookupSettings settings;

auto getLookupStrategyType = [](const TStringBuf& type) {
if (type == LookupStrategyName) {
return EStreamLookupStrategyType::LookupRows;
} else if (type == LookupJoinStrategyName) {
return EStreamLookupStrategyType::LookupJoinRows;
} else if (type == LookupSemiJoinStrategyName) {
return EStreamLookupStrategyType::LookupSemiJoinRows;
} else {
YQL_ENSURE(false, "Unknown stream lookup startegy type: " << type);
}
};

for (const auto& tuple : list) {
auto name = tuple.Name().Value();
if (name == StrategySettingName) {
YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
settings.Strategy = getLookupStrategyType(tuple.Value().Cast<TCoAtom>().Value());
} else if (name == AllowNullKeysSettingName) {
settings.AllowNullKeys = true;
} else {
YQL_ENSURE(false, "Unknown KqpStreamLookup setting name '" << name << "'");
}
}

return settings;
}

TKqpStreamLookupSettings TKqpStreamLookupSettings::Parse(const NNodes::TKqlStreamLookupTable& node) {
return TKqpStreamLookupSettings::Parse(node.Settings());
}

TKqpStreamLookupSettings TKqpStreamLookupSettings::Parse(const NNodes::TKqlStreamLookupIndex& node) {
return TKqpStreamLookupSettings::Parse(node.Settings());
}

TKqpStreamLookupSettings TKqpStreamLookupSettings::Parse(const NNodes::TKqpCnStreamLookup& node) {
return TKqpStreamLookupSettings::Parse(node.Settings());
}

} // namespace NYql
28 changes: 25 additions & 3 deletions ydb/core/kqp/common/kqp_yql.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,31 @@ struct TKqpPhyTxSettings {
constexpr TStringBuf KqpReadRangesSourceName = "KqpReadRangesSource";
constexpr TStringBuf KqpTableSinkName = "KqpTableSinkName";

static constexpr std::string_view TKqpStreamLookupStrategyName = "LookupRows"sv;
static constexpr std::string_view TKqpStreamLookupJoinStrategyName = "LookupJoinRows"sv;
static constexpr std::string_view TKqpStreamLookupSemiJoinStrategyName = "LookupSemiJoinRows"sv;
enum class EStreamLookupStrategyType {
Unspecified,
LookupRows,
LookupJoinRows,
LookupSemiJoinRows,
};

struct TKqpStreamLookupSettings {
static constexpr TStringBuf StrategySettingName = "Strategy";
static constexpr TStringBuf AllowNullKeysSettingName = "AllowNullKeys";

// stream lookup strategy types
static constexpr std::string_view LookupStrategyName = "LookupRows"sv;
static constexpr std::string_view LookupJoinStrategyName = "LookupJoinRows"sv;
static constexpr std::string_view LookupSemiJoinStrategyName = "LookupSemiJoinRows"sv;

bool AllowNullKeys = false;
EStreamLookupStrategyType Strategy = EStreamLookupStrategyType::Unspecified;

NNodes::TCoNameValueTupleList BuildNode(TExprContext& ctx, TPositionHandle pos) const;
static TKqpStreamLookupSettings Parse(const NNodes::TKqlStreamLookupTable& node);
static TKqpStreamLookupSettings Parse(const NNodes::TKqlStreamLookupIndex& node);
static TKqpStreamLookupSettings Parse(const NNodes::TKqpCnStreamLookup& node);
static TKqpStreamLookupSettings Parse(const NNodes::TCoNameValueTupleList& node);
};

struct TKqpReadTableSettings {
static constexpr TStringBuf SkipNullKeysSettingName = "SkipNullKeys";
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ void BuildStreamLookupChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf

settings->SetLookupStrategy(streamLookup.GetLookupStrategy());
settings->SetKeepRowsOrder(streamLookup.GetKeepRowsOrder());
settings->SetAllowNullKeys(streamLookup.GetAllowNullKeys());

TTransform streamLookupTransform;
streamLookupTransform.Type = "StreamLookupInputTransformer";
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@
"Base": "TKqlLookupTableBase",
"Match": {"Type": "Callable", "Name": "KqlStreamLookupTable"},
"Children": [
{"Index": 3, "Name": "LookupStrategy", "Type": "TCoAtom"}
{"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList"}
]
},
{
Expand All @@ -211,7 +211,7 @@
"Base": "TKqlLookupIndexBase",
"Match": {"Type": "Callable", "Name": "KqlStreamLookupIndex"},
"Children": [
{"Index": 4, "Name": "LookupStrategy", "Type": "TCoAtom"}
{"Index": 4, "Name": "Settings", "Type": "TCoNameValueTupleList"}
]
},
{
Expand Down Expand Up @@ -467,7 +467,7 @@
{"Index": 1, "Name": "Table", "Type": "TKqpTable"},
{"Index": 2, "Name": "Columns", "Type": "TCoAtomList"},
{"Index": 3, "Name": "InputType", "Type": "TExprBase"},
{"Index": 4, "Name": "LookupStrategy", "Type": "TCoAtom"}
{"Index": 4, "Name": "Settings", "Type": "TCoNameValueTupleList"}
]
},
{
Expand Down
32 changes: 12 additions & 20 deletions ydb/core/kqp/host/kqp_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -501,14 +501,11 @@ TStatus AnnotateLookupTable(const TExprNode::TPtr& node, TExprContext& ctx, cons

const TStructExprType* structType = nullptr;
if (isStreamLookup) {
auto lookupStrategy = node->Child(TKqlStreamLookupTable::Match(node.Get()) ?
TKqlStreamLookupTable::idx_LookupStrategy : TKqlStreamLookupIndex::idx_LookupStrategy);
if (!EnsureAtom(*lookupStrategy, ctx)) {
return TStatus::Error;
}

if (lookupStrategy->Content() == TKqpStreamLookupJoinStrategyName
|| lookupStrategy->Content() == TKqpStreamLookupSemiJoinStrategyName) {
TCoNameValueTupleList settingsNode{node->ChildPtr(TKqlStreamLookupTable::Match(node.Get()) ?
TKqlStreamLookupTable::idx_Settings : TKqlStreamLookupIndex::idx_Settings)};
auto settings = TKqpStreamLookupSettings::Parse(settingsNode);
if (settings.Strategy == EStreamLookupStrategyType::LookupJoinRows
|| settings.Strategy == EStreamLookupStrategyType::LookupSemiJoinRows) {

if (!EnsureTupleType(node->Pos(), *lookupType, ctx)) {
return TStatus::Error;
Expand Down Expand Up @@ -1649,13 +1646,6 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext
}

TCoAtomList columns{node->ChildPtr(TKqpCnStreamLookup::idx_Columns)};

if (!EnsureAtom(*node->Child(TKqpCnStreamLookup::idx_LookupStrategy), ctx)) {
return TStatus::Error;
}

TCoAtom lookupStrategy(node->Child(TKqpCnStreamLookup::idx_LookupStrategy));

auto inputTypeNode = node->Child(TKqpCnStreamLookup::idx_InputType);

if (!EnsureType(*inputTypeNode, ctx)) {
Expand All @@ -1670,7 +1660,9 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext

YQL_ENSURE(inputItemType);

if (lookupStrategy.Value() == TKqpStreamLookupStrategyName) {
TCoNameValueTupleList settingsNode{node->ChildPtr(TKqpCnStreamLookup::idx_Settings)};
auto settings = TKqpStreamLookupSettings::Parse(settingsNode);
if (settings.Strategy == EStreamLookupStrategyType::LookupRows) {
if (!EnsureStructType(node->Pos(), *inputItemType, ctx)) {
return TStatus::Error;
}
Expand All @@ -1689,8 +1681,8 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext

node->SetTypeAnn(ctx.MakeType<TStreamExprType>(rowType));

} else if (lookupStrategy.Value() == TKqpStreamLookupJoinStrategyName
|| lookupStrategy.Value() == TKqpStreamLookupSemiJoinStrategyName) {
} else if (settings.Strategy == EStreamLookupStrategyType::LookupJoinRows
|| settings.Strategy == EStreamLookupStrategyType::LookupSemiJoinRows) {

if (!EnsureTupleType(node->Pos(), *inputItemType, ctx)) {
return TStatus::Error;
Expand Down Expand Up @@ -1736,8 +1728,8 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext
node->SetTypeAnn(ctx.MakeType<TStreamExprType>(outputItemType));

} else {
ctx.AddError(TIssue(ctx.GetPosition(node->Child(TKqpCnStreamLookup::idx_LookupStrategy)->Pos()),
TStringBuilder() << "Unexpected lookup strategy: " << lookupStrategy.Value()));
ctx.AddError(TIssue(ctx.GetPosition(node->Child(TKqpCnStreamLookup::idx_Settings)->Pos()),
TStringBuilder() << "Unexpected lookup strategy: " << settings.Strategy));
return TStatus::Error;
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ TExprBase KqpApplyExtractMembersToLookupTable(TExprBase node, TExprContext& ctx,
.Table(streamLookup.Table())
.LookupKeys(streamLookup.LookupKeys())
.Columns(usedColumns.Cast())
.LookupStrategy(streamLookup.LookupStrategy())
.Settings(streamLookup.Settings())
.Done();
}

Expand Down
26 changes: 16 additions & 10 deletions ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,11 +317,13 @@ TExprBase DoRewriteIndexRead(const TReadMatch& read, TExprContext& ctx,
}

if (useStreamLookup) {
TKqpStreamLookupSettings settings;
settings.Strategy = EStreamLookupStrategyType::LookupRows;
return Build<TKqlStreamLookupTable>(ctx, read.Pos())
.Table(read.Table())
.LookupKeys(readIndexTable.Ptr())
.Columns(read.Columns())
.LookupStrategy().Build(TKqpStreamLookupStrategyName)
.Settings(settings.BuildNode(ctx, read.Pos()))
.Done();
} else {
return Build<TKqlLookupTable>(ctx, read.Pos())
Expand Down Expand Up @@ -362,11 +364,13 @@ TExprBase KqpRewriteLookupIndex(const TExprBase& node, TExprContext& ctx, const

if (!needDataRead) {
if (kqpCtx.Config->EnableKqpDataQueryStreamLookup) {
TKqpStreamLookupSettings settings;
settings.Strategy = EStreamLookupStrategyType::LookupRows;
return Build<TKqlStreamLookupTable>(ctx, node.Pos())
.Table(BuildTableMeta(*indexMeta, node.Pos(), ctx))
.LookupKeys(lookupIndex.LookupKeys())
.Columns(lookupIndex.Columns())
.LookupStrategy().Build(TKqpStreamLookupStrategyName)
.Settings(settings.BuildNode(ctx, node.Pos()))
.Done();
}

Expand All @@ -380,18 +384,20 @@ TExprBase KqpRewriteLookupIndex(const TExprBase& node, TExprContext& ctx, const
auto keyColumnsList = BuildKeyColumnsList(tableDesc, node.Pos(), ctx);

if (kqpCtx.Config->EnableKqpDataQueryStreamLookup) {
TKqpStreamLookupSettings settings;
settings.Strategy = EStreamLookupStrategyType::LookupRows;
TExprBase lookupIndexTable = Build<TKqlStreamLookupTable>(ctx, node.Pos())
.Table(BuildTableMeta(*indexMeta, node.Pos(), ctx))
.LookupKeys(lookupIndex.LookupKeys())
.Columns(keyColumnsList)
.LookupStrategy().Build(TKqpStreamLookupStrategyName)
.Settings(settings.BuildNode(ctx, node.Pos()))
.Done();

return Build<TKqlStreamLookupTable>(ctx, node.Pos())
.Table(lookupIndex.Table())
.LookupKeys(lookupIndexTable.Ptr())
.Columns(lookupIndex.Columns())
.LookupStrategy().Build(TKqpStreamLookupStrategyName)
.Settings(settings.BuildNode(ctx, node.Pos()))
.Done();
}

Expand All @@ -417,6 +423,8 @@ TExprBase KqpRewriteStreamLookupIndex(const TExprBase& node, TExprContext& ctx,
}

auto streamLookupIndex = node.Maybe<TKqlStreamLookupIndex>().Cast();
auto settings = TKqpStreamLookupSettings::Parse(streamLookupIndex);
settings.AllowNullKeys = true;

const auto& tableDesc = GetTableData(*kqpCtx.Tables, kqpCtx.Cluster, streamLookupIndex.Table().Path());
const auto& [indexMeta, _] = tableDesc.Metadata->GetIndexMetadata(streamLookupIndex.Index().StringValue());
Expand All @@ -427,7 +435,7 @@ TExprBase KqpRewriteStreamLookupIndex(const TExprBase& node, TExprContext& ctx,
.Table(BuildTableMeta(*indexMeta, node.Pos(), ctx))
.LookupKeys(streamLookupIndex.LookupKeys())
.Columns(streamLookupIndex.Columns())
.LookupStrategy().Build(streamLookupIndex.LookupStrategy())
.Settings(settings.BuildNode(ctx, node.Pos()))
.Done();
}

Expand All @@ -437,13 +445,11 @@ TExprBase KqpRewriteStreamLookupIndex(const TExprBase& node, TExprContext& ctx,
.Table(BuildTableMeta(*indexMeta, node.Pos(), ctx))
.LookupKeys(streamLookupIndex.LookupKeys())
.Columns(keyColumnsList)
.LookupStrategy().Build(streamLookupIndex.LookupStrategy())
.Settings(settings.BuildNode(ctx, node.Pos()))
.Done();

TMaybeNode<TExprBase> lookupKeys;
YQL_ENSURE(streamLookupIndex.LookupStrategy().Maybe<TCoAtom>());
TString lookupStrategy = streamLookupIndex.LookupStrategy().Maybe<TCoAtom>().Cast().StringValue();
if (lookupStrategy == TKqpStreamLookupJoinStrategyName || lookupStrategy == TKqpStreamLookupSemiJoinStrategyName) {
if (settings.Strategy == EStreamLookupStrategyType::LookupJoinRows || settings.Strategy == EStreamLookupStrategyType::LookupSemiJoinRows) {
// Result type of lookupIndexTable: list<tuple<left_row, optional<main_table_pk>>>,
// expected input type for main table stream join: list<tuple<optional<main_table_pk>, left_row>>,
// so we should transform list<tuple<left_row, optional<main_table_pk>>> to list<tuple<optional<main_table_pk>, left_row>>
Expand Down Expand Up @@ -471,7 +477,7 @@ TExprBase KqpRewriteStreamLookupIndex(const TExprBase& node, TExprContext& ctx,
.Table(streamLookupIndex.Table())
.LookupKeys(lookupKeys.Cast())
.Columns(streamLookupIndex.Columns())
.LookupStrategy().Build(streamLookupIndex.LookupStrategy())
.Settings(settings.BuildNode(ctx, node.Pos()))
.Done();
}

Expand Down
Loading

0 comments on commit 817dc99

Please sign in to comment.