Skip to content

Commit

Permalink
feat(data_integrity_trails): add locks to logs
Browse files Browse the repository at this point in the history
  • Loading branch information
ulya-sidorina committed Jan 6, 2025
1 parent e464f3d commit 7d7c53e
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 51 deletions.
97 changes: 93 additions & 4 deletions ydb/core/kqp/common/kqp_data_integrity_trails.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include <library/cpp/string_utils/base64/base64.h>

#include <ydb/core/data_integrity_trails/data_integrity_trails.h>
#include <ydb/core/tx/data_events/events.h>
#include <ydb/core/tx/datashard/datashard.h>

namespace NKikimr {
namespace NDataIntegrity {
Expand Down Expand Up @@ -96,23 +98,110 @@ inline void LogIntegrityTrails(const TString& traceId, NKikimrKqp::EQueryAction
}

// DataExecuter
inline void LogIntegrityTrails(const TString& txType, const TString& traceId, ui64 txId, TMaybe<ui64> shardId, const TActorContext& ctx) {
auto log = [](const auto& type, const auto& traceId, const auto& txId, const auto& shardId) {
inline void LogIntegrityTrails(const TString& txType, const TString& txLocksDebugStr, const TString& traceId, ui64 txId, TMaybe<ui64> shardId, const TActorContext& ctx) {
auto log = [](const auto& type, const auto& txLocksDebugStr, const auto& traceId, const auto& txId, const auto& shardId) {
TStringStream ss;
LogKeyValue("Component", "Executer", ss);
LogKeyValue("Type", "Request", ss);
LogKeyValue("TraceId", traceId, ss);
LogKeyValue("PhyTxId", ToString(txId), ss);
LogKeyValue("Locks", "[" + txLocksDebugStr + "]", ss);

if (shardId) {
LogKeyValue("ShardId", ToString(*shardId), ss);
}

LogKeyValue("Type", type, ss, /*last*/ true);
LogKeyValue("TxType", type, ss, /*last*/ true);

return ss.Str();
};

LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(txType, txLocksDebugStr, traceId, txId, shardId));
}

inline void LogIntegrityTrails(const TString& state, const TString& traceId, const NEvents::TDataEvents::TEvWriteResult::TPtr& ev, const TActorContext& ctx) {
auto log = [](const auto& state, const auto& traceId, const auto& ev) {
const auto& record = ev->Get()->Record;

TStringStream ss;
LogKeyValue("Component", "Executer", ss);
LogKeyValue("Type", "Response", ss);
LogKeyValue("State", state, ss);
LogKeyValue("TraceId", traceId, ss);
LogKeyValue("PhyTxId", ToString(record.GetTxId()), ss);
LogKeyValue("ShardId", ToString(record.GetOrigin()), ss);

TStringBuilder locksDebugStr;
locksDebugStr << "[";
for (const auto& lock : record.GetTxLocks()) {
locksDebugStr << lock.ShortDebugString();
}
locksDebugStr << "]";

LogKeyValue("Locks", locksDebugStr, ss);
LogKeyValue("Status", NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus()), ss);

NYql::TIssues issues;
NYql::IssuesFromMessage(record.GetIssues(), issues);
LogKeyValue("Issues", issues.ToString(), ss, /*last*/ true);

return ss.Str();
};

LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(state, traceId, ev));
}

inline void LogIntegrityTrails(const TString& state, const TString& traceId, const TEvDataShard::TEvProposeTransactionResult::TPtr& ev, const TActorContext& ctx) {
auto log = [](const auto& state, const auto& traceId, const auto& ev) {
const auto& record = ev->Get()->Record;

TStringStream ss;
LogKeyValue("Component", "Executer", ss);
LogKeyValue("Type", "Response", ss);
LogKeyValue("State", state, ss);
LogKeyValue("TraceId", traceId, ss);
LogKeyValue("PhyTxId", ToString(record.GetTxId()), ss);
LogKeyValue("ShardId", ToString(record.GetOrigin()), ss);

TStringBuilder locksDebugStr;
locksDebugStr << "[";
for (const auto& lock : record.GetTxLocks()) {
locksDebugStr << lock.ShortDebugString();
}
locksDebugStr << "]";

LogKeyValue("Locks", locksDebugStr, ss);
LogKeyValue("Status", NKikimrTxDataShard::TEvProposeTransactionResult_EStatus_Name(ev->Get()->GetStatus()), ss);
LogKeyValue("Issues", ev->Get()->GetError(), ss, /*last*/ true);

return ss.Str();
};

LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(state, traceId, ev));
}

