Skip to content

Commit

Permalink
Merge df2c912 into 9740792
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Jan 5, 2025
2 parents 9740792 + df2c912 commit 904b5d2
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 101 deletions.
145 changes: 89 additions & 56 deletions ydb/tests/tools/kqprun/kqprun.cpp

Large diffs are not rendered by default.

13 changes: 10 additions & 3 deletions ydb/tests/tools/kqprun/src/actors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo
public:
TRunScriptActorMock(TQueryRequest request, NThreading::TPromise<TQueryResponse> promise, TProgressCallback progressCallback)
: TargetNode_(request.TargetNode)
, QueryId_(request.QueryId)
, Request_(std::move(request.Event))
, Promise_(promise)
, ResultRowsLimit_(std::numeric_limits<ui64>::max())
Expand Down Expand Up @@ -83,12 +84,14 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo

void Handle(NKikimr::NKqp::TEvKqpExecuter::TEvExecuterProgress::TPtr& ev) {
if (ProgressCallback_) {
ProgressCallback_(ev->Get()->Record);
ProgressCallback_(QueryId_, ev->Get()->Record);
}
}

private:
ui32 TargetNode_ = 0;
const ui32 TargetNode_ = 0;
const size_t QueryId_ = 0;

std::unique_ptr<NKikimr::NKqp::TEvKqp::TEvQueryRequest> Request_;
NThreading::TPromise<TQueryResponse> Promise_;
ui64 ResultRowsLimit_;
Expand Down Expand Up @@ -296,6 +299,7 @@ class TSessionHolderActor : public NActors::TActorBootstrapped<TSessionHolderAct
TSessionHolderActor(TCreateSessionRequest request, NThreading::TPromise<TString> openPromise, NThreading::TPromise<void> closePromise)
: TargetNode_(request.TargetNode)
, TraceId_(request.Event->Record.GetTraceId())
, VerboseLevel_(request.VerboseLevel)
, Request_(std::move(request.Event))
, OpenPromise_(openPromise)
, ClosePromise_(closePromise)
Expand All @@ -314,7 +318,9 @@ class TSessionHolderActor : public NActors::TActorBootstrapped<TSessionHolderAct
}

SessionId_ = response.GetResponse().GetSessionId();
Cout << CoutColors_.Cyan() << "Created new session on node " << TargetNode_ << " with id " << SessionId_ << "\n";
if (VerboseLevel_ >= 1) {
Cout << CoutColors_.Cyan() << "Created new session on node " << TargetNode_ << " with id " << SessionId_ << "\n";
}

PingSession();
}
Expand Down Expand Up @@ -390,6 +396,7 @@ class TSessionHolderActor : public NActors::TActorBootstrapped<TSessionHolderAct
private:
const ui32 TargetNode_;
const TString TraceId_;
const ui8 VerboseLevel_;
const NColorizer::TColors CoutColors_ = NColorizer::AutoColors(Cout);

std::unique_ptr<NKikimr::NKqp::TEvKqp::TEvCreateSessionRequest> Request_;
Expand Down
4 changes: 3 additions & 1 deletion ydb/tests/tools/kqprun/src/actors.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ struct TQueryRequest {
ui32 TargetNode;
ui64 ResultRowsLimit;
ui64 ResultSizeLimit;
size_t QueryId;
};

struct TCreateSessionRequest {
std::unique_ptr<NKikimr::NKqp::TEvKqp::TEvCreateSessionRequest> Event;
ui32 TargetNode;
ui8 VerboseLevel;
};

struct TEvPrivate {
Expand Down Expand Up @@ -75,7 +77,7 @@ struct TEvPrivate {
};
};

using TProgressCallback = std::function<void(const NKikimrKqp::TEvExecuterProgress&)>;
using TProgressCallback = std::function<void(ui64 queryId, const NKikimrKqp::TEvExecuterProgress& executerProgress)>;

NActors::IActor* CreateRunScriptActorMock(TQueryRequest request, NThreading::TPromise<TQueryResponse> promise, TProgressCallback progressCallback);

