Skip to content

Commit

Permalink
Reader iterator unification (ydb-platform#12387)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Dec 9, 2024
1 parent e19930f commit ed0816f
Show file tree
Hide file tree
Showing 28 changed files with 767 additions and 1,174 deletions.
9 changes: 6 additions & 3 deletions ydb/core/tx/columnshard/engines/column_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,16 @@ TSelectInfo::TStats TSelectInfo::Stats() const {
return out;
}

void TSelectInfo::DebugStream(IOutputStream& out) {
TString TSelectInfo::DebugString() const {
TStringBuilder result;
result << "count:" << PortionsOrderedPK.size() << ";";
if (PortionsOrderedPK.size()) {
out << "portions:";
result << "portions:";
for (auto& portionInfo : PortionsOrderedPK) {
out << portionInfo->DebugString();
result << portionInfo->DebugString();
}
}
return result;
}

} // namespace NKikimr::NOlap
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/column_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ struct TSelectInfo {

TStats Stats() const;

void DebugStream(IOutputStream& out);
TString DebugString() const;
};

class TColumnEngineStats {
Expand Down
12 changes: 3 additions & 9 deletions ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class TDataStorageAccessor {
};

// Holds all metadata that is needed to perform read/scan
struct TReadMetadataBase {
class TReadMetadataBase {
public:
enum class ESorting {
NONE = 0 /* "not_sorted" */,
Expand Down Expand Up @@ -153,8 +153,8 @@ struct TReadMetadataBase {

ui64 Limit = 0;

virtual void Dump(IOutputStream& out) const {
out << " predicate{" << (PKRangesFilter ? PKRangesFilter->DebugString() : "no_initialized") << "}"
virtual TString DebugString() const {
return TStringBuilder() << " predicate{" << (PKRangesFilter ? PKRangesFilter->DebugString() : "no_initialized") << "}"
<< " " << Sorting << " sorted";
}

Expand All @@ -179,12 +179,6 @@ struct TReadMetadataBase {
virtual std::unique_ptr<TScanIteratorBase> StartScan(const std::shared_ptr<TReadContext>& readContext) const = 0;
virtual std::vector<TNameTypeInfo> GetKeyYqlSchema() const = 0;

// TODO: can this only be done for base class?
friend IOutputStream& operator<<(IOutputStream& out, const TReadMetadataBase& meta) {
meta.Dump(out);
return out;
}

const TProgramContainer& GetProgram() const {
return Program;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
#include "read_metadata.h"

#include <ydb/core/kqp/compute_actor/kqp_compute_events.h>
#include <ydb/core/tx/columnshard/columnshard_impl.h>
#include <ydb/core/tx/columnshard/transactions/locks/read_finished.h>
#include <ydb/core/tx/columnshard/transactions/locks/read_start.h>

namespace NKikimr::NOlap::NReader::NCommon {

TConclusionStatus TReadMetadata::Init(
const NColumnShard::TColumnShard* owner, const TReadDescription& readDescription, const TDataStorageAccessor& dataAccessor) {
SetPKRangesFilter(readDescription.PKRangesFilter);
InitShardingInfo(readDescription.PathId);
TxId = readDescription.TxId;
LockId = readDescription.LockId;
if (LockId) {
owner->GetOperationsManager().RegisterLock(*LockId, owner->Generation());
LockSharingInfo = owner->GetOperationsManager().GetLockVerified(*LockId).GetSharingInfo();
}

SelectInfo = dataAccessor.Select(readDescription, !!LockId);
if (LockId) {
for (auto&& i : SelectInfo->PortionsOrderedPK) {
if (i->HasInsertWriteId() && !i->HasCommitSnapshot()) {
if (owner->HasLongTxWrites(i->GetInsertWriteIdVerified())) {
} else {
auto op = owner->GetOperationsManager().GetOperationByInsertWriteIdVerified(i->GetInsertWriteIdVerified());
AddWriteIdToCheck(i->GetInsertWriteIdVerified(), op->GetLockId());
}
}
}
}

{
auto customConclusion = DoInitCustom(owner, readDescription, dataAccessor);
if (customConclusion.IsFail()) {
return customConclusion;
}
}

StatsMode = readDescription.StatsMode;
return TConclusionStatus::Success();
}

std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds() const {
auto& indexInfo = ResultIndexSchema->GetIndexInfo();
std::set<ui32> result;
for (auto&& i : GetProgram().GetEarlyFilterColumns()) {
auto id = indexInfo.GetColumnIdOptional(i);
if (id) {
result.emplace(*id);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i);
}
}
return result;
}

std::set<ui32> TReadMetadata::GetPKColumnIds() const {
std::set<ui32> result;
auto& indexInfo = ResultIndexSchema->GetIndexInfo();
for (auto&& i : indexInfo.GetPrimaryKeyColumns()) {
Y_ABORT_UNLESS(result.emplace(indexInfo.GetColumnIdVerified(i.first)).second);
}
return result;
}

NArrow::NMerger::TSortableBatchPosition TReadMetadata::BuildSortedPosition(const NArrow::TReplaceKey& key) const {
return NArrow::NMerger::TSortableBatchPosition(key.ToBatch(GetReplaceKey()), 0, GetReplaceKey()->field_names(), {}, IsDescSorted());
}

void TReadMetadata::DoOnReadFinished(NColumnShard::TColumnShard& owner) const {
if (!GetLockId()) {
return;
}
const ui64 lock = *GetLockId();
if (GetBrokenWithCommitted()) {
owner.GetOperationsManager().GetLockVerified(lock).SetBroken();
} else {
NOlap::NTxInteractions::TTxConflicts conflicts;
for (auto&& i : GetConflictableLockIds()) {
conflicts.Add(i, lock);
}
auto writer = std::make_shared<NOlap::NTxInteractions::TEvReadFinishedWriter>(PathId, conflicts);
owner.GetOperationsManager().AddEventForLock(owner, lock, writer);
}
}

void TReadMetadata::DoOnBeforeStartReading(NColumnShard::TColumnShard& owner) const {
if (!LockId) {
return;
}
auto evWriter = std::make_shared<NOlap::NTxInteractions::TEvReadStartWriter>(
PathId, GetResultSchema()->GetIndexInfo().GetPrimaryKey(), GetPKRangesFilterPtr(), GetConflictableLockIds());
owner.GetOperationsManager().AddEventForLock(owner, *LockId, evWriter);
}

void TReadMetadata::DoOnReplyConstruction(const ui64 tabletId, NKqp::NInternalImplementation::TEvScanData& scanData) const {
if (LockSharingInfo) {
NKikimrDataEvents::TLock lockInfo;
lockInfo.SetLockId(LockSharingInfo->GetLockId());
lockInfo.SetGeneration(LockSharingInfo->GetGeneration());
lockInfo.SetDataShard(tabletId);
lockInfo.SetCounter(LockSharingInfo->GetCounter());
lockInfo.SetPathId(PathId);
lockInfo.SetHasWrites(LockSharingInfo->HasWrites());
if (LockSharingInfo->IsBroken()) {
scanData.LocksInfo.BrokenLocks.emplace_back(std::move(lockInfo));
} else {
scanData.LocksInfo.Locks.emplace_back(std::move(lockInfo));
}
}
}

bool TReadMetadata::IsMyUncommitted(const TInsertWriteId writeId) const {
AFL_VERIFY(LockSharingInfo);
auto it = ConflictedWriteIds.find(writeId);
AFL_VERIFY(it != ConflictedWriteIds.end())("write_id", writeId)("write_ids_count", ConflictedWriteIds.size());
return it->second.GetLockId() == LockSharingInfo->GetLockId();
}

} // namespace NKikimr::NOlap::NReader::NCommon
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
#pragma once
#include <ydb/core/formats/arrow/reader/position.h>
#include <ydb/core/tx/columnshard/engines/reader/abstract/read_context.h>
#include <ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h>
#include <ydb/core/tx/columnshard/engines/reader/common/stats.h>

#include <ydb/library/formats/arrow/replace_key.h>

namespace NKikimr::NColumnShard {
class TLockSharingInfo;
}

namespace NKikimr::NOlap::NReader::NCommon {

class TReadMetadata: public TReadMetadataBase {
using TBase = TReadMetadataBase;

private:
const ui64 PathId;
std::shared_ptr<TAtomicCounter> BrokenWithCommitted = std::make_shared<TAtomicCounter>();
std::shared_ptr<NColumnShard::TLockSharingInfo> LockSharingInfo;

class TWriteIdInfo {
private:
const ui64 LockId;
std::shared_ptr<TAtomicCounter> Conflicts;

public:
TWriteIdInfo(const ui64 lockId, const std::shared_ptr<TAtomicCounter>& counter)
: LockId(lockId)
, Conflicts(counter) {
}

ui64 GetLockId() const {
return LockId;
}

void MarkAsConflictable() const {
Conflicts->Inc();
}

bool IsConflictable() const {
return Conflicts->Val();
}
};

THashMap<ui64, std::shared_ptr<TAtomicCounter>> LockConflictCounters;
THashMap<TInsertWriteId, TWriteIdInfo> ConflictedWriteIds;

virtual void DoOnReadFinished(NColumnShard::TColumnShard& owner) const override;
virtual void DoOnBeforeStartReading(NColumnShard::TColumnShard& owner) const override;
virtual void DoOnReplyConstruction(const ui64 tabletId, NKqp::NInternalImplementation::TEvScanData& scanData) const override;

virtual TConclusionStatus DoInitCustom(
const NColumnShard::TColumnShard* owner, const TReadDescription& readDescription, const TDataStorageAccessor& dataAccessor) = 0;

public:
using TConstPtr = std::shared_ptr<const TReadMetadata>;

bool GetBrokenWithCommitted() const {
return BrokenWithCommitted->Val();
}
THashSet<ui64> GetConflictableLockIds() const {
THashSet<ui64> result;
for (auto&& i : ConflictedWriteIds) {
if (i.second.IsConflictable()) {
result.emplace(i.second.GetLockId());
}
}
return result;
}

bool IsLockConflictable(const ui64 lockId) const {
auto it = LockConflictCounters.find(lockId);
AFL_VERIFY(it != LockConflictCounters.end());
return it->second->Val();
}

bool IsWriteConflictable(const TInsertWriteId writeId) const {
auto it = ConflictedWriteIds.find(writeId);
AFL_VERIFY(it != ConflictedWriteIds.end());
return it->second.IsConflictable();
}

void AddWriteIdToCheck(const TInsertWriteId writeId, const ui64 lockId) {
auto it = LockConflictCounters.find(lockId);
if (it == LockConflictCounters.end()) {
it = LockConflictCounters.emplace(lockId, std::make_shared<TAtomicCounter>()).first;
}
AFL_VERIFY(ConflictedWriteIds.emplace(writeId, TWriteIdInfo(lockId, it->second)).second);
}

[[nodiscard]] bool IsMyUncommitted(const TInsertWriteId writeId) const;

void SetConflictedWriteId(const TInsertWriteId writeId) const {
auto it = ConflictedWriteIds.find(writeId);
AFL_VERIFY(it != ConflictedWriteIds.end());
it->second.MarkAsConflictable();
}

void SetBrokenWithCommitted() const {
BrokenWithCommitted->Inc();
}

NArrow::NMerger::TSortableBatchPosition BuildSortedPosition(const NArrow::TReplaceKey& key) const;
virtual std::shared_ptr<IDataReader> BuildReader(const std::shared_ptr<TReadContext>& context) const = 0;

bool HasProcessingColumnIds() const {
return GetProgram().HasProcessingColumnIds();
}

ui64 GetPathId() const {
return PathId;
}

std::shared_ptr<TSelectInfo> SelectInfo;
NYql::NDqProto::EDqStatsMode StatsMode = NYql::NDqProto::EDqStatsMode::DQ_STATS_MODE_NONE;
std::shared_ptr<TReadStats> ReadStats;

TReadMetadata(const ui64 pathId, const std::shared_ptr<TVersionedIndex> info, const TSnapshot& snapshot, const ESorting sorting,
const TProgramContainer& ssaProgram, const std::shared_ptr<IScanCursor>& scanCursor)
: TBase(info, sorting, ssaProgram, info->GetSchemaVerified(snapshot), snapshot, scanCursor)
, PathId(pathId)
, ReadStats(std::make_shared<TReadStats>()) {
}

virtual std::vector<TNameTypeInfo> GetKeyYqlSchema() const override {
return GetResultSchema()->GetIndexInfo().GetPrimaryKeyColumns();
}

TConclusionStatus Init(
const NColumnShard::TColumnShard* owner, const TReadDescription& readDescription, const TDataStorageAccessor& dataAccessor);

std::vector<std::string> GetColumnsOrder() const {
auto schema = GetResultSchema();
std::vector<std::string> result;
for (auto&& i : schema->GetSchema()->fields()) {
result.emplace_back(i->name());
}
return result;
}

std::set<ui32> GetEarlyFilterColumnIds() const;
std::set<ui32> GetPKColumnIds() const;

virtual bool Empty() const = 0;

size_t NumIndexedBlobs() const {
Y_ABORT_UNLESS(SelectInfo);
return SelectInfo->Stats().Blobs;
}

virtual TString DebugString() const override {
TStringBuilder result;

result << TBase::DebugString() << ";" << " index blobs: " << NumIndexedBlobs() << " committed blobs: "
<< " at snapshot: " << GetRequestSnapshot().DebugString();

if (SelectInfo) {
result << ", " << SelectInfo->DebugString();
}
return result;
}
};

} // namespace NKikimr::NOlap::NReader::NCommon
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
LIBRARY()

SRCS(
read_metadata.cpp
)

PEERDIR(
ydb/core/tx/columnshard/engines/reader/abstract
ydb/core/kqp/compute_actor
)

END()
Loading

0 comments on commit ed0816f

Please sign in to comment.