diff --git a/VERSION.in b/VERSION.in index 61d2f3576..c74e8a041 100644 --- a/VERSION.in +++ b/VERSION.in @@ -1 +1 @@ -0.34 +0.35 diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/ProcDao.java b/cuebot/src/main/java/com/imageworks/spcue/dao/ProcDao.java index 206b19e22..dcdf8d097 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/ProcDao.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/ProcDao.java @@ -56,14 +56,6 @@ public interface ProcDao { long getReservedGpuMemory(ProcInterface proc); - /** - * Return the proc that has exceeded its reserved memory by the largest factor. - * - * @param host - * @return - */ - VirtualProc getWorstMemoryOffender(HostInterface host); - /** * Removes a little bit of reserved memory from every other running frame * in order to give some to the target proc. @@ -151,7 +143,8 @@ public interface ProcDao { */ void updateProcMemoryUsage(FrameInterface f, long rss, long maxRss, long vsize, long maxVsize, long usedGpuMemory, - long maxUsedGpuMemory, byte[] children); + long maxUsedGpuMemory, long usedSwapMemory, + byte[] children); /** * get aq virual proc from its unique id diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/ProcDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/ProcDaoJdbc.java index 586d1f1df..ecf39caf7 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/ProcDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/ProcDaoJdbc.java @@ -30,9 +30,9 @@ import java.util.Map; import org.springframework.dao.DataAccessException; +import org.springframework.jdbc.core.PreparedStatementCreator; import org.springframework.jdbc.core.RowMapper; import org.springframework.jdbc.core.support.JdbcDaoSupport; -import org.springframework.jdbc.core.PreparedStatementCreator; import com.imageworks.spcue.FrameInterface; import com.imageworks.spcue.HostInterface; @@ -240,6 +240,7 @@ public boolean clearVirtualProcAssignment(FrameInterface frame) { "int_virt_max_used = ?, " + "int_gpu_mem_used = ?, " + "int_gpu_mem_max_used = ?, " + + "int_swap_used = ?, " + "bytea_children = ?, " + "ts_ping = current_timestamp " + "WHERE " + @@ -247,7 +248,8 @@ public boolean clearVirtualProcAssignment(FrameInterface frame) { @Override public void updateProcMemoryUsage(FrameInterface f, long rss, long maxRss, - long vss, long maxVss, long usedGpuMemory, long maxUsedGpuMemory, byte[] children) { + long vss, long maxVss, long usedGpuMemory, long maxUsedGpuMemory, + long usedSwapMemory, byte[] children) { /* * This method is going to repeat for a proc every 1 minute, so * if the proc is being touched by another thread, then return @@ -274,8 +276,9 @@ public PreparedStatement createPreparedStatement(Connection conn) updateProc.setLong(4, maxVss); updateProc.setLong(5, usedGpuMemory); updateProc.setLong(6, maxUsedGpuMemory); - updateProc.setBytes(7, children); - updateProc.setString(8, f.getFrameId()); + updateProc.setLong(7, usedSwapMemory); + updateProc.setBytes(8, children); + updateProc.setString(9, f.getFrameId()); return updateProc; } }); @@ -569,49 +572,6 @@ public boolean increaseReservedMemory(ProcInterface p, long value) { } } - private static final String FIND_WORST_MEMORY_OFFENDER = - "SELECT " + - "pk_proc, " + - "pk_host, " + - "pk_show, "+ - "pk_job, "+ - "pk_layer,"+ - "pk_frame,"+ - "b_unbooked,"+ - "b_local, "+ - "pk_alloc, "+ - "pk_facility, " + - "int_cores_reserved,"+ - "int_mem_reserved," + - "int_mem_max_used,"+ - "int_mem_used,"+ - "int_gpus_reserved," + - "int_gpu_mem_reserved," + - "int_gpu_mem_max_used," + - "int_gpu_mem_used," + - "int_virt_max_used,"+ - "int_virt_used,"+ - "host_name, " + - "str_os, " + - "bytea_children " + - "FROM (" - + GET_VIRTUAL_PROC + " " + - "AND " + - "host.pk_host = ? " + - "AND " + - "proc.int_mem_reserved != 0 " + - "AND " + - "proc.int_virt_used >= proc.int_mem_pre_reserved " + - "ORDER BY " + - "proc.int_virt_used / proc.int_mem_pre_reserved DESC " + - ") AS t1 LIMIT 1"; - - @Override - public VirtualProc getWorstMemoryOffender(HostInterface host) { - return getJdbcTemplate().queryForObject(FIND_WORST_MEMORY_OFFENDER, - VIRTUAL_PROC_MAPPER, host.getHostId()); - } - public long getReservedMemory(ProcInterface proc) { return getJdbcTemplate().queryForObject( "SELECT int_mem_reserved FROM proc WHERE pk_proc=?", diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupport.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupport.java index ffb4a7a34..106d413ce 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupport.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupport.java @@ -437,7 +437,8 @@ void updateFrameMemoryUsageAndLluTime(FrameInterface frame, long rss, long maxRs */ void updateProcMemoryUsage(FrameInterface frame, long rss, long maxRss, long vsize, long maxVsize, long usedGpuMemory, - long maxUsedGpuMemory, byte[] children); + long maxUsedGpuMemory, long usedSwapMemory, + byte[] children); /** * Return true if adding the given core units would put the show diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java index 3d7211585..f60b2c1e6 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java @@ -214,12 +214,12 @@ public void runFrame(VirtualProc proc, DispatchFrame frame) { " could not be booked on " + frame.getName() + ", " + e); } } - + @Override @Transactional(propagation = Propagation.REQUIRED) public void startFrameAndProc(VirtualProc proc, DispatchFrame frame) { logger.trace("starting frame: " + frame); - + frameDao.updateFrameStarted(proc, frame); reserveProc(proc, frame); @@ -571,9 +571,11 @@ public void lostProc(VirtualProc proc, String reason, int exitStatus) { @Transactional(propagation = Propagation.REQUIRED) public void updateProcMemoryUsage(FrameInterface frame, long rss, long maxRss, long vsize, long maxVsize, long usedGpuMemory, - long maxUsedGpuMemory, byte[] children) { + long maxUsedGpuMemory, long usedSwapMemory, + byte[] children) { procDao.updateProcMemoryUsage(frame, rss, maxRss, vsize, maxVsize, - usedGpuMemory, maxUsedGpuMemory, children); + usedGpuMemory, maxUsedGpuMemory, usedSwapMemory, + children); } @Override diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java index 777186086..46d56929f 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java @@ -21,6 +21,7 @@ import java.sql.Timestamp; import java.util.ArrayList; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -339,11 +340,11 @@ else if (!dispatchSupport.isCueBookable(host)) { /** * Check if a reported temp storage size and availability is enough for running a job - * + * * Use dispatcher.min_available_temp_storage_percentage (opencue.properties) to * define what's the accepted threshold. Providing hostOs is necessary as this feature * is currently not available on Windows hosts - * + * * @param tempTotalStorage Total storage on the temp directory * @param tempFreeStorage Free storage on the temp directory * @param hostOs Reported os @@ -371,7 +372,7 @@ private boolean isTempDirStorageEnough(Long tempTotalStorage, Long tempFreeStora * @param reportState * @param isBoot */ - private void changeHardwareState(DispatchHost host, HardwareState reportState, boolean isBoot) { + private void changeHardwareState(DispatchHost host, HardwareState reportState, boolean isBoot) { // If the states are the same there is no reason to do this update. if (host.hardwareState.equals(reportState)) { return; @@ -411,7 +412,7 @@ private void changeHardwareState(DispatchHost host, HardwareState reportState, b * - Set the host state to UP, when the amount of free space in the temporary directory * is greater or equal to the minimum required and the host has a comment with * subject: SUBJECT_COMMENT_FULL_TEMP_DIR - * + * * @param host * @param reportHost * @return @@ -499,47 +500,88 @@ private void changeLockState(DispatchHost host, CoreDetail coreInfo) { } /** - * Prevent host from entering an OOM state where oom-killer might start killing important OS processes. + * Prevent host from entering an OOM state where oom-killer might start killing + * important OS processes and frames start using SWAP memory * The kill logic will kick in one of the following conditions is met: - * - Host has less than OOM_MEMORY_LEFT_THRESHOLD_PERCENT memory available - * - A frame is taking more than OOM_FRAME_OVERBOARD_PERCENT of what it had reserved - * For frames that are using more than they had reserved but not above the threshold, negotiate expanding - * the reservations with other frames on the same host - * + * - Host has less than oom_max_safe_used_physical_memory_threshold memory + * available and less than oom_max_safe_used_swap_memory_threshold swap + * available + * - A frame is taking more than OOM_FRAME_OVERBOARD_PERCENT of what it had + * reserved + * For frames that are using more than they had reserved but not above the + * threshold, negotiate expanding the reservations with other frames on the same + * host + * * @param dispatchHost * @param report */ private void handleMemoryUsage(final DispatchHost dispatchHost, RenderHost renderHost, List runningFrames) { - // Don't keep memory balances on nimby hosts - if (dispatchHost.isNimby) { + // Don't keep memory balances on nimby hosts and hosts with invalid memory + // information + if (dispatchHost.isNimby || renderHost.getTotalMem() <= 0) { return; } - final double OOM_MAX_SAFE_USED_MEMORY_THRESHOLD = env - .getRequiredProperty("dispatcher.oom_max_safe_used_memory_threshold", Double.class); + final double OOM_MAX_SAFE_USED_PHYSICAL_THRESHOLD = env + .getRequiredProperty("dispatcher.oom_max_safe_used_physical_memory_threshold", Double.class); + final double OOM_MAX_SAFE_USED_SWAP_THRESHOLD = env + .getRequiredProperty("dispatcher.oom_max_safe_used_swap_memory_threshold", Double.class); final double OOM_FRAME_OVERBOARD_ALLOWED_THRESHOLD = env .getRequiredProperty("dispatcher.oom_frame_overboard_allowed_threshold", Double.class); - boolean memoryWarning = renderHost.getTotalMem() > 0 && - ((double)renderHost.getFreeMem()/renderHost.getTotalMem() < - (1.0 - OOM_MAX_SAFE_USED_MEMORY_THRESHOLD)); + Double physMemoryUsageRatio = renderHost.getTotalMem() > 0 ? + 1.0 - renderHost.getFreeMem() / (double) renderHost.getTotalMem() : + 0.0; + + Double swapMemoryUsageRatio = renderHost.getTotalSwap() > 0 ? + 1.0 - renderHost.getFreeSwap() / (double) renderHost.getTotalSwap() : + 0.0; + + // If checking for the swap threshold has been disabled, only memory usage is + // taken into consideration. + // If checking for memory has been disabled, checking for swap isolated is not + // safe, therefore disabled + boolean memoryWarning = false; + if (OOM_MAX_SAFE_USED_PHYSICAL_THRESHOLD > 0.0 && OOM_MAX_SAFE_USED_SWAP_THRESHOLD > 0.0 && + !physMemoryUsageRatio.isNaN() && !swapMemoryUsageRatio.isNaN()) { + memoryWarning = physMemoryUsageRatio > OOM_MAX_SAFE_USED_PHYSICAL_THRESHOLD && + swapMemoryUsageRatio > OOM_MAX_SAFE_USED_SWAP_THRESHOLD; + } else if (OOM_MAX_SAFE_USED_PHYSICAL_THRESHOLD > 0.0 && !physMemoryUsageRatio.isNaN()) { + memoryWarning = physMemoryUsageRatio > OOM_MAX_SAFE_USED_PHYSICAL_THRESHOLD; + } if (memoryWarning) { - long memoryAvailable = renderHost.getFreeMem(); - long minSafeMemoryAvailable = (long)(renderHost.getTotalMem() * (1.0 - OOM_MAX_SAFE_USED_MEMORY_THRESHOLD)); - // Only allow killing up to 10 frames at a time - int killAttemptsRemaining = 10; - VirtualProc killedProc = null; - do { - killedProc = killWorstMemoryOffender(dispatchHost); + logger.warn("Memory warning(" + renderHost.getName() + "): physMemoryRatio: " + + physMemoryUsageRatio + ", swapRatio: " + swapMemoryUsageRatio); + // Try to kill frames using swap memory as they are probably performing poorly + long swapUsed = renderHost.getTotalSwap() - renderHost.getFreeSwap(); + long maxSwapUsageAllowed = (long) (renderHost.getTotalSwap() + * OOM_MAX_SAFE_USED_SWAP_THRESHOLD); + + // Sort runningFrames bassed on how much swap they are using + runningFrames.sort(Comparator.comparingLong((RunningFrameInfo frame) -> + frame.getUsedSwapMemory()).reversed()); + + int killAttemptsRemaining = 5; + for (RunningFrameInfo frame : runningFrames) { + // Reached the first frame on the sorted list without swap usage + if (frame.getUsedSwapMemory() <= 0) { + break; + } + if (killProcForMemory(frame.getFrameId(), renderHost.getName(), + KillCause.HostUnderOom)) { + swapUsed -= frame.getUsedSwapMemory(); + logger.info("Memory warning(" + renderHost.getName() + "): " + + "Killing frame on " + frame.getJobName() + "." + + frame.getFrameName() + ", using too much swap."); + } + killAttemptsRemaining -= 1; - if (killedProc != null) { - memoryAvailable = memoryAvailable + killedProc.memoryUsed; + if (killAttemptsRemaining <= 0 || swapUsed <= maxSwapUsageAllowed) { + break; } - } while (killAttemptsRemaining > 0 && - memoryAvailable < minSafeMemoryAvailable && - killedProc != null); + } } else { // When no mass cleaning was required, check for frames going overboard // if frames didn't go overboard, manage its reservations trying to increase @@ -582,10 +624,12 @@ private boolean killFrameOverusingMemory(RunningFrameInfo frame, String hostname if (proc.isLocalDispatch) { return false; } - - logger.info("Killing frame on " + frame.getJobName() + "." + frame.getFrameName() + - ", using too much memory."); - return killProcForMemory(proc, hostname, KillCause.FrameOverboard); + boolean killed = killProcForMemory(proc.frameId, hostname, KillCause.FrameOverboard); + if (killed) { + logger.info("Killing frame on " + frame.getJobName() + "." + frame.getFrameName() + + ", using too much memory."); + } + return killed; } catch (EmptyResultDataAccessException e) { return false; } @@ -627,12 +671,12 @@ private boolean getKillClearance(String hostname, String frameId) { return true; } - private boolean killProcForMemory(VirtualProc proc, String hostname, KillCause killCause) { - if (!getKillClearance(hostname, proc.frameId)) { + private boolean killProcForMemory(String frameId, String hostname, KillCause killCause) { + if (!getKillClearance(hostname, frameId)) { return false; } - FrameInterface frame = jobManager.getFrame(proc.frameId); + FrameInterface frame = jobManager.getFrame(frameId); if (dispatcher.isTestMode()) { // Different threads don't share the same database state on the test environment (new DispatchRqdKillFrameMemory(hostname, frame, killCause.toString(), rqdClient, @@ -674,28 +718,6 @@ private boolean killFrame(String frameId, String hostname, KillCause killCause) return true; } - /** - * Kill proc with the worst user/reserved memory ratio. - * - * @param host - * @return killed proc, or null if none could be found or failed to be killed - */ - private VirtualProc killWorstMemoryOffender(final DispatchHost host) { - try { - VirtualProc proc = hostManager.getWorstMemoryOffender(host); - logger.info("Killing frame on " + proc.getName() + ", host is under stress."); - - if (!killProcForMemory(proc, host.getName(), KillCause.HostUnderOom)) { - proc = null; - } - return proc; - } - catch (EmptyResultDataAccessException e) { - logger.error(host.name + " is under OOM and no proc is memory overboard."); - return null; - } - } - /** * Check frame memory usage comparing the amount used with the amount it had reserved * @param frame @@ -825,7 +847,6 @@ private void killTimedOutFrames(List runningFrames, String hos private void updateMemoryUsageAndLluTime(List rFrames) { for (RunningFrameInfo rf: rFrames) { - FrameInterface frame = jobManager.getFrame(rf.getFrameId()); dispatchSupport.updateFrameMemoryUsageAndLluTime(frame, @@ -833,7 +854,9 @@ private void updateMemoryUsageAndLluTime(List rFrames) { dispatchSupport.updateProcMemoryUsage(frame, rf.getRss(), rf.getMaxRss(), rf.getVsize(), rf.getMaxVsize(), rf.getUsedGpuMemory(), - rf.getMaxUsedGpuMemory(), rf.getChildren().toByteArray()); + rf.getMaxUsedGpuMemory(), rf.getUsedSwapMemory(), + rf.getChildren().toByteArray()); + } updateJobMemoryUsage(rFrames); diff --git a/cuebot/src/main/java/com/imageworks/spcue/service/HostManager.java b/cuebot/src/main/java/com/imageworks/spcue/service/HostManager.java index ce5f861f8..dae9bf552 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/service/HostManager.java +++ b/cuebot/src/main/java/com/imageworks/spcue/service/HostManager.java @@ -172,13 +172,6 @@ void setHostStatistics(HostInterface host, void unbookVirtualProcs(List procs); void unbookProc(ProcInterface proc); - /** - * For a given host, return the proc using more memory above what it had initially reserved - * @param h - * @return - */ - VirtualProc getWorstMemoryOffender(HostInterface h); - /** * Return the Virtual proc with the specified unique ID. * diff --git a/cuebot/src/main/java/com/imageworks/spcue/service/HostManagerService.java b/cuebot/src/main/java/com/imageworks/spcue/service/HostManagerService.java index 36de34a1c..6abb08090 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/service/HostManagerService.java +++ b/cuebot/src/main/java/com/imageworks/spcue/service/HostManagerService.java @@ -337,12 +337,6 @@ public void setHostResources(DispatchHost host, HostReport report) { hostDao.updateHostResources(host, report); } - @Override - @Transactional(propagation = Propagation.REQUIRED, readOnly=true) - public VirtualProc getWorstMemoryOffender(HostInterface h) { - return procDao.getWorstMemoryOffender(h); - } - @Override @Transactional(propagation = Propagation.REQUIRED, readOnly=true) public VirtualProc getVirtualProc(String id) { diff --git a/cuebot/src/main/resources/conf/ddl/postgres/migrations/V29__Add_swap_memory_used_column.sql b/cuebot/src/main/resources/conf/ddl/postgres/migrations/V29__Add_swap_memory_used_column.sql new file mode 100644 index 000000000..997bb4e19 --- /dev/null +++ b/cuebot/src/main/resources/conf/ddl/postgres/migrations/V29__Add_swap_memory_used_column.sql @@ -0,0 +1,3 @@ +-- Add a new column to track swap memory usage in the proc table + +ALTER TABLE proc ADD COLUMN int_swap_used BIGINT DEFAULT 0 NOT NULL; \ No newline at end of file diff --git a/cuebot/src/main/resources/opencue.properties b/cuebot/src/main/resources/opencue.properties index 257b5695b..b40fbc9c6 100644 --- a/cuebot/src/main/resources/opencue.properties +++ b/cuebot/src/main/resources/opencue.properties @@ -130,7 +130,12 @@ dispatcher.booking_queue.max_pool_size=6 dispatcher.booking_queue.queue_capacity=1000 # Percentage of used memory to consider a risk for triggering oom-killer -dispatcher.oom_max_safe_used_memory_threshold=0.98 +# If equals to -1, it means the feature is turned off +dispatcher.oom_max_safe_used_physical_memory_threshold=0.9 + +# Percentage of used swap to consider a risk for triggering oom-killer +# If equals to -1, it means the feature is turned off +dispatcher.oom_max_safe_used_swap_memory_threshold=0.05 # How much can a frame exceed its reserved memory. # - 0.5 means 50% above reserve diff --git a/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/ProcDaoTests.java b/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/ProcDaoTests.java index 7504fa751..0cbe09970 100644 --- a/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/ProcDaoTests.java +++ b/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/ProcDaoTests.java @@ -107,7 +107,7 @@ public class ProcDaoTests extends AbstractTransactionalJUnit4SpringContextTests @Resource FrameSearchFactory frameSearchFactory; - + @Resource ProcSearchFactory procSearchFactory; @@ -328,7 +328,7 @@ public void testUpdateProcMemoryUsage() { procDao.verifyRunningProc(proc.getId(), frame.getId()); byte[] children = new byte[100]; - procDao.updateProcMemoryUsage(frame, 100, 100, 1000, 1000, 0, 0, children); + procDao.updateProcMemoryUsage(frame, 100, 100, 1000, 1000, 0, 0, 0, children); } @@ -572,47 +572,6 @@ public void testIncreaseReservedMemory() { procDao.increaseReservedMemory(proc, 3145728); } - @Test - @Transactional - @Rollback(true) - public void testFindReservedMemoryOffender() { - DispatchHost host = createHost(); - - - jobLauncher.launch(new File("src/test/resources/conf/jobspec/jobspec_dispatch_test.xml")); - JobDetail job = jobManager.findJobDetail("pipe-dev.cue-testuser_shell_dispatch_test_v1"); - jobManager.setJobPaused(job, false); - - int i = 1; - List frames = dispatcherDao.findNextDispatchFrames(job, host, 6); - assertEquals(6, frames.size()); - byte[] children = new byte[100]; - for (DispatchFrame frame: frames) { - - VirtualProc proc = VirtualProc.build(host, frame); - proc.childProcesses = children; - frame.minMemory = Dispatcher.MEM_RESERVED_DEFAULT; - dispatcher.dispatch(frame, proc); - - // Increase the memory usage as frames are added - procDao.updateProcMemoryUsage(frame, - 1000*i, 1000*i, - Dispatcher.MEM_RESERVED_DEFAULT*i, Dispatcher.MEM_RESERVED_DEFAULT*i, - 0, 0, children); - i++; - } - - // Now compare the last frame which has the highest memory - // usage to the what is returned by getWorstMemoryOffender - VirtualProc offender = procDao.getWorstMemoryOffender(host); - - FrameDetail f = frameDao.getFrameDetail(frames.get(5)); - FrameDetail o = frameDao.getFrameDetail(offender); - - assertEquals(f.getName(), o.getName()); - assertEquals(f.id, o.getFrameId()); - } - @Test @Transactional @Rollback(true) @@ -672,7 +631,7 @@ public void testBalanceUnderUtilizedProcs() { procDao.insertVirtualProc(proc1); byte[] children = new byte[100]; - procDao.updateProcMemoryUsage(frame1, 250000, 250000, 250000, 250000, 0, 0, children); + procDao.updateProcMemoryUsage(frame1, 250000, 250000, 250000, 250000, 0, 0, 0, children); layerDao.updateLayerMaxRSS(frame1, 250000, true); FrameDetail frameDetail2 = frameDao.findFrameDetail(job, "0002-pass_1"); @@ -682,7 +641,7 @@ public void testBalanceUnderUtilizedProcs() { proc2.frameId = frame2.id; procDao.insertVirtualProc(proc2); - procDao.updateProcMemoryUsage(frame2, 255000, 255000,255000, 255000, 0, 0, children); + procDao.updateProcMemoryUsage(frame2, 255000, 255000,255000, 255000, 0, 0, 0, children); layerDao.updateLayerMaxRSS(frame2, 255000, true); FrameDetail frameDetail3 = frameDao.findFrameDetail(job, "0003-pass_1"); @@ -692,7 +651,7 @@ public void testBalanceUnderUtilizedProcs() { proc3.frameId = frame3.id; procDao.insertVirtualProc(proc3); - procDao.updateProcMemoryUsage(frame3, 3145728, 3145728,3145728, 3145728, 0, 0, children); + procDao.updateProcMemoryUsage(frame3, 3145728, 3145728,3145728, 3145728, 0, 0, 0, children); layerDao.updateLayerMaxRSS(frame3,300000, true); procDao.balanceUnderUtilizedProcs(proc3, 100000); @@ -856,23 +815,23 @@ public void getProcsBySearch() { public void testVirtualProcWithSelfishService() { DispatchHost host = createHost(); JobDetail job = launchJob(); - + FrameDetail frameDetail = frameDao.findFrameDetail(job, "0001-pass_1_preprocess"); DispatchFrame frame = frameDao.getDispatchFrame(frameDetail.id); frame.minCores = 250; frame.threadable = true; // Frame from a non-selfish sevice - VirtualProc proc = VirtualProc.build(host, frame, "something-else"); + VirtualProc proc = VirtualProc.build(host, frame, "something-else"); assertEquals(250, proc.coresReserved); // When no selfish service config is provided - proc = VirtualProc.build(host, frame); + proc = VirtualProc.build(host, frame); assertEquals(250, proc.coresReserved); - // Frame with a selfish service - proc = VirtualProc.build(host, frame, "shell", "something-else"); + // Frame with a selfish service + proc = VirtualProc.build(host, frame, "shell", "something-else"); assertEquals(800, proc.coresReserved); } } diff --git a/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerTests.java b/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerTests.java index b39b97ad0..40c83d68d 100644 --- a/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerTests.java +++ b/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerTests.java @@ -297,11 +297,11 @@ public void testHandleHostReportWithFullTemporaryDirectories() { * Precondition: * - HardwareState=UP * Action: - * - Receives a HostReport with less freeTempDir than the threshold + * - Receives a HostReport with less freeTempDir than the threshold * (opencue.properties: min_available_temp_storage_percentage) * Postcondition: * - Host hardwareState changes to REPAIR - * - A comment is created with subject=SUBJECT_COMMENT_FULL_TEMP_DIR and + * - A comment is created with subject=SUBJECT_COMMENT_FULL_TEMP_DIR and * user=CUEBOT_COMMENT_USER * */ // Create HostReport with totalMcp=4GB and freeMcp=128MB @@ -337,12 +337,12 @@ public void testHandleHostReportWithFullTemporaryDirectories() { /* * Test 2: - * Precondition: + * Precondition: * - HardwareState=REPAIR - * - There is a comment for the host with subject=SUBJECT_COMMENT_FULL_TEMP_DIR and + * - There is a comment for the host with subject=SUBJECT_COMMENT_FULL_TEMP_DIR and * user=CUEBOT_COMMENT_USER * Action: - * Receives a HostReport with more freeTempDir than the threshold + * Receives a HostReport with more freeTempDir than the threshold * (opencue.properties: min_available_temp_storage_percentage) * Postcondition: * - Host hardwareState changes to UP @@ -548,81 +548,94 @@ public void testMemoryAggressionMemoryWarning() { // Ok RunningFrameInfo info1 = RunningFrameInfo.newBuilder() - .setJobId(proc1.getJobId()) - .setLayerId(proc1.getLayerId()) - .setFrameId(proc1.getFrameId()) - .setResourceId(proc1.getProcId()) - .setVsize(CueUtil.GB2) - .setRss(CueUtil.GB2) - .setMaxRss(CueUtil.GB2) - .build(); + .setJobId(proc1.getJobId()) + .setLayerId(proc1.getLayerId()) + .setFrameId(proc1.getFrameId()) + .setResourceId(proc1.getProcId()) + .setUsedSwapMemory(CueUtil.MB512 - CueUtil.MB128) + .setVsize(CueUtil.GB2) + .setRss(CueUtil.GB2) + .setMaxRss(CueUtil.GB2) + .build(); // Overboard Rss RunningFrameInfo info2 = RunningFrameInfo.newBuilder() - .setJobId(proc2.getJobId()) - .setLayerId(proc2.getLayerId()) - .setFrameId(proc2.getFrameId()) - .setResourceId(proc2.getProcId()) - .setVsize(CueUtil.GB4) - .setRss(CueUtil.GB4) - .setMaxRss(CueUtil.GB4) - .build(); + .setJobId(proc2.getJobId()) + .setLayerId(proc2.getLayerId()) + .setFrameId(proc2.getFrameId()) + .setResourceId(proc2.getProcId()) + .setUsedSwapMemory(CueUtil.MB512) + .setVsize(CueUtil.GB4) + .setRss(CueUtil.GB4) + .setMaxRss(CueUtil.GB4) + .build(); // Overboard Rss long memoryUsedProc3 = CueUtil.GB8; RunningFrameInfo info3 = RunningFrameInfo.newBuilder() - .setJobId(proc3.getJobId()) - .setLayerId(proc3.getLayerId()) - .setFrameId(proc3.getFrameId()) - .setResourceId(proc3.getProcId()) - .setVsize(memoryUsedProc3) - .setRss(memoryUsedProc3) - .setMaxRss(memoryUsedProc3) - .build(); - - RenderHost hostAfterUpdate = getRenderHostBuilder(hostname).setFreeMem(0).build(); + .setJobId(proc3.getJobId()) + .setLayerId(proc3.getLayerId()) + .setFrameId(proc3.getFrameId()) + .setResourceId(proc3.getProcId()) + .setUsedSwapMemory(CueUtil.MB512 * 2) + .setVsize(memoryUsedProc3) + .setRss(memoryUsedProc3) + .setMaxRss(memoryUsedProc3) + .build(); + + RenderHost hostAfterUpdate = getRenderHostBuilder(hostname) + .setFreeMem(0) + .setFreeSwap(CueUtil.GB2 - + info1.getUsedSwapMemory() - + info2.getUsedSwapMemory() - + info3.getUsedSwapMemory()) + .build(); HostReport report = HostReport.newBuilder() - .setHost(hostAfterUpdate) - .setCoreInfo(getCoreDetail(200, 200, 0, 0)) - .addAllFrames(Arrays.asList(info1, info2, info3)) - .build(); + .setHost(hostAfterUpdate) + .setCoreInfo(getCoreDetail(200, 200, 0, 0)) + .addAllFrames(Arrays.asList(info1, info2, info3)) + .build(); // Get layer state before report gets sent LayerDetail layerBeforeIncrease = jobManager.getLayerDetail(proc3.getLayerId()); - // In this case, killing one job should be enough to ge the machine to a safe state + // In this case, killing 2 frames should be enough to ge the machine to a safe + // state. Total Swap: 2GB, usage before kill: 1944MB, usage after kill: 348 (less than 20%) long killCount = DispatchSupport.killedOffenderProcs.get(); hostReportHandler.handleHostReport(report, false); - assertEquals(killCount + 1, DispatchSupport.killedOffenderProcs.get()); + assertEquals(killCount + 2, DispatchSupport.killedOffenderProcs.get()); - // Confirm the frame will be set to retry after it's completion has been processed + // Confirm the frame will be set to retry after it's completion has been + // processed RunningFrameInfo runningFrame = RunningFrameInfo.newBuilder() - .setFrameId(proc3.getFrameId()) - .setFrameName("frame_name") - .setLayerId(proc3.getLayerId()) - .setRss(memoryUsedProc3) - .setMaxRss(memoryUsedProc3) - .setResourceId(proc3.id) - .build(); + .setFrameId(proc3.getFrameId()) + .setFrameName("frame_name") + .setLayerId(proc3.getLayerId()) + .setRss(memoryUsedProc3) + .setMaxRss(memoryUsedProc3) + .setResourceId(proc3.id) + .build(); FrameCompleteReport completeReport = FrameCompleteReport.newBuilder() - .setHost(hostAfterUpdate) - .setFrame(runningFrame) - .setExitSignal(9) - .setRunTime(1) - .setExitStatus(1) - .build(); + .setHost(hostAfterUpdate) + .setFrame(runningFrame) + .setExitSignal(9) + .setRunTime(1) + .setExitStatus(1) + .build(); frameCompleteHandler.handleFrameCompleteReport(completeReport); FrameDetail killedFrame = jobManager.getFrameDetail(proc3.getFrameId()); LayerDetail layer = jobManager.getLayerDetail(proc3.getLayerId()); assertEquals(FrameState.WAITING, killedFrame.state); - // Memory increases are processed in two different places one will set the new value to proc.reserved + 2GB - // and the other will set to the maximum reported proc.maxRss the end value will be whoever is higher. + // Memory increases are processed in two different places. + // First: proc.reserved + 2GB + // Second: the maximum reported proc.maxRss + // The higher valuer beween First and Second wins. // In this case, proc.maxRss assertEquals(Math.max(memoryUsedProc3, layerBeforeIncrease.getMinimumMemory() + CueUtil.GB2), - layer.getMinimumMemory()); + layer.getMinimumMemory()); } } diff --git a/cuebot/src/test/resources/opencue.properties b/cuebot/src/test/resources/opencue.properties index 47c7b8e31..cfaec991c 100644 --- a/cuebot/src/test/resources/opencue.properties +++ b/cuebot/src/test/resources/opencue.properties @@ -77,11 +77,12 @@ dispatcher.booking_queue.max_pool_size=6 dispatcher.booking_queue.queue_capacity=1000 dispatcher.min_available_temp_storage_percentage=20 dispatcher.min_bookable_free_mcp_kb=1048576 -dispatcher.oom_max_safe_used_memory_threshold=0.95 +dispatcher.oom_max_safe_used_physical_memory_threshold=0.9 +dispatcher.oom_max_safe_used_swap_memory_threshold=0.2 dispatcher.oom_frame_overboard_allowed_threshold=0.6 dispatcher.frame_kill_retry_limit=3 -# A comma separated list of services that should have their frames considered +# A comma separated list of services that should have their frames considered # selfish. A selfish frame will reserve all the available cores to avoid # having to share resources with other renders. dispatcher.frame.selfish.services=arnold,selfish-service \ No newline at end of file diff --git a/proto/report.proto b/proto/report.proto index 441f810c3..f9c56d5f4 100644 --- a/proto/report.proto +++ b/proto/report.proto @@ -101,6 +101,7 @@ message RunningFrameInfo { int64 max_used_gpu_memory = 16; // kB int64 used_gpu_memory = 17; // kB ChildrenProcStats children = 18; //additional data about the running frame's child processes + int64 used_swap_memory = 19; // kB }; message ChildrenProcStats { diff --git a/rqd/rqd/rqmachine.py b/rqd/rqd/rqmachine.py index 6874bd25b..f67b8955d 100644 --- a/rqd/rqd/rqmachine.py +++ b/rqd/rqd/rqmachine.py @@ -266,6 +266,8 @@ def rssUpdate(self, frames): # The time in jiffies the process started # after system boot. "start_time": statFields[21], + # Fetch swap usage + "swap": self._getProcSwap(pid), } # cmdline: p = psutil.Process(int(pid)) @@ -301,6 +303,7 @@ def rssUpdate(self, frames): session = str(frame.pid) rss = 0 vsize = 0 + swap = 0 pcpu = 0 # children pids share the same session id for pid, data in pids.items(): @@ -308,6 +311,7 @@ def rssUpdate(self, frames): try: rss += int(data["rss"]) vsize += int(data["vsize"]) + swap += int(data["swap"]) # jiffies used by this process, last two means that dead # children are counted @@ -343,6 +347,7 @@ def rssUpdate(self, frames): frame.childrenProcs[pid]['rss'] = childRss frame.childrenProcs[pid]['vsize'] = \ int(data["vsize"]) // 1024 + frame.childrenProcs[pid]['swap'] = swap // 1024 frame.childrenProcs[pid]['statm_rss'] = \ (int(data["statm_rss"]) \ * resource.getpagesize()) // 1024 @@ -355,6 +360,7 @@ def rssUpdate(self, frames): 'rss_page': int(data["rss"]), 'rss': (int(data["rss"]) * resource.getpagesize()) // 1024, 'vsize': int(data["vsize"]) // 1024, + 'swap': swap // 1024, 'state': data['state'], # statm reports in pages (~ 4kB) # same as VmRss in /proc/[pid]/status (in KB) @@ -373,9 +379,11 @@ def rssUpdate(self, frames): # convert bytes to KB rss = (rss * resource.getpagesize()) // 1024 vsize = int(vsize/1024) + swap = swap // 1024 frame.rss = rss frame.maxRss = max(rss, frame.maxRss) + frame.usedSwapMemory = swap if os.path.exists(frame.runFrame.log_dir_file): stat = os.stat(frame.runFrame.log_dir_file).st_mtime @@ -395,6 +403,21 @@ def rssUpdate(self, frames): except Exception as e: log.exception('Failure with rss update due to: %s', e) + def _getProcSwap(self, pid): + """Helper function to get swap memory used by a process""" + swap_used = 0 + try: + with open("/proc/%s/status" % pid, "r", encoding='utf-8') as statusFile: + for line in statusFile: + if line.startswith("VmSwap:"): + swap_used = int(line.split()[1]) + break + except FileNotFoundError: + log.info('Process %s terminated before swap info could be read.', pid) + except Exception as e: + log.warning('Failed to read swap usage for pid %s: %s', pid, e) + return swap_used + def getLoadAvg(self): """Returns average number of processes waiting to be served for the last 1 minute multiplied by 100.""" diff --git a/rqd/rqd/rqnetwork.py b/rqd/rqd/rqnetwork.py index fa1fb8060..de1b38475 100644 --- a/rqd/rqd/rqnetwork.py +++ b/rqd/rqd/rqnetwork.py @@ -71,6 +71,8 @@ def __init__(self, rqCore, runFrame): self.usedGpuMemory = 0 self.maxUsedGpuMemory = 0 + self.usedSwapMemory = 0 + self.realtime = 0 self.utime = 0 self.stime = 0 @@ -98,7 +100,8 @@ def runningFrameInfo(self): num_gpus=self.runFrame.num_gpus, max_used_gpu_memory=self.maxUsedGpuMemory, used_gpu_memory=self.usedGpuMemory, - children=self._serializeChildrenProcs() + children=self._serializeChildrenProcs(), + used_swap_memory=self.usedSwapMemory, ) return runningFrameInfo