template <typename TActorResultInfo>
inline void LogIntegrityTrails(const TString& type, const TString& traceId, ui64 txId, const TActorResultInfo& info, const TActorContext& ctx) {
auto log = [](const auto& type, const auto& traceId, const auto& txId, const auto& info) {
TStringStream ss;
LogKeyValue("Component", "Executer", ss);
LogKeyValue("Type", type, ss);
LogKeyValue("TraceId", traceId, ss);
LogKeyValue("PhyTxId", ToString(txId), ss);

TStringBuilder locksDebugStr;
locksDebugStr << "[";
for (const auto& lock : info.GetLocks()) {
locksDebugStr << lock.ShortDebugString();
}
locksDebugStr << "]";

LogKeyValue("Locks", locksDebugStr, ss);

return ss.Str();
};

LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(txType, traceId, txId, shardId));
LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(type, traceId, txId, info));
}

}
Expand Down
14 changes: 11 additions & 3 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
if (data.GetData().template Is<NKikimrTxDataShard::TEvKqpInputActorResultInfo>()) {
NKikimrTxDataShard::TEvKqpInputActorResultInfo info;
YQL_ENSURE(data.GetData().UnpackTo(&info), "Failed to unpack settings");
NDataIntegrity::LogIntegrityTrails("InputActorResult", Request.UserTraceId, TxId, info, TlsActivationContext->AsActorContext());
for (auto& lock : info.GetLocks()) {
if (!TxManager) {
Locks.push_back(lock);
Expand All @@ -250,6 +251,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
} else if (data.GetData().template Is<NKikimrKqp::TEvKqpOutputActorResultInfo>()) {
NKikimrKqp::TEvKqpOutputActorResultInfo info;
YQL_ENSURE(data.GetData().UnpackTo(&info), "Failed to unpack settings");
NDataIntegrity::LogIntegrityTrails("OutputActorResult", Request.UserTraceId, TxId, info, TlsActivationContext->AsActorContext());
for (auto& lock : info.GetLocks()) {
if (!TxManager) {
Locks.push_back(lock);
Expand Down Expand Up @@ -501,6 +503,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
TShardState* shardState = ShardStates.FindPtr(shardId);
YQL_ENSURE(shardState, "Unexpected propose result from unknown tabletId " << shardId);

NDataIntegrity::LogIntegrityTrails("Prepare", Request.UserTraceId, ev, TlsActivationContext->AsActorContext());
LOG_D("Got propose result, shard: " << shardId << ", status: "
<< NKikimrTxDataShard::TEvProposeTransactionResult_EStatus_Name(res->GetStatus())
<< ", error: " << res->GetError());
Expand Down Expand Up @@ -570,6 +573,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
NYql::TIssues issues;
NYql::IssuesFromMessage(res->Record.GetIssues(), issues);

NDataIntegrity::LogIntegrityTrails("Prepare", Request.UserTraceId, ev, TlsActivationContext->AsActorContext());
LOG_D("Recv EvWriteResult (prepare) from ShardID=" << shardId
<< ", Status=" << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus())
<< ", TxId=" << ev->Get()->Record.GetTxId()
Expand Down Expand Up @@ -1100,7 +1104,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
transaction.SetFlags(TEvTxProxy::TEvProposeTransaction::FlagVolatile);
}

NDataIntegrity::LogIntegrityTrails("PlannedTx", Request.UserTraceId, TxId, {}, TlsActivationContext->AsActorContext());
NDataIntegrity::LogIntegrityTrails("PlannedTx", "", Request.UserTraceId, TxId, {}, TlsActivationContext->AsActorContext());

LOG_D("Execute planned transaction, coordinator: " << TxCoordinator << " for " << affectedSet.size() << "shards");
Send(MakePipePerNodeCacheID(false), new TEvPipeCache::TEvForward(ev.Release(), TxCoordinator, /* subscribe */ true));
Expand Down Expand Up @@ -1241,6 +1245,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
NYql::TIssues issues;
NYql::IssuesFromMessage(res->Record.GetIssues(), issues);

NDataIntegrity::LogIntegrityTrails("Execute", Request.UserTraceId, ev, TlsActivationContext->AsActorContext());
LOG_D("Recv EvWriteResult (execute) from ShardID=" << shardId
<< ", Status=" << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus())
<< ", TxId=" << ev->Get()->Record.GetTxId()
Expand Down Expand Up @@ -1310,6 +1315,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
TShardState* shardState = ShardStates.FindPtr(shardId);
YQL_ENSURE(shardState);

NDataIntegrity::LogIntegrityTrails("Execute", Request.UserTraceId, ev, TlsActivationContext->AsActorContext());
LOG_D("Got propose result, shard: " << shardId << ", status: "
<< NKikimrTxDataShard::TEvProposeTransactionResult_EStatus_Name(res->GetStatus())
<< ", error: " << res->GetError());
Expand Down Expand Up @@ -1793,7 +1799,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
flags));
}

NDataIntegrity::LogIntegrityTrails("DatashardTx", Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());
NDataIntegrity::LogIntegrityTrails("DatashardTx", dataTransaction.GetKqpTransaction().GetLocks().ShortDebugString(),
Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());

ResponseEv->Orbit.Fork(evData->Orbit);
ev = std::move(evData);
Expand Down Expand Up @@ -1829,7 +1836,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

auto traceId = ExecuterSpan.GetTraceId();

NDataIntegrity::LogIntegrityTrails("EvWriteTx", Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());
NDataIntegrity::LogIntegrityTrails("EvWriteTx", evWriteTransaction->Record.GetLocks().ShortDebugString(),
Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());

auto shardsToString = [](const auto& shards) {
TStringBuilder builder;
Expand Down
47 changes: 3 additions & 44 deletions ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

// check executer logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), LogEnabled ? 1 : 0);
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), LogEnabled ? 2 : 0);
// check session actor logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), LogEnabled ? 2 : 0);
// check grpc logs
Expand Down Expand Up @@ -72,14 +72,12 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

