Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cuebot] Move dispatcher memory properties to opencue.properties #1570

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ public class ServiceEntity extends Entity {
/**
* Determines the default minimum memory per frame.
*/
public long minMemory = Dispatcher.MEM_RESERVED_DEFAULT;
public long minMemory = Dispatcher.MEM_SERVICE_RESERVED_DEFAULT;

/**
* Determines the default minimum gpu memory per frame.
*/
public long minGpuMemory = Dispatcher.MEM_GPU_RESERVED_DEFAULT;
public long minGpuMemory = Dispatcher.MEM_SERVICE_GPU_RESERVED_DEFAULT;

/**
* Determines the default tags.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ else if (proc.coresReserved >= 100) {
proc.coresReserved = wholeCores * 100;
} else {
if (frame.threadable) {
if (selfishServices != null &&
if (selfishServices != null &&
frame.services != null &&
containsSelfishService(frame.services.split(","), selfishServices)){
proc.coresReserved = wholeCores * 100;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.CallableStatementCreator;
import org.springframework.jdbc.core.RowMapper;
Expand Down Expand Up @@ -58,6 +60,9 @@

public class HostDaoJdbc extends JdbcDaoSupport implements HostDao {

@Autowired
private Environment env;

public static final RowMapper<HostEntity> HOST_DETAIL_MAPPER = new RowMapper<HostEntity>() {
public HostEntity mapRow(ResultSet rs, int rowNum) throws SQLException {
HostEntity host = new HostEntity();
Expand Down Expand Up @@ -324,9 +329,12 @@ public void insertRenderHost(RenderHost host, AllocationInterface a, boolean use
}

long memUnits = convertMemoryUnits(host);
if (memUnits < Dispatcher.MEM_RESERVED_MIN) {
long memReserverMin = env.getRequiredProperty(
"dispatcher.memory.mem_reserved_min",
Long.class);
if (memUnits < memReserverMin) {
throw new EntityCreationError("could not create host " + host.getName() + ", " +
" must have at least " + Dispatcher.MEM_RESERVED_MIN + " free memory.");
" must have at least " + memReserverMin + " free memory.");
}

String fqdn;
Expand Down Expand Up @@ -727,18 +735,21 @@ private long convertMemoryUnits(RenderHost host) {

long memUnits;
if (host.getTagsList().contains("64bit")) {
memUnits = CueUtil.convertKbToFakeKb64bit(host.getTotalMem());
memUnits = CueUtil.convertKbToFakeKb64bit(env, host.getTotalMem());
}
else {
memUnits = CueUtil.convertKbToFakeKb32bit(host.getTotalMem());
memUnits = CueUtil.convertKbToFakeKb32bit(env, host.getTotalMem());
}

/*
* If this is a desktop, we'll just cut the memory
* so we don't annoy the user.
*/
if (host.getNimbyEnabled()) {
memUnits = (long) (memUnits / 1.5) + Dispatcher.MEM_RESERVED_SYSTEM;
long memReservedSystem = env.getRequiredProperty(
"dispatcher.memory.mem_reserved_system",
Long.class);
memUnits = (long) (memUnits / 1.5) + memReservedSystem;
}

return memUnits;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.support.JdbcDaoSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;

import com.imageworks.spcue.ExecutionSummary;
import com.imageworks.spcue.FrameStateTotals;
Expand All @@ -41,7 +43,6 @@
import com.imageworks.spcue.LayerEntity;
import com.imageworks.spcue.LayerInterface;
import com.imageworks.spcue.LimitEntity;
import com.imageworks.spcue.LimitInterface;
import com.imageworks.spcue.ResourceUsage;
import com.imageworks.spcue.ThreadStats;
import com.imageworks.spcue.dao.LayerDao;
Expand All @@ -56,6 +57,7 @@
import org.apache.logging.log4j.LogManager;

public class LayerDaoJdbc extends JdbcDaoSupport implements LayerDao {
private final long MEM_RESERVED_MIN;
private static final Logger logger = LogManager.getLogger(LayerDaoJdbc.class);
private static final String INSERT_OUTPUT_PATH =
"INSERT INTO " +
Expand All @@ -67,6 +69,14 @@ public class LayerDaoJdbc extends JdbcDaoSupport implements LayerDao {
"str_filespec " +
") VALUES (?,?,?,?)";

@Autowired
public LayerDaoJdbc(Environment env) {
this.MEM_RESERVED_MIN = env.getRequiredProperty(
"dispatcher.memory.mem_reserved_min",
Long.class
);
}

@Override
public void insertLayerOutput(LayerInterface layer, String filespec) {
getJdbcTemplate().update(
Expand Down Expand Up @@ -341,8 +351,8 @@ public void insertLayerDetail(LayerDetail l) {

@Override
public void updateLayerMinMemory(LayerInterface layer, long val) {
if (val < Dispatcher.MEM_RESERVED_MIN) {
val = Dispatcher.MEM_RESERVED_MIN;
if (val < MEM_RESERVED_MIN) {
val = MEM_RESERVED_MIN;
}
getJdbcTemplate().update("UPDATE layer SET int_mem_min=? WHERE pk_layer=?",
val, layer.getLayerId());
Expand Down Expand Up @@ -380,8 +390,8 @@ public boolean balanceLayerMinMemory(LayerInterface layer, long frameMaxRss) {
if (maxrss < frameMaxRss) {
maxrss = frameMaxRss;
}
if (maxrss < Dispatcher.MEM_RESERVED_MIN) {
maxrss = Dispatcher.MEM_RESERVED_MIN;
if (maxrss < MEM_RESERVED_MIN) {
maxrss = MEM_RESERVED_MIN;
} else {
maxrss = maxrss + CueUtil.MB256;
}
Expand Down Expand Up @@ -603,11 +613,11 @@ public long findPastMaxRSS(JobInterface job, String name) {
try {
long maxRss = getJdbcTemplate().queryForObject(FIND_PAST_MAX_RSS,
Long.class, job.getJobId(), name);
if (maxRss >= Dispatcher.MEM_RESERVED_MIN) {
if (maxRss >= MEM_RESERVED_MIN) {
return maxRss;
}
else {
return Dispatcher.MEM_RESERVED_MIN;
return MEM_RESERVED_MIN;
}
} catch (EmptyResultDataAccessException e) {
// Actually want to return 0 here, which means
Expand All @@ -625,8 +635,8 @@ public void updateTags(JobInterface job, String tags, LayerType type) {

@Override
public void updateMinMemory(JobInterface job, long mem, LayerType type) {
if (mem < Dispatcher.MEM_RESERVED_MIN) {
mem = Dispatcher.MEM_RESERVED_MIN;
if (mem < MEM_RESERVED_MIN) {
mem = MEM_RESERVED_MIN;
}
getJdbcTemplate().update(
"UPDATE layer SET int_mem_min=? WHERE pk_job=? AND str_type=?",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.List;
import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.PreparedStatementCreator;
import org.springframework.jdbc.core.RowMapper;
Expand All @@ -45,14 +47,16 @@
import com.imageworks.spcue.dao.ProcDao;
import com.imageworks.spcue.dao.criteria.FrameSearchInterface;
import com.imageworks.spcue.dao.criteria.ProcSearchInterface;
import com.imageworks.spcue.dispatcher.Dispatcher;
import com.imageworks.spcue.dispatcher.ResourceDuplicationFailureException;
import com.imageworks.spcue.dispatcher.ResourceReservationFailureException;
import com.imageworks.spcue.grpc.host.HardwareState;
import com.imageworks.spcue.util.SqlUtil;

public class ProcDaoJdbc extends JdbcDaoSupport implements ProcDao {

@Autowired
private Environment env;

private static final String VERIFY_RUNNING_PROC =
"SELECT " +
"proc.pk_frame " +
Expand Down Expand Up @@ -121,15 +125,21 @@ public boolean deleteVirtualProc(VirtualProc proc) {

public void insertVirtualProc(VirtualProc proc) {
proc.id = SqlUtil.genKeyRandom();
long memReservedMin = env.getRequiredProperty(
"dispatcher.memory.mem_reserved_min",
Long.class);
long memGpuReservedMin = env.getRequiredProperty(
"dispatcher.memory.mem_gpu_reserved_min",
Long.class);
int result = 0;
try {
result = getJdbcTemplate().update(INSERT_VIRTUAL_PROC,
proc.getProcId(), proc.getHostId(), proc.getShowId(),
proc.getLayerId(), proc.getJobId(), proc.getFrameId(),
proc.coresReserved, proc.memoryReserved,
proc.memoryReserved, Dispatcher.MEM_RESERVED_MIN,
proc.memoryReserved, memReservedMin,
proc.gpusReserved, proc.gpuMemoryReserved,
proc.gpuMemoryReserved, Dispatcher.MEM_GPU_RESERVED_MIN,
proc.gpuMemoryReserved, memGpuReservedMin,
proc.isLocalDispatch);

// Update all of the resource counts
Expand Down Expand Up @@ -634,7 +644,10 @@ public boolean balanceUnderUtilizedProcs(ProcInterface targetProc, long targetMe
for (Map<String,Object> map: result) {
String pk_proc = (String) map.get("pk_proc");
Long free_mem = (Long) map.get("free_mem");
long available = free_mem - borrowMap.get(pk_proc) - Dispatcher.MEM_RESERVED_MIN;
long memReservedMin = env.getRequiredProperty(
"dispatcher.memory.mem_reserved_min",
Long.class);
long available = free_mem - borrowMap.get(pk_proc) - memReservedMin;
if (available > memPerFrame) {
borrowMap.put(pk_proc, borrowMap.get(pk_proc) + memPerFrame);
memBorrowedTotal = memBorrowedTotal + memPerFrame;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,10 @@ public class CoreUnitDispatcher implements Dispatcher {

public boolean testMode = false;

@Autowired
private final long MEM_RESERVED_MIN;
private final long MEM_GPU_RESERVED_DEFAULT;
private final long MEM_GPU_RESERVED_MIN;

private Environment env;

/*
Expand All @@ -108,13 +111,28 @@ public class CoreUnitDispatcher implements Dispatcher {
*/
private Cache<String, String> jobLock;

@Autowired
public CoreUnitDispatcher(Environment env) {
this.env = env;
MEM_RESERVED_MIN = getLongProperty("dispatcher.memory.mem_reserved_min");
MEM_GPU_RESERVED_DEFAULT = getLongProperty("dispatcher.memory.mem_gpu_reserved_default");
MEM_GPU_RESERVED_MIN = getLongProperty("dispatcher.memory.mem_gpu_reserved_min");
}

/*
 * Look up a required key in opencue.properties and hand it back as an int.
 */
private int getIntProperty(String property) {
    final Integer configured = env.getRequiredProperty(property, Integer.class);
    return configured;
}

/*
 * Return a long value from the opencue.properties given a key.
 * The key is resolved through env.getRequiredProperty, so it must be
 * present and convertible to Long.
 */
private long getLongProperty(String property) {
    return env.getRequiredProperty(property, Long.class);
}

private Cache<String, String> getOrCreateJobLock() {
if (jobLock == null) {
this.jobLock = CacheBuilder.newBuilder()
Expand All @@ -134,10 +152,10 @@ private List<VirtualProc> dispatchJobs(DispatchHost host, Set<String> jobs) {
for (String jobid: jobs) {

if (!host.hasAdditionalResources(
Dispatcher.CORE_POINTS_RESERVED_MIN,
Dispatcher.MEM_RESERVED_MIN,
Dispatcher.GPU_UNITS_RESERVED_MIN,
Dispatcher.MEM_GPU_RESERVED_MIN)) {
CORE_POINTS_RESERVED_MIN,
MEM_RESERVED_MIN,
GPU_UNITS_RESERVED_MIN,
MEM_GPU_RESERVED_MIN)) {
return procs;
}

Expand Down Expand Up @@ -174,15 +192,13 @@ private List<VirtualProc> dispatchJobs(DispatchHost host, Set<String> jobs) {
private Set<String> getGpuJobs(DispatchHost host, ShowInterface show) {
Set<String> jobs = null;

// TODO: GPU: make index with the 4 components instead of just 3, replace the just 3

// If the host has gpu idle, first do a query to find gpu jobs
// If no gpu jobs found remove resources to leave room for a gpu frame
if (host.hasAdditionalResources(
Dispatcher.CORE_POINTS_RESERVED_DEFAULT,
Dispatcher.MEM_RESERVED_MIN,
this.MEM_RESERVED_MIN,
Dispatcher.GPU_UNITS_RESERVED_DEFAULT,
Dispatcher.MEM_GPU_RESERVED_DEFAULT)) {
this.MEM_GPU_RESERVED_DEFAULT)) {
if (show == null)
jobs = dispatchSupport.findDispatchJobs(host,
getIntProperty("dispatcher.job_query_max"));
Expand Down Expand Up @@ -312,9 +328,9 @@ public void wrapDispatchFrame() {
host.useResources(proc.coresReserved, proc.memoryReserved, proc.gpusReserved, proc.gpuMemoryReserved);
if (!host.hasAdditionalResources(
Dispatcher.CORE_POINTS_RESERVED_MIN,
Dispatcher.MEM_RESERVED_MIN,
MEM_RESERVED_MIN,
Dispatcher.GPU_UNITS_RESERVED_MIN,
Dispatcher.MEM_GPU_RESERVED_MIN)) {
MEM_GPU_RESERVED_MIN)) {
break;
}
else if (procs.size() >= getIntProperty("dispatcher.job_frame_dispatch_max")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,33 +51,15 @@ public interface Dispatcher {
// on the host.
public static final int CORE_LOAD_THRESHOLD = 5;


// The default amount of memory reserved for a frame if no memory
// reservation settings are specified
public static final long MEM_RESERVED_DEFAULT = 3355443;

// The maximum amount of memory that can be requested for a given frame.
public static final long MEM_RESERVED_MAX = CueUtil.GB * 50;

// The minimum amount of memory that can be assigned to a frame.
public static final long MEM_RESERVED_MIN = 262144;

// Memory reserved by system, gets chopped off the available memory
public static final long MEM_RESERVED_SYSTEM = 524288;

// Amount of memory that has to be idle for the rest of the cores
// on the machine to be considered stranded.
public static final long MEM_STRANDED_THRESHHOLD = CueUtil.GB + CueUtil.MB512;

// The default amount of gpu memory reserved for a frame if no gpu memory
// reservation settings are specified
public static final long MEM_GPU_RESERVED_DEFAULT = 0;

// The minimum amount of gpu memory that can be assigned to a frame.
public static final long MEM_GPU_RESERVED_MIN = 0;
// Determines the service default minimum memory per frame.
public static final long MEM_SERVICE_RESERVED_DEFAULT = CueUtil.GB4;

// The maximum amount of gpu memory that can be assigned to a frame.
public static final long MEM_GPU_RESERVED_MAX = CueUtil.GB * 1024;
// Determines the service default minimum gpu memory per frame.
public static final long MEM_SERVICE_GPU_RESERVED_DEFAULT = 0;

// Return value for cleared frame
public static final int EXIT_STATUS_FRAME_CLEARED = 299;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,10 @@ public void handleHostReport(HostReport report, boolean isBoot) {
bookingManager.removeInactiveLocalHostAssignment(lca);
}
}

long memReservedMin = env.getRequiredProperty(
"dispatcher.memory.mem_reserved_min",
Long.class);

if (!isTempDirStorageEnough(report.getHost().getTotalMcp(), report.getHost().getFreeMcp(), host.os)) {
msg = String.format(
"%s doesn't have enough free space in the temporary directory (mcp), %dMB",
Expand All @@ -264,13 +267,13 @@ else if (coresToReserve <= 0 || host.idleCores < Dispatcher.CORE_POINTS_RESERVED
msg = String.format("%s doesn't have enough idle cores, %d needs %d",
host.name, host.idleCores, Dispatcher.CORE_POINTS_RESERVED_MIN);
}
else if (host.idleMemory < Dispatcher.MEM_RESERVED_MIN) {
else if (host.idleMemory < memReservedMin) {
msg = String.format("%s doesn't have enough idle memory, %d needs %d",
host.name, host.idleMemory, Dispatcher.MEM_RESERVED_MIN);
host.name, host.idleMemory, memReservedMin);
}
else if (report.getHost().getFreeMem() < CueUtil.MB512) {
msg = String.format("%s doesn't have enough free system mem, %d needs %d",
host.name, report.getHost().getFreeMem(), Dispatcher.MEM_RESERVED_MIN);
host.name, report.getHost().getFreeMem(), memReservedMin);
}
else if(!host.hardwareState.equals(HardwareState.UP)) {
msg = host + " is not in the Up state.";
Expand Down
Loading
Loading