From d62cdb47eb5206f4a6f51370b0323fd4edcc80fb Mon Sep 17 00:00:00 2001 From: ulya-sidorina Date: Fri, 3 Jan 2025 19:58:20 +0100 Subject: [PATCH] feat(data_integrity_trails): add lock logging --- .../kqp/common/kqp_data_integrity_trails.h | 97 ++++++++++++++++++- .../kqp/executer_actor/kqp_data_executer.cpp | 14 ++- 2 files changed, 104 insertions(+), 7 deletions(-) diff --git a/ydb/core/kqp/common/kqp_data_integrity_trails.h b/ydb/core/kqp/common/kqp_data_integrity_trails.h index 37c73bc4eff7..53781814cd76 100644 --- a/ydb/core/kqp/common/kqp_data_integrity_trails.h +++ b/ydb/core/kqp/common/kqp_data_integrity_trails.h @@ -5,6 +5,8 @@ #include #include +#include +#include namespace NKikimr { namespace NDataIntegrity { @@ -96,23 +98,110 @@ inline void LogIntegrityTrails(const TString& traceId, NKikimrKqp::EQueryAction } // DataExecuter -inline void LogIntegrityTrails(const TString& txType, const TString& traceId, ui64 txId, TMaybe shardId, const TActorContext& ctx) { - auto log = [](const auto& type, const auto& traceId, const auto& txId, const auto& shardId) { +inline void LogIntegrityTrails(const TString& txType, const TString& txLocksDebugStr, const TString& traceId, ui64 txId, TMaybe shardId, const TActorContext& ctx) { + auto log = [](const auto& type, const auto& txLocksDebugStr, const auto& traceId, const auto& txId, const auto& shardId) { TStringStream ss; LogKeyValue("Component", "Executer", ss); + LogKeyValue("Type", "Request", ss); LogKeyValue("TraceId", traceId, ss); LogKeyValue("PhyTxId", ToString(txId), ss); + LogKeyValue("Locks", txLocksDebugStr, ss); // TODO if (shardId) { LogKeyValue("ShardId", ToString(*shardId), ss); } - LogKeyValue("Type", type, ss, /*last*/ true); + LogKeyValue("TxType", type, ss, /*last*/ true); + + return ss.Str(); + }; + + LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(txType, txLocksDebugStr, traceId, txId, shardId)); +} + +inline void LogIntegrityTrails(const TString& state, const TString& traceId, const NEvents::TDataEvents::TEvWriteResult::TPtr& ev, const TActorContext& ctx) { + auto log = [](const auto& state, const auto& traceId, const auto& ev) { + const auto& record = ev->Get()->Record; + + TStringStream ss; + LogKeyValue("Component", "Executer", ss); + LogKeyValue("Type", "Response", ss); + LogKeyValue("State", state, ss); + LogKeyValue("TraceId", traceId, ss); + LogKeyValue("PhyTxId", ToString(record.GetTxId()), ss); + LogKeyValue("ShardId", ToString(record.GetOrigin()), ss); + + TStringBuilder locksDebugStr; + locksDebugStr << "["; + for (const auto& lock : record.GetTxLocks()) { + locksDebugStr << lock.ShortDebugString(); + } + locksDebugStr << "]"; + + LogKeyValue("Locks", locksDebugStr, ss); + LogKeyValue("Status", NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus()), ss); + + NYql::TIssues issues; + NYql::IssuesFromMessage(record.GetIssues(), issues); + LogKeyValue("Issues", issues.ToString(), ss, /*last*/ true); + + return ss.Str(); + }; + + LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(state, traceId, ev)); +} + +inline void LogIntegrityTrails(const TString& state, const TString& traceId, const TEvDataShard::TEvProposeTransactionResult::TPtr& ev, const TActorContext& ctx) { + auto log = [](const auto& state, const auto& traceId, const auto& ev) { + const auto& record = ev->Get()->Record; + + TStringStream ss; + LogKeyValue("Component", "Executer", ss); + LogKeyValue("Type", "Response", ss); + LogKeyValue("State", state, ss); + LogKeyValue("TraceId", traceId, ss); + LogKeyValue("PhyTxId", ToString(record.GetTxId()), ss); + LogKeyValue("ShardId", ToString(record.GetOrigin()), ss); + + TStringBuilder locksDebugStr; + locksDebugStr << "["; + for (const auto& lock : record.GetTxLocks()) { + locksDebugStr << lock.ShortDebugString(); + } + locksDebugStr << "]"; + + LogKeyValue("Locks", locksDebugStr, ss); + LogKeyValue("Status", NKikimrTxDataShard::TEvProposeTransactionResult_EStatus_Name(ev->Get()->GetStatus()), ss); + LogKeyValue("Issues", ev->Get()->GetError(), ss, /*last*/ true); + + return ss.Str(); + }; + + LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(state, traceId, ev)); +} + +template +inline void LogIntegrityTrails(const TString& type, const TString& traceId, ui64 txId, const TActorResultInfo& info, const TActorContext& ctx) { + auto log = [](const auto& type, const auto& traceId, const auto& txId, const auto& info) { + TStringStream ss; + LogKeyValue("Component", "Executer", ss); + LogKeyValue("Type", type, ss); + LogKeyValue("TraceId", traceId, ss); + LogKeyValue("PhyTxId", ToString(txId), ss); + + TStringBuilder locksDebugStr; + locksDebugStr << "["; + for (const auto& lock : info.GetLocks()) { + locksDebugStr << lock.ShortDebugString(); + } + locksDebugStr << "]"; + + LogKeyValue("Locks", locksDebugStr, ss); return ss.Str(); }; - LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(txType, traceId, txId, shardId)); + LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(type, traceId, txId, info)); } } diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 9172c245f8a6..ded24a6e83fc 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -232,6 +232,7 @@ class TKqpDataExecuter : public TKqpExecuterBase()) { NKikimrTxDataShard::TEvKqpInputActorResultInfo info; YQL_ENSURE(data.GetData().UnpackTo(&info), "Failed to unpack settings"); + NDataIntegrity::LogIntegrityTrails("InputActorResponse", Request.UserTraceId, TxId, info, TlsActivationContext->AsActorContext()); for (auto& lock : info.GetLocks()) { if (!TxManager) { Locks.push_back(lock); @@ -250,6 +251,7 @@ class TKqpDataExecuter : public TKqpExecuterBase()) { NKikimrKqp::TEvKqpOutputActorResultInfo info; YQL_ENSURE(data.GetData().UnpackTo(&info), "Failed to unpack settings"); + NDataIntegrity::LogIntegrityTrails("OutputActorResponse", Request.UserTraceId, TxId, info, TlsActivationContext->AsActorContext()); for (auto& lock : info.GetLocks()) { if (!TxManager) { Locks.push_back(lock); @@ -501,6 +503,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseAsActorContext()); LOG_D("Got propose result, shard: " << shardId << ", status: " << NKikimrTxDataShard::TEvProposeTransactionResult_EStatus_Name(res->GetStatus()) << ", error: " << res->GetError()); @@ -570,6 +573,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseRecord.GetIssues(), issues); + NDataIntegrity::LogIntegrityTrails("Prepare", Request.UserTraceId, ev, TlsActivationContext->AsActorContext()); LOG_D("Recv EvWriteResult (prepare) from ShardID=" << shardId << ", Status=" << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus()) << ", TxId=" << ev->Get()->Record.GetTxId() @@ -1100,7 +1104,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseAsActorContext()); + NDataIntegrity::LogIntegrityTrails("PlannedTx", "", Request.UserTraceId, TxId, {}, TlsActivationContext->AsActorContext()); LOG_D("Execute planned transaction, coordinator: " << TxCoordinator << " for " << affectedSet.size() << "shards"); Send(MakePipePerNodeCacheID(false), new TEvPipeCache::TEvForward(ev.Release(), TxCoordinator, /* subscribe */ true)); @@ -1241,6 +1245,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseRecord.GetIssues(), issues); + NDataIntegrity::LogIntegrityTrails("Execute", Request.UserTraceId, ev, TlsActivationContext->AsActorContext()); LOG_D("Recv EvWriteResult (execute) from ShardID=" << shardId << ", Status=" << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus()) << ", TxId=" << ev->Get()->Record.GetTxId() @@ -1310,6 +1315,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseAsActorContext()); LOG_D("Got propose result, shard: " << shardId << ", status: " << NKikimrTxDataShard::TEvProposeTransactionResult_EStatus_Name(res->GetStatus()) << ", error: " << res->GetError()); @@ -1793,7 +1799,8 @@ class TKqpDataExecuter : public TKqpExecuterBaseAsActorContext()); + NDataIntegrity::LogIntegrityTrails("DatashardTx", dataTransaction.GetKqpTransaction().GetLocks().ShortDebugString(), + Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext()); ResponseEv->Orbit.Fork(evData->Orbit); ev = std::move(evData); @@ -1829,7 +1836,8 @@ class TKqpDataExecuter : public TKqpExecuterBaseAsActorContext()); + NDataIntegrity::LogIntegrityTrails("EvWriteTx", evWriteTransaction->Record.GetLocks().ShortDebugString(), + Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext()); auto shardsToString = [](const auto& shards) { TStringBuilder builder;