Skip to content

Commit

Permalink
Merge 8edc46b into 7f4d37b
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Oct 21, 2024
2 parents 7f4d37b + 8edc46b commit dae0e94
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 18 deletions.
78 changes: 77 additions & 1 deletion ydb/core/kqp/common/kqp_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
}
}

if (txCtx.HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) {
if (txCtx.NeedUncommittedChangesFlush || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) {
return true;
}

Expand Down Expand Up @@ -343,5 +343,81 @@ bool HasOltpTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
return false;
}

bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, const NKqpProto::TKqpPhyQuery& physicalQuery) {
auto getTable = [](const NKqpProto::TKqpPhyTableId& table) {
return NKikimr::TTableId(table.GetOwnerId(), table.GetTableId());
};

for (size_t index = 0; index < physicalQuery.TransactionsSize(); ++index) {
const auto &tx = physicalQuery.GetTransactions()[index];
for (const auto &stage : tx.GetStages()) {
for (const auto &tableOp : stage.GetTableOps()) {
switch (tableOp.GetTypeCase()) {
case NKqpProto::TKqpPhyTableOperation::kReadRange:
case NKqpProto::TKqpPhyTableOperation::kLookup:
case NKqpProto::TKqpPhyTableOperation::kReadRanges: {
if (modifiedTables.contains(getTable(tableOp.GetTable()))) {
return true;
}
break;
}
case NKqpProto::TKqpPhyTableOperation::kReadOlapRange:
case NKqpProto::TKqpPhyTableOperation::kUpsertRows:
case NKqpProto::TKqpPhyTableOperation::kDeleteRows:
modifiedTables.insert(getTable(tableOp.GetTable()));
break;
default:
YQL_ENSURE(false, "unexpected type");
}
}

for (const auto& input : stage.GetInputs()) {
switch (input.GetTypeCase()) {
case NKqpProto::TKqpPhyConnection::kStreamLookup:
if (modifiedTables.contains(getTable(input.GetStreamLookup().GetTable()))) {
return true;
}
break;
case NKqpProto::TKqpPhyConnection::kSequencer:
return true;
case NKqpProto::TKqpPhyConnection::kUnionAll:
case NKqpProto::TKqpPhyConnection::kMap:
case NKqpProto::TKqpPhyConnection::kHashShuffle:
case NKqpProto::TKqpPhyConnection::kBroadcast:
case NKqpProto::TKqpPhyConnection::kMapShard:
case NKqpProto::TKqpPhyConnection::kShuffleShard:
case NKqpProto::TKqpPhyConnection::kResult:
case NKqpProto::TKqpPhyConnection::kValue:
case NKqpProto::TKqpPhyConnection::kMerge:
case NKqpProto::TKqpPhyConnection::TYPE_NOT_SET:
break;
}
}

for (const auto& source : stage.GetSources()) {
if (source.GetTypeCase() == NKqpProto::TKqpSource::kReadRangesSource) {
if (modifiedTables.contains(getTable(source.GetReadRangesSource().GetTable()))) {
return true;
}
} else {
return true;
}
}

for (const auto& sink : stage.GetSinks()) {
if (sink.GetTypeCase() == NKqpProto::TKqpSink::kInternalSink) {
YQL_ENSURE(sink.GetInternalSink().GetSettings().Is<NKikimrKqp::TKqpTableSinkSettings>());
NKikimrKqp::TKqpTableSinkSettings settings;
YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&settings), "Failed to unpack settings");
modifiedTables.insert(getTable(settings.GetTable()));
} else {
return true;
}
}
}
}
return false;
}

} // namespace NKqp
} // namespace NKikimr
21 changes: 19 additions & 2 deletions ydb/core/kqp/common/kqp_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ class TShardIdToTableInfo {
};
using TShardIdToTableInfoPtr = std::shared_ptr<TShardIdToTableInfo>;

bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, const NKqpProto::TKqpPhyQuery& physicalQuery);

class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
public:
explicit TKqpTransactionContext(bool implicit, const NMiniKQL::IFunctionRegistry* funcRegistry,
Expand Down Expand Up @@ -232,6 +234,11 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
ParamsState = MakeIntrusive<TParamsState>();
SnapshotHandle.Snapshot = IKqpGateway::TKqpSnapshot::InvalidSnapshot;
HasImmediateEffects = false;

HasOlapTable = false;
HasOltpTable = false;
HasTableWrite = false;
NeedUncommittedChangesFlush = false;
}

