Skip to content

Commit

Permalink
Merge d62cdb4 into d591f19
Browse files Browse the repository at this point in the history
  • Loading branch information
ulya-sidorina authored Jan 6, 2025
2 parents d591f19 + d62cdb4 commit 2dcbf6d
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 7 deletions.
97 changes: 93 additions & 4 deletions ydb/core/kqp/common/kqp_data_integrity_trails.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include <library/cpp/string_utils/base64/base64.h>

#include <ydb/core/data_integrity_trails/data_integrity_trails.h>
#include <ydb/core/tx/data_events/events.h>
#include <ydb/core/tx/datashard/datashard.h>

namespace NKikimr {
namespace NDataIntegrity {
Expand Down Expand Up @@ -96,23 +98,110 @@ inline void LogIntegrityTrails(const TString& traceId, NKikimrKqp::EQueryAction
}

// DataExecuter
inline void LogIntegrityTrails(const TString& txType, const TString& traceId, ui64 txId, TMaybe<ui64> shardId, const TActorContext& ctx) {
auto log = [](const auto& type, const auto& traceId, const auto& txId, const auto& shardId) {
inline void LogIntegrityTrails(const TString& txType, const TString& txLocksDebugStr, const TString& traceId, ui64 txId, TMaybe<ui64> shardId, const TActorContext& ctx) {
auto log = [](const auto& type, const auto& txLocksDebugStr, const auto& traceId, const auto& txId, const auto& shardId) {
TStringStream ss;
LogKeyValue("Component", "Executer", ss);
LogKeyValue("Type", "Request", ss);
LogKeyValue("TraceId", traceId, ss);
LogKeyValue("PhyTxId", ToString(txId), ss);
LogKeyValue("Locks", txLocksDebugStr, ss); // TODO

if (shardId) {
LogKeyValue("ShardId", ToString(*shardId), ss);
}

LogKeyValue("Type", type, ss, /*last*/ true);
LogKeyValue("TxType", type, ss, /*last*/ true);

return ss.Str();
};

LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(txType, txLocksDebugStr, traceId, txId, shardId));
}

inline void LogIntegrityTrails(const TString& state, const TString& traceId, const NEvents::TDataEvents::TEvWriteResult::TPtr& ev, const TActorContext& ctx) {
auto log = [](const auto& state, const auto& traceId, const auto& ev) {
const auto& record = ev->Get()->Record;

TStringStream ss;
LogKeyValue("Component", "Executer", ss);
LogKeyValue("Type", "Response", ss);
LogKeyValue("State", state, ss);
LogKeyValue("TraceId", traceId, ss);
LogKeyValue("PhyTxId", ToString(record.GetTxId()), ss);
LogKeyValue("ShardId", ToString(record.GetOrigin()), ss);

TStringBuilder locksDebugStr;
locksDebugStr << "[";
for (const auto& lock : record.GetTxLocks()) {
locksDebugStr << lock.ShortDebugString();
}
locksDebugStr << "]";

LogKeyValue("Locks", locksDebugStr, ss);
LogKeyValue("Status", NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus()), ss);

NYql::TIssues issues;
NYql::IssuesFromMessage(record.GetIssues(), issues);
LogKeyValue("Issues", issues.ToString(), ss, /*last*/ true);

return ss.Str();
};

LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(state, traceId, ev));
}

inline void LogIntegrityTrails(const TString& state, const TString& traceId, const TEvDataShard::TEvProposeTransactionResult::TPtr& ev, const TActorContext& ctx) {
auto log = [](const auto& state, const auto& traceId, const auto& ev) {
const auto& record = ev->Get()->Record;

TStringStream ss;
LogKeyValue("Component", "Executer", ss);
LogKeyValue("Type", "Response", ss);
LogKeyValue("State", state, ss);
LogKeyValue("TraceId", traceId, ss);
LogKeyValue("PhyTxId", ToString(record.GetTxId()), ss);
LogKeyValue("ShardId", ToString(record.GetOrigin()), ss);

TStringBuilder locksDebugStr;
locksDebugStr << "[";
for (const auto& lock : record.GetTxLocks()) {
locksDebugStr << lock.ShortDebugString();
}
locksDebugStr << "]";

LogKeyValue("Locks", locksDebugStr, ss);
LogKeyValue("Status", NKikimrTxDataShard::TEvProposeTransactionResult_EStatus_Name(ev->Get()->GetStatus()), ss);
LogKeyValue("Issues", ev->Get()->GetError(), ss, /*last*/ true);

return ss.Str();
};

LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(state, traceId, ev));
}

