Skip to content

Commit

Permalink
Merge 7f2a682 into 2876ba4
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Jan 6, 2025
2 parents 2876ba4 + 7f2a682 commit fa98d8f
Show file tree
Hide file tree
Showing 14 changed files with 427 additions and 161 deletions.
31 changes: 31 additions & 0 deletions ydb/core/driver_lib/run/config_helpers.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
#include "config_helpers.h"

#include <ydb/core/base/localdb.h>
#include <ydb/core/protos/bootstrap.pb.h>
#include <ydb/core/protos/resource_broker.pb.h>

#include <ydb/library/actors/util/affinity.h>


Expand Down Expand Up @@ -114,4 +118,31 @@ NActors::TSchedulerConfig CreateSchedulerConfig(const NKikimrConfig::TActorSyste

} // namespace NActorSystemConfigHelpers

namespace NKikimrConfigHelpers {

NMemory::TResourceBrokerConfig CreateMemoryControllerResourceBrokerConfig(const NKikimrConfig::TAppConfig& config) {
NMemory::TResourceBrokerConfig resourceBrokerSelfConfig; // for backward compatibility
auto mergeResourceBrokerConfigs = [&](const NKikimrResourceBroker::TResourceBrokerConfig& resourceBrokerConfig) {
if (resourceBrokerConfig.HasResourceLimit() && resourceBrokerConfig.GetResourceLimit().HasMemory()) {
resourceBrokerSelfConfig.LimitBytes = resourceBrokerConfig.GetResourceLimit().GetMemory();
}
for (const auto& queue : resourceBrokerConfig.GetQueues()) {
if (queue.GetName() == NLocalDb::KqpResourceManagerQueue) {
if (queue.HasLimit() && queue.GetLimit().HasMemory()) {
resourceBrokerSelfConfig.QueryExecutionLimitBytes = queue.GetLimit().GetMemory();
}
}
}
};
if (config.HasBootstrapConfig() && config.GetBootstrapConfig().HasResourceBroker()) {
mergeResourceBrokerConfigs(config.GetBootstrapConfig().GetResourceBroker());
}
if (config.HasResourceBrokerConfig()) {
mergeResourceBrokerConfigs(config.GetResourceBrokerConfig());
}
return resourceBrokerSelfConfig;
}

} // namespace NKikimrConfigHelpers

} // namespace NKikimr
7 changes: 7 additions & 0 deletions ydb/core/driver_lib/run/config_helpers.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <ydb/core/memory_controller/memory_controller.h>
#include <ydb/core/protos/config.pb.h>

#include <ydb/library/actors/core/config.h>
Expand All @@ -15,4 +16,10 @@ NActors::TSchedulerConfig CreateSchedulerConfig(const NKikimrConfig::TActorSyste

} // namespace NActorSystemConfigHelpers

