Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(kqp): allow null keys for stream join with secondary index #12363

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions ydb/core/kqp/common/kqp_yql.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -484,4 +484,84 @@ TAutoPtr<IGraphTransformer> GetDqIntegrationPeepholeTransformer(bool beforeDqTra
return dqIntegrationPeepholePipeline.Build();
}

NNodes::TCoNameValueTupleList TKqpStreamLookupSettings::BuildNode(TExprContext& ctx, TPositionHandle pos) const {
TVector<TCoNameValueTuple> settings;

auto strategyTypeToString = [](const EStreamLookupStrategyType& type) {
switch (type) {
case EStreamLookupStrategyType::Unspecified:
break;
case EStreamLookupStrategyType::LookupRows:
return LookupStrategyName;
case EStreamLookupStrategyType::LookupJoinRows:
return LookupJoinStrategyName;
case EStreamLookupStrategyType::LookupSemiJoinRows:
return LookupSemiJoinStrategyName;
}

YQL_ENSURE(false, "Unspecified stream lookup startegy type: " << type);
};

settings.emplace_back(
Build<TCoNameValueTuple>(ctx, pos)
.Name().Build(StrategySettingName)
.Value<TCoAtom>().Build(strategyTypeToString(Strategy))
.Done()
);

if (AllowNullKeys) {
settings.emplace_back(
Build<TCoNameValueTuple>(ctx, pos)
.Name().Build(AllowNullKeysSettingName)
.Done()
);
}

return Build<TCoNameValueTupleList>(ctx, pos)
.Add(settings)
.Done();
}

TKqpStreamLookupSettings TKqpStreamLookupSettings::Parse(const NNodes::TCoNameValueTupleList& list) {
TKqpStreamLookupSettings settings;

auto getLookupStrategyType = [](const TStringBuf& type) {
if (type == LookupStrategyName) {
return EStreamLookupStrategyType::LookupRows;
} else if (type == LookupJoinStrategyName) {
return EStreamLookupStrategyType::LookupJoinRows;
} else if (type == LookupSemiJoinStrategyName) {
return EStreamLookupStrategyType::LookupSemiJoinRows;
} else {
YQL_ENSURE(false, "Unknown stream lookup startegy type: " << type);
}
};

for (const auto& tuple : list) {
auto name = tuple.Name().Value();
if (name == StrategySettingName) {
YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
settings.Strategy = getLookupStrategyType(tuple.Value().Cast<TCoAtom>().Value());
} else if (name == AllowNullKeysSettingName) {
settings.AllowNullKeys = true;
} else {
YQL_ENSURE(false, "Unknown KqpStreamLookup setting name '" << name << "'");
}
}

return settings;
}

TKqpStreamLookupSettings TKqpStreamLookupSettings::Parse(const NNodes::TKqlStreamLookupTable& node) {
return TKqpStreamLookupSettings::Parse(node.Settings());
}

TKqpStreamLookupSettings TKqpStreamLookupSettings::Parse(const NNodes::TKqlStreamLookupIndex& node) {
return TKqpStreamLookupSettings::Parse(node.Settings());
}

