Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #11186 #11631

Merged
merged 2 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2907,6 +2907,31 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}
}

Y_UNIT_TEST(ScanFailedSnapshotTooOld) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
appConfig.MutableColumnShardConfig()->SetMaxReadStaleness_ms(5000);
auto settings = TKikimrSettings().SetAppConfig(appConfig).SetWithSampleTables(false);
TTestHelper testHelper(settings);

TTestHelper::TColumnTable cnt;
TVector<TTestHelper::TColumnSchema> schema = {
TTestHelper::TColumnSchema().SetName("key").SetType(NScheme::NTypeIds::Int32).SetNullable(false),
TTestHelper::TColumnSchema().SetName("c").SetType(NScheme::NTypeIds::Int32).SetNullable(true)
};
cnt.SetName("/Root/cnt").SetPrimaryKey({ "key" }).SetSchema(schema);
testHelper.CreateTable(cnt);
Sleep(TDuration::Seconds(10));
auto client = testHelper.GetKikimr().GetQueryClient();
auto result =
client
.ExecuteQuery(
TStringBuilder() << "$v = SELECT CAST(COUNT(*) AS INT32) FROM `/Root/cnt`; INSERT INTO `/Root/cnt` (key, c) values(1, $v);",
NYdb::NQuery::TTxControl::BeginTx().CommitTx())
.GetValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
}
}

}
26 changes: 14 additions & 12 deletions ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ namespace NKikimr::NColumnShard {

using namespace NTabletFlatExecutor;

void TColumnShard::OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteMeta& writeMeta, const ui64 writeSize, const ui64 cookie,
std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx) {
void TColumnShard::OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteMeta& writeMeta, const ui64 writeSize,
const ui64 cookie, std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx) {
Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
switch (overloadReason) {
case EOverloadStatus::Disk:
Expand Down Expand Up @@ -262,8 +262,8 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
<< (writeMeta.GetWriteId() ? (" writeId " + ToString(writeMeta.GetWriteId())).c_str() : " ")
<< Counters.GetWritesMonitor()->DebugString() << " at tablet " << TabletID());
writeData.MutableWriteMeta().SetWriteMiddle1StartInstant(TMonotonic::Now());
std::shared_ptr<NConveyor::ITask> task = std::make_shared<NOlap::TBuildBatchesTask>(
TabletID(), SelfId(), BufferizationWriteActorId, std::move(writeData), snapshotSchema, GetLastTxSnapshot(), Counters.GetCSCounters().WritingCounters);
std::shared_ptr<NConveyor::ITask> task = std::make_shared<NOlap::TBuildBatchesTask>(TabletID(), SelfId(), BufferizationWriteActorId,
std::move(writeData), snapshotSchema, GetLastTxSnapshot(), Counters.GetCSCounters().WritingCounters);
NConveyor::TInsertServiceOperator::AsyncTaskToExecute(task);
}
}
Expand All @@ -285,7 +285,8 @@ class TCommitOperation {
}

