diff --git a/.github/workflows/testing-pipeline.yml b/.github/workflows/testing-pipeline.yml index dd82f4dc5..fe3ade9a6 100644 --- a/.github/workflows/testing-pipeline.yml +++ b/.github/workflows/testing-pipeline.yml @@ -7,17 +7,6 @@ on: branches: [ master ] jobs: - test_python_2022: - name: Run Python Unit Tests (CY2022) - runs-on: ubuntu-22.04 - container: aswf/ci-opencue:2022 - env: - ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true - steps: - - uses: actions/checkout@v3 - - name: Run Python Tests - run: ci/run_python_tests.sh --no-gui - test_cuebot_2022: name: Build Cuebot and Run Unit Tests (CY2022) runs-on: ubuntu-22.04 diff --git a/VERSION.in b/VERSION.in index 9f8e9b69a..b123147e2 100644 --- a/VERSION.in +++ b/VERSION.in @@ -1 +1 @@ -1.0 \ No newline at end of file +1.1 \ No newline at end of file diff --git a/ci/run_python_tests.sh b/ci/run_python_tests.sh index 101d75288..1259adf4c 100755 --- a/ci/run_python_tests.sh +++ b/ci/run_python_tests.sh @@ -28,7 +28,6 @@ PYTHONPATH=pycue python -m unittest discover -s cueadmin/tests -t cueadmin -p "* PYTHONPATH=pycue:pyoutline python -m unittest discover -s cuesubmit/tests -t cuesubmit -p "*.py" python -m pytest rqd/tests - # Xvfb no longer supports Python 2. 
if [[ "$python_version" =~ "Python 3" && ${args[0]} != "--no-gui" ]]; then ci/run_gui_test.sh diff --git a/cuebot/src/main/java/com/imageworks/spcue/DispatchFrame.java b/cuebot/src/main/java/com/imageworks/spcue/DispatchFrame.java index faa1a9c04..2c60e9930 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/DispatchFrame.java +++ b/cuebot/src/main/java/com/imageworks/spcue/DispatchFrame.java @@ -49,5 +49,8 @@ public class DispatchFrame extends FrameEntity implements FrameInterface { // A comma separated list of services public String services; + + // The Operational System this frame is expected to run in + public String os; } diff --git a/cuebot/src/main/java/com/imageworks/spcue/DispatchHost.java b/cuebot/src/main/java/com/imageworks/spcue/DispatchHost.java index f01724e17..40a3e6bbc 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/DispatchHost.java +++ b/cuebot/src/main/java/com/imageworks/spcue/DispatchHost.java @@ -51,7 +51,7 @@ public class DispatchHost extends Entity public long gpuMemory; public long idleGpuMemory; public String tags; - public String os; + private String os; public boolean isNimby; public boolean isLocalDispatch = false; @@ -81,6 +81,14 @@ public String getFacilityId() { return facilityId; } + public String[] getOs() { + return this.os.split(","); + } + + public void setOs(String os) { + this.os = os; + } + public boolean canHandleNegativeCoresRequest(int requestedCores) { // Request is positive, no need to test further. if (requestedCores > 0) { diff --git a/cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java b/cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java index e89ced6ce..02ade6bb4 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java @@ -85,7 +85,9 @@ public String getName() { * @param frame * @return */ - public static final VirtualProc build(DispatchHost host, DispatchFrame frame, String... 
selfishServices) { + public static final VirtualProc build(DispatchHost host, + DispatchFrame frame, + String... selfishServices) { VirtualProc proc = new VirtualProc(); proc.allocationId = host.getAllocationId(); proc.hostId = host.getHostId(); @@ -94,7 +96,7 @@ public static final VirtualProc build(DispatchHost host, DispatchFrame frame, St proc.jobId = frame.getJobId(); proc.showId = frame.getShowId(); proc.facilityId = frame.getFacilityId(); - proc.os = host.os; + proc.os = frame.os; proc.hostName = host.getName(); proc.unbooked = false; @@ -238,7 +240,7 @@ public static final VirtualProc build(DispatchHost host, proc.jobId = frame.getJobId(); proc.showId = frame.getShowId(); proc.facilityId = frame.getFacilityId(); - proc.os = host.os; + proc.os = frame.os; proc.hostName = host.getName(); proc.unbooked = false; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/DispatchQuery.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/DispatchQuery.java index e36f97999..02dae0f22 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/DispatchQuery.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/DispatchQuery.java @@ -72,9 +72,9 @@ public class DispatchQuery { "AND job.pk_facility = ? " + "AND " + "(" + - "job.str_os IS NULL OR job.str_os = '' " + + "job.str_os IS NULL OR job.str_os = '' " + "OR " + - "job.str_os = ? " + + "job.str_os IN ? " + ") " + "AND (CASE WHEN layer_stat.int_waiting_count > 0 THEN 1 ELSE NULL END) = 1 " + "AND layer.int_cores_min <= ? " + @@ -135,7 +135,7 @@ public class DispatchQuery { "(" + "job.str_os IS NULL OR job.str_os = '' " + "OR " + - "job.str_os = ? " + + "job.str_os IN ? " + ") " + "AND (CASE WHEN layer_stat.int_waiting_count > 0 THEN 1 ELSE NULL END) = 1 " + "AND layer.int_cores_min <= ? " + @@ -250,7 +250,7 @@ private static final String replaceQueryForFifo(String query) { "AND " + "job.pk_facility = ? " + "AND " + - "(job.str_os = ?
OR job.str_os IS NULL) " + + "(job.str_os IN ? OR job.str_os IS NULL) " + "AND " + "job.pk_job IN ( " + "SELECT " + @@ -276,7 +276,7 @@ private static final String replaceQueryForFifo(String query) { "AND " + "j.pk_facility = ? " + "AND " + - "(j.str_os = ? OR j.str_os IS NULL) " + + "(j.str_os IN ? OR j.str_os IS NULL) " + "AND " + "(CASE WHEN lst.int_waiting_count > 0 THEN lst.pk_layer ELSE NULL END) = l.pk_layer " + "AND " + @@ -519,40 +519,42 @@ private static final String replaceQueryForFifo(String query) { ") " + "LIMIT 1"; + private static final String FIND_DISPATCH_FRAME_COLUMNS = + "show_name, " + + "job_name, " + + "pk_job, " + + "pk_show, " + + "pk_facility, " + + "str_name, " + + "str_shot, " + + "str_user, " + + "int_uid, " + + "str_log_dir, " + + "str_os, " + + "frame_name, " + + "frame_state, " + + "pk_frame, " + + "pk_layer, " + + "int_retries, " + + "int_version, " + + "layer_name, " + + "layer_type, " + + "b_threadable, " + + "int_cores_min, " + + "int_cores_max, " + + "int_mem_min, " + + "int_gpus_min, " + + "int_gpus_max, " + + "int_gpu_mem_min, " + + "str_cmd, " + + "str_range, " + + "int_chunk_size, " + + "str_services "; /** * Finds the next frame in a job for a proc. 
*/ public static final String FIND_DISPATCH_FRAME_BY_JOB_AND_PROC = - "SELECT " + - "show_name, " + - "job_name, " + - "pk_job, " + - "pk_show, " + - "pk_facility, " + - "str_name, " + - "str_shot, " + - "str_user, " + - "int_uid, " + - "str_log_dir, " + - "frame_name, " + - "frame_state, " + - "pk_frame, " + - "pk_layer, " + - "int_retries, " + - "int_version, " + - "layer_name, " + - "layer_type, " + - "b_threadable, " + - "int_cores_min, " + - "int_cores_max, " + - "int_mem_min, " + - "int_gpus_min, " + - "int_gpus_max, " + - "int_gpu_mem_min, " + - "str_cmd, " + - "str_range, " + - "int_chunk_size, " + - "str_services " + + "SELECT " + FIND_DISPATCH_FRAME_COLUMNS + "FROM ( " + "SELECT " + "ROW_NUMBER() OVER ( ORDER BY " + @@ -569,6 +571,7 @@ private static final String replaceQueryForFifo(String query) { "job.str_user, " + "job.int_uid, " + "job.str_log_dir, " + + "job.str_os, " + "frame.str_name AS frame_name, " + "frame.str_state AS frame_state, " + "frame.pk_frame, " + @@ -638,36 +641,7 @@ private static final String replaceQueryForFifo(String query) { * Find the next frame in a job for a host. 
*/ public static final String FIND_DISPATCH_FRAME_BY_JOB_AND_HOST = - "SELECT " + - "show_name, " + - "job_name, " + - "pk_job, " + - "pk_show, " + - "pk_facility, " + - "str_name, " + - "str_shot, " + - "str_user, " + - "int_uid, " + - "str_log_dir, " + - "frame_name, " + - "frame_state, " + - "pk_frame, " + - "pk_layer, " + - "int_retries, " + - "int_version, " + - "layer_name, " + - "layer_type, " + - "int_cores_min, " + - "int_cores_max, " + - "int_gpus_min, " + - "int_gpus_max, " + - "b_threadable, " + - "int_mem_min, " + - "int_gpu_mem_min, " + - "str_cmd, " + - "str_range, " + - "int_chunk_size, " + - "str_services " + + "SELECT " + FIND_DISPATCH_FRAME_COLUMNS + "FROM ( " + "SELECT " + "ROW_NUMBER() OVER ( ORDER BY " + @@ -684,6 +658,7 @@ private static final String replaceQueryForFifo(String query) { "job.str_user, " + "job.int_uid, " + "job.str_log_dir, " + + "job.str_os, " + "frame.str_name AS frame_name, " + "frame.str_state AS frame_state, " + "frame.pk_frame, " + @@ -754,36 +729,7 @@ private static final String replaceQueryForFifo(String query) { public static final String FIND_LOCAL_DISPATCH_FRAME_BY_JOB_AND_PROC = - "SELECT " + - "show_name, " + - "job_name, " + - "pk_job, " + - "pk_show, " + - "pk_facility, " + - "str_name, " + - "str_shot, " + - "str_user, " + - "int_uid, " + - "str_log_dir, " + - "frame_name, " + - "frame_state, " + - "pk_frame, " + - "pk_layer, " + - "int_retries, " + - "int_version, " + - "layer_name, " + - "layer_type, " + - "b_threadable, " + - "int_cores_min, " + - "int_cores_max, " + - "int_mem_min, " + - "int_gpus_min, " + - "int_gpus_max, " + - "int_gpu_mem_min, " + - "str_cmd, " + - "str_range, " + - "int_chunk_size, " + - "str_services " + + "SELECT " + FIND_DISPATCH_FRAME_COLUMNS + "FROM ( " + "SELECT " + "ROW_NUMBER() OVER ( ORDER BY " + @@ -800,6 +746,7 @@ private static final String replaceQueryForFifo(String query) { "job.str_user, " + "job.int_uid, " + "job.str_log_dir, " + + "job.str_os, " + "frame.str_name AS 
frame_name, " + "frame.str_state AS frame_state, " + "frame.pk_frame, " + @@ -863,36 +810,7 @@ private static final String replaceQueryForFifo(String query) { * Find the next frame in a job for a host. */ public static final String FIND_LOCAL_DISPATCH_FRAME_BY_JOB_AND_HOST = - "SELECT " + - "show_name, " + - "job_name, " + - "pk_job, " + - "pk_show, " + - "pk_facility, " + - "str_name, " + - "str_shot, " + - "str_user, " + - "int_uid, " + - "str_log_dir, " + - "frame_name, " + - "frame_state, " + - "pk_frame, " + - "pk_layer, " + - "int_retries, " + - "int_version, " + - "layer_name, " + - "layer_type, " + - "int_cores_min, " + - "int_cores_max, " + - "int_gpus_min, " + - "int_gpus_max, " + - "b_threadable, " + - "int_mem_min, " + - "int_gpu_mem_min, " + - "str_cmd, " + - "str_range, " + - "int_chunk_size, " + - "str_services " + + "SELECT " + FIND_DISPATCH_FRAME_COLUMNS + "FROM (" + "SELECT " + "ROW_NUMBER() OVER ( ORDER BY " + @@ -909,6 +827,7 @@ private static final String replaceQueryForFifo(String query) { "job.str_user, " + "job.int_uid, " + "job.str_log_dir, " + + "job.str_os, " + "frame.str_name AS frame_name, " + "frame.str_state AS frame_state, " + "frame.pk_frame, " + @@ -975,36 +894,7 @@ private static final String replaceQueryForFifo(String query) { * Finds the next frame in a job for a proc. 
*/ public static final String FIND_DISPATCH_FRAME_BY_LAYER_AND_PROC = - "SELECT " + - "show_name, " + - "job_name, " + - "pk_job, " + - "pk_show, " + - "pk_facility, " + - "str_name, " + - "str_shot, " + - "str_user, " + - "int_uid, " + - "str_log_dir, " + - "frame_name, " + - "frame_state, " + - "pk_frame, " + - "pk_layer, " + - "int_retries, " + - "int_version, " + - "layer_name, " + - "layer_type, " + - "b_threadable, " + - "int_cores_min, " + - "int_cores_max, " + - "int_mem_min, " + - "int_gpus_min, " + - "int_gpus_max, " + - "int_gpu_mem_min, " + - "str_cmd, " + - "str_range, " + - "int_chunk_size, " + - "str_services " + + "SELECT " + FIND_DISPATCH_FRAME_COLUMNS + "FROM (" + "SELECT " + "ROW_NUMBER() OVER ( ORDER BY " + @@ -1021,6 +911,7 @@ private static final String replaceQueryForFifo(String query) { "job.str_user, " + "job.int_uid, " + "job.str_log_dir, " + + "job.str_os, " + "frame.str_name AS frame_name, " + "frame.str_state AS frame_state, " + "frame.pk_frame, " + @@ -1090,36 +981,7 @@ private static final String replaceQueryForFifo(String query) { * Find the next frame in a job for a host. 
*/ public static final String FIND_DISPATCH_FRAME_BY_LAYER_AND_HOST = - "SELECT " + - "show_name, " + - "job_name, " + - "pk_job, " + - "pk_show, " + - "pk_facility, " + - "str_name, " + - "str_shot, " + - "str_user, " + - "int_uid, " + - "str_log_dir, " + - "frame_name, " + - "frame_state, " + - "pk_frame, " + - "pk_layer, " + - "int_retries, " + - "int_version, " + - "layer_name, " + - "layer_type, " + - "int_cores_min, " + - "int_cores_max, " + - "b_threadable, " + - "int_mem_min, " + - "int_gpus_min, " + - "int_gpus_max, " + - "int_gpu_mem_min, " + - "str_cmd, " + - "str_range, " + - "int_chunk_size, " + - "str_services " + + "SELECT " + FIND_DISPATCH_FRAME_COLUMNS + "FROM (" + "SELECT " + "ROW_NUMBER() OVER ( ORDER BY " + @@ -1136,6 +998,7 @@ private static final String replaceQueryForFifo(String query) { "job.str_user, " + "job.int_uid, " + "job.str_log_dir, " + + "job.str_os, " + "frame.str_name AS frame_name, " + "frame.str_state AS frame_state, " + "frame.pk_frame, " + @@ -1206,36 +1069,7 @@ private static final String replaceQueryForFifo(String query) { public static final String FIND_LOCAL_DISPATCH_FRAME_BY_LAYER_AND_PROC = - "SELECT " + - "show_name, " + - "job_name, " + - "pk_job, " + - "pk_show, " + - "pk_facility, " + - "str_name, " + - "str_shot, " + - "str_user, " + - "int_uid, " + - "str_log_dir, " + - "frame_name, " + - "frame_state, " + - "pk_frame, " + - "pk_layer, " + - "int_retries, " + - "int_version, " + - "layer_name, " + - "layer_type, " + - "b_threadable, " + - "int_cores_min, " + - "int_cores_max, " + - "int_mem_min, " + - "int_gpus_min, " + - "int_gpus_max, " + - "int_gpu_mem_min, " + - "str_cmd, " + - "str_range, " + - "int_chunk_size, " + - "str_services " + + "SELECT " + FIND_DISPATCH_FRAME_COLUMNS + "FROM (" + "SELECT " + "ROW_NUMBER() OVER ( ORDER BY " + @@ -1252,6 +1086,7 @@ private static final String replaceQueryForFifo(String query) { "job.str_user, " + "job.int_uid, " + "job.str_log_dir, " + + "job.str_os, " + "frame.str_name 
AS frame_name, " + "frame.str_state AS frame_state, " + "frame.pk_frame, " + @@ -1315,36 +1150,7 @@ private static final String replaceQueryForFifo(String query) { * Find the next frame in a job for a host. */ public static final String FIND_LOCAL_DISPATCH_FRAME_BY_LAYER_AND_HOST = - "SELECT " + - "show_name, " + - "job_name, " + - "pk_job, " + - "pk_show, " + - "pk_facility, " + - "str_name, " + - "str_shot, " + - "str_user, " + - "int_uid, " + - "str_log_dir, " + - "frame_name, " + - "frame_state, " + - "pk_frame, " + - "pk_layer, " + - "int_retries, " + - "int_version, " + - "layer_name, " + - "layer_type, " + - "int_cores_min, " + - "int_cores_max, " + - "b_threadable, " + - "int_mem_min, " + - "int_gpus_min, " + - "int_gpus_max, " + - "int_gpu_mem_min, " + - "str_cmd, " + - "str_range, " + - "int_chunk_size, " + - "str_services " + + "SELECT " + FIND_DISPATCH_FRAME_COLUMNS + "FROM (" + "SELECT " + "ROW_NUMBER() OVER (ORDER BY " + @@ -1361,6 +1167,7 @@ private static final String replaceQueryForFifo(String query) { "job.str_user, " + "job.int_uid, " + "job.str_log_dir, " + + "job.str_os, " + "frame.str_name AS frame_name, " + "frame.str_state AS frame_state, " + "frame.pk_frame, " + diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/DispatcherDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/DispatcherDaoJdbc.java index b17ae14e3..c2af24e0f 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/DispatcherDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/DispatcherDaoJdbc.java @@ -24,6 +24,7 @@ import java.sql.ResultSet; import java.sql.SQLException; +import java.util.ArrayList; import java.util.Collections; import java.util.LinkedHashSet; import java.util.LinkedList; @@ -172,6 +173,11 @@ else if (cached.isExpired()) { return bookableShows.get(key).shows; } + private String handleInClause(String key, String query, int inValueLength) { + String placeholders = String.join(",", 
Collections.nCopies(inValueLength, "?")); + return query.replace(key + " IN ?", key + " IN (" + placeholders + ")"); + } + private Set findDispatchJobs(DispatchHost host, int numJobs, boolean shuffleShows) { LinkedHashSet result = new LinkedHashSet(); List shows = new LinkedList(getBookableShows(host)); @@ -216,20 +222,24 @@ private Set findDispatchJobs(DispatchHost host, int numJobs, boolean shu @Override public PreparedStatement createPreparedStatement(Connection conn) throws SQLException { - PreparedStatement find_jobs_stmt = conn.prepareStatement( - FIND_JOBS_BY_SHOW_NO_GPU); - find_jobs_stmt.setString(1, s.getShowId()); - find_jobs_stmt.setString(2, host.getFacilityId()); - find_jobs_stmt.setString(3, host.os); - find_jobs_stmt.setInt(4, host.idleCores); - find_jobs_stmt.setLong(5, host.idleMemory); - find_jobs_stmt.setInt(6, threadMode(host.threadMode)); - find_jobs_stmt.setString(7, host.getName()); - find_jobs_stmt.setInt(8, numJobs * 10); + String query = handleInClause("str_os", FIND_JOBS_BY_SHOW_NO_GPU, host.getOs().length); + PreparedStatement find_jobs_stmt = conn.prepareStatement(query); + + int index = 1; + find_jobs_stmt.setString(index++, s.getShowId()); + find_jobs_stmt.setString(index++, host.getFacilityId()); + for (String item : host.getOs()) { + find_jobs_stmt.setString(index++, item); + } + find_jobs_stmt.setInt(index++, host.idleCores); + find_jobs_stmt.setLong(index++, host.idleMemory); + find_jobs_stmt.setInt(index++, threadMode(host.threadMode)); + find_jobs_stmt.setString(index++, host.getName()); + find_jobs_stmt.setInt(index++, numJobs * 10); return find_jobs_stmt; }}, PKJOB_MAPPER )); - prometheusMetrics.setBookingDurationMetric("findDispatchJobs nogpu findByShowQuery", + prometheusMetrics.setBookingDurationMetric("findDispatchJobs nogpu findByShowQuery", System.currentTimeMillis() - lastTime); } else { @@ -237,19 +247,22 @@ public PreparedStatement createPreparedStatement(Connection conn) @Override public PreparedStatement 
createPreparedStatement(Connection conn) throws SQLException { - PreparedStatement find_jobs_stmt = conn.prepareStatement( - findByShowQuery()); - find_jobs_stmt.setString(1, s.getShowId()); - find_jobs_stmt.setString(2, host.getFacilityId()); - find_jobs_stmt.setString(3, host.os); - find_jobs_stmt.setInt(4, host.idleCores); - find_jobs_stmt.setLong(5, host.idleMemory); - find_jobs_stmt.setInt(6, threadMode(host.threadMode)); - find_jobs_stmt.setInt(7, host.idleGpus); - find_jobs_stmt.setLong(8, (host.idleGpuMemory > 0) ? 1 : 0); - find_jobs_stmt.setLong(9, host.idleGpuMemory); - find_jobs_stmt.setString(10, host.getName()); - find_jobs_stmt.setInt(11, numJobs * 10); + String query = handleInClause("str_os", findByShowQuery(), host.getOs().length); + PreparedStatement find_jobs_stmt = conn.prepareStatement(query); + int index = 1; + find_jobs_stmt.setString(index++, s.getShowId()); + find_jobs_stmt.setString(index++, host.getFacilityId()); + for (String item : host.getOs()) { + find_jobs_stmt.setString(index++, item); + } + find_jobs_stmt.setInt(index++, host.idleCores); + find_jobs_stmt.setLong(index++, host.idleMemory); + find_jobs_stmt.setInt(index++, threadMode(host.threadMode)); + find_jobs_stmt.setInt(index++, host.idleGpus); + find_jobs_stmt.setLong(index++, (host.idleGpuMemory > 0) ? 
1 : 0); + find_jobs_stmt.setLong(index++, host.idleGpuMemory); + find_jobs_stmt.setString(index++, host.getName()); + find_jobs_stmt.setInt(index++, numJobs * 10); return find_jobs_stmt; }}, PKJOB_MAPPER )); @@ -308,31 +321,50 @@ public Set findDispatchJobs(DispatchHost host, GroupInterface g) { long lastTime = System.currentTimeMillis(); if (host.idleGpus == 0 && (schedulingMode == SchedulingMode.BALANCED)) { + String query = handleInClause("str_os", FIND_JOBS_BY_GROUP_NO_GPU, host.getOs().length); + ArrayList args = new ArrayList(); + + args.add(g.getGroupId()); + args.add(host.getFacilityId()); + for (String item : host.getOs()) { + args.add(item); + } + args.add(host.idleCores); + args.add(host.idleMemory); + args.add(threadMode(host.threadMode)); + args.add(host.getName()); + args.add(50); result.addAll(getJdbcTemplate().query( - FIND_JOBS_BY_GROUP_NO_GPU, - PKJOB_MAPPER, - g.getGroupId(), host.getFacilityId(), host.os, - host.idleCores, host.idleMemory, - threadMode(host.threadMode), - host.getName(), 50)); + query, + PKJOB_MAPPER, args.toArray())); prometheusMetrics.setBookingDurationMetric("findDispatchJobs by group nogpu query", System.currentTimeMillis() - lastTime); } else { + String query = handleInClause("str_os", findByGroupQuery(), host.getOs().length); + ArrayList args = new ArrayList(); + + args.add(g.getGroupId()); + args.add(host.getFacilityId()); + for (String item : host.getOs()) { + args.add(item); + } + args.add(host.idleCores); + args.add(host.idleMemory); + args.add(threadMode(host.threadMode)); + args.add(host.idleGpus); + args.add(host.idleGpuMemory > 0 ? 1 : 0); + args.add(host.idleGpuMemory); + args.add(host.getName()); + args.add(50); result.addAll(getJdbcTemplate().query( - findByGroupQuery(), - PKJOB_MAPPER, - g.getGroupId(),host.getFacilityId(), host.os, - host.idleCores, host.idleMemory, - threadMode(host.threadMode), - host.idleGpus, - (host.idleGpuMemory > 0) ?
1 : 0, host.idleGpuMemory, - host.getName(), 50)); + query, + PKJOB_MAPPER, args.toArray())); prometheusMetrics.setBookingDurationMetric("findDispatchJobs by group query", System.currentTimeMillis() - lastTime); } return result; } @@ -515,26 +545,47 @@ public Set findDispatchJobs(DispatchHost host, LinkedHashSet result = new LinkedHashSet(numJobs); long start = System.currentTimeMillis(); if (host.idleGpus == 0 && (schedulingMode == SchedulingMode.BALANCED)) { + String query = handleInClause("str_os", FIND_JOBS_BY_SHOW_NO_GPU, host.getOs().length); + ArrayList args = new ArrayList(); + args.add(show.getShowId()); + args.add(host.getFacilityId()); + for (String item : host.getOs()) { + args.add(item); + } + args.add(host.idleCores); + args.add(host.idleMemory); + args.add(threadMode(host.threadMode)); + args.add(host.getName()); + args.add(numJobs * 10); + result.addAll(getJdbcTemplate().query( - FIND_JOBS_BY_SHOW_NO_GPU, - PKJOB_MAPPER, - show.getShowId(), host.getFacilityId(), host.os, - host.idleCores, host.idleMemory, - threadMode(host.threadMode), - host.getName(), numJobs * 10)); + query, + PKJOB_MAPPER, args.toArray())); + prometheusMetrics.setBookingDurationMetric("findDispatchJobs by show nogpu query", System.currentTimeMillis() - start); } else { + String query = handleInClause("str_os", findByShowQuery(), host.getOs().length); + ArrayList args = new ArrayList(); + args.add(show.getShowId()); + args.add(host.getFacilityId()); + for (String item : host.getOs()) { + args.add(item); + } + args.add(host.idleCores); + args.add(host.idleMemory); + args.add(threadMode(host.threadMode)); + args.add(host.idleGpus); + args.add(host.idleGpuMemory > 0 ?
1 : 0); + args.add(host.idleGpuMemory); + args.add(host.getName()); + args.add(numJobs * 10); + result.addAll(getJdbcTemplate().query( - findByShowQuery(), - PKJOB_MAPPER, - show.getShowId(), host.getFacilityId(), host.os, - host.idleCores, host.idleMemory, - threadMode(host.threadMode), - host.idleGpus, - (host.idleGpuMemory > 0) ? 1 : 0, host.idleGpuMemory, - host.getName(), numJobs * 10)); + query, + PKJOB_MAPPER, args.toArray())); + prometheusMetrics.setBookingDurationMetric("findDispatchJobs by show query", System.currentTimeMillis() - start); } @@ -548,11 +599,24 @@ public Set findDispatchJobs(DispatchHost host, public Set findLocalDispatchJobs(DispatchHost host) { LinkedHashSet result = new LinkedHashSet(5); long start = System.currentTimeMillis(); + + String query = handleInClause("str_os", FIND_JOBS_BY_LOCAL, host.getOs().length); + ArrayList args = new ArrayList(); + args.add(host.getHostId()); + args.add(host.getFacilityId()); + for (String item : host.getOs()) { + args.add(item); + } + args.add(host.getHostId()); + args.add(host.getFacilityId()); + for (String item : host.getOs()) { + args.add(item); + } + result.addAll(getJdbcTemplate().query( - FIND_JOBS_BY_LOCAL, - PKJOB_MAPPER, - host.getHostId(), host.getFacilityId(), - host.os, host.getHostId(), host.getFacilityId(), host.os)); + query, + PKJOB_MAPPER, args.toArray())); + prometheusMetrics.setBookingDurationMetric("findLocalDispatchJobs query", System.currentTimeMillis() - start); return result; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java index 0546d4558..9e0f6f80c 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java @@ -331,6 +331,7 @@ public DispatchFrame mapRow(ResultSet rs, int rowNum) throws SQLException { frame.minGpuMemory = rs.getLong("int_gpu_mem_min"); 
frame.version = rs.getInt("int_version"); frame.services = rs.getString("str_services"); + frame.os = rs.getString("str_os"); return frame; } }; @@ -347,6 +348,7 @@ public DispatchFrame mapRow(ResultSet rs, int rowNum) throws SQLException { "job.str_user,"+ "job.int_uid,"+ "job.str_log_dir,"+ + "job.str_os,"+ "frame.str_name AS frame_name, "+ "frame.str_state AS frame_state, "+ "frame.pk_frame, "+ diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/HostDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/HostDaoJdbc.java index 244a4778b..20fe7b1ef 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/HostDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/HostDaoJdbc.java @@ -219,7 +219,7 @@ public DispatchHost mapRow(ResultSet rs, int rowNum) throws SQLException { host.isNimby = rs.getBoolean("b_nimby"); host.threadMode = rs.getInt("int_thread_mode"); host.tags = rs.getString("str_tags"); - host.os = rs.getString("str_os"); + host.setOs(rs.getString("str_os")); host.hardwareState = HardwareState.valueOf(rs.getString("str_state")); return host; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/ProcDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/ProcDaoJdbc.java index cf54eb85d..2a597cfeb 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/ProcDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/ProcDaoJdbc.java @@ -356,9 +356,10 @@ public VirtualProc mapRow(ResultSet rs, int rowNum) throws SQLException { "proc.int_virt_max_used,"+ "proc.int_virt_used,"+ "host.str_name AS host_name, " + - "host_stat.str_os " + + "job.str_os " + "FROM " + - "proc," + + "proc, " + + "job, " + "host, " + "host_stat, " + "alloc " + @@ -367,7 +368,9 @@ public VirtualProc mapRow(ResultSet rs, int rowNum) throws SQLException { "AND " + "host.pk_host = host_stat.pk_host " + "AND " + - "host.pk_alloc = alloc.pk_alloc "; + "host.pk_alloc = 
alloc.pk_alloc " + + "AND " + + "job.pk_job = proc.pk_job "; public VirtualProc getVirtualProc(String id) { return getJdbcTemplate().queryForObject( @@ -386,7 +389,7 @@ public VirtualProc findVirtualProc(FrameInterface frame) { "proc.*, " + "host.str_name AS host_name, " + "host.pk_alloc, " + - "host_stat.str_os, " + + "job.str_os, " + "alloc.pk_facility " + "FROM " + "proc, " + @@ -527,20 +530,23 @@ public String getCurrentFrameId(ProcInterface p) { "SELECT " + "proc.*, " + "host.str_name AS host_name, " + - "host_stat.str_os, " + + "job.str_os, " + "host.pk_alloc, " + "alloc.pk_facility " + "FROM " + "proc, " + "host, " + "host_stat,"+ - "alloc " + + "alloc, " + + "job " + "WHERE " + "proc.pk_host = host.pk_host " + "AND " + "host.pk_host = host_stat.pk_host " + "AND " + "host.pk_alloc = alloc.pk_alloc " + + "AND " + + "job.pk_job = proc.pk_job " + "AND " + "current_timestamp - proc.ts_ping > " + ORPHANED_PROC_INTERVAL; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java index f60b2c1e6..0779209b0 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java @@ -395,6 +395,7 @@ public RunFrame prepareRqdRunFrame(VirtualProc proc, DispatchFrame frame) { .setNumGpus(proc.gpusReserved) .setStartTime(System.currentTimeMillis()) .setIgnoreNimby(proc.isLocalDispatch) + .setOs(proc.os) .putAllEnvironment(jobDao.getEnvironment(frame)) .putAllEnvironment(layerDao.getLayerEnvironment(frame)) .putEnvironment("CUE3", "1") diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java index e89c285bb..39423e80b 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java +++ 
b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java @@ -258,7 +258,7 @@ public void handleHostReport(HostReport report, boolean isBoot) { "dispatcher.memory.mem_reserved_min", Long.class); - if (!isTempDirStorageEnough(report.getHost().getTotalMcp(), report.getHost().getFreeMcp(), host.os)) { + if (!isTempDirStorageEnough(report.getHost().getTotalMcp(), report.getHost().getFreeMcp(), host.getOs())) { msg = String.format( "%s doesn't have enough free space in the temporary directory (mcp), %dMB", host.name, (report.getHost().getFreeMcp()/1024)); @@ -351,16 +351,19 @@ else if (!dispatchSupport.isCueBookable(host)) { * * @param tempTotalStorage Total storage on the temp directory * @param tempFreeStorage Free storage on the temp directory - * @param hostOs Reported os + * @param hostOs Reported operating systems + * @return */ - private boolean isTempDirStorageEnough(Long tempTotalStorage, Long tempFreeStorage, String hostOs) { + private boolean isTempDirStorageEnough(Long tempTotalStorage, Long tempFreeStorage, String[] hostOs) { // The minimum amount of free space in the temporary directory to book a host int minAvailableTempPercentage = env.getRequiredProperty( "dispatcher.min_available_temp_storage_percentage", Integer.class); - return minAvailableTempPercentage == -1 || hostOs.equalsIgnoreCase(WINDOWS_OS) || - (((tempFreeStorage * 100.0) / tempTotalStorage) >= minAvailableTempPercentage); + return minAvailableTempPercentage == -1 + // It is safe to assume multiple OSs imply windows is not the base OS, + // therefore Windows will always report a single hostOs + || (hostOs.length == 1 && hostOs[0].equalsIgnoreCase(WINDOWS_OS)) + || (((tempFreeStorage * 100.0) / tempTotalStorage) >= minAvailableTempPercentage); } /** @@ -427,7 +430,10 @@ private boolean changeStateForTempDirStorage(DispatchHost host, RenderHost repor "dispatcher.min_available_temp_storage_percentage", Integer.class); // Prevent cue frames from booking on hosts with full
temporary directories - boolean hasEnoughTempStorage = isTempDirStorageEnough(reportHost.getTotalMcp(), reportHost.getFreeMcp(), host.os); + boolean hasEnoughTempStorage = isTempDirStorageEnough( + reportHost.getTotalMcp(), + reportHost.getFreeMcp(), + host.getOs()); if (!hasEnoughTempStorage && host.hardwareState == HardwareState.UP) { // Insert a comment indicating that the Host status = Repair with reason = Full temporary directory CommentDetail c = new CommentDetail(); diff --git a/cuebot/src/main/resources/conf/ddl/postgres/migrations/V31__increase_os_size.sql b/cuebot/src/main/resources/conf/ddl/postgres/migrations/V31__increase_os_size.sql new file mode 100644 index 000000000..9ad89e437 --- /dev/null +++ b/cuebot/src/main/resources/conf/ddl/postgres/migrations/V31__increase_os_size.sql @@ -0,0 +1,3 @@ +-- Increase size of os column on host_stat + +ALTER TABLE host_stat ALTER COLUMN str_os TYPE VARCHAR(32); diff --git a/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/DispatcherDaoTests.java b/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/DispatcherDaoTests.java index 5b7eaee72..1ff849473 100644 --- a/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/DispatcherDaoTests.java +++ b/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/DispatcherDaoTests.java @@ -203,7 +203,7 @@ public void testFindNextDispatchFrameByProc() { assertNotNull(frame); assertEquals("0001-pass_1", frame.name); - VirtualProc proc = VirtualProc.build(host, frame); + VirtualProc proc = VirtualProc.build(host, frame, job.os); proc.coresReserved = 100; dispatcher.dispatch(frame, proc); @@ -235,7 +235,7 @@ public void testFindNextDispatchFramesByProc() { DispatchFrame frame = frames.get(0); - VirtualProc proc = VirtualProc.build(host, frame); + VirtualProc proc = VirtualProc.build(host, frame, job.os); proc.coresReserved = 100; dispatcher.dispatch(frame, proc); @@ -288,7 +288,7 @@ public void testFindNextDispatchFramesByProcAndJobLocal() { 
assertEquals(10, frames.size()); DispatchFrame frame = frames.get(0); - VirtualProc proc = VirtualProc.build(host, frame); + VirtualProc proc = VirtualProc.build(host, frame, job.os); proc.coresReserved = 100; proc.isLocalDispatch = true; @@ -310,7 +310,7 @@ public void testFindNextDispatchFramesByProcAndLayerLocal() { assertEquals(10, frames.size()); DispatchFrame frame = frames.get(0); - VirtualProc proc = VirtualProc.build(host, frame); + VirtualProc proc = VirtualProc.build(host, frame, job.os); proc.coresReserved = 100; proc.isLocalDispatch = true; @@ -406,7 +406,7 @@ public void testfindUnderProcedJob() { "SELECT str_state FROM job WHERE pk_job=?", String.class, job2.id)); - VirtualProc proc = VirtualProc.build(host, frame); + VirtualProc proc = VirtualProc.build(host, frame, job1.os); proc.coresReserved = 100; dispatcher.dispatch(frame, proc); @@ -442,7 +442,7 @@ public void testHigherPriorityJobExistsTrue() { "SELECT str_state FROM job WHERE pk_job=?", String.class, job2.id)); - VirtualProc proc = VirtualProc.build(host, frame); + VirtualProc proc = VirtualProc.build(host, frame, job2.os); proc.coresReserved = 100; dispatcher.dispatch(frame, proc); @@ -476,7 +476,7 @@ public void testHigherPriorityJobExistsFalse() { "SELECT str_state FROM job WHERE pk_job=?", String.class, job2.id)); - VirtualProc proc = VirtualProc.build(host, frame); + VirtualProc proc = VirtualProc.build(host, frame, job2.os); proc.coresReserved = 100; dispatcher.dispatch(frame, proc); @@ -511,7 +511,7 @@ public void testHigherPriorityJobExistsMaxProcBound() { "SELECT str_state FROM job WHERE pk_job=?", String.class, job2.id)); - VirtualProc proc = VirtualProc.build(host, frame); + VirtualProc proc = VirtualProc.build(host, frame, job2.os); proc.coresReserved = 100; dispatcher.dispatch(frame, proc); @@ -525,4 +525,46 @@ public void testHigherPriorityJobExistsMaxProcBound() { public void testFifoSchedulingEnabled() { assertEquals(dispatcherDao.getSchedulingMode(), 
DispatcherDao.SchedulingMode.PRIORITY_ONLY); } + + @Test + @Transactional + @Rollback(true) + public void testFindDispatchJobsByShowMultiOs() { + DispatchHost host = getHost(); + // Set multiple Os and confirm jobs with Linux are still being found + final JobDetail job = getJob1(); + assertNotNull(job); + + // Host with different os + host.setOs("centos7,SomethingElse"); + Set jobs = dispatcherDao.findDispatchJobs(host, + adminManager.findShowEntity("pipe"), 5); + assertTrue(jobs.size() == 0); + + // Host with Linux Os (same as defined on spec) + host.setOs("centos7,Linux,rocky9"); + jobs = dispatcherDao.findDispatchJobs(host, + adminManager.findShowEntity("pipe"), 5); + assertTrue(jobs.size() > 0); + } + + @Test + @Transactional + @Rollback(true) + public void testFindDispatchJobsAllShowsMultiOs() { + DispatchHost host = getHost(); + // Set multiple Os and confirm jobs with Linux are still being found + final JobDetail job = getJob1(); + assertNotNull(job); + + // Host with incompatible OS shouldn't find any job + host.setOs("centos7,SomethingElse"); + Set jobs = dispatcherDao.findDispatchJobs(host, 5); + assertTrue(jobs.size() == 0); + + // Host with Linux Os (same as defined on spec) should find jobs + host.setOs("centos7,Linux,rocky9"); + jobs = dispatcherDao.findDispatchJobs(host, 5); + assertTrue(jobs.size() > 0); + } } diff --git a/cuebot/src/test/resources/conf/dtd/cjsl-1.14.dtd b/cuebot/src/test/resources/conf/dtd/cjsl-1.14.dtd new file mode 100644 index 000000000..8bbcbf6f1 --- /dev/null +++ b/cuebot/src/test/resources/conf/dtd/cjsl-1.14.dtd @@ -0,0 +1,104 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/cuebot/src/test/resources/conf/jobspec/jobspec_dispatch_test.xml b/cuebot/src/test/resources/conf/jobspec/jobspec_dispatch_test.xml index 2c372eff2..b656f499f 100644 --- 
a/cuebot/src/test/resources/conf/jobspec/jobspec_dispatch_test.xml +++ b/cuebot/src/test/resources/conf/jobspec/jobspec_dispatch_test.xml @@ -18,7 +18,7 @@ - + spi @@ -30,9 +30,10 @@ false + Linux - + /shots/pipe/usr_testuser/logs/help.py 1-10 1 @@ -44,7 +45,7 @@ - + /shots/pipe/usr_testuser/logs/help.py 1-10 1 @@ -61,9 +62,10 @@ false + Linux - + /shots/pipe/usr_testuser/logs/help.py 1-10 1 diff --git a/proto/rqd.proto b/proto/rqd.proto index 1ff75a4be..73216c6da 100644 --- a/proto/rqd.proto +++ b/proto/rqd.proto @@ -114,6 +114,7 @@ message RunFrame { map attributes = 22; int32 num_gpus = 23; report.ChildrenProcStats children = 24; + string os = 25; } message RunFrameSeq { diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 000000000..b055a1807 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,3 @@ +[tool.pyright] +venvPath = "." +venv = "venv" diff --git a/requirements.txt b/requirements.txt index 4fc14043b..946853794 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,4 +19,6 @@ pytest==8.3.3 # Optional requirements # Sentry support for rqd -sentry-sdk==2.11.0 \ No newline at end of file +sentry-sdk==2.11.0 + +docker==7.1.0 \ No newline at end of file diff --git a/rqd/rqd.example.conf b/rqd/rqd.example.conf index e51782272..870419b24 100644 --- a/rqd/rqd.example.conf +++ b/rqd/rqd.example.conf @@ -27,3 +27,21 @@ SYSTEMDRIVE MAYA_MODULE_PATH MAYA_SCRIPT_PATH PIXAR_LICENSE_FILE + +[docker.config] +# Setting this to True requires all the additional "docker.[]" sections to be filled +RUN_ON_DOCKER=False +DOCKER_SHELL_PATH=/usr/bin/sh + +# This section is only required if RUN_ON_DOCKER=True +# List of volume mounts following docker run's format, but replacing = with : +[docker.mounts] +TEMP=type:bind,source:/tmp,target:/tmp,bind-propagation:slave +NET=type:bind,source:/net,target:/net,bind-propagation:slave + +# This section is only required if RUN_ON_DOCKER=True +# - keys represent OSs this rqd is capable of executing jobs in +# - values are 
docker image tags +[docker.images] +centos7=centos7.3:latest +rocky9=rocky9.3:latest diff --git a/rqd/rqd/rqconstants.py b/rqd/rqd/rqconstants.py index 4537b3113..64116710e 100644 --- a/rqd/rqd/rqconstants.py +++ b/rqd/rqd/rqconstants.py @@ -156,6 +156,12 @@ SP_OS = platform.system() +# Docker mode config +RUN_ON_DOCKER = False +DOCKER_IMAGES = {} +DOCKER_MOUNTS = [] +DOCKER_SHELL_PATH = "/bin/sh" + try: if os.path.isfile(CONFIG_FILE): # Hostname can come from here: rqutil.getHostname() @@ -237,6 +243,83 @@ if config.has_section(__host_env_var_section): RQD_HOST_ENV_VARS = config.options(__host_env_var_section) + __docker_mounts = "docker.mounts" + __docker_config = "docker.config" + __docker_images = "docker.images" + + if config.has_section(__docker_config): + RUN_ON_DOCKER = config.getboolean(__docker_config, "RUN_ON_DOCKER") + if RUN_ON_DOCKER: + import docker + import docker.models + import docker.types + + # rqd needs to run as root to be able to run docker + RQD_UID = 0 + RQD_GID = 0 + + # Path to the shell to be used in the frame environment + if config.has_option(__docker_config, "DOCKER_SHELL_PATH"): + DOCKER_SHELL_PATH = config.get( + __docker_config, + "DOCKER_SHELL_PATH") + + # Every key:value on the config file under docker.images + # is parsed as key=SP_OS and value=image_tag. + # SP_OS is set to a list of all available keys + # For example: + # + # rqd.conf + # [docker.images] + # centos7=centos7.3:latest + # rocky9=rocky9.3:latest + # + # becomes: + # SP_OS=centos7,rocky9 + # DOCKER_IMAGES={ + # "centos7": "centos7.3:latest", + # "rocky9": "rocky9.3:latest" + # } + keys = config.options(__docker_images) + DOCKER_IMAGES = {} + for key in keys: + DOCKER_IMAGES[key] = config.get(__docker_images, key) + SP_OS = ",".join(keys) + if not DOCKER_IMAGES: + raise RuntimeError("Misconfigured rqd. 
RUN_ON_DOCKER=True requires at " + "least one image on DOCKER_IMAGES ([docker.images] " + "section of rqd.conf)") + def parse_mount(mount_string): + """ + Parse mount definitions similar to a docker run command into a docker + mount obj + + Format: type=bind,source=/tmp,target=/tmp,bind-propagation=slave + """ + parsed_mounts = {} + # bind-propagation defaults to None as only type=bind accepts it + parsed_mounts["bind-propagation"] = None + for item in mount_string.split(","): + name, mount_path = item.split(":") + parsed_mounts[name.strip()] = mount_path.strip() + return parsed_mounts + + # Parse values under the category docker.mounts into Mount objects + mounts = config.options(__docker_mounts) + for mount_name in mounts: + mount_str = "" + try: + mount_str = config.get(__docker_mounts, mount_name) + mount_dict = parse_mount(mount_str) + mount = docker.types.Mount(mount_dict["target"], + mount_dict["source"], + type=mount_dict["type"], + propagation=mount_dict["bind-propagation"]) + DOCKER_MOUNTS.append(mount) + except KeyError as e: + logging.exception("Failed to create Mount for key=%s, value=%s", + mount_name, mount_str) + # pylint: disable=broad-except except Exception as e: logging.warning( diff --git a/rqd/rqd/rqcore.py b/rqd/rqd/rqcore.py index f3d42e475..5beb73b76 100644 --- a/rqd/rqd/rqcore.py +++ b/rqd/rqd/rqcore.py @@ -35,6 +35,9 @@ import time import traceback import select +import uuid + +from docker.errors import APIError, ImageNotFound import rqd.compiled_proto.host_pb2 import rqd.compiled_proto.report_pb2 @@ -51,1071 +54,1326 @@ log = logging.getLogger(__name__) -class FrameAttendantThread(threading.Thread): - """Once a frame has been received and checked by RQD, this class handles - the launching, waiting on, and cleanup work related to running the - frame.""" - def __init__(self, rqCore, runFrame, frameInfo): - """FrameAttendantThread class initialization - @type rqCore: RqCore - @param rqCore: Main RQD Object - @type runFrame: RunFrame - 
@param runFrame: rqd_pb2.RunFrame - @type frameInfo: rqd.rqnetwork.RunningFrame - @param frameInfo: Servant for running frame - """ - threading.Thread.__init__(self) - self.rqCore = rqCore - self.frameId = runFrame.frame_id - self.runFrame = runFrame - self.startTime = 0 - self.endTime = 0 - self.frameInfo = frameInfo - self._tempLocations = [] - self.rqlog = None +class RqCore(object): + """Main body of RQD, handles the integration of all components, + the setup and launching of a frame and acts on all gRPC calls + that are passed from the Network module.""" - def __createEnvVariables(self): - """Define the environmental variables for the frame""" - # If linux specific, they need to move into self.runLinux() - # pylint: disable=attribute-defined-outside-init - self.frameEnv = {} - self.frameEnv["PATH"] = self.rqCore.machine.getPathEnv() - self.frameEnv["TERM"] = "unknown" - self.frameEnv["TZ"] = self.rqCore.machine.getTimezone() - self.frameEnv["USER"] = self.runFrame.user_name - self.frameEnv["LOGNAME"] = self.runFrame.user_name - self.frameEnv["mcp"] = "1" - self.frameEnv["show"] = self.runFrame.show - self.frameEnv["shot"] = self.runFrame.shot - self.frameEnv["jobid"] = self.runFrame.job_name - self.frameEnv["jobhost"] = self.rqCore.machine.getHostname() - self.frameEnv["frame"] = self.runFrame.frame_name - self.frameEnv["zframe"] = self.runFrame.frame_name - self.frameEnv["logfile"] = self.runFrame.log_file - self.frameEnv["maxframetime"] = "0" - self.frameEnv["minspace"] = "200" - self.frameEnv["CUE3"] = "True" - self.frameEnv["CUE_GPU_MEMORY"] = str(self.rqCore.machine.getGpuMemoryFree()) - self.frameEnv["SP_NOMYCSHRC"] = "1" + def __init__(self, optNimbyoff=False): + """RqCore class initialization""" + self.__whenIdle = False + self.__reboot = False - if rqd.rqconstants.RQD_CUSTOM_HOME_PREFIX: - self.frameEnv["HOME"] = "%s/%s" % ( - rqd.rqconstants.RQD_CUSTOM_HOME_PREFIX, - self.runFrame.user_name) + self.__optNimbyoff = optNimbyoff - if 
rqd.rqconstants.RQD_CUSTOM_MAIL_PREFIX: - self.frameEnv["MAIL"] = "%s/%s" % ( - rqd.rqconstants.RQD_CUSTOM_MAIL_PREFIX, - self.runFrame.user_name) + self.cores = rqd.compiled_proto.report_pb2.CoreDetail( + total_cores=0, + idle_cores=0, + locked_cores=0, + booked_cores=0, + reserved_cores=[], + ) - if platform.system() == "Windows": - for variable in ["SYSTEMROOT", "APPDATA", "TMP", "COMMONPROGRAMFILES", "SYSTEMDRIVE"]: - if variable in os.environ: - self.frameEnv[variable] = os.environ[variable] - for variable in rqd.rqconstants.RQD_HOST_ENV_VARS: - # Fallback to empty string, easy to spot what is missing in the log - self.frameEnv[variable] = os.environ.get(variable, '') + self.nimby = rqd.rqnimby.NimbyFactory.getNimby(self) - for key, value in self.runFrame.environment.items(): - if key == 'PATH': - self.frameEnv[key] += os.pathsep + value - else: - self.frameEnv[key] = value + self.machine = rqd.rqmachine.Machine(self, self.cores) - # Add threads to use all assigned hyper-threading cores - if 'CPU_LIST' in self.runFrame.attributes and 'CUE_THREADS' in self.frameEnv: - self.frameEnv['CUE_THREADS'] = str(max( - int(self.frameEnv['CUE_THREADS']), - len(self.runFrame.attributes['CPU_LIST'].split(',')))) - self.frameEnv['CUE_HT'] = "True" + self.network = rqd.rqnetwork.Network(self) + self.__threadLock = threading.Lock() + self.__cache = {} + self.updateRssThread = None + self.onIntervalThread = None + self.intervalStartTime = None + self.intervalSleepTime = rqd.rqconstants.RQD_MIN_PING_INTERVAL_SEC - # Add GPU's to use all assigned GPU cores - if 'GPU_LIST' in self.runFrame.attributes: - self.frameEnv['CUE_GPU_CORES'] = self.runFrame.attributes['GPU_LIST'] + # pylint: disable=unused-private-member + self.__cluster = None + self.__session = None + self.__stmt = None - # pylint: disable=inconsistent-return-statements - def _createCommandFile(self, command): - """Creates a file that subprocess. Popen then executes. 
- @type command: string - @param command: The command specified in the runFrame request - @rtype: string - @return: Command file location""" - # TODO: this should use tempfile to create the files and clean them up afterwards - commandFile = None - try: - if platform.system() == "Windows": - rqd_tmp_dir = os.path.join(tempfile.gettempdir(), 'rqd') - try: - os.mkdir(rqd_tmp_dir) - except OSError: - pass # okay, already exists + self.docker = None + self.docker_mounts = [] + self.docker_images = {} + self.docker_lock = threading.Lock() + if rqd.rqconstants.RUN_ON_DOCKER: + # pylint: disable=import-outside-toplevel + import docker + self.docker = docker + self.docker_images = rqd.rqconstants.DOCKER_IMAGES + self.docker_mounts = rqd.rqconstants.DOCKER_MOUNTS + self.handleFrameImages() - # Windows Batch needs some characters escaped: - command = command.replace('%', '%%') - for char in '^&<>|': - command = command.replace(char, '^' + char) + signal.signal(signal.SIGINT, self.handleExit) + signal.signal(signal.SIGTERM, self.handleExit) - commandFile = os.path.join( - rqd_tmp_dir, - 'cmd-%s-%s.bat' % (self.runFrame.frame_id, time.time())) + def start(self): + """Called by main to start the rqd service""" + if self.machine.isDesktop(): + if self.__optNimbyoff: + log.warning('Nimby startup has been disabled via --nimbyoff') + elif not rqd.rqconstants.OVERRIDE_NIMBY: + if rqd.rqconstants.OVERRIDE_NIMBY is None: + log.warning('OVERRIDE_NIMBY is not defined, Nimby startup has been disabled') + else: + log.warning('OVERRIDE_NIMBY is False, Nimby startup has been disabled') else: - commandFile = os.path.join(tempfile.gettempdir(), - 'rqd-cmd-%s-%s' % (self.runFrame.frame_id, time.time())) - with open(commandFile, "w", encoding='utf-8') as rqexe: - self._tempLocations.append(commandFile) - rqexe.write(command) - rqexe.close() - os.chmod(commandFile, 0o777) - return commandFile - # pylint: disable=broad-except - except Exception as e: - log.critical( - "Unable to make command file: 
%s due to %s at %s", - commandFile, e, traceback.extract_tb(sys.exc_info()[2])) + self.nimbyOn() + elif rqd.rqconstants.OVERRIDE_NIMBY: + log.warning('Nimby startup has been triggered by OVERRIDE_NIMBY') + self.nimbyOn() + self.network.start_grpc() - def __writeHeader(self): - """Writes the frame's log header""" + def grpcConnected(self): + """After gRPC connects to the cuebot, this function is called""" + self.network.reportRqdStartup(self.machine.getBootReport()) - self.startTime = time.time() + self.updateRssThread = threading.Timer(rqd.rqconstants.RSS_UPDATE_INTERVAL, self.updateRss) + self.updateRssThread.start() - try: - print("="*59, file=self.rqlog) - print("RenderQ JobSpec %s" % time.ctime(self.startTime), "\n", file=self.rqlog) - print("proxy rqd.rqnetwork.RunningFrame/%s -t:tcp -h %s -p 10021" % ( - self.runFrame.frame_id, - self.rqCore.machine.getHostname()), file=self.rqlog) - print("%-21s%s" % ("command", self.runFrame.command), file=self.rqlog) - print("%-21s%s" % ("uid", self.runFrame.uid), file=self.rqlog) - print("%-21s%s" % ("gid", self.runFrame.gid), file=self.rqlog) - print("%-21s%s" % ("logDestination", - self.runFrame.log_dir_file), file=self.rqlog) - print("%-21s%s" % ("cwd", self.runFrame.frame_temp_dir), file=self.rqlog) - print("%-21s%s" % ("renderHost", - self.rqCore.machine.getHostname()), file=self.rqlog) - print("%-21s%s" % ("jobId", self.runFrame.job_id), file=self.rqlog) - print("%-21s%s" % ("frameId", self.runFrame.frame_id), file=self.rqlog) - for env in sorted(self.frameEnv): - print("%-21s%s=%s" % ("env", env, self.frameEnv[env]), file=self.rqlog) - print("="*59, file=self.rqlog) + self.onIntervalThread = threading.Timer(self.intervalSleepTime, self.onInterval) + self.intervalStartTime = time.time() + self.onIntervalThread.start() - if 'CPU_LIST' in self.runFrame.attributes: - print('Hyper-threading enabled', file=self.rqlog) + log.warning('RQD Started') + + def onInterval(self, sleepTime=None): + """This is called by 
self.grpcConnected as a timer thread to execute + every interval""" + if sleepTime is None: + self.intervalSleepTime = random.randint( + rqd.rqconstants.RQD_MIN_PING_INTERVAL_SEC, + rqd.rqconstants.RQD_MAX_PING_INTERVAL_SEC) + else: + self.intervalSleepTime = sleepTime + try: + self.onIntervalThread = threading.Timer(self.intervalSleepTime, self.onInterval) + self.intervalStartTime = time.time() + self.onIntervalThread.start() # pylint: disable=broad-except except Exception as e: log.critical( - "Unable to write header to rqlog: %s due to %s at %s", - self.runFrame.log_dir_file, e, traceback.extract_tb(sys.exc_info()[2])) - - def __writeFooter(self): - """Writes frame's log footer""" + 'Unable to schedule a ping due to %s at %s', + e, traceback.extract_tb(sys.exc_info()[2])) - self.endTime = time.time() - self.frameInfo.runTime = int(self.endTime - self.startTime) try: - print("", file=self.rqlog) - print("="*59, file=self.rqlog) - print("RenderQ Job Complete\n", file=self.rqlog) - print("%-20s%s" % ("exitStatus", self.frameInfo.exitStatus), file=self.rqlog) - print("%-20s%s" % ("exitSignal", self.frameInfo.exitSignal), file=self.rqlog) - if self.frameInfo.killMessage: - print("%-20s%s" % ("killMessage", self.frameInfo.killMessage), file=self.rqlog) - print("%-20s%s" % ("startTime", - time.ctime(self.startTime)), file=self.rqlog) - print("%-20s%s" % ("endTime", - time.ctime(self.endTime)), file=self.rqlog) - print("%-20s%s" % ("maxrss", self.frameInfo.maxRss), file=self.rqlog) - print("%-20s%s" % ("maxUsedGpuMemory", - self.frameInfo.maxUsedGpuMemory), file=self.rqlog) - print("%-20s%s" % ("utime", self.frameInfo.utime), file=self.rqlog) - print("%-20s%s" % ("stime", self.frameInfo.stime), file=self.rqlog) - print("%-20s%s" % ("renderhost", self.rqCore.machine.getHostname()), file=self.rqlog) - - print("%-20s%s" % ("maxrss (KB)", self.frameInfo.maxRss), file=self.rqlog) - for child in sorted(self.frameInfo.childrenProcs.items(), - key=lambda item: 
item[1]['start_time']): - print("\t%-20s%s" % (child[1]['name'], child[1]['rss']), file=self.rqlog) - print("\t%-20s%s" % ("start_time", - datetime.timedelta(seconds=child[1]["start_time"])), - file=self.rqlog) - print("\t%-20s%s" % ("cmdline", " ".join(child[1]["cmd_line"])), file=self.rqlog) - - print("="*59, file=self.rqlog) - + if self.__whenIdle and not self.__cache: + if not self.machine.isUserLoggedIn(): + self.shutdownRqdNow() + else: + log.warning('Shutdown requested but a user is logged in.') # pylint: disable=broad-except except Exception as e: - log.critical( - "Unable to write footer: %s due to %s at %s", - self.runFrame.log_dir_file, e, traceback.extract_tb(sys.exc_info()[2])) + log.warning( + 'Unable to shutdown due to %s at %s', e, traceback.extract_tb(sys.exc_info()[2])) - def __cleanup(self): - """Cleans up temporary files""" - rqd.rqutil.permissionsHigh() try: - for location in self._tempLocations: - if os.path.isfile(location): - try: - os.remove(location) - # pylint: disable=broad-except - except Exception as e: - log.warning( - "Unable to delete file: %s due to %s at %s", - location, e, traceback.extract_tb(sys.exc_info()[2])) - finally: - rqd.rqutil.permissionsLow() - - # Close log file - try: - self.rqlog.close() + self.sendStatusReport() # pylint: disable=broad-except - except Exception as e: - log.warning( - "Unable to close file: %s due to %s at %s", - self.runFrame.log_file, e, traceback.extract_tb(sys.exc_info()[2])) + except Exception: + log.exception('Unable to send status report') - def runLinux(self): - """The steps required to handle a frame under linux""" - frameInfo = self.frameInfo - runFrame = self.runFrame + def updateRss(self): + """Triggers and schedules the updating of rss information""" + if self.__cache: + try: + self.machine.rssUpdate(self.__cache) + finally: + self.updateRssThread = threading.Timer( + rqd.rqconstants.RSS_UPDATE_INTERVAL, self.updateRss) + self.updateRssThread.start() - self.__createEnvVariables() - 
self.__writeHeader() + def getFrame(self, frameId): + """Gets a frame from the cache based on frameId + @type frameId: string + @param frameId: A frame's unique Id + @rtype: rqd.rqnetwork.RunningFrame + @return: rqd.rqnetwork.RunningFrame object""" + return self.__cache[frameId] - tempStatFile = "%srqd-stat-%s-%s" % (self.rqCore.machine.getTempPath(), - frameInfo.frameId, - time.time()) - self._tempLocations.append(tempStatFile) - tempCommand = [] - if self.rqCore.machine.isDesktop(): - tempCommand += ["/bin/nice"] - tempCommand += ["/usr/bin/time", "-p", "-o", tempStatFile] + def getFrameKeys(self): + """Gets a list of all keys from the cache + @rtype: list + @return: List of all frameIds running on host""" + return list(self.__cache.keys()) - if 'CPU_LIST' in runFrame.attributes: - tempCommand += ['taskset', '-c', runFrame.attributes['CPU_LIST']] + def storeFrame(self, frameId, runningFrame): + """Stores a frame in the cache and adds the network adapter + @type frameId: string + @param frameId: A frame's unique Id + @type runningFrame: rqd.rqnetwork.RunningFrame + @param runningFrame: rqd.rqnetwork.RunningFrame object""" + with self.__threadLock: + if frameId in self.__cache: + raise rqd.rqexceptions.RqdException( + "frameId " + frameId + " is already running on this machine") + self.__cache[frameId] = runningFrame - rqd.rqutil.permissionsHigh() - try: - if rqd.rqconstants.RQD_BECOME_JOB_USER: - tempCommand += ["/bin/su", runFrame.user_name, rqd.rqconstants.SU_ARGUMENT, - '"' + self._createCommandFile(runFrame.command) + '"'] + def deleteFrame(self, frameId): + """Deletes a frame from the cache + @type frameId: string + @param frameId: A frame's unique Id""" + with self.__threadLock: + if frameId in self.__cache: + del self.__cache[frameId] + # pylint: disable=no-member + if not self.__cache and self.cores.reserved_cores: + # pylint: disable=no-member + log.error( + 'No running frames but reserved_cores is not empty: %s', + self.cores.reserved_cores) + # pylint: 
disable=no-member + self.cores.reserved_cores.clear() + log.info("Successfully delete frame with Id: %s", frameId) else: - tempCommand += [self._createCommandFile(runFrame.command)] - - # pylint: disable=subprocess-popen-preexec-fn,consider-using-with - frameInfo.forkedCommand = subprocess.Popen(tempCommand, - env=self.frameEnv, - cwd=self.rqCore.machine.getTempPath(), - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - close_fds=True, - preexec_fn=os.setsid) - finally: - rqd.rqutil.permissionsLow() - - frameInfo.pid = frameInfo.forkedCommand.pid + log.warning("Frame with Id: %s not found in cache", frameId) - if not self.rqCore.updateRssThread.is_alive(): - self.rqCore.updateRssThread = threading.Timer(rqd.rqconstants.RSS_UPDATE_INTERVAL, - self.rqCore.updateRss) - self.rqCore.updateRssThread.start() + def killAllFrame(self, reason): + """Will execute .kill() on every frame in cache until no frames remain + @type reason: string + @param reason: Reason for requesting all frames to be killed""" - poller = select.poll() - poller.register(frameInfo.forkedCommand.stdout, select.POLLIN) - poller.register(frameInfo.forkedCommand.stderr, select.POLLIN) - while True: - for fd, event in poller.poll(): - if event & select.POLLIN: - if fd == frameInfo.forkedCommand.stdout.fileno(): - line = frameInfo.forkedCommand.stdout.readline() - elif fd == frameInfo.forkedCommand.stderr.fileno(): - line = frameInfo.forkedCommand.stderr.readline() - else: - continue - if not line: - break - self.rqlog.write(line, prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP) - if frameInfo.forkedCommand.poll() is not None: - break + if self.__cache: + log.warning( + "killAllFrame called due to: %s\n%s", reason, ",".join(self.getFrameKeys())) - returncode = frameInfo.forkedCommand.wait() + while self.__cache: + if reason.startswith("NIMBY"): + # Since this is a nimby kill, ignore any frames that are ignoreNimby + frameKeys = [ + frame.frameId for frame in 
list(self.__cache.values()) if not frame.ignoreNimby] + else: + frameKeys = list(self.__cache.keys()) - # Find exitStatus and exitSignal - if returncode < 0: - # Exited with a signal - frameInfo.exitStatus = 1 - frameInfo.exitSignal = -returncode - else: - frameInfo.exitStatus = returncode - frameInfo.exitSignal = 0 + if not frameKeys: + # No frames left to kill + return - try: - with open(tempStatFile, "r", encoding='utf-8') as statFile: - frameInfo.realtime = statFile.readline().split()[1] - frameInfo.utime = statFile.readline().split()[1] - frameInfo.stime = statFile.readline().split()[1] - statFile.close() - # pylint: disable=broad-except - except Exception: - pass # This happens when frames are killed + for frameKey in frameKeys: + try: + self.__cache[frameKey].kill(reason) + except KeyError: + pass + time.sleep(1) - self.__writeFooter() - self.__cleanup() + def releaseCores(self, reqRelease, releaseHT=None, releaseGpus=None): + """The requested number of cores are released + @type reqRelease: int + @param reqRelease: Number of cores to release, 100 = 1 physical core""" + with self.__threadLock: + # pylint: disable=no-member + self.cores.booked_cores -= reqRelease + maxRelease = (self.cores.total_cores - + self.cores.locked_cores - + self.cores.idle_cores - + self.cores.booked_cores) - def runWindows(self): - """The steps required to handle a frame under windows""" - frameInfo = self.frameInfo - runFrame = self.runFrame + if maxRelease > 0: + self.cores.idle_cores += min(maxRelease, reqRelease) + # pylint: enable=no-member - self.__createEnvVariables() - self.__writeHeader() + if releaseHT: + self.machine.releaseHT(releaseHT) - try: - runFrame.command = runFrame.command.replace('%{frame}', self.frameEnv['CUE_IFRAME']) - tempCommand = [self._createCommandFile(runFrame.command)] + if releaseGpus: + self.machine.releaseGpus(releaseGpus) - # pylint: disable=consider-using-with - frameInfo.forkedCommand = subprocess.Popen(tempCommand, - env=self.frameEnv, - 
stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) - # pylint: disable=broad-except - except Exception: + # pylint: disable=no-member + if self.cores.idle_cores > self.cores.total_cores: log.critical( - "Failed subprocess.Popen: Due to: \n%s", - ''.join(traceback.format_exception(*sys.exc_info()))) + "idle_cores (%d) have become greater than total_cores (%d): %s at %s", + self.cores.idle_cores, self.cores.total_cores, sys.exc_info()[0], + traceback.extract_tb(sys.exc_info()[2])) + # pylint: enable=no-member - frameInfo.pid = frameInfo.forkedCommand.pid + def shutdown(self): + """Shuts down all rqd systems""" + self.nimbyOff() + if self.onIntervalThread is not None: + self.onIntervalThread.cancel() + if self.updateRssThread is not None: + self.updateRssThread.cancel() + elif self.__reboot: + log.warning("Rebooting machine by request") + self.machine.reboot() + else: + log.warning("Shutting down RQD by request. pid(%s)", os.getpid()) + self.network.stopGrpc() + # Using sys.exit would raise SystemExit, giving exception handlers a chance + # to block this + # pylint: disable=protected-access + os._exit(0) - if not self.rqCore.updateRssThread.is_alive(): - self.rqCore.updateRssThread = threading.Timer(rqd.rqconstants.RSS_UPDATE_INTERVAL, - self.rqCore.updateRss) - self.rqCore.updateRssThread.start() + def handleExit(self, signalnum, flag): + """Shutdown threads and exit RQD.""" + del signalnum + del flag + self.shutdown() - while True: - output = frameInfo.forkedCommand.stdout.readline() - if not output and frameInfo.forkedCommand.poll() is not None: - break - if output: - self.rqlog.write(output, prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP) + def launchFrame(self, runFrame): + """This will setup for the launch the frame specified in the arguments. + If a problem is encountered, a CueException will be thrown. 
+ @type runFrame: RunFrame + @param runFrame: rqd_pb2.RunFrame""" + log.info("Running command %s for %s", runFrame.command, runFrame.frame_id) + log.debug(runFrame) - frameInfo.forkedCommand.wait() + # + # Check for reasons to abort launch + # - # Find exitStatus and exitSignal - returncode = frameInfo.forkedCommand.returncode - if returncode < INT32_MIN: - returncode = 303 - if returncode > INT32_MAX: - returncode = 304 - frameInfo.exitStatus = returncode - frameInfo.exitSignal = returncode - - frameInfo.realtime = 0 - frameInfo.utime = 0 - frameInfo.stime = 0 + if self.machine.state != rqd.compiled_proto.host_pb2.UP: + err = "Not launching, rqd HardwareState is not Up" + log.info(err) + raise rqd.rqexceptions.CoreReservationFailureException(err) - self.__writeFooter() - self.__cleanup() + if self.__whenIdle: + err = "Not launching, rqd is waiting for idle to shutdown" + log.info(err) + raise rqd.rqexceptions.CoreReservationFailureException(err) - def runDarwin(self): - """The steps required to handle a frame under mac""" - frameInfo = self.frameInfo + if self.nimby.locked and not runFrame.ignore_nimby: + err = "Not launching, rqd is lockNimby and not Ignore Nimby" + log.info(err) + raise rqd.rqexceptions.CoreReservationFailureException(err) - self.__createEnvVariables() - self.__writeHeader() + if rqd.rqconstants.OVERRIDE_NIMBY and self.nimby.isNimbyActive(): + err = "Not launching, rqd is lockNimby and User is Active" + log.info(err) + raise rqd.rqexceptions.CoreReservationFailureException(err) - rqd.rqutil.permissionsHigh() - try: - tempCommand = ["/usr/bin/su", frameInfo.runFrame.user_name, "-c", '"' + - self._createCommandFile(frameInfo.runFrame.command) + '"'] + if runFrame.frame_id in self.__cache: + err = "Not launching, frame is already running on this proc %s" % runFrame.frame_id + log.critical(err) + raise rqd.rqexceptions.DuplicateFrameViolationException(err) - # pylint: disable=subprocess-popen-preexec-fn,consider-using-with - frameInfo.forkedCommand 
= subprocess.Popen(tempCommand, - env=self.frameEnv, - cwd=self.rqCore.machine.getTempPath(), - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - preexec_fn=os.setsid) - finally: - rqd.rqutil.permissionsLow() + if runFrame.HasField("uid") and runFrame.uid <= 0: + err = "Not launching, will not run frame as uid=%d" % runFrame.uid + log.warning(err) + raise rqd.rqexceptions.InvalidUserException(err) - frameInfo.pid = frameInfo.forkedCommand.pid + if runFrame.num_cores <= 0: + err = "Not launching, numCores must be > 0" + log.warning(err) + raise rqd.rqexceptions.CoreReservationFailureException(err) - if not self.rqCore.updateRssThread.is_alive(): - self.rqCore.updateRssThread = threading.Timer(rqd.rqconstants.RSS_UPDATE_INTERVAL, - self.rqCore.updateRss) - self.rqCore.updateRssThread.start() + # See if all requested cores are available + with self.__threadLock: + # pylint: disable=no-member + if self.cores.idle_cores < runFrame.num_cores: + err = "Not launching, insufficient idle cores" + log.critical(err) + raise rqd.rqexceptions.CoreReservationFailureException(err) + # pylint: enable=no-member - while True: - output = frameInfo.forkedCommand.stdout.readline() - if not output and frameInfo.forkedCommand.poll() is not None: - break - if output: - self.rqlog.write(output, prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP) + if runFrame.environment.get('CUE_THREADABLE') == '1': + reserveHT = self.machine.reserveHT(runFrame.num_cores) + if reserveHT: + runFrame.attributes['CPU_LIST'] = reserveHT - frameInfo.forkedCommand.wait() + if runFrame.num_gpus: + reserveGpus = self.machine.reserveGpus(runFrame.num_gpus) + if reserveGpus: + runFrame.attributes['GPU_LIST'] = reserveGpus - # Find exitStatus and exitSignal - returncode = frameInfo.forkedCommand.returncode - if os.WIFEXITED(returncode): - frameInfo.exitStatus = os.WEXITSTATUS(returncode) - else: - frameInfo.exitStatus = 1 - if os.WIFSIGNALED(returncode): - frameInfo.exitSignal = 
os.WTERMSIG(returncode) + # They must be available at this point, reserve them + # pylint: disable=no-member + self.cores.idle_cores -= runFrame.num_cores + self.cores.booked_cores += runFrame.num_cores + # pylint: enable=no-member - self.__writeFooter() - self.__cleanup() + runningFrame = rqd.rqnetwork.RunningFrame(self, runFrame) + runningFrame.frameAttendantThread = FrameAttendantThread(self, runFrame, runningFrame) + runningFrame.frameAttendantThread.start() - def runUnknown(self): - """The steps required to handle a frame under an unknown OS.""" + def getRunningFrame(self, frameId): + """Gets the currently running frame.""" + try: + return self.__cache[frameId] + except KeyError: + log.info("frameId %s is not running on this machine", frameId) + return None - def run(self): - """Thread initialization""" - log.info("Monitor frame started for frameId=%s", self.frameId) + def getCoreInfo(self): + """Gets the core info report.""" + return self.cores - runFrame = self.runFrame + def reportStatus(self): + """Replies with hostReport""" + return self.machine.getHostReport() - # pylint: disable=too-many-nested-blocks + def shutdownRqdNow(self): + """Kill all running frames and shutdown RQD""" + self.machine.state = rqd.compiled_proto.host_pb2.DOWN try: - runFrame.job_temp_dir = os.path.join(self.rqCore.machine.getTempPath(), - runFrame.job_name) - runFrame.frame_temp_dir = os.path.join(runFrame.job_temp_dir, - runFrame.frame_name) - runFrame.log_file = "%s.%s.rqlog" % (runFrame.job_name, - runFrame.frame_name) - runFrame.log_dir_file = os.path.join(runFrame.log_dir, runFrame.log_file) - - try: # Exception block for all exceptions - # Ensure permissions return to Low after this block - try: - if rqd.rqconstants.RQD_CREATE_USER_IF_NOT_EXISTS and runFrame.HasField("uid"): - rqd.rqutil.checkAndCreateUser(runFrame.user_name, - runFrame.uid, - runFrame.gid) - # Do everything as launching user: - runFrame.gid = rqd.rqconstants.LAUNCH_FRAME_USER_GID - 
rqd.rqutil.permissionsUser(runFrame.uid, runFrame.gid) - - # Setup frame logging - try: - self.rqlog = rqd.rqlogging.RqdLogger(runFrame.log_dir_file) - self.rqlog.waitForFile() - # pylint: disable=broad-except - except Exception as e: - err = "Unable to write to %s due to %s" % (runFrame.log_dir_file, e) - raise RuntimeError(err) + self.lockAll() + self.killAllFrame("shutdownRqdNow Command") + # pylint: disable=broad-except + except Exception: + log.exception("Failed to kill frames, stopping service anyways") + if not self.__cache: + self.shutdown() - finally: - rqd.rqutil.permissionsLow() + def shutdownRqdIdle(self): + """When machine is idle, shutdown RQD""" + log.info("shutdownRqdIdle") + self.lockAll() + self.__whenIdle = True + self.sendStatusReport() + if not self.__cache: + self.shutdownRqdNow() - # Store frame in cache and register servant - self.rqCore.storeFrame(runFrame.frame_id, self.frameInfo) + def rebootNow(self): + """Kill all running frames and reboot machine. + This is not available when a user is logged in""" + log.warning('Requested to reboot now') + if self.machine.isUserLoggedIn(): + err = ('Rebooting via RQD is not supported for a desktop machine ' + 'when a user is logged in') + log.warning(err) + raise rqd.rqexceptions.RqdException(err) + self.__reboot = True + self.shutdownRqdNow() - if platform.system() == "Linux": - self.runLinux() - elif platform.system() == "Windows": - self.runWindows() - elif platform.system() == "Darwin": - self.runDarwin() - else: - self.runUnknown() + def rebootIdle(self): + """When machine is idle, reboot it""" + log.warning('Requested to reboot machine when idle') + self.lockAll() + self.__whenIdle = True + self.__reboot = True + self.sendStatusReport() + if not self.__cache and not self.machine.isUserLoggedIn(): + self.shutdownRqdNow() + def nimbyOn(self): + """Activates nimby, does not kill any running frames until next nimby + event. 
Also does not unlock until sufficient idle time is reached.""" + if self.nimby and not self.nimby.active: + try: + self.nimby.run() + log.warning("Nimby has been activated") # pylint: disable=broad-except except Exception: - log.critical( - "Failed launchFrame: For %s due to: \n%s", - runFrame.frame_id, ''.join(traceback.format_exception(*sys.exc_info()))) - # Notifies the cuebot that there was an error launching - self.frameInfo.exitStatus = rqd.rqconstants.EXITSTATUS_FOR_FAILED_LAUNCH - # Delay keeps the cuebot from spamming failing booking requests - time.sleep(10) - finally: - self.rqCore.releaseCores(self.runFrame.num_cores, runFrame.attributes.get('CPU_LIST'), - runFrame.attributes.get('GPU_LIST') - if 'GPU_LIST' in self.runFrame.attributes else None) - - self.rqCore.deleteFrame(self.runFrame.frame_id) - - self.rqCore.sendFrameCompleteReport(self.frameInfo) - time_till_next = ( - (self.rqCore.intervalStartTime + self.rqCore.intervalSleepTime) - time.time()) - if time_till_next > (2 * rqd.rqconstants.RQD_MIN_PING_INTERVAL_SEC): - self.rqCore.onIntervalThread.cancel() - self.rqCore.onInterval(rqd.rqconstants.RQD_MIN_PING_INTERVAL_SEC) - - log.info("Monitor frame ended for frameId=%s", - self.runFrame.frame_id) - + self.nimby.locked = False + err = "Nimby is in the process of shutting down" + log.exception(err) + raise rqd.rqexceptions.RqdException(err) -class RqCore(object): - """Main body of RQD, handles the integration of all components, - the setup and launching of a frame and acts on all gRPC calls - that are passed from the Network module.""" + def nimbyOff(self): + """Deactivates nimby and unlocks any nimby lock""" + if self.nimby.active: + self.nimby.stop() + log.info("Nimby has been deactivated") - def __init__(self, optNimbyoff=False): - """RqCore class initialization""" - self.__whenIdle = False - self.__reboot = False + def onNimbyLock(self): + """This is called by nimby when it locks the machine. + All running frames are killed. 
+ A new report is sent to the cuebot.""" + self.killAllFrame("NIMBY Triggered") + self.sendStatusReport() - self.__optNimbyoff = optNimbyoff + def onNimbyUnlock(self, asOf=None): + """This is called by nimby when it unlocks the machine due to sufficient + idle. A new report is sent to the cuebot. + @param asOf: Time when idle state began, if known.""" + del asOf + self.sendStatusReport() - self.cores = rqd.compiled_proto.report_pb2.CoreDetail( - total_cores=0, - idle_cores=0, - locked_cores=0, - booked_cores=0, - reserved_cores=[], - ) - - self.nimby = rqd.rqnimby.NimbyFactory.getNimby(self) - - self.machine = rqd.rqmachine.Machine(self, self.cores) - - self.network = rqd.rqnetwork.Network(self) - self.__threadLock = threading.Lock() - self.__cache = {} + def lock(self, reqLock): + """Locks the requested core. + If a locked status changes, a status report is sent to the cuebot. + @type reqLock: int + @param reqLock: Number of cores to lock, 100 = 1 physical core""" + sendUpdate = False + with self.__threadLock: + # pylint: disable=no-member + numLock = min(self.cores.total_cores - self.cores.locked_cores, + reqLock) + if numLock > 0: + self.cores.locked_cores += numLock + self.cores.idle_cores -= min(numLock, self.cores.idle_cores) + sendUpdate = True + # pylint: enable=no-member - self.updateRssThread = None - self.onIntervalThread = None - self.intervalStartTime = None - self.intervalSleepTime = rqd.rqconstants.RQD_MIN_PING_INTERVAL_SEC + log.debug(self.cores) - # pylint: disable=unused-private-member - self.__cluster = None - self.__session = None - self.__stmt = None + if sendUpdate: + self.sendStatusReport() - signal.signal(signal.SIGINT, self.handleExit) - signal.signal(signal.SIGTERM, self.handleExit) + def lockAll(self): + """"Locks all cores on the machine. 
+ If a locked status changes, a status report is sent.""" + sendUpdate = False + with self.__threadLock: + # pylint: disable=no-member + if self.cores.locked_cores < self.cores.total_cores: + self.cores.locked_cores = self.cores.total_cores + self.cores.idle_cores = 0 + sendUpdate = True + # pylint: enable=no-member - def start(self): - """Called by main to start the rqd service""" - if self.machine.isDesktop(): - if self.__optNimbyoff: - log.warning('Nimby startup has been disabled via --nimbyoff') - elif not rqd.rqconstants.OVERRIDE_NIMBY: - if rqd.rqconstants.OVERRIDE_NIMBY is None: - log.warning('OVERRIDE_NIMBY is not defined, Nimby startup has been disabled') - else: - log.warning('OVERRIDE_NIMBY is False, Nimby startup has been disabled') - else: - self.nimbyOn() - elif rqd.rqconstants.OVERRIDE_NIMBY: - log.warning('Nimby startup has been triggered by OVERRIDE_NIMBY') - self.nimbyOn() - self.network.start_grpc() + log.debug(self.cores) - def grpcConnected(self): - """After gRPC connects to the cuebot, this function is called""" - self.network.reportRqdStartup(self.machine.getBootReport()) + if sendUpdate: + self.sendStatusReport() - self.updateRssThread = threading.Timer(rqd.rqconstants.RSS_UPDATE_INTERVAL, self.updateRss) - self.updateRssThread.start() + def unlock(self, reqUnlock): + """Unlocks the requested number of cores. + Also resets reboot/shutdown/restart when idle requests. + If a locked status changes, a status report is sent to the cuebot. 
+ @type reqUnlock: int + @param reqUnlock: Number of cores to unlock, 100 = 1 physical core""" - self.onIntervalThread = threading.Timer(self.intervalSleepTime, self.onInterval) - self.intervalStartTime = time.time() - self.onIntervalThread.start() + sendUpdate = False - log.warning('RQD Started') + if (self.__whenIdle or self.__reboot or + self.machine.state != rqd.compiled_proto.host_pb2.UP): + sendUpdate = True - def onInterval(self, sleepTime=None): + self.__whenIdle = False + self.__reboot = False + self.machine.state = rqd.compiled_proto.host_pb2.UP - """This is called by self.grpcConnected as a timer thread to execute - every interval""" - if sleepTime is None: - self.intervalSleepTime = random.randint( - rqd.rqconstants.RQD_MIN_PING_INTERVAL_SEC, - rqd.rqconstants.RQD_MAX_PING_INTERVAL_SEC) - else: - self.intervalSleepTime = sleepTime - try: - self.onIntervalThread = threading.Timer(self.intervalSleepTime, self.onInterval) - self.intervalStartTime = time.time() - self.onIntervalThread.start() - # pylint: disable=broad-except - except Exception as e: - log.critical( - 'Unable to schedule a ping due to %s at %s', - e, traceback.extract_tb(sys.exc_info()[2])) + with self.__threadLock: + # pylint: disable=no-member + numUnlock = min(self.cores.locked_cores, reqUnlock) + if numUnlock > 0: + self.cores.locked_cores -= numUnlock + self.cores.idle_cores += numUnlock + sendUpdate = True + # pylint: enable=no-member - try: - if self.__whenIdle and not self.__cache: - if not self.machine.isUserLoggedIn(): - self.shutdownRqdNow() - else: - log.warning('Shutdown requested but a user is logged in.') - # pylint: disable=broad-except - except Exception as e: - log.warning( - 'Unable to shutdown due to %s at %s', e, traceback.extract_tb(sys.exc_info()[2])) + log.debug(self.cores) - try: + if sendUpdate: self.sendStatusReport() - # pylint: disable=broad-except - except Exception as e: - log.critical( - 'Unable to send status report due to %s at %s', - e, 
traceback.extract_tb(sys.exc_info()[2])) - def updateRss(self): - """Triggers and schedules the updating of rss information""" - if self.__cache: - try: - self.machine.rssUpdate(self.__cache) - finally: - self.updateRssThread = threading.Timer( - rqd.rqconstants.RSS_UPDATE_INTERVAL, self.updateRss) - self.updateRssThread.start() + def unlockAll(self): + """"Unlocks all cores on the machine. + Also resets reboot/shutdown/restart when idle requests. + If a locked status changes, a status report is sent.""" - def getFrame(self, frameId): - """Gets a frame from the cache based on frameId - @type frameId: string - @param frameId: A frame's unique Id - @rtype: rqd.rqnetwork.RunningFrame - @return: rqd.rqnetwork.RunningFrame object""" - return self.__cache[frameId] + sendUpdate = False - def getFrameKeys(self): - """Gets a list of all keys from the cache - @rtype: list - @return: List of all frameIds running on host""" - return list(self.__cache.keys()) + if (self.__whenIdle or self.__reboot + or self.machine.state != rqd.compiled_proto.host_pb2.UP): + sendUpdate = True - def storeFrame(self, frameId, runningFrame): - """Stores a frame in the cache and adds the network adapter - @type frameId: string - @param frameId: A frame's unique Id - @type runningFrame: rqd.rqnetwork.RunningFrame - @param runningFrame: rqd.rqnetwork.RunningFrame object""" - with self.__threadLock: - if frameId in self.__cache: - raise rqd.rqexceptions.RqdException( - "frameId " + frameId + " is already running on this machine") - self.__cache[frameId] = runningFrame + self.__whenIdle = False + self.__reboot = False + self.machine.state = rqd.compiled_proto.host_pb2.UP - def deleteFrame(self, frameId): - """Deletes a frame from the cache - @type frameId: string - @param frameId: A frame's unique Id""" with self.__threadLock: - if frameId in self.__cache: - del self.__cache[frameId] - # pylint: disable=no-member - if not self.__cache and self.cores.reserved_cores: - # pylint: disable=no-member - 
log.error( - 'No running frames but reserved_cores is not empty: %s', - self.cores.reserved_cores) - # pylint: disable=no-member - self.cores.reserved_cores.clear() - log.info("Successfully delete frame with Id: %s", frameId) - else: - log.warning("Frame with Id: %s not found in cache", frameId) - - def killAllFrame(self, reason): - """Will execute .kill() on every frame in cache until no frames remain - @type reason: string - @param reason: Reason for requesting all frames to be killed""" + # pylint: disable=no-member + if self.cores.locked_cores > 0: + if not self.nimby.locked: + self.cores.idle_cores += self.cores.locked_cores + self.cores.locked_cores = 0 + sendUpdate = True + # pylint: enable=no-member - if self.__cache: - log.warning( - "killAllFrame called due to: %s\n%s", reason, ",".join(self.getFrameKeys())) + log.debug(self.cores) - while self.__cache: - if reason.startswith("NIMBY"): - # Since this is a nimby kill, ignore any frames that are ignoreNimby - frameKeys = [ - frame.frameId for frame in list(self.__cache.values()) if not frame.ignoreNimby] - else: - frameKeys = list(self.__cache.keys()) + if sendUpdate: + self.sendStatusReport() - if not frameKeys: - # No frames left to kill - return + def sendStatusReport(self): + """Sends the current host report to Cuebot.""" + self.network.reportStatus(self.machine.getHostReport()) - for frameKey in frameKeys: - try: - self.__cache[frameKey].kill(reason) - except KeyError: - pass - time.sleep(1) + def isWaitingForIdle(self): + """Returns whether the host is waiting until idle to take some action.""" + return self.__whenIdle - def releaseCores(self, reqRelease, releaseHT=None, releaseGpus=None): - """The requested number of cores are released - @type reqRelease: int - @param reqRelease: Number of cores to release, 100 = 1 physical core""" - with self.__threadLock: + def sendFrameCompleteReport(self, runningFrame): + """Send a frameCompleteReport to Cuebot""" + if not runningFrame.completeReportSent: + 
report = rqd.compiled_proto.report_pb2.FrameCompleteReport() # pylint: disable=no-member - self.cores.booked_cores -= reqRelease - maxRelease = (self.cores.total_cores - - self.cores.locked_cores - - self.cores.idle_cores - - self.cores.booked_cores) - - if maxRelease > 0: - self.cores.idle_cores += min(maxRelease, reqRelease) + report.host.CopyFrom(self.machine.getHostInfo()) + report.frame.CopyFrom(runningFrame.runningFrameInfo()) # pylint: enable=no-member - if releaseHT: - self.machine.releaseHT(releaseHT) + if runningFrame.exitStatus is None: + report.exit_status = 1 + else: + report.exit_status = runningFrame.exitStatus - if releaseGpus: - self.machine.releaseGpus(releaseGpus) + report.exit_signal = runningFrame.exitSignal + report.run_time = int(runningFrame.runTime) - # pylint: disable=no-member - if self.cores.idle_cores > self.cores.total_cores: + # If nimby is active, then frame must have been killed by nimby + # Set the exitSignal to indicate this event + if self.nimby.locked and not runningFrame.ignoreNimby: + report.exit_status = rqd.rqconstants.EXITSTATUS_FOR_NIMBY_KILL + + self.network.reportRunningFrameCompletion(report) + runningFrame.completeReportSent = True + + def sanitizeFrames(self): + """ + Iterate over the cache and update the status of frames that might have + completed but never reported back to cuebot. 
+ """ + for frameId in list(self.__cache.keys()): + runningFrame = self.__cache[frameId] + # If the frame was marked as completed (exitStatus) and a report has not been sent + # try to file the report again + if runningFrame.exitStatus is not None and not runningFrame.completeReportSent: + try: + self.sendFrameCompleteReport(runningFrame) + self.deleteFrame(frameId) + log.info("Successfully deleted frame from cache for %s/%s (%s)", + runningFrame.runFrame.job_name, + runningFrame.runFrame.frame_name, + frameId) + # pylint: disable=broad-except + except Exception: + log.exception("Failed to sanitize frame %s/%s", + runningFrame.runFrame.job_name, + runningFrame.runFrame.frame_name) + + def handleFrameImages(self): + """ + Download docker images to be used by frames running on this host + """ + if self.docker: + docker_client = self.docker.from_env() + for image in self.docker_images.values(): + log.info("Downloading frame image: %s", image) + try: + name, tag = image.split(":") + docker_client.images.pull(name, tag) + except (ImageNotFound, APIError) as e: + raise RuntimeError("Failed to download frame docker image for %s:%s - %s" % + (name, tag, e)) + log.info("Finished downloading frame images") + + +class FrameAttendantThread(threading.Thread): + """Once a frame has been received and checked by RQD, this class handles + the launching, waiting on, and cleanup work related to running the + frame.""" + def __init__(self, rqCore: RqCore, runFrame, frameInfo): + """FrameAttendantThread class initialization + @type rqCore: RqCore + @param rqCore: Main RQD Object + @type runFrame: RunFrame + @param runFrame: rqd_pb2.RunFrame + @type frameInfo: rqd.rqnetwork.RunningFrame + @param frameInfo: Servant for running frame + """ + threading.Thread.__init__(self) + self.rqCore = rqCore + self.frameId = runFrame.frame_id + self.runFrame = runFrame + self.startTime = 0 + self.endTime = 0 + self.frameInfo = frameInfo + self._tempLocations = [] + self.rqlog = None + + def 
__createEnvVariables(self): + """Define the environmental variables for the frame""" + # If linux specific, they need to move into self.runLinux() + # pylint: disable=attribute-defined-outside-init + self.frameEnv = {} + self.frameEnv["PATH"] = self.rqCore.machine.getPathEnv() + self.frameEnv["TERM"] = "unknown" + self.frameEnv["TZ"] = self.rqCore.machine.getTimezone() + self.frameEnv["USER"] = self.runFrame.user_name + self.frameEnv["LOGNAME"] = self.runFrame.user_name + self.frameEnv["mcp"] = "1" + self.frameEnv["show"] = self.runFrame.show + self.frameEnv["shot"] = self.runFrame.shot + self.frameEnv["jobid"] = self.runFrame.job_name + self.frameEnv["jobhost"] = self.rqCore.machine.getHostname() + self.frameEnv["frame"] = self.runFrame.frame_name + self.frameEnv["zframe"] = self.runFrame.frame_name + self.frameEnv["logfile"] = self.runFrame.log_file + self.frameEnv["maxframetime"] = "0" + self.frameEnv["minspace"] = "200" + self.frameEnv["CUE3"] = "True" + self.frameEnv["CUE_GPU_MEMORY"] = str(self.rqCore.machine.getGpuMemoryFree()) + self.frameEnv["SP_NOMYCSHRC"] = "1" + + if rqd.rqconstants.RQD_CUSTOM_HOME_PREFIX: + self.frameEnv["HOME"] = "%s/%s" % ( + rqd.rqconstants.RQD_CUSTOM_HOME_PREFIX, + self.runFrame.user_name) + + if rqd.rqconstants.RQD_CUSTOM_MAIL_PREFIX: + self.frameEnv["MAIL"] = "%s/%s" % ( + rqd.rqconstants.RQD_CUSTOM_MAIL_PREFIX, + self.runFrame.user_name) + + if platform.system() == "Windows": + for variable in ["SYSTEMROOT", "APPDATA", "TMP", "COMMONPROGRAMFILES", "SYSTEMDRIVE"]: + if variable in os.environ: + self.frameEnv[variable] = os.environ[variable] + for variable in rqd.rqconstants.RQD_HOST_ENV_VARS: + # Fallback to empty string, easy to spot what is missing in the log + self.frameEnv[variable] = os.environ.get(variable, '') + + + if platform.system() == "Windows": + for variable in ["SYSTEMROOT", "APPDATA", "TMP", "COMMONPROGRAMFILES", "SYSTEMDRIVE"]: + if variable in os.environ: + self.frameEnv[variable] = os.environ[variable] + for 
variable in rqd.rqconstants.RQD_HOST_ENV_VARS: + # Fallback to empty string, easy to spot what is missing in the log + self.frameEnv[variable] = os.environ.get(variable, '') + + for key, value in self.runFrame.environment.items(): + if key == 'PATH': + self.frameEnv[key] += os.pathsep + value + else: + self.frameEnv[key] = value + + # Add threads to use all assigned hyper-threading cores + if 'CPU_LIST' in self.runFrame.attributes and 'CUE_THREADS' in self.frameEnv: + self.frameEnv['CUE_THREADS'] = str(max( + int(self.frameEnv['CUE_THREADS']), + len(self.runFrame.attributes['CPU_LIST'].split(',')))) + self.frameEnv['CUE_HT'] = "True" + + # Add GPU's to use all assigned GPU cores + if 'GPU_LIST' in self.runFrame.attributes: + self.frameEnv['CUE_GPU_CORES'] = self.runFrame.attributes['GPU_LIST'] + + # pylint: disable=inconsistent-return-statements + def _createCommandFile(self, command): + """Creates a file that subprocess. Popen then executes. + @type command: string + @param command: The command specified in the runFrame request + @rtype: string + @return: Command file location""" + commandFile = "" + try: + if platform.system() == "Windows": + rqd_tmp_dir = os.path.join(tempfile.gettempdir(), 'rqd') + try: + os.mkdir(rqd_tmp_dir) + except OSError: + pass # okay, already exists + + # Windows Batch needs some characters escaped: + command = command.replace('%', '%%') + for char in '^&<>|': + command = command.replace(char, '^' + char) + + commandFile = os.path.join( + rqd_tmp_dir, + 'cmd-%s-%s.bat' % (self.runFrame.frame_id, time.time())) + else: + commandFile = os.path.join(tempfile.gettempdir(), + 'rqd-cmd-%s-%s' % (self.runFrame.frame_id, time.time())) + with open(commandFile, "w", encoding='utf-8') as rqexe: + self._tempLocations.append(commandFile) + rqexe.write(command) + rqexe.close() + os.chmod(commandFile, 0o777) + return commandFile + # pylint: disable=broad-except + except Exception as e: log.critical( - "idle_cores (%d) have become greater than 
total_cores (%d): %s at %s", - self.cores.idle_cores, self.cores.total_cores, sys.exc_info()[0], - traceback.extract_tb(sys.exc_info()[2])) - # pylint: enable=no-member + "Unable to make command file: %s due to %s at %s", + commandFile, e, traceback.extract_tb(sys.exc_info()[2])) + raise e - def shutdown(self): - """Shuts down all rqd systems""" - self.nimbyOff() - if self.onIntervalThread is not None: - self.onIntervalThread.cancel() - if self.updateRssThread is not None: - self.updateRssThread.cancel() - elif self.__reboot: - log.warning("Rebooting machine by request") - self.machine.reboot() - else: - log.warning("Shutting down RQD by request. pid(%s)", os.getpid()) - self.network.stopGrpc() - # Using sys.exit would raise SystemExit, giving exception handlers a chance - # to block this - # pylint: disable=protected-access - os._exit(0) + def __writeHeader(self): + """Writes the frame's log header""" - def handleExit(self, signalnum, flag): - """Shutdown threads and exit RQD.""" - del signalnum - del flag - self.shutdown() + self.startTime = time.time() + + try: + print("="*59, file=self.rqlog) + print("RenderQ JobSpec %s" % time.ctime(self.startTime), "\n", file=self.rqlog) + print("proxy rqd.rqnetwork.RunningFrame/%s -t:tcp -h %s -p 10021" % ( + self.runFrame.frame_id, + self.rqCore.machine.getHostname()), file=self.rqlog) + print("%-21s%s" % ("command", self.runFrame.command), file=self.rqlog) + print("%-21s%s" % ("uid", self.runFrame.uid), file=self.rqlog) + print("%-21s%s" % ("gid", self.runFrame.gid), file=self.rqlog) + print("%-21s%s" % ("logDestination", + self.runFrame.log_dir_file), file=self.rqlog) + print("%-21s%s" % ("cwd", self.runFrame.frame_temp_dir), file=self.rqlog) + print("%-21s%s" % ("renderHost", + self.rqCore.machine.getHostname()), file=self.rqlog) + print("%-21s%s" % ("jobId", self.runFrame.job_id), file=self.rqlog) + print("%-21s%s" % ("frameId", self.runFrame.frame_id), file=self.rqlog) + for env in sorted(self.frameEnv): + 
print("%-21s%s=%s" % ("env", env, self.frameEnv[env]), file=self.rqlog) + print("="*59, file=self.rqlog) + + if 'CPU_LIST' in self.runFrame.attributes: + print('Hyper-threading enabled', file=self.rqlog) + + # pylint: disable=broad-except + except Exception as e: + log.critical( + "Unable to write header to rqlog: %s due to %s at %s", + self.runFrame.log_dir_file, e, traceback.extract_tb(sys.exc_info()[2])) + + def __writeFooter(self): + """Writes frame's log footer""" + + self.endTime = time.time() + self.frameInfo.runTime = int(self.endTime - self.startTime) + try: + print("", file=self.rqlog) + print("="*59, file=self.rqlog) + print("RenderQ Job Complete\n", file=self.rqlog) + print("%-20s%s" % ("exitStatus", self.frameInfo.exitStatus), file=self.rqlog) + print("%-20s%s" % ("exitSignal", self.frameInfo.exitSignal), file=self.rqlog) + if self.frameInfo.killMessage: + print("%-20s%s" % ("killMessage", self.frameInfo.killMessage), file=self.rqlog) + print("%-20s%s" % ("startTime", + time.ctime(self.startTime)), file=self.rqlog) + print("%-20s%s" % ("endTime", + time.ctime(self.endTime)), file=self.rqlog) + print("%-20s%s" % ("maxrss", self.frameInfo.maxRss), file=self.rqlog) + print("%-20s%s" % ("maxUsedGpuMemory", + self.frameInfo.maxUsedGpuMemory), file=self.rqlog) + print("%-20s%s" % ("utime", self.frameInfo.utime), file=self.rqlog) + print("%-20s%s" % ("stime", self.frameInfo.stime), file=self.rqlog) + print("%-20s%s" % ("renderhost", self.rqCore.machine.getHostname()), file=self.rqlog) + + print("%-20s%s" % ("maxrss (KB)", self.frameInfo.maxRss), file=self.rqlog) + for child in sorted(self.frameInfo.childrenProcs.items(), + key=lambda item: item[1]['start_time']): + print("\t%-20s%s" % (child[1]['name'], child[1]['rss']), file=self.rqlog) + print("\t%-20s%s" % ("start_time", + datetime.timedelta(seconds=child[1]["start_time"])), + file=self.rqlog) + print("\t%-20s%s" % ("cmdline", " ".join(child[1]["cmd_line"])), file=self.rqlog) + + print("="*59, 
file=self.rqlog) + + # pylint: disable=broad-except + except Exception as e: + log.critical( + "Unable to write footer: %s due to %s at %s", + self.runFrame.log_dir_file, e, traceback.extract_tb(sys.exc_info()[2])) + + def __cleanup(self): + """Cleans up temporary files""" + rqd.rqutil.permissionsHigh() + try: + for location in self._tempLocations: + if os.path.isfile(location): + try: + os.remove(location) + # pylint: disable=broad-except + except Exception as e: + log.warning( + "Unable to delete file: %s due to %s at %s", + location, e, traceback.extract_tb(sys.exc_info()[2])) + finally: + rqd.rqutil.permissionsLow() + + # Close log file + try: + self.rqlog.close() + # pylint: disable=broad-except + except Exception as e: + log.warning( + "Unable to close file: %s due to %s at %s", + self.runFrame.log_file, e, traceback.extract_tb(sys.exc_info()[2])) + + def runLinux(self): + """The steps required to handle a frame under linux""" + frameInfo = self.frameInfo + runFrame = self.runFrame + + self.__createEnvVariables() + self.__writeHeader() + + tempStatFile = "%srqd-stat-%s-%s" % (self.rqCore.machine.getTempPath(), + frameInfo.frameId, + time.time()) + self._tempLocations.append(tempStatFile) + tempCommand = [] + if self.rqCore.machine.isDesktop(): + tempCommand += ["/bin/nice"] + tempCommand += ["/usr/bin/time", "-p", "-o", tempStatFile] + + if 'CPU_LIST' in runFrame.attributes: + tempCommand += ['taskset', '-c', runFrame.attributes['CPU_LIST']] - def launchFrame(self, runFrame): - """This will setup for the launch the frame specified in the arguments. - If a problem is encountered, a CueException will be thrown. 
- @type runFrame: RunFrame - @param runFrame: rqd_pb2.RunFrame""" - log.info("Running command %s for %s", runFrame.command, runFrame.frame_id) - log.debug(runFrame) + rqd.rqutil.permissionsHigh() + try: + if rqd.rqconstants.RQD_BECOME_JOB_USER: + tempCommand += ["/bin/su", runFrame.user_name, rqd.rqconstants.SU_ARGUMENT, + '"' + self._createCommandFile(runFrame.command) + '"'] + else: + tempCommand += [self._createCommandFile(runFrame.command)] - # - # Check for reasons to abort launch - # + # pylint: disable=subprocess-popen-preexec-fn,consider-using-with + frameInfo.forkedCommand = subprocess.Popen(tempCommand, + env=self.frameEnv, + cwd=self.rqCore.machine.getTempPath(), + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + close_fds=True, + preexec_fn=os.setsid) + finally: + rqd.rqutil.permissionsLow() - if self.machine.state != rqd.compiled_proto.host_pb2.UP: - err = "Not launching, rqd HardwareState is not Up" - log.info(err) - raise rqd.rqexceptions.CoreReservationFailureException(err) + frameInfo.pid = frameInfo.forkedCommand.pid - if self.__whenIdle: - err = "Not launching, rqd is waiting for idle to shutdown" - log.info(err) - raise rqd.rqexceptions.CoreReservationFailureException(err) + if not self.rqCore.updateRssThread.is_alive(): + self.rqCore.updateRssThread = threading.Timer(rqd.rqconstants.RSS_UPDATE_INTERVAL, + self.rqCore.updateRss) + self.rqCore.updateRssThread.start() - if self.nimby.locked and not runFrame.ignore_nimby: - err = "Not launching, rqd is lockNimby and not Ignore Nimby" - log.info(err) - raise rqd.rqexceptions.CoreReservationFailureException(err) + poller = select.poll() + poller.register(frameInfo.forkedCommand.stdout, select.POLLIN) + poller.register(frameInfo.forkedCommand.stderr, select.POLLIN) + while True: + for fd, event in poller.poll(): + if event & select.POLLIN: + if fd == frameInfo.forkedCommand.stdout.fileno(): + line = frameInfo.forkedCommand.stdout.readline() + elif fd == 
frameInfo.forkedCommand.stderr.fileno(): + line = frameInfo.forkedCommand.stderr.readline() + else: + continue + if not line: + break + self.rqlog.write(line, prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP) + if frameInfo.forkedCommand.poll() is not None: + break - if rqd.rqconstants.OVERRIDE_NIMBY and self.nimby.isNimbyActive(): - err = "Not launching, rqd is lockNimby and User is Active" - log.info(err) - raise rqd.rqexceptions.CoreReservationFailureException(err) + returncode = frameInfo.forkedCommand.wait() - if runFrame.frame_id in self.__cache: - err = "Not launching, frame is already running on this proc %s" % runFrame.frame_id - log.critical(err) - raise rqd.rqexceptions.DuplicateFrameViolationException(err) + # Find exitStatus and exitSignal + if returncode < 0: + # Exited with a signal + frameInfo.exitStatus = 1 + frameInfo.exitSignal = -returncode + else: + frameInfo.exitStatus = returncode + frameInfo.exitSignal = 0 - if runFrame.HasField("uid") and runFrame.uid <= 0: - err = "Not launching, will not run frame as uid=%d" % runFrame.uid - log.warning(err) - raise rqd.rqexceptions.InvalidUserException(err) + try: + with open(tempStatFile, "r", encoding='utf-8') as statFile: + frameInfo.realtime = statFile.readline().split()[1] + frameInfo.utime = statFile.readline().split()[1] + frameInfo.stime = statFile.readline().split()[1] + statFile.close() + # pylint: disable=broad-except + except Exception: + pass # This happens when frames are killed - if runFrame.num_cores <= 0: - err = "Not launching, numCores must be > 0" - log.warning(err) - raise rqd.rqexceptions.CoreReservationFailureException(err) + self.__writeFooter() + self.__cleanup() - # See if all requested cores are available - with self.__threadLock: - # pylint: disable=no-member - if self.cores.idle_cores < runFrame.num_cores: - err = "Not launching, insufficient idle cores" - log.critical(err) - raise rqd.rqexceptions.CoreReservationFailureException(err) - # pylint: enable=no-member + def 
runDocker(self): + """The steps required to handle a frame under a docker container""" + frameInfo = self.frameInfo + runFrame = self.runFrame - if runFrame.environment.get('CUE_THREADABLE') == '1': - reserveHT = self.machine.reserveHT(runFrame.num_cores) - if reserveHT: - runFrame.attributes['CPU_LIST'] = reserveHT + # Ensure Nullable attributes have been initialized + if not self.rqlog: + raise RuntimeError("Invalid state. rqlog has not been initialized") + if not self.rqCore.docker: + raise RuntimeError("Invalid state: docker_client must have been initialized.") - if runFrame.num_gpus: - reserveGpus = self.machine.reserveGpus(runFrame.num_gpus) - if reserveGpus: - runFrame.attributes['GPU_LIST'] = reserveGpus + try: + image = self.__getFrameImage(runFrame.os) + except RuntimeError as e: + self.__writeHeader() + self.rqlog.write(str(e), prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP) + raise e - # They must be available at this point, reserve them - # pylint: disable=no-member - self.cores.idle_cores -= runFrame.num_cores - self.cores.booked_cores += runFrame.num_cores - # pylint: enable=no-member + self.__createEnvVariables() + self.__writeHeader() - runningFrame = rqd.rqnetwork.RunningFrame(self, runFrame) - runningFrame.frameAttendantThread = FrameAttendantThread(self, runFrame, runningFrame) - runningFrame.frameAttendantThread.start() + tempStatFile = "%srqd-stat-%s-%s" % (self.rqCore.machine.getTempPath(), + frameInfo.frameId, + time.time()) + self._tempLocations.append(tempStatFile) - def getRunningFrame(self, frameId): - """Gets the currently running frame.""" + # Prevent frame from attempting to run as ROOT + if runFrame.gid <= 0: + gid = rqd.rqconstants.LAUNCH_FRAME_USER_GID + else: + gid = runFrame.gid + + # Never give frame ROOT permissions + if runFrame.uid == 0 or gid == 0: + msg = ("Frame %s cannot run as ROOT" % frameInfo.frameId) + self.rqlog.write(msg, prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP) + raise RuntimeError(msg) + + # 
Thread affinity + tasksetCmd = "" + if runFrame.attributes['CPU_LIST']: + tasksetCmd = "taskset -c %s" % runFrame.attributes['CPU_LIST'] + + # A temporary password for the user created inside of the frame container. + # This user is only valid inside the container, meaning a leakage would only + # be harmful if the perpetrator gains access to run docker commands. + tempPassword = str(uuid.uuid4()) + # Command wrapper + command = r"""#!/bin/sh +useradd -u %s -g %s -p %s %s >& /dev/null || true; +exec su -s %s %s -c "echo \$$; /bin/nice /usr/bin/time -p -o %s %s %s" +""" % ( + runFrame.uid, + gid, + tempPassword, + runFrame.user_name, + rqd.rqconstants.DOCKER_SHELL_PATH, + runFrame.user_name, + tempStatFile, + tasksetCmd, + runFrame.command.replace('"', r"""\"""") + ) + + # Log entrypoint on frame log to simplify replaying frames + self.rqlog.write("DOCKER_ENTRYPOINT = %s" % + # Mask password + command.replace(tempPassword, "[password]").replace(";", "\n"), + prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP) + + # Write command to a file on the job tmpdir to simplify replaying a frame + command = self._createCommandFile(command) + docker_client = self.rqCore.docker.from_env() + container = None + container_id = "00000000" + frameInfo.pid = -1 try: - return self.__cache[frameId] - except KeyError: - log.info("frameId %s is not running on this machine", frameId) - return None + log_stream = None + with self.rqCore.docker_lock: + container = docker_client.containers.run(image=image, + detach=True, + environment=self.frameEnv, + working_dir=self.rqCore.machine.getTempPath(), + mounts=self.rqCore.docker_mounts, + privileged=True, + pid_mode="host", + network="host", + stderr=True, + hostname=self.frameEnv["jobhost"], + entrypoint=command) + + log_stream = container.logs(stream=True) + + if not container or not log_stream: + raise RuntimeError("Container failed to start for %s.%s(%s)" % ( + runFrame.job_name, + runFrame.frame_name, + frameInfo.frameId)) + + # Try to 
get the cmd pid from top if the container is still running. + # If that fails the pid can be acquired from the first line of the log + try: + # Docker SDK type hint states that `top` returns an str + # when in reality it returns a Dict {"Processes": [[]], "Columns": [[]]} + container_top: dict = container.top() + frameInfo.pid = int(container_top["Processes"][0][1]) + except (APIError, TypeError): + for first_line in log_stream: + frameInfo.pid = int(first_line) + break + + # Log frame start info + msg = "Container %s started for %s.%s(%s) with pid %s" % ( + container.short_id, + runFrame.job_name, + runFrame.frame_name, + frameInfo.frameId, + frameInfo.pid) + + log.info(msg) + self.rqlog.write(msg, prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP) + + # Ping rss thread on rqCore + if self.rqCore.updateRssThread and not self.rqCore.updateRssThread.is_alive(): + self.rqCore.updateRssThread = threading.Timer(rqd.rqconstants.RSS_UPDATE_INTERVAL, + self.rqCore.updateRss) + self.rqCore.updateRssThread.start() + + # Atatch to the job and follow the logs + for line in log_stream: + self.rqlog.write(line, prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP) + + output = container.wait() + returncode = output["StatusCode"] + except StopIteration: + # This exception can happen when a container is interrupted + # If frame pid is set it means the container has started successfully + if frameInfo.pid and container: + output = container.wait() + returncode = output["StatusCode"] + else: + returncode = -1 + container_id = container.short_id if container else -1 + msg = "Failed to read frame container logs on %s for %s.%s(%s)" % ( + container_id, + runFrame.job_name, + runFrame.frame_name, + frameInfo.frameId) + logging.error(msg) + self.rqlog.write(msg, prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP) + # pylint: disable=broad-except + except Exception as e: + returncode = -1 + msg = "Failed to launch frame container" + logging.exception(msg) + 
self.rqlog.write("%s - %s" % (msg, e), + prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP) + finally: + # Clear up container after if finishes + if container: + container_id = container.short_id + container.remove() + docker_client.close() - def getCoreInfo(self): - """Gets the core info report.""" - return self.cores + # Find exitStatus and exitSignal + if returncode < 0: + # Exited with a signal + frameInfo.exitStatus = 1 + frameInfo.exitSignal = -returncode + else: + frameInfo.exitStatus = returncode + frameInfo.exitSignal = 0 - def reportStatus(self): - """Replies with hostReport""" - return self.machine.getHostReport() + # Log frame start info + log.warning("Frame %s.%s(%s) with pid %s finished on container %s with exitStatus %s %s ", + runFrame.job_name, + runFrame.frame_name, + frameInfo.frameId, + frameInfo.pid, + container_id, + frameInfo.exitStatus, + "" if frameInfo.exitStatus == 0 else " - " + runFrame.log_dir_file) - def shutdownRqdNow(self): - """Kill all running frames and shutdown RQD""" - self.machine.state = rqd.compiled_proto.host_pb2.DOWN try: - self.lockAll() - self.killAllFrame("shutdownRqdNow Command") + with open(tempStatFile, "r", encoding='utf-8') as statFile: + frameInfo.realtime = statFile.readline().split()[1] + frameInfo.utime = statFile.readline().split()[1] + frameInfo.stime = statFile.readline().split()[1] + statFile.close() # pylint: disable=broad-except except Exception: - log.exception("Failed to kill frames, stopping service anyways") - if not self.__cache: - self.shutdown() + pass # This happens when frames are killed - def shutdownRqdIdle(self): - """When machine is idle, shutdown RQD""" - log.info("shutdownRqdIdle") - self.lockAll() - self.__whenIdle = True - self.sendStatusReport() - if not self.__cache: - self.shutdownRqdNow() + self.__writeFooter() + self.__cleanup() - def rebootNow(self): - """Kill all running frames and reboot machine. 
- This is not available when a user is logged in""" - log.warning('Requested to reboot now') - if self.machine.isUserLoggedIn(): - err = ('Rebooting via RQD is not supported for a desktop machine ' - 'when a user is logged in') - log.warning(err) - raise rqd.rqexceptions.RqdException(err) - self.__reboot = True - self.shutdownRqdNow() + def __getFrameImage(self, frame_os=None): + """ + Get the pre-configured image for the given frame_os. - def rebootIdle(self): - """When machine is idle, reboot it""" - log.warning('Requested to reboot machine when idle') - self.lockAll() - self.__whenIdle = True - self.__reboot = True - self.sendStatusReport() - if not self.__cache and not self.machine.isUserLoggedIn(): - self.shutdownRqdNow() + Raises: + RuntimeError - if a suitable image cannot be found + """ + if frame_os: + image = self.rqCore.docker_images.get(frame_os) + if image is None: + raise RuntimeError("This rqd is not configured to run an image " + "for this frame OS: %s. Check the [docker.images] " + "section of rqd.conf for more information." % frame_os) + return image + if self.rqCore.docker_images: + # If a frame doesn't require an specic OS, default to the first configured OS on + # [docker.images] + return list(self.rqCore.docker_images.values())[0] + + raise RuntimeError("Misconfigured rqd. RUN_ON_DOCKER=True requires at " + "least one image on DOCKER_IMAGES ([docker.images] section of rqd.conf)") - def nimbyOn(self): - """Activates nimby, does not kill any running frames until next nimby - event. 
Also does not unlock until sufficient idle time is reached.""" - if self.nimby and not self.nimby.active: - try: - self.nimby.run() - log.warning("Nimby has been activated") - # pylint: disable=broad-except - except Exception: - self.nimby.locked = False - err = "Nimby is in the process of shutting down" - log.exception(err) - raise rqd.rqexceptions.RqdException(err) + def runWindows(self): + """The steps required to handle a frame under windows""" + frameInfo = self.frameInfo + runFrame = self.runFrame - def nimbyOff(self): - """Deactivates nimby and unlocks any nimby lock""" - if self.nimby.active: - self.nimby.stop() - log.info("Nimby has been deactivated") + self.__createEnvVariables() + self.__writeHeader() - def onNimbyLock(self): - """This is called by nimby when it locks the machine. - All running frames are killed. - A new report is sent to the cuebot.""" - self.killAllFrame("NIMBY Triggered") - self.sendStatusReport() + try: + runFrame.command = runFrame.command.replace('%{frame}', self.frameEnv['CUE_IFRAME']) + tempCommand = [self._createCommandFile(runFrame.command)] - def onNimbyUnlock(self, asOf=None): - """This is called by nimby when it unlocks the machine due to sufficient - idle. A new report is sent to the cuebot. - @param asOf: Time when idle state began, if known.""" - del asOf - self.sendStatusReport() + # pylint: disable=consider-using-with + frameInfo.forkedCommand = subprocess.Popen(tempCommand, + env=self.frameEnv, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + # pylint: disable=broad-except + except Exception: + log.critical( + "Failed subprocess.Popen: Due to: \n%s", + ''.join(traceback.format_exception(*sys.exc_info()))) + + frameInfo.pid = frameInfo.forkedCommand.pid - def lock(self, reqLock): - """Locks the requested core. - If a locked status changes, a status report is sent to the cuebot. 
- @type reqLock: int - @param reqLock: Number of cores to lock, 100 = 1 physical core""" - sendUpdate = False - with self.__threadLock: - # pylint: disable=no-member - numLock = min(self.cores.total_cores - self.cores.locked_cores, - reqLock) - if numLock > 0: - self.cores.locked_cores += numLock - self.cores.idle_cores -= min(numLock, self.cores.idle_cores) - sendUpdate = True - # pylint: enable=no-member + if not self.rqCore.updateRssThread.is_alive(): + self.rqCore.updateRssThread = threading.Timer(rqd.rqconstants.RSS_UPDATE_INTERVAL, + self.rqCore.updateRss) + self.rqCore.updateRssThread.start() - log.debug(self.cores) + while True: + output = frameInfo.forkedCommand.stdout.readline() + if not output and frameInfo.forkedCommand.poll() is not None: + break + if output: + self.rqlog.write(output, prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP) - if sendUpdate: - self.sendStatusReport() + frameInfo.forkedCommand.wait() - def lockAll(self): - """"Locks all cores on the machine. - If a locked status changes, a status report is sent.""" - sendUpdate = False - with self.__threadLock: - # pylint: disable=no-member - if self.cores.locked_cores < self.cores.total_cores: - self.cores.locked_cores = self.cores.total_cores - self.cores.idle_cores = 0 - sendUpdate = True - # pylint: enable=no-member + # Find exitStatus and exitSignal + returncode = frameInfo.forkedCommand.returncode + if returncode < INT32_MIN: + returncode = 303 + if returncode > INT32_MAX: + returncode = 304 + frameInfo.exitStatus = returncode + frameInfo.exitSignal = returncode - log.debug(self.cores) + frameInfo.realtime = 0 + frameInfo.utime = 0 + frameInfo.stime = 0 - if sendUpdate: - self.sendStatusReport() + self.__writeFooter() + self.__cleanup() - def unlock(self, reqUnlock): - """Unlocks the requested number of cores. - Also resets reboot/shutdown/restart when idle requests. - If a locked status changes, a status report is sent to the cuebot. 
- @type reqUnlock: int - @param reqUnlock: Number of cores to unlock, 100 = 1 physical core""" + def runDarwin(self): + """The steps required to handle a frame under mac""" + frameInfo = self.frameInfo - sendUpdate = False + self.__createEnvVariables() + self.__writeHeader() - if (self.__whenIdle or self.__reboot or - self.machine.state != rqd.compiled_proto.host_pb2.UP): - sendUpdate = True + rqd.rqutil.permissionsHigh() + try: + tempCommand = ["/usr/bin/su", frameInfo.runFrame.user_name, "-c", '"' + + self._createCommandFile(frameInfo.runFrame.command) + '"'] - self.__whenIdle = False - self.__reboot = False - self.machine.state = rqd.compiled_proto.host_pb2.UP + # pylint: disable=subprocess-popen-preexec-fn,consider-using-with + frameInfo.forkedCommand = subprocess.Popen(tempCommand, + env=self.frameEnv, + cwd=self.rqCore.machine.getTempPath(), + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + preexec_fn=os.setsid) + finally: + rqd.rqutil.permissionsLow() - with self.__threadLock: - # pylint: disable=no-member - numUnlock = min(self.cores.locked_cores, reqUnlock) - if numUnlock > 0: - self.cores.locked_cores -= numUnlock - self.cores.idle_cores += numUnlock - sendUpdate = True - # pylint: enable=no-member + frameInfo.pid = frameInfo.forkedCommand.pid - log.debug(self.cores) + if not self.rqCore.updateRssThread.is_alive(): + self.rqCore.updateRssThread = threading.Timer(rqd.rqconstants.RSS_UPDATE_INTERVAL, + self.rqCore.updateRss) + self.rqCore.updateRssThread.start() - if sendUpdate: - self.sendStatusReport() + while True: + output = frameInfo.forkedCommand.stdout.readline() + if not output and frameInfo.forkedCommand.poll() is not None: + break + if output: + self.rqlog.write(output, prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP) - def unlockAll(self): - """"Unlocks all cores on the machine. - Also resets reboot/shutdown/restart when idle requests. 
- If a locked status changes, a status report is sent.""" + frameInfo.forkedCommand.wait() - sendUpdate = False + # Find exitStatus and exitSignal + returncode = frameInfo.forkedCommand.returncode + if os.WIFEXITED(returncode): + frameInfo.exitStatus = os.WEXITSTATUS(returncode) + else: + frameInfo.exitStatus = 1 + if os.WIFSIGNALED(returncode): + frameInfo.exitSignal = os.WTERMSIG(returncode) - if (self.__whenIdle or self.__reboot - or self.machine.state != rqd.compiled_proto.host_pb2.UP): - sendUpdate = True + self.__writeFooter() + self.__cleanup() - self.__whenIdle = False - self.__reboot = False - self.machine.state = rqd.compiled_proto.host_pb2.UP + def runUnknown(self): + """The steps required to handle a frame under an unknown OS.""" - with self.__threadLock: - # pylint: disable=no-member - if self.cores.locked_cores > 0: - if not self.nimby.locked: - self.cores.idle_cores += self.cores.locked_cores - self.cores.locked_cores = 0 - sendUpdate = True - # pylint: enable=no-member + def run(self): + """Thread initialization""" + log.info("Monitor frame started for frameId=%s", self.frameId) - log.debug(self.cores) + runFrame = self.runFrame + run_on_docker = self.rqCore.docker is not None - if sendUpdate: - self.sendStatusReport() + # pylint: disable=too-many-nested-blocks + try: + runFrame.job_temp_dir = os.path.join(self.rqCore.machine.getTempPath(), + runFrame.job_name) + runFrame.frame_temp_dir = os.path.join(runFrame.job_temp_dir, + runFrame.frame_name) + runFrame.log_file = "%s.%s.rqlog" % (runFrame.job_name, + runFrame.frame_name) + runFrame.log_dir_file = os.path.join(runFrame.log_dir, runFrame.log_file) - def sendStatusReport(self): - """Sends the current host report to Cuebot.""" - self.network.reportStatus(self.machine.getHostReport()) + try: # Exception block for all exceptions + # Ensure permissions return to Low after this block + try: + if rqd.rqconstants.RQD_CREATE_USER_IF_NOT_EXISTS and runFrame.HasField("uid"): + 
rqd.rqutil.checkAndCreateUser(runFrame.user_name, + runFrame.uid, + runFrame.gid) + if not run_on_docker: + # Do everything as launching user: + runFrame.gid = rqd.rqconstants.LAUNCH_FRAME_USER_GID + rqd.rqutil.permissionsUser(runFrame.uid, runFrame.gid) - def isWaitingForIdle(self): - """Returns whether the host is waiting until idle to take some action.""" - return self.__whenIdle + # Setup frame logging + try: + self.rqlog = rqd.rqlogging.RqdLogger(runFrame.log_dir_file) + self.rqlog.waitForFile() + # pylint: disable=broad-except + except Exception as e: + err = "Unable to write to %s due to %s" % (runFrame.log_dir_file, e) + raise RuntimeError(err) - def sendFrameCompleteReport(self, runningFrame): - """Send a frameCompleteReport to Cuebot""" - if not runningFrame.completeReportSent: - report = rqd.compiled_proto.report_pb2.FrameCompleteReport() - # pylint: disable=no-member - report.host.CopyFrom(self.machine.getHostInfo()) - report.frame.CopyFrom(runningFrame.runningFrameInfo()) - # pylint: enable=no-member + finally: + rqd.rqutil.permissionsLow() - if runningFrame.exitStatus is None: - report.exit_status = 1 - else: - report.exit_status = runningFrame.exitStatus + # Store frame in cache and register servant + self.rqCore.storeFrame(runFrame.frame_id, self.frameInfo) - report.exit_signal = runningFrame.exitSignal - report.run_time = int(runningFrame.runTime) + if run_on_docker: + self.runDocker() + elif platform.system() == "Linux": + self.runLinux() + elif platform.system() == "Windows": + self.runWindows() + elif platform.system() == "Darwin": + self.runDarwin() + else: + self.runUnknown() - # If nimby is active, then frame must have been killed by nimby - # Set the exitSignal to indicate this event - if self.nimby.locked and not runningFrame.ignoreNimby: - report.exit_status = rqd.rqconstants.EXITSTATUS_FOR_NIMBY_KILL + # pylint: disable=broad-except + except Exception: + log.critical( + "Failed launchFrame: For %s due to: \n%s", + runFrame.frame_id, 
''.join(traceback.format_exception(*sys.exc_info()))) + # Notifies the cuebot that there was an error launching + self.frameInfo.exitStatus = rqd.rqconstants.EXITSTATUS_FOR_FAILED_LAUNCH + # Delay keeps the cuebot from spamming failing booking requests + time.sleep(10) + finally: + self.rqCore.releaseCores(self.runFrame.num_cores, runFrame.attributes.get('CPU_LIST'), + runFrame.attributes.get('GPU_LIST') + if 'GPU_LIST' in self.runFrame.attributes else None) - self.network.reportRunningFrameCompletion(report) - runningFrame.completeReportSent = True + self.rqCore.deleteFrame(self.runFrame.frame_id) - def sanitizeFrames(self): - """ - Iterate over the cache and update the status of frames that might have - completed but never reported back to cuebot. - """ - for frameId in list(self.__cache.keys()): - runningFrame = self.__cache[frameId] - # If the frame was marked as completed (exitStatus) and a report has not been sent - # try to file the report again - if runningFrame.exitStatus is not None and not runningFrame.completeReportSent: - try: - self.sendFrameCompleteReport(runningFrame) - self.deleteFrame(frameId) - log.info("Successfully deleted frame from cache for %s/%s (%s)", - runningFrame.runFrame.job_name, - runningFrame.runFrame.frame_name, - frameId) - # pylint: disable=broad-except - except Exception: - log.exception("Failed to sanitize frame %s/%s", - runningFrame.runFrame.job_name, - runningFrame.runFrame.frame_name) + self.rqCore.sendFrameCompleteReport(self.frameInfo) + time_till_next = ( + (self.rqCore.intervalStartTime + self.rqCore.intervalSleepTime) - time.time()) + if time_till_next > (2 * rqd.rqconstants.RQD_MIN_PING_INTERVAL_SEC): + self.rqCore.onIntervalThread.cancel() + self.rqCore.onInterval(rqd.rqconstants.RQD_MIN_PING_INTERVAL_SEC) + + log.info("Monitor frame ended for frameId=%s", + self.runFrame.frame_id) diff --git a/rqd/rqd/rqmachine.py b/rqd/rqd/rqmachine.py index 0687858c7..a6d9a6317 100644 --- a/rqd/rqd/rqmachine.py +++ 
b/rqd/rqd/rqmachine.py @@ -302,7 +302,7 @@ def rssUpdate(self, frames): if re.search(r"\d+", child_statm_fields[1]) else -1 # pylint: disable=broad-except - except (OSError, IOError): + except (OSError, IOError, psutil.ZombieProcess): # Many Linux processes are ephemeral and will disappear before we're able # to read them. This is not typically indicative of a problem. log.debug('Failed to read stat/statm file for pid %s', pid) @@ -315,7 +315,7 @@ def rssUpdate(self, frames): values = list(frames.values()) for frame in values: - if frame.pid > 0: + if frame.pid is not None and frame.pid > 0: session = str(frame.pid) rss = 0 vsize = 0 @@ -870,7 +870,7 @@ def reserveHT(self, frameCores): if frameCores % 100: log.warning('Taskset: Can not reserveHT with fractional cores') return None - log.warning('Taskset: Requesting reserve of %d', (frameCores // 100)) + log.info('Taskset: Requesting reserve of %d', (frameCores // 100)) # Look for the most idle physical cpu. # Prefer to assign cores from the same physical cpu. diff --git a/rqd/rqd/rqutil.py b/rqd/rqd/rqutil.py index 3c11e75ff..3d8abc964 100644 --- a/rqd/rqd/rqutil.py +++ b/rqd/rqd/rqutil.py @@ -157,6 +157,9 @@ def checkAndCreateUser(username, uid=None, gid=None): cmd.append(username) log.info("Frame's username not found on host. Adding user with: %s", cmd) subprocess.check_call(cmd) + # pylint: disable=broad-except + except Exception: + logging.info("useradd failed to add user: %s. 
User possibly already exists.", username) finally: permissionsLow() diff --git a/rqd/tests/rqcore_test.py b/rqd/tests/rqcore_test.py index fc155a3be..82e585147 100644 --- a/rqd/tests/rqcore_test.py +++ b/rqd/tests/rqcore_test.py @@ -637,6 +637,7 @@ def test_runLinux( rqCore.machine.isDesktop.return_value = True rqCore.machine.getHostInfo.return_value = renderHost rqCore.nimby.locked = False + rqCore.docker = None children = rqd.compiled_proto.report_pb2.ChildrenProcStats() runFrame = rqd.compiled_proto.rqd_pb2.RunFrame( @@ -686,6 +687,91 @@ def test_runLinux( frameInfo ) + @mock.patch('platform.system', new=mock.Mock(return_value='Linux')) + @mock.patch('tempfile.gettempdir') + def test_runDocker(self, getTempDirMock, permsUser, timeMock, popenMock): + # given + currentTime = 1568070634.3 + jobTempPath = '/job/temp/path/' + logDir = '/path/to/log/dir/' + tempDir = '/some/random/temp/dir' + frameId = 'arbitrary-frame-id' + jobName = 'arbitrary-job-name' + frameName = 'arbitrary-frame-name' + frameUid = 928 + frameUsername = 'my-random-user' + returnCode = 0 + renderHost = rqd.compiled_proto.report_pb2.RenderHost(name='arbitrary-host-name') + logFile = os.path.join(logDir, '%s.%s.rqlog' % (jobName, frameName)) + + self.fs.create_dir(tempDir) + + timeMock.return_value = currentTime + getTempDirMock.return_value = tempDir + + rqCore = mock.MagicMock() + rqCore.intervalStartTime = 20 + rqCore.intervalSleepTime = 40 + rqCore.machine.getTempPath.return_value = jobTempPath + rqCore.machine.isDesktop.return_value = True + rqCore.machine.getHostInfo.return_value = renderHost + rqCore.nimby.locked = False + + # Setup mock docker client + rqCore.docker.from_env.return_value.\ + containers.run.return_value.wait.return_value = {"StatusCode": returnCode} + rqCore.docker_images = { + "centos7": "centos7_image", + "rocky9": "rocky9_image", + } + rqCore.docker_mounts = { + "vol1": "/vol1/mount", + "vol2": "/vol2/mount", + } + + children = 
rqd.compiled_proto.report_pb2.ChildrenProcStats() + + runFrame = rqd.compiled_proto.rqd_pb2.RunFrame( + frame_id=frameId, + job_name=jobName, + frame_name=frameName, + uid=frameUid, + user_name=frameUsername, + log_dir=logDir, + children=children, + environment={"ENVVAR": "env_value"}, + os="centos7") + frameInfo = rqd.rqnetwork.RunningFrame(rqCore, runFrame) + + # when + attendantThread = rqd.rqcore.FrameAttendantThread(rqCore, runFrame, frameInfo) + attendantThread.start() + attendantThread.join() + + # then + cmd_file = os.path.join(tempDir, 'rqd-cmd-%s-%s' % (runFrame.frame_id, currentTime)) + rqCore.docker.from_env.return_value.containers.run.assert_called_with( + image="centos7_image", + detach=True, + environment=mock.ANY, + working_dir=jobTempPath, + mounts=rqCore.docker_mounts, + privileged=True, + pid_mode="host", + network="host", + stderr=True, + hostname=mock.ANY, + entrypoint=cmd_file + ) + + self.assertTrue(os.path.exists(logDir)) + self.assertTrue(os.path.isfile(logFile)) + + rqCore.sendFrameCompleteReport.assert_called_with( + frameInfo + ) + + # TODO(bcipriano) Re-enable this test once Windows is supported. The main sticking point here # is that the log directory is always overridden on Windows which makes mocking difficult. @mock.patch("platform.system", new=mock.Mock(return_value="Windows")) @@ -789,6 +875,7 @@ def test_runDarwin(self, getTempDirMock, permsUser, timeMock, popenMock): rqCore.machine.isDesktop.return_value = True rqCore.machine.getHostInfo.return_value = renderHost rqCore.nimby.locked = False + rqCore.docker = None children = rqd.compiled_proto.report_pb2.ChildrenProcStats() runFrame = rqd.compiled_proto.rqd_pb2.RunFrame(