Skip to content

Commit

Permalink
[cuebot/rqd] Prevent running frames on Swap memory (#1497)
Browse files Browse the repository at this point in the history
Improve the logic previously implemented to handle out-of-memory conditions
so that it also considers swap usage.

When a host is using more than
`dispatcher.oom_max_safe_used_physical_memory_threshold` of its physical
memory and more than
`dispatcher.oom_max_safe_used_swap_memory_threshold` of its swap memory,
logic is triggered that kills frames relying heavily on swap memory.
Killed frames are automatically marked to be retried, and the memory
requirements of a frame's parent layer may be increased if the frame had
been using more memory than initially reserved.


Co-authored-by: Ramon Figueiredo <[email protected]>
  • Loading branch information
DiegoTavares and ramonfigueiredo authored Sep 6, 2024
1 parent 13087f3 commit e67a8b3
Show file tree
Hide file tree
Showing 16 changed files with 218 additions and 244 deletions.
2 changes: 1 addition & 1 deletion VERSION.in
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.34
0.35
11 changes: 2 additions & 9 deletions cuebot/src/main/java/com/imageworks/spcue/dao/ProcDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,6 @@ public interface ProcDao {

long getReservedGpuMemory(ProcInterface proc);

/**
* Return the proc that has exceeded its reserved memory by the largest factor.
*
* @param host
* @return
*/
VirtualProc getWorstMemoryOffender(HostInterface host);

/**
* Removes a little bit of reserved memory from every other running frame
* in order to give some to the target proc.
Expand Down Expand Up @@ -151,7 +143,8 @@ public interface ProcDao {
*/
void updateProcMemoryUsage(FrameInterface f, long rss, long maxRss,
long vsize, long maxVsize, long usedGpuMemory,
long maxUsedGpuMemory, byte[] children);
long maxUsedGpuMemory, long usedSwapMemory,
byte[] children);

/**
* Get a virtual proc from its unique id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
import java.util.Map;

import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.PreparedStatementCreator;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.support.JdbcDaoSupport;
import org.springframework.jdbc.core.PreparedStatementCreator;

import com.imageworks.spcue.FrameInterface;
import com.imageworks.spcue.HostInterface;
Expand Down Expand Up @@ -240,14 +240,16 @@ public boolean clearVirtualProcAssignment(FrameInterface frame) {
"int_virt_max_used = ?, " +
"int_gpu_mem_used = ?, " +
"int_gpu_mem_max_used = ?, " +
"int_swap_used = ?, " +
"bytea_children = ?, " +
"ts_ping = current_timestamp " +
"WHERE " +
"pk_frame = ?";

@Override
public void updateProcMemoryUsage(FrameInterface f, long rss, long maxRss,
long vss, long maxVss, long usedGpuMemory, long maxUsedGpuMemory, byte[] children) {
long vss, long maxVss, long usedGpuMemory, long maxUsedGpuMemory,
long usedSwapMemory, byte[] children) {
/*
* This method is going to repeat for a proc every 1 minute, so
* if the proc is being touched by another thread, then return
Expand All @@ -274,8 +276,9 @@ public PreparedStatement createPreparedStatement(Connection conn)
updateProc.setLong(4, maxVss);
updateProc.setLong(5, usedGpuMemory);
updateProc.setLong(6, maxUsedGpuMemory);
updateProc.setBytes(7, children);
updateProc.setString(8, f.getFrameId());
updateProc.setLong(7, usedSwapMemory);
updateProc.setBytes(8, children);
updateProc.setString(9, f.getFrameId());
return updateProc;
}
});
Expand Down Expand Up @@ -569,49 +572,6 @@ public boolean increaseReservedMemory(ProcInterface p, long value) {
}
}

private static final String FIND_WORST_MEMORY_OFFENDER =
"SELECT " +
"pk_proc, " +
"pk_host, " +
"pk_show, "+
"pk_job, "+
"pk_layer,"+
"pk_frame,"+
"b_unbooked,"+
"b_local, "+
"pk_alloc, "+
"pk_facility, " +
"int_cores_reserved,"+
"int_mem_reserved," +
"int_mem_max_used,"+
"int_mem_used,"+
"int_gpus_reserved," +
"int_gpu_mem_reserved," +
"int_gpu_mem_max_used," +
"int_gpu_mem_used," +
"int_virt_max_used,"+
"int_virt_used,"+
"host_name, " +
"str_os, " +
"bytea_children " +
"FROM ("
+ GET_VIRTUAL_PROC + " " +
"AND " +
"host.pk_host = ? " +
"AND " +
"proc.int_mem_reserved != 0 " +
"AND " +
"proc.int_virt_used >= proc.int_mem_pre_reserved " +
"ORDER BY " +
"proc.int_virt_used / proc.int_mem_pre_reserved DESC " +
") AS t1 LIMIT 1";

@Override
public VirtualProc getWorstMemoryOffender(HostInterface host) {
return getJdbcTemplate().queryForObject(FIND_WORST_MEMORY_OFFENDER,
VIRTUAL_PROC_MAPPER, host.getHostId());
}

public long getReservedMemory(ProcInterface proc) {
return getJdbcTemplate().queryForObject(
"SELECT int_mem_reserved FROM proc WHERE pk_proc=?",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,8 @@ void updateFrameMemoryUsageAndLluTime(FrameInterface frame, long rss, long maxRs
*/
void updateProcMemoryUsage(FrameInterface frame, long rss, long maxRss,
long vsize, long maxVsize, long usedGpuMemory,
long maxUsedGpuMemory, byte[] children);
long maxUsedGpuMemory, long usedSwapMemory,
byte[] children);

