Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make CPU affinity used for CPU pinning configurable via json file #1136

Merged
merged 15 commits into from
Dec 18, 2024
5 changes: 5 additions & 0 deletions source/config/localhost_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,10 @@
"server_cert": "",
"server_key": "",
"root_cert": ""
},
"cpu_affinity_configuration": {
Raghav-NI marked this conversation as resolved.
Show resolved Hide resolved
"sideband_read_write": -1,
"stream_write": -1,
Raghav-NI marked this conversation as resolved.
Show resolved Hide resolved
"server": -1
}
}
14 changes: 10 additions & 4 deletions source/server/core_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "logging.h"
#include "server_configuration_parser.h"
#include "server_security_configuration.h"
#include "cpu_affinity_configuration.h"

#if defined(__GNUC__)
#include <sys/mman.h>
Expand Down Expand Up @@ -37,6 +38,7 @@ struct ServerConfiguration {
int max_message_size;
int sideband_port;
nidevice_grpc::FeatureToggles feature_toggles;
CpuAffinityConfiguration cpu_affinity;
};

static ServerConfiguration GetConfiguration(const std::string& config_file_path)
Expand All @@ -51,6 +53,7 @@ static ServerConfiguration GetConfiguration(const std::string& config_file_path)
config.server_address = server_config_parser.parse_address();
config.sideband_address = server_config_parser.parse_sideband_address();
config.sideband_port = server_config_parser.parse_sideband_port();
config.cpu_affinity = server_config_parser.parse_cpu_affinity();
config.server_cert = server_config_parser.parse_server_cert();
config.server_key = server_config_parser.parse_server_key();
config.root_cert = server_config_parser.parse_root_cert();
Expand Down Expand Up @@ -113,6 +116,7 @@ static void RunServer(const ServerConfiguration& config)
server = builder.BuildAndStart();
if (ni::data_monikers::is_sideband_streaming_enabled(config.feature_toggles)) {
auto sideband_socket_thread = new std::thread(RunSidebandSocketsAccept, config.sideband_address.c_str(), config.sideband_port);
ni::data_monikers::configure_cpu_affinity(config.cpu_affinity);
// auto sideband_rdma_send_thread = new std::thread(AcceptSidebandRdmaSendRequests);
// auto sideband_rdma_recv_thread = new std::thread(AcceptSidebandRdmaReceiveRequests);
}
Expand Down Expand Up @@ -272,10 +276,12 @@ int main(int argc, char** argv)
schedParam.sched_priority = 95;
sched_setscheduler(0, SCHED_FIFO, &schedParam);

cpu_set_t cpuSet;
CPU_ZERO(&cpuSet);
CPU_SET(6, &cpuSet);
sched_setaffinity(0, sizeof(cpu_set_t), &cpuSet);
if (config.cpu_affinity.server >= 0) {
Raghav-NI marked this conversation as resolved.
Show resolved Hide resolved
cpu_set_t cpuSet;
CPU_ZERO(&cpuSet);
CPU_SET(config.cpu_affinity.server, &cpuSet);
sched_setaffinity(0, sizeof(cpu_set_t), &cpuSet);
}

mlockall(MCL_CURRENT | MCL_FUTURE);
}
Expand Down
10 changes: 10 additions & 0 deletions source/server/cpu_affinity_configuration.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#ifndef CPU_AFFINITY_CONFIGURATION_H
#define CPU_AFFINITY_CONFIGURATION_H

struct CpuAffinityConfiguration {
int sideband_read_write;
int stream_write;
int server;
};

#endif // CPU_AFFINITY_CONFIGURATION_H
26 changes: 18 additions & 8 deletions source/server/data_moniker_service.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//---------------------------------------------------------------------
//---------------------------------------------------------------------
#include "data_moniker_service.h"
#include "cpu_affinity_configuration.h"

#include <sideband_data.h>
#include <sideband_grpc.h>
Expand Down Expand Up @@ -45,6 +46,13 @@ static void SysFsWrite(const std::string& fileName, const std::string& value)

