diff --git a/ydb/core/kqp/common/kqp_data_integrity_trails.h b/ydb/core/kqp/common/kqp_data_integrity_trails.h index 71198204a55b..9278577dde45 100644 --- a/ydb/core/kqp/common/kqp_data_integrity_trails.h +++ b/ydb/core/kqp/common/kqp_data_integrity_trails.h @@ -6,6 +6,8 @@ #include #include +#include +#include namespace NKikimr { namespace NDataIntegrity { @@ -97,23 +99,110 @@ inline void LogIntegrityTrails(const TString& traceId, NKikimrKqp::EQueryAction } // DataExecuter -inline void LogIntegrityTrails(const TString& txType, const TString& traceId, ui64 txId, TMaybe shardId, const TActorContext& ctx) { - auto log = [](const auto& type, const auto& traceId, const auto& txId, const auto& shardId) { +inline void LogIntegrityTrails(const TString& txType, const TString& txLocksDebugStr, const TString& traceId, ui64 txId, TMaybe shardId, const TActorContext& ctx) { + auto log = [](const auto& type, const auto& txLocksDebugStr, const auto& traceId, const auto& txId, const auto& shardId) { TStringStream ss; LogKeyValue("Component", "Executer", ss); + LogKeyValue("Type", "Request", ss); LogKeyValue("TraceId", traceId, ss); LogKeyValue("PhyTxId", ToString(txId), ss); + LogKeyValue("Locks", "[" + txLocksDebugStr + "]", ss); if (shardId) { LogKeyValue("ShardId", ToString(*shardId), ss); } - LogKeyValue("Type", type, ss, /*last*/ true); + LogKeyValue("TxType", type, ss, /*last*/ true); + + return ss.Str(); + }; + + LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(txType, txLocksDebugStr, traceId, txId, shardId)); +} + +inline void LogIntegrityTrails(const TString& state, const TString& traceId, const NEvents::TDataEvents::TEvWriteResult::TPtr& ev, const TActorContext& ctx) { + auto log = [](const auto& state, const auto& traceId, const auto& ev) { + const auto& record = ev->Get()->Record; + + TStringStream ss; + LogKeyValue("Component", "Executer", ss); + LogKeyValue("Type", "Response", ss); + LogKeyValue("State", state, ss); + LogKeyValue("TraceId", traceId, ss); + LogKeyValue("PhyTxId", ToString(record.GetTxId()), ss); + LogKeyValue("ShardId", ToString(record.GetOrigin()), ss); + + TStringBuilder locksDebugStr; + locksDebugStr << "["; + for (const auto& lock : record.GetTxLocks()) { + locksDebugStr << lock.ShortDebugString(); + } + locksDebugStr << "]"; + + LogKeyValue("Locks", locksDebugStr, ss); + LogKeyValue("Status", NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus()), ss); + + NYql::TIssues issues; + NYql::IssuesFromMessage(record.GetIssues(), issues); + LogKeyValue("Issues", issues.ToString(), ss, /*last*/ true); + + return ss.Str(); + }; + + LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(state, traceId, ev)); +} + +inline void LogIntegrityTrails(const TString& state, const TString& traceId, const TEvDataShard::TEvProposeTransactionResult::TPtr& ev, const TActorContext& ctx) { + auto log = [](const auto& state, const auto& traceId, const auto& ev) { + const auto& record = ev->Get()->Record; + + TStringStream ss; + LogKeyValue("Component", "Executer", ss); + LogKeyValue("Type", "Response", ss); + LogKeyValue("State", state, ss); + LogKeyValue("TraceId", traceId, ss); + LogKeyValue("PhyTxId", ToString(record.GetTxId()), ss); + LogKeyValue("ShardId", ToString(record.GetOrigin()), ss); + + TStringBuilder locksDebugStr; + locksDebugStr << "["; + for (const auto& lock : record.GetTxLocks()) { + locksDebugStr << lock.ShortDebugString(); + } + locksDebugStr << "]"; + + LogKeyValue("Locks", locksDebugStr, ss); + LogKeyValue("Status", NKikimrTxDataShard::TEvProposeTransactionResult_EStatus_Name(ev->Get()->GetStatus()), ss); + LogKeyValue("Issues", ev->Get()->GetError(), ss, /*last*/ true); + + return ss.Str(); + }; + + LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(state, traceId, ev)); +} + +template +inline void LogIntegrityTrails(const TString& type, const TString& traceId, ui64 txId, const TActorResultInfo& info, const TActorContext& ctx) { + auto log = [](const auto& type, const auto& traceId, const auto& txId, const auto& info) { + TStringStream ss; + LogKeyValue("Component", "Executer", ss); + LogKeyValue("Type", type, ss); + LogKeyValue("TraceId", traceId, ss); + LogKeyValue("PhyTxId", ToString(txId), ss); + + TStringBuilder locksDebugStr; + locksDebugStr << "["; + for (const auto& lock : info.GetLocks()) { + locksDebugStr << lock.ShortDebugString(); + } + locksDebugStr << "]"; + + LogKeyValue("Locks", locksDebugStr, ss); return ss.Str(); }; - LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(txType, traceId, txId, shardId)); + LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(type, traceId, txId, info)); } // WriteActor,BufferActor diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index cbd154485695..ba64142c9912 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -236,6 +236,7 @@ class TKqpDataExecuter : public TKqpExecuterBase()) { NKikimrTxDataShard::TEvKqpInputActorResultInfo info; YQL_ENSURE(data.GetData().UnpackTo(&info), "Failed to unpack settings"); + NDataIntegrity::LogIntegrityTrails("InputActorResult", Request.UserTraceId, TxId, info, TlsActivationContext->AsActorContext()); for (auto& lock : info.GetLocks()) { if (!TxManager) { Locks.push_back(lock); @@ -254,6 +255,7 @@ class TKqpDataExecuter : public TKqpExecuterBase()) { NKikimrKqp::TEvKqpOutputActorResultInfo info; YQL_ENSURE(data.GetData().UnpackTo(&info), "Failed to unpack settings"); + NDataIntegrity::LogIntegrityTrails("OutputActorResult", Request.UserTraceId, TxId, info, TlsActivationContext->AsActorContext()); for (auto& lock : info.GetLocks()) { if (!TxManager) { Locks.push_back(lock); @@ -505,6 +507,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseAsActorContext()); LOG_D("Got propose result, shard: " << shardId << ", status: " << NKikimrTxDataShard::TEvProposeTransactionResult_EStatus_Name(res->GetStatus()) << ", error: " << res->GetError()); @@ -574,6 +577,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseRecord.GetIssues(), issues); + NDataIntegrity::LogIntegrityTrails("Prepare", Request.UserTraceId, ev, TlsActivationContext->AsActorContext()); LOG_D("Recv EvWriteResult (prepare) from ShardID=" << shardId << ", Status=" << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus()) << ", TxId=" << ev->Get()->Record.GetTxId() @@ -1104,7 +1108,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseAsActorContext()); + NDataIntegrity::LogIntegrityTrails("PlannedTx", "", Request.UserTraceId, TxId, {}, TlsActivationContext->AsActorContext()); LOG_D("Execute planned transaction, coordinator: " << TxCoordinator << " for " << affectedSet.size() << "shards"); Send(MakePipePerNodeCacheID(false), new TEvPipeCache::TEvForward(ev.Release(), TxCoordinator, /* subscribe */ true)); @@ -1245,6 +1249,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseRecord.GetIssues(), issues); + NDataIntegrity::LogIntegrityTrails("Execute", Request.UserTraceId, ev, TlsActivationContext->AsActorContext()); LOG_D("Recv EvWriteResult (execute) from ShardID=" << shardId << ", Status=" << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus()) << ", TxId=" << ev->Get()->Record.GetTxId() @@ -1314,6 +1319,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseAsActorContext()); LOG_D("Got propose result, shard: " << shardId << ", status: " << NKikimrTxDataShard::TEvProposeTransactionResult_EStatus_Name(res->GetStatus()) << ", error: " << res->GetError()); @@ -1804,7 +1810,8 @@ class TKqpDataExecuter : public TKqpExecuterBaseAsActorContext()); + NDataIntegrity::LogIntegrityTrails("DatashardTx", dataTransaction.GetKqpTransaction().GetLocks().ShortDebugString(), + Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext()); ResponseEv->Orbit.Fork(evData->Orbit); ev = std::move(evData); @@ -1840,7 +1847,8 @@ class TKqpDataExecuter : public TKqpExecuterBaseAsActorContext()); + NDataIntegrity::LogIntegrityTrails("EvWriteTx", evWriteTransaction->Record.GetLocks().ShortDebugString(), + Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext()); auto shardsToString = [](const auto& shards) { TStringBuilder builder; diff --git a/ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp b/ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp index 225d797e8668..52808c771aa6 100644 --- a/ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp +++ b/ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp @@ -41,7 +41,7 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); // check executer logs - UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), LogEnabled ? 1 : 0); + UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), LogEnabled ? 2 : 0); // check session actor logs UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), LogEnabled ? 2 : 0); // check grpc logs @@ -194,7 +194,7 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); // check executer logs (should be empty, because executer only logs modification operations) - UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 0); + UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 1); // check session actor logs UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), 2); // check grpc logs @@ -202,45 +202,6 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) { // check datashard logs (should be empty, because DataShard only logs modification operations) UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), 0); } - - Y_UNIT_TEST_TWIN(UpsertViaLegacyScripting, Streaming) { - TKikimrSettings serverSettings; - TStringStream ss; - serverSettings.LogStream = &ss; - TKikimrRunner kikimr(serverSettings); - kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_TRACE); - NYdb::NScripting::TScriptingClient client(kikimr.GetDriver()); - - - const auto query = R"( - --!syntax_v1 - - UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES - (3u, "Value3"), - (101u, "Value101"), - (201u, "Value201"); - )"; - - if (Streaming) { - auto result = client.StreamExecuteYqlScript(query).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - CollectStreamResult(result); - } else { - auto result = client.ExecuteYqlScript(query).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - } - - // check executer logs - UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 1); - // check session actor logs (should contain double logs because this query was executed via worker actor) - UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), 4); - // check grpc logs - UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY TRACE: Component: Grpc"), 2); - // check datashard logs - UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), 2); - - Cout << ss.Str() << Endl; - } } } // namespace NKqp diff --git a/ydb/core/tx/datashard/datashard_integrity_trails.h b/ydb/core/tx/datashard/datashard_integrity_trails.h index de0a65569af4..9c742cf615f1 100644 --- a/ydb/core/tx/datashard/datashard_integrity_trails.h +++ b/ydb/core/tx/datashard/datashard_integrity_trails.h @@ -126,6 +126,32 @@ inline void LogIntegrityTrailsKeys(const NActors::TActorContext& ctx, const ui64 } } +inline void LogIntegrityTrailsLocks(const NActors::TActorContext& ctx, const ui64 tabletId, const ui64 txId, const TVector& locks) { + if (locks.empty()) { + return; + } + + auto logFn = [&]() { + TStringStream ss; + + LogKeyValue("Component", "DataShard", ss); + LogKeyValue("Type", "Locks", ss); + LogKeyValue("TabletId", ToString(tabletId), ss); + LogKeyValue("PhyTxId", ToString(txId), ss); + + ss << "BreakLocks: ["; + for (const auto& lock : locks) { + ss << lock << " "; + } + ss << "]"; + + return ss.Str(); + }; + + LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, logFn()); + +} + template inline void LogIntegrityTrailsFinish(const NActors::TActorContext& ctx, const ui64 tabletId, const ui64 txId, const typename TxResult::EStatus status) { auto logFn = [&]() { diff --git a/ydb/core/tx/locks/locks.cpp b/ydb/core/tx/locks/locks.cpp index 8b2b6ef03b5d..6f055a4205bd 100644 --- a/ydb/core/tx/locks/locks.cpp +++ b/ydb/core/tx/locks/locks.cpp @@ -1185,16 +1185,20 @@ void TSysLocks::EraseLock(const TArrayRef& key) { } } -void TSysLocks::CommitLock(const TArrayRef& key) { +TVector TSysLocks::CommitLock(const TArrayRef& key) { Y_ABORT_UNLESS(Update); + TVector locks; if (auto* lock = Locker.FindLockPtr(GetLockId(key))) { for (auto& pr : lock->ConflictLocks) { if (!!(pr.second & ELockConflictFlags::BreakThemOnOurCommit)) { Update->AddBreakLock(pr.first); + locks.push_back(pr.first.LockId); } } Update->AddEraseLock(lock); } + + return locks; } void TSysLocks::SetLock(const TTableId& tableId, const TArrayRef& key) { @@ -1238,14 +1242,16 @@ void TSysLocks::BreakLock(ui64 lockId) { } } -void TSysLocks::BreakLocks(const TTableId& tableId, const TArrayRef& key) { +TVector TSysLocks::BreakLocks(const TTableId& tableId, const TArrayRef& key) { Y_ABORT_UNLESS(!tableId.HasSamePath(TTableId(TSysTables::SysSchemeShard, TSysTables::SysTableLocks))); + TVector breakLockIds; if (auto* table = Locker.FindTablePtr(tableId)) { if (table->HasRangeLocks()) { // Note: avoid copying the key, find all locks here - table->Ranges.EachIntersection(key, [update = Update](const TRangeTreeBase::TRange&, TLockInfo* lock) { + table->Ranges.EachIntersection(key, [update = Update, &breakLockIds](const TRangeTreeBase::TRange&, TLockInfo* lock) { update->AddBreakLock(lock); + breakLockIds.push_back(lock->LockId); }); } if (table->HasShardLocks()) { @@ -1253,6 +1259,8 @@ void TSysLocks::BreakLocks(const TTableId& tableId, const TArrayRef Update->AddBreakShardLocks(table); } } + + return breakLockIds; } void TSysLocks::AddReadConflict(ui64 conflictId) { diff --git a/ydb/core/tx/locks/locks.h b/ydb/core/tx/locks/locks.h index 76c486123aa0..c3e6371dd38c 100644 --- a/ydb/core/tx/locks/locks.h +++ b/ydb/core/tx/locks/locks.h @@ -883,12 +883,12 @@ class TSysLocks { TLock GetLock(const TArrayRef& syslockKey) const; void EraseLock(ui64 lockId); void EraseLock(const TArrayRef& syslockKey); - void CommitLock(const TArrayRef& syslockKey); + TVector CommitLock(const TArrayRef& syslockKey); void SetLock(const TTableId& tableId, const TArrayRef& key); void SetLock(const TTableId& tableId, const TTableRange& range); void SetWriteLock(const TTableId& tableId, const TArrayRef& key); void BreakLock(ui64 lockId); - void BreakLocks(const TTableId& tableId, const TArrayRef& key); + TVector BreakLocks(const TTableId& tableId, const TArrayRef& key); void AddReadConflict(ui64 conflictId); void AddWriteConflict(ui64 conflictId); void AddWriteConflict(const TTableId& tableId, const TArrayRef& key);