/**
* Return true if adding the given core units would put the show
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,12 +214,12 @@ public void runFrame(VirtualProc proc, DispatchFrame frame) {
" could not be booked on " + frame.getName() + ", " + e);
}
}

@Override
@Transactional(propagation = Propagation.REQUIRED)
public void startFrameAndProc(VirtualProc proc, DispatchFrame frame) {
logger.trace("starting frame: " + frame);

frameDao.updateFrameStarted(proc, frame);

reserveProc(proc, frame);
Expand Down Expand Up @@ -571,9 +571,11 @@ public void lostProc(VirtualProc proc, String reason, int exitStatus) {
@Transactional(propagation = Propagation.REQUIRED)
public void updateProcMemoryUsage(FrameInterface frame, long rss, long maxRss,
long vsize, long maxVsize, long usedGpuMemory,
long maxUsedGpuMemory, byte[] children) {
long maxUsedGpuMemory, long usedSwapMemory,
byte[] children) {
procDao.updateProcMemoryUsage(frame, rss, maxRss, vsize, maxVsize,
usedGpuMemory, maxUsedGpuMemory, children);
usedGpuMemory, maxUsedGpuMemory, usedSwapMemory,
children);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -339,11 +340,11 @@ else if (!dispatchSupport.isCueBookable(host)) {

/**
* Check if a reported temp storage size and availability is enough for running a job
*
*
* Use dispatcher.min_available_temp_storage_percentage (opencue.properties) to
* define what's the accepted threshold. Providing hostOs is necessary as this feature
* is currently not available on Windows hosts
*
*
* @param tempTotalStorage Total storage on the temp directory
* @param tempFreeStorage Free storage on the temp directory
* @param hostOs Reported os
Expand Down Expand Up @@ -371,7 +372,7 @@ private boolean isTempDirStorageEnough(Long tempTotalStorage, Long tempFreeStora
* @param reportState
* @param isBoot
*/
private void changeHardwareState(DispatchHost host, HardwareState reportState, boolean isBoot) {
private void changeHardwareState(DispatchHost host, HardwareState reportState, boolean isBoot) {
// If the states are the same there is no reason to do this update.
if (host.hardwareState.equals(reportState)) {
return;
Expand Down Expand Up @@ -411,7 +412,7 @@ private void changeHardwareState(DispatchHost host, HardwareState reportState, b
* - Set the host state to UP, when the amount of free space in the temporary directory
* is greater or equal to the minimum required and the host has a comment with
* subject: SUBJECT_COMMENT_FULL_TEMP_DIR
*
*
* @param host
* @param reportHost
* @return
Expand Down Expand Up @@ -499,47 +500,88 @@ private void changeLockState(DispatchHost host, CoreDetail coreInfo) {
}

/**
* Prevent host from entering an OOM state where oom-killer might start killing important OS processes.
* Prevent host from entering an OOM state where oom-killer might start killing
* important OS processes and frames start using swap memory.
* The kill logic will kick in when one of the following conditions is met:
* - Host has less than OOM_MEMORY_LEFT_THRESHOLD_PERCENT memory available
* - A frame is taking more than OOM_FRAME_OVERBOARD_PERCENT of what it had reserved
* For frames that are using more than they had reserved but not above the threshold, negotiate expanding
* the reservations with other frames on the same host
*
* - Host is using more than oom_max_safe_used_physical_memory_threshold of its
* physical memory and more than oom_max_safe_used_swap_memory_threshold of its
* swap memory
* - A frame is taking more than OOM_FRAME_OVERBOARD_PERCENT of what it had
* reserved
* For frames that are using more than they had reserved but not above the
* threshold, negotiate expanding the reservations with other frames on the same
* host
*
* @param dispatchHost
* @param report
*/
private void handleMemoryUsage(final DispatchHost dispatchHost, RenderHost renderHost,
List<RunningFrameInfo> runningFrames) {
// Don't keep memory balances on nimby hosts
if (dispatchHost.isNimby) {
// Don't keep memory balances on nimby hosts and hosts with invalid memory
// information
if (dispatchHost.isNimby || renderHost.getTotalMem() <= 0) {
return;
}

final double OOM_MAX_SAFE_USED_MEMORY_THRESHOLD = env
.getRequiredProperty("dispatcher.oom_max_safe_used_memory_threshold", Double.class);
final double OOM_MAX_SAFE_USED_PHYSICAL_THRESHOLD = env
.getRequiredProperty("dispatcher.oom_max_safe_used_physical_memory_threshold", Double.class);
final double OOM_MAX_SAFE_USED_SWAP_THRESHOLD = env
.getRequiredProperty("dispatcher.oom_max_safe_used_swap_memory_threshold", Double.class);
final double OOM_FRAME_OVERBOARD_ALLOWED_THRESHOLD = env
.getRequiredProperty("dispatcher.oom_frame_overboard_allowed_threshold", Double.class);

boolean memoryWarning = renderHost.getTotalMem() > 0 &&
((double)renderHost.getFreeMem()/renderHost.getTotalMem() <
(1.0 - OOM_MAX_SAFE_USED_MEMORY_THRESHOLD));
Double physMemoryUsageRatio = renderHost.getTotalMem() > 0 ?
1.0 - renderHost.getFreeMem() / (double) renderHost.getTotalMem() :
0.0;

Double swapMemoryUsageRatio = renderHost.getTotalSwap() > 0 ?
1.0 - renderHost.getFreeSwap() / (double) renderHost.getTotalSwap() :
0.0;

// If checking for the swap threshold has been disabled, only memory usage is
// taken into consideration.
// If checking for memory has been disabled, checking for swap isolated is not
// safe, therefore disabled
boolean memoryWarning = false;
if (OOM_MAX_SAFE_USED_PHYSICAL_THRESHOLD > 0.0 && OOM_MAX_SAFE_USED_SWAP_THRESHOLD > 0.0 &&
!physMemoryUsageRatio.isNaN() && !swapMemoryUsageRatio.isNaN()) {
memoryWarning = physMemoryUsageRatio > OOM_MAX_SAFE_USED_PHYSICAL_THRESHOLD &&
swapMemoryUsageRatio > OOM_MAX_SAFE_USED_SWAP_THRESHOLD;
} else if (OOM_MAX_SAFE_USED_PHYSICAL_THRESHOLD > 0.0 && !physMemoryUsageRatio.isNaN()) {
memoryWarning = physMemoryUsageRatio > OOM_MAX_SAFE_USED_PHYSICAL_THRESHOLD;
}

if (memoryWarning) {
long memoryAvailable = renderHost.getFreeMem();
long minSafeMemoryAvailable = (long)(renderHost.getTotalMem() * (1.0 - OOM_MAX_SAFE_USED_MEMORY_THRESHOLD));
// Only allow killing up to 10 frames at a time
int killAttemptsRemaining = 10;
VirtualProc killedProc = null;
do {
killedProc = killWorstMemoryOffender(dispatchHost);
logger.warn("Memory warning(" + renderHost.getName() + "): physMemoryRatio: " +
physMemoryUsageRatio + ", swapRatio: " + swapMemoryUsageRatio);
// Try to kill frames using swap memory as they are probably performing poorly
long swapUsed = renderHost.getTotalSwap() - renderHost.getFreeSwap();
long maxSwapUsageAllowed = (long) (renderHost.getTotalSwap()
* OOM_MAX_SAFE_USED_SWAP_THRESHOLD);

// Sort runningFrames based on how much swap they are using, highest first
runningFrames.sort(Comparator.comparingLong((RunningFrameInfo frame) ->
frame.getUsedSwapMemory()).reversed());

int killAttemptsRemaining = 5;
for (RunningFrameInfo frame : runningFrames) {
// Reached the first frame on the sorted list without swap usage
if (frame.getUsedSwapMemory() <= 0) {
break;
}
if (killProcForMemory(frame.getFrameId(), renderHost.getName(),
KillCause.HostUnderOom)) {
swapUsed -= frame.getUsedSwapMemory();
logger.info("Memory warning(" + renderHost.getName() + "): " +
"Killing frame on " + frame.getJobName() + "." +
frame.getFrameName() + ", using too much swap.");
}

killAttemptsRemaining -= 1;
if (killedProc != null) {
memoryAvailable = memoryAvailable + killedProc.memoryUsed;
if (killAttemptsRemaining <= 0 || swapUsed <= maxSwapUsageAllowed) {
break;
}
} while (killAttemptsRemaining > 0 &&
memoryAvailable < minSafeMemoryAvailable &&
killedProc != null);
}
} else {
// When no mass cleaning was required, check for frames going overboard
// if frames didn't go overboard, manage its reservations trying to increase
Expand Down Expand Up @@ -582,10 +624,12 @@ private boolean killFrameOverusingMemory(RunningFrameInfo frame, String hostname
if (proc.isLocalDispatch) {
return false;
}

logger.info("Killing frame on " + frame.getJobName() + "." + frame.getFrameName() +
", using too much memory.");
return killProcForMemory(proc, hostname, KillCause.FrameOverboard);
boolean killed = killProcForMemory(proc.frameId, hostname, KillCause.FrameOverboard);
if (killed) {
logger.info("Killing frame on " + frame.getJobName() + "." + frame.getFrameName() +
", using too much memory.");
}
return killed;
} catch (EmptyResultDataAccessException e) {
return false;
}
Expand Down Expand Up @@ -627,12 +671,12 @@ private boolean getKillClearance(String hostname, String frameId) {
return true;
}

private boolean killProcForMemory(VirtualProc proc, String hostname, KillCause killCause) {
if (!getKillClearance(hostname, proc.frameId)) {
private boolean killProcForMemory(String frameId, String hostname, KillCause killCause) {
if (!getKillClearance(hostname, frameId)) {
return false;
}

FrameInterface frame = jobManager.getFrame(proc.frameId);
FrameInterface frame = jobManager.getFrame(frameId);
if (dispatcher.isTestMode()) {
// Different threads don't share the same database state on the test environment
(new DispatchRqdKillFrameMemory(hostname, frame, killCause.toString(), rqdClient,
Expand Down Expand Up @@ -674,28 +718,6 @@ private boolean killFrame(String frameId, String hostname, KillCause killCause)
return true;
}

/**
* Kill proc with the worst user/reserved memory ratio.
*
* @param host
* @return killed proc, or null if none could be found or failed to be killed
*/
private VirtualProc killWorstMemoryOffender(final DispatchHost host) {
try {
VirtualProc proc = hostManager.getWorstMemoryOffender(host);
logger.info("Killing frame on " + proc.getName() + ", host is under stress.");

if (!killProcForMemory(proc, host.getName(), KillCause.HostUnderOom)) {
proc = null;
}
return proc;
}
catch (EmptyResultDataAccessException e) {
logger.error(host.name + " is under OOM and no proc is memory overboard.");
return null;
}
}

/**
* Check frame memory usage comparing the amount used with the amount it had reserved
* @param frame
Expand Down Expand Up @@ -825,15 +847,16 @@ private void killTimedOutFrames(List<RunningFrameInfo> runningFrames, String hos
private void updateMemoryUsageAndLluTime(List<RunningFrameInfo> rFrames) {

for (RunningFrameInfo rf: rFrames) {

FrameInterface frame = jobManager.getFrame(rf.getFrameId());

dispatchSupport.updateFrameMemoryUsageAndLluTime(frame,
rf.getRss(), rf.getMaxRss(), rf.getLluTime());

dispatchSupport.updateProcMemoryUsage(frame, rf.getRss(), rf.getMaxRss(),
rf.getVsize(), rf.getMaxVsize(), rf.getUsedGpuMemory(),
rf.getMaxUsedGpuMemory(), rf.getChildren().toByteArray());
rf.getMaxUsedGpuMemory(), rf.getUsedSwapMemory(),
rf.getChildren().toByteArray());

}

updateJobMemoryUsage(rFrames);
Expand Down
Loading

0 comments on commit e67a8b3

Please sign in to comment.