diff --git a/etc/config.yaml b/etc/config.yaml index c51462b48..85857255d 100644 --- a/etc/config.yaml +++ b/etc/config.yaml @@ -123,7 +123,7 @@ RejectJobsBeyondCapacity: false Plugin: # Toggle the plugin module in CraneSched - Enabled: true + Enabled: false # Relative to CraneBaseDir PlugindSockPath: "cplugind/cplugind.sock" # Debug level of Plugind diff --git a/protos/Plugin.proto b/protos/Plugin.proto index b329deca5..779b5270b 100644 --- a/protos/Plugin.proto +++ b/protos/Plugin.proto @@ -49,9 +49,18 @@ message EndHookReply { repeated TaskIdReply result = 1; } +message JobMonitorHookRequest { + uint32 task_id = 1; + string cgroup = 2; +} + +message JobMonitorHookReply { + bool ok = 1; +} service CranePluginD { /* ----------------------------------- Called from CraneCtld ---------------------------------------------------- */ rpc StartHook(StartHookRequest) returns (StartHookReply); rpc EndHook(EndHookRequest) returns (EndHookReply); + rpc JobMonitorHook(JobMonitorHookRequest) returns (JobMonitorHookReply); } diff --git a/src/Craned/CgroupManager.cpp b/src/Craned/CgroupManager.cpp index f161b6e03..d7a72ef7d 100644 --- a/src/Craned/CgroupManager.cpp +++ b/src/Craned/CgroupManager.cpp @@ -23,6 +23,7 @@ #include "CranedPublicDefs.h" #include "DeviceManager.h" +#include "crane/PluginClient.h" #include "crane/String.h" namespace Craned { @@ -291,6 +292,7 @@ bool CgroupManager::AllocateAndGetCgroup(task_id_t task_id, Cgroup **cg) { if (!cg_spec_it) return false; res = cg_spec_it->res_in_node; } + { auto cg_it = m_task_id_to_cg_map_[task_id]; auto &cg_unique_ptr = *cg_it; @@ -308,6 +310,11 @@ bool CgroupManager::AllocateAndGetCgroup(task_id_t task_id, Cgroup **cg) { if (cg) *cg = pcg; } + // JobMonitorHook + if (g_config.Plugin.Enabled) { + g_plugin_client->JobMonitorHookAsync(task_id, pcg->GetCgroupString()); + } + CRANE_TRACE( "Setting cgroup limit of task #{}. CPU: {:.2f}, Mem: {:.2f} MB Gres: {}.", task_id, res.allocatable_res_in_node().cpu_core_limit(), diff --git a/src/Craned/TaskManager.cpp b/src/Craned/TaskManager.cpp index 7d86b518f..10630cc96 100644 --- a/src/Craned/TaskManager.cpp +++ b/src/Craned/TaskManager.cpp @@ -24,6 +24,7 @@ #include "CgroupManager.h" #include "crane/OS.h" #include "protos/CraneSubprocess.pb.h" +#include "protos/PublicDefs.pb.h" namespace Craned { diff --git a/src/Utilities/PluginClient/PluginClient.cpp b/src/Utilities/PluginClient/PluginClient.cpp index 3850ceb0f..c548445ea 100644 --- a/src/Utilities/PluginClient/PluginClient.cpp +++ b/src/Utilities/PluginClient/PluginClient.cpp @@ -33,8 +33,10 @@ #include "crane/GrpcHelper.h" #include "crane/Logger.h" +#include "crane/PublicHeader.h" #include "protos/Crane.grpc.pb.h" #include "protos/Crane.pb.h" +#include "protos/Plugin.pb.h" #include "protos/PublicDefs.pb.h" namespace plugin { @@ -145,6 +147,18 @@ grpc::Status PluginClient::SendEndHook_(grpc::ClientContext* context, return m_stub_->EndHook(context, *request, &reply); } +grpc::Status PluginClient::SendJobMonitorHook_(grpc::ClientContext* context, + google::protobuf::Message* msg) { + using crane::grpc::plugin::JobMonitorHookReply; + using crane::grpc::plugin::JobMonitorHookRequest; + + auto request = dynamic_cast(msg); + JobMonitorHookReply reply; + + CRANE_TRACE("[Plugin] Sending JobMonitorHook."); + return m_stub_->JobMonitorHook(context, *request, &reply); +} + void PluginClient::StartHookAsync(std::vector tasks) { auto request = std::make_unique(); auto* task_list = request->mutable_task_info_list(); @@ -175,4 +189,16 @@ void PluginClient::EndHookAsync(std::vector tasks) { m_event_queue_.enqueue(std::move(e)); } +void PluginClient::JobMonitorHookAsync(task_id_t task_id, + std::string cgroup_path) { + auto request = std::make_unique(); + request->set_task_id(task_id); + request->set_cgroup(cgroup_path); + + HookEvent e{HookType::JOB_MONITOR, + std::unique_ptr(std::move(request))}; + + m_event_queue_.enqueue(std::move(e)); +} + } // namespace plugin diff --git a/src/Utilities/PluginClient/include/crane/PluginClient.h b/src/Utilities/PluginClient/include/crane/PluginClient.h index 773397167..f5d474af6 100644 --- a/src/Utilities/PluginClient/include/crane/PluginClient.h +++ b/src/Utilities/PluginClient/include/crane/PluginClient.h @@ -27,6 +27,7 @@ #include #include +#include "crane/PublicHeader.h" #include "protos/Crane.grpc.pb.h" #include "protos/Crane.pb.h" #include "protos/Plugin.grpc.pb.h" @@ -47,7 +48,7 @@ class PluginClient { enum class HookType { START, END, - + JOB_MONITOR, HookTypeCount, }; @@ -61,6 +62,7 @@ class PluginClient { // These functions are used to add HookEvent into the event queue. void StartHookAsync(std::vector tasks); void EndHookAsync(std::vector tasks); + void JobMonitorHookAsync(task_id_t task_id, std::string cgroup_path); private: // HookDispatchFunc is a function pointer type that handles different @@ -71,6 +73,8 @@ class PluginClient { google::protobuf::Message* msg); grpc::Status SendEndHook_(grpc::ClientContext* context, google::protobuf::Message* msg); + grpc::Status SendJobMonitorHook_(grpc::ClientContext* context, + google::protobuf::Message* msg); void AsyncSendThread_(); @@ -85,8 +89,9 @@ class PluginClient { // Use this array to dispatch the hook event to the corresponding function in // O(1) time. static constexpr std::array - s_hook_dispatch_funcs_{ - {&PluginClient::SendStartHook_, &PluginClient::SendEndHook_}}; + s_hook_dispatch_funcs_{{&PluginClient::SendStartHook_, + &PluginClient::SendEndHook_, + &PluginClient::SendJobMonitorHook_}}; }; } // namespace plugin