Skip to content

Commit

Permalink
Merge 695d256 into a71cc84
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Oct 17, 2024
2 parents a71cc84 + 695d256 commit 3ef7442
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 14 deletions.
63 changes: 63 additions & 0 deletions ydb/core/kqp/common/kqp_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -343,5 +343,68 @@ bool HasOltpTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
return false;
}

bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, const NKqpProto::TKqpPhyQuery& physicalQuery) {
auto getTable = [](const NKqpProto::TKqpPhyTableId& table) {
return NKikimr::TTableId(table.GetOwnerId(), table.GetTableId());
};

for (size_t index = 0; index < physicalQuery.TransactionsSize(); ++index) {
const auto &tx = physicalQuery.GetTransactions()[index];
for (const auto &stage : tx.GetStages()) {
for (const auto &tableOp : stage.GetTableOps()) {
switch (tableOp.GetTypeCase()) {
case NKqpProto::TKqpPhyTableOperation::kReadRange:
case NKqpProto::TKqpPhyTableOperation::kLookup:
case NKqpProto::TKqpPhyTableOperation::kReadRanges: {
if (modifiedTables.contains(getTable(tableOp.GetTable()))) {
return true;
}
break;
}
case NKqpProto::TKqpPhyTableOperation::kReadOlapRange:
case NKqpProto::TKqpPhyTableOperation::kUpsertRows:
case NKqpProto::TKqpPhyTableOperation::kDeleteRows:
modifiedTables.insert(getTable(tableOp.GetTable()));
break;
default:
YQL_ENSURE(false, "unexpected type");
}
}

for (const auto& input : stage.GetInputs()) {
if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup) {
if (modifiedTables.contains(getTable(input.GetStreamLookup().GetTable()))) {
return true;
}
} else {
return true;
}
}

for (const auto& source : stage.GetSources()) {
if (source.GetTypeCase() == NKqpProto::TKqpSource::kReadRangesSource) {
if (modifiedTables.contains(getTable(source.GetReadRangesSource().GetTable()))) {
return true;
}
} else {
return true;
}
}

for (const auto& sink : stage.GetSinks()) {
if (sink.GetTypeCase() == NKqpProto::TKqpSink::kInternalSink) {
YQL_ENSURE(sink.GetInternalSink().GetSettings().Is<NKikimrKqp::TKqpTableSinkSettings>());
NKikimrKqp::TKqpTableSinkSettings settings;
YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&settings), "Failed to unpack settings");
modifiedTables.insert(getTable(settings.GetTable()));
} else {
return true;
}
}
}
}
return false;
}

} // namespace NKqp
} // namespace NKikimr
9 changes: 9 additions & 0 deletions ydb/core/kqp/common/kqp_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,11 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
ParamsState = MakeIntrusive<TParamsState>();
SnapshotHandle.Snapshot = IKqpGateway::TKqpSnapshot::InvalidSnapshot;
HasImmediateEffects = false;

HasOlapTable = false;
HasOltpTable = false;
HasTableWrite = false;
HasUncommittedChangesRead = false;
}

TKqpTransactionInfo GetInfo() const;
Expand Down Expand Up @@ -333,6 +338,8 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
bool HasOlapTable = false;
bool HasOltpTable = false;
bool HasTableWrite = false;
bool HasUncommittedChangesRead = false;
THashSet<NKikimr::TTableId> ModifiedTables;

TShardIdToTableInfoPtr ShardIdToTableInfo = std::make_shared<TShardIdToTableInfo>();
};
Expand Down Expand Up @@ -498,4 +505,6 @@ bool HasOlapTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
bool HasOltpTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
bool HasOltpTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);

bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, const NKqpProto::TKqpPhyQuery& physicalQuery);

} // namespace NKikimr::NKqp
13 changes: 0 additions & 13 deletions ydb/core/kqp/provider/yql_kikimr_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,6 @@ class TKikimrTransactionContextBase : public TThrRefBase {
Invalidated = false;
Readonly = false;
Closed = false;
HasUncommittedChangesRead = false;
}

void SetTempTables(NKikimr::NKqp::TKqpTempTablesState::TConstPtr tempTablesState) {
Expand Down Expand Up @@ -409,17 +408,6 @@ class TKikimrTransactionContextBase : public TThrRefBase {
}

auto& currentOps = TableOperations[table];
const bool currentModify = currentOps & KikimrModifyOps();
if (currentModify) {
if (KikimrReadOps() & newOp) {
HasUncommittedChangesRead = true;
}

if ((*info)->GetHasIndexTables()) {
HasUncommittedChangesRead = true;
}
}

currentOps |= newOp;
}

Expand All @@ -429,7 +417,6 @@ class TKikimrTransactionContextBase : public TThrRefBase {
virtual ~TKikimrTransactionContextBase() = default;

public:
bool HasUncommittedChangesRead = false;
THashMap<TString, TYdbOperations> TableOperations;
THashMap<TKikimrPathId, TString> TableByIdMap;
TMaybe<NKikimrKqp::EIsolationLevel> EffectiveIsolationLevel;
Expand Down
7 changes: 6 additions & 1 deletion ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,11 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
"Write transactions between column and row tables are disabled at current time.");
return false;
}
QueryState->TxCtx->HasUncommittedChangesRead = ::NKikimr::NKqp::HasUncommittedChangesRead(QueryState->TxCtx->ModifiedTables, phyQuery);
if (QueryState->TxCtx->HasUncommittedChangesRead) {
QueryState->TxCtx->ModifiedTables.clear();
}

QueryState->TxCtx->SetTempTables(QueryState->TempTablesState);
auto [success, issues] = QueryState->TxCtx->ApplyTableOperations(phyQuery.GetTableOps(), phyQuery.GetTableInfos(),
EKikimrQueryType::Dml);
Expand Down Expand Up @@ -1236,7 +1241,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
} else if (QueryState->ShouldAcquireLocks(tx) && (!txCtx.HasOlapTable || Settings.TableService.GetEnableOlapSink())) {
request.AcquireLocksTxId = txCtx.Locks.GetLockTxId();

if (txCtx.HasUncommittedChangesRead || Config->FeatureFlags.GetEnableForceImmediateEffectsExecution() || txCtx.HasOlapTable) {
if (!txCtx.CanDeferEffects()) {
request.UseImmediateEffects = true;
}
}
Expand Down

0 comments on commit 3ef7442

Please sign in to comment.