Skip to content

Commit

Permalink
Merge 045abc1 into be08b3f
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Dec 12, 2024
2 parents be08b3f + 045abc1 commit 8d4a443
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 1 deletion.
9 changes: 9 additions & 0 deletions ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
ctx.Send(source, result.release(), 0, cookie);
};
if (behaviour == EOperationBehaviour::CommitWriteLock) {
TMemoryProfileGuard mpg1("NEvents::TDataEvents::TEvWrite::Commit");
auto commitOperation = std::make_shared<TCommitOperation>(TabletID());
auto conclusionParse = commitOperation->Parse(*ev->Get());
if (conclusionParse.IsFail()) {
Expand Down Expand Up @@ -520,6 +521,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
ctx.Send(source, result.release(), 0, cookie);
return;
}
TMemoryProfileGuard mpg2("NEvents::TDataEvents::TEvWrite::Continue");

const auto& operation = record.GetOperations()[0];
const std::optional<NEvWrite::EModificationType> mType =
Expand All @@ -535,6 +537,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
return;
}

TMemoryProfileGuard mpg21("NEvents::TDataEvents::TEvWrite::Continue::21");
auto schema = TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaOptional(operation.GetTableId().GetSchemaVersion());
if (!schema) {
sendError("unknown schema version", NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
Expand All @@ -550,12 +553,14 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor

Counters.GetColumnTablesCounters()->GetPathIdCounter(pathId)->OnWriteEvent();

TMemoryProfileGuard mpg20("NEvents::TDataEvents::TEvWrite::Continue::0");
auto arrowData = std::make_shared<TArrowData>(schema);
if (!arrowData->Parse(operation, NEvWrite::TPayloadReader<NEvents::TDataEvents::TEvWrite>(*ev->Get()))) {
sendError("parsing data error", NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
return;
}

TMemoryProfileGuard mpg21("NEvents::TDataEvents::TEvWrite::Continue::1");
auto overloadStatus = CheckOverloaded(pathId);
if (overloadStatus != EOverloadStatus::None) {
std::unique_ptr<NActors::IEventBase> result = NEvents::TDataEvents::TEvWriteResult::BuildError(
Expand All @@ -571,6 +576,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
granuleShardingVersionId = record.GetGranuleShardingVersionId();
}

TMemoryProfileGuard mpg22("NEvents::TDataEvents::TEvWrite::Continue::2");
ui64 lockId = 0;
if (behaviour == EOperationBehaviour::NoTxWrite) {
lockId = BuildEphemeralTxId();
Expand All @@ -583,14 +589,17 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
return;
}

TMemoryProfileGuard mpg3("NEvents::TDataEvents::TEvWrite::Continue3");
OperationsManager->RegisterLock(lockId, Generation());
auto writeOperation = OperationsManager->RegisterOperation(
pathId, lockId, cookie, granuleShardingVersionId, *mType, AppDataVerified().FeatureFlags.GetEnableWritePortionsOnInsert());
Y_ABORT_UNLESS(writeOperation);
TMemoryProfileGuard mpg4("NEvents::TDataEvents::TEvWrite::Continue4");
writeOperation->SetBehaviour(behaviour);
NOlap::TWritingContext wContext(pathId, SelfId(), schema, StoragesManager, Counters.GetIndexationCounters().SplitterCounters,
Counters.GetCSCounters().WritingCounters, NOlap::TSnapshot::Max());
arrowData->SetSeparationPoints(GetIndexAs<NOlap::TColumnEngineForLogs>().GetGranulePtrVerified(pathId)->GetBucketPositions());
TMemoryProfileGuard mpg5("NEvents::TDataEvents::TEvWrite::Continue5");
writeOperation->Start(*this, arrowData, source, wContext);
}

Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/columnshard/data_accessor/request.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,8 @@ class TDataAccessorsRequest: public NColumnShard::TMonitoringObjectsCounter<TDat
}

void AddError(const ui64 pathId, const TString& errorMessage) {
AFL_VERIFY(FetchStage == 1);
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("error", errorMessage)("event", "ErrorOnFetching")("path_id", pathId);
AFL_VERIFY(FetchStage <= 1);
auto itStatus = PathIdStatus.find(pathId);
AFL_VERIFY(itStatus != PathIdStatus.end());
itStatus->second.OnError(errorMessage);
Expand Down

0 comments on commit 8d4a443

Please sign in to comment.