Skip to content

Commit

Permalink
feat: Add Job Monitor Hook (#325)
Browse files Browse the repository at this point in the history
* monitor_v2 (#317)

* refactor: Refactor hook name and function calling

* fix: Typos

* chore: Disable plugin module by default

* chore: Remove unnecessary headers

---------

Co-authored-by: Harvey Zhou <[email protected]>
  • Loading branch information
Nativu5 and JingxuanZhouNo1 authored Oct 5, 2024
1 parent 8f484dc commit a11c9e0
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 4 deletions.
2 changes: 1 addition & 1 deletion etc/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ RejectJobsBeyondCapacity: false

Plugin:
# Toggle the plugin module in CraneSched
Enabled: true
Enabled: false
# Relative to CraneBaseDir
PlugindSockPath: "cplugind/cplugind.sock"
# Debug level of Plugind
Expand Down
9 changes: 9 additions & 0 deletions protos/Plugin.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,18 @@ message EndHookReply {
repeated TaskIdReply result = 1;
}

message JobMonitorHookRequest {
uint32 task_id = 1;
string cgroup = 2;
}

message JobMonitorHookReply {
bool ok = 1;
}

service CranePluginD {
/* ----------------------------------- Called from CraneCtld ---------------------------------------------------- */
rpc StartHook(StartHookRequest) returns (StartHookReply);
rpc EndHook(EndHookRequest) returns (EndHookReply);
rpc JobMonitorHook(JobMonitorHookRequest) returns (JobMonitorHookReply);
}
7 changes: 7 additions & 0 deletions src/Craned/CgroupManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "CranedPublicDefs.h"
#include "DeviceManager.h"
#include "crane/PluginClient.h"
#include "crane/String.h"

namespace Craned {
Expand Down Expand Up @@ -291,6 +292,7 @@ bool CgroupManager::AllocateAndGetCgroup(task_id_t task_id, Cgroup **cg) {
if (!cg_spec_it) return false;
res = cg_spec_it->res_in_node;
}

{
auto cg_it = m_task_id_to_cg_map_[task_id];
auto &cg_unique_ptr = *cg_it;
Expand All @@ -308,6 +310,11 @@ bool CgroupManager::AllocateAndGetCgroup(task_id_t task_id, Cgroup **cg) {
if (cg) *cg = pcg;
}

// JobMonitorHook
if (g_config.Plugin.Enabled) {
g_plugin_client->JobMonitorHookAsync(task_id, pcg->GetCgroupString());
}

CRANE_TRACE(
"Setting cgroup limit of task #{}. CPU: {:.2f}, Mem: {:.2f} MB Gres: {}.",
task_id, res.allocatable_res_in_node().cpu_core_limit(),
Expand Down
1 change: 1 addition & 0 deletions src/Craned/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "CgroupManager.h"
#include "crane/OS.h"
#include "protos/CraneSubprocess.pb.h"
#include "protos/PublicDefs.pb.h"

namespace Craned {

Expand Down
26 changes: 26 additions & 0 deletions src/Utilities/PluginClient/PluginClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@

#include "crane/GrpcHelper.h"
#include "crane/Logger.h"
#include "crane/PublicHeader.h"
#include "protos/Crane.grpc.pb.h"
#include "protos/Crane.pb.h"
#include "protos/Plugin.pb.h"
#include "protos/PublicDefs.pb.h"

namespace plugin {
Expand Down Expand Up @@ -145,6 +147,18 @@ grpc::Status PluginClient::SendEndHook_(grpc::ClientContext* context,
return m_stub_->EndHook(context, *request, &reply);
}

grpc::Status PluginClient::SendJobMonitorHook_(grpc::ClientContext* context,
google::protobuf::Message* msg) {
using crane::grpc::plugin::JobMonitorHookReply;
using crane::grpc::plugin::JobMonitorHookRequest;

auto request = dynamic_cast<JobMonitorHookRequest*>(msg);
JobMonitorHookReply reply;

CRANE_TRACE("[Plugin] Sending JobMonitorHook.");
return m_stub_->JobMonitorHook(context, *request, &reply);
}

void PluginClient::StartHookAsync(std::vector<crane::grpc::TaskInfo> tasks) {
auto request = std::make_unique<crane::grpc::plugin::StartHookRequest>();
auto* task_list = request->mutable_task_info_list();
Expand Down Expand Up @@ -175,4 +189,16 @@ void PluginClient::EndHookAsync(std::vector<crane::grpc::TaskInfo> tasks) {
m_event_queue_.enqueue(std::move(e));
}

void PluginClient::JobMonitorHookAsync(task_id_t task_id,
std::string cgroup_path) {
auto request = std::make_unique<crane::grpc::plugin::JobMonitorHookRequest>();
request->set_task_id(task_id);
request->set_cgroup(cgroup_path);

HookEvent e{HookType::JOB_MONITOR,
std::unique_ptr<google::protobuf::Message>(std::move(request))};

m_event_queue_.enqueue(std::move(e));
}

} // namespace plugin
11 changes: 8 additions & 3 deletions src/Utilities/PluginClient/include/crane/PluginClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <thread>
#include <vector>

#include "crane/PublicHeader.h"
#include "protos/Crane.grpc.pb.h"
#include "protos/Crane.pb.h"
#include "protos/Plugin.grpc.pb.h"
Expand All @@ -47,7 +48,7 @@ class PluginClient {
enum class HookType {
START,
END,

JOB_MONITOR,
HookTypeCount,
};

Expand All @@ -61,6 +62,7 @@ class PluginClient {
// These functions are used to add HookEvent into the event queue.
void StartHookAsync(std::vector<crane::grpc::TaskInfo> tasks);
void EndHookAsync(std::vector<crane::grpc::TaskInfo> tasks);
void JobMonitorHookAsync(task_id_t task_id, std::string cgroup_path);

private:
// HookDispatchFunc is a function pointer type that handles different
Expand All @@ -71,6 +73,8 @@ class PluginClient {
google::protobuf::Message* msg);
grpc::Status SendEndHook_(grpc::ClientContext* context,
google::protobuf::Message* msg);
grpc::Status SendJobMonitorHook_(grpc::ClientContext* context,
google::protobuf::Message* msg);

void AsyncSendThread_();

Expand All @@ -85,8 +89,9 @@ class PluginClient {
// Use this array to dispatch the hook event to the corresponding function in
// O(1) time.
static constexpr std::array<HookDispatchFunc, size_t(HookType::HookTypeCount)>
s_hook_dispatch_funcs_{
{&PluginClient::SendStartHook_, &PluginClient::SendEndHook_}};
s_hook_dispatch_funcs_{{&PluginClient::SendStartHook_,
&PluginClient::SendEndHook_,
&PluginClient::SendJobMonitorHook_}};
};

} // namespace plugin
Expand Down

0 comments on commit a11c9e0

Please sign in to comment.