diff --git a/ydb/core/kqp/common/kqp_tx.cpp b/ydb/core/kqp/common/kqp_tx.cpp index d0c40ba629eb..7dd9c26ef87c 100644 --- a/ydb/core/kqp/common/kqp_tx.cpp +++ b/ydb/core/kqp/common/kqp_tx.cpp @@ -207,7 +207,7 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig } } - if (txCtx.HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) { + if (txCtx.NeedUncommittedChangesFlush || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) { return true; } @@ -344,5 +344,81 @@ bool HasOltpTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) { return false; } +bool HasUncommittedChangesRead(THashSet& modifiedTables, const NKqpProto::TKqpPhyQuery& physicalQuery) { + auto getTable = [](const NKqpProto::TKqpPhyTableId& table) { + return NKikimr::TTableId(table.GetOwnerId(), table.GetTableId()); + }; + + for (size_t index = 0; index < physicalQuery.TransactionsSize(); ++index) { + const auto &tx = physicalQuery.GetTransactions()[index]; + for (const auto &stage : tx.GetStages()) { + for (const auto &tableOp : stage.GetTableOps()) { + switch (tableOp.GetTypeCase()) { + case NKqpProto::TKqpPhyTableOperation::kReadRange: + case NKqpProto::TKqpPhyTableOperation::kLookup: + case NKqpProto::TKqpPhyTableOperation::kReadRanges: { + if (modifiedTables.contains(getTable(tableOp.GetTable()))) { + return true; + } + break; + } + case NKqpProto::TKqpPhyTableOperation::kReadOlapRange: + case NKqpProto::TKqpPhyTableOperation::kUpsertRows: + case NKqpProto::TKqpPhyTableOperation::kDeleteRows: + modifiedTables.insert(getTable(tableOp.GetTable())); + break; + default: + YQL_ENSURE(false, "unexpected type"); + } + } + + for (const auto& input : stage.GetInputs()) { + switch (input.GetTypeCase()) { + case NKqpProto::TKqpPhyConnection::kStreamLookup: + if (modifiedTables.contains(getTable(input.GetStreamLookup().GetTable()))) { + return true; + } + break; + case NKqpProto::TKqpPhyConnection::kSequencer: + return true; + case NKqpProto::TKqpPhyConnection::kUnionAll: + case NKqpProto::TKqpPhyConnection::kMap: + case NKqpProto::TKqpPhyConnection::kHashShuffle: + case NKqpProto::TKqpPhyConnection::kBroadcast: + case NKqpProto::TKqpPhyConnection::kMapShard: + case NKqpProto::TKqpPhyConnection::kShuffleShard: + case NKqpProto::TKqpPhyConnection::kResult: + case NKqpProto::TKqpPhyConnection::kValue: + case NKqpProto::TKqpPhyConnection::kMerge: + case NKqpProto::TKqpPhyConnection::TYPE_NOT_SET: + break; + } + } + + for (const auto& source : stage.GetSources()) { + if (source.GetTypeCase() == NKqpProto::TKqpSource::kReadRangesSource) { + if (modifiedTables.contains(getTable(source.GetReadRangesSource().GetTable()))) { + return true; + } + } else { + return true; + } + } + + for (const auto& sink : stage.GetSinks()) { + if (sink.GetTypeCase() == NKqpProto::TKqpSink::kInternalSink) { + YQL_ENSURE(sink.GetInternalSink().GetSettings().Is()); + NKikimrKqp::TKqpTableSinkSettings settings; + YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&settings), "Failed to unpack settings"); + modifiedTables.insert(getTable(settings.GetTable())); + } else { + return true; + } + } + } + } + return false; +} + } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/common/kqp_tx.h b/ydb/core/kqp/common/kqp_tx.h index 5121ffb73af1..700e9075236a 100644 --- a/ydb/core/kqp/common/kqp_tx.h +++ b/ydb/core/kqp/common/kqp_tx.h @@ -165,6 +165,8 @@ class TShardIdToTableInfo { }; using TShardIdToTableInfoPtr = std::shared_ptr; +bool HasUncommittedChangesRead(THashSet& modifiedTables, const NKqpProto::TKqpPhyQuery& physicalQuery); + class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase { public: explicit TKqpTransactionContext(bool implicit, const NMiniKQL::IFunctionRegistry* funcRegistry, @@ -232,6 +234,11 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase { ParamsState = MakeIntrusive(); SnapshotHandle.Snapshot = IKqpGateway::TKqpSnapshot::InvalidSnapshot; HasImmediateEffects = false; + + HasOlapTable = false; + HasOltpTable = false; + HasTableWrite = false; + NeedUncommittedChangesFlush = false; } TKqpTransactionInfo GetInfo() const; @@ -267,7 +274,7 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase { } bool ShouldExecuteDeferredEffects() const { - if (HasUncommittedChangesRead || HasOlapTable) { + if (NeedUncommittedChangesFlush || HasOlapTable) { return !DeferredEffects.Empty(); } @@ -296,13 +303,20 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase { } bool CanDeferEffects() const { - if (HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution() || HasOlapTable) { + if (NeedUncommittedChangesFlush || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution() || HasOlapTable) { return false; } return true; } + void ApplyPhysicalQuery(const NKqpProto::TKqpPhyQuery& phyQuery) { + NeedUncommittedChangesFlush = HasUncommittedChangesRead(ModifiedTablesSinceLastFlush, phyQuery); + if (NeedUncommittedChangesFlush) { + ModifiedTablesSinceLastFlush.clear(); + } + } + public: struct TParamsState : public TThrRefBase { ui32 LastIndex = 0; @@ -333,6 +347,9 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase { bool HasOltpTable = false; bool HasTableWrite = false; + bool NeedUncommittedChangesFlush = false; + THashSet ModifiedTablesSinceLastFlush; + TShardIdToTableInfoPtr ShardIdToTableInfo = std::make_shared(); }; diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h index 2c14581e15f0..5711a1ecff98 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider.h @@ -292,7 +292,6 @@ class TKikimrTransactionContextBase : public TThrRefBase { Invalidated = false; Readonly = false; Closed = false; - HasUncommittedChangesRead = false; } void SetTempTables(NKikimr::NKqp::TKqpTempTablesState::TConstPtr tempTablesState) { @@ -409,17 +408,6 @@ class TKikimrTransactionContextBase : public TThrRefBase { } auto& currentOps = TableOperations[table]; - const bool currentModify = currentOps & KikimrModifyOps(); - if (currentModify) { - if (KikimrReadOps() & newOp) { - HasUncommittedChangesRead = true; - } - - if ((*info)->GetHasIndexTables()) { - HasUncommittedChangesRead = true; - } - } - currentOps |= newOp; } @@ -429,7 +417,6 @@ class TKikimrTransactionContextBase : public TThrRefBase { virtual ~TKikimrTransactionContextBase() = default; public: - bool HasUncommittedChangesRead = false; THashMap TableOperations; THashMap TableByIdMap; TMaybe EffectiveIsolationLevel; diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 4b7f95a40ca3..6c31e5660ac8 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -355,7 +355,7 @@ class TKqpQueryState : public TNonCopyable { return false; } - if (TxCtx->HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) { + if (TxCtx->NeedUncommittedChangesFlush || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) { if (tx && tx->GetHasEffects()) { YQL_ENSURE(tx->ResultsSize() == 0); // commit can be applied to the last transaction with effects diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index b2d1c49bf9dd..cb935e2adcc3 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -862,7 +862,9 @@ class TKqpSessionActor : public TActorBootstrapped { "Write transactions between column and row tables are disabled at current time."); return false; } + QueryState->TxCtx->SetTempTables(QueryState->TempTablesState); + QueryState->TxCtx->ApplyPhysicalQuery(phyQuery); auto [success, issues] = QueryState->TxCtx->ApplyTableOperations(phyQuery.GetTableOps(), phyQuery.GetTableInfos(), EKikimrQueryType::Dml); if (!success) { @@ -1232,7 +1234,7 @@ class TKqpSessionActor : public TActorBootstrapped { } else if (QueryState->ShouldAcquireLocks(tx) && (!txCtx.HasOlapTable || Settings.TableService.GetEnableOlapSink())) { request.AcquireLocksTxId = txCtx.Locks.GetLockTxId(); - if (txCtx.HasUncommittedChangesRead || Config->FeatureFlags.GetEnableForceImmediateEffectsExecution() || txCtx.HasOlapTable) { + if (!txCtx.CanDeferEffects()) { request.UseImmediateEffects = true; } }