Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make CPU affinity used for CPU pinning configurable via json file #1136

Merged
merged 15 commits into from
Dec 18, 2024
5 changes: 5 additions & 0 deletions source/config/localhost_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,10 @@
"server_cert": "",
"server_key": "",
"root_cert": ""
},
"core_configuration": {
Raghav-NI marked this conversation as resolved.
Show resolved Hide resolved
"sideband_read_write_core": 8,
Raghav-NI marked this conversation as resolved.
Show resolved Hide resolved
"stream_write_core": 1,
"server_run_core": 6
}
}
14 changes: 14 additions & 0 deletions source/server/core_configuration.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#ifndef CORE_CONFIGURATION_H
#define CORE_CONFIGURATION_H

extern int s_SidebandReadWriteCore;
extern int s_SidebandReadWriteCore;
extern int s_SidebandReadWriteCore;

struct CoreConfiguration {
int sideband_read_write_core;
int stream_write_core;
int server_run_core;
};

#endif // CORE_CONFIGURATION_H
24 changes: 20 additions & 4 deletions source/server/core_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "logging.h"
#include "server_configuration_parser.h"
#include "server_security_configuration.h"
#include "core_configuration.h"

#if defined(__GNUC__)
#include <sys/mman.h>
Expand Down Expand Up @@ -37,6 +38,7 @@ struct ServerConfiguration {
int max_message_size;
int sideband_port;
nidevice_grpc::FeatureToggles feature_toggles;
CoreConfiguration core_config;
};

static ServerConfiguration GetConfiguration(const std::string& config_file_path)
Expand All @@ -51,6 +53,7 @@ static ServerConfiguration GetConfiguration(const std::string& config_file_path)
config.server_address = server_config_parser.parse_address();
config.sideband_address = server_config_parser.parse_sideband_address();
config.sideband_port = server_config_parser.parse_sideband_port();
config.core_config = server_config_parser.parse_core_configuration();
config.server_cert = server_config_parser.parse_server_cert();
config.server_key = server_config_parser.parse_server_key();
config.root_cert = server_config_parser.parse_root_cert();
Expand All @@ -69,6 +72,16 @@ static ServerConfiguration GetConfiguration(const std::string& config_file_path)
static std::mutex server_mutex;
static std::unique_ptr<grpc::Server> server;
static bool shutdown = false;
int s_SidebandReadWriteCore;
int s_StreamWriteCore;
int s_ServerRunCore;

void SetCoreConfiguration(CoreConfiguration core_config)
{
s_SidebandReadWriteCore = core_config.sideband_read_write_core;
s_StreamWriteCore = core_config.stream_write_core;
s_ServerRunCore = core_config.server_run_core;
}

