Skip to content

Commit

Permalink
Merge 352f0ec into baa0367
Browse files Browse the repository at this point in the history
  • Loading branch information
zinal authored Jan 6, 2025
2 parents baa0367 + 352f0ec commit aaefe21
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 12 deletions.
39 changes: 28 additions & 11 deletions ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,31 @@ std::vector<std::pair<ui64, TOwnedTableRange>> GetRangePartitioning(const TKqpSt

YQL_ENSURE(partitionInfo);

// Binary search of the index to start with.
size_t idxStart = 0;
size_t idxFinish = partitionInfo->size();
while ((idxFinish - idxStart) > 1) {
size_t idxCur = (idxFinish + idxStart) / 2;
const auto& partCur = (*partitionInfo)[idxCur].Range->EndKeyPrefix.GetCells();
YQL_ENSURE(partCur.size() <= keyColumnTypes.size());
int cmp = CompareTypedCellVectors(partCur.data(), range.From.data(), keyColumnTypes.data(),
std::min(partCur.size(), range.From.size()));
if (cmp < 0) {
idxStart = idxCur + 1;
} else {
idxFinish = idxCur;
}
}
if (idxStart > 0) {
idxStart -= 1;
} else if (idxStart >= partitionInfo->size()) {
idxStart = partitionInfo->size() - 1;
}

std::vector<TCell> minusInf(keyColumnTypes.size());

std::vector<std::pair<ui64, TOwnedTableRange>> rangePartition;
for (size_t idx = 0; idx < partitionInfo->size(); ++idx) {
for (size_t idx = idxStart; idx < partitionInfo->size(); ++idx) {
TTableRange partitionRange{
idx == 0 ? minusInf : (*partitionInfo)[idx - 1].Range->EndKeyPrefix.GetCells(),
idx == 0 ? true : !(*partitionInfo)[idx - 1].Range->IsInclusive,
Expand Down Expand Up @@ -110,6 +131,12 @@ TKqpStreamLookupWorker::TKqpStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSetti
column.GetTypeInfo().GetPgTypeMod()
});
}

KeyColumnTypes.resize(KeyColumns.size());
for (const auto& [_, columnInfo] : KeyColumns) {
YQL_ENSURE(columnInfo.KeyOrder < static_cast<i64>(KeyColumnTypes.size()));
KeyColumnTypes[columnInfo.KeyOrder] = columnInfo.PType;
}
}

TKqpStreamLookupWorker::~TKqpStreamLookupWorker() {
Expand All @@ -123,16 +150,6 @@ TTableId TKqpStreamLookupWorker::GetTableId() const {
return TableId;
}

std::vector<NScheme::TTypeInfo> TKqpStreamLookupWorker::GetKeyColumnTypes() const {
std::vector<NScheme::TTypeInfo> keyColumnTypes(KeyColumns.size());
for (const auto& [_, columnInfo] : KeyColumns) {
YQL_ENSURE(columnInfo.KeyOrder < static_cast<i64>(keyColumnTypes.size()));
keyColumnTypes[columnInfo.KeyOrder] = columnInfo.PType;
}

return keyColumnTypes;
}

class TKqpLookupRows : public TKqpStreamLookupWorker {
public:
TKqpLookupRows(NKikimrKqp::TKqpStreamLookupSettings&& settings, const NMiniKQL::TTypeEnvironment& typeEnv,
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/kqp/runtime/kqp_stream_lookup_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ class TKqpStreamLookupWorker {

virtual std::string GetTablePath() const;
virtual TTableId GetTableId() const;
virtual std::vector<NScheme::TTypeInfo> GetKeyColumnTypes() const;

const std::vector<NScheme::TTypeInfo>& GetKeyColumnTypes() const {
return KeyColumnTypes;
}

virtual void AddInputRow(NUdf::TUnboxedValue inputRow) = 0;
virtual std::vector<THolder<TEvDataShard::TEvRead>> RebuildRequest(const ui64& prevReadId, ui32 firstUnprocessedQuery,
Expand All @@ -72,6 +75,7 @@ class TKqpStreamLookupWorker {
std::unordered_map<TString, TSysTables::TTableColumnInfo> KeyColumns;
std::vector<TSysTables::TTableColumnInfo*> LookupKeyColumns;
std::vector<TSysTables::TTableColumnInfo> Columns;
std::vector<NScheme::TTypeInfo> KeyColumnTypes;
};

std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings,
Expand Down

0 comments on commit aaefe21

Please sign in to comment.