Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 committed Oct 21, 2024
1 parent fceab40 commit 8edc46b
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 15 deletions.
19 changes: 16 additions & 3 deletions ydb/core/kqp/common/kqp_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
}
}

if (txCtx.HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) {
if (txCtx.NeedUncommittedChangesFlush || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) {
return true;
}

Expand Down Expand Up @@ -372,12 +372,25 @@ bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, cons
}

for (const auto& input : stage.GetInputs()) {
if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup) {
switch (input.GetTypeCase()) {
case NKqpProto::TKqpPhyConnection::kStreamLookup:
if (modifiedTables.contains(getTable(input.GetStreamLookup().GetTable()))) {
return true;
}
} else {
break;
case NKqpProto::TKqpPhyConnection::kSequencer:
return true;
case NKqpProto::TKqpPhyConnection::kUnionAll:
case NKqpProto::TKqpPhyConnection::kMap:
case NKqpProto::TKqpPhyConnection::kHashShuffle:
case NKqpProto::TKqpPhyConnection::kBroadcast:
case NKqpProto::TKqpPhyConnection::kMapShard:
case NKqpProto::TKqpPhyConnection::kShuffleShard:
case NKqpProto::TKqpPhyConnection::kResult:
case NKqpProto::TKqpPhyConnection::kValue:
case NKqpProto::TKqpPhyConnection::kMerge:
case NKqpProto::TKqpPhyConnection::TYPE_NOT_SET:
break;
}
}

Expand Down
22 changes: 15 additions & 7 deletions ydb/core/kqp/common/kqp_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ class TShardIdToTableInfo {
};
using TShardIdToTableInfoPtr = std::shared_ptr<TShardIdToTableInfo>;

bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, const NKqpProto::TKqpPhyQuery& physicalQuery);

class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
public:
explicit TKqpTransactionContext(bool implicit, const NMiniKQL::IFunctionRegistry* funcRegistry,
Expand Down Expand Up @@ -236,7 +238,7 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
HasOlapTable = false;
HasOltpTable = false;
HasTableWrite = false;
HasUncommittedChangesRead = false;
NeedUncommittedChangesFlush = false;
}

TKqpTransactionInfo GetInfo() const;
Expand Down Expand Up @@ -273,7 +275,7 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
}

bool ShouldExecuteDeferredEffects() const {
if (HasUncommittedChangesRead || HasOlapTable) {
if (NeedUncommittedChangesFlush || HasOlapTable) {
return !DeferredEffects.Empty();
}

Expand Down Expand Up @@ -302,13 +304,20 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
}

bool CanDeferEffects() const {
if (HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution() || HasOlapTable) {
if (NeedUncommittedChangesFlush || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution() || HasOlapTable) {
return false;
}

return true;
}

void ApplyPhysicalQuery(const NKqpProto::TKqpPhyQuery& phyQuery) {
NeedUncommittedChangesFlush = HasUncommittedChangesRead(ModifiedTablesSinceLastFlush, phyQuery);
if (NeedUncommittedChangesFlush) {
ModifiedTablesSinceLastFlush.clear();
}
}

public:
struct TParamsState : public TThrRefBase {
ui32 LastIndex = 0;
Expand Down Expand Up @@ -338,8 +347,9 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
bool HasOlapTable = false;
bool HasOltpTable = false;
bool HasTableWrite = false;
bool HasUncommittedChangesRead = false;
THashSet<NKikimr::TTableId> ModifiedTables;

bool NeedUncommittedChangesFlush = false;
THashSet<NKikimr::TTableId> ModifiedTablesSinceLastFlush;

TShardIdToTableInfoPtr ShardIdToTableInfo = std::make_shared<TShardIdToTableInfo>();
};
Expand Down Expand Up @@ -505,6 +515,4 @@ bool HasOlapTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
bool HasOltpTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
bool HasOltpTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);

bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, const NKqpProto::TKqpPhyQuery& physicalQuery);

} // namespace NKikimr::NKqp
2 changes: 1 addition & 1 deletion ydb/core/kqp/session_actor/kqp_query_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ class TKqpQueryState : public TNonCopyable {
return false;
}

if (TxCtx->HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) {
if (TxCtx->NeedUncommittedChangesFlush || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) {
if (tx && tx->GetHasEffects()) {
YQL_ENSURE(tx->ResultsSize() == 0);
// commit can be applied to the last transaction with effects
Expand Down
5 changes: 1 addition & 4 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -865,12 +865,9 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
"Write transactions between column and row tables are disabled at current time.");
return false;
}
QueryState->TxCtx->HasUncommittedChangesRead = ::NKikimr::NKqp::HasUncommittedChangesRead(QueryState->TxCtx->ModifiedTables, phyQuery);
if (QueryState->TxCtx->HasUncommittedChangesRead) {
QueryState->TxCtx->ModifiedTables.clear();
}

QueryState->TxCtx->SetTempTables(QueryState->TempTablesState);
QueryState->TxCtx->ApplyPhysicalQuery(phyQuery);
auto [success, issues] = QueryState->TxCtx->ApplyTableOperations(phyQuery.GetTableOps(), phyQuery.GetTableInfos(),
EKikimrQueryType::Dml);
if (!success) {
Expand Down

0 comments on commit 8edc46b

Please sign in to comment.