namespace NKikimrConfigHelpers {

NMemory::TResourceBrokerConfig CreateMemoryControllerResourceBrokerConfig(const NKikimrConfig::TAppConfig& config);

} // namespace NKikimrConfigHelpers

} // namespace NKikimr
22 changes: 1 addition & 21 deletions ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2052,28 +2052,8 @@ void TMemoryControllerInitializer::InitializeServices(
NActors::TActorSystemSetup* setup,
const NKikimr::TAppData* appData)
{
NMemory::TResourceBrokerConfig resourceBrokerSelfConfig; // for backward compatibility
auto mergeResourceBrokerConfigs = [&](const NKikimrResourceBroker::TResourceBrokerConfig& resourceBrokerConfig) {
if (resourceBrokerConfig.HasResourceLimit() && resourceBrokerConfig.GetResourceLimit().HasMemory()) {
resourceBrokerSelfConfig.LimitBytes = resourceBrokerConfig.GetResourceLimit().GetMemory();
}
for (const auto& queue : resourceBrokerConfig.GetQueues()) {
if (queue.GetName() == NLocalDb::KqpResourceManagerQueue) {
if (queue.HasLimit() && queue.GetLimit().HasMemory()) {
resourceBrokerSelfConfig.QueryExecutionLimitBytes = queue.GetLimit().GetMemory();
}
}
}
};
if (Config.HasBootstrapConfig() && Config.GetBootstrapConfig().HasResourceBroker()) {
mergeResourceBrokerConfigs(Config.GetBootstrapConfig().GetResourceBroker());
}
if (Config.HasResourceBrokerConfig()) {
mergeResourceBrokerConfigs(Config.GetResourceBrokerConfig());
}

auto* actor = NMemory::CreateMemoryController(TDuration::Seconds(1), ProcessMemoryInfoProvider,
Config.GetMemoryControllerConfig(), resourceBrokerSelfConfig,
Config.GetMemoryControllerConfig(), NKikimrConfigHelpers::CreateMemoryControllerResourceBrokerConfig(Config),
appData->Counters);
setup->LocalServices.emplace_back(
NMemory::MakeMemoryControllerId(0),
Expand Down
23 changes: 23 additions & 0 deletions ydb/core/testlib/test_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
#include <ydb/core/mind/tenant_slot_broker.h>
#include <ydb/core/mind/tenant_node_enumeration.h>
#include <ydb/core/mind/node_broker.h>
#include <ydb/core/mon_alloc/monitor.h>
#include <ydb/core/kesus/tablet/events.h>
#include <ydb/core/sys_view/service/sysview_service.h>
#include <yql/essentials/minikql/mkql_function_registry.h>
Expand Down Expand Up @@ -1074,6 +1075,28 @@ namespace Tests {
}
}

{
if (Settings->NeedStatsCollectors) {
TString filePathPrefix;
if (Settings->AppConfig->HasMonitoringConfig()) {
filePathPrefix = Settings->AppConfig->GetMonitoringConfig().GetMemAllocDumpPathPrefix();
}

const TIntrusivePtr<NMemory::IProcessMemoryInfoProvider> processMemoryInfoProvider(MakeIntrusive<NMemory::TProcessMemoryInfoProvider>());

IActor* monitorActor = CreateMemProfMonitor(TDuration::Seconds(1), processMemoryInfoProvider,
Runtime->GetAppData(nodeIdx).Counters, filePathPrefix);
const TActorId monitorActorId = Runtime->Register(monitorActor, nodeIdx, Runtime->GetAppData(nodeIdx).BatchPoolId);
Runtime->RegisterService(MakeMemProfMonitorID(Runtime->GetNodeId(nodeIdx)), monitorActorId, nodeIdx);

IActor* controllerActor = NMemory::CreateMemoryController(TDuration::Seconds(1), processMemoryInfoProvider,
Settings->AppConfig->GetMemoryControllerConfig(), NKikimrConfigHelpers::CreateMemoryControllerResourceBrokerConfig(*Settings->AppConfig),
Runtime->GetAppData(nodeIdx).Counters);
const TActorId controllerActorId = Runtime->Register(controllerActor, nodeIdx, Runtime->GetAppData(nodeIdx).BatchPoolId);
Runtime->RegisterService(NMemory::MakeMemoryControllerId(0), controllerActorId, nodeIdx);
}
}

{
IActor* kesusService = NKesus::CreateKesusProxyService();
TActorId kesusServiceId = Runtime->Register(kesusService, nodeIdx, userPoolId);
Expand Down
13 changes: 7 additions & 6 deletions ydb/tests/tools/kqprun/flame_graph.sh
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#!/usr/bin/env bash

# For svg graph download https://github.com/brendangregg/FlameGraph
# and run `FlameGraph/stackcollapse-perf.pl profdata.txt | FlameGraph/flamegraph.pl > profdata.svg`
set -eux

pid=$(pgrep -u $USER kqprun)
kqprun_pid=$(pgrep -u $USER kqprun)

echo "Target process id: ${pid}"

sudo perf record -F 50 --call-graph dwarf -g --proc-map-timeout=10000 --pid $pid -v -o profdata -- sleep 30
sudo perf record -F 50 --call-graph dwarf -g --proc-map-timeout=10000 --pid $kqprun_pid -v -o profdata -- sleep ${1:-'30'}
sudo perf script -i profdata > profdata.txt

flame_graph_tool="../../../../contrib/tools/flame-graph/"

${flame_graph_tool}/stackcollapse-perf.pl profdata.txt | ${flame_graph_tool}/flamegraph.pl > profdata.svg
Loading

0 comments on commit fa98d8f

Please sign in to comment.