TKqpStreamLookupSettings TKqpStreamLookupSettings::Parse(const NNodes::TKqpCnStreamLookup& node) {
return TKqpStreamLookupSettings::Parse(node.Settings());
}

} // namespace NYql
28 changes: 25 additions & 3 deletions ydb/core/kqp/common/kqp_yql.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,31 @@ struct TKqpPhyTxSettings {
constexpr TStringBuf KqpReadRangesSourceName = "KqpReadRangesSource";
constexpr TStringBuf KqpTableSinkName = "KqpTableSinkName";

static constexpr std::string_view TKqpStreamLookupStrategyName = "LookupRows"sv;
static constexpr std::string_view TKqpStreamLookupJoinStrategyName = "LookupJoinRows"sv;
static constexpr std::string_view TKqpStreamLookupSemiJoinStrategyName = "LookupSemiJoinRows"sv;
enum class EStreamLookupStrategyType {
Unspecified,
LookupRows,
LookupJoinRows,
LookupSemiJoinRows,
};

struct TKqpStreamLookupSettings {
static constexpr TStringBuf StrategySettingName = "Strategy";
static constexpr TStringBuf AllowNullKeysSettingName = "AllowNullKeys";

// stream lookup strategy types
static constexpr std::string_view LookupStrategyName = "LookupRows"sv;
static constexpr std::string_view LookupJoinStrategyName = "LookupJoinRows"sv;
static constexpr std::string_view LookupSemiJoinStrategyName = "LookupSemiJoinRows"sv;

bool AllowNullKeys = false;
EStreamLookupStrategyType Strategy = EStreamLookupStrategyType::Unspecified;

NNodes::TCoNameValueTupleList BuildNode(TExprContext& ctx, TPositionHandle pos) const;
static TKqpStreamLookupSettings Parse(const NNodes::TKqlStreamLookupTable& node);
static TKqpStreamLookupSettings Parse(const NNodes::TKqlStreamLookupIndex& node);
static TKqpStreamLookupSettings Parse(const NNodes::TKqpCnStreamLookup& node);
static TKqpStreamLookupSettings Parse(const NNodes::TCoNameValueTupleList& node);
};

struct TKqpReadTableSettings {
static constexpr TStringBuf SkipNullKeysSettingName = "SkipNullKeys";
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ void BuildStreamLookupChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf

settings->SetLookupStrategy(streamLookup.GetLookupStrategy());
settings->SetKeepRowsOrder(streamLookup.GetKeepRowsOrder());
settings->SetAllowNullKeys(streamLookup.GetAllowNullKeys());

TTransform streamLookupTransform;
streamLookupTransform.Type = "StreamLookupInputTransformer";
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@
"Base": "TKqlLookupTableBase",
"Match": {"Type": "Callable", "Name": "KqlStreamLookupTable"},
"Children": [
{"Index": 3, "Name": "LookupStrategy", "Type": "TCoAtom"}
{"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList"}
]
},
{
Expand All @@ -211,7 +211,7 @@
"Base": "TKqlLookupIndexBase",
"Match": {"Type": "Callable", "Name": "KqlStreamLookupIndex"},
"Children": [
{"Index": 4, "Name": "LookupStrategy", "Type": "TCoAtom"}
{"Index": 4, "Name": "Settings", "Type": "TCoNameValueTupleList"}
]
},
{
Expand Down Expand Up @@ -467,7 +467,7 @@
{"Index": 1, "Name": "Table", "Type": "TKqpTable"},
{"Index": 2, "Name": "Columns", "Type": "TCoAtomList"},
{"Index": 3, "Name": "InputType", "Type": "TExprBase"},
{"Index": 4, "Name": "LookupStrategy", "Type": "TCoAtom"}
{"Index": 4, "Name": "Settings", "Type": "TCoNameValueTupleList"}
]
},
{
Expand Down
32 changes: 12 additions & 20 deletions ydb/core/kqp/host/kqp_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -501,14 +501,11 @@ TStatus AnnotateLookupTable(const TExprNode::TPtr& node, TExprContext& ctx, cons

const TStructExprType* structType = nullptr;
if (isStreamLookup) {
auto lookupStrategy = node->Child(TKqlStreamLookupTable::Match(node.Get()) ?
TKqlStreamLookupTable::idx_LookupStrategy : TKqlStreamLookupIndex::idx_LookupStrategy);
if (!EnsureAtom(*lookupStrategy, ctx)) {
return TStatus::Error;
}

if (lookupStrategy->Content() == TKqpStreamLookupJoinStrategyName
|| lookupStrategy->Content() == TKqpStreamLookupSemiJoinStrategyName) {
TCoNameValueTupleList settingsNode{node->ChildPtr(TKqlStreamLookupTable::Match(node.Get()) ?
TKqlStreamLookupTable::idx_Settings : TKqlStreamLookupIndex::idx_Settings)};
auto settings = TKqpStreamLookupSettings::Parse(settingsNode);
if (settings.Strategy == EStreamLookupStrategyType::LookupJoinRows
|| settings.Strategy == EStreamLookupStrategyType::LookupSemiJoinRows) {

if (!EnsureTupleType(node->Pos(), *lookupType, ctx)) {
return TStatus::Error;
Expand Down Expand Up @@ -1649,13 +1646,6 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext
}

TCoAtomList columns{node->ChildPtr(TKqpCnStreamLookup::idx_Columns)};

if (!EnsureAtom(*node->Child(TKqpCnStreamLookup::idx_LookupStrategy), ctx)) {
return TStatus::Error;
}

TCoAtom lookupStrategy(node->Child(TKqpCnStreamLookup::idx_LookupStrategy));

auto inputTypeNode = node->Child(TKqpCnStreamLookup::idx_InputType);

if (!EnsureType(*inputTypeNode, ctx)) {
Expand All @@ -1670,7 +1660,9 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext

YQL_ENSURE(inputItemType);

if (lookupStrategy.Value() == TKqpStreamLookupStrategyName) {
TCoNameValueTupleList settingsNode{node->ChildPtr(TKqpCnStreamLookup::idx_Settings)};
auto settings = TKqpStreamLookupSettings::Parse(settingsNode);
if (settings.Strategy == EStreamLookupStrategyType::LookupRows) {
if (!EnsureStructType(node->Pos(), *inputItemType, ctx)) {
return TStatus::Error;
}
Expand All @@ -1689,8 +1681,8 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext

node->SetTypeAnn(ctx.MakeType<TStreamExprType>(rowType));

} else if (lookupStrategy.Value() == TKqpStreamLookupJoinStrategyName
|| lookupStrategy.Value() == TKqpStreamLookupSemiJoinStrategyName) {
} else if (settings.Strategy == EStreamLookupStrategyType::LookupJoinRows
|| settings.Strategy == EStreamLookupStrategyType::LookupSemiJoinRows) {

if (!EnsureTupleType(node->Pos(), *inputItemType, ctx)) {
return TStatus::Error;
Expand Down Expand Up @@ -1736,8 +1728,8 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext
node->SetTypeAnn(ctx.MakeType<TStreamExprType>(outputItemType));

} else {
ctx.AddError(TIssue(ctx.GetPosition(node->Child(TKqpCnStreamLookup::idx_LookupStrategy)->Pos()),
TStringBuilder() << "Unexpected lookup strategy: " << lookupStrategy.Value()));
ctx.AddError(TIssue(ctx.GetPosition(node->Child(TKqpCnStreamLookup::idx_Settings)->Pos()),
TStringBuilder() << "Unexpected lookup strategy: " << settings.Strategy));
return TStatus::Error;
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/opt/logical/kqp_opt_log_extract.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ TExprBase KqpApplyExtractMembersToLookupTable(TExprBase node, TExprContext& ctx,
.Table(streamLookup.Table())
.LookupKeys(streamLookup.LookupKeys())
.Columns(usedColumns.Cast())
.LookupStrategy(streamLookup.LookupStrategy())
.Settings(streamLookup.Settings())
.Done();
}

Expand Down
26 changes: 16 additions & 10 deletions ydb/core/kqp/opt/logical/kqp_opt_log_indexes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,11 +317,13 @@ TExprBase DoRewriteIndexRead(const TReadMatch& read, TExprContext& ctx,
}

if (useStreamLookup) {
TKqpStreamLookupSettings settings;
settings.Strategy = EStreamLookupStrategyType::LookupRows;
return Build<TKqlStreamLookupTable>(ctx, read.Pos())
.Table(read.Table())
.LookupKeys(readIndexTable.Ptr())
.Columns(read.Columns())
.LookupStrategy().Build(TKqpStreamLookupStrategyName)
.Settings(settings.BuildNode(ctx, read.Pos()))
.Done();
} else {
return Build<TKqlLookupTable>(ctx, read.Pos())
Expand Down Expand Up @@ -362,11 +364,13 @@ TExprBase KqpRewriteLookupIndex(const TExprBase& node, TExprContext& ctx, const

if (!needDataRead) {
if (kqpCtx.Config->EnableKqpDataQueryStreamLookup) {
TKqpStreamLookupSettings settings;
settings.Strategy = EStreamLookupStrategyType::LookupRows;
return Build<TKqlStreamLookupTable>(ctx, node.Pos())
.Table(BuildTableMeta(*indexMeta, node.Pos(), ctx))
.LookupKeys(lookupIndex.LookupKeys())
.Columns(lookupIndex.Columns())
.LookupStrategy().Build(TKqpStreamLookupStrategyName)
.Settings(settings.BuildNode(ctx, node.Pos()))
.Done();
}

Expand All @@ -380,18 +384,20 @@ TExprBase KqpRewriteLookupIndex(const TExprBase& node, TExprContext& ctx, const
auto keyColumnsList = BuildKeyColumnsList(tableDesc, node.Pos(), ctx);

if (kqpCtx.Config->EnableKqpDataQueryStreamLookup) {
TKqpStreamLookupSettings settings;
settings.Strategy = EStreamLookupStrategyType::LookupRows;
TExprBase lookupIndexTable = Build<TKqlStreamLookupTable>(ctx, node.Pos())
.Table(BuildTableMeta(*indexMeta, node.Pos(), ctx))
.LookupKeys(lookupIndex.LookupKeys())
.Columns(keyColumnsList)
.LookupStrategy().Build(TKqpStreamLookupStrategyName)
.Settings(settings.BuildNode(ctx, node.Pos()))
.Done();

return Build<TKqlStreamLookupTable>(ctx, node.Pos())
.Table(lookupIndex.Table())
.LookupKeys(lookupIndexTable.Ptr())
.Columns(lookupIndex.Columns())
.LookupStrategy().Build(TKqpStreamLookupStrategyName)
.Settings(settings.BuildNode(ctx, node.Pos()))
.Done();
}

Expand All @@ -417,6 +423,8 @@ TExprBase KqpRewriteStreamLookupIndex(const TExprBase& node, TExprContext& ctx,
}

auto streamLookupIndex = node.Maybe<TKqlStreamLookupIndex>().Cast();
auto settings = TKqpStreamLookupSettings::Parse(streamLookupIndex);
settings.AllowNullKeys = true;

const auto& tableDesc = GetTableData(*kqpCtx.Tables, kqpCtx.Cluster, streamLookupIndex.Table().Path());
const auto& [indexMeta, _] = tableDesc.Metadata->GetIndexMetadata(streamLookupIndex.Index().StringValue());
Expand All @@ -427,7 +435,7 @@ TExprBase KqpRewriteStreamLookupIndex(const TExprBase& node, TExprContext& ctx,
.Table(BuildTableMeta(*indexMeta, node.Pos(), ctx))
.LookupKeys(streamLookupIndex.LookupKeys())
.Columns(streamLookupIndex.Columns())
.LookupStrategy().Build(streamLookupIndex.LookupStrategy())
.Settings(settings.BuildNode(ctx, node.Pos()))
.Done();
}

Expand All @@ -437,13 +445,11 @@ TExprBase KqpRewriteStreamLookupIndex(const TExprBase& node, TExprContext& ctx,
.Table(BuildTableMeta(*indexMeta, node.Pos(), ctx))
.LookupKeys(streamLookupIndex.LookupKeys())
.Columns(keyColumnsList)
.LookupStrategy().Build(streamLookupIndex.LookupStrategy())
.Settings(settings.BuildNode(ctx, node.Pos()))
.Done();

TMaybeNode<TExprBase> lookupKeys;
YQL_ENSURE(streamLookupIndex.LookupStrategy().Maybe<TCoAtom>());
TString lookupStrategy = streamLookupIndex.LookupStrategy().Maybe<TCoAtom>().Cast().StringValue();
if (lookupStrategy == TKqpStreamLookupJoinStrategyName || lookupStrategy == TKqpStreamLookupSemiJoinStrategyName) {
if (settings.Strategy == EStreamLookupStrategyType::LookupJoinRows || settings.Strategy == EStreamLookupStrategyType::LookupSemiJoinRows) {
// Result type of lookupIndexTable: list<tuple<left_row, optional<main_table_pk>>>,
// expected input type for main table stream join: list<tuple<optional<main_table_pk>, left_row>>,
// so we should transform list<tuple<left_row, optional<main_table_pk>>> to list<tuple<optional<main_table_pk>, left_row>>
Expand Down Expand Up @@ -471,7 +477,7 @@ TExprBase KqpRewriteStreamLookupIndex(const TExprBase& node, TExprContext& ctx,
.Table(streamLookupIndex.Table())
.LookupKeys(lookupKeys.Cast())
.Columns(streamLookupIndex.Columns())
.LookupStrategy().Build(streamLookupIndex.LookupStrategy())
.Settings(settings.BuildNode(ctx, node.Pos()))
.Done();
}

Expand Down
Loading
Loading