TKqpTransactionInfo GetInfo() const;
Expand Down Expand Up @@ -268,7 +275,7 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
}

bool ShouldExecuteDeferredEffects() const {
if (HasUncommittedChangesRead || HasOlapTable) {
if (NeedUncommittedChangesFlush || HasOlapTable) {
return !DeferredEffects.Empty();
}

Expand Down Expand Up @@ -297,13 +304,20 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
}

bool CanDeferEffects() const {
if (HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution() || HasOlapTable) {
if (NeedUncommittedChangesFlush || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution() || HasOlapTable) {
return false;
}

return true;
}

void ApplyPhysicalQuery(const NKqpProto::TKqpPhyQuery& phyQuery) {
NeedUncommittedChangesFlush = HasUncommittedChangesRead(ModifiedTablesSinceLastFlush, phyQuery);
if (NeedUncommittedChangesFlush) {
ModifiedTablesSinceLastFlush.clear();
}
}

public:
struct TParamsState : public TThrRefBase {
ui32 LastIndex = 0;
Expand Down Expand Up @@ -334,6 +348,9 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
bool HasOltpTable = false;
bool HasTableWrite = false;

bool NeedUncommittedChangesFlush = false;
THashSet<NKikimr::TTableId> ModifiedTablesSinceLastFlush;

TShardIdToTableInfoPtr ShardIdToTableInfo = std::make_shared<TShardIdToTableInfo>();
};

Expand Down
13 changes: 0 additions & 13 deletions ydb/core/kqp/provider/yql_kikimr_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,6 @@ class TKikimrTransactionContextBase : public TThrRefBase {
Invalidated = false;
Readonly = false;
Closed = false;
HasUncommittedChangesRead = false;
}

void SetTempTables(NKikimr::NKqp::TKqpTempTablesState::TConstPtr tempTablesState) {
Expand Down Expand Up @@ -409,17 +408,6 @@ class TKikimrTransactionContextBase : public TThrRefBase {
}

auto& currentOps = TableOperations[table];
const bool currentModify = currentOps & KikimrModifyOps();
if (currentModify) {
if (KikimrReadOps() & newOp) {
HasUncommittedChangesRead = true;
}

if ((*info)->GetHasIndexTables()) {
HasUncommittedChangesRead = true;
}
}

currentOps |= newOp;
}

Expand All @@ -429,7 +417,6 @@ class TKikimrTransactionContextBase : public TThrRefBase {
virtual ~TKikimrTransactionContextBase() = default;

public:
bool HasUncommittedChangesRead = false;
THashMap<TString, TYdbOperations> TableOperations;
THashMap<TKikimrPathId, TString> TableByIdMap;
TMaybe<NKikimrKqp::EIsolationLevel> EffectiveIsolationLevel;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/session_actor/kqp_query_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ class TKqpQueryState : public TNonCopyable {
return false;
}

if (TxCtx->HasUncommittedChangesRead || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) {
if (TxCtx->NeedUncommittedChangesFlush || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) {
if (tx && tx->GetHasEffects()) {
YQL_ENSURE(tx->ResultsSize() == 0);
// commit can be applied to the last transaction with effects
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,9 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
"Write transactions between column and row tables are disabled at current time.");
return false;
}

QueryState->TxCtx->SetTempTables(QueryState->TempTablesState);
QueryState->TxCtx->ApplyPhysicalQuery(phyQuery);
auto [success, issues] = QueryState->TxCtx->ApplyTableOperations(phyQuery.GetTableOps(), phyQuery.GetTableInfos(),
EKikimrQueryType::Dml);
if (!success) {
Expand Down Expand Up @@ -1236,7 +1238,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
} else if (QueryState->ShouldAcquireLocks(tx) && (!txCtx.HasOlapTable || Settings.TableService.GetEnableOlapSink())) {
request.AcquireLocksTxId = txCtx.Locks.GetLockTxId();

if (txCtx.HasUncommittedChangesRead || Config->FeatureFlags.GetEnableForceImmediateEffectsExecution() || txCtx.HasOlapTable) {
if (!txCtx.CanDeferEffects()) {
request.UseImmediateEffects = true;
}
}
Expand Down

0 comments on commit dae0e94

Please sign in to comment.