TCommitOperation(const ui64 tabletId)
: TabletId(tabletId) {
: TabletId(tabletId)
{
}

TConclusionStatus Parse(const NEvents::TDataEvents::TEvWrite& evWrite) {
Expand Down Expand Up @@ -357,7 +358,8 @@ class TProposeWriteTransaction: public NTabletFlatExecutor::TTransactionBase<TCo
: TBase(self)
, WriteCommit(op)
, Source(source)
, Cookie(cookie) {
, Cookie(cookie)
{
}

virtual bool Execute(TTransactionContext& txc, const TActorContext&) override {
Expand Down Expand Up @@ -402,7 +404,8 @@ class TAbortWriteTransaction: public NTabletFlatExecutor::TTransactionBase<TColu
: TBase(self)
, TxId(txId)
, Source(source)
, Cookie(cookie) {
, Cookie(cookie)
{
}

virtual bool Execute(TTransactionContext& txc, const TActorContext&) override {
Expand Down Expand Up @@ -466,9 +469,8 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
NKikimrDataEvents::TEvWriteResult::STATUS_ABORTED);
} else {
if (lockInfo->GetGeneration() != commitOperation->GetGeneration()) {
sendError("tablet lock have another generation: " + ::ToString(lockInfo->GetGeneration()) +
" != " + ::ToString(commitOperation->GetGeneration()),
NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN);
sendError("tablet lock have another generation: " + ::ToString(lockInfo->GetGeneration()) + " != " +
::ToString(commitOperation->GetGeneration()), NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN);
} else if (lockInfo->GetInternalGenerationCounter() != commitOperation->GetInternalGenerationCounter()) {
sendError(
"tablet lock have another internal generation counter: " + ::ToString(lockInfo->GetInternalGenerationCounter()) +
Expand Down Expand Up @@ -567,7 +569,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
auto writeOperation = OperationsManager->RegisterOperation(lockId, cookie, granuleShardingVersionId, *mType);
Y_ABORT_UNLESS(writeOperation);
writeOperation->SetBehaviour(behaviour);
writeOperation->Start(*this, tableId, arrowData, source, schema, ctx);
writeOperation->Start(*this, tableId, arrowData, source, schema, ctx, NOlap::TSnapshot::Max());
}

} // namespace NKikimr::NColumnShard
} // namespace NKikimr::NColumnShard
4 changes: 3 additions & 1 deletion ydb/core/tx/columnshard/data_reader/actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ void TActor::HandleExecute(NKqp::TEvKqpCompute::TEvScanData::TPtr& ev) {
} else {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "restore_task_finished")("reason", status.GetErrorMessage());
}
PassAway();
}
}

Expand All @@ -35,10 +36,11 @@ void TActor::HandleExecute(NKqp::TEvKqpCompute::TEvScanInitActor::TPtr& ev) {
}

void TActor::HandleExecute(NKqp::TEvKqpCompute::TEvScanError::TPtr& ev) {
SwitchStage(EStage::WaitData, EStage::Finished);
SwitchStage(std::nullopt, EStage::Finished);
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "problem_on_restore_data")(
"reason", NYql::IssuesFromMessageAsString(ev->Get()->Record.GetIssues()));
RestoreTask->OnError(NYql::IssuesFromMessageAsString(ev->Get()->Record.GetIssues()));
PassAway();
}

