diff --git a/cuebot/src/main/java/com/imageworks/spcue/DispatchFrame.java b/cuebot/src/main/java/com/imageworks/spcue/DispatchFrame.java index faa1a9c04..2c60e9930 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/DispatchFrame.java +++ b/cuebot/src/main/java/com/imageworks/spcue/DispatchFrame.java @@ -49,5 +49,8 @@ public class DispatchFrame extends FrameEntity implements FrameInterface { // A comma separated list of services public String services; + + // The Operational System this frame is expected to run in + public String os; } diff --git a/cuebot/src/main/java/com/imageworks/spcue/DispatchHost.java b/cuebot/src/main/java/com/imageworks/spcue/DispatchHost.java index f01724e17..40a3e6bbc 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/DispatchHost.java +++ b/cuebot/src/main/java/com/imageworks/spcue/DispatchHost.java @@ -51,7 +51,7 @@ public class DispatchHost extends Entity public long gpuMemory; public long idleGpuMemory; public String tags; - public String os; + private String os; public boolean isNimby; public boolean isLocalDispatch = false; @@ -81,6 +81,14 @@ public String getFacilityId() { return facilityId; } + public String[] getOs() { + return this.os.split(","); + } + + public void setOs(String os) { + this.os = os; + } + public boolean canHandleNegativeCoresRequest(int requestedCores) { // Request is positive, no need to test further. if (requestedCores > 0) { diff --git a/cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java b/cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java index 8205f3021..02ade6bb4 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java @@ -85,7 +85,9 @@ public String getName() { * @param frame * @return */ - public static final VirtualProc build(DispatchHost host, DispatchFrame frame, String... selfishServices) { + public static final VirtualProc build(DispatchHost host, + DispatchFrame frame, + String... selfishServices) { VirtualProc proc = new VirtualProc(); proc.allocationId = host.getAllocationId(); proc.hostId = host.getHostId(); @@ -94,7 +96,7 @@ public static final VirtualProc build(DispatchHost host, DispatchFrame frame, St proc.jobId = frame.getJobId(); proc.showId = frame.getShowId(); proc.facilityId = frame.getFacilityId(); - proc.os = host.os; + proc.os = frame.os; proc.hostName = host.getName(); proc.unbooked = false; @@ -148,7 +150,7 @@ else if (proc.coresReserved >= 100) { proc.coresReserved = wholeCores * 100; } else { if (frame.threadable) { - if (selfishServices != null && + if (selfishServices != null && frame.services != null && containsSelfishService(frame.services.split(","), selfishServices)){ proc.coresReserved = wholeCores * 100; @@ -238,7 +240,7 @@ public static final VirtualProc build(DispatchHost host, proc.jobId = frame.getJobId(); proc.showId = frame.getShowId(); proc.facilityId = frame.getFacilityId(); - proc.os = host.os; + proc.os = frame.os; proc.hostName = host.getName(); proc.unbooked = false; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/DispatchQuery.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/DispatchQuery.java index e36f97999..02dae0f22 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/DispatchQuery.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/DispatchQuery.java @@ -72,9 +72,9 @@ public class DispatchQuery { "AND job.pk_facility = ? " + "AND " + "(" + - "job.str_os IS NULL OR job.str_os = '' " + + "job.str_os IS NULL OR job.str_os IN '' " + "OR " + - "job.str_os = ? " + + "job.str_os IN ? " + ") " + "AND (CASE WHEN layer_stat.int_waiting_count > 0 THEN 1 ELSE NULL END) = 1 " + "AND layer.int_cores_min <= ? " + @@ -135,7 +135,7 @@ public class DispatchQuery { "(" + "job.str_os IS NULL OR job.str_os = '' " + "OR " + - "job.str_os = ? " + + "job.str_os IN ? " + ") " + "AND (CASE WHEN layer_stat.int_waiting_count > 0 THEN 1 ELSE NULL END) = 1 " + "AND layer.int_cores_min <= ? " + @@ -250,7 +250,7 @@ private static final String replaceQueryForFifo(String query) { "AND " + "job.pk_facility = ? " + "AND " + - "(job.str_os = ? OR job.str_os IS NULL) " + + "(job.str_os IN ? OR job.str_os IS NULL) " + "AND " + "job.pk_job IN ( " + "SELECT " + @@ -276,7 +276,7 @@ private static final String replaceQueryForFifo(String query) { "AND " + "j.pk_facility = ? " + "AND " + - "(j.str_os = ? OR j.str_os IS NULL) " + + "(j.str_os IN ? OR j.str_os IS NULL) " + "AND " + "(CASE WHEN lst.int_waiting_count > 0 THEN lst.pk_layer ELSE NULL END) = l.pk_layer " + "AND " + @@ -519,40 +519,42 @@ private static final String replaceQueryForFifo(String query) { ") " + "LIMIT 1"; + private static final String FIND_DISPATCH_FRAME_COLUMNS = + "show_name, " + + "job_name, " + + "pk_job, " + + "pk_show, " + + "pk_facility, " + + "str_name, " + + "str_shot, " + + "str_user, " + + "int_uid, " + + "str_log_dir, " + + "str_os, " + + "frame_name, " + + "frame_state, " + + "pk_frame, " + + "pk_layer, " + + "int_retries, " + + "int_version, " + + "layer_name, " + + "layer_type, " + + "b_threadable, " + + "int_cores_min, " + + "int_cores_max, " + + "int_mem_min, " + + "int_gpus_min, " + + "int_gpus_max, " + + "int_gpu_mem_min, " + + "str_cmd, " + + "str_range, " + + "int_chunk_size, " + + "str_services "; /** * Finds the next frame in a job for a proc. */ public static final String FIND_DISPATCH_FRAME_BY_JOB_AND_PROC = - "SELECT " + - "show_name, " + - "job_name, " + - "pk_job, " + - "pk_show, " + - "pk_facility, " + - "str_name, " + - "str_shot, " + - "str_user, " + - "int_uid, " + - "str_log_dir, " + - "frame_name, " + - "frame_state, " + - "pk_frame, " + - "pk_layer, " + - "int_retries, " + - "int_version, " + - "layer_name, " + - "layer_type, " + - "b_threadable, " + - "int_cores_min, " + - "int_cores_max, " + - "int_mem_min, " + - "int_gpus_min, " + - "int_gpus_max, " + - "int_gpu_mem_min, " + - "str_cmd, " + - "str_range, " + - "int_chunk_size, " + - "str_services " + + "SELECT " + FIND_DISPATCH_FRAME_COLUMNS + "FROM ( " + "SELECT " + "ROW_NUMBER() OVER ( ORDER BY " + @@ -569,6 +571,7 @@ private static final String replaceQueryForFifo(String query) { "job.str_user, " + "job.int_uid, " + "job.str_log_dir, " + + "job.str_os, " + "frame.str_name AS frame_name, " + "frame.str_state AS frame_state, " + "frame.pk_frame, " + @@ -638,36 +641,7 @@ private static final String replaceQueryForFifo(String query) { * Find the next frame in a job for a host. */ public static final String FIND_DISPATCH_FRAME_BY_JOB_AND_HOST = - "SELECT " + - "show_name, " + - "job_name, " + - "pk_job, " + - "pk_show, " + - "pk_facility, " + - "str_name, " + - "str_shot, " + - "str_user, " + - "int_uid, " + - "str_log_dir, " + - "frame_name, " + - "frame_state, " + - "pk_frame, " + - "pk_layer, " + - "int_retries, " + - "int_version, " + - "layer_name, " + - "layer_type, " + - "int_cores_min, " + - "int_cores_max, " + - "int_gpus_min, " + - "int_gpus_max, " + - "b_threadable, " + - "int_mem_min, " + - "int_gpu_mem_min, " + - "str_cmd, " + - "str_range, " + - "int_chunk_size, " + - "str_services " + + "SELECT " + FIND_DISPATCH_FRAME_COLUMNS + "FROM ( " + "SELECT " + "ROW_NUMBER() OVER ( ORDER BY " + @@ -684,6 +658,7 @@ private static final String replaceQueryForFifo(String query) { "job.str_user, " + "job.int_uid, " + "job.str_log_dir, " + + "job.str_os, " + "frame.str_name AS frame_name, " + "frame.str_state AS frame_state, " + "frame.pk_frame, " + @@ -754,36 +729,7 @@ private static final String replaceQueryForFifo(String query) { public static final String FIND_LOCAL_DISPATCH_FRAME_BY_JOB_AND_PROC = - "SELECT " + - "show_name, " + - "job_name, " + - "pk_job, " + - "pk_show, " + - "pk_facility, " + - "str_name, " + - "str_shot, " + - "str_user, " + - "int_uid, " + - "str_log_dir, " + - "frame_name, " + - "frame_state, " + - "pk_frame, " + - "pk_layer, " + - "int_retries, " + - "int_version, " + - "layer_name, " + - "layer_type, " + - "b_threadable, " + - "int_cores_min, " + - "int_cores_max, " + - "int_mem_min, " + - "int_gpus_min, " + - "int_gpus_max, " + - "int_gpu_mem_min, " + - "str_cmd, " + - "str_range, " + - "int_chunk_size, " + - "str_services " + + "SELECT " + FIND_DISPATCH_FRAME_COLUMNS + "FROM ( " + "SELECT " + "ROW_NUMBER() OVER ( ORDER BY " + @@ -800,6 +746,7 @@ private static final String replaceQueryForFifo(String query) { "job.str_user, " + "job.int_uid, " + "job.str_log_dir, " + + "job.str_os, " + "frame.str_name AS frame_name, " + "frame.str_state AS frame_state, " + "frame.pk_frame, " + @@ -863,36 +810,7 @@ private static final String replaceQueryForFifo(String query) { * Find the next frame in a job for a host. */ public static final String FIND_LOCAL_DISPATCH_FRAME_BY_JOB_AND_HOST = - "SELECT " + - "show_name, " + - "job_name, " + - "pk_job, " + - "pk_show, " + - "pk_facility, " + - "str_name, " + - "str_shot, " + - "str_user, " + - "int_uid, " + - "str_log_dir, " + - "frame_name, " + - "frame_state, " + - "pk_frame, " + - "pk_layer, " + - "int_retries, " + - "int_version, " + - "layer_name, " + - "layer_type, " + - "int_cores_min, " + - "int_cores_max, " + - "int_gpus_min, " + - "int_gpus_max, " + - "b_threadable, " + - "int_mem_min, " + - "int_gpu_mem_min, " + - "str_cmd, " + - "str_range, " + - "int_chunk_size, " + - "str_services " + + "SELECT " + FIND_DISPATCH_FRAME_COLUMNS + "FROM (" + "SELECT " + "ROW_NUMBER() OVER ( ORDER BY " + @@ -909,6 +827,7 @@ private static final String replaceQueryForFifo(String query) { "job.str_user, " + "job.int_uid, " + "job.str_log_dir, " + + "job.str_os, " + "frame.str_name AS frame_name, " + "frame.str_state AS frame_state, " + "frame.pk_frame, " + @@ -975,36 +894,7 @@ private static final String replaceQueryForFifo(String query) { * Finds the next frame in a job for a proc. */ public static final String FIND_DISPATCH_FRAME_BY_LAYER_AND_PROC = - "SELECT " + - "show_name, " + - "job_name, " + - "pk_job, " + - "pk_show, " + - "pk_facility, " + - "str_name, " + - "str_shot, " + - "str_user, " + - "int_uid, " + - "str_log_dir, " + - "frame_name, " + - "frame_state, " + - "pk_frame, " + - "pk_layer, " + - "int_retries, " + - "int_version, " + - "layer_name, " + - "layer_type, " + - "b_threadable, " + - "int_cores_min, " + - "int_cores_max, " + - "int_mem_min, " + - "int_gpus_min, " + - "int_gpus_max, " + - "int_gpu_mem_min, " + - "str_cmd, " + - "str_range, " + - "int_chunk_size, " + - "str_services " + + "SELECT " + FIND_DISPATCH_FRAME_COLUMNS + "FROM (" + "SELECT " + "ROW_NUMBER() OVER ( ORDER BY " + @@ -1021,6 +911,7 @@ private static final String replaceQueryForFifo(String query) { "job.str_user, " + "job.int_uid, " + "job.str_log_dir, " + + "job.str_os, " + "frame.str_name AS frame_name, " + "frame.str_state AS frame_state, " + "frame.pk_frame, " + @@ -1090,36 +981,7 @@ private static final String replaceQueryForFifo(String query) { * Find the next frame in a job for a host. */ public static final String FIND_DISPATCH_FRAME_BY_LAYER_AND_HOST = - "SELECT " + - "show_name, " + - "job_name, " + - "pk_job, " + - "pk_show, " + - "pk_facility, " + - "str_name, " + - "str_shot, " + - "str_user, " + - "int_uid, " + - "str_log_dir, " + - "frame_name, " + - "frame_state, " + - "pk_frame, " + - "pk_layer, " + - "int_retries, " + - "int_version, " + - "layer_name, " + - "layer_type, " + - "int_cores_min, " + - "int_cores_max, " + - "b_threadable, " + - "int_mem_min, " + - "int_gpus_min, " + - "int_gpus_max, " + - "int_gpu_mem_min, " + - "str_cmd, " + - "str_range, " + - "int_chunk_size, " + - "str_services " + + "SELECT " + FIND_DISPATCH_FRAME_COLUMNS + "FROM (" + "SELECT " + "ROW_NUMBER() OVER ( ORDER BY " + @@ -1136,6 +998,7 @@ private static final String replaceQueryForFifo(String query) { "job.str_user, " + "job.int_uid, " + "job.str_log_dir, " + + "job.str_os, " + "frame.str_name AS frame_name, " + "frame.str_state AS frame_state, " + "frame.pk_frame, " + @@ -1206,36 +1069,7 @@ private static final String replaceQueryForFifo(String query) { public static final String FIND_LOCAL_DISPATCH_FRAME_BY_LAYER_AND_PROC = - "SELECT " + - "show_name, " + - "job_name, " + - "pk_job, " + - "pk_show, " + - "pk_facility, " + - "str_name, " + - "str_shot, " + - "str_user, " + - "int_uid, " + - "str_log_dir, " + - "frame_name, " + - "frame_state, " + - "pk_frame, " + - "pk_layer, " + - "int_retries, " + - "int_version, " + - "layer_name, " + - "layer_type, " + - "b_threadable, " + - "int_cores_min, " + - "int_cores_max, " + - "int_mem_min, " + - "int_gpus_min, " + - "int_gpus_max, " + - "int_gpu_mem_min, " + - "str_cmd, " + - "str_range, " + - "int_chunk_size, " + - "str_services " + + "SELECT " + FIND_DISPATCH_FRAME_COLUMNS + "FROM (" + "SELECT " + "ROW_NUMBER() OVER ( ORDER BY " + @@ -1252,6 +1086,7 @@ private static final String replaceQueryForFifo(String query) { "job.str_user, " + "job.int_uid, " + "job.str_log_dir, " + + "job.str_os, " + "frame.str_name AS frame_name, " + "frame.str_state AS frame_state, " + "frame.pk_frame, " + @@ -1315,36 +1150,7 @@ private static final String replaceQueryForFifo(String query) { * Find the next frame in a job for a host. */ public static final String FIND_LOCAL_DISPATCH_FRAME_BY_LAYER_AND_HOST = - "SELECT " + - "show_name, " + - "job_name, " + - "pk_job, " + - "pk_show, " + - "pk_facility, " + - "str_name, " + - "str_shot, " + - "str_user, " + - "int_uid, " + - "str_log_dir, " + - "frame_name, " + - "frame_state, " + - "pk_frame, " + - "pk_layer, " + - "int_retries, " + - "int_version, " + - "layer_name, " + - "layer_type, " + - "int_cores_min, " + - "int_cores_max, " + - "b_threadable, " + - "int_mem_min, " + - "int_gpus_min, " + - "int_gpus_max, " + - "int_gpu_mem_min, " + - "str_cmd, " + - "str_range, " + - "int_chunk_size, " + - "str_services " + + "SELECT " + FIND_DISPATCH_FRAME_COLUMNS + "FROM (" + "SELECT " + "ROW_NUMBER() OVER (ORDER BY " + @@ -1361,6 +1167,7 @@ private static final String replaceQueryForFifo(String query) { "job.str_user, " + "job.int_uid, " + "job.str_log_dir, " + + "job.str_os, " + "frame.str_name AS frame_name, " + "frame.str_state AS frame_state, " + "frame.pk_frame, " + diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/DispatcherDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/DispatcherDaoJdbc.java index b17ae14e3..7db4714ea 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/DispatcherDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/DispatcherDaoJdbc.java @@ -24,6 +24,7 @@ import java.sql.ResultSet; import java.sql.SQLException; +import java.util.ArrayList; import java.util.Collections; import java.util.LinkedHashSet; import java.util.LinkedList; @@ -172,6 +173,12 @@ else if (cached.isExpired()) { return bookableShows.get(key).shows; } + // Given a query, + private String handleInClause(String key, String query, int inValueLength) { + String placeholders = String.join(",", Collections.nCopies(inValueLength, "?")); + return query.replace(key + " IN ?", key + " IN (" + placeholders + ")"); + } + private Set findDispatchJobs(DispatchHost host, int numJobs, boolean shuffleShows) { LinkedHashSet result = new LinkedHashSet(); List shows = new LinkedList(getBookableShows(host)); @@ -216,20 +223,24 @@ private Set findDispatchJobs(DispatchHost host, int numJobs, boolean shu @Override public PreparedStatement createPreparedStatement(Connection conn) throws SQLException { - PreparedStatement find_jobs_stmt = conn.prepareStatement( - FIND_JOBS_BY_SHOW_NO_GPU); - find_jobs_stmt.setString(1, s.getShowId()); - find_jobs_stmt.setString(2, host.getFacilityId()); - find_jobs_stmt.setString(3, host.os); - find_jobs_stmt.setInt(4, host.idleCores); - find_jobs_stmt.setLong(5, host.idleMemory); - find_jobs_stmt.setInt(6, threadMode(host.threadMode)); - find_jobs_stmt.setString(7, host.getName()); - find_jobs_stmt.setInt(8, numJobs * 10); + String query = handleInClause("str_os", FIND_JOBS_BY_SHOW_NO_GPU, host.getOs().length); + PreparedStatement find_jobs_stmt = conn.prepareStatement(query); + + int index = 1; + find_jobs_stmt.setString(index++, s.getShowId()); + find_jobs_stmt.setString(index++, host.getFacilityId()); + for (String item : host.getOs()) { + find_jobs_stmt.setString(index++, item); + } + find_jobs_stmt.setInt(index++, host.idleCores); + find_jobs_stmt.setLong(index++, host.idleMemory); + find_jobs_stmt.setInt(index++, threadMode(host.threadMode)); + find_jobs_stmt.setString(index++, host.getName()); + find_jobs_stmt.setInt(index++, numJobs * 10); return find_jobs_stmt; }}, PKJOB_MAPPER )); - prometheusMetrics.setBookingDurationMetric("findDispatchJobs nogpu findByShowQuery", + prometheusMetrics.setBookingDurationMetric("findDispatchJobs nogpu findByShowQuery", System.currentTimeMillis() - lastTime); } else { @@ -237,19 +248,22 @@ public PreparedStatement createPreparedStatement(Connection conn) @Override public PreparedStatement createPreparedStatement(Connection conn) throws SQLException { - PreparedStatement find_jobs_stmt = conn.prepareStatement( - findByShowQuery()); - find_jobs_stmt.setString(1, s.getShowId()); - find_jobs_stmt.setString(2, host.getFacilityId()); - find_jobs_stmt.setString(3, host.os); - find_jobs_stmt.setInt(4, host.idleCores); - find_jobs_stmt.setLong(5, host.idleMemory); - find_jobs_stmt.setInt(6, threadMode(host.threadMode)); - find_jobs_stmt.setInt(7, host.idleGpus); - find_jobs_stmt.setLong(8, (host.idleGpuMemory > 0) ? 1 : 0); - find_jobs_stmt.setLong(9, host.idleGpuMemory); - find_jobs_stmt.setString(10, host.getName()); - find_jobs_stmt.setInt(11, numJobs * 10); + String query = handleInClause("str_os", findByShowQuery(), host.getOs().length); + PreparedStatement find_jobs_stmt = conn.prepareStatement(query); + int index = 1; + find_jobs_stmt.setString(index++, s.getShowId()); + find_jobs_stmt.setString(index++, host.getFacilityId()); + for (String item : host.getOs()) { + find_jobs_stmt.setString(index++, item); + } + find_jobs_stmt.setInt(index++, host.idleCores); + find_jobs_stmt.setLong(index++, host.idleMemory); + find_jobs_stmt.setInt(index++, threadMode(host.threadMode)); + find_jobs_stmt.setInt(index++, host.idleGpus); + find_jobs_stmt.setLong(index++, (host.idleGpuMemory > 0) ? 1 : 0); + find_jobs_stmt.setLong(index++, host.idleGpuMemory); + find_jobs_stmt.setString(index++, host.getName()); + find_jobs_stmt.setInt(index++, numJobs * 10); return find_jobs_stmt; }}, PKJOB_MAPPER )); @@ -308,31 +322,48 @@ public Set findDispatchJobs(DispatchHost host, GroupInterface g) { long lastTime = System.currentTimeMillis(); if (host.idleGpus == 0 && (schedulingMode == SchedulingMode.BALANCED)) { + String query = handleInClause("str_os", FIND_JOBS_BY_GROUP_NO_GPU, host.getOs().length); + ArrayList args = new ArrayList(); + + args.add(g.getGroupId()); + args.add(host.getFacilityId()); + for (String item : host.getOs()) { + args.add(item); + } + args.add(host.idleCores); + args.add(host.idleMemory); + args.add(threadMode(host.threadMode)); + args.add(host.getName()); + args.add(50); result.addAll(getJdbcTemplate().query( - FIND_JOBS_BY_GROUP_NO_GPU, - PKJOB_MAPPER, - g.getGroupId(), host.getFacilityId(), host.os, - host.idleCores, host.idleMemory, - threadMode(host.threadMode), - host.getName(), 50)); + query, + PKJOB_MAPPER, args.toArray())); prometheusMetrics.setBookingDurationMetric("findDispatchJobs by group nogpu query", System.currentTimeMillis() - lastTime); } else { + String query = handleInClause("str_os", findByGroupQuery(), host.getOs().length); + ArrayList args = new ArrayList(); + + args.add(g.getGroupId()); + args.add(host.getFacilityId()); + for (String item : host.getOs()) { + args.add(item); + } + args.add(host.idleCores); + args.add(host.idleMemory); + args.add(threadMode(host.threadMode)); + args.add(host.idleGpus); + args.add(host.idleGpuMemory > 0 ? 1 : 0); + args.add(host.idleGpuMemory); + args.add(host.getName()); + args.add(50); result.addAll(getJdbcTemplate().query( - findByGroupQuery(), - PKJOB_MAPPER, - g.getGroupId(),host.getFacilityId(), host.os, - host.idleCores, host.idleMemory, - threadMode(host.threadMode), - host.idleGpus, - (host.idleGpuMemory > 0) ? 1 : 0, host.idleGpuMemory, - host.getName(), 50)); + query, + PKJOB_MAPPER, args.toArray())); prometheusMetrics.setBookingDurationMetric("findDispatchJobs by group query", System.currentTimeMillis() - lastTime); - } - return result; } @@ -515,26 +546,47 @@ public Set findDispatchJobs(DispatchHost host, LinkedHashSet result = new LinkedHashSet(numJobs); long start = System.currentTimeMillis(); if (host.idleGpus == 0 && (schedulingMode == SchedulingMode.BALANCED)) { + String query = handleInClause("str_os", FIND_JOBS_BY_SHOW_NO_GPU, host.getOs().length); + ArrayList args = new ArrayList(); + args.add(show.getShowId()); + args.add(host.getFacilityId()); + for (String item : host.getOs()) { + args.add(item); + } + args.add(host.idleCores); + args.add(host.idleMemory); + args.add(threadMode(host.threadMode)); + args.add(host.getName()); + args.add(numJobs * 10); + result.addAll(getJdbcTemplate().query( - FIND_JOBS_BY_SHOW_NO_GPU, - PKJOB_MAPPER, - show.getShowId(), host.getFacilityId(), host.os, - host.idleCores, host.idleMemory, - threadMode(host.threadMode), - host.getName(), numJobs * 10)); + query, + PKJOB_MAPPER, args.toArray())); + prometheusMetrics.setBookingDurationMetric("findDispatchJobs by show nogpu query", System.currentTimeMillis() - start); } else { + String query = handleInClause("str_os", findByShowQuery(), host.getOs().length); + ArrayList args = new ArrayList(); + args.add(show.getShowId()); + args.add(host.getFacilityId()); + for (String item : host.getOs()) { + args.add(item); + } + args.add(host.idleCores); + args.add(host.idleMemory); + args.add(threadMode(host.threadMode)); + args.add(host.idleGpus); + args.add(host.idleGpuMemory > 0 ? 1 : 0); + args.add(host.idleGpuMemory); + args.add(host.getName()); + args.add(numJobs * 10); + result.addAll(getJdbcTemplate().query( - findByShowQuery(), - PKJOB_MAPPER, - show.getShowId(), host.getFacilityId(), host.os, - host.idleCores, host.idleMemory, - threadMode(host.threadMode), - host.idleGpus, - (host.idleGpuMemory > 0) ? 1 : 0, host.idleGpuMemory, - host.getName(), numJobs * 10)); + query, + PKJOB_MAPPER, args.toArray())); + prometheusMetrics.setBookingDurationMetric("findDispatchJobs by show query", System.currentTimeMillis() - start); } @@ -548,11 +600,24 @@ public Set findDispatchJobs(DispatchHost host, public Set findLocalDispatchJobs(DispatchHost host) { LinkedHashSet result = new LinkedHashSet(5); long start = System.currentTimeMillis(); + + String query = handleInClause("str_os", FIND_JOBS_BY_LOCAL, host.getOs().length); + ArrayList args = new ArrayList(); + args.add(host.getHostId()); + args.add(host.getFacilityId()); + for (String item : host.getOs()) { + args.add(item); + } + args.add(host.getHostId()); + args.add(host.getFacilityId()); + for (String item : host.getOs()) { + args.add(item); + } + result.addAll(getJdbcTemplate().query( - FIND_JOBS_BY_LOCAL, - PKJOB_MAPPER, - host.getHostId(), host.getFacilityId(), - host.os, host.getHostId(), host.getFacilityId(), host.os)); + query, + PKJOB_MAPPER, args.toArray())); + prometheusMetrics.setBookingDurationMetric("findLocalDispatchJobs query", System.currentTimeMillis() - start); return result; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java index 0546d4558..9e0f6f80c 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java @@ -331,6 +331,7 @@ public DispatchFrame mapRow(ResultSet rs, int rowNum) throws SQLException { frame.minGpuMemory = rs.getLong("int_gpu_mem_min"); frame.version = rs.getInt("int_version"); frame.services = rs.getString("str_services"); + frame.os = rs.getString("str_os"); return frame; } }; @@ -347,6 +348,7 @@ public DispatchFrame mapRow(ResultSet rs, int rowNum) throws SQLException { "job.str_user,"+ "job.int_uid,"+ "job.str_log_dir,"+ + "job.str_os,"+ "frame.str_name AS frame_name, "+ "frame.str_state AS frame_state, "+ "frame.pk_frame, "+ diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/HostDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/HostDaoJdbc.java index 223737042..304fe474d 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/HostDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/HostDaoJdbc.java @@ -214,7 +214,7 @@ public DispatchHost mapRow(ResultSet rs, int rowNum) throws SQLException { host.isNimby = rs.getBoolean("b_nimby"); host.threadMode = rs.getInt("int_thread_mode"); host.tags = rs.getString("str_tags"); - host.os = rs.getString("str_os"); + host.setOs(rs.getString("str_os")); host.hardwareState = HardwareState.valueOf(rs.getString("str_state")); return host; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java index f60b2c1e6..0779209b0 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java @@ -395,6 +395,7 @@ public RunFrame prepareRqdRunFrame(VirtualProc proc, DispatchFrame frame) { .setNumGpus(proc.gpusReserved) .setStartTime(System.currentTimeMillis()) .setIgnoreNimby(proc.isLocalDispatch) + .setOs(proc.os) .putAllEnvironment(jobDao.getEnvironment(frame)) .putAllEnvironment(layerDao.getLayerEnvironment(frame)) .putEnvironment("CUE3", "1") diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java index b0a7ccd9c..b91a867bb 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java @@ -254,8 +254,10 @@ public void handleHostReport(HostReport report, boolean isBoot) { bookingManager.removeInactiveLocalHostAssignment(lca); } } - - if (!isTempDirStorageEnough(report.getHost().getTotalMcp(), report.getHost().getFreeMcp(), host.os)) { + + if (!isTempDirStorageEnough(report.getHost().getTotalMcp(), + report.getHost().getFreeMcp(), + host.getOs())) { msg = String.format( "%s doesn't have enough free space in the temporary directory (mcp), %dMB", host.name, (report.getHost().getFreeMcp()/1024)); @@ -348,16 +350,19 @@ else if (!dispatchSupport.isCueBookable(host)) { * * @param tempTotalStorage Total storage on the temp directory * @param tempFreeStorage Free storage on the temp directory - * @param hostOs Reported os + * @param hostOs Reported operational systems * @return */ - private boolean isTempDirStorageEnough(Long tempTotalStorage, Long tempFreeStorage, String hostOs) { + private boolean isTempDirStorageEnough(Long tempTotalStorage, Long tempFreeStorage, String[] hostOs) { // The minimum amount of free space in the temporary directory to book a host int minAvailableTempPercentage = env.getRequiredProperty( "dispatcher.min_available_temp_storage_percentage", Integer.class); - return minAvailableTempPercentage == -1 || hostOs.equalsIgnoreCase(WINDOWS_OS) || - (((tempFreeStorage * 100.0) / tempTotalStorage) >= minAvailableTempPercentage); + return minAvailableTempPercentage == -1 + // It is safe to asume multiple OSs imply windows is not the base OS, + // threfore Windows will always report a single hostOs + || (hostOs.length == 1 && hostOs[0].equalsIgnoreCase(WINDOWS_OS)) + || (((tempFreeStorage * 100.0) / tempTotalStorage) >= minAvailableTempPercentage); } /** @@ -424,7 +429,10 @@ private boolean changeStateForTempDirStorage(DispatchHost host, RenderHost repor "dispatcher.min_available_temp_storage_percentage", Integer.class); // Prevent cue frames from booking on hosts with full temporary directories - boolean hasEnoughTempStorage = isTempDirStorageEnough(reportHost.getTotalMcp(), reportHost.getFreeMcp(), host.os); + boolean hasEnoughTempStorage = isTempDirStorageEnough( + reportHost.getTotalMcp(), + reportHost.getFreeMcp(), + host.getOs()); if (!hasEnoughTempStorage && host.hardwareState == HardwareState.UP) { // Insert a comment indicating that the Host status = Repair with reason = Full temporary directory CommentDetail c = new CommentDetail(); diff --git a/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/DispatcherDaoTests.java b/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/DispatcherDaoTests.java index 5b7eaee72..1ff849473 100644 --- a/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/DispatcherDaoTests.java +++ b/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/DispatcherDaoTests.java @@ -203,7 +203,7 @@ public void testFindNextDispatchFrameByProc() { assertNotNull(frame); assertEquals("0001-pass_1", frame.name); - VirtualProc proc = VirtualProc.build(host, frame); + VirtualProc proc = VirtualProc.build(host, frame, job.os); proc.coresReserved = 100; dispatcher.dispatch(frame, proc); @@ -235,7 +235,7 @@ public void testFindNextDispatchFramesByProc() { DispatchFrame frame = frames.get(0); - VirtualProc proc = VirtualProc.build(host, frame); + VirtualProc proc = VirtualProc.build(host, frame, job.os); proc.coresReserved = 100; dispatcher.dispatch(frame, proc); @@ -288,7 +288,7 @@ public void testFindNextDispatchFramesByProcAndJobLocal() { assertEquals(10, frames.size()); DispatchFrame frame = frames.get(0); - VirtualProc proc = VirtualProc.build(host, frame); + VirtualProc proc = VirtualProc.build(host, frame, job.os); proc.coresReserved = 100; proc.isLocalDispatch = true; @@ -310,7 +310,7 @@ public void testFindNextDispatchFramesByProcAndLayerLocal() { assertEquals(10, frames.size()); DispatchFrame frame = frames.get(0); - VirtualProc proc = VirtualProc.build(host, frame); + VirtualProc proc = VirtualProc.build(host, frame, job.os); proc.coresReserved = 100; proc.isLocalDispatch = true; @@ -406,7 +406,7 @@ public void testfindUnderProcedJob() { "SELECT str_state FROM job WHERE pk_job=?", String.class, job2.id)); - VirtualProc proc = VirtualProc.build(host, frame); + VirtualProc proc = VirtualProc.build(host, frame, job1.os); proc.coresReserved = 100; dispatcher.dispatch(frame, proc); @@ -442,7 +442,7 @@ public void testHigherPriorityJobExistsTrue() { "SELECT str_state FROM job WHERE pk_job=?", String.class, job2.id)); - VirtualProc proc = VirtualProc.build(host, frame); + VirtualProc proc = VirtualProc.build(host, frame, job2.os); proc.coresReserved = 100; dispatcher.dispatch(frame, proc); @@ -476,7 +476,7 @@ public void testHigherPriorityJobExistsFalse() { "SELECT str_state FROM job WHERE pk_job=?", String.class, job2.id)); - VirtualProc proc = VirtualProc.build(host, frame); + VirtualProc proc = VirtualProc.build(host, frame, job2.os); proc.coresReserved = 100; dispatcher.dispatch(frame, proc); @@ -511,7 +511,7 @@ public void testHigherPriorityJobExistsMaxProcBound() { "SELECT str_state FROM job WHERE pk_job=?", String.class, job2.id)); - VirtualProc proc = VirtualProc.build(host, frame); + VirtualProc proc = VirtualProc.build(host, frame, job2.os); proc.coresReserved = 100; dispatcher.dispatch(frame, proc); @@ -525,4 +525,46 @@ public void testHigherPriorityJobExistsMaxProcBound() { public void testFifoSchedulingEnabled() { assertEquals(dispatcherDao.getSchedulingMode(), DispatcherDao.SchedulingMode.PRIORITY_ONLY); } + + @Test + @Transactional + @Rollback(true) + public void testFindDispatchJobsByShowMultiOs() { + DispatchHost host = getHost(); + // Set multiple Os and confirm jobs with Linux are still being found + final JobDetail job = getJob1(); + assertNotNull(job); + + // Host with different os + host.setOs("centos7,SomethingElse"); + Set jobs = dispatcherDao.findDispatchJobs(host, + adminManager.findShowEntity("pipe"), 5); + assertTrue(jobs.size() == 0); + + // Host with Linux Os (same as defined on spec) + host.setOs("centos7,Linux,rocky9"); + jobs = dispatcherDao.findDispatchJobs(host, + adminManager.findShowEntity("pipe"), 5); + assertTrue(jobs.size() > 0); + } + + @Test + @Transactional + @Rollback(true) + public void testFindDispatchJobsAllShowsMultiOs() { + DispatchHost host = getHost(); + // Set multiple Os and confirm jobs with Linux are still being found + final JobDetail job = getJob1(); + assertNotNull(job); + + // Host with incompatible OS shouldn't find any job + host.setOs("centos7,SomethingElse"); + Set jobs = dispatcherDao.findDispatchJobs(host, 5); + assertTrue(jobs.size() == 0); + + // Host with Linux Os (same as defined on spec) should find jobs + host.setOs("centos7,Linux,rocky9"); + jobs = dispatcherDao.findDispatchJobs(host, 5); + assertTrue(jobs.size() > 0); + } } diff --git a/cuebot/src/test/resources/conf/dtd/cjsl-1.14.dtd b/cuebot/src/test/resources/conf/dtd/cjsl-1.14.dtd new file mode 100644 index 000000000..8bbcbf6f1 --- /dev/null +++ b/cuebot/src/test/resources/conf/dtd/cjsl-1.14.dtd @@ -0,0 +1,104 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/cuebot/src/test/resources/conf/jobspec/jobspec_dispatch_test.xml b/cuebot/src/test/resources/conf/jobspec/jobspec_dispatch_test.xml index 2c372eff2..b656f499f 100644 --- a/cuebot/src/test/resources/conf/jobspec/jobspec_dispatch_test.xml +++ b/cuebot/src/test/resources/conf/jobspec/jobspec_dispatch_test.xml @@ -18,7 +18,7 @@ - + spi @@ -30,9 +30,10 @@ false + Linux - + /shots/pipe/usr_testuser/logs/help.py 1-10 1 @@ -44,7 +45,7 @@ - + /shots/pipe/usr_testuser/logs/help.py 1-10 1 @@ -61,9 +62,10 @@ false + Linux - + /shots/pipe/usr_testuser/logs/help.py 1-10 1 diff --git a/proto/rqd.proto b/proto/rqd.proto index f6e0d8790..8d1946981 100644 --- a/proto/rqd.proto +++ b/proto/rqd.proto @@ -112,6 +112,7 @@ message RunFrame { map attributes = 22; int32 num_gpus = 23; report.ChildrenProcStats children = 24; + string os = 25; } message RunFrameSeq { diff --git a/rqd/rqd.example.conf b/rqd/rqd.example.conf index 22e260ae9..4369236dc 100644 --- a/rqd/rqd.example.conf +++ b/rqd/rqd.example.conf @@ -29,9 +29,18 @@ MAYA_SCRIPT_PATH PIXAR_LICENSE_FILE [docker.config] -DOCKER_IMAGE="" +# Setting this to True requires all the additional "docker.[]" sections to be filled RUN_ON_DOCKER=False +# This section is only required if RUN_ON_DOCKER=True +# List of volume mounts following docker run's format, but replacing = with : [docker.mounts] TEMP=type:bind,source:/tmp,target:/tmp,bind-propagation:slave -NET=type:bind,source:/net,target:/net,bind-propagation:slave \ No newline at end of file +NET=type:bind,source:/net,target:/net,bind-propagation:slave + +# This section is only required if RUN_ON_DOCKER=True +# - keys represent OSs this rqd is capable of executing jobs in +# - values are docker image tags +[docker.images] +centos7=centos7.3:latest +rocky9=rocky9.3:latest diff --git a/rqd/rqd/rqconstants.py b/rqd/rqd/rqconstants.py index 80b9bb29b..beba053fb 100644 --- a/rqd/rqd/rqconstants.py +++ b/rqd/rqd/rqconstants.py @@ -1,4 +1,5 @@ # Copyright Contributors to the OpenCue Project +# Copyright Contributors to the OpenCue Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -155,7 +156,7 @@ # Docker mode config RUN_ON_DOCKER = False -DOCKER_IMAGE = "Invalid" +DOCKER_IMAGES = {} DOCKER_MOUNTS = [] try: @@ -163,8 +164,6 @@ # Hostname can come from here: rqutil.getHostname() __override_section = "Override" __host_env_var_section = "UseHostEnvVar" - __docker_mounts = "docker.mounts" - __docker_config = "docker.config" import six from six.moves import configparser if six.PY2: @@ -237,6 +236,10 @@ if config.has_section(__host_env_var_section): RQD_HOST_ENV_VARS = config.options(__host_env_var_section) + __docker_mounts = "docker.mounts" + __docker_config = "docker.config" + __docker_images = "docker.images" + if config.has_section(__docker_config): RUN_ON_DOCKER = config.getboolean(__docker_config, "RUN_ON_DOCKER") if RUN_ON_DOCKER: @@ -248,6 +251,31 @@ RQD_UID = 0 RQD_GID = 0 + # Every key:value on the config file under docker.images + # is parsed as key=SP_OS and value=image_tag. + # SP_OS is set to a list of all available keys + # For example: + # + # rqd.conf + # [docker.images] + # centos7=centos7.3:latest + # rocky9=rocky9.3:latest + # + # becomes: + # SP_OS=centos7,rocky9 + # DOCKER_IMAGES={ + # "centos7": "centos7.3:latest", + # "rocky9": "rocky9.3:latest" + # } + keys = config.options(__docker_images) + DOCKER_IMAGES = {} + for key in keys: + DOCKER_IMAGES[key] = config.get(__docker_images, key) + SP_OS = ",".join(keys) + if not DOCKER_IMAGES: + raise RuntimeError("Misconfigured rqd. RUN_ON_DOCKER=True requires at " + "least one image on DOCKER_IMAGES ([docker.images] section of rqd.conf)") + def parse_mount(mount_str): """ Parse mount definitions similar to a docker run command into a docker @@ -263,7 +291,6 @@ def parse_mount(mount_str): mount_dict[key.strip()] = value.strip() return mount_dict - DOCKER_IMAGE = config.get(__docker_config, "DOCKER_IMAGE") # Parse values under the category docker.mounts into Mount objects mounts = config.options(__docker_mounts) for mount_name in mounts: diff --git a/rqd/rqd/rqcore.py b/rqd/rqd/rqcore.py index a328bd374..4de489e1d 100644 --- a/rqd/rqd/rqcore.py +++ b/rqd/rqd/rqcore.py @@ -91,12 +91,12 @@ def __init__(self, optNimbyoff=False): self.docker_client = None self.docker_mounts = [] - self.docker_image = "Invalid" + self.docker_images = {} if rqd.rqconstants.RUN_ON_DOCKER: # pylint: disable=import-outside-toplevel import docker self.docker_client = docker.from_env() - self.docker_image = rqd.rqconstants.DOCKER_IMAGE + self.docker_images = rqd.rqconstants.DOCKER_IMAGES self.docker_mounts = rqd.rqconstants.DOCKER_MOUNTS signal.signal(signal.SIGINT, self.handleExit) @@ -222,9 +222,9 @@ def deleteFrame(self, frameId): self.cores.reserved_cores) # pylint: disable=no-member self.cores.reserved_cores.clear() - log.info("Successfully delete frame with Id: %s", frameId) - else: - log.warning("Frame with Id: %s not found in cache", frameId) + log.info("Successfully delete frame with Id: %s", frameId) + else: + log.warning("Frame with Id: %s not found in cache", frameId) def killAllFrame(self, reason): """Will execute .kill() on every frame in cache until no frames remain @@ -936,14 +936,25 @@ def runDocker(self): frameInfo = self.frameInfo runFrame = self.runFrame - # TODO: implement support for multiple images - # requires adding `string os = 25;` to rqd.proto/RunFrame - # - # image = self.rqCore.docker_images.get(runFrame.os) - # if image is None: - # raise RuntimeError("rqd not configured to run an - # image for this frame OS: %s", runFrame.os) - image = self.rqCore.docker_image + if runFrame.os: + image = self.rqCore.docker_images.get(runFrame.os) + if image is None: + self.__writeHeader() + msg = ("This rqd is not configured to run an image " + "for this frame OS: %s. Check the [docker.images] " + "section of rqd.conf for more information." % runFrame.os) + self.rqlog.write(msg, prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP) + raise RuntimeError(msg) + elif self.rqCore.docker_images: + # If a frame doesn't require an specic OS, default to the first configured OS on + # [docker.images] + image = list(self.rqCore.docker_images.values)[0] + else: + self.__writeHeader() + msg = ("Misconfigured rqd. RUN_ON_DOCKER=True requires at " + "least one image on DOCKER_IMAGES ([docker.images] section of rqd.conf)") + self.rqlog.write(msg, prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP) + raise RuntimeError(msg) self.__createEnvVariables() self.__writeHeader() diff --git a/rqd/rqd/rqutil.py b/rqd/rqd/rqutil.py index ce1964f08..3d8abc964 100644 --- a/rqd/rqd/rqutil.py +++ b/rqd/rqd/rqutil.py @@ -159,8 +159,7 @@ def checkAndCreateUser(username, uid=None, gid=None): subprocess.check_call(cmd) # pylint: disable=broad-except except Exception: - logging.exception("useradd failed to add user: %s. User possibly already exists.", - username) + logging.info("useradd failed to add user: %s. User possibly already exists.", username) finally: permissionsLow()