diff --git a/ydb/core/kqp/common/kqp_data_integrity_trails.h b/ydb/core/kqp/common/kqp_data_integrity_trails.h index 37c73bc4eff7..53781814cd76 100644 --- a/ydb/core/kqp/common/kqp_data_integrity_trails.h +++ b/ydb/core/kqp/common/kqp_data_integrity_trails.h @@ -5,6 +5,8 @@ #include #include +#include +#include namespace NKikimr { namespace NDataIntegrity { @@ -96,23 +98,110 @@ inline void LogIntegrityTrails(const TString& traceId, NKikimrKqp::EQueryAction } // DataExecuter -inline void LogIntegrityTrails(const TString& txType, const TString& traceId, ui64 txId, TMaybe shardId, const TActorContext& ctx) { - auto log = [](const auto& type, const auto& traceId, const auto& txId, const auto& shardId) { +inline void LogIntegrityTrails(const TString& txType, const TString& txLocksDebugStr, const TString& traceId, ui64 txId, TMaybe shardId, const TActorContext& ctx) { + auto log = [](const auto& type, const auto& txLocksDebugStr, const auto& traceId, const auto& txId, const auto& shardId) { TStringStream ss; LogKeyValue("Component", "Executer", ss); + LogKeyValue("Type", "Request", ss); LogKeyValue("TraceId", traceId, ss); LogKeyValue("PhyTxId", ToString(txId), ss); + LogKeyValue("Locks", txLocksDebugStr, ss); // TODO if (shardId) { LogKeyValue("ShardId", ToString(*shardId), ss); } - LogKeyValue("Type", type, ss, /*last*/ true); + LogKeyValue("TxType", type, ss, /*last*/ true); + + return ss.Str(); + }; + + LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(txType, txLocksDebugStr, traceId, txId, shardId)); +} + +inline void LogIntegrityTrails(const TString& state, const TString& traceId, const NEvents::TDataEvents::TEvWriteResult::TPtr& ev, const TActorContext& ctx) { + auto log = [](const auto& state, const auto& traceId, const auto& ev) { + const auto& record = ev->Get()->Record; + + TStringStream ss; + LogKeyValue("Component", "Executer", ss); + LogKeyValue("Type", "Response", ss); + LogKeyValue("State", state, ss); + LogKeyValue("TraceId", traceId, ss); + LogKeyValue("PhyTxId", ToString(record.GetTxId()), ss); + LogKeyValue("ShardId", ToString(record.GetOrigin()), ss); + + TStringBuilder locksDebugStr; + locksDebugStr << "["; + for (const auto& lock : record.GetTxLocks()) { + locksDebugStr << lock.ShortDebugString(); + } + locksDebugStr << "]"; + + LogKeyValue("Locks", locksDebugStr, ss); + LogKeyValue("Status", NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus()), ss); + + NYql::TIssues issues; + NYql::IssuesFromMessage(record.GetIssues(), issues); + LogKeyValue("Issues", issues.ToString(), ss, /*last*/ true); + + return ss.Str(); + }; + + LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(state, traceId, ev)); +} + +inline void LogIntegrityTrails(const TString& state, const TString& traceId, const TEvDataShard::TEvProposeTransactionResult::TPtr& ev, const TActorContext& ctx) { + auto log = [](const auto& state, const auto& traceId, const auto& ev) { + const auto& record = ev->Get()->Record; + + TStringStream ss; + LogKeyValue("Component", "Executer", ss); + LogKeyValue("Type", "Response", ss); + LogKeyValue("State", state, ss); + LogKeyValue("TraceId", traceId, ss); + LogKeyValue("PhyTxId", ToString(record.GetTxId()), ss); + LogKeyValue("ShardId", ToString(record.GetOrigin()), ss); + + TStringBuilder locksDebugStr; + locksDebugStr << "["; + for (const auto& lock : record.GetTxLocks()) { + locksDebugStr << lock.ShortDebugString(); + } + locksDebugStr << "]"; + + LogKeyValue("Locks", locksDebugStr, ss); + LogKeyValue("Status", NKikimrTxDataShard::TEvProposeTransactionResult_EStatus_Name(ev->Get()->GetStatus()), ss); + LogKeyValue("Issues", ev->Get()->GetError(), ss, /*last*/ true); + + return ss.Str(); + }; + + LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(state, traceId, ev)); +} + +template +inline void LogIntegrityTrails(const TString& type, const TString& traceId, ui64 txId, const TActorResultInfo& info, const TActorContext& ctx) { + auto log = [](const auto& type, const auto& traceId, const auto& txId, const auto& info) { + TStringStream ss; + LogKeyValue("Component", "Executer", ss); + LogKeyValue("Type", type, ss); + LogKeyValue("TraceId", traceId, ss); + LogKeyValue("PhyTxId", ToString(txId), ss); + + TStringBuilder locksDebugStr; + locksDebugStr << "["; + for (const auto& lock : info.GetLocks()) { + locksDebugStr << lock.ShortDebugString(); + } + locksDebugStr << "]"; + + LogKeyValue("Locks", locksDebugStr, ss); return ss.Str(); }; - LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(txType, traceId, txId, shardId)); + LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(type, traceId, txId, info)); } } diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 444376163c1b..99d8db0ed2a9 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -236,6 +236,7 @@ class TKqpDataExecuter : public TKqpExecuterBase()) { NKikimrTxDataShard::TEvKqpInputActorResultInfo info; YQL_ENSURE(data.GetData().UnpackTo(&info), "Failed to unpack settings"); + NDataIntegrity::LogIntegrityTrails("InputActorResponse", Request.UserTraceId, TxId, info, TlsActivationContext->AsActorContext()); for (auto& lock : info.GetLocks()) { if (!TxManager) { Locks.push_back(lock); @@ -254,6 +255,7 @@ class TKqpDataExecuter : public TKqpExecuterBase()) { NKikimrKqp::TEvKqpOutputActorResultInfo info; YQL_ENSURE(data.GetData().UnpackTo(&info), "Failed to unpack settings"); + NDataIntegrity::LogIntegrityTrails("OutputActorResponse", Request.UserTraceId, TxId, info, TlsActivationContext->AsActorContext()); for (auto& lock : info.GetLocks()) { if (!TxManager) { Locks.push_back(lock); @@ -505,6 +507,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseAsActorContext()); LOG_D("Got propose result, shard: " << shardId << ", status: " << NKikimrTxDataShard::TEvProposeTransactionResult_EStatus_Name(res->GetStatus()) << ", error: " << res->GetError()); @@ -574,6 +577,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseRecord.GetIssues(), issues); + NDataIntegrity::LogIntegrityTrails("Prepare", Request.UserTraceId, ev, TlsActivationContext->AsActorContext()); LOG_D("Recv EvWriteResult (prepare) from ShardID=" << shardId << ", Status=" << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus()) << ", TxId=" << ev->Get()->Record.GetTxId() @@ -1104,7 +1108,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseAsActorContext()); + NDataIntegrity::LogIntegrityTrails("PlannedTx", "", Request.UserTraceId, TxId, {}, TlsActivationContext->AsActorContext()); LOG_D("Execute planned transaction, coordinator: " << TxCoordinator << " for " << affectedSet.size() << "shards"); Send(MakePipePerNodeCacheID(false), new TEvPipeCache::TEvForward(ev.Release(), TxCoordinator, /* subscribe */ true)); @@ -1245,6 +1249,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseRecord.GetIssues(), issues); + NDataIntegrity::LogIntegrityTrails("Execute", Request.UserTraceId, ev, TlsActivationContext->AsActorContext()); LOG_D("Recv EvWriteResult (execute) from ShardID=" << shardId << ", Status=" << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus()) << ", TxId=" << ev->Get()->Record.GetTxId() @@ -1314,6 +1319,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseAsActorContext()); LOG_D("Got propose result, shard: " << shardId << ", status: " << NKikimrTxDataShard::TEvProposeTransactionResult_EStatus_Name(res->GetStatus()) << ", error: " << res->GetError()); @@ -1804,7 +1810,8 @@ class TKqpDataExecuter : public TKqpExecuterBaseAsActorContext()); + NDataIntegrity::LogIntegrityTrails("DatashardTx", dataTransaction.GetKqpTransaction().GetLocks().ShortDebugString(), + Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext()); ResponseEv->Orbit.Fork(evData->Orbit); ev = std::move(evData); @@ -1840,7 +1847,8 @@ class TKqpDataExecuter : public TKqpExecuterBaseAsActorContext()); + NDataIntegrity::LogIntegrityTrails("EvWriteTx", evWriteTransaction->Record.GetLocks().ShortDebugString(), + Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext()); auto shardsToString = [](const auto& shards) { TStringBuilder builder;