Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make CPU affinity used for CPU pinning configurable via json file #1136

Merged
merged 15 commits into from
Dec 18, 2024
4 changes: 4 additions & 0 deletions source/server/core_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "logging.h"
#include "server_configuration_parser.h"
#include "server_security_configuration.h"
#include "moniker_stream_processor.h"

#if defined(__GNUC__)
#include <sys/mman.h>
Expand Down Expand Up @@ -37,6 +38,7 @@ struct ServerConfiguration {
int max_message_size;
int sideband_port;
nidevice_grpc::FeatureToggles feature_toggles;
MonikerStreamProcessor stream_processor;
};

static ServerConfiguration GetConfiguration(const std::string& config_file_path)
Expand All @@ -51,6 +53,7 @@ static ServerConfiguration GetConfiguration(const std::string& config_file_path)
config.server_address = server_config_parser.parse_address();
config.sideband_address = server_config_parser.parse_sideband_address();
config.sideband_port = server_config_parser.parse_sideband_port();
config.stream_processor = server_config_parser.parse_moniker_stream_processor();
config.server_cert = server_config_parser.parse_server_cert();
config.server_key = server_config_parser.parse_server_key();
config.root_cert = server_config_parser.parse_root_cert();
Expand Down Expand Up @@ -113,6 +116,7 @@ static void RunServer(const ServerConfiguration& config)
server = builder.BuildAndStart();
if (ni::data_monikers::is_sideband_streaming_enabled(config.feature_toggles)) {
auto sideband_socket_thread = new std::thread(RunSidebandSocketsAccept, config.sideband_address.c_str(), config.sideband_port);
ni::data_monikers::configure_moniker_stream_processor(config.stream_processor);
// auto sideband_rdma_send_thread = new std::thread(AcceptSidebandRdmaSendRequests);
// auto sideband_rdma_recv_thread = new std::thread(AcceptSidebandRdmaReceiveRequests);
}
Expand Down
42 changes: 33 additions & 9 deletions source/server/data_moniker_service.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//---------------------------------------------------------------------
//---------------------------------------------------------------------
#include "data_moniker_service.h"
#include "moniker_stream_processor.h"

#include <sideband_data.h>
#include <sideband_grpc.h>
Expand Down Expand Up @@ -45,6 +46,13 @@ static void SysFsWrite(const std::string& fileName, const std::string& value)