namespace ni::data_monikers {

static CpuAffinityConfiguration c_CpuAffinityConfig;

void configure_cpu_affinity(const CpuAffinityConfiguration& cpu_affinity)
{
c_CpuAffinityConfig = cpu_affinity;
}

bool is_sideband_streaming_enabled(const nidevice_grpc::FeatureToggles& feature_toggles)
{
return feature_toggles.is_feature_enabled("sideband_streaming", nidevice_grpc::FeatureToggles::CodeReadiness::kNextRelease);
Expand Down Expand Up @@ -108,14 +116,14 @@ void DataMonikerService::InitiateMonikerList(const MonikerList& monikers, Endpoi
void DataMonikerService::RunSidebandReadWriteLoop(string sidebandIdentifier, ::SidebandStrategy strategy, EndpointList* readers, EndpointList* writers)
{
#ifndef _WIN32
if (strategy == ::SidebandStrategy::RDMA_LOW_LATENCY ||
strategy == ::SidebandStrategy::SOCKETS_LOW_LATENCY) {
if ((strategy == ::SidebandStrategy::RDMA_LOW_LATENCY ||
strategy == ::SidebandStrategy::SOCKETS_LOW_LATENCY) && c_CpuAffinityConfig.sideband_read_write >= 0) {
pid_t threadId = syscall(SYS_gettid);
::SysFsWrite("/dev/cgroup/cpuset/LabVIEW_tl_set/tasks", std::to_string(threadId));

cpu_set_t cpuSet;
CPU_ZERO(&cpuSet);
CPU_SET(8, &cpuSet);
CPU_SET(c_CpuAffinityConfig.sideband_read_write, &cpuSet);
sched_setaffinity(threadId, sizeof(cpu_set_t), &cpuSet);
}
#endif
Expand Down Expand Up @@ -169,7 +177,7 @@ Status DataMonikerService::BeginSidebandStream(ServerContext* context, const Beg
char identifier[32] = {};
InitOwnerSidebandData(strategy, bufferSize, identifier);
std::string identifierString(identifier);

response->set_strategy(request->strategy());
response->set_sideband_identifier(identifier);
response->set_connection_url(GetConnectionAddress(strategy));
Expand Down Expand Up @@ -240,10 +248,12 @@ Status DataMonikerService::StreamRead(ServerContext* context, const MonikerList*
Status DataMonikerService::StreamWrite(ServerContext* context, ServerReaderWriter<StreamWriteResponse, MonikerWriteRequest>* stream)
{
#ifndef _WIN32
cpu_set_t cpuSet;
CPU_ZERO(&cpuSet);
CPU_SET(1, &cpuSet);
sched_setaffinity(0, sizeof(cpu_set_t), &cpuSet);
if(c_CpuAffinityConfig.stream_write >= 0) {
cpu_set_t cpuSet;
Raghav-NI marked this conversation as resolved.
Show resolved Hide resolved
CPU_ZERO(&cpuSet);
CPU_SET(c_CpuAffinityConfig.stream_write, &cpuSet);
sched_setaffinity(0, sizeof(cpu_set_t), &cpuSet);
}
#endif

EndpointList writers;
Expand Down
2 changes: 2 additions & 0 deletions source/server/data_moniker_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
#include <map>
#include <sideband_data.h>
#include "feature_toggles.h"
#include "cpu_affinity_configuration.h"

//---------------------------------------------------------------------
//---------------------------------------------------------------------
namespace ni::data_monikers
{
void configure_cpu_affinity(const CpuAffinityConfiguration& cpu_affinity);
bool is_sideband_streaming_enabled(const nidevice_grpc::FeatureToggles& feature_toggles);
//---------------------------------------------------------------------
//---------------------------------------------------------------------
Expand Down
56 changes: 56 additions & 0 deletions source/server/server_configuration_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <sstream>

#include "feature_toggles.h"
#include "cpu_affinity_configuration.h"

#if defined(_MSC_VER)
#include <windows.h>
Expand All @@ -19,6 +20,10 @@ static const char* kAddressJsonKey = "address";
static const char* kPortJsonKey = "port";
static const char* kSidebandAddressJsonKey = "sideband_address";
static const char* kSidebandPortJsonKey = "sideband_port";
static const char* kCpuAffinityConfigurationKey = "streaming_core_configuration";
static const char* kSidebandReadWriteKey = "sideband_read_write_core";
static const char* kStreamWriteKey = "stream_write_core";
static const char* kServerKey = "server_run_core";
static const char* kServerCertJsonKey = "server_cert";
static const char* kServerKeyJsonKey = "server_key";
static const char* kRootCertJsonKey = "root_cert";
Expand Down Expand Up @@ -287,6 +292,47 @@ int ServerConfigurationParser::parse_port_with_key(const std::string& key) const
return parsed_port;
}

CpuAffinityConfiguration ServerConfigurationParser::parse_cpu_affinity() const
{
CpuAffinityConfiguration cpu_affinity;

auto core_config_it = config_file_.find(kCpuAffinityConfigurationKey);
if (core_config_it != config_file_.end()) {
cpu_affinity.sideband_read_write = parse_cpu_affinity_with_key(kSidebandReadWriteKey);
cpu_affinity.stream_write = parse_cpu_affinity_with_key(kStreamWriteKey);
cpu_affinity.server = parse_cpu_affinity_with_key(kServerKey);
}
else{
// -1 is set as the default value for the cores which indicates that any available core can be used.
cpu_affinity.sideband_read_write = -1;
cpu_affinity.stream_write = -1;
cpu_affinity.server = -1;
}
Raghav-NI marked this conversation as resolved.
Show resolved Hide resolved

return cpu_affinity;
}

int ServerConfigurationParser::parse_cpu_affinity_with_key(const std::string& key) const
{
int parsed_core = -1;
Raghav-NI marked this conversation as resolved.
Show resolved Hide resolved

auto it = config_file_.find(key);
if (it != config_file_.end()) {
try {
parsed_core = it->get<int>();
}
catch (const nlohmann::json::type_error& ex) {
throw WrongCpuAffinityTypeException(ex.what());
}
}

if (parsed_core < -1) {
throw InvalidCpuAffinityException();
}

return parsed_core;
}

ServerConfigurationParser::ConfigFileNotFoundException::ConfigFileNotFoundException(const std::string& config_file_path)
: std::runtime_error(kConfigFileNotFoundMessage + config_file_path)
{
Expand All @@ -307,6 +353,11 @@ ServerConfigurationParser::InvalidPortException::InvalidPortException()
{
}

ServerConfigurationParser::InvalidCpuAffinityException::InvalidCpuAffinityException()
: std::runtime_error(kInvalidCpuAffinityMessage)
{
}

ServerConfigurationParser::MalformedJsonException::MalformedJsonException(const std::string& parse_error_details)
: std::runtime_error(kMalformedJsonMessage + parse_error_details)
{
Expand All @@ -317,6 +368,11 @@ ServerConfigurationParser::WrongPortTypeException::WrongPortTypeException(const
{
}

ServerConfigurationParser::WrongCpuAffinityTypeException::WrongCpuAffinityTypeException(const std::string& type_error_details)
: std::runtime_error(kWrongCpuAffinityTypeMessage + type_error_details)
{
}

ServerConfigurationParser::UnspecifiedPortException::UnspecifiedPortException()
: std::runtime_error(kUnspecifiedPortMessage)
{
Expand Down
13 changes: 13 additions & 0 deletions source/server/server_configuration_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@
#include <nlohmann/json.hpp>

#include "feature_toggles.h"
#include "cpu_affinity_configuration.h"

namespace nidevice_grpc {

static const char* kConfigFileNotFoundMessage = "The server configuration file was not found at: ";
static const char* kInvalidAddressMessage = "The specified address is not valid.\n Use a valid IPv4 or IPv6 address. Valid values include localhost, 192.168.1.1, [::], [::1], etc.";
static const char* kWrongAddressTypeMessage = "The server address must be specified in the server's configuration file as a string: \n\n";
static const char* kInvalidPortMessage = "The specified port number must between 0 and 65535.";
static const char* kInvalidCpuAffinityMessage = "The specified affinity must be -1 or greater. -1 indicates that any available core can be used.";
static const char* kMalformedJsonMessage = "The JSON in the server configuration file is malformed: \n\n";
static const char* kWrongPortTypeMessage = "The server port must be specified in the server's configuration file as an integer: \n\n";
static const char* kWrongCpuAffinityTypeMessage = "The cpu affinity must be specified in the server's configuration file as an integer: \n\n";
static const char* kUnspecifiedPortMessage = "The server port must be specified in the server's configuration file.";
static const char* kValueTypeNotStringMessage = "The following key must be specified in the server's configuration file as a string enclosed with double quotes: ";
static const char* kFileNotFoundMessage = "The following certificate file was not found: ";
Expand Down Expand Up @@ -42,6 +45,7 @@ class ServerConfigurationParser {
std::string parse_root_cert() const;
int parse_max_message_size() const;
int parse_sideband_port() const;
CpuAffinityConfiguration parse_cpu_affinity() const;
FeatureToggles parse_feature_toggles() const;
FeatureToggles::CodeReadiness parse_code_readiness() const;

Expand All @@ -61,6 +65,10 @@ class ServerConfigurationParser {
InvalidPortException();
};

struct InvalidCpuAffinityException : public std::runtime_error {
InvalidCpuAffinityException();
};

struct MalformedJsonException : public std::runtime_error {
MalformedJsonException(const std::string& parse_error_details);
};
Expand All @@ -69,6 +77,10 @@ class ServerConfigurationParser {
WrongPortTypeException(const std::string& type_error_details);
};

struct WrongCpuAffinityTypeException : public std::runtime_error {
WrongCpuAffinityTypeException(const std::string& type_error_details);
};

struct UnspecifiedPortException : public std::runtime_error {
UnspecifiedPortException();
};
Expand Down Expand Up @@ -105,6 +117,7 @@ class ServerConfigurationParser {
std::string parse_bind_address() const;
int parse_port() const;
int parse_port_with_key(const std::string& key) const;
int parse_cpu_affinity_with_key(const std::string& key) const;

std::string config_file_path_;
nlohmann::json config_file_;
Expand Down
Loading