static void StopServer()
{
Expand Down Expand Up @@ -113,6 +126,7 @@ static void RunServer(const ServerConfiguration& config)
server = builder.BuildAndStart();
if (ni::data_monikers::is_sideband_streaming_enabled(config.feature_toggles)) {
auto sideband_socket_thread = new std::thread(RunSidebandSocketsAccept, config.sideband_address.c_str(), config.sideband_port);
SetCoreConfiguration(config.core_config);
// auto sideband_rdma_send_thread = new std::thread(AcceptSidebandRdmaSendRequests);
// auto sideband_rdma_recv_thread = new std::thread(AcceptSidebandRdmaReceiveRequests);
}
Expand Down Expand Up @@ -272,10 +286,12 @@ int main(int argc, char** argv)
schedParam.sched_priority = 95;
sched_setscheduler(0, SCHED_FIFO, &schedParam);

cpu_set_t cpuSet;
CPU_ZERO(&cpuSet);
CPU_SET(6, &cpuSet);
sched_setaffinity(0, sizeof(cpu_set_t), &cpuSet);
if (s_ServerRunCore >= 0) {
cpu_set_t cpuSet;
CPU_ZERO(&cpuSet);
CPU_SET(s_ServerRunCore, &cpuSet);
sched_setaffinity(0, sizeof(cpu_set_t), &cpuSet);
}

mlockall(MCL_CURRENT | MCL_FUTURE);
}
Expand Down
16 changes: 9 additions & 7 deletions source/server/data_moniker_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,14 @@ void DataMonikerService::InitiateMonikerList(const MonikerList& monikers, Endpoi
void DataMonikerService::RunSidebandReadWriteLoop(string sidebandIdentifier, ::SidebandStrategy strategy, EndpointList* readers, EndpointList* writers)
{
#ifndef _WIN32
if (strategy == ::SidebandStrategy::RDMA_LOW_LATENCY ||
strategy == ::SidebandStrategy::SOCKETS_LOW_LATENCY) {
if ((strategy == ::SidebandStrategy::RDMA_LOW_LATENCY ||
strategy == ::SidebandStrategy::SOCKETS_LOW_LATENCY) && s_SidebandReadWriteCore >= 0) {
pid_t threadId = syscall(SYS_gettid);
::SysFsWrite("/dev/cgroup/cpuset/LabVIEW_tl_set/tasks", std::to_string(threadId));

cpu_set_t cpuSet;
CPU_ZERO(&cpuSet);
CPU_SET(8, &cpuSet);
CPU_SET(s_SidebandReadWriteCore, &cpuSet);
sched_setaffinity(threadId, sizeof(cpu_set_t), &cpuSet);
}
#endif
Expand Down Expand Up @@ -240,10 +240,12 @@ Status DataMonikerService::StreamRead(ServerContext* context, const MonikerList*
Status DataMonikerService::StreamWrite(ServerContext* context, ServerReaderWriter<StreamWriteResponse, MonikerWriteRequest>* stream)
{
#ifndef _WIN32
cpu_set_t cpuSet;
CPU_ZERO(&cpuSet);
CPU_SET(1, &cpuSet);
sched_setaffinity(0, sizeof(cpu_set_t), &cpuSet);
if(s_StreamWriteCore >= 0) {
cpu_set_t cpuSet;
Raghav-NI marked this conversation as resolved.
Show resolved Hide resolved
CPU_ZERO(&cpuSet);
CPU_SET(s_StreamWriteCore, &cpuSet);
sched_setaffinity(0, sizeof(cpu_set_t), &cpuSet);
}
#endif

EndpointList writers;
Expand Down
55 changes: 55 additions & 0 deletions source/server/server_configuration_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <sstream>

#include "feature_toggles.h"
#include "core_configuration.h"

#if defined(_MSC_VER)
#include <windows.h>
Expand All @@ -19,6 +20,10 @@ static const char* kAddressJsonKey = "address";
static const char* kPortJsonKey = "port";
static const char* kSidebandAddressJsonKey = "sideband_address";
static const char* kSidebandPortJsonKey = "sideband_port";
static const char* kCoreConfigurationKey = "core_configuration";
static const char* kSidebandReadWriteCoreKey = "sideband_read_write_core";
static const char* kStreamWriteCoreKey = "stream_write_core";
static const char* kServerRunCoreKey = "server_run_core";
static const char* kServerCertJsonKey = "server_cert";
static const char* kServerKeyJsonKey = "server_key";
static const char* kRootCertJsonKey = "root_cert";
Expand Down Expand Up @@ -287,6 +292,46 @@ int ServerConfigurationParser::parse_port_with_key(const std::string& key) const
return parsed_port;
}

CoreConfiguration ServerConfigurationParser::parse_core_configuration() const
{
CoreConfiguration core_config;

auto core_config_it = config_file_.find(kCoreConfigurationKey);
if (core_config_it != config_file_.end()) {
core_config.sideband_read_write_core = parse_core_with_key(kSidebandReadWriteCoreKey);
core_config.stream_write_core = parse_core_with_key(kStreamWriteCoreKey);
core_config.server_run_core = parse_core_with_key(kServerRunCoreKey);
}
else{
core_config.sideband_read_write_core = -1;
core_config.stream_write_core = -1;
core_config.server_run_core = -1;
}

return core_config;
}

int ServerConfigurationParser::parse_core_with_key(const std::string& key) const
{
int parsed_core = -1;
Raghav-NI marked this conversation as resolved.
Show resolved Hide resolved

auto it = config_file_.find(key);
if (it != config_file_.end()) {
try {
parsed_core = it->get<int>();
}
catch (const nlohmann::json::type_error& ex) {
throw WrongCoreTypeException(ex.what());
}
}

if (parsed_core < -1) {
throw InvalidCoreException();
}

return parsed_core;
}

ServerConfigurationParser::ConfigFileNotFoundException::ConfigFileNotFoundException(const std::string& config_file_path)
: std::runtime_error(kConfigFileNotFoundMessage + config_file_path)
{
Expand All @@ -307,6 +352,11 @@ ServerConfigurationParser::InvalidPortException::InvalidPortException()
{
}

ServerConfigurationParser::InvalidCoreException::InvalidCoreException()
: std::runtime_error(kInvalidCoreMessage)
{
}

ServerConfigurationParser::MalformedJsonException::MalformedJsonException(const std::string& parse_error_details)
: std::runtime_error(kMalformedJsonMessage + parse_error_details)
{
Expand All @@ -317,6 +367,11 @@ ServerConfigurationParser::WrongPortTypeException::WrongPortTypeException(const
{
}

ServerConfigurationParser::WrongCoreTypeException::WrongCoreTypeException(const std::string& type_error_details)
: std::runtime_error(kWrongCoreTypeMessage + type_error_details)
{
}

ServerConfigurationParser::UnspecifiedPortException::UnspecifiedPortException()
: std::runtime_error(kUnspecifiedPortMessage)
{
Expand Down
13 changes: 13 additions & 0 deletions source/server/server_configuration_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@
#include <nlohmann/json.hpp>

#include "feature_toggles.h"
#include "core_configuration.h"

namespace nidevice_grpc {

static const char* kConfigFileNotFoundMessage = "The server configuration file was not found at: ";
static const char* kInvalidAddressMessage = "The specified address is not valid.\n Use a valid IPv4 or IPv6 address. Valid values include localhost, 192.168.1.1, [::], [::1], etc.";
static const char* kWrongAddressTypeMessage = "The server address must be specified in the server's configuration file as a string: \n\n";
static const char* kInvalidPortMessage = "The specified port number must between 0 and 65535.";
static const char* kInvalidCoreMessage = "The specified core number must be -1 or greater. -1 indicates that any available core can be used.";
static const char* kMalformedJsonMessage = "The JSON in the server configuration file is malformed: \n\n";
static const char* kWrongPortTypeMessage = "The server port must be specified in the server's configuration file as an integer: \n\n";
static const char* kWrongCoreTypeMessage = "The cpu core must be specified in the server's configuration file as an integer: \n\n";
static const char* kUnspecifiedPortMessage = "The server port must be specified in the server's configuration file.";
static const char* kValueTypeNotStringMessage = "The following key must be specified in the server's configuration file as a string enclosed with double quotes: ";
static const char* kFileNotFoundMessage = "The following certificate file was not found: ";
Expand Down Expand Up @@ -42,6 +45,7 @@ class ServerConfigurationParser {
std::string parse_root_cert() const;
int parse_max_message_size() const;
int parse_sideband_port() const;
CoreConfiguration parse_core_configuration() const;
FeatureToggles parse_feature_toggles() const;
FeatureToggles::CodeReadiness parse_code_readiness() const;

Expand All @@ -61,6 +65,10 @@ class ServerConfigurationParser {
InvalidPortException();
};

struct InvalidCoreException : public std::runtime_error {
InvalidCoreException();
};

struct MalformedJsonException : public std::runtime_error {
MalformedJsonException(const std::string& parse_error_details);
};
Expand All @@ -69,6 +77,10 @@ class ServerConfigurationParser {
WrongPortTypeException(const std::string& type_error_details);
};

struct WrongCoreTypeException : public std::runtime_error {
WrongCoreTypeException(const std::string& type_error_details);
};

struct UnspecifiedPortException : public std::runtime_error {
UnspecifiedPortException();
};
Expand Down Expand Up @@ -105,6 +117,7 @@ class ServerConfigurationParser {
std::string parse_bind_address() const;
int parse_port() const;
int parse_port_with_key(const std::string& key) const;
int parse_core_with_key(const std::string& key) const;

std::string config_file_path_;
nlohmann::json config_file_;
Expand Down
Loading