Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reader iterator unification #12387

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions ydb/core/tx/columnshard/engines/column_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,16 @@ TSelectInfo::TStats TSelectInfo::Stats() const {
return out;
}

void TSelectInfo::DebugStream(IOutputStream& out) {
TString TSelectInfo::DebugString() const {
TStringBuilder result;
result << "count:" << PortionsOrderedPK.size() << ";";
if (PortionsOrderedPK.size()) {
out << "portions:";
result << "portions:";
for (auto& portionInfo : PortionsOrderedPK) {
out << portionInfo->DebugString();
result << portionInfo->DebugString();
}
}
return result;
}

} // namespace NKikimr::NOlap
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/column_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ struct TSelectInfo {

TStats Stats() const;

void DebugStream(IOutputStream& out);
TString DebugString() const;
};

class TColumnEngineStats {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class TDataStorageAccessor {
};

// Holds all metadata that is needed to perform read/scan
struct TReadMetadataBase {
class TReadMetadataBase {
public:
enum class ESorting {
NONE = 0 /* "not_sorted" */,
Expand Down Expand Up @@ -153,8 +153,8 @@ struct TReadMetadataBase {

ui64 Limit = 0;

virtual void Dump(IOutputStream& out) const {
out << " predicate{" << (PKRangesFilter ? PKRangesFilter->DebugString() : "no_initialized") << "}"
virtual TString DebugString() const {
return TStringBuilder() << " predicate{" << (PKRangesFilter ? PKRangesFilter->DebugString() : "no_initialized") << "}"
<< " " << Sorting << " sorted";
}

Expand All @@ -179,12 +179,6 @@ struct TReadMetadataBase {
virtual std::unique_ptr<TScanIteratorBase> StartScan(const std::shared_ptr<TReadContext>& readContext) const = 0;
virtual std::vector<TNameTypeInfo> GetKeyYqlSchema() const = 0;

// TODO: can this only be done for base class?
friend IOutputStream& operator<<(IOutputStream& out, const TReadMetadataBase& meta) {
meta.Dump(out);
return out;
}

const TProgramContainer& GetProgram() const {
return Program;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
#include "read_metadata.h"

#include <ydb/core/kqp/compute_actor/kqp_compute_events.h>
#include <ydb/core/tx/columnshard/columnshard_impl.h>
#include <ydb/core/tx/columnshard/transactions/locks/read_finished.h>
#include <ydb/core/tx/columnshard/transactions/locks/read_start.h>

namespace NKikimr::NOlap::NReader::NCommon {

// Fills this metadata from the read description before a scan starts.
//
// Steps, in order:
//   1. copy the PK range filter, sharding info, tx id and lock id from the description;
//   2. if a lock id is present, register the lock and remember its sharing info;
//   3. select the portions to read through dataAccessor;
//   4. if a lock id is present, register every uncommitted inserted portion that does not
//      belong to a long-tx write, so that conflicts with its owning lock can be tracked;
//   5. run the subclass hook DoInitCustom and propagate its failure, if any;
//   6. store the requested stats mode (only reached when DoInitCustom succeeded).
TConclusionStatus TReadMetadata::Init(
    const NColumnShard::TColumnShard* owner, const TReadDescription& readDescription, const TDataStorageAccessor& dataAccessor) {
    SetPKRangesFilter(readDescription.PKRangesFilter);
    InitShardingInfo(readDescription.PathId);
    TxId = readDescription.TxId;
    LockId = readDescription.LockId;

    const bool hasLock = !!LockId;
    if (hasLock) {
        owner->GetOperationsManager().RegisterLock(*LockId, owner->Generation());
        LockSharingInfo = owner->GetOperationsManager().GetLockVerified(*LockId).GetSharingInfo();
    }

    SelectInfo = dataAccessor.Select(readDescription, hasLock);
    if (hasLock) {
        for (auto&& portion : SelectInfo->PortionsOrderedPK) {
            // Only portions that were inserted but not yet committed are of interest here.
            if (!portion->HasInsertWriteId() || portion->HasCommitSnapshot()) {
                continue;
            }
            const auto insertWriteId = portion->GetInsertWriteIdVerified();
            // Long-tx writes are deliberately left out of the conflict bookkeeping.
            if (owner->HasLongTxWrites(insertWriteId)) {
                continue;
            }
            auto operation = owner->GetOperationsManager().GetOperationByInsertWriteIdVerified(insertWriteId);
            AddWriteIdToCheck(insertWriteId, operation->GetLockId());
        }
    }

    if (auto customConclusion = DoInitCustom(owner, readDescription, dataAccessor); customConclusion.IsFail()) {
        return customConclusion;
    }

    StatsMode = readDescription.StatsMode;
    return TConclusionStatus::Success();
}

std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds() const {
auto& indexInfo = ResultIndexSchema->GetIndexInfo();
std::set<ui32> result;
for (auto&& i : GetProgram().GetEarlyFilterColumns()) {
auto id = indexInfo.GetColumnIdOptional(i);
if (id) {
result.emplace(*id);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("early_filter_column", i);
}
}
return result;
}

std::set<ui32> TReadMetadata::GetPKColumnIds() const {
std::set<ui32> result;
auto& indexInfo = ResultIndexSchema->GetIndexInfo();
for (auto&& i : indexInfo.GetPrimaryKeyColumns()) {
Y_ABORT_UNLESS(result.emplace(indexInfo.GetColumnIdVerified(i.first)).second);
}
return result;
}

// Wraps a replace key into a sortable batch position: the key is materialized as a batch
// using the replace-key schema, the position points at row 0, sorting columns are the
// replace-key field names, no data columns are attached, and the direction follows
// IsDescSorted().
NArrow::NMerger::TSortableBatchPosition TReadMetadata::BuildSortedPosition(const NArrow::TReplaceKey& key) const {
    using TPosition = NArrow::NMerger::TSortableBatchPosition;
    const auto& replaceKeySchema = GetReplaceKey();
    return TPosition(key.ToBatch(replaceKeySchema), 0, replaceKeySchema->field_names(), {}, IsDescSorted());
}

// Hook invoked when the read is finished. Does nothing for reads without a lock.
// If the read was flagged via SetBrokenWithCommitted(), the lock is marked broken;
// otherwise a TEvReadFinishedWriter event carrying the (conflicting lock -> this lock)
// pairs is attached to the lock.
void TReadMetadata::DoOnReadFinished(NColumnShard::TColumnShard& owner) const {
    if (!GetLockId()) {
        return;
    }
    const ui64 lock = *GetLockId();

    if (GetBrokenWithCommitted()) {
        owner.GetOperationsManager().GetLockVerified(lock).SetBroken();
        return;
    }

    NOlap::NTxInteractions::TTxConflicts conflicts;
    for (auto&& conflictingLockId : GetConflictableLockIds()) {
        conflicts.Add(conflictingLockId, lock);
    }
    auto writer = std::make_shared<NOlap::NTxInteractions::TEvReadFinishedWriter>(PathId, conflicts);
    owner.GetOperationsManager().AddEventForLock(owner, lock, writer);
}

// Hook invoked right before reading starts. For locked reads it attaches a
// TEvReadStartWriter event (path id, primary key, PK range filter and the currently
// conflictable lock ids) to the lock; reads without a lock are left untouched.
void TReadMetadata::DoOnBeforeStartReading(NColumnShard::TColumnShard& owner) const {
    if (LockId) {
        using TStartWriter = NOlap::NTxInteractions::TEvReadStartWriter;
        auto evWriter = std::make_shared<TStartWriter>(
            PathId, GetResultSchema()->GetIndexInfo().GetPrimaryKey(), GetPKRangesFilterPtr(), GetConflictableLockIds());
        owner.GetOperationsManager().AddEventForLock(owner, *LockId, evWriter);
    }
}

// Hook invoked while a scan reply is being built. When lock sharing info is available,
// a TLock record describing the lock is appended either to scanData.LocksInfo.BrokenLocks
// (if the lock is broken) or to scanData.LocksInfo.Locks.
void TReadMetadata::DoOnReplyConstruction(const ui64 tabletId, NKqp::NInternalImplementation::TEvScanData& scanData) const {
    if (!LockSharingInfo) {
        return;
    }

    NKikimrDataEvents::TLock lockInfo;
    lockInfo.SetLockId(LockSharingInfo->GetLockId());
    lockInfo.SetGeneration(LockSharingInfo->GetGeneration());
    lockInfo.SetDataShard(tabletId);
    lockInfo.SetCounter(LockSharingInfo->GetCounter());
    lockInfo.SetPathId(PathId);
    lockInfo.SetHasWrites(LockSharingInfo->HasWrites());

    auto& destination = LockSharingInfo->IsBroken() ? scanData.LocksInfo.BrokenLocks : scanData.LocksInfo.Locks;
    destination.emplace_back(std::move(lockInfo));
}

// Tells whether the given write id was registered under the same lock as this read.
// Requires lock sharing info to be set and the write id to have been registered through
// AddWriteIdToCheck(); both conditions are verified.
bool TReadMetadata::IsMyUncommitted(const TInsertWriteId writeId) const {
    AFL_VERIFY(LockSharingInfo);
    const auto writeInfo = ConflictedWriteIds.find(writeId);
    AFL_VERIFY(writeInfo != ConflictedWriteIds.end())("write_id", writeId)("write_ids_count", ConflictedWriteIds.size());
    const bool sameLock = writeInfo->second.GetLockId() == LockSharingInfo->GetLockId();
    return sameLock;
}

} // namespace NKikimr::NOlap::NReader::NCommon
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
#pragma once
#include <ydb/core/formats/arrow/reader/position.h>
#include <ydb/core/tx/columnshard/engines/reader/abstract/read_context.h>
#include <ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h>
#include <ydb/core/tx/columnshard/engines/reader/common/stats.h>

#include <ydb/library/formats/arrow/replace_key.h>

// Forward declaration: this header only holds TLockSharingInfo through a
// std::shared_ptr member, so the declaration alone is enough here.
namespace NKikimr::NColumnShard {
class TLockSharingInfo;
}

namespace NKikimr::NOlap::NReader::NCommon {

// Read metadata shared by the concrete reader implementations.
//
// On top of TReadMetadataBase it keeps the path id, the selected portions (SelectInfo),
// read statistics and the bookkeeping needed to detect conflicts between a locked read
// and uncommitted writes. Subclasses provide DoInitCustom(), BuildReader() and Empty().
class TReadMetadata: public TReadMetadataBase {
    using TBase = TReadMetadataBase;

private:
    const ui64 PathId;
    // Non-zero once SetBrokenWithCommitted() was called. Kept behind a shared_ptr so that
    // it can be bumped from const methods.
    std::shared_ptr<TAtomicCounter> BrokenWithCommitted = std::make_shared<TAtomicCounter>();
    // Set by Init() when the read is performed under a lock; null otherwise.
    std::shared_ptr<NColumnShard::TLockSharingInfo> LockSharingInfo;

    // Per-write-id record: the lock the write belongs to plus the conflict counter of that
    // lock. The counter object is the one stored in LockConflictCounters, i.e. it is shared
    // by all write ids registered for the same lock.
    class TWriteIdInfo {
    private:
        const ui64 LockId;
        std::shared_ptr<TAtomicCounter> Conflicts;

    public:
        TWriteIdInfo(const ui64 lockId, const std::shared_ptr<TAtomicCounter>& counter)
            : LockId(lockId)
            , Conflicts(counter) {
        }

        ui64 GetLockId() const {
            return LockId;
        }

        // Const on purpose: only the shared counter is modified, not this record.
        void MarkAsConflictable() const {
            Conflicts->Inc();
        }

        bool IsConflictable() const {
            return Conflicts->Val();
        }
    };

    // lock id -> conflict counter shared by all write ids of that lock.
    THashMap<ui64, std::shared_ptr<TAtomicCounter>> LockConflictCounters;
    // write id -> owning lock and its conflict counter; filled by AddWriteIdToCheck().
    THashMap<TInsertWriteId, TWriteIdInfo> ConflictedWriteIds;

    virtual void DoOnReadFinished(NColumnShard::TColumnShard& owner) const override;
    virtual void DoOnBeforeStartReading(NColumnShard::TColumnShard& owner) const override;
    virtual void DoOnReplyConstruction(const ui64 tabletId, NKqp::NInternalImplementation::TEvScanData& scanData) const override;

    // Subclass-specific part of Init(); called after portions were selected.
    virtual TConclusionStatus DoInitCustom(
        const NColumnShard::TColumnShard* owner, const TReadDescription& readDescription, const TDataStorageAccessor& dataAccessor) = 0;

public:
    using TConstPtr = std::shared_ptr<const TReadMetadata>;

    bool GetBrokenWithCommitted() const {
        return BrokenWithCommitted->Val();
    }

    // Returns the ids of all locks that own at least one write id marked as conflictable.
    THashSet<ui64> GetConflictableLockIds() const {
        THashSet<ui64> result;
        for (auto&& i : ConflictedWriteIds) {
            if (i.second.IsConflictable()) {
                result.emplace(i.second.GetLockId());
            }
        }
        return result;
    }

    // The lock must have been registered through AddWriteIdToCheck(); verified.
    bool IsLockConflictable(const ui64 lockId) const {
        auto it = LockConflictCounters.find(lockId);
        AFL_VERIFY(it != LockConflictCounters.end());
        return it->second->Val();
    }

    // The write id must have been registered through AddWriteIdToCheck(); verified.
    bool IsWriteConflictable(const TInsertWriteId writeId) const {
        auto it = ConflictedWriteIds.find(writeId);
        AFL_VERIFY(it != ConflictedWriteIds.end());
        return it->second.IsConflictable();
    }

    // Registers a write id that belongs to the given lock. The per-lock counter is created
    // on first use; registering the same write id twice fails verification.
    void AddWriteIdToCheck(const TInsertWriteId writeId, const ui64 lockId) {
        auto it = LockConflictCounters.find(lockId);
        if (it == LockConflictCounters.end()) {
            it = LockConflictCounters.emplace(lockId, std::make_shared<TAtomicCounter>()).first;
        }
        AFL_VERIFY(ConflictedWriteIds.emplace(writeId, TWriteIdInfo(lockId, it->second)).second);
    }

    [[nodiscard]] bool IsMyUncommitted(const TInsertWriteId writeId) const;

    // Marks the lock owning the given (previously registered) write id as conflictable.
    void SetConflictedWriteId(const TInsertWriteId writeId) const {
        auto it = ConflictedWriteIds.find(writeId);
        AFL_VERIFY(it != ConflictedWriteIds.end());
        it->second.MarkAsConflictable();
    }

    void SetBrokenWithCommitted() const {
        BrokenWithCommitted->Inc();
    }

    NArrow::NMerger::TSortableBatchPosition BuildSortedPosition(const NArrow::TReplaceKey& key) const;
    virtual std::shared_ptr<IDataReader> BuildReader(const std::shared_ptr<TReadContext>& context) const = 0;

    bool HasProcessingColumnIds() const {
        return GetProgram().HasProcessingColumnIds();
    }

    ui64 GetPathId() const {
        return PathId;
    }

    // Portions selected for this read; assigned by Init(), null before that.
    std::shared_ptr<TSelectInfo> SelectInfo;
    NYql::NDqProto::EDqStatsMode StatsMode = NYql::NDqProto::EDqStatsMode::DQ_STATS_MODE_NONE;
    std::shared_ptr<TReadStats> ReadStats;

    TReadMetadata(const ui64 pathId, const std::shared_ptr<TVersionedIndex> info, const TSnapshot& snapshot, const ESorting sorting,
        const TProgramContainer& ssaProgram, const std::shared_ptr<IScanCursor>& scanCursor)
        : TBase(info, sorting, ssaProgram, info->GetSchemaVerified(snapshot), snapshot, scanCursor)
        , PathId(pathId)
        , ReadStats(std::make_shared<TReadStats>()) {
    }

    virtual std::vector<TNameTypeInfo> GetKeyYqlSchema() const override {
        return GetResultSchema()->GetIndexInfo().GetPrimaryKeyColumns();
    }

    TConclusionStatus Init(
        const NColumnShard::TColumnShard* owner, const TReadDescription& readDescription, const TDataStorageAccessor& dataAccessor);

    // Returns the field names of the result schema in schema order.
    std::vector<std::string> GetColumnsOrder() const {
        auto schema = GetResultSchema();
        std::vector<std::string> result;
        for (auto&& i : schema->GetSchema()->fields()) {
            result.emplace_back(i->name());
        }
        return result;
    }

    std::set<ui32> GetEarlyFilterColumnIds() const;
    std::set<ui32> GetPKColumnIds() const;

    virtual bool Empty() const = 0;

    // Requires SelectInfo to be set (aborts otherwise).
    size_t NumIndexedBlobs() const {
        Y_ABORT_UNLESS(SelectInfo);
        return SelectInfo->Stats().Blobs;
    }

    // Human-readable description for logs. Safe to call before Init(): the blob count and
    // the portion dump are emitted only when SelectInfo is present, because
    // NumIndexedBlobs() aborts on a null SelectInfo. The former value-less
    // " committed blobs: " label is no longer printed.
    virtual TString DebugString() const override {
        TStringBuilder result;

        result << TBase::DebugString() << ";";
        if (SelectInfo) {
            result << " index blobs: " << NumIndexedBlobs();
        }
        result << " at snapshot: " << GetRequestSnapshot().DebugString();

        if (SelectInfo) {
            result << ", " << SelectInfo->DebugString();
        }
        return result;
    }
};

} // namespace NKikimr::NOlap::NReader::NCommon
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Build description for the library that contains read_metadata.cpp.
LIBRARY()

# Translation units compiled into this library.
SRCS(
read_metadata.cpp
)

# Libraries this one is declared to depend on.
PEERDIR(
ydb/core/tx/columnshard/engines/reader/abstract
ydb/core/kqp/compute_actor
)

END()
Loading
Loading