namespace ni::data_monikers {

static MonikerStreamProcessor s_StreamProcessor;

void configure_moniker_stream_processor(const MonikerStreamProcessor& stream_processor)
{
s_StreamProcessor = stream_processor;
}

bool is_sideband_streaming_enabled(const nidevice_grpc::FeatureToggles& feature_toggles)
{
return feature_toggles.is_feature_enabled("sideband_streaming", nidevice_grpc::FeatureToggles::CodeReadiness::kNextRelease);
Expand Down Expand Up @@ -103,6 +111,20 @@ void DataMonikerService::InitiateMonikerList(const MonikerList& monikers, Endpoi
}
}

//---------------------------------------------------------------------
//---------------------------------------------------------------------
#ifndef _WIN32
void set_cpu_affinity(int cpu)
{
if (cpu >= 0) {
cpu_set_t cpuSet;
CPU_ZERO(&cpuSet);
CPU_SET(cpu, &cpuSet);
sched_setaffinity(0, sizeof(cpu_set_t), &cpuSet);
}
}
#endif

//---------------------------------------------------------------------
//---------------------------------------------------------------------
void DataMonikerService::RunSidebandReadWriteLoop(string sidebandIdentifier, ::SidebandStrategy strategy, EndpointList* readers, EndpointList* writers)
Expand All @@ -113,10 +135,7 @@ void DataMonikerService::RunSidebandReadWriteLoop(string sidebandIdentifier, ::S
pid_t threadId = syscall(SYS_gettid);
::SysFsWrite("/dev/cgroup/cpuset/LabVIEW_tl_set/tasks", std::to_string(threadId));

cpu_set_t cpuSet;
CPU_ZERO(&cpuSet);
CPU_SET(8, &cpuSet);
sched_setaffinity(threadId, sizeof(cpu_set_t), &cpuSet);
set_cpu_affinity(s_StreamProcessor.moniker_sideband_stream_read_write);
}
#endif

Expand Down Expand Up @@ -169,7 +188,7 @@ Status DataMonikerService::BeginSidebandStream(ServerContext* context, const Beg
char identifier[32] = {};
InitOwnerSidebandData(strategy, bufferSize, identifier);
std::string identifierString(identifier);

response->set_strategy(request->strategy());
response->set_sideband_identifier(identifier);
response->set_connection_url(GetConnectionAddress(strategy));
Expand All @@ -190,6 +209,10 @@ Status DataMonikerService::BeginSidebandStream(ServerContext* context, const Beg
//---------------------------------------------------------------------
Status DataMonikerService::StreamReadWrite(ServerContext* context, ServerReaderWriter<MonikerReadResponse, MonikerWriteRequest>* stream)
{
#ifndef _WIN32
set_cpu_affinity(s_StreamProcessor.moniker_stream_read_write);
#endif

EndpointList writers;
EndpointList readers;
MonikerWriteRequest writeRequest;
Expand Down Expand Up @@ -218,6 +241,10 @@ Status DataMonikerService::StreamReadWrite(ServerContext* context, ServerReaderW
//---------------------------------------------------------------------
Status DataMonikerService::StreamRead(ServerContext* context, const MonikerList* request, ServerWriter<MonikerReadResponse>* writer)
{
#ifndef _WIN32
set_cpu_affinity(s_StreamProcessor.moniker_stream_read);
#endif

EndpointList writers;
EndpointList readers;
InitiateMonikerList(*request, &readers, &writers);
Expand All @@ -240,10 +267,7 @@ Status DataMonikerService::StreamRead(ServerContext* context, const MonikerList*
Status DataMonikerService::StreamWrite(ServerContext* context, ServerReaderWriter<StreamWriteResponse, MonikerWriteRequest>* stream)
{
#ifndef _WIN32
cpu_set_t cpuSet;
CPU_ZERO(&cpuSet);
CPU_SET(1, &cpuSet);
sched_setaffinity(0, sizeof(cpu_set_t), &cpuSet);
set_cpu_affinity(s_StreamProcessor.moniker_stream_write);
#endif

EndpointList writers;
Expand Down
5 changes: 4 additions & 1 deletion source/server/data_moniker_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@
#include <map>
#include <sideband_data.h>
#include "feature_toggles.h"
#include "moniker_stream_processor.h"

//---------------------------------------------------------------------
//---------------------------------------------------------------------
namespace ni::data_monikers
{
void configure_moniker_stream_processor(const MonikerStreamProcessor& stream_processor);
bool is_sideband_streaming_enabled(const nidevice_grpc::FeatureToggles& feature_toggles);
void set_cpu_affinity(int cpu);
//---------------------------------------------------------------------
//---------------------------------------------------------------------
using MonikerEndpointPtr = std::add_pointer<::grpc::Status(void*, google::protobuf::Arena& arena, google::protobuf::Any&)>::type;
Expand All @@ -37,7 +40,7 @@ namespace ni::data_monikers
public:
static void RegisterMonikerEndpoint(std::string endpointName, MonikerEndpointPtr endpoint);
static void RegisterMonikerInstance(std::string endpointName, void* instanceData, Moniker& moniker);

private:
static DataMonikerService* s_Server;
std::map<std::string, MonikerEndpointPtr> _endpoints;
Expand Down
11 changes: 11 additions & 0 deletions source/server/moniker_stream_processor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#ifndef MONIKER_STREAM_PROCESSOR_H
Raghav-NI marked this conversation as resolved.
Show resolved Hide resolved
#define MONIKER_STREAM_PROCESSOR_H

struct MonikerStreamProcessor {
int moniker_sideband_stream_read_write = -1;
int moniker_stream_write = -1;
int moniker_stream_read = -1;
int moniker_stream_read_write = -1;
};

#endif // Moniker_Stream_Processor_H
51 changes: 51 additions & 0 deletions source/server/server_configuration_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <sstream>

#include "feature_toggles.h"
#include "moniker_stream_processor.h"

#if defined(_MSC_VER)
#include <windows.h>
Expand All @@ -19,6 +20,11 @@ static const char* kAddressJsonKey = "address";
static const char* kPortJsonKey = "port";
static const char* kSidebandAddressJsonKey = "sideband_address";
static const char* kSidebandPortJsonKey = "sideband_port";
static const char* kMonikerStreamProcessorKey = "moniker_stream_processor_configuration";
static const char* kMonikerSidebandStreamReadWriteKey = "moniker_sideband_stream_read_write";
static const char* kMonikerStreamWriteKey = "moniker_stream_write";
static const char* kMonikerStreamReadKey = "moniker_stream_read";
static const char* kMonikerStreamReadWriteKey = "moniker_stream_read_write";
static const char* kServerCertJsonKey = "server_cert";
static const char* kServerKeyJsonKey = "server_key";
static const char* kRootCertJsonKey = "root_cert";
Expand Down Expand Up @@ -287,6 +293,41 @@ int ServerConfigurationParser::parse_port_with_key(const std::string& key) const
return parsed_port;
}

MonikerStreamProcessor ServerConfigurationParser::parse_moniker_stream_processor() const
{
MonikerStreamProcessor stream_processor;

auto core_config_it = config_file_.find(kMonikerStreamProcessorKey);
if (core_config_it != config_file_.end()) {
stream_processor.moniker_sideband_stream_read_write = parse_moniker_stream_processor_with_key(kMonikerSidebandStreamReadWriteKey);
stream_processor.moniker_stream_write = parse_moniker_stream_processor_with_key(kMonikerStreamWriteKey);
stream_processor.moniker_stream_read = parse_moniker_stream_processor_with_key(kMonikerStreamReadKey);
stream_processor.moniker_stream_read_write = parse_moniker_stream_processor_with_key(kMonikerStreamReadWriteKey);
}
return stream_processor;
}

int ServerConfigurationParser::parse_moniker_stream_processor_with_key(const std::string& key) const
{
int parsed_core = -1;
Raghav-NI marked this conversation as resolved.
Show resolved Hide resolved

auto it = config_file_.find(key);
if (it != config_file_.end()) {
try {
parsed_core = it->get<int>();
}
catch (const nlohmann::json::type_error& ex) {
throw WrongMonikerStreamProcessorTypeException(ex.what());
}
}

if (parsed_core < -1) {
throw InvalidMonikerStreamProcessorException();
}

return parsed_core;
}

ServerConfigurationParser::ConfigFileNotFoundException::ConfigFileNotFoundException(const std::string& config_file_path)
: std::runtime_error(kConfigFileNotFoundMessage + config_file_path)
{
Expand All @@ -307,6 +348,11 @@ ServerConfigurationParser::InvalidPortException::InvalidPortException()
{
}

ServerConfigurationParser::InvalidMonikerStreamProcessorException::InvalidMonikerStreamProcessorException()
: std::runtime_error(kInvalidMonikerStreamProcessorMessage)
{
}

ServerConfigurationParser::MalformedJsonException::MalformedJsonException(const std::string& parse_error_details)
: std::runtime_error(kMalformedJsonMessage + parse_error_details)
{
Expand All @@ -317,6 +363,11 @@ ServerConfigurationParser::WrongPortTypeException::WrongPortTypeException(const
{
}

ServerConfigurationParser::WrongMonikerStreamProcessorTypeException::WrongMonikerStreamProcessorTypeException(const std::string& type_error_details)
: std::runtime_error(kWrongMonikerStreamProcessorTypeMessage + type_error_details)
{
}

ServerConfigurationParser::UnspecifiedPortException::UnspecifiedPortException()
: std::runtime_error(kUnspecifiedPortMessage)
{
Expand Down
13 changes: 13 additions & 0 deletions source/server/server_configuration_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@
#include <nlohmann/json.hpp>

#include "feature_toggles.h"
#include "moniker_stream_processor.h"

namespace nidevice_grpc {

static const char* kConfigFileNotFoundMessage = "The server configuration file was not found at: ";
static const char* kInvalidAddressMessage = "The specified address is not valid.\n Use a valid IPv4 or IPv6 address. Valid values include localhost, 192.168.1.1, [::], [::1], etc.";
static const char* kWrongAddressTypeMessage = "The server address must be specified in the server's configuration file as a string: \n\n";
static const char* kInvalidPortMessage = "The specified port number must between 0 and 65535.";
static const char* kInvalidMonikerStreamProcessorMessage = "The specified moniker stream processor must be -1 or greater. -1 indicates that any available cpu core can be used.";
static const char* kMalformedJsonMessage = "The JSON in the server configuration file is malformed: \n\n";
static const char* kWrongPortTypeMessage = "The server port must be specified in the server's configuration file as an integer: \n\n";
static const char* kWrongMonikerStreamProcessorTypeMessage = "The moniker stream processor must be specified in the server's configuration file as an integer: \n\n";
static const char* kUnspecifiedPortMessage = "The server port must be specified in the server's configuration file.";
static const char* kValueTypeNotStringMessage = "The following key must be specified in the server's configuration file as a string enclosed with double quotes: ";
static const char* kFileNotFoundMessage = "The following certificate file was not found: ";
Expand Down Expand Up @@ -42,6 +45,7 @@ class ServerConfigurationParser {
std::string parse_root_cert() const;
int parse_max_message_size() const;
int parse_sideband_port() const;
MonikerStreamProcessor parse_moniker_stream_processor() const;
FeatureToggles parse_feature_toggles() const;
FeatureToggles::CodeReadiness parse_code_readiness() const;

Expand All @@ -61,6 +65,10 @@ class ServerConfigurationParser {
InvalidPortException();
};

struct InvalidMonikerStreamProcessorException : public std::runtime_error {
InvalidMonikerStreamProcessorException();
};

struct MalformedJsonException : public std::runtime_error {
MalformedJsonException(const std::string& parse_error_details);
};
Expand All @@ -69,6 +77,10 @@ class ServerConfigurationParser {
WrongPortTypeException(const std::string& type_error_details);
};

struct WrongMonikerStreamProcessorTypeException : public std::runtime_error {
WrongMonikerStreamProcessorTypeException(const std::string& type_error_details);
};

struct UnspecifiedPortException : public std::runtime_error {
UnspecifiedPortException();
};
Expand Down Expand Up @@ -105,6 +117,7 @@ class ServerConfigurationParser {
std::string parse_bind_address() const;
int parse_port() const;
int parse_port_with_key(const std::string& key) const;
int parse_moniker_stream_processor_with_key(const std::string& key) const;

std::string config_file_path_;
nlohmann::json config_file_;
Expand Down
Loading