template <typename TActorResultInfo>
inline void LogIntegrityTrails(const TString& type, const TString& traceId, ui64 txId, const TActorResultInfo& info, const TActorContext& ctx) {
auto log = [](const auto& type, const auto& traceId, const auto& txId, const auto& info) {
TStringStream ss;
LogKeyValue("Component", "Executer", ss);
LogKeyValue("Type", type, ss);
LogKeyValue("TraceId", traceId, ss);
LogKeyValue("PhyTxId", ToString(txId), ss);

TStringBuilder locksDebugStr;
locksDebugStr << "[";
for (const auto& lock : info.GetLocks()) {
locksDebugStr << lock.ShortDebugString();
}
locksDebugStr << "]";

LogKeyValue("Locks", locksDebugStr, ss);

return ss.Str();
};

LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(txType, traceId, txId, shardId));
LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(type, traceId, txId, info));
}

}
Expand Down
14 changes: 11 additions & 3 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
if (data.GetData().template Is<NKikimrTxDataShard::TEvKqpInputActorResultInfo>()) {
NKikimrTxDataShard::TEvKqpInputActorResultInfo info;
YQL_ENSURE(data.GetData().UnpackTo(&info), "Failed to unpack settings");
NDataIntegrity::LogIntegrityTrails("InputActorResponse", Request.UserTraceId, TxId, info, TlsActivationContext->AsActorContext());
for (auto& lock : info.GetLocks()) {
if (!TxManager) {
Locks.push_back(lock);
Expand All @@ -254,6 +255,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
} else if (data.GetData().template Is<NKikimrKqp::TEvKqpOutputActorResultInfo>()) {
NKikimrKqp::TEvKqpOutputActorResultInfo info;
YQL_ENSURE(data.GetData().UnpackTo(&info), "Failed to unpack settings");
NDataIntegrity::LogIntegrityTrails("OutputActorResponse", Request.UserTraceId, TxId, info, TlsActivationContext->AsActorContext());
for (auto& lock : info.GetLocks()) {
if (!TxManager) {
Locks.push_back(lock);
Expand Down Expand Up @@ -505,6 +507,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
TShardState* shardState = ShardStates.FindPtr(shardId);
YQL_ENSURE(shardState, "Unexpected propose result from unknown tabletId " << shardId);

NDataIntegrity::LogIntegrityTrails("Prepare", Request.UserTraceId, ev, TlsActivationContext->AsActorContext());
LOG_D("Got propose result, shard: " << shardId << ", status: "
<< NKikimrTxDataShard::TEvProposeTransactionResult_EStatus_Name(res->GetStatus())
<< ", error: " << res->GetError());
Expand Down Expand Up @@ -574,6 +577,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
NYql::TIssues issues;
NYql::IssuesFromMessage(res->Record.GetIssues(), issues);

NDataIntegrity::LogIntegrityTrails("Prepare", Request.UserTraceId, ev, TlsActivationContext->AsActorContext());
LOG_D("Recv EvWriteResult (prepare) from ShardID=" << shardId
<< ", Status=" << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus())
<< ", TxId=" << ev->Get()->Record.GetTxId()
Expand Down Expand Up @@ -1104,7 +1108,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
transaction.SetFlags(TEvTxProxy::TEvProposeTransaction::FlagVolatile);
}

NDataIntegrity::LogIntegrityTrails("PlannedTx", Request.UserTraceId, TxId, {}, TlsActivationContext->AsActorContext());
NDataIntegrity::LogIntegrityTrails("PlannedTx", "", Request.UserTraceId, TxId, {}, TlsActivationContext->AsActorContext());

LOG_D("Execute planned transaction, coordinator: " << TxCoordinator << " for " << affectedSet.size() << "shards");
Send(MakePipePerNodeCacheID(false), new TEvPipeCache::TEvForward(ev.Release(), TxCoordinator, /* subscribe */ true));
Expand Down Expand Up @@ -1245,6 +1249,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
NYql::TIssues issues;
NYql::IssuesFromMessage(res->Record.GetIssues(), issues);

NDataIntegrity::LogIntegrityTrails("Execute", Request.UserTraceId, ev, TlsActivationContext->AsActorContext());
LOG_D("Recv EvWriteResult (execute) from ShardID=" << shardId
<< ", Status=" << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus())
<< ", TxId=" << ev->Get()->Record.GetTxId()
Expand Down Expand Up @@ -1314,6 +1319,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
TShardState* shardState = ShardStates.FindPtr(shardId);
YQL_ENSURE(shardState);

NDataIntegrity::LogIntegrityTrails("Execute", Request.UserTraceId, ev, TlsActivationContext->AsActorContext());
LOG_D("Got propose result, shard: " << shardId << ", status: "
<< NKikimrTxDataShard::TEvProposeTransactionResult_EStatus_Name(res->GetStatus())
<< ", error: " << res->GetError());
Expand Down Expand Up @@ -1804,7 +1810,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
flags));
}

NDataIntegrity::LogIntegrityTrails("DatashardTx", Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());
NDataIntegrity::LogIntegrityTrails("DatashardTx", dataTransaction.GetKqpTransaction().GetLocks().ShortDebugString(),
Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());

ResponseEv->Orbit.Fork(evData->Orbit);
ev = std::move(evData);
Expand Down Expand Up @@ -1840,7 +1847,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

auto traceId = ExecuterSpan.GetTraceId();

NDataIntegrity::LogIntegrityTrails("EvWriteTx", Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());
NDataIntegrity::LogIntegrityTrails("EvWriteTx", evWriteTransaction->Record.GetLocks().ShortDebugString(),
Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());

auto shardsToString = [](const auto& shards) {
TStringBuilder builder;
Expand Down

0 comments on commit 2dcbf6d

Please sign in to comment.