Skip to content

Commit

Permalink
Simple reader (#11894)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Nov 26, 2024
1 parent 7c597c1 commit 6ed5294
Show file tree
Hide file tree
Showing 80 changed files with 4,229 additions and 252 deletions.
19 changes: 12 additions & 7 deletions ydb/core/formats/arrow/common/container.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,27 +148,32 @@ std::shared_ptr<NKikimr::NArrow::TGeneralContainer> TGeneralContainer::BuildEmpt
return std::make_shared<TGeneralContainer>(Schema, std::move(columns));
}

std::shared_ptr<arrow::Table> TGeneralContainer::BuildTableOptional(const std::optional<std::set<std::string>>& columnNames /*= {}*/) const {
std::shared_ptr<arrow::Table> TGeneralContainer::BuildTableOptional(const TTableConstructionContext& context) const {
std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;
std::vector<std::shared_ptr<arrow::Field>> fields;
for (i32 i = 0; i < Schema->num_fields(); ++i) {
if (columnNames && !columnNames->contains(Schema->field(i)->name())) {
if (context.GetColumnNames() && !context.GetColumnNames()->contains(Schema->field(i)->name())) {
continue;
}
columns.emplace_back(Columns[i]->GetChunkedArray());
if (context.GetRecordsCount() || context.GetStartIndex()) {
columns.emplace_back(Columns[i]->Slice(context.GetStartIndex().value_or(0),
context.GetRecordsCount().value_or(GetRecordsCount() - context.GetStartIndex().value_or(0))));
} else {
columns.emplace_back(Columns[i]->GetChunkedArray());
}
fields.emplace_back(Schema->field(i));
}
if (fields.empty()) {
return nullptr;
}
AFL_VERIFY(RecordsCount);
return arrow::Table::Make(std::make_shared<arrow::Schema>(fields), columns, *RecordsCount);
return arrow::Table::Make(std::make_shared<arrow::Schema>(fields), columns, context.GetRecordsCount().value_or(*RecordsCount));
}

// Strict counterpart of BuildTableOptional: builds the table with the same arguments and then
// checks the outcome with AFL_VERIFY instead of letting the caller handle a missing result.
// Checked conditions: the table is non-null, and, when a column-name filter was supplied, the
// number of fields in the resulting schema equals the number of requested names.
// NOTE(review): AFL_VERIFY presumably aborts on a failed condition - confirm against its definition.
std::shared_ptr<arrow::Table> TGeneralContainer::BuildTableVerified(const std::optional<std::set<std::string>>& columnNames /*= {}*/) const {
std::shared_ptr<arrow::Table> TGeneralContainer::BuildTableVerified(const TTableConstructionContext& context) const {
auto result = BuildTableOptional(columnNames);
auto result = BuildTableOptional(context);
AFL_VERIFY(result);
AFL_VERIFY(!columnNames || result->schema()->num_fields() == (i32)columnNames->size());
// Only the counts are compared: a requested name that is absent from the schema makes the
// field count smaller than the size of the requested set.
AFL_VERIFY(!context.GetColumnNames() || result->schema()->num_fields() == (i32)context.GetColumnNames()->size());
return result;
}

Expand Down
25 changes: 23 additions & 2 deletions ydb/core/formats/arrow/common/container.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,29 @@ class TGeneralContainer {
return Columns[idx];
}

std::shared_ptr<arrow::Table> BuildTableVerified(const std::optional<std::set<std::string>>& columnNames = {}) const;
std::shared_ptr<arrow::Table> BuildTableOptional(const std::optional<std::set<std::string>>& columnNames = {}) const;
// Optional restrictions for building an arrow::Table out of the container.
// A default-constructed context carries no restrictions. The single-argument constructors are
// intentionally implicit so that a set of column names can be passed where a context is expected.
class TTableConstructionContext {
private:
    // When set, restricts the table to the columns with these names.
    YDB_ACCESSOR_DEF(std::optional<std::set<std::string>>, ColumnNames);
    // When set, index of the first record to take.
    YDB_ACCESSOR_DEF(std::optional<ui32>, StartIndex);
    // When set, number of records to take.
    YDB_ACCESSOR_DEF(std::optional<ui32>, RecordsCount);

public:
    TTableConstructionContext() = default;

    // Copies the given names into the column filter.
    TTableConstructionContext(const std::set<std::string>& columnNames)
        : ColumnNames(columnNames) {
    }

    // Takes ownership of the given names as the column filter.
    TTableConstructionContext(std::set<std::string>&& columnNames)
        : ColumnNames(std::move(columnNames)) {
    }

    // Replaces the column filter with the (deduplicated, ordered) contents of `names`.
    void SetColumnNames(const std::vector<TString>& names) {
        ColumnNames.emplace(names.begin(), names.end());
    }
};

std::shared_ptr<arrow::Table> BuildTableVerified(const TTableConstructionContext& context = Default<TTableConstructionContext>()) const;
std::shared_ptr<arrow::Table> BuildTableOptional(const TTableConstructionContext& context = Default<TTableConstructionContext>()) const;

std::shared_ptr<TGeneralContainer> BuildEmptySame() const;

Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/compute_actor/kqp_compute_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ struct TEvScanData: public NActors::TEventLocal<TEvScanData, TKqpComputeEvents::
std::vector<std::vector<ui32>> SplittedBatches;

TOwnedCellVec LastKey;
NKikimrKqp::TEvKqpScanCursor LastCursorProto;
TDuration CpuTime;
TDuration WaitTime;
ui32 PageFaults = 0; // number of page faults occurred when filling in this message
Expand Down Expand Up @@ -120,6 +121,7 @@ struct TEvScanData: public NActors::TEventLocal<TEvScanData, TKqpComputeEvents::
ev->Finished = pbEv->Record.GetFinished();
ev->RequestedBytesLimitReached = pbEv->Record.GetRequestedBytesLimitReached();
ev->LastKey = TOwnedCellVec(TSerializedCellVec(pbEv->Record.GetLastKey()).GetCells());
ev->LastCursorProto = pbEv->Record.GetLastCursor();
if (pbEv->Record.HasAvailablePacks()) {
ev->AvailablePacks = pbEv->Record.GetAvailablePacks();
}
Expand Down Expand Up @@ -153,6 +155,7 @@ struct TEvScanData: public NActors::TEventLocal<TEvScanData, TKqpComputeEvents::
Remote->Record.SetPageFaults(PageFaults);
Remote->Record.SetPageFault(PageFault);
Remote->Record.SetLastKey(TSerializedCellVec::Serialize(LastKey));
*Remote->Record.MutableLastCursor() = LastCursorProto;
if (AvailablePacks) {
Remote->Record.SetAvailablePacks(*AvailablePacks);
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/compute_actor/kqp_compute_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ struct TShardState: public TCommonRetriesState {
bool SubscribedOnTablet = false;
TActorId ActorId;
TOwnedCellVec LastKey;
std::optional<NKikimrKqp::TEvKqpScanCursor> LastCursorProto;
std::optional<ui32> AvailablePacks;

TString PrintLastKey(TConstArrayRef<NScheme::TTypeInfo> keyTypes) const;
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ namespace NKikimr::NKqp::NScanPrivate {

// Interface through which the shard scanner obtains objects it cannot build itself:
// the scan request event and the key column types of the scanned table.
class IExternalObjectsProvider {
public:
// Builds the TEvKqpScan request for the scan `scanId` of generation `gen` over `ranges`.
// `cursor` is the optional scan cursor to pass along with the request; the scanner start code
// passes the shard state's LastCursorProto here.
virtual std::unique_ptr<TEvDataShard::TEvKqpScan> BuildEvKqpScan(const ui32 scanId, const ui32 gen, const TSmallVec<TSerializedTableRange>& ranges) const = 0;
virtual std::unique_ptr<TEvDataShard::TEvKqpScan> BuildEvKqpScan(const ui32 scanId, const ui32 gen, const TSmallVec<TSerializedTableRange>& ranges,
const std::optional<NKikimrKqp::TEvKqpScanCursor>& cursor) const = 0;
// Returns the type infos of the key columns; used when computing and printing scan ranges.
virtual const TVector<NScheme::TTypeInfo>& GetKeyColumnTypes() const = 0;
};

Expand Down Expand Up @@ -61,7 +62,7 @@ class TShardScannerInfo {

const auto& keyColumnTypes = externalObjectsProvider.GetKeyColumnTypes();
auto ranges = state.GetScanRanges(keyColumnTypes);
auto ev = externalObjectsProvider.BuildEvKqpScan(ScanId, Generation, ranges);
auto ev = externalObjectsProvider.BuildEvKqpScan(ScanId, Generation, ranges, state.LastCursorProto);

AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "start_scanner")("tablet_id", TabletId)("generation", Generation)
("info", state.ToString(keyColumnTypes))("range", DebugPrintRanges(keyColumnTypes, ranges, *AppData()->TypeRegistry))
Expand Down
41 changes: 23 additions & 18 deletions ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ TKqpScanFetcherActor::TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snaps
, ShardsScanningPolicy(shardsScanningPolicy)
, Counters(counters)
, InFlightShards(ScanId, *this)
, InFlightComputes(ComputeActorIds)
{
, InFlightComputes(ComputeActorIds) {
Y_UNUSED(traceId);
AFL_ENSURE(!Meta.GetReads().empty());
AFL_ENSURE(Meta.GetTable().GetTableKind() != (ui32)ETableKind::SysView);
Expand All @@ -47,7 +46,7 @@ TKqpScanFetcherActor::TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snaps
for (size_t i = 0; i < Meta.KeyColumnTypesSize(); i++) {
NScheme::TTypeId typeId = Meta.GetKeyColumnTypes().at(i);
NScheme::TTypeInfo typeInfo = NScheme::NTypeIds::IsParametrizedType(typeId) ?
NScheme::TypeInfoFromProto(typeId,Meta.GetKeyColumnTypeInfos().at(i)) :
NScheme::TypeInfoFromProto(typeId, Meta.GetKeyColumnTypeInfos().at(i)) :
NScheme::TTypeInfo(typeId);
KeyColumnTypes.push_back(typeInfo);
}
Expand Down Expand Up @@ -127,19 +126,19 @@ void TKqpScanFetcherActor::HandleExecute(TEvKqpCompute::TEvScanData::TPtr& ev) {
("ScanId", ev->Get()->ScanId)
("Finished", ev->Get()->Finished)
("Lock", [&]() {
TStringBuilder builder;
for (const auto& lock : ev->Get()->LocksInfo.Locks) {
builder << lock.ShortDebugString();
}
return builder;
}())
TStringBuilder builder;
for (const auto& lock : ev->Get()->LocksInfo.Locks) {
builder << lock.ShortDebugString();
}
return builder;
}())
("BrokenLocks", [&]() {
TStringBuilder builder;
for (const auto& lock : ev->Get()->LocksInfo.BrokenLocks) {
builder << lock.ShortDebugString();
}
return builder;
}());
TStringBuilder builder;
for (const auto& lock : ev->Get()->LocksInfo.BrokenLocks) {
builder << lock.ShortDebugString();
}
return builder;
}());

TInstant startTime = TActivationContext::Now();
if (ev->Get()->Finished) {
Expand Down Expand Up @@ -347,11 +346,12 @@ void TKqpScanFetcherActor::HandleExecute(TEvTxProxySchemeCache::TEvResolveKeySet

if (!state.LastKey.empty()) {
PendingShards.front().LastKey = std::move(state.LastKey);
while(!PendingShards.empty() && PendingShards.front().GetScanRanges(KeyColumnTypes).empty()) {
while (!PendingShards.empty() && PendingShards.front().GetScanRanges(KeyColumnTypes).empty()) {
CA_LOG_D("Nothing to read " << PendingShards.front().ToString(KeyColumnTypes));
auto readShard = std::move(PendingShards.front());
PendingShards.pop_front();
PendingShards.front().LastKey = std::move(readShard.LastKey);
PendingShards.front().LastCursorProto = std::move(readShard.LastCursorProto);
}

AFL_ENSURE(!PendingShards.empty());
Expand Down Expand Up @@ -409,7 +409,8 @@ bool TKqpScanFetcherActor::SendScanFinished() {
return true;
}

std::unique_ptr<NKikimr::TEvDataShard::TEvKqpScan> TKqpScanFetcherActor::BuildEvKqpScan(const ui32 scanId, const ui32 gen, const TSmallVec<TSerializedTableRange>& ranges) const {
std::unique_ptr<NKikimr::TEvDataShard::TEvKqpScan> TKqpScanFetcherActor::BuildEvKqpScan(const ui32 scanId, const ui32 gen,
const TSmallVec<TSerializedTableRange>& ranges, const std::optional<NKikimrKqp::TEvKqpScanCursor>& cursor) const {
auto ev = std::make_unique<TEvDataShard::TEvKqpScan>();
ev->Record.SetLocalPathId(ScanDataMeta.TableId.PathId.LocalPathId);
for (auto& column : ScanDataMeta.GetColumns()) {
Expand All @@ -423,6 +424,9 @@ std::unique_ptr<NKikimr::TEvDataShard::TEvKqpScan> TKqpScanFetcherActor::BuildEv
}
}
ev->Record.MutableSkipNullKeys()->CopyFrom(Meta.GetSkipNullKeys());
if (cursor) {
*ev->Record.MutableScanCursor() = *cursor;
}

auto protoRanges = ev->Record.MutableRanges();
protoRanges->Reserve(ranges.size());
Expand Down Expand Up @@ -489,10 +493,11 @@ void TKqpScanFetcherActor::ProcessPendingScanDataItem(TEvKqpCompute::TEvScanData
AFL_ENSURE(state->ActorId == ev->Sender)("expected", state->ActorId)("got", ev->Sender);

state->LastKey = std::move(msg.LastKey);
state->LastCursorProto = std::move(msg.LastCursorProto);
const ui64 rowsCount = msg.GetRowsCount();
AFL_ENSURE(!LockTxId || !msg.LocksInfo.Locks.empty() || !msg.LocksInfo.BrokenLocks.empty());
AFL_ENSURE(LockTxId || (msg.LocksInfo.Locks.empty() && msg.LocksInfo.BrokenLocks.empty()));
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("action","got EvScanData")("rows", rowsCount)("finished", msg.Finished)("exceeded", msg.RequestedBytesLimitReached)
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("action", "got EvScanData")("rows", rowsCount)("finished", msg.Finished)("exceeded", msg.RequestedBytesLimitReached)
("scan", ScanId)("packs_to_send", InFlightComputes.GetPacksToSendCount())
("from", ev->Sender)("shards remain", PendingShards.size())
("in flight scans", InFlightShards.GetScansCount())
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ class TKqpScanFetcherActor: public NActors::TActorBootstrapped<TKqpScanFetcherAc

bool SendScanFinished();

virtual std::unique_ptr<NKikimr::TEvDataShard::TEvKqpScan> BuildEvKqpScan(const ui32 scanId, const ui32 gen, const TSmallVec<TSerializedTableRange>& ranges) const override;
virtual std::unique_ptr<NKikimr::TEvDataShard::TEvKqpScan> BuildEvKqpScan(const ui32 scanId, const ui32 gen,
const TSmallVec<TSerializedTableRange>& ranges, const std::optional<NKikimrKqp::TEvKqpScanCursor>& cursor) const override;
virtual const TVector<NScheme::TTypeInfo>& GetKeyColumnTypes() const override {
return KeyColumnTypes;
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1700,6 +1700,7 @@ message TColumnShardConfig {
optional bool ColumnChunksV0Usage = 25 [default = true];
optional bool ColumnChunksV1Usage = 26 [default = true];
optional uint64 MemoryLimitScanPortion = 27 [default = 100000000];
optional string ReaderClassName = 28;
}

message TSchemeShardConfig {
Expand Down
14 changes: 14 additions & 0 deletions ydb/core/protos/kqp.proto
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,19 @@ message TEvScanError {
optional uint64 TabletId = 4;
}

// Scan position reported by a shard and handed back to it when a scan is started again:
// it travels as TEvRemoteScanData.LastCursor and as TEvKqpScan.ScanCursor.
// Exactly one reader-specific variant is carried in the Implementation oneof.
message TEvKqpScanCursor {
// Cursor variant for the "plain" column shard scan; carries no fields.
message TColumnShardScanPlain {
}
// Cursor variant for the "simple" column shard scan.
message TColumnShardScanSimple {
// NOTE(review): presumably the identifier of the data source the scan stopped in - confirm against the reader.
optional uint64 SourceId = 1;
// NOTE(review): presumably the index of the first record still to be read in that source - confirm against the reader.
optional uint32 StartRecordIndex = 2;
}
oneof Implementation {
TColumnShardScanPlain ColumnShardPlain = 10;
TColumnShardScanSimple ColumnShardSimple = 11;
}
}

message TEvRemoteScanData {
optional uint32 ScanId = 1;
optional uint64 CpuTimeUs = 2;
Expand All @@ -665,6 +678,7 @@ message TEvRemoteScanData {

optional bool RequestedBytesLimitReached = 11 [default = false];
optional uint32 AvailablePacks = 12;
optional TEvKqpScanCursor LastCursor = 13;
}

message TEvRemoteScanDataAck {
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/protos/tx_datashard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1700,6 +1700,8 @@ message TEvKqpScan {
optional TComputeShardingPolicy ComputeShardingPolicy = 23;
optional uint64 LockTxId = 24;
optional uint32 LockNodeId = 25;
optional string CSScanPolicy = 26;
optional NKikimrKqp.TEvKqpScanCursor ScanCursor = 27;
}

message TEvCompactTable {
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ void TColumnShard::Handle(NPrivateEvents::NWrite::TEvWritePortionResult::TPtr& e
AFL_VERIFY(ev->Get()->GetWriteStatus() == NKikimrProto::OK);
std::vector<TInsertedPortions> writtenPacks = ev->Get()->DetachInsertedPacks();
std::vector<TFailedWrite> fails = ev->Get()->DetachFails();
const TMonotonic now = TMonotonic::Now();
for (auto&& i : writtenPacks) {
Counters.OnWritePutBlobsSuccess(now - i.GetWriteMeta().GetWriteStartInstant(), i.GetRecordsCount());
Counters.GetWritesMonitor()->OnFinishWrite(i.GetDataSize(), 1);
}
for (auto&& i : fails) {
Expand Down Expand Up @@ -554,12 +556,15 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
return;
}

Counters.GetColumnTablesCounters()->GetPathIdCounter(pathId)->OnWriteEvent();

auto arrowData = std::make_shared<TArrowData>(schema);
if (!arrowData->Parse(operation, NEvWrite::TPayloadReader<NEvents::TDataEvents::TEvWrite>(*ev->Get()))) {
Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(
TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "parsing data error");
ctx.Send(source, result.release(), 0, cookie);
return;
}

auto overloadStatus = CheckOverloaded(pathId);
Expand Down
8 changes: 6 additions & 2 deletions ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ class TTxInternalScan;
namespace NPlain {
class TIndexScannerConstructor;
}
namespace NSimple {
class TIndexScannerConstructor;
}
} // namespace NReader

namespace NDataSharing {
Expand Down Expand Up @@ -109,7 +112,7 @@ class TSharingSessionsInitializer;
class TInFlightReadsInitializer;
class TSpecialValuesInitializer;
class TTablesManagerInitializer;
}
} // namespace NLoading

extern bool gAllowLogBatchingDefaultValue;

Expand Down Expand Up @@ -198,6 +201,7 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
friend class NOlap::NReader::TTxScan;
friend class NOlap::NReader::TTxInternalScan;
friend class NOlap::NReader::NPlain::TIndexScannerConstructor;
friend class NOlap::NReader::NSimple::TIndexScannerConstructor;

class TStoragesManager;
friend class TTxController;
Expand Down Expand Up @@ -246,7 +250,7 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
void Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvStartCompaction::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvMetadataAccessorsInfo::TPtr& ev, const TActorContext& ctx);

void Handle(NPrivateEvents::NWrite::TEvWritePortionResult::TPtr& ev, const TActorContext& ctx);

void Handle(TEvPrivate::TEvScanStats::TPtr& ev, const TActorContext& ctx);
Expand Down
9 changes: 8 additions & 1 deletion ydb/core/tx/columnshard/counters/scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -295,13 +295,19 @@ class TConcreteScanCounters: public TScanCounters {
std::shared_ptr<TAtomicCounter> AssembleTasksCount;
std::shared_ptr<TAtomicCounter> ReadTasksCount;
std::shared_ptr<TAtomicCounter> ResourcesAllocationTasksCount;
std::shared_ptr<TAtomicCounter> ResultsForSourceCount;

public:
TScanAggregations Aggregations;

// Returns a TCounterGuard built over FetchAccessorsCount, one of the counters InWaiting() inspects.
// NOTE(review): presumably the guard keeps the counter raised for its own lifetime - confirm against TCounterGuard.
TCounterGuard GetFetcherAcessorsGuard() const {
return TCounterGuard(FetchAccessorsCount);
}

// Returns a TCounterGuard built over ResultsForSourceCount.
// NOTE(review): presumably the guard keeps the counter raised for its own lifetime - confirm against TCounterGuard.
TCounterGuard GetResultsForSourceGuard() const {
return TCounterGuard(ResultsForSourceCount);
}

// Returns a TCounterGuard built over MergeTasksCount, one of the counters InWaiting() inspects.
// NOTE(review): presumably the guard keeps the counter raised for its own lifetime - confirm against TCounterGuard.
TCounterGuard GetMergeTasksGuard() const {
return TCounterGuard(MergeTasksCount);
}
Expand All @@ -320,7 +326,7 @@ class TConcreteScanCounters: public TScanCounters {

// Reports whether any of the tracked task counters currently has a non-zero value.
// The counters are checked left to right and evaluation stops at the first non-zero one.
bool InWaiting() const {
return MergeTasksCount->Val() || AssembleTasksCount->Val() || ReadTasksCount->Val() || ResourcesAllocationTasksCount->Val() ||
FetchAccessorsCount->Val();
FetchAccessorsCount->Val() || ResultsForSourceCount->Val();
}

void OnBlobsWaitDuration(const TDuration d, const TDuration fullScanDuration) const {
Expand All @@ -335,6 +341,7 @@ class TConcreteScanCounters: public TScanCounters {
, AssembleTasksCount(std::make_shared<TAtomicCounter>())
, ReadTasksCount(std::make_shared<TAtomicCounter>())
, ResourcesAllocationTasksCount(std::make_shared<TAtomicCounter>())
, ResultsForSourceCount(std::make_shared<TAtomicCounter>())
, Aggregations(TBase::BuildAggregations())
{

Expand Down
Loading

0 comments on commit 6ed5294

Please sign in to comment.