Skip to content

Commit

Permalink
writing enabled flag for ev write (ydb-platform#12250)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored and zverevgeny committed Jan 5, 2025
1 parent f2ebda7 commit 080e74a
Showing 1 changed file with 16 additions and 25 deletions.
41 changes: 16 additions & 25 deletions ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -490,13 +490,13 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
return;
}

const auto sendError = [&](const TString& message, const NKikimrDataEvents::TEvWriteResult::EStatus status) {
Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, status, message);
ctx.Send(source, result.release(), 0, cookie);
};
if (behaviour == EOperationBehaviour::CommitWriteLock) {
auto commitOperation = std::make_shared<TCommitOperation>(TabletID());
const auto sendError = [&](const TString& message, const NKikimrDataEvents::TEvWriteResult::EStatus status) {
Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, status, message);
ctx.Send(source, result.release(), 0, cookie);
};
auto conclusionParse = commitOperation->Parse(*ev->Get());
if (conclusionParse.IsFail()) {
sendError(conclusionParse.GetErrorMessage(), NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
Expand Down Expand Up @@ -538,48 +538,34 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
const std::optional<NEvWrite::EModificationType> mType =
TEnumOperator<NEvWrite::EModificationType>::DeserializeFromProto(operation.GetType());
if (!mType) {
Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST,
"operation " + NKikimrDataEvents::TEvWrite::TOperation::EOperationType_Name(operation.GetType()) + " is not supported");
ctx.Send(source, result.release(), 0, cookie);
sendError("operation " + NKikimrDataEvents::TEvWrite::TOperation::EOperationType_Name(operation.GetType()) + " is not supported",
NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
return;
}

if (!operation.GetTableId().HasSchemaVersion()) {
Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(
TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "schema version not set");
ctx.Send(source, result.release(), 0, cookie);
sendError("schema version not set", NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
return;
}

auto schema = TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaOptional(operation.GetTableId().GetSchemaVersion());
if (!schema) {
Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(
TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "unknown schema version");
ctx.Send(source, result.release(), 0, cookie);
sendError("unknown schema version", NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
return;
}

const auto pathId = operation.GetTableId().GetTableId();

if (!TablesManager.IsReadyForWrite(pathId)) {
Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(
TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, "table not writable");
ctx.Send(source, result.release(), 0, cookie);
sendError("table not writable", NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR);
return;
}

Counters.GetColumnTablesCounters()->GetPathIdCounter(pathId)->OnWriteEvent();

auto arrowData = std::make_shared<TArrowData>(schema);
if (!arrowData->Parse(operation, NEvWrite::TPayloadReader<NEvents::TDataEvents::TEvWrite>(*ev->Get()))) {
Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(
TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "parsing data error");
ctx.Send(source, result.release(), 0, cookie);
sendError("parsing data error", NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
return;
}

Expand All @@ -605,6 +591,11 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
lockId = record.GetLockTxId();
}

if (!AppDataVerified().ColumnShardConfig.GetWritingEnabled()) {
sendError("writing disabled", NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED);
return;
}

OperationsManager->RegisterLock(lockId, Generation());
auto writeOperation = OperationsManager->RegisterOperation(
pathId, lockId, cookie, granuleShardingVersionId, *mType, AppDataVerified().FeatureFlags.GetEnableWritePortionsOnInsert());
Expand Down

0 comments on commit 080e74a

Please sign in to comment.