Expand Down
19 changes: 15 additions & 4 deletions ydb/tests/tools/kqprun/src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ struct TYdbSetupSettings {

bool TraceOptEnabled = false;
TString LogOutputFile;
ui8 VerboseLevel = 1;

TString YqlToken;
TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> FunctionRegistry;
Expand All @@ -72,14 +73,15 @@ struct TRunnerOptions {

IOutputStream* ResultOutput = nullptr;
IOutputStream* SchemeQueryAstOutput = nullptr;
IOutputStream* ScriptQueryAstOutput = nullptr;
IOutputStream* ScriptQueryPlanOutput = nullptr;
TString ScriptQueryTimelineFile;
TString InProgressStatisticsOutputFile;
std::vector<IOutputStream*> ScriptQueryAstOutputs;
std::vector<IOutputStream*> ScriptQueryPlanOutputs;
std::vector<TString> ScriptQueryTimelineFiles;
std::vector<TString> InProgressStatisticsOutputFiles;

EResultOutputFormat ResultOutputFormat = EResultOutputFormat::RowsJson;
NYdb::NConsoleClient::EDataFormat PlanOutputFormat = NYdb::NConsoleClient::EDataFormat::Default;
ETraceOptType TraceOptType = ETraceOptType::Disabled;
std::optional<size_t> TraceOptScriptId;

TDuration ScriptCancelAfter;

Expand All @@ -95,6 +97,15 @@ struct TRequestOptions {
TString UserSID;
TString Database;
TDuration Timeout;
size_t QueryId = 0;
};

template <typename TValue>
static TValue GetValue(size_t index, const std::vector<TValue>& values, TValue defaultValue) {
if (values.empty()) {
return defaultValue;
}
return values[std::min(index, values.size() - 1)];
}

} // namespace NKqpRun
77 changes: 47 additions & 30 deletions ydb/tests/tools/kqprun/src/kqp_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class TKqpRunner::TImpl {
}

bool ExecuteScript(const TRequestOptions& script) {
StartScriptTraceOpt();
StartScriptTraceOpt(script.QueryId);

TRequestResult status = YdbSetup_.ScriptRequest(script, ExecutionOperation_);

Expand All @@ -132,11 +132,11 @@ class TKqpRunner::TImpl {
ExecutionMeta_ = TExecutionMeta();
ExecutionMeta_.Database = script.Database;

return WaitScriptExecutionOperation();
return WaitScriptExecutionOperation(script.QueryId);
}

bool ExecuteQuery(const TRequestOptions& query, EQueryType queryType) {
StartScriptTraceOpt();
StartScriptTraceOpt(query.QueryId);
StartTime_ = TInstant::Now();

TString queryTypeStr;
Expand Down Expand Up @@ -164,9 +164,9 @@ class TKqpRunner::TImpl {
meta.Plan = ExecutionMeta_.Plan;
}

PrintScriptAst(meta.Ast);
PrintScriptProgress(meta.Plan);
PrintScriptPlan(meta.Plan);
PrintScriptAst(query.QueryId, meta.Ast);
PrintScriptProgress(query.QueryId, meta.Plan);
PrintScriptPlan(query.QueryId, meta.Plan);
PrintScriptFinish(meta, queryTypeStr);

if (!status.IsSuccess()) {
Expand Down Expand Up @@ -224,7 +224,7 @@ class TKqpRunner::TImpl {
if (Options_.ResultOutput) {
Cout << CoutColors_.Yellow() << TInstant::Now().ToIsoStringLocal() << " Writing script query results..." << CoutColors_.Default() << Endl;
for (size_t i = 0; i < ResultSets_.size(); ++i) {
if (ResultSets_.size() > 1) {
if (ResultSets_.size() > 1 && Options_.YdbSettings.VerboseLevel >= 1) {
*Options_.ResultOutput << CoutColors_.Cyan() << "Result set " << i + 1 << ":" << CoutColors_.Default() << Endl;
}
PrintScriptResult(ResultSets_[i]);
Expand All @@ -233,7 +233,7 @@ class TKqpRunner::TImpl {
}

private:
bool WaitScriptExecutionOperation() {
bool WaitScriptExecutionOperation(ui64 queryId) {
StartTime_ = TInstant::Now();

TDuration getOperationPeriod = TDuration::Seconds(1);
Expand All @@ -244,7 +244,7 @@ class TKqpRunner::TImpl {
TRequestResult status;
while (true) {
status = YdbSetup_.GetScriptExecutionOperationRequest(ExecutionMeta_.Database, ExecutionOperation_, ExecutionMeta_);
PrintScriptProgress(ExecutionMeta_.Plan);
PrintScriptProgress(queryId, ExecutionMeta_.Plan);

if (ExecutionMeta_.Ready) {
break;
Expand All @@ -267,9 +267,11 @@ class TKqpRunner::TImpl {
Sleep(getOperationPeriod);
}

PrintScriptAst(ExecutionMeta_.Ast);
PrintScriptProgress(ExecutionMeta_.Plan);
PrintScriptPlan(ExecutionMeta_.Plan);
TYdbSetup::StopTraceOpt();

PrintScriptAst(queryId, ExecutionMeta_.Ast);
PrintScriptProgress(queryId, ExecutionMeta_.Plan);
PrintScriptPlan(queryId, ExecutionMeta_.Plan);
PrintScriptFinish(ExecutionMeta_, "Script");

if (!status.IsSuccess() || ExecutionMeta_.ExecutionStatus != NYdb::NQuery::EExecStatus::Completed) {
Expand All @@ -290,23 +292,33 @@ class TKqpRunner::TImpl {
}
}

void StartScriptTraceOpt() const {
if (Options_.TraceOptType == TRunnerOptions::ETraceOptType::All || Options_.TraceOptType == TRunnerOptions::ETraceOptType::Script) {
void StartScriptTraceOpt(size_t queryId) const {
bool startTraceOpt = Options_.TraceOptType == TRunnerOptions::ETraceOptType::All;

if (Options_.TraceOptType == TRunnerOptions::ETraceOptType::Script) {
startTraceOpt |= !Options_.TraceOptScriptId || *Options_.TraceOptScriptId == queryId;
}

if (startTraceOpt) {
YdbSetup_.StartTraceOpt();
}
}

void PrintSchemeQueryAst(const TString& ast) const {
if (Options_.SchemeQueryAstOutput) {
Cout << CoutColors_.Cyan() << "Writing scheme query ast" << CoutColors_.Default() << Endl;
if (Options_.YdbSettings.VerboseLevel >= 1) {
Cout << CoutColors_.Cyan() << "Writing scheme query ast" << CoutColors_.Default() << Endl;
}
Options_.SchemeQueryAstOutput->Write(ast);
}
}

void PrintScriptAst(const TString& ast) const {
if (Options_.ScriptQueryAstOutput) {
Cout << CoutColors_.Cyan() << "Writing script query ast" << CoutColors_.Default() << Endl;
Options_.ScriptQueryAstOutput->Write(ast);
void PrintScriptAst(size_t queryId, const TString& ast) const {
if (const auto output = GetValue<IOutputStream*>(queryId, Options_.ScriptQueryAstOutputs, nullptr)) {
if (Options_.YdbSettings.VerboseLevel >= 1) {
Cout << CoutColors_.Cyan() << "Writing script query ast" << CoutColors_.Default() << Endl;
}
output->Write(ast);
}
}

Expand All @@ -325,16 +337,18 @@ class TKqpRunner::TImpl {
printer.Print(plan);
}

void PrintScriptPlan(const TString& plan) const {
if (Options_.ScriptQueryPlanOutput) {
Cout << CoutColors_.Cyan() << "Writing script query plan" << CoutColors_.Default() << Endl;
PrintPlan(plan, Options_.ScriptQueryPlanOutput);
void PrintScriptPlan(size_t queryId, const TString& plan) const {
if (const auto output = GetValue<IOutputStream*>(queryId, Options_.ScriptQueryPlanOutputs, nullptr)) {
if (Options_.YdbSettings.VerboseLevel >= 1) {
Cout << CoutColors_.Cyan() << "Writing script query plan" << CoutColors_.Default() << Endl;
}
PrintPlan(plan, output);
}
}

void PrintScriptProgress(const TString& plan) const {
if (Options_.InProgressStatisticsOutputFile) {
TFileOutput outputStream(Options_.InProgressStatisticsOutputFile);
void PrintScriptProgress(size_t queryId, const TString& plan) const {
if (const auto& output = GetValue<TString>(queryId, Options_.InProgressStatisticsOutputFiles, {})) {
TFileOutput outputStream(output);
outputStream << TInstant::Now().ToIsoStringLocal() << " Script in progress statistics" << Endl;

auto convertedPlan = plan;
Expand All @@ -361,8 +375,8 @@ class TKqpRunner::TImpl {

outputStream.Finish();
}
if (Options_.ScriptQueryTimelineFile) {
TFileOutput outputStream(Options_.ScriptQueryTimelineFile);
if (const auto& output = GetValue<TString>(queryId, Options_.ScriptQueryTimelineFiles, {})) {
TFileOutput outputStream(output);

TPlanVisualizer planVisualizer;
planVisualizer.LoadPlans(plan);
Expand All @@ -373,10 +387,10 @@ class TKqpRunner::TImpl {
}

TProgressCallback GetProgressCallback() {
return [this](const NKikimrKqp::TEvExecuterProgress& executerProgress) mutable {
return [this](ui64 queryId, const NKikimrKqp::TEvExecuterProgress& executerProgress) mutable {
const TString& plan = executerProgress.GetQueryPlan();
ExecutionMeta_.Plan = plan;
PrintScriptProgress(plan);
PrintScriptProgress(queryId, plan);
};
}

Expand Down Expand Up @@ -411,6 +425,9 @@ class TKqpRunner::TImpl {
}

void PrintScriptFinish(const TQueryMeta& meta, const TString& queryType) const {
if (Options_.YdbSettings.VerboseLevel < 1) {
return;
}
Cout << CoutColors_.Cyan() << queryType << " request finished.";
if (meta.TotalDuration) {
Cout << " Total duration: " << meta.TotalDuration;
Expand Down
2 changes: 2 additions & 0 deletions ydb/tests/tools/kqprun/src/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ PEERDIR(
ydb/core/testlib
)

GENERATE_ENUM_SERIALIZATION(common.h)

YQL_LAST_ABI_VERSION()

END()
16 changes: 9 additions & 7 deletions ydb/tests/tools/kqprun/src/ydb_setup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class TStaticSecuredCredentialsFactory : public NYql::ISecuredServiceAccountCred

class TSessionState {
public:
explicit TSessionState(NActors::TTestActorRuntime* runtime, ui32 targetNodeIndex, const TString& database, const TString& traceId)
explicit TSessionState(NActors::TTestActorRuntime* runtime, ui32 targetNodeIndex, const TString& database, const TString& traceId, ui8 verboseLevel)
: Runtime_(runtime)
, TargetNodeIndex_(targetNodeIndex)
{
Expand All @@ -78,7 +78,8 @@ class TSessionState {
auto closePromise = NThreading::NewPromise<void>();
SessionHolderActor_ = Runtime_->Register(CreateSessionHolderActor(TCreateSessionRequest{
.Event = std::move(event),
.TargetNode = Runtime_->GetNodeId(targetNodeIndex)
.TargetNode = Runtime_->GetNodeId(targetNodeIndex),
.VerboseLevel = verboseLevel
}, openPromise, closePromise));

SessionId_ = openPromise.GetFuture().GetValueSync();
Expand Down Expand Up @@ -210,7 +211,7 @@ class TYdbSetup::TImpl {
serverSettings.SetYtGateway(Settings_.YtGateway);
serverSettings.S3ActorsFactory = NYql::NDq::CreateS3ActorsFactory();
serverSettings.SetInitializeFederatedQuerySetupFactory(true);
serverSettings.SetVerbose(false);
serverSettings.SetVerbose(Settings_.VerboseLevel >= 2);

SetLoggerSettings(serverSettings);
SetFunctionRegistry(serverSettings);
Expand Down Expand Up @@ -355,13 +356,13 @@ class TYdbSetup::TImpl {
InitializeServer(grpcPort);
WaitResourcesPublishing();

if (Settings_.MonitoringEnabled) {
if (Settings_.MonitoringEnabled && Settings_.VerboseLevel >= 1) {
for (ui32 nodeIndex = 0; nodeIndex < Settings_.NodeCount; ++nodeIndex) {
Cout << CoutColors_.Cyan() << "Monitoring port" << (Settings_.NodeCount > 1 ? TStringBuilder() << " for node " << nodeIndex + 1 : TString()) << ": " << CoutColors_.Default() << Server_->GetRuntime()->GetMonPort(nodeIndex) << Endl;
}
}

if (Settings_.GrpcEnabled) {
if (Settings_.GrpcEnabled && Settings_.VerboseLevel >= 1) {
Cout << CoutColors_.Cyan() << "Domain gRPC port: " << CoutColors_.Default() << grpcPort << Endl;
}
}
Expand Down Expand Up @@ -516,7 +517,7 @@ class TYdbSetup::TImpl {

if (Settings_.SameSession) {
if (!SessionState_) {
SessionState_ = TSessionState(GetRuntime(), targetNodeIndex, database, query.TraceId);
SessionState_ = TSessionState(GetRuntime(), targetNodeIndex, database, query.TraceId, Settings_.VerboseLevel);
}
request->SetSessionId(SessionState_->GetSessionId());
}
Expand All @@ -535,7 +536,8 @@ class TYdbSetup::TImpl {
.Event = std::move(event),
.TargetNode = GetRuntime()->GetNodeId(targetNodeIndex),
.ResultRowsLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultRowsLimit(),
.ResultSizeLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultSizeLimit()
.ResultSizeLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultSizeLimit(),
.QueryId = query.QueryId
};
}

Expand Down

0 comments on commit 904b5d2

Please sign in to comment.