Skip to content

Commit

Permalink
KQP part of duplicated columns (#9044)
Browse files Browse the repository at this point in the history
  • Loading branch information
MrLolthe1st authored Sep 16, 2024
1 parent 4dc00f1 commit c42930c
Show file tree
Hide file tree
Showing 17 changed files with 84 additions and 63 deletions.
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ void TEvKqpExecuter::TEvTxResponse::InitTxResult(const TKqpPhyTxHolder::TConstPt
queryResultIndex = result.GetQueryResultIndex();
}

TxResults.emplace_back(result.GetIsStream(), resultMeta.MkqlItemType, &resultMeta.ColumnOrder,
TxResults.emplace_back(result.GetIsStream(), resultMeta.MkqlItemType, &resultMeta.ColumnOrder, &resultMeta.ColumnHints,
queryResultIndex);
}
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
batch.Payload = std::move(computeData.Payload);

TKqpProtoBuilder protoBuilder{*AppData()->FunctionRegistry};
auto resultSet = protoBuilder.BuildYdbResultSet(std::move(batches), txResult.MkqlItemType, txResult.ColumnOrder);
auto resultSet = protoBuilder.BuildYdbResultSet(std::move(batches), txResult.MkqlItemType, txResult.ColumnOrder, txResult.ColumnHints);

if (!trailingResults) {
auto streamEv = MakeHolder<TEvKqpExecuter::TEvStreamData>();
Expand Down Expand Up @@ -1841,7 +1841,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
IActor* proxy;
if (txResult.IsStream && txResult.QueryResultIndex.Defined()) {
proxy = CreateResultStreamChannelProxy(TxId, channel.Id, txResult.MkqlItemType,
txResult.ColumnOrder, *txResult.QueryResultIndex, Target, this->SelfId(), StatementResultIndex);
txResult.ColumnOrder, txResult.ColumnHints, *txResult.QueryResultIndex, Target, this->SelfId(), StatementResultIndex);
} else {
proxy = CreateResultDataChannelProxy(TxId, channel.Id, this->SelfId(),
channel.DstInputIndex, ResponseEv.get());
Expand Down
10 changes: 6 additions & 4 deletions ydb/core/kqp/executer_actor/kqp_result_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,11 @@ class TResultCommonChannelProxy : public NActors::TActor<TResultCommonChannelPro
class TResultStreamChannelProxy : public TResultCommonChannelProxy {
public:
TResultStreamChannelProxy(ui64 txId, ui64 channelId, NKikimr::NMiniKQL::TType* itemType,
const TVector<ui32>* columnOrder, ui32 queryResultIndex, TActorId target,
const TVector<ui32>* columnOrder, const TVector<TString>* columnHints, ui32 queryResultIndex, TActorId target,
TActorId executer, size_t statementResultIndex)
: TResultCommonChannelProxy(txId, channelId, executer)
, ColumnOrder(columnOrder)
, ColumnHints(columnHints)
, ItemType(itemType)
, QueryResultIndex(queryResultIndex)
, Target(target)
Expand All @@ -157,7 +158,7 @@ class TResultStreamChannelProxy : public TResultCommonChannelProxy {
batch.Payload = std::move(computeData.Payload);

TKqpProtoBuilder protoBuilder{*AppData()->FunctionRegistry};
auto resultSet = protoBuilder.BuildYdbResultSet(std::move(batches), ItemType, ColumnOrder);
auto resultSet = protoBuilder.BuildYdbResultSet(std::move(batches), ItemType, ColumnOrder, ColumnHints);

auto streamEv = MakeHolder<TEvKqpExecuter::TEvStreamData>();
streamEv->Record.SetSeqNo(computeData.Proto.GetSeqNo());
Expand All @@ -173,6 +174,7 @@ class TResultStreamChannelProxy : public TResultCommonChannelProxy {

private:
const TVector<ui32>* ColumnOrder;
const TVector<TString>* ColumnHints;
NKikimr::NMiniKQL::TType* ItemType;
ui32 QueryResultIndex = 0;
const NActors::TActorId Target;
Expand Down Expand Up @@ -214,15 +216,15 @@ class TResultDataChannelProxy : public TResultCommonChannelProxy {
} // anonymous namespace end

NActors::IActor* CreateResultStreamChannelProxy(ui64 txId, ui64 channelId, NKikimr::NMiniKQL::TType* itemType,
const TVector<ui32>* columnOrder, ui32 queryResultIndex, TActorId target,
const TVector<ui32>* columnOrder, const TVector<TString>* columnHints, ui32 queryResultIndex, TActorId target,
TActorId executer, ui32 statementResultIndex)
{
LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER,
"CreateResultStreamChannelProxy: TxId: " << txId <<
", channelId: " << channelId
);

return new TResultStreamChannelProxy(txId, channelId, itemType, columnOrder, queryResultIndex, target,
return new TResultStreamChannelProxy(txId, channelId, itemType, columnOrder, columnHints, queryResultIndex, target,
executer, statementResultIndex);
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_result_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ struct TQueryExecutionStats;
struct TKqpExecuterTxResult;

NActors::IActor* CreateResultStreamChannelProxy(ui64 txId, ui64 channelId, NKikimr::NMiniKQL::TType* itemType,
const TVector<ui32>* columnOrder, ui32 queryResultIndex, NActors::TActorId target,
const TVector<ui32>* columnOrder, const TVector<TString>* columnHints, ui32 queryResultIndex, NActors::TActorId target,
NActors::TActorId executer, ui32 statementResultIndex);

NActors::IActor* CreateResultDataChannelProxy(ui64 txId, ui64 channelId,
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/provider/yql_kikimr_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -999,7 +999,7 @@ class TKiSourceCallableExecutionTransformer : public TAsyncCallbackTransformer<T
static TExprNode::TPtr GetResOrPullResult(const TExprNode& node, const IDataProvider::TFillSettings& fillSettings,
const NKikimrMiniKQL::TResult& resultValue, TExprContext& ctx)
{
TVector<TString> columnHints(NCommon::GetResOrPullColumnHints(node));
TColumnOrder columnHints(NCommon::GetResOrPullColumnHints(node));

auto protoValue = &resultValue;
YQL_ENSURE(resultValue.GetArena());
Expand Down
19 changes: 9 additions & 10 deletions ydb/core/kqp/provider/yql_kikimr_results.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ bool ResultsOverflow(ui64 rows, ui64 bytes, const IDataProvider::TFillSettings&
}

void WriteValueToYson(const TStringStream& stream, NResult::TYsonResultWriter& writer, const NKikimrMiniKQL::TType& type,
const NKikimrMiniKQL::TValue& value, const TVector<TString>* fieldsOrder,
const NKikimrMiniKQL::TValue& value, const TColumnOrder* fieldsOrder,
const IDataProvider::TFillSettings& fillSettings, bool& truncated, bool firstLevel = false)
{
switch (type.GetKind()) {
Expand Down Expand Up @@ -203,13 +203,13 @@ void WriteValueToYson(const TStringStream& stream, NResult::TYsonResultWriter& w
};

if (fieldsOrder) {
YQL_ENSURE(fieldsOrder->size() == structType.MemberSize());
YQL_ENSURE(fieldsOrder->Size() == structType.MemberSize());
TMap<TString, size_t> memberIndices;
for (size_t i = 0; i < structType.MemberSize(); ++i) {
memberIndices[structType.GetMember(i).GetName()] = i;
}
for (auto& field : *fieldsOrder) {
auto* memberIndex = memberIndices.FindPtr(field);
auto* memberIndex = memberIndices.FindPtr(field.PhysicalName);
YQL_ENSURE(memberIndex);

writeMember(*memberIndex);
Expand Down Expand Up @@ -330,11 +330,11 @@ Y_FORCE_INLINE bool ExportStructTypeToKikimrProto(const TStructExprType* type, T
} // namespace

void KikimrResultToYson(const TStringStream& stream, NYson::TYsonWriter& writer, const NKikimrMiniKQL::TResult& result,
const TVector<TString>& columnHints, const IDataProvider::TFillSettings& fillSettings, bool& truncated)
const TColumnOrder& columnHints, const IDataProvider::TFillSettings& fillSettings, bool& truncated)
{
truncated = false;
NResult::TYsonResultWriter resultWriter(writer);
WriteValueToYson(stream, resultWriter, result.GetType(), result.GetValue(), columnHints.empty() ? nullptr : &columnHints,
WriteValueToYson(stream, resultWriter, result.GetType(), result.GetValue(), columnHints.Size() == 0 ? nullptr : &columnHints,
fillSettings, truncated, true);
}

Expand All @@ -352,7 +352,7 @@ bool IsRawKikimrResult(const NKikimrMiniKQL::TResult& result) {
return structType.GetMember(0).GetName() != "Data" || structType.GetMember(1).GetName() != "Truncated";
}

NKikimrMiniKQL::TResult* KikimrResultToProto(const NKikimrMiniKQL::TResult& result, const TVector<TString>& columnHints,
NKikimrMiniKQL::TResult* KikimrResultToProto(const NKikimrMiniKQL::TResult& result, const TColumnOrder& columnHints,
const IDataProvider::TFillSettings& fillSettings, google::protobuf::Arena* arena)
{
NKikimrMiniKQL::TResult* packedResult = google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(arena);
Expand All @@ -371,12 +371,11 @@ NKikimrMiniKQL::TResult* KikimrResultToProto(const NKikimrMiniKQL::TResult& resu
auto* truncatedValue = packedValue->AddStruct();

bool truncated = false;
TColumnOrder order(columnHints);
if (result.GetType().GetKind() == NKikimrMiniKQL::ETypeKind::List) {
const auto& itemType = result.GetType().GetList().GetItem();

TMap<TString, size_t> memberIndices;
if (itemType.GetKind() == NKikimrMiniKQL::ETypeKind::Struct && !columnHints.empty()) {
if (itemType.GetKind() == NKikimrMiniKQL::ETypeKind::Struct && columnHints.Size() != 0) {
const auto& structType = itemType.GetStruct();

for (size_t i = 0; i < structType.MemberSize(); ++i) {
Expand All @@ -387,7 +386,7 @@ NKikimrMiniKQL::TResult* KikimrResultToProto(const NKikimrMiniKQL::TResult& resu
auto* newItem = dataType->MutableList()->MutableItem();
newItem->SetKind(NKikimrMiniKQL::ETypeKind::Struct);
auto* newStructType = newItem->MutableStruct();
for (auto& [column, gen_col] : order) {
for (auto& [column, gen_col] : columnHints) {
auto* memberIndex = memberIndices.FindPtr(gen_col);
YQL_ENSURE(memberIndex);

Expand All @@ -406,7 +405,7 @@ NKikimrMiniKQL::TResult* KikimrResultToProto(const NKikimrMiniKQL::TResult& resu
}
if (!memberIndices.empty()) {
auto* newStruct = dataValue->AddList();
for (auto& [column, gen_column] : order) {
for (auto& [column, gen_column] : columnHints) {
auto* memberIndex = memberIndices.FindPtr(gen_column);
YQL_ENSURE(memberIndex);

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/provider/yql_kikimr_results.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
namespace NYql {

void KikimrResultToYson(const TStringStream& stream, NYson::TYsonWriter& writer, const NKikimrMiniKQL::TResult& result,
const TVector<TString>& columnHints, const IDataProvider::TFillSettings& fillSettings, bool& truncated);
const TColumnOrder& columnHints, const IDataProvider::TFillSettings& fillSettings, bool& truncated);

NKikimrMiniKQL::TResult* KikimrResultToProto(const NKikimrMiniKQL::TResult& result, const TVector<TString>& columnHints,
NKikimrMiniKQL::TResult* KikimrResultToProto(const NKikimrMiniKQL::TResult& result, const TColumnOrder& columnHints,
const IDataProvider::TFillSettings& fillSettings, google::protobuf::Arena* arena);

bool IsRawKikimrResult(const NKikimrMiniKQL::TResult& result);
Expand Down
8 changes: 5 additions & 3 deletions ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -578,20 +578,22 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
}

THashMap<TString, int> columnOrder;
TColumnOrder order;
columnOrder.reserve(kikimrProto.GetStruct().MemberSize());
if (!txResult.GetColumnHints().empty()) {
YQL_ENSURE(txResult.GetColumnHints().size() == (int)kikimrProto.GetStruct().MemberSize());
for (int i = 0; i < txResult.GetColumnHints().size(); i++) {
const auto& hint = txResult.GetColumnHints().at(i);
columnOrder[TString(hint)] = i;
columnOrder[order.AddColumn(TString(hint))] = i;
}
}

int id = 0;
for (const auto& column : kikimrProto.GetStruct().GetMember()) {
int bindingColumnId = columnOrder.count(column.GetName()) ? columnOrder.at(column.GetName()) : id++;
auto it = columnOrder.find(column.GetName());
int bindingColumnId = it != columnOrder.end() ? it->second : id++;
auto& columnMeta = resultMetaColumns->at(bindingColumnId);
columnMeta.Setname(column.GetName());
columnMeta.Setname(it != columnOrder.end() ? order.at(it->second).LogicalName : column.GetName());
ConvertMiniKQLTypeToYdbType(column.GetType(), *columnMeta.mutable_type());
}
}
Expand Down
10 changes: 7 additions & 3 deletions ydb/core/kqp/query_data/kqp_prepared_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,15 @@ TKqpPhyTxHolder::TKqpPhyTxHolder(const std::shared_ptr<const NKikimrKqp::TPrepar
for(ui32 i = 0; i < structType->GetMembersCount(); ++i) {
memberIndices[TString(structType->GetMemberName(i))] = i;
}

for(auto& name: txResult.GetColumnHints()) {
auto it = memberIndices.find(name);
NYql::TColumnOrder order;
for (auto& name: txResult.GetColumnHints()) {
order.AddColumn(name);
}
for (auto& [name, phy_name]: order) {
auto it = memberIndices.find(phy_name);
YQL_ENSURE(it != memberIndices.end(), "undetermined column name: " << name);
result.ColumnOrder.push_back(it->second);
result.ColumnHints.push_back(name);
}
}
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/query_data/kqp_prepared_query.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class TPreparedQueryAllocHolder;
struct TPhyTxResultMetadata {
NKikimr::NMiniKQL::TType* MkqlItemType;
TVector<ui32> ColumnOrder;
TVector<TString> ColumnHints;
};

struct TTableConstInfoMap : public TAtomicRefCount<TTableConstInfoMap> {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/query_data/kqp_query_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ void TKqpExecuterTxResult::FillMkql(NKikimrMiniKQL::TResult* mkqlResult) {
});
} else {
YQL_ENSURE(Rows.RowCount() == 1, "Actual buffer size: " << Rows.RowCount());
ExportTypeToProto(MkqlItemType, *mkqlResult->MutableType());
ExportTypeToProto(MkqlItemType, *mkqlResult->MutableType(), ColumnOrder);
ExportValueToProto(MkqlItemType, *Rows.Head(), *mkqlResult->MutableValue());
}
}
Expand Down Expand Up @@ -97,7 +97,7 @@ void TKqpExecuterTxResult::FillYdb(Ydb::ResultSet* ydbResult, TMaybe<ui64> rowsL
for (ui32 idx = 0; idx < mkqlSrcRowStructType->GetMembersCount(); ++idx) {
auto* column = ydbResult->add_columns();
ui32 memberIndex = (!ColumnOrder || ColumnOrder->empty()) ? idx : (*ColumnOrder)[idx];
column->set_name(TString(mkqlSrcRowStructType->GetMemberName(memberIndex)));
column->set_name(ColumnHints && ColumnHints->size() ? ColumnHints->at(idx) : TString(mkqlSrcRowStructType->GetMemberName(memberIndex)));
ExportTypeToProto(mkqlSrcRowStructType->GetMemberType(memberIndex), *column->mutable_type());
}

Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/query_data/kqp_query_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ struct TKqpExecuterTxResult {
bool IsStream = true;
NKikimr::NMiniKQL::TType* MkqlItemType;
const TVector<ui32>* ColumnOrder = nullptr;
const TVector<TString>* ColumnHints = nullptr;
TMaybe<ui32> QueryResultIndex = 0;
NKikimr::NMiniKQL::TUnboxedValueBatch Rows;
Ydb::ResultSet TrailingResult;
Expand All @@ -88,10 +89,12 @@ struct TKqpExecuterTxResult {
bool isStream,
NKikimr::NMiniKQL::TType* mkqlItemType,
const TVector<ui32>* сolumnOrder,
const TVector<TString>* columnHints,
const TMaybe<ui32>& queryResultIndex)
: IsStream(isStream)
, MkqlItemType(mkqlItemType)
, ColumnOrder(сolumnOrder)
, ColumnHints(columnHints)
, QueryResultIndex(queryResultIndex)
{}

Expand Down
7 changes: 4 additions & 3 deletions ydb/core/kqp/runtime/kqp_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,18 @@ TKqpProtoBuilder::~TKqpProtoBuilder() {
Ydb::ResultSet TKqpProtoBuilder::BuildYdbResultSet(
TVector<NYql::NDq::TDqSerializedBatch>&& data,
NKikimr::NMiniKQL::TType* mkqlSrcRowType,
const TVector<ui32>* columnOrder)
const TVector<ui32>* columnOrder,
const TVector<TString>* columnHints)
{
YQL_ENSURE(mkqlSrcRowType->GetKind() == NKikimr::NMiniKQL::TType::EKind::Struct);
const auto* mkqlSrcRowStructType = static_cast<const TStructType*>(mkqlSrcRowType);

Ydb::ResultSet resultSet;

TColumnOrder order = columnHints ? TColumnOrder(*columnHints) : TColumnOrder{};
for (ui32 idx = 0; idx < mkqlSrcRowStructType->GetMembersCount(); ++idx) {
auto* column = resultSet.add_columns();
ui32 memberIndex = (!columnOrder || columnOrder->empty()) ? idx : (*columnOrder)[idx];
column->set_name(TString(mkqlSrcRowStructType->GetMemberName(memberIndex)));
column->set_name(TString(columnHints && columnHints->size() ? order.at(idx).LogicalName : mkqlSrcRowStructType->GetMemberName(memberIndex)));
ExportTypeToProto(mkqlSrcRowStructType->GetMemberType(memberIndex), *column->mutable_type());
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/runtime/kqp_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class TKqpProtoBuilder : private TNonCopyable {
~TKqpProtoBuilder();

Ydb::ResultSet BuildYdbResultSet(TVector<NYql::NDq::TDqSerializedBatch>&& data,
NKikimr::NMiniKQL::TType* srcRowType, const TVector<ui32>* columnOrder = nullptr);
NKikimr::NMiniKQL::TType* srcRowType, const TVector<ui32>* columnOrder = nullptr, const TVector<TString>* columnHints = nullptr);

private:
NMiniKQL::TScopedAlloc* Alloc = nullptr;
Expand Down
20 changes: 20 additions & 0 deletions ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1106,6 +1106,26 @@ Y_UNIT_TEST_SUITE(KqpPg) {
}
}

Y_UNIT_TEST(DuplicatedColumns) {
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

auto result = session.ExecuteDataQuery(R"(
--!syntax_pg
SELECT 1 a, '2' a, '1' || '2' a;
)", TTxControl::BeginTx().CommitTx()).GetValueSync();

UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
UNIT_ASSERT_C(result.GetResultSet(0).GetColumnsMeta().size() == 3, "Expected 3 columns");
for (size_t i = 0; i < 3; ++i) {
UNIT_ASSERT_C(result.GetResultSet(0).GetColumnsMeta()[i].Name == "a", TStringBuilder() << "Expected all columns to be named 'a', but got: " << result.GetResultSet(0).GetColumnsMeta()[i].Name);
}
CompareYson(R"([
["1";"2";"12"]
])", FormatResultSetYson(result.GetResultSet(0)));
}

Y_UNIT_TEST(ReadPgArray) {
NKikimr::NMiniKQL::TScopedAlloc alloc(__LOCATION__);
auto binaryStr = NPg::PgNativeBinaryFromNativeText("{1,1}", INT2ARRAYOID).Str;
Expand Down
Loading

0 comments on commit c42930c

Please sign in to comment.