Skip to content

Commit

Permalink
过滤掉数据库中失效作业信息、添加slurm
Browse files Browse the repository at this point in the history
  • Loading branch information
yangjie727 committed Apr 24, 2024
1 parent 86b3c7b commit 670e40f
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 13 deletions.
31 changes: 22 additions & 9 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -1789,7 +1789,7 @@ func (s *serverConfig) GetAvailablePartitions(ctx context.Context, in *pb.GetAva
errInfo := &errdetails.ErrorInfo{
Reason: "COMMAND_EXEC_FAILED",
}
st := status.New(codes.Internal, "slurmctld down.")
st := status.New(codes.Internal, "Exec command failed or slurmctld down.")
st, _ = st.WithDetails(errInfo)
return nil, st.Err()
}
Expand Down Expand Up @@ -2029,7 +2029,6 @@ func (s *serverConfig) GetClusterInfo(ctx context.Context, in *pb.GetClusterInfo
// fullCmd := getPartitionStatusCmd + " --format='%P %c %C %G %a %D %F'"
fullCmd := getPartitionStatusCmd + " --format='%P %c %C %G %a %D %F'| tr '\n' ','"
result, err := utils.RunCommand(fullCmd) // 状态
// fmt.Println(result, 111111)
if err != nil || utils.CheckSlurmStatus(result) {
errInfo := &errdetails.ErrorInfo{
Reason: "COMMAND_EXEC_FAILED",
Expand All @@ -2039,7 +2038,6 @@ func (s *serverConfig) GetClusterInfo(ctx context.Context, in *pb.GetClusterInfo
return nil, st.Err()
}

// 改bug
partitionElements := strings.Split(result, ",")
// 解析每个元素
for _, partitionElement := range partitionElements {
Expand Down Expand Up @@ -2773,14 +2771,11 @@ func (s *serverJob) GetJobs(ctx context.Context, in *pb.GetJobsRequest) (*pb.Get
st, _ = st.WithDetails(errInfo)
return nil, st.Err()
}
// logger.Infof("error %v ", err)
if len(pendingUserResult) != 0 {
pendingUserMap = utils.GetPendingMapInfo(pendingUserResult)
}
// logger.Infof("testttt %v %v %v", setBool, filterStates, submitUser)
if setBool && len(filterStates) != 0 && len(submitUser) != 0 {
// 新增判断逻辑 1117
// logger.Infof("rrrrrrrrrrrrrrrrrrrrrrrrr")
if len(in.Filter.Accounts) == 0 {
getJobInfoCmdLine = fmt.Sprintf("squeue -u %s --noheader", strings.Join(submitUser, ","))
} else {
Expand Down Expand Up @@ -2816,8 +2811,6 @@ func (s *serverJob) GetJobs(ctx context.Context, in *pb.GetJobsRequest) (*pb.Get
var singerJobtimeLimitMinutes int64
if len(strings.Split(v, " ")) == 17 {
singerJobInfo := strings.Split(v, " ")
// logger.Infof("RUNNING JOBS List %v ", singerJobInfo)
// logger.Infof("RUNNING JOBS List LENGTH %v ", len(singerJobInfo))
singerJobAccount := singerJobInfo[1]
singerJobUserName := singerJobInfo[13]
singerJobJobId, _ := strconv.Atoi(singerJobInfo[2])
Expand Down Expand Up @@ -3233,6 +3226,7 @@ func (s *serverJob) GetJobs(ctx context.Context, in *pb.GetJobsRequest) (*pb.Get
}

if err != nil {
logrus.Infof("Received err job info: %v", jobId)
continue // 一般是数据库中有数据但是squeue中没有数据导致执行命令行失败
}
}
Expand All @@ -3259,6 +3253,23 @@ func (s *serverJob) GetJobs(ctx context.Context, in *pb.GetJobsRequest) (*pb.Get
}
}
} else if state == 1 {
// 新加逻辑
getReasonCmdTmp := fmt.Sprintf("squeue -j %d --noheader ", jobId)
getReasonCmd := getReasonCmdTmp + " --format='%R'"
reason, err := utils.RunCommand(getReasonCmd)
if utils.CheckSlurmStatus(reason) {
errInfo := &errdetails.ErrorInfo{
Reason: "SLURMCTLD_FAILED",
}
st := status.New(codes.Internal, "slurmctld down.")
st, _ = st.WithDetails(errInfo)
return nil, st.Err()
}

if err != nil {
continue // 一般是数据库中有数据但是squeue中没有数据导致执行命令行失败
}

reason = "Running"
cpusAlloc = int32(utils.GetResInfoNumFromTresInfo(tresAlloc, cpuTresId))
memAllocMb = int64(utils.GetResInfoNumFromTresInfo(tresAlloc, memTresId))
Expand Down Expand Up @@ -3485,7 +3496,8 @@ func (s *serverJob) SubmitJob(ctx context.Context, in *pb.SubmitJobRequest) (*pb
}
scriptString += "#SBATCH " + "-J " + in.JobName + "\n"
scriptString += "#SBATCH " + "--nodes=" + strconv.Itoa(int(in.NodeCount)) + "\n"
scriptString += "#SBATCH " + "-c " + strconv.Itoa(int(in.CoreCount)) + "\n"
// scriptString += "#SBATCH " + "-c " + strconv.Itoa(int(in.CoreCount)) + "\n"
scriptString += "#SBATCH " + "--ntasks-per-node=" + strconv.Itoa(int(in.CoreCount)) + "\n"
if in.TimeLimitMinutes != nil {
scriptString += "#SBATCH " + "--time=" + strconv.Itoa(int(*in.TimeLimitMinutes)) + "\n"
}
Expand Down Expand Up @@ -3613,6 +3625,7 @@ func main() {
var (
err error
)
os.Setenv("SLURM_TIME_FORMAT", "standard") // 新加slurm环境变量
// 连接数据库
dbConfig := utils.DatabaseConfig()
db, err = sql.Open("mysql", dbConfig)
Expand Down
2 changes: 1 addition & 1 deletion tests/account/ListAccounts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestListAccounts(t *testing.T) {

// Call the Add RPC with test data
req := &pb.ListAccountsRequest{
UserId: "root",
UserId: "demo_admin",
}
res, err := client.ListAccounts(context.Background(), req)
if err != nil {
Expand Down
28 changes: 25 additions & 3 deletions utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Service struct {

type Slurm struct {
DefaultQOS string `yaml:"defaultqos"`
Slurmpath string `yaml:"slurmpath,omitempty"`
}

type Modulepath struct {
Expand Down Expand Up @@ -352,7 +353,14 @@ func CheckAccountOrUserStrings(s string) bool {
func LocalSubmitJob(scriptString string, username string) (string, error) {
// 提交作业命令行
// cmdLine := fmt.Sprintf("su - %s -c '/usr/bin/sbatch'", username)
cmdLine := fmt.Sprintf("su - %s -c '/usr/bin/sbatch'", username)
config := ParseConfig(DefaultConfigPath)
slurmpath := config.Slurm.Slurmpath
if slurmpath == "" {
// 如果未定义,则将其设置为默认值 "/usr"
slurmpath = "/usr"
}
cmdLine := fmt.Sprintf("su - %s -c '%s/bin/sbatch'", username, slurmpath)
// cmdLine := fmt.Sprintf("su - %s -c '/usr/bin/sbatch'", username)
cmd := exec.Command("bash", "-c", cmdLine)

// 创建一个 bytes.Buffer 用于捕获输出
Expand All @@ -373,7 +381,14 @@ func LocalSubmitJob(scriptString string, username string) (string, error) {
}

func LocalFileSubmitJob(filePath string, username string) (string, error) {
cmdLine := fmt.Sprintf("su - %s -c '/usr/bin/sbatch %s'", username, filePath)
config := ParseConfig(DefaultConfigPath)
slurmpath := config.Slurm.Slurmpath
if slurmpath == "" {
// 如果未定义,则将其设置为默认值 "/usr"
slurmpath = "/usr"
}
cmdLine := fmt.Sprintf("su - %s -c '%s/bin/sbatch %s'", username, slurmpath, filePath)
// cmdLine := fmt.Sprintf("su - %s -c '/usr/bin/sbatch %s'", username, filePath)
cmd := exec.Command("bash", "-c", cmdLine)
// 创建一个 bytes.Buffer 用于捕获输出
var output bytes.Buffer
Expand Down Expand Up @@ -405,7 +420,14 @@ func GetUserHomedir(username string) (string, error) {

// 取消作业函数
func LocalCancelJob(username string, jobId int) (string, error) {
cmdLine := fmt.Sprintf("su - %s -c 'scancel %d'", username, jobId)
config := ParseConfig(DefaultConfigPath)
slurmpath := config.Slurm.Slurmpath
if slurmpath == "" {
// 如果未定义,则将其设置为默认值 "/usr"
slurmpath = "/usr"
}
cmdLine := fmt.Sprintf("su - %s -c '%s/bin/scancel %d'", username, slurmpath, jobId)
// cmdLine := fmt.Sprintf("su - %s -c 'scancel %d'", username, jobId)
cmd := exec.Command("bash", "-c", cmdLine)
// 创建一个 bytes.Buffer 用于捕获输出
var output bytes.Buffer
Expand Down

0 comments on commit 670e40f

Please sign in to comment.