void TActor::Bootstrap(const TActorContext& /*ctx*/) {
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/tx/columnshard/data_reader/actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ class TActor: public NActors::TActorBootstrapped<TActor> {

EStage Stage = EStage::Initialization;
static inline const ui64 FreeSpace = ((ui64)8) << 20;
void SwitchStage(const EStage from, const EStage to) {
AFL_VERIFY(Stage == from)("from", (ui32)from)("real", (ui32)Stage)("to", (ui32)to);
void SwitchStage(const std::optional<EStage> from, const EStage to) {
if (from) {
AFL_VERIFY(Stage == *from)("from", (ui32)*from)("real", (ui32)Stage)("to", (ui32)to);
}
Stage = to;
}

Expand Down
5 changes: 3 additions & 2 deletions ydb/core/tx/columnshard/operations/batch_builder/builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ class TBuildBatchesTask: public NConveyor::ITask {
, BufferActorId(bufferActorId)
, ActualSchema(actualSchema)
, ActualSnapshot(actualSnapshot)
, WritingCounters(writingCounters) {
, WritingCounters(writingCounters)
{
}
};
} // namespace NKikimr::NOlap
} // namespace NKikimr::NOlap
24 changes: 12 additions & 12 deletions ydb/core/tx/columnshard/operations/write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ TWriteOperation::TWriteOperation(const TOperationWriteId writeId, const ui64 loc
, LockId(lockId)
, Cookie(cookie)
, GranuleShardingVersionId(granuleShardingVersionId)
, ModificationType(mType) {
, ModificationType(mType)
{
}

void TWriteOperation::Start(TColumnShard& owner, const ui64 tableId, const NEvWrite::IDataContainer::TPtr& data, const NActors::TActorId& source,
const std::shared_ptr<NOlap::ISnapshotSchema>& schema, const TActorContext& ctx) {
const std::shared_ptr<NOlap::ISnapshotSchema>& schema, const TActorContext& ctx, const NOlap::TSnapshot& applyToSnapshot) {
Y_ABORT_UNLESS(Status == EOperationStatus::Draft);

NEvWrite::TWriteMeta writeMeta((ui64)WriteId, tableId, source, GranuleShardingVersionId);
Expand All @@ -34,13 +35,14 @@ void TWriteOperation::Start(TColumnShard& owner, const ui64 tableId, const NEvWr
std::make_shared<NOlap::TBuildBatchesTask>(owner.TabletID(), ctx.SelfID, owner.BufferizationWriteActorId,
NEvWrite::TWriteData(writeMeta, data, owner.TablesManager.GetPrimaryIndex()->GetReplaceKey(),
owner.StoragesManager->GetInsertOperator()->StartWritingAction(NOlap::NBlobOperations::EConsumer::WRITING_OPERATOR)),
schema, owner.GetLastTxSnapshot(), owner.Counters.GetCSCounters().WritingCounters);
NConveyor::TInsertServiceOperator::AsyncTaskToExecute(task);
schema, applyToSnapshot, owner.Counters.GetCSCounters().WritingCounters);
NConveyor::TCompServiceOperator::SendTaskToExecute(task);

Status = EOperationStatus::Started;
}

void TWriteOperation::CommitOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc, const NOlap::TSnapshot& snapshot) const {
void TWriteOperation::CommitOnExecute(
TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc, const NOlap::TSnapshot& snapshot) const {
Y_ABORT_UNLESS(Status == EOperationStatus::Prepared);

TBlobGroupSelector dsGroupSelector(owner.Info());
Expand Down Expand Up @@ -78,12 +80,10 @@ void TWriteOperation::OnWriteFinish(
TString metadata;
Y_ABORT_UNLESS(proto.SerializeToString(&metadata));

db.Table<Schema::Operations>()
.Key((ui64)WriteId)
.Update(NIceDb::TUpdate<Schema::Operations::Status>((ui32)Status), NIceDb::TUpdate<Schema::Operations::CreatedAt>(CreatedAt.Seconds()),
NIceDb::TUpdate<Schema::Operations::Metadata>(metadata), NIceDb::TUpdate<Schema::Operations::LockId>(LockId),
NIceDb::TUpdate<Schema::Operations::Cookie>(Cookie),
NIceDb::TUpdate<Schema::Operations::GranuleShardingVersionId>(GranuleShardingVersionId.value_or(0)));
db.Table<Schema::Operations>().Key((ui64)WriteId).Update(NIceDb::TUpdate<Schema::Operations::Status>((ui32)Status),
NIceDb::TUpdate<Schema::Operations::CreatedAt>(CreatedAt.Seconds()), NIceDb::TUpdate<Schema::Operations::Metadata>(metadata),
NIceDb::TUpdate<Schema::Operations::LockId>(LockId), NIceDb::TUpdate<Schema::Operations::Cookie>(Cookie),
NIceDb::TUpdate<Schema::Operations::GranuleShardingVersionId>(GranuleShardingVersionId.value_or(0)));
}

void TWriteOperation::ToProto(NKikimrTxColumnShard::TInternalOperationData& proto) const {
Expand Down Expand Up @@ -119,4 +119,4 @@ void TWriteOperation::AbortOnComplete(TColumnShard& /*owner*/) const {
Y_ABORT_UNLESS(Status == EOperationStatus::Prepared);
}

} // namespace NKikimr::NColumnShard
} // namespace NKikimr::NColumnShard
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/operations/write.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class TWriteOperation {
const std::optional<ui32> granuleShardingVersionId, const NEvWrite::EModificationType mType);

void Start(TColumnShard& owner, const ui64 tableId, const NEvWrite::IDataContainer::TPtr& data, const NActors::TActorId& source,
const std::shared_ptr<NOlap::ISnapshotSchema>& schema, const TActorContext& ctx);
const std::shared_ptr<NOlap::ISnapshotSchema>& schema, const TActorContext& ctx, const NOlap::TSnapshot& applyToSnapshot);
void OnWriteFinish(NTabletFlatExecutor::TTransactionContext& txc, const std::vector<TInsertWriteId>& insertWriteIds, const bool ephemeralFlag);
void CommitOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc, const NOlap::TSnapshot& snapshot) const;
void CommitOnComplete(TColumnShard& owner, const NOlap::TSnapshot& snapshot) const;
Expand Down
Loading