// check executer logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 2);
// check session actor logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), 2);
// check grpc logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY TRACE: Component: Grpc"), 2);
// check datashard logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), 4);
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), 2);
}

Y_UNIT_TEST(Ddl) {
Expand Down Expand Up @@ -129,53 +127,14 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

// check executer logs (should be empty, because executer only logs modification operations)
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 0);
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 1);
// check session actor logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), 2);
// check grpc logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY TRACE: Component: Grpc"), 2);
// check datashard logs (should be empty, because DataShard only logs modification operations)
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), 0);
}

Y_UNIT_TEST_TWIN(UpsertViaLegacyScripting, Streaming) {
TKikimrSettings serverSettings;
TStringStream ss;
serverSettings.LogStream = &ss;
TKikimrRunner kikimr(serverSettings);
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_TRACE);
NYdb::NScripting::TScriptingClient client(kikimr.GetDriver());


const auto query = R"(
--!syntax_v1
UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES
(3u, "Value3"),
(101u, "Value101"),
(201u, "Value201");
)";

if (Streaming) {
auto result = client.StreamExecuteYqlScript(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CollectStreamResult(result);
} else {
auto result = client.ExecuteYqlScript(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

// check executer logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 1);
// check session actor logs (should contain double logs because this query was executed via worker actor)
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), 4);
// check grpc logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY TRACE: Component: Grpc"), 2);
// check datashard logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), 2);

Cout << ss.Str() << Endl;
}
}

} // namespace NKqp
Expand Down

0 comments on commit 7d7c53e

Please sign in to comment.