From 291b6947f82f9fb436fb17fdec512d0abaff6f10 Mon Sep 17 00:00:00 2001 From: Diego Tavares Date: Fri, 15 Nov 2024 10:10:58 -0800 Subject: [PATCH] [cuebot/rqd] Add feature to run frames on a containerized environment using docker (#1549) ### Motivation Running OpenCue in a multi-operating-system environment requires segregating the farm, which means hosts have to be assigned to one OS and cannot be shared between shows that have different OS requirements. This can be a challenge when sharing resources between shows is necessary. ### Proposed solution A new execution mode on **rqd** `runDocker` to live alongside `runLinux`, `runWindows`, and `runDarwin` (macOS). This mode will launch the frame command in a Docker container based on the frame's expected OS. With this, rqd is now able to run jobs from different OSs on the same host. But to make this possible, an rqd host needs to advertise itself not with its own OS code (defined by `SP_OS` on rqd.conf), but with all the OSs of images it is capable of executing. ### Configuration changes The following sections were added to rqd.conf: ```ini [docker.config] # Setting this to True requires all the additional "docker.[]" sections to be filled RUN_ON_DOCKER=True # This section is only required if RUN_ON_DOCKER=True # List of volume mounts following docker run's format, but replacing = with : [docker.mounts] TEMP=type:bind,source:/tmp,target:/tmp,bind-propagation:slave NET=type:bind,source:/net,target:/net,bind-propagation:slave # This section is only required if RUN_ON_DOCKER=True # - keys represent OSs this rqd is capable of executing jobs in # - values are docker image tags [docker.images] centos7=centos7.3:latest rocky9=rocky9.3:latest ``` In this case, the rqd host would advertise itself with `OS=centos7,rocky9`, and the dispatch logic has been changed accordingly to account for dispatching frames to nodes that support multiple OSs. 
Feature has been documented at https://github.com/AcademySoftwareFoundation/opencue.io/pull/302 --------- Signed-off-by: Diego Tavares --- .github/workflows/testing-pipeline.yml | 11 - VERSION.in | 2 +- ci/run_python_tests.sh | 1 - .../com/imageworks/spcue/DispatchFrame.java | 3 + .../com/imageworks/spcue/DispatchHost.java | 10 +- .../com/imageworks/spcue/VirtualProc.java | 8 +- .../spcue/dao/postgres/DispatchQuery.java | 297 +-- .../spcue/dao/postgres/DispatcherDaoJdbc.java | 180 +- .../spcue/dao/postgres/FrameDaoJdbc.java | 2 + .../spcue/dao/postgres/HostDaoJdbc.java | 2 +- .../spcue/dao/postgres/ProcDaoJdbc.java | 18 +- .../dispatcher/DispatchSupportService.java | 1 + .../spcue/dispatcher/HostReportHandler.java | 18 +- .../migrations/V31__increase_os_size.sql | 3 + .../test/dao/postgres/DispatcherDaoTests.java | 58 +- .../src/test/resources/conf/dtd/cjsl-1.14.dtd | 104 + .../conf/jobspec/jobspec_dispatch_test.xml | 10 +- proto/rqd.proto | 1 + pyproject.toml | 3 + requirements.txt | 4 +- rqd/rqd.example.conf | 18 + rqd/rqd/rqconstants.py | 83 + rqd/rqd/rqcore.py | 2068 +++++++++-------- rqd/rqd/rqmachine.py | 6 +- rqd/rqd/rqutil.py | 3 + rqd/tests/rqcore_test.py | 87 + 26 files changed, 1747 insertions(+), 1254 deletions(-) create mode 100644 cuebot/src/main/resources/conf/ddl/postgres/migrations/V31__increase_os_size.sql create mode 100644 cuebot/src/test/resources/conf/dtd/cjsl-1.14.dtd create mode 100644 pyproject.toml diff --git a/.github/workflows/testing-pipeline.yml b/.github/workflows/testing-pipeline.yml index dd82f4dc5..fe3ade9a6 100644 --- a/.github/workflows/testing-pipeline.yml +++ b/.github/workflows/testing-pipeline.yml @@ -7,17 +7,6 @@ on: branches: [ master ] jobs: - test_python_2022: - name: Run Python Unit Tests (CY2022) - runs-on: ubuntu-22.04 - container: aswf/ci-opencue:2022 - env: - ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true - steps: - - uses: actions/checkout@v3 - - name: Run Python Tests - run: ci/run_python_tests.sh --no-gui - 
test_cuebot_2022: name: Build Cuebot and Run Unit Tests (CY2022) runs-on: ubuntu-22.04 diff --git a/VERSION.in b/VERSION.in index 9f8e9b69a..b123147e2 100644 --- a/VERSION.in +++ b/VERSION.in @@ -1 +1 @@ -1.0 \ No newline at end of file +1.1 \ No newline at end of file diff --git a/ci/run_python_tests.sh b/ci/run_python_tests.sh index 101d75288..1259adf4c 100755 --- a/ci/run_python_tests.sh +++ b/ci/run_python_tests.sh @@ -28,7 +28,6 @@ PYTHONPATH=pycue python -m unittest discover -s cueadmin/tests -t cueadmin -p "* PYTHONPATH=pycue:pyoutline python -m unittest discover -s cuesubmit/tests -t cuesubmit -p "*.py" python -m pytest rqd/tests - # Xvfb no longer supports Python 2. if [[ "$python_version" =~ "Python 3" && ${args[0]} != "--no-gui" ]]; then ci/run_gui_test.sh diff --git a/cuebot/src/main/java/com/imageworks/spcue/DispatchFrame.java b/cuebot/src/main/java/com/imageworks/spcue/DispatchFrame.java index faa1a9c04..2c60e9930 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/DispatchFrame.java +++ b/cuebot/src/main/java/com/imageworks/spcue/DispatchFrame.java @@ -49,5 +49,8 @@ public class DispatchFrame extends FrameEntity implements FrameInterface { // A comma separated list of services public String services; + + // The Operational System this frame is expected to run in + public String os; } diff --git a/cuebot/src/main/java/com/imageworks/spcue/DispatchHost.java b/cuebot/src/main/java/com/imageworks/spcue/DispatchHost.java index f01724e17..40a3e6bbc 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/DispatchHost.java +++ b/cuebot/src/main/java/com/imageworks/spcue/DispatchHost.java @@ -51,7 +51,7 @@ public class DispatchHost extends Entity public long gpuMemory; public long idleGpuMemory; public String tags; - public String os; + private String os; public boolean isNimby; public boolean isLocalDispatch = false; @@ -81,6 +81,14 @@ public String getFacilityId() { return facilityId; } + public String[] getOs() { + return this.os.split(","); + } + + 
public void setOs(String os) { + this.os = os; + } + public boolean canHandleNegativeCoresRequest(int requestedCores) { // Request is positive, no need to test further. if (requestedCores > 0) { diff --git a/cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java b/cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java index e89ced6ce..02ade6bb4 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java @@ -85,7 +85,9 @@ public String getName() { * @param frame * @return */ - public static final VirtualProc build(DispatchHost host, DispatchFrame frame, String... selfishServices) { + public static final VirtualProc build(DispatchHost host, + DispatchFrame frame, + String... selfishServices) { VirtualProc proc = new VirtualProc(); proc.allocationId = host.getAllocationId(); proc.hostId = host.getHostId(); @@ -94,7 +96,7 @@ public static final VirtualProc build(DispatchHost host, DispatchFrame frame, St proc.jobId = frame.getJobId(); proc.showId = frame.getShowId(); proc.facilityId = frame.getFacilityId(); - proc.os = host.os; + proc.os = frame.os; proc.hostName = host.getName(); proc.unbooked = false; @@ -238,7 +240,7 @@ public static final VirtualProc build(DispatchHost host, proc.jobId = frame.getJobId(); proc.showId = frame.getShowId(); proc.facilityId = frame.getFacilityId(); - proc.os = host.os; + proc.os = frame.os; proc.hostName = host.getName(); proc.unbooked = false; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/DispatchQuery.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/DispatchQuery.java index e36f97999..02dae0f22 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/DispatchQuery.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/DispatchQuery.java @@ -72,9 +72,9 @@ public class DispatchQuery { "AND job.pk_facility = ? 
" + "AND " + "(" + - "job.str_os IS NULL OR job.str_os = '' " + + "job.str_os IS NULL OR job.str_os IN '' " + "OR " + - "job.str_os = ? " + + "job.str_os IN ? " + ") " + "AND (CASE WHEN layer_stat.int_waiting_count > 0 THEN 1 ELSE NULL END) = 1 " + "AND layer.int_cores_min <= ? " + @@ -135,7 +135,7 @@ public class DispatchQuery { "(" + "job.str_os IS NULL OR job.str_os = '' " + "OR " + - "job.str_os = ? " + + "job.str_os IN ? " + ") " + "AND (CASE WHEN layer_stat.int_waiting_count > 0 THEN 1 ELSE NULL END) = 1 " + "AND layer.int_cores_min <= ? " + @@ -250,7 +250,7 @@ private static final String replaceQueryForFifo(String query) { "AND " + "job.pk_facility = ? " + "AND " + - "(job.str_os = ? OR job.str_os IS NULL) " + + "(job.str_os IN ? OR job.str_os IS NULL) " + "AND " + "job.pk_job IN ( " + "SELECT " + @@ -276,7 +276,7 @@ private static final String replaceQueryForFifo(String query) { "AND " + "j.pk_facility = ? " + "AND " + - "(j.str_os = ? OR j.str_os IS NULL) " + + "(j.str_os IN ? OR j.str_os IS NULL) " + "AND " + "(CASE WHEN lst.int_waiting_count > 0 THEN lst.pk_layer ELSE NULL END) = l.pk_layer " + "AND " + @@ -519,40 +519,42 @@ private static final String replaceQueryForFifo(String query) { ") " + "LIMIT 1"; + private static final String FIND_DISPATCH_FRAME_COLUMNS = + "show_name, " + + "job_name, " + + "pk_job, " + + "pk_show, " + + "pk_facility, " + + "str_name, " + + "str_shot, " + + "str_user, " + + "int_uid, " + + "str_log_dir, " + + "str_os, " + + "frame_name, " + + "frame_state, " + + "pk_frame, " + + "pk_layer, " + + "int_retries, " + + "int_version, " + + "layer_name, " + + "layer_type, " + + "b_threadable, " + + "int_cores_min, " + + "int_cores_max, " + + "int_mem_min, " + + "int_gpus_min, " + + "int_gpus_max, " + + "int_gpu_mem_min, " + + "str_cmd, " + + "str_range, " + + "int_chunk_size, " + + "str_services "; /** * Finds the next frame in a job for a proc. 
*/ public static final String FIND_DISPATCH_FRAME_BY_JOB_AND_PROC = - "SELECT " + - "show_name, " + - "job_name, " + - "pk_job, " + - "pk_show, " + - "pk_facility, " + - "str_name, " + - "str_shot, " + - "str_user, " + - "int_uid, " + - "str_log_dir, " + - "frame_name, " + - "frame_state, " + - "pk_frame, " + - "pk_layer, " + - "int_retries, " + - "int_version, " + - "layer_name, " + - "layer_type, " + - "b_threadable, " + - "int_cores_min, " + - "int_cores_max, " + - "int_mem_min, " + - "int_gpus_min, " + - "int_gpus_max, " + - "int_gpu_mem_min, " + - "str_cmd, " + - "str_range, " + - "int_chunk_size, " + - "str_services " + + "SELECT " + FIND_DISPATCH_FRAME_COLUMNS + "FROM ( " + "SELECT " + "ROW_NUMBER() OVER ( ORDER BY " + @@ -569,6 +571,7 @@ private static final String replaceQueryForFifo(String query) { "job.str_user, " + "job.int_uid, " + "job.str_log_dir, " + + "job.str_os, " + "frame.str_name AS frame_name, " + "frame.str_state AS frame_state, " + "frame.pk_frame, " + @@ -638,36 +641,7 @@ private static final String replaceQueryForFifo(String query) { * Find the next frame in a job for a host. 
*/ public static final String FIND_DISPATCH_FRAME_BY_JOB_AND_HOST = - "SELECT " + - "show_name, " + - "job_name, " + - "pk_job, " + - "pk_show, " + - "pk_facility, " + - "str_name, " + - "str_shot, " + - "str_user, " + - "int_uid, " + - "str_log_dir, " + - "frame_name, " + - "frame_state, " + - "pk_frame, " + - "pk_layer, " + - "int_retries, " + - "int_version, " + - "layer_name, " + - "layer_type, " + - "int_cores_min, " + - "int_cores_max, " + - "int_gpus_min, " + - "int_gpus_max, " + - "b_threadable, " + - "int_mem_min, " + - "int_gpu_mem_min, " + - "str_cmd, " + - "str_range, " + - "int_chunk_size, " + - "str_services " + + "SELECT " + FIND_DISPATCH_FRAME_COLUMNS + "FROM ( " + "SELECT " + "ROW_NUMBER() OVER ( ORDER BY " + @@ -684,6 +658,7 @@ private static final String replaceQueryForFifo(String query) { "job.str_user, " + "job.int_uid, " + "job.str_log_dir, " + + "job.str_os, " + "frame.str_name AS frame_name, " + "frame.str_state AS frame_state, " + "frame.pk_frame, " + @@ -754,36 +729,7 @@ private static final String replaceQueryForFifo(String query) { public static final String FIND_LOCAL_DISPATCH_FRAME_BY_JOB_AND_PROC = - "SELECT " + - "show_name, " + - "job_name, " + - "pk_job, " + - "pk_show, " + - "pk_facility, " + - "str_name, " + - "str_shot, " + - "str_user, " + - "int_uid, " + - "str_log_dir, " + - "frame_name, " + - "frame_state, " + - "pk_frame, " + - "pk_layer, " + - "int_retries, " + - "int_version, " + - "layer_name, " + - "layer_type, " + - "b_threadable, " + - "int_cores_min, " + - "int_cores_max, " + - "int_mem_min, " + - "int_gpus_min, " + - "int_gpus_max, " + - "int_gpu_mem_min, " + - "str_cmd, " + - "str_range, " + - "int_chunk_size, " + - "str_services " + + "SELECT " + FIND_DISPATCH_FRAME_COLUMNS + "FROM ( " + "SELECT " + "ROW_NUMBER() OVER ( ORDER BY " + @@ -800,6 +746,7 @@ private static final String replaceQueryForFifo(String query) { "job.str_user, " + "job.int_uid, " + "job.str_log_dir, " + + "job.str_os, " + "frame.str_name AS 
frame_name, " + "frame.str_state AS frame_state, " + "frame.pk_frame, " + @@ -863,36 +810,7 @@ private static final String replaceQueryForFifo(String query) { * Find the next frame in a job for a host. */ public static final String FIND_LOCAL_DISPATCH_FRAME_BY_JOB_AND_HOST = - "SELECT " + - "show_name, " + - "job_name, " + - "pk_job, " + - "pk_show, " + - "pk_facility, " + - "str_name, " + - "str_shot, " + - "str_user, " + - "int_uid, " + - "str_log_dir, " + - "frame_name, " + - "frame_state, " + - "pk_frame, " + - "pk_layer, " + - "int_retries, " + - "int_version, " + - "layer_name, " + - "layer_type, " + - "int_cores_min, " + - "int_cores_max, " + - "int_gpus_min, " + - "int_gpus_max, " + - "b_threadable, " + - "int_mem_min, " + - "int_gpu_mem_min, " + - "str_cmd, " + - "str_range, " + - "int_chunk_size, " + - "str_services " + + "SELECT " + FIND_DISPATCH_FRAME_COLUMNS + "FROM (" + "SELECT " + "ROW_NUMBER() OVER ( ORDER BY " + @@ -909,6 +827,7 @@ private static final String replaceQueryForFifo(String query) { "job.str_user, " + "job.int_uid, " + "job.str_log_dir, " + + "job.str_os, " + "frame.str_name AS frame_name, " + "frame.str_state AS frame_state, " + "frame.pk_frame, " + @@ -975,36 +894,7 @@ private static final String replaceQueryForFifo(String query) { * Finds the next frame in a job for a proc. 
*/ public static final String FIND_DISPATCH_FRAME_BY_LAYER_AND_PROC = - "SELECT " + - "show_name, " + - "job_name, " + - "pk_job, " + - "pk_show, " + - "pk_facility, " + - "str_name, " + - "str_shot, " + - "str_user, " + - "int_uid, " + - "str_log_dir, " + - "frame_name, " + - "frame_state, " + - "pk_frame, " + - "pk_layer, " + - "int_retries, " + - "int_version, " + - "layer_name, " + - "layer_type, " + - "b_threadable, " + - "int_cores_min, " + - "int_cores_max, " + - "int_mem_min, " + - "int_gpus_min, " + - "int_gpus_max, " + - "int_gpu_mem_min, " + - "str_cmd, " + - "str_range, " + - "int_chunk_size, " + - "str_services " + + "SELECT " + FIND_DISPATCH_FRAME_COLUMNS + "FROM (" + "SELECT " + "ROW_NUMBER() OVER ( ORDER BY " + @@ -1021,6 +911,7 @@ private static final String replaceQueryForFifo(String query) { "job.str_user, " + "job.int_uid, " + "job.str_log_dir, " + + "job.str_os, " + "frame.str_name AS frame_name, " + "frame.str_state AS frame_state, " + "frame.pk_frame, " + @@ -1090,36 +981,7 @@ private static final String replaceQueryForFifo(String query) { * Find the next frame in a job for a host. 
*/ public static final String FIND_DISPATCH_FRAME_BY_LAYER_AND_HOST = - "SELECT " + - "show_name, " + - "job_name, " + - "pk_job, " + - "pk_show, " + - "pk_facility, " + - "str_name, " + - "str_shot, " + - "str_user, " + - "int_uid, " + - "str_log_dir, " + - "frame_name, " + - "frame_state, " + - "pk_frame, " + - "pk_layer, " + - "int_retries, " + - "int_version, " + - "layer_name, " + - "layer_type, " + - "int_cores_min, " + - "int_cores_max, " + - "b_threadable, " + - "int_mem_min, " + - "int_gpus_min, " + - "int_gpus_max, " + - "int_gpu_mem_min, " + - "str_cmd, " + - "str_range, " + - "int_chunk_size, " + - "str_services " + + "SELECT " + FIND_DISPATCH_FRAME_COLUMNS + "FROM (" + "SELECT " + "ROW_NUMBER() OVER ( ORDER BY " + @@ -1136,6 +998,7 @@ private static final String replaceQueryForFifo(String query) { "job.str_user, " + "job.int_uid, " + "job.str_log_dir, " + + "job.str_os, " + "frame.str_name AS frame_name, " + "frame.str_state AS frame_state, " + "frame.pk_frame, " + @@ -1206,36 +1069,7 @@ private static final String replaceQueryForFifo(String query) { public static final String FIND_LOCAL_DISPATCH_FRAME_BY_LAYER_AND_PROC = - "SELECT " + - "show_name, " + - "job_name, " + - "pk_job, " + - "pk_show, " + - "pk_facility, " + - "str_name, " + - "str_shot, " + - "str_user, " + - "int_uid, " + - "str_log_dir, " + - "frame_name, " + - "frame_state, " + - "pk_frame, " + - "pk_layer, " + - "int_retries, " + - "int_version, " + - "layer_name, " + - "layer_type, " + - "b_threadable, " + - "int_cores_min, " + - "int_cores_max, " + - "int_mem_min, " + - "int_gpus_min, " + - "int_gpus_max, " + - "int_gpu_mem_min, " + - "str_cmd, " + - "str_range, " + - "int_chunk_size, " + - "str_services " + + "SELECT " + FIND_DISPATCH_FRAME_COLUMNS + "FROM (" + "SELECT " + "ROW_NUMBER() OVER ( ORDER BY " + @@ -1252,6 +1086,7 @@ private static final String replaceQueryForFifo(String query) { "job.str_user, " + "job.int_uid, " + "job.str_log_dir, " + + "job.str_os, " + "frame.str_name 
AS frame_name, " + "frame.str_state AS frame_state, " + "frame.pk_frame, " + @@ -1315,36 +1150,7 @@ private static final String replaceQueryForFifo(String query) { * Find the next frame in a job for a host. */ public static final String FIND_LOCAL_DISPATCH_FRAME_BY_LAYER_AND_HOST = - "SELECT " + - "show_name, " + - "job_name, " + - "pk_job, " + - "pk_show, " + - "pk_facility, " + - "str_name, " + - "str_shot, " + - "str_user, " + - "int_uid, " + - "str_log_dir, " + - "frame_name, " + - "frame_state, " + - "pk_frame, " + - "pk_layer, " + - "int_retries, " + - "int_version, " + - "layer_name, " + - "layer_type, " + - "int_cores_min, " + - "int_cores_max, " + - "b_threadable, " + - "int_mem_min, " + - "int_gpus_min, " + - "int_gpus_max, " + - "int_gpu_mem_min, " + - "str_cmd, " + - "str_range, " + - "int_chunk_size, " + - "str_services " + + "SELECT " + FIND_DISPATCH_FRAME_COLUMNS + "FROM (" + "SELECT " + "ROW_NUMBER() OVER (ORDER BY " + @@ -1361,6 +1167,7 @@ private static final String replaceQueryForFifo(String query) { "job.str_user, " + "job.int_uid, " + "job.str_log_dir, " + + "job.str_os, " + "frame.str_name AS frame_name, " + "frame.str_state AS frame_state, " + "frame.pk_frame, " + diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/DispatcherDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/DispatcherDaoJdbc.java index b17ae14e3..c2af24e0f 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/DispatcherDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/DispatcherDaoJdbc.java @@ -24,6 +24,7 @@ import java.sql.ResultSet; import java.sql.SQLException; +import java.util.ArrayList; import java.util.Collections; import java.util.LinkedHashSet; import java.util.LinkedList; @@ -172,6 +173,11 @@ else if (cached.isExpired()) { return bookableShows.get(key).shows; } + private String handleInClause(String key, String query, int inValueLength) { + String placeholders = String.join(",", 
Collections.nCopies(inValueLength, "?")); + return query.replace(key + " IN ?", key + " IN (" + placeholders + ")"); + } + private Set findDispatchJobs(DispatchHost host, int numJobs, boolean shuffleShows) { LinkedHashSet result = new LinkedHashSet(); List shows = new LinkedList(getBookableShows(host)); @@ -216,20 +222,24 @@ private Set findDispatchJobs(DispatchHost host, int numJobs, boolean shu @Override public PreparedStatement createPreparedStatement(Connection conn) throws SQLException { - PreparedStatement find_jobs_stmt = conn.prepareStatement( - FIND_JOBS_BY_SHOW_NO_GPU); - find_jobs_stmt.setString(1, s.getShowId()); - find_jobs_stmt.setString(2, host.getFacilityId()); - find_jobs_stmt.setString(3, host.os); - find_jobs_stmt.setInt(4, host.idleCores); - find_jobs_stmt.setLong(5, host.idleMemory); - find_jobs_stmt.setInt(6, threadMode(host.threadMode)); - find_jobs_stmt.setString(7, host.getName()); - find_jobs_stmt.setInt(8, numJobs * 10); + String query = handleInClause("str_os", FIND_JOBS_BY_SHOW_NO_GPU, host.getOs().length); + PreparedStatement find_jobs_stmt = conn.prepareStatement(query); + + int index = 1; + find_jobs_stmt.setString(index++, s.getShowId()); + find_jobs_stmt.setString(index++, host.getFacilityId()); + for (String item : host.getOs()) { + find_jobs_stmt.setString(index++, item); + } + find_jobs_stmt.setInt(index++, host.idleCores); + find_jobs_stmt.setLong(index++, host.idleMemory); + find_jobs_stmt.setInt(index++, threadMode(host.threadMode)); + find_jobs_stmt.setString(index++, host.getName()); + find_jobs_stmt.setInt(index++, numJobs * 10); return find_jobs_stmt; }}, PKJOB_MAPPER )); - prometheusMetrics.setBookingDurationMetric("findDispatchJobs nogpu findByShowQuery", + prometheusMetrics.setBookingDurationMetric("findDispatchJobs nogpu findByShowQuery", System.currentTimeMillis() - lastTime); } else { @@ -237,19 +247,22 @@ public PreparedStatement createPreparedStatement(Connection conn) @Override public PreparedStatement 
createPreparedStatement(Connection conn) throws SQLException { - PreparedStatement find_jobs_stmt = conn.prepareStatement( - findByShowQuery()); - find_jobs_stmt.setString(1, s.getShowId()); - find_jobs_stmt.setString(2, host.getFacilityId()); - find_jobs_stmt.setString(3, host.os); - find_jobs_stmt.setInt(4, host.idleCores); - find_jobs_stmt.setLong(5, host.idleMemory); - find_jobs_stmt.setInt(6, threadMode(host.threadMode)); - find_jobs_stmt.setInt(7, host.idleGpus); - find_jobs_stmt.setLong(8, (host.idleGpuMemory > 0) ? 1 : 0); - find_jobs_stmt.setLong(9, host.idleGpuMemory); - find_jobs_stmt.setString(10, host.getName()); - find_jobs_stmt.setInt(11, numJobs * 10); + String query = handleInClause("str_os", findByShowQuery(), host.getOs().length); + PreparedStatement find_jobs_stmt = conn.prepareStatement(query); + int index = 1; + find_jobs_stmt.setString(index++, s.getShowId()); + find_jobs_stmt.setString(index++, host.getFacilityId()); + for (String item : host.getOs()) { + find_jobs_stmt.setString(index++, item); + } + find_jobs_stmt.setInt(index++, host.idleCores); + find_jobs_stmt.setLong(index++, host.idleMemory); + find_jobs_stmt.setInt(index++, threadMode(host.threadMode)); + find_jobs_stmt.setInt(index++, host.idleGpus); + find_jobs_stmt.setLong(index++, (host.idleGpuMemory > 0) ? 
1 : 0); + find_jobs_stmt.setLong(index++, host.idleGpuMemory); + find_jobs_stmt.setString(index++, host.getName()); + find_jobs_stmt.setInt(index++, numJobs * 10); return find_jobs_stmt; }}, PKJOB_MAPPER )); @@ -308,31 +321,48 @@ public Set findDispatchJobs(DispatchHost host, GroupInterface g) { long lastTime = System.currentTimeMillis(); if (host.idleGpus == 0 && (schedulingMode == SchedulingMode.BALANCED)) { + String query = handleInClause("str_os", FIND_JOBS_BY_GROUP_NO_GPU, host.getOs().length); + ArrayList args = new ArrayList(); + + args.add(g.getGroupId()); + args.add(host.getFacilityId()); + for (String item : host.getOs()) { + args.add(item); + } + args.add(host.idleCores); + args.add(host.idleMemory); + args.add(threadMode(host.threadMode)); + args.add(host.getName()); + args.add(50); result.addAll(getJdbcTemplate().query( - FIND_JOBS_BY_GROUP_NO_GPU, - PKJOB_MAPPER, - g.getGroupId(), host.getFacilityId(), host.os, - host.idleCores, host.idleMemory, - threadMode(host.threadMode), - host.getName(), 50)); + query, + PKJOB_MAPPER, args.toArray())); prometheusMetrics.setBookingDurationMetric("findDispatchJobs by group nogpu query", System.currentTimeMillis() - lastTime); } else { + String query = handleInClause("str_os", findByGroupQuery(), host.getOs().length); + ArrayList args = new ArrayList(); + + args.add(g.getGroupId()); + args.add(host.getFacilityId()); + for (String item : host.getOs()) { + args.add(item); + } + args.add(host.idleCores); + args.add(host.idleMemory); + args.add(threadMode(host.threadMode)); + args.add(host.idleGpus); + args.add(host.idleGpuMemory > 0 ? 1 : 0); + args.add(host.idleGpuMemory); + args.add(host.getName()); + args.add(50); result.addAll(getJdbcTemplate().query( - findByGroupQuery(), - PKJOB_MAPPER, - g.getGroupId(),host.getFacilityId(), host.os, - host.idleCores, host.idleMemory, - threadMode(host.threadMode), - host.idleGpus, - (host.idleGpuMemory > 0) ? 
1 : 0, host.idleGpuMemory, - host.getName(), 50)); + query, + PKJOB_MAPPER, args.toArray())); prometheusMetrics.setBookingDurationMetric("findDispatchJobs by group query", System.currentTimeMillis() - lastTime); - } - return result; } @@ -515,26 +545,47 @@ public Set findDispatchJobs(DispatchHost host, LinkedHashSet result = new LinkedHashSet(numJobs); long start = System.currentTimeMillis(); if (host.idleGpus == 0 && (schedulingMode == SchedulingMode.BALANCED)) { + String query = handleInClause("str_os", FIND_JOBS_BY_SHOW_NO_GPU, host.getOs().length); + ArrayList args = new ArrayList(); + args.add(show.getShowId()); + args.add(host.getFacilityId()); + for (String item : host.getOs()) { + args.add(item); + } + args.add(host.idleCores); + args.add(host.idleMemory); + args.add(threadMode(host.threadMode)); + args.add(host.getName()); + args.add(numJobs * 10); + result.addAll(getJdbcTemplate().query( - FIND_JOBS_BY_SHOW_NO_GPU, - PKJOB_MAPPER, - show.getShowId(), host.getFacilityId(), host.os, - host.idleCores, host.idleMemory, - threadMode(host.threadMode), - host.getName(), numJobs * 10)); + query, + PKJOB_MAPPER, args.toArray())); + prometheusMetrics.setBookingDurationMetric("findDispatchJobs by show nogpu query", System.currentTimeMillis() - start); } else { + String query = handleInClause("str_os", findByShowQuery(), host.getOs().length); + ArrayList args = new ArrayList(); + args.add(show.getShowId()); + args.add(host.getFacilityId()); + for (String item : host.getOs()) { + args.add(item); + } + args.add(host.idleCores); + args.add(host.idleMemory); + args.add(threadMode(host.threadMode)); + args.add(host.idleGpus); + args.add(host.idleGpuMemory > 0 ? 
1 : 0); + args.add(host.idleGpuMemory); + args.add(host.getName()); + args.add(numJobs * 10); + result.addAll(getJdbcTemplate().query( - findByShowQuery(), - PKJOB_MAPPER, - show.getShowId(), host.getFacilityId(), host.os, - host.idleCores, host.idleMemory, - threadMode(host.threadMode), - host.idleGpus, - (host.idleGpuMemory > 0) ? 1 : 0, host.idleGpuMemory, - host.getName(), numJobs * 10)); + query, + PKJOB_MAPPER, args.toArray())); + prometheusMetrics.setBookingDurationMetric("findDispatchJobs by show query", System.currentTimeMillis() - start); } @@ -548,11 +599,24 @@ public Set findDispatchJobs(DispatchHost host, public Set findLocalDispatchJobs(DispatchHost host) { LinkedHashSet result = new LinkedHashSet(5); long start = System.currentTimeMillis(); + + String query = handleInClause("str_os", FIND_JOBS_BY_LOCAL, host.getOs().length); + ArrayList args = new ArrayList(); + args.add(host.getHostId()); + args.add(host.getFacilityId()); + for (String item : host.getOs()) { + args.add(item); + } + args.add(host.getHostId()); + args.add(host.getFacilityId()); + for (String item : host.getOs()) { + args.add(item); + } + result.addAll(getJdbcTemplate().query( - FIND_JOBS_BY_LOCAL, - PKJOB_MAPPER, - host.getHostId(), host.getFacilityId(), - host.os, host.getHostId(), host.getFacilityId(), host.os)); + query, + PKJOB_MAPPER, args.toArray())); + prometheusMetrics.setBookingDurationMetric("findLocalDispatchJobs query", System.currentTimeMillis() - start); return result; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java index 0546d4558..9e0f6f80c 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java @@ -331,6 +331,7 @@ public DispatchFrame mapRow(ResultSet rs, int rowNum) throws SQLException { frame.minGpuMemory = rs.getLong("int_gpu_mem_min"); 
frame.version = rs.getInt("int_version"); frame.services = rs.getString("str_services"); + frame.os = rs.getString("str_os"); return frame; } }; @@ -347,6 +348,7 @@ public DispatchFrame mapRow(ResultSet rs, int rowNum) throws SQLException { "job.str_user,"+ "job.int_uid,"+ "job.str_log_dir,"+ + "job.str_os,"+ "frame.str_name AS frame_name, "+ "frame.str_state AS frame_state, "+ "frame.pk_frame, "+ diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/HostDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/HostDaoJdbc.java index 244a4778b..20fe7b1ef 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/HostDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/HostDaoJdbc.java @@ -219,7 +219,7 @@ public DispatchHost mapRow(ResultSet rs, int rowNum) throws SQLException { host.isNimby = rs.getBoolean("b_nimby"); host.threadMode = rs.getInt("int_thread_mode"); host.tags = rs.getString("str_tags"); - host.os = rs.getString("str_os"); + host.setOs(rs.getString("str_os")); host.hardwareState = HardwareState.valueOf(rs.getString("str_state")); return host; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/ProcDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/ProcDaoJdbc.java index cf54eb85d..2a597cfeb 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/ProcDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/ProcDaoJdbc.java @@ -356,9 +356,10 @@ public VirtualProc mapRow(ResultSet rs, int rowNum) throws SQLException { "proc.int_virt_max_used,"+ "proc.int_virt_used,"+ "host.str_name AS host_name, " + - "host_stat.str_os " + + "job.str_os " + "FROM " + - "proc," + + "proc, " + + "job, " + "host, " + "host_stat, " + "alloc " + @@ -367,7 +368,9 @@ public VirtualProc mapRow(ResultSet rs, int rowNum) throws SQLException { "AND " + "host.pk_host = host_stat.pk_host " + "AND " + - "host.pk_alloc = alloc.pk_alloc "; + "host.pk_alloc = 
alloc.pk_alloc " + + "AND " + + "job.pk_job = proc.pk_job "; public VirtualProc getVirtualProc(String id) { return getJdbcTemplate().queryForObject( @@ -386,7 +389,7 @@ public VirtualProc findVirtualProc(FrameInterface frame) { "proc.*, " + "host.str_name AS host_name, " + "host.pk_alloc, " + - "host_stat.str_os, " + + "job.str_os, " + "alloc.pk_facility " + "FROM " + "proc, " + @@ -527,20 +530,23 @@ public String getCurrentFrameId(ProcInterface p) { "SELECT " + "proc.*, " + "host.str_name AS host_name, " + - "host_stat.str_os, " + + "job.str_os, " + "host.pk_alloc, " + "alloc.pk_facility " + "FROM " + "proc, " + "host, " + "host_stat,"+ - "alloc " + + "alloc, " + + "job " + "WHERE " + "proc.pk_host = host.pk_host " + "AND " + "host.pk_host = host_stat.pk_host " + "AND " + "host.pk_alloc = alloc.pk_alloc " + + "AND " + + "job.pk_job = proc.pk_job " + "AND " + "current_timestamp - proc.ts_ping > " + ORPHANED_PROC_INTERVAL; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java index f60b2c1e6..0779209b0 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java @@ -395,6 +395,7 @@ public RunFrame prepareRqdRunFrame(VirtualProc proc, DispatchFrame frame) { .setNumGpus(proc.gpusReserved) .setStartTime(System.currentTimeMillis()) .setIgnoreNimby(proc.isLocalDispatch) + .setOs(proc.os) .putAllEnvironment(jobDao.getEnvironment(frame)) .putAllEnvironment(layerDao.getLayerEnvironment(frame)) .putEnvironment("CUE3", "1") diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java index e89c285bb..39423e80b 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java +++ 
b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java @@ -258,7 +258,7 @@ public void handleHostReport(HostReport report, boolean isBoot) { "dispatcher.memory.mem_reserved_min", Long.class); - if (!isTempDirStorageEnough(report.getHost().getTotalMcp(), report.getHost().getFreeMcp(), host.os)) { + if (!isTempDirStorageEnough(report.getHost().getTotalMcp(), report.getHost().getFreeMcp(), host.getOs())) { msg = String.format( "%s doesn't have enough free space in the temporary directory (mcp), %dMB", host.name, (report.getHost().getFreeMcp()/1024)); @@ -351,16 +351,19 @@ else if (!dispatchSupport.isCueBookable(host)) { * * @param tempTotalStorage Total storage on the temp directory * @param tempFreeStorage Free storage on the temp directory - * @param hostOs Reported os + * @param hostOs Reported operating systems + * @return */ - private boolean isTempDirStorageEnough(Long tempTotalStorage, Long tempFreeStorage, String hostOs) { + private boolean isTempDirStorageEnough(Long tempTotalStorage, Long tempFreeStorage, String[] hostOs) { // The minimum amount of free space in the temporary directory to book a host int minAvailableTempPercentage = env.getRequiredProperty( "dispatcher.min_available_temp_storage_percentage", Integer.class); - return minAvailableTempPercentage == -1 || hostOs.equalsIgnoreCase(WINDOWS_OS) || - (((tempFreeStorage * 100.0) / tempTotalStorage) >= minAvailableTempPercentage); + return minAvailableTempPercentage == -1 + // It is safe to assume multiple OSs imply windows is not the base OS, + // therefore Windows will always report a single hostOs + || (hostOs.length == 1 && hostOs[0].equalsIgnoreCase(WINDOWS_OS)) + || (((tempFreeStorage * 100.0) / tempTotalStorage) >= minAvailableTempPercentage); } /** @@ -427,7 +430,10 @@ private boolean changeStateForTempDirStorage(DispatchHost host, RenderHost repor "dispatcher.min_available_temp_storage_percentage", Integer.class); // Prevent cue frames from booking on hosts with full
temporary directories - boolean hasEnoughTempStorage = isTempDirStorageEnough(reportHost.getTotalMcp(), reportHost.getFreeMcp(), host.os); + boolean hasEnoughTempStorage = isTempDirStorageEnough( + reportHost.getTotalMcp(), + reportHost.getFreeMcp(), + host.getOs()); if (!hasEnoughTempStorage && host.hardwareState == HardwareState.UP) { // Insert a comment indicating that the Host status = Repair with reason = Full temporary directory CommentDetail c = new CommentDetail(); diff --git a/cuebot/src/main/resources/conf/ddl/postgres/migrations/V31__increase_os_size.sql b/cuebot/src/main/resources/conf/ddl/postgres/migrations/V31__increase_os_size.sql new file mode 100644 index 000000000..9ad89e437 --- /dev/null +++ b/cuebot/src/main/resources/conf/ddl/postgres/migrations/V31__increase_os_size.sql @@ -0,0 +1,3 @@ +-- Increase size of os column on host_stat + +ALTER TABLE host_stat ALTER COLUMN str_os TYPE VARCHAR(32); diff --git a/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/DispatcherDaoTests.java b/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/DispatcherDaoTests.java index 5b7eaee72..1ff849473 100644 --- a/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/DispatcherDaoTests.java +++ b/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/DispatcherDaoTests.java @@ -203,7 +203,7 @@ public void testFindNextDispatchFrameByProc() { assertNotNull(frame); assertEquals("0001-pass_1", frame.name); - VirtualProc proc = VirtualProc.build(host, frame); + VirtualProc proc = VirtualProc.build(host, frame, job.os); proc.coresReserved = 100; dispatcher.dispatch(frame, proc); @@ -235,7 +235,7 @@ public void testFindNextDispatchFramesByProc() { DispatchFrame frame = frames.get(0); - VirtualProc proc = VirtualProc.build(host, frame); + VirtualProc proc = VirtualProc.build(host, frame, job.os); proc.coresReserved = 100; dispatcher.dispatch(frame, proc); @@ -288,7 +288,7 @@ public void testFindNextDispatchFramesByProcAndJobLocal() { 
assertEquals(10, frames.size()); DispatchFrame frame = frames.get(0); - VirtualProc proc = VirtualProc.build(host, frame); + VirtualProc proc = VirtualProc.build(host, frame, job.os); proc.coresReserved = 100; proc.isLocalDispatch = true; @@ -310,7 +310,7 @@ public void testFindNextDispatchFramesByProcAndLayerLocal() { assertEquals(10, frames.size()); DispatchFrame frame = frames.get(0); - VirtualProc proc = VirtualProc.build(host, frame); + VirtualProc proc = VirtualProc.build(host, frame, job.os); proc.coresReserved = 100; proc.isLocalDispatch = true; @@ -406,7 +406,7 @@ public void testfindUnderProcedJob() { "SELECT str_state FROM job WHERE pk_job=?", String.class, job2.id)); - VirtualProc proc = VirtualProc.build(host, frame); + VirtualProc proc = VirtualProc.build(host, frame, job1.os); proc.coresReserved = 100; dispatcher.dispatch(frame, proc); @@ -442,7 +442,7 @@ public void testHigherPriorityJobExistsTrue() { "SELECT str_state FROM job WHERE pk_job=?", String.class, job2.id)); - VirtualProc proc = VirtualProc.build(host, frame); + VirtualProc proc = VirtualProc.build(host, frame, job2.os); proc.coresReserved = 100; dispatcher.dispatch(frame, proc); @@ -476,7 +476,7 @@ public void testHigherPriorityJobExistsFalse() { "SELECT str_state FROM job WHERE pk_job=?", String.class, job2.id)); - VirtualProc proc = VirtualProc.build(host, frame); + VirtualProc proc = VirtualProc.build(host, frame, job2.os); proc.coresReserved = 100; dispatcher.dispatch(frame, proc); @@ -511,7 +511,7 @@ public void testHigherPriorityJobExistsMaxProcBound() { "SELECT str_state FROM job WHERE pk_job=?", String.class, job2.id)); - VirtualProc proc = VirtualProc.build(host, frame); + VirtualProc proc = VirtualProc.build(host, frame, job2.os); proc.coresReserved = 100; dispatcher.dispatch(frame, proc); @@ -525,4 +525,46 @@ public void testHigherPriorityJobExistsMaxProcBound() { public void testFifoSchedulingEnabled() { assertEquals(dispatcherDao.getSchedulingMode(), 
DispatcherDao.SchedulingMode.PRIORITY_ONLY); } + + @Test + @Transactional + @Rollback(true) + public void testFindDispatchJobsByShowMultiOs() { + DispatchHost host = getHost(); + // Set multiple Os and confirm jobs with Linux are still being found + final JobDetail job = getJob1(); + assertNotNull(job); + + // Host with different os + host.setOs("centos7,SomethingElse"); + Set jobs = dispatcherDao.findDispatchJobs(host, + adminManager.findShowEntity("pipe"), 5); + assertTrue(jobs.size() == 0); + + // Host with Linux Os (same as defined on spec) + host.setOs("centos7,Linux,rocky9"); + jobs = dispatcherDao.findDispatchJobs(host, + adminManager.findShowEntity("pipe"), 5); + assertTrue(jobs.size() > 0); + } + + @Test + @Transactional + @Rollback(true) + public void testFindDispatchJobsAllShowsMultiOs() { + DispatchHost host = getHost(); + // Set multiple Os and confirm jobs with Linux are still being found + final JobDetail job = getJob1(); + assertNotNull(job); + + // Host with incompatible OS shouldn't find any job + host.setOs("centos7,SomethingElse"); + Set jobs = dispatcherDao.findDispatchJobs(host, 5); + assertTrue(jobs.size() == 0); + + // Host with Linux Os (same as defined on spec) should find jobs + host.setOs("centos7,Linux,rocky9"); + jobs = dispatcherDao.findDispatchJobs(host, 5); + assertTrue(jobs.size() > 0); + } } diff --git a/cuebot/src/test/resources/conf/dtd/cjsl-1.14.dtd b/cuebot/src/test/resources/conf/dtd/cjsl-1.14.dtd new file mode 100644 index 000000000..8bbcbf6f1 --- /dev/null +++ b/cuebot/src/test/resources/conf/dtd/cjsl-1.14.dtd @@ -0,0 +1,104 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/cuebot/src/test/resources/conf/jobspec/jobspec_dispatch_test.xml b/cuebot/src/test/resources/conf/jobspec/jobspec_dispatch_test.xml index 2c372eff2..b656f499f 100644 --- 
a/cuebot/src/test/resources/conf/jobspec/jobspec_dispatch_test.xml +++ b/cuebot/src/test/resources/conf/jobspec/jobspec_dispatch_test.xml @@ -18,7 +18,7 @@ - + spi @@ -30,9 +30,10 @@ false + Linux - + /shots/pipe/usr_testuser/logs/help.py 1-10 1 @@ -44,7 +45,7 @@ - + /shots/pipe/usr_testuser/logs/help.py 1-10 1 @@ -61,9 +62,10 @@ false + Linux - + /shots/pipe/usr_testuser/logs/help.py 1-10 1 diff --git a/proto/rqd.proto b/proto/rqd.proto index 1ff75a4be..73216c6da 100644 --- a/proto/rqd.proto +++ b/proto/rqd.proto @@ -114,6 +114,7 @@ message RunFrame { map attributes = 22; int32 num_gpus = 23; report.ChildrenProcStats children = 24; + string os = 25; } message RunFrameSeq { diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 000000000..b055a1807 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,3 @@ +[tool.pyright] +venvPath = "." +venv = "venv" diff --git a/requirements.txt b/requirements.txt index 4fc14043b..946853794 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,4 +19,6 @@ pytest==8.3.3 # Optional requirements # Sentry support for rqd -sentry-sdk==2.11.0 \ No newline at end of file +sentry-sdk==2.11.0 + +docker==7.1.0 \ No newline at end of file diff --git a/rqd/rqd.example.conf b/rqd/rqd.example.conf index e51782272..870419b24 100644 --- a/rqd/rqd.example.conf +++ b/rqd/rqd.example.conf @@ -27,3 +27,21 @@ SYSTEMDRIVE MAYA_MODULE_PATH MAYA_SCRIPT_PATH PIXAR_LICENSE_FILE + +[docker.config] +# Setting this to True requires all the additional "docker.[]" sections to be filled +RUN_ON_DOCKER=False +DOCKER_SHELL_PATH=/usr/bin/sh + +# This section is only required if RUN_ON_DOCKER=True +# List of volume mounts following docker run's format, but replacing = with : +[docker.mounts] +TEMP=type:bind,source:/tmp,target:/tmp,bind-propagation:slave +NET=type:bind,source:/net,target:/net,bind-propagation:slave + +# This section is only required if RUN_ON_DOCKER=True +# - keys represent OSs this rqd is capable of executing jobs in +# - values are 
docker image tags +[docker.images] +centos7=centos7.3:latest +rocky9=rocky9.3:latest diff --git a/rqd/rqd/rqconstants.py b/rqd/rqd/rqconstants.py index 4537b3113..64116710e 100644 --- a/rqd/rqd/rqconstants.py +++ b/rqd/rqd/rqconstants.py @@ -156,6 +156,12 @@ SP_OS = platform.system() +# Docker mode config +RUN_ON_DOCKER = False +DOCKER_IMAGES = {} +DOCKER_MOUNTS = [] +DOCKER_SHELL_PATH = "/bin/sh" + try: if os.path.isfile(CONFIG_FILE): # Hostname can come from here: rqutil.getHostname() @@ -237,6 +243,83 @@ if config.has_section(__host_env_var_section): RQD_HOST_ENV_VARS = config.options(__host_env_var_section) + __docker_mounts = "docker.mounts" + __docker_config = "docker.config" + __docker_images = "docker.images" + + if config.has_section(__docker_config): + RUN_ON_DOCKER = config.getboolean(__docker_config, "RUN_ON_DOCKER") + if RUN_ON_DOCKER: + import docker + import docker.models + import docker.types + + # rqd needs to run as root to be able to run docker + RQD_UID = 0 + RQD_GID = 0 + + # Path to the shell to be used in the frame environment + if config.has_option(__docker_config, "DOCKER_SHELL_PATH"): + DOCKER_SHELL_PATH = config.get( + __docker_config, + "DOCKER_SHELL_PATH") + + # Every key:value on the config file under docker.images + # is parsed as key=SP_OS and value=image_tag. + # SP_OS is set to a list of all available keys + # For example: + # + # rqd.conf + # [docker.images] + # centos7=centos7.3:latest + # rocky9=rocky9.3:latest + # + # becomes: + # SP_OS=centos7,rocky9 + # DOCKER_IMAGES={ + # "centos7": "centos7.3:latest", + # "rocky9": "rocky9.3:latest" + # } + keys = config.options(__docker_images) + DOCKER_IMAGES = {} + for key in keys: + DOCKER_IMAGES[key] = config.get(__docker_images, key) + SP_OS = ",".join(keys) + if not DOCKER_IMAGES: + raise RuntimeError("Misconfigured rqd. 
RUN_ON_DOCKER=True requires at " + "least one image on DOCKER_IMAGES ([docker.images] " + "section of rqd.conf)") + def parse_mount(mount_string): + """ + Parse mount definitions similar to a docker run command into a docker + mount obj + + Format: type:bind,source:/tmp,target:/tmp,bind-propagation:slave + """ + parsed_mounts = {} + # bind-propagation defaults to None as only type=bind accepts it + parsed_mounts["bind-propagation"] = None + for item in mount_string.split(","): + name, mount_path = item.split(":") + parsed_mounts[name.strip()] = mount_path.strip() + return parsed_mounts + + # Parse values under the category docker.mounts into Mount objects + mounts = config.options(__docker_mounts) + for mount_name in mounts: + mount_str = "" + try: + mount_str = config.get(__docker_mounts, mount_name) + mount_dict = parse_mount(mount_str) + mount = docker.types.Mount(mount_dict["target"], + mount_dict["source"], + type=mount_dict["type"], + propagation=mount_dict["bind-propagation"]) + DOCKER_MOUNTS.append(mount) + except KeyError as e: + logging.exception("Failed to create Mount for key=%s, value=%s", + mount_name, mount_str) + # pylint: disable=broad-except except Exception as e: logging.warning( diff --git a/rqd/rqd/rqcore.py b/rqd/rqd/rqcore.py index f3d42e475..5beb73b76 100644 --- a/rqd/rqd/rqcore.py +++ b/rqd/rqd/rqcore.py @@ -35,6 +35,9 @@ import time import traceback import select +import uuid + +from docker.errors import APIError, ImageNotFound import rqd.compiled_proto.host_pb2 import rqd.compiled_proto.report_pb2 @@ -51,1071 +54,1326 @@ log = logging.getLogger(__name__) -class FrameAttendantThread(threading.Thread): - """Once a frame has been received and checked by RQD, this class handles - the launching, waiting on, and cleanup work related to running the - frame.""" - def __init__(self, rqCore, runFrame, frameInfo): - """FrameAttendantThread class initialization - @type rqCore: RqCore - @param rqCore: Main RQD Object - @type runFrame: RunFrame - 
@param runFrame: rqd_pb2.RunFrame - @type frameInfo: rqd.rqnetwork.RunningFrame - @param frameInfo: Servant for running frame - """ - threading.Thread.__init__(self) - self.rqCore = rqCore - self.frameId = runFrame.frame_id - self.runFrame = runFrame - self.startTime = 0 - self.endTime = 0 - self.frameInfo = frameInfo - self._tempLocations = [] - self.rqlog = None +class RqCore(object): + """Main body of RQD, handles the integration of all components, + the setup and launching of a frame and acts on all gRPC calls + that are passed from the Network module.""" - def __createEnvVariables(self): - """Define the environmental variables for the frame""" - # If linux specific, they need to move into self.runLinux() - # pylint: disable=attribute-defined-outside-init - self.frameEnv = {} - self.frameEnv["PATH"] = self.rqCore.machine.getPathEnv() - self.frameEnv["TERM"] = "unknown" - self.frameEnv["TZ"] = self.rqCore.machine.getTimezone() - self.frameEnv["USER"] = self.runFrame.user_name - self.frameEnv["LOGNAME"] = self.runFrame.user_name - self.frameEnv["mcp"] = "1" - self.frameEnv["show"] = self.runFrame.show - self.frameEnv["shot"] = self.runFrame.shot - self.frameEnv["jobid"] = self.runFrame.job_name - self.frameEnv["jobhost"] = self.rqCore.machine.getHostname() - self.frameEnv["frame"] = self.runFrame.frame_name - self.frameEnv["zframe"] = self.runFrame.frame_name - self.frameEnv["logfile"] = self.runFrame.log_file - self.frameEnv["maxframetime"] = "0" - self.frameEnv["minspace"] = "200" - self.frameEnv["CUE3"] = "True" - self.frameEnv["CUE_GPU_MEMORY"] = str(self.rqCore.machine.getGpuMemoryFree()) - self.frameEnv["SP_NOMYCSHRC"] = "1" + def __init__(self, optNimbyoff=False): + """RqCore class initialization""" + self.__whenIdle = False + self.__reboot = False - if rqd.rqconstants.RQD_CUSTOM_HOME_PREFIX: - self.frameEnv["HOME"] = "%s/%s" % ( - rqd.rqconstants.RQD_CUSTOM_HOME_PREFIX, - self.runFrame.user_name) + self.__optNimbyoff = optNimbyoff - if 
rqd.rqconstants.RQD_CUSTOM_MAIL_PREFIX: - self.frameEnv["MAIL"] = "%s/%s" % ( - rqd.rqconstants.RQD_CUSTOM_MAIL_PREFIX, - self.runFrame.user_name) + self.cores = rqd.compiled_proto.report_pb2.CoreDetail( + total_cores=0, + idle_cores=0, + locked_cores=0, + booked_cores=0, + reserved_cores=[], + ) - if platform.system() == "Windows": - for variable in ["SYSTEMROOT", "APPDATA", "TMP", "COMMONPROGRAMFILES", "SYSTEMDRIVE"]: - if variable in os.environ: - self.frameEnv[variable] = os.environ[variable] - for variable in rqd.rqconstants.RQD_HOST_ENV_VARS: - # Fallback to empty string, easy to spot what is missing in the log - self.frameEnv[variable] = os.environ.get(variable, '') + self.nimby = rqd.rqnimby.NimbyFactory.getNimby(self) - for key, value in self.runFrame.environment.items(): - if key == 'PATH': - self.frameEnv[key] += os.pathsep + value - else: - self.frameEnv[key] = value + self.machine = rqd.rqmachine.Machine(self, self.cores) - # Add threads to use all assigned hyper-threading cores - if 'CPU_LIST' in self.runFrame.attributes and 'CUE_THREADS' in self.frameEnv: - self.frameEnv['CUE_THREADS'] = str(max( - int(self.frameEnv['CUE_THREADS']), - len(self.runFrame.attributes['CPU_LIST'].split(',')))) - self.frameEnv['CUE_HT'] = "True" + self.network = rqd.rqnetwork.Network(self) + self.__threadLock = threading.Lock() + self.__cache = {} + self.updateRssThread = None + self.onIntervalThread = None + self.intervalStartTime = None + self.intervalSleepTime = rqd.rqconstants.RQD_MIN_PING_INTERVAL_SEC - # Add GPU's to use all assigned GPU cores - if 'GPU_LIST' in self.runFrame.attributes: - self.frameEnv['CUE_GPU_CORES'] = self.runFrame.attributes['GPU_LIST'] + # pylint: disable=unused-private-member + self.__cluster = None + self.__session = None + self.__stmt = None - # pylint: disable=inconsistent-return-statements - def _createCommandFile(self, command): - """Creates a file that subprocess. Popen then executes. 
- @type command: string - @param command: The command specified in the runFrame request - @rtype: string - @return: Command file location""" - # TODO: this should use tempfile to create the files and clean them up afterwards - commandFile = None - try: - if platform.system() == "Windows": - rqd_tmp_dir = os.path.join(tempfile.gettempdir(), 'rqd') - try: - os.mkdir(rqd_tmp_dir) - except OSError: - pass # okay, already exists + self.docker = None + self.docker_mounts = [] + self.docker_images = {} + self.docker_lock = threading.Lock() + if rqd.rqconstants.RUN_ON_DOCKER: + # pylint: disable=import-outside-toplevel + import docker + self.docker = docker + self.docker_images = rqd.rqconstants.DOCKER_IMAGES + self.docker_mounts = rqd.rqconstants.DOCKER_MOUNTS + self.handleFrameImages() - # Windows Batch needs some characters escaped: - command = command.replace('%', '%%') - for char in '^&<>|': - command = command.replace(char, '^' + char) + signal.signal(signal.SIGINT, self.handleExit) + signal.signal(signal.SIGTERM, self.handleExit) - commandFile = os.path.join( - rqd_tmp_dir, - 'cmd-%s-%s.bat' % (self.runFrame.frame_id, time.time())) + def start(self): + """Called by main to start the rqd service""" + if self.machine.isDesktop(): + if self.__optNimbyoff: + log.warning('Nimby startup has been disabled via --nimbyoff') + elif not rqd.rqconstants.OVERRIDE_NIMBY: + if rqd.rqconstants.OVERRIDE_NIMBY is None: + log.warning('OVERRIDE_NIMBY is not defined, Nimby startup has been disabled') + else: + log.warning('OVERRIDE_NIMBY is False, Nimby startup has been disabled') else: - commandFile = os.path.join(tempfile.gettempdir(), - 'rqd-cmd-%s-%s' % (self.runFrame.frame_id, time.time())) - with open(commandFile, "w", encoding='utf-8') as rqexe: - self._tempLocations.append(commandFile) - rqexe.write(command) - rqexe.close() - os.chmod(commandFile, 0o777) - return commandFile - # pylint: disable=broad-except - except Exception as e: - log.critical( - "Unable to make command file: 
%s due to %s at %s", - commandFile, e, traceback.extract_tb(sys.exc_info()[2])) + self.nimbyOn() + elif rqd.rqconstants.OVERRIDE_NIMBY: + log.warning('Nimby startup has been triggered by OVERRIDE_NIMBY') + self.nimbyOn() + self.network.start_grpc() - def __writeHeader(self): - """Writes the frame's log header""" + def grpcConnected(self): + """After gRPC connects to the cuebot, this function is called""" + self.network.reportRqdStartup(self.machine.getBootReport()) - self.startTime = time.time() + self.updateRssThread = threading.Timer(rqd.rqconstants.RSS_UPDATE_INTERVAL, self.updateRss) + self.updateRssThread.start() - try: - print("="*59, file=self.rqlog) - print("RenderQ JobSpec %s" % time.ctime(self.startTime), "\n", file=self.rqlog) - print("proxy rqd.rqnetwork.RunningFrame/%s -t:tcp -h %s -p 10021" % ( - self.runFrame.frame_id, - self.rqCore.machine.getHostname()), file=self.rqlog) - print("%-21s%s" % ("command", self.runFrame.command), file=self.rqlog) - print("%-21s%s" % ("uid", self.runFrame.uid), file=self.rqlog) - print("%-21s%s" % ("gid", self.runFrame.gid), file=self.rqlog) - print("%-21s%s" % ("logDestination", - self.runFrame.log_dir_file), file=self.rqlog) - print("%-21s%s" % ("cwd", self.runFrame.frame_temp_dir), file=self.rqlog) - print("%-21s%s" % ("renderHost", - self.rqCore.machine.getHostname()), file=self.rqlog) - print("%-21s%s" % ("jobId", self.runFrame.job_id), file=self.rqlog) - print("%-21s%s" % ("frameId", self.runFrame.frame_id), file=self.rqlog) - for env in sorted(self.frameEnv): - print("%-21s%s=%s" % ("env", env, self.frameEnv[env]), file=self.rqlog) - print("="*59, file=self.rqlog) + self.onIntervalThread = threading.Timer(self.intervalSleepTime, self.onInterval) + self.intervalStartTime = time.time() + self.onIntervalThread.start() - if 'CPU_LIST' in self.runFrame.attributes: - print('Hyper-threading enabled', file=self.rqlog) + log.warning('RQD Started') + + def onInterval(self, sleepTime=None): + """This is called by 
self.grpcConnected as a timer thread to execute + every interval""" + if sleepTime is None: + self.intervalSleepTime = random.randint( + rqd.rqconstants.RQD_MIN_PING_INTERVAL_SEC, + rqd.rqconstants.RQD_MAX_PING_INTERVAL_SEC) + else: + self.intervalSleepTime = sleepTime + try: + self.onIntervalThread = threading.Timer(self.intervalSleepTime, self.onInterval) + self.intervalStartTime = time.time() + self.onIntervalThread.start() # pylint: disable=broad-except except Exception as e: log.critical( - "Unable to write header to rqlog: %s due to %s at %s", - self.runFrame.log_dir_file, e, traceback.extract_tb(sys.exc_info()[2])) - - def __writeFooter(self): - """Writes frame's log footer""" + 'Unable to schedule a ping due to %s at %s', + e, traceback.extract_tb(sys.exc_info()[2])) - self.endTime = time.time() - self.frameInfo.runTime = int(self.endTime - self.startTime) try: - print("", file=self.rqlog) - print("="*59, file=self.rqlog) - print("RenderQ Job Complete\n", file=self.rqlog) - print("%-20s%s" % ("exitStatus", self.frameInfo.exitStatus), file=self.rqlog) - print("%-20s%s" % ("exitSignal", self.frameInfo.exitSignal), file=self.rqlog) - if self.frameInfo.killMessage: - print("%-20s%s" % ("killMessage", self.frameInfo.killMessage), file=self.rqlog) - print("%-20s%s" % ("startTime", - time.ctime(self.startTime)), file=self.rqlog) - print("%-20s%s" % ("endTime", - time.ctime(self.endTime)), file=self.rqlog) - print("%-20s%s" % ("maxrss", self.frameInfo.maxRss), file=self.rqlog) - print("%-20s%s" % ("maxUsedGpuMemory", - self.frameInfo.maxUsedGpuMemory), file=self.rqlog) - print("%-20s%s" % ("utime", self.frameInfo.utime), file=self.rqlog) - print("%-20s%s" % ("stime", self.frameInfo.stime), file=self.rqlog) - print("%-20s%s" % ("renderhost", self.rqCore.machine.getHostname()), file=self.rqlog) - - print("%-20s%s" % ("maxrss (KB)", self.frameInfo.maxRss), file=self.rqlog) - for child in sorted(self.frameInfo.childrenProcs.items(), - key=lambda item: 
item[1]['start_time']): - print("\t%-20s%s" % (child[1]['name'], child[1]['rss']), file=self.rqlog) - print("\t%-20s%s" % ("start_time", - datetime.timedelta(seconds=child[1]["start_time"])), - file=self.rqlog) - print("\t%-20s%s" % ("cmdline", " ".join(child[1]["cmd_line"])), file=self.rqlog) - - print("="*59, file=self.rqlog) - + if self.__whenIdle and not self.__cache: + if not self.machine.isUserLoggedIn(): + self.shutdownRqdNow() + else: + log.warning('Shutdown requested but a user is logged in.') # pylint: disable=broad-except except Exception as e: - log.critical( - "Unable to write footer: %s due to %s at %s", - self.runFrame.log_dir_file, e, traceback.extract_tb(sys.exc_info()[2])) + log.warning( + 'Unable to shutdown due to %s at %s', e, traceback.extract_tb(sys.exc_info()[2])) - def __cleanup(self): - """Cleans up temporary files""" - rqd.rqutil.permissionsHigh() try: - for location in self._tempLocations: - if os.path.isfile(location): - try: - os.remove(location) - # pylint: disable=broad-except - except Exception as e: - log.warning( - "Unable to delete file: %s due to %s at %s", - location, e, traceback.extract_tb(sys.exc_info()[2])) - finally: - rqd.rqutil.permissionsLow() - - # Close log file - try: - self.rqlog.close() + self.sendStatusReport() # pylint: disable=broad-except - except Exception as e: - log.warning( - "Unable to close file: %s due to %s at %s", - self.runFrame.log_file, e, traceback.extract_tb(sys.exc_info()[2])) + except Exception: + log.exception('Unable to send status report') - def runLinux(self): - """The steps required to handle a frame under linux""" - frameInfo = self.frameInfo - runFrame = self.runFrame + def updateRss(self): + """Triggers and schedules the updating of rss information""" + if self.__cache: + try: + self.machine.rssUpdate(self.__cache) + finally: + self.updateRssThread = threading.Timer( + rqd.rqconstants.RSS_UPDATE_INTERVAL, self.updateRss) + self.updateRssThread.start() - self.__createEnvVariables() - 
self.__writeHeader() + def getFrame(self, frameId): + """Gets a frame from the cache based on frameId + @type frameId: string + @param frameId: A frame's unique Id + @rtype: rqd.rqnetwork.RunningFrame + @return: rqd.rqnetwork.RunningFrame object""" + return self.__cache[frameId] - tempStatFile = "%srqd-stat-%s-%s" % (self.rqCore.machine.getTempPath(), - frameInfo.frameId, - time.time()) - self._tempLocations.append(tempStatFile) - tempCommand = [] - if self.rqCore.machine.isDesktop(): - tempCommand += ["/bin/nice"] - tempCommand += ["/usr/bin/time", "-p", "-o", tempStatFile] + def getFrameKeys(self): + """Gets a list of all keys from the cache + @rtype: list + @return: List of all frameIds running on host""" + return list(self.__cache.keys()) - if 'CPU_LIST' in runFrame.attributes: - tempCommand += ['taskset', '-c', runFrame.attributes['CPU_LIST']] + def storeFrame(self, frameId, runningFrame): + """Stores a frame in the cache and adds the network adapter + @type frameId: string + @param frameId: A frame's unique Id + @type runningFrame: rqd.rqnetwork.RunningFrame + @param runningFrame: rqd.rqnetwork.RunningFrame object""" + with self.__threadLock: + if frameId in self.__cache: + raise rqd.rqexceptions.RqdException( + "frameId " + frameId + " is already running on this machine") + self.__cache[frameId] = runningFrame - rqd.rqutil.permissionsHigh() - try: - if rqd.rqconstants.RQD_BECOME_JOB_USER: - tempCommand += ["/bin/su", runFrame.user_name, rqd.rqconstants.SU_ARGUMENT, - '"' + self._createCommandFile(runFrame.command) + '"'] + def deleteFrame(self, frameId): + """Deletes a frame from the cache + @type frameId: string + @param frameId: A frame's unique Id""" + with self.__threadLock: + if frameId in self.__cache: + del self.__cache[frameId] + # pylint: disable=no-member + if not self.__cache and self.cores.reserved_cores: + # pylint: disable=no-member + log.error( + 'No running frames but reserved_cores is not empty: %s', + self.cores.reserved_cores) + # pylint: 
disable=no-member + self.cores.reserved_cores.clear() + log.info("Successfully delete frame with Id: %s", frameId) else: - tempCommand += [self._createCommandFile(runFrame.command)] - - # pylint: disable=subprocess-popen-preexec-fn,consider-using-with - frameInfo.forkedCommand = subprocess.Popen(tempCommand, - env=self.frameEnv, - cwd=self.rqCore.machine.getTempPath(), - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - close_fds=True, - preexec_fn=os.setsid) - finally: - rqd.rqutil.permissionsLow() - - frameInfo.pid = frameInfo.forkedCommand.pid + log.warning("Frame with Id: %s not found in cache", frameId) - if not self.rqCore.updateRssThread.is_alive(): - self.rqCore.updateRssThread = threading.Timer(rqd.rqconstants.RSS_UPDATE_INTERVAL, - self.rqCore.updateRss) - self.rqCore.updateRssThread.start() + def killAllFrame(self, reason): + """Will execute .kill() on every frame in cache until no frames remain + @type reason: string + @param reason: Reason for requesting all frames to be killed""" - poller = select.poll() - poller.register(frameInfo.forkedCommand.stdout, select.POLLIN) - poller.register(frameInfo.forkedCommand.stderr, select.POLLIN) - while True: - for fd, event in poller.poll(): - if event & select.POLLIN: - if fd == frameInfo.forkedCommand.stdout.fileno(): - line = frameInfo.forkedCommand.stdout.readline() - elif fd == frameInfo.forkedCommand.stderr.fileno(): - line = frameInfo.forkedCommand.stderr.readline() - else: - continue - if not line: - break - self.rqlog.write(line, prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP) - if frameInfo.forkedCommand.poll() is not None: - break + if self.__cache: + log.warning( + "killAllFrame called due to: %s\n%s", reason, ",".join(self.getFrameKeys())) - returncode = frameInfo.forkedCommand.wait() + while self.__cache: + if reason.startswith("NIMBY"): + # Since this is a nimby kill, ignore any frames that are ignoreNimby + frameKeys = [ + frame.frameId for frame in 
list(self.__cache.values()) if not frame.ignoreNimby] + else: + frameKeys = list(self.__cache.keys()) - # Find exitStatus and exitSignal - if returncode < 0: - # Exited with a signal - frameInfo.exitStatus = 1 - frameInfo.exitSignal = -returncode - else: - frameInfo.exitStatus = returncode - frameInfo.exitSignal = 0 + if not frameKeys: + # No frames left to kill + return - try: - with open(tempStatFile, "r", encoding='utf-8') as statFile: - frameInfo.realtime = statFile.readline().split()[1] - frameInfo.utime = statFile.readline().split()[1] - frameInfo.stime = statFile.readline().split()[1] - statFile.close() - # pylint: disable=broad-except - except Exception: - pass # This happens when frames are killed + for frameKey in frameKeys: + try: + self.__cache[frameKey].kill(reason) + except KeyError: + pass + time.sleep(1) - self.__writeFooter() - self.__cleanup() + def releaseCores(self, reqRelease, releaseHT=None, releaseGpus=None): + """The requested number of cores are released + @type reqRelease: int + @param reqRelease: Number of cores to release, 100 = 1 physical core""" + with self.__threadLock: + # pylint: disable=no-member + self.cores.booked_cores -= reqRelease + maxRelease = (self.cores.total_cores - + self.cores.locked_cores - + self.cores.idle_cores - + self.cores.booked_cores) - def runWindows(self): - """The steps required to handle a frame under windows""" - frameInfo = self.frameInfo - runFrame = self.runFrame + if maxRelease > 0: + self.cores.idle_cores += min(maxRelease, reqRelease) + # pylint: enable=no-member - self.__createEnvVariables() - self.__writeHeader() + if releaseHT: + self.machine.releaseHT(releaseHT) - try: - runFrame.command = runFrame.command.replace('%{frame}', self.frameEnv['CUE_IFRAME']) - tempCommand = [self._createCommandFile(runFrame.command)] + if releaseGpus: + self.machine.releaseGpus(releaseGpus) - # pylint: disable=consider-using-with - frameInfo.forkedCommand = subprocess.Popen(tempCommand, - env=self.frameEnv, - 
stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) - # pylint: disable=broad-except - except Exception: + # pylint: disable=no-member + if self.cores.idle_cores > self.cores.total_cores: log.critical( - "Failed subprocess.Popen: Due to: \n%s", - ''.join(traceback.format_exception(*sys.exc_info()))) + "idle_cores (%d) have become greater than total_cores (%d): %s at %s", + self.cores.idle_cores, self.cores.total_cores, sys.exc_info()[0], + traceback.extract_tb(sys.exc_info()[2])) + # pylint: enable=no-member - frameInfo.pid = frameInfo.forkedCommand.pid + def shutdown(self): + """Shuts down all rqd systems""" + self.nimbyOff() + if self.onIntervalThread is not None: + self.onIntervalThread.cancel() + if self.updateRssThread is not None: + self.updateRssThread.cancel() + elif self.__reboot: + log.warning("Rebooting machine by request") + self.machine.reboot() + else: + log.warning("Shutting down RQD by request. pid(%s)", os.getpid()) + self.network.stopGrpc() + # Using sys.exit would raise SystemExit, giving exception handlers a chance + # to block this + # pylint: disable=protected-access + os._exit(0) - if not self.rqCore.updateRssThread.is_alive(): - self.rqCore.updateRssThread = threading.Timer(rqd.rqconstants.RSS_UPDATE_INTERVAL, - self.rqCore.updateRss) - self.rqCore.updateRssThread.start() + def handleExit(self, signalnum, flag): + """Shutdown threads and exit RQD.""" + del signalnum + del flag + self.shutdown() - while True: - output = frameInfo.forkedCommand.stdout.readline() - if not output and frameInfo.forkedCommand.poll() is not None: - break - if output: - self.rqlog.write(output, prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP) + def launchFrame(self, runFrame): + """This will setup for the launch the frame specified in the arguments. + If a problem is encountered, a CueException will be thrown. 
+ @type runFrame: RunFrame + @param runFrame: rqd_pb2.RunFrame""" + log.info("Running command %s for %s", runFrame.command, runFrame.frame_id) + log.debug(runFrame) - frameInfo.forkedCommand.wait() + # + # Check for reasons to abort launch + # - # Find exitStatus and exitSignal - returncode = frameInfo.forkedCommand.returncode - if returncode < INT32_MIN: - returncode = 303 - if returncode > INT32_MAX: - returncode = 304 - frameInfo.exitStatus = returncode - frameInfo.exitSignal = returncode - - frameInfo.realtime = 0 - frameInfo.utime = 0 - frameInfo.stime = 0 + if self.machine.state != rqd.compiled_proto.host_pb2.UP: + err = "Not launching, rqd HardwareState is not Up" + log.info(err) + raise rqd.rqexceptions.CoreReservationFailureException(err) - self.__writeFooter() - self.__cleanup() + if self.__whenIdle: + err = "Not launching, rqd is waiting for idle to shutdown" + log.info(err) + raise rqd.rqexceptions.CoreReservationFailureException(err) - def runDarwin(self): - """The steps required to handle a frame under mac""" - frameInfo = self.frameInfo + if self.nimby.locked and not runFrame.ignore_nimby: + err = "Not launching, rqd is lockNimby and not Ignore Nimby" + log.info(err) + raise rqd.rqexceptions.CoreReservationFailureException(err) - self.__createEnvVariables() - self.__writeHeader() + if rqd.rqconstants.OVERRIDE_NIMBY and self.nimby.isNimbyActive(): + err = "Not launching, rqd is lockNimby and User is Active" + log.info(err) + raise rqd.rqexceptions.CoreReservationFailureException(err) - rqd.rqutil.permissionsHigh() - try: - tempCommand = ["/usr/bin/su", frameInfo.runFrame.user_name, "-c", '"' + - self._createCommandFile(frameInfo.runFrame.command) + '"'] + if runFrame.frame_id in self.__cache: + err = "Not launching, frame is already running on this proc %s" % runFrame.frame_id + log.critical(err) + raise rqd.rqexceptions.DuplicateFrameViolationException(err) - # pylint: disable=subprocess-popen-preexec-fn,consider-using-with - frameInfo.forkedCommand 
= subprocess.Popen(tempCommand, - env=self.frameEnv, - cwd=self.rqCore.machine.getTempPath(), - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - preexec_fn=os.setsid) - finally: - rqd.rqutil.permissionsLow() + if runFrame.HasField("uid") and runFrame.uid <= 0: + err = "Not launching, will not run frame as uid=%d" % runFrame.uid + log.warning(err) + raise rqd.rqexceptions.InvalidUserException(err) - frameInfo.pid = frameInfo.forkedCommand.pid + if runFrame.num_cores <= 0: + err = "Not launching, numCores must be > 0" + log.warning(err) + raise rqd.rqexceptions.CoreReservationFailureException(err) - if not self.rqCore.updateRssThread.is_alive(): - self.rqCore.updateRssThread = threading.Timer(rqd.rqconstants.RSS_UPDATE_INTERVAL, - self.rqCore.updateRss) - self.rqCore.updateRssThread.start() + # See if all requested cores are available + with self.__threadLock: + # pylint: disable=no-member + if self.cores.idle_cores < runFrame.num_cores: + err = "Not launching, insufficient idle cores" + log.critical(err) + raise rqd.rqexceptions.CoreReservationFailureException(err) + # pylint: enable=no-member - while True: - output = frameInfo.forkedCommand.stdout.readline() - if not output and frameInfo.forkedCommand.poll() is not None: - break - if output: - self.rqlog.write(output, prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP) + if runFrame.environment.get('CUE_THREADABLE') == '1': + reserveHT = self.machine.reserveHT(runFrame.num_cores) + if reserveHT: + runFrame.attributes['CPU_LIST'] = reserveHT - frameInfo.forkedCommand.wait() + if runFrame.num_gpus: + reserveGpus = self.machine.reserveGpus(runFrame.num_gpus) + if reserveGpus: + runFrame.attributes['GPU_LIST'] = reserveGpus - # Find exitStatus and exitSignal - returncode = frameInfo.forkedCommand.returncode - if os.WIFEXITED(returncode): - frameInfo.exitStatus = os.WEXITSTATUS(returncode) - else: - frameInfo.exitStatus = 1 - if os.WIFSIGNALED(returncode): - frameInfo.exitSignal = 
os.WTERMSIG(returncode) + # They must be available at this point, reserve them + # pylint: disable=no-member + self.cores.idle_cores -= runFrame.num_cores + self.cores.booked_cores += runFrame.num_cores + # pylint: enable=no-member - self.__writeFooter() - self.__cleanup() + runningFrame = rqd.rqnetwork.RunningFrame(self, runFrame) + runningFrame.frameAttendantThread = FrameAttendantThread(self, runFrame, runningFrame) + runningFrame.frameAttendantThread.start() - def runUnknown(self): - """The steps required to handle a frame under an unknown OS.""" + def getRunningFrame(self, frameId): + """Gets the currently running frame.""" + try: + return self.__cache[frameId] + except KeyError: + log.info("frameId %s is not running on this machine", frameId) + return None - def run(self): - """Thread initialization""" - log.info("Monitor frame started for frameId=%s", self.frameId) + def getCoreInfo(self): + """Gets the core info report.""" + return self.cores - runFrame = self.runFrame + def reportStatus(self): + """Replies with hostReport""" + return self.machine.getHostReport() - # pylint: disable=too-many-nested-blocks + def shutdownRqdNow(self): + """Kill all running frames and shutdown RQD""" + self.machine.state = rqd.compiled_proto.host_pb2.DOWN try: - runFrame.job_temp_dir = os.path.join(self.rqCore.machine.getTempPath(), - runFrame.job_name) - runFrame.frame_temp_dir = os.path.join(runFrame.job_temp_dir, - runFrame.frame_name) - runFrame.log_file = "%s.%s.rqlog" % (runFrame.job_name, - runFrame.frame_name) - runFrame.log_dir_file = os.path.join(runFrame.log_dir, runFrame.log_file) - - try: # Exception block for all exceptions - # Ensure permissions return to Low after this block - try: - if rqd.rqconstants.RQD_CREATE_USER_IF_NOT_EXISTS and runFrame.HasField("uid"): - rqd.rqutil.checkAndCreateUser(runFrame.user_name, - runFrame.uid, - runFrame.gid) - # Do everything as launching user: - runFrame.gid = rqd.rqconstants.LAUNCH_FRAME_USER_GID - 
rqd.rqutil.permissionsUser(runFrame.uid, runFrame.gid) - - # Setup frame logging - try: - self.rqlog = rqd.rqlogging.RqdLogger(runFrame.log_dir_file) - self.rqlog.waitForFile() - # pylint: disable=broad-except - except Exception as e: - err = "Unable to write to %s due to %s" % (runFrame.log_dir_file, e) - raise RuntimeError(err) + self.lockAll() + self.killAllFrame("shutdownRqdNow Command") + # pylint: disable=broad-except + except Exception: + log.exception("Failed to kill frames, stopping service anyways") + if not self.__cache: + self.shutdown() - finally: - rqd.rqutil.permissionsLow() + def shutdownRqdIdle(self): + """When machine is idle, shutdown RQD""" + log.info("shutdownRqdIdle") + self.lockAll() + self.__whenIdle = True + self.sendStatusReport() + if not self.__cache: + self.shutdownRqdNow() - # Store frame in cache and register servant - self.rqCore.storeFrame(runFrame.frame_id, self.frameInfo) + def rebootNow(self): + """Kill all running frames and reboot machine. + This is not available when a user is logged in""" + log.warning('Requested to reboot now') + if self.machine.isUserLoggedIn(): + err = ('Rebooting via RQD is not supported for a desktop machine ' + 'when a user is logged in') + log.warning(err) + raise rqd.rqexceptions.RqdException(err) + self.__reboot = True + self.shutdownRqdNow() - if platform.system() == "Linux": - self.runLinux() - elif platform.system() == "Windows": - self.runWindows() - elif platform.system() == "Darwin": - self.runDarwin() - else: - self.runUnknown() + def rebootIdle(self): + """When machine is idle, reboot it""" + log.warning('Requested to reboot machine when idle') + self.lockAll() + self.__whenIdle = True + self.__reboot = True + self.sendStatusReport() + if not self.__cache and not self.machine.isUserLoggedIn(): + self.shutdownRqdNow() + def nimbyOn(self): + """Activates nimby, does not kill any running frames until next nimby + event. 
Also does not unlock until sufficient idle time is reached.""" + if self.nimby and not self.nimby.active: + try: + self.nimby.run() + log.warning("Nimby has been activated") # pylint: disable=broad-except except Exception: - log.critical( - "Failed launchFrame: For %s due to: \n%s", - runFrame.frame_id, ''.join(traceback.format_exception(*sys.exc_info()))) - # Notifies the cuebot that there was an error launching - self.frameInfo.exitStatus = rqd.rqconstants.EXITSTATUS_FOR_FAILED_LAUNCH - # Delay keeps the cuebot from spamming failing booking requests - time.sleep(10) - finally: - self.rqCore.releaseCores(self.runFrame.num_cores, runFrame.attributes.get('CPU_LIST'), - runFrame.attributes.get('GPU_LIST') - if 'GPU_LIST' in self.runFrame.attributes else None) - - self.rqCore.deleteFrame(self.runFrame.frame_id) - - self.rqCore.sendFrameCompleteReport(self.frameInfo) - time_till_next = ( - (self.rqCore.intervalStartTime + self.rqCore.intervalSleepTime) - time.time()) - if time_till_next > (2 * rqd.rqconstants.RQD_MIN_PING_INTERVAL_SEC): - self.rqCore.onIntervalThread.cancel() - self.rqCore.onInterval(rqd.rqconstants.RQD_MIN_PING_INTERVAL_SEC) - - log.info("Monitor frame ended for frameId=%s", - self.runFrame.frame_id) - + self.nimby.locked = False + err = "Nimby is in the process of shutting down" + log.exception(err) + raise rqd.rqexceptions.RqdException(err) -class RqCore(object): - """Main body of RQD, handles the integration of all components, - the setup and launching of a frame and acts on all gRPC calls - that are passed from the Network module.""" + def nimbyOff(self): + """Deactivates nimby and unlocks any nimby lock""" + if self.nimby.active: + self.nimby.stop() + log.info("Nimby has been deactivated") - def __init__(self, optNimbyoff=False): - """RqCore class initialization""" - self.__whenIdle = False - self.__reboot = False + def onNimbyLock(self): + """This is called by nimby when it locks the machine. + All running frames are killed. 
+ A new report is sent to the cuebot.""" + self.killAllFrame("NIMBY Triggered") + self.sendStatusReport() - self.__optNimbyoff = optNimbyoff + def onNimbyUnlock(self, asOf=None): + """This is called by nimby when it unlocks the machine due to sufficient + idle. A new report is sent to the cuebot. + @param asOf: Time when idle state began, if known.""" + del asOf + self.sendStatusReport() - self.cores = rqd.compiled_proto.report_pb2.CoreDetail( - total_cores=0, - idle_cores=0, - locked_cores=0, - booked_cores=0, - reserved_cores=[], - ) - - self.nimby = rqd.rqnimby.NimbyFactory.getNimby(self) - - self.machine = rqd.rqmachine.Machine(self, self.cores) - - self.network = rqd.rqnetwork.Network(self) - self.__threadLock = threading.Lock() - self.__cache = {} + def lock(self, reqLock): + """Locks the requested core. + If a locked status changes, a status report is sent to the cuebot. + @type reqLock: int + @param reqLock: Number of cores to lock, 100 = 1 physical core""" + sendUpdate = False + with self.__threadLock: + # pylint: disable=no-member + numLock = min(self.cores.total_cores - self.cores.locked_cores, + reqLock) + if numLock > 0: + self.cores.locked_cores += numLock + self.cores.idle_cores -= min(numLock, self.cores.idle_cores) + sendUpdate = True + # pylint: enable=no-member - self.updateRssThread = None - self.onIntervalThread = None - self.intervalStartTime = None - self.intervalSleepTime = rqd.rqconstants.RQD_MIN_PING_INTERVAL_SEC + log.debug(self.cores) - # pylint: disable=unused-private-member - self.__cluster = None - self.__session = None - self.__stmt = None + if sendUpdate: + self.sendStatusReport() - signal.signal(signal.SIGINT, self.handleExit) - signal.signal(signal.SIGTERM, self.handleExit) + def lockAll(self): + """"Locks all cores on the machine. 
+ If a locked status changes, a status report is sent.""" + sendUpdate = False + with self.__threadLock: + # pylint: disable=no-member + if self.cores.locked_cores < self.cores.total_cores: + self.cores.locked_cores = self.cores.total_cores + self.cores.idle_cores = 0 + sendUpdate = True + # pylint: enable=no-member - def start(self): - """Called by main to start the rqd service""" - if self.machine.isDesktop(): - if self.__optNimbyoff: - log.warning('Nimby startup has been disabled via --nimbyoff') - elif not rqd.rqconstants.OVERRIDE_NIMBY: - if rqd.rqconstants.OVERRIDE_NIMBY is None: - log.warning('OVERRIDE_NIMBY is not defined, Nimby startup has been disabled') - else: - log.warning('OVERRIDE_NIMBY is False, Nimby startup has been disabled') - else: - self.nimbyOn() - elif rqd.rqconstants.OVERRIDE_NIMBY: - log.warning('Nimby startup has been triggered by OVERRIDE_NIMBY') - self.nimbyOn() - self.network.start_grpc() + log.debug(self.cores) - def grpcConnected(self): - """After gRPC connects to the cuebot, this function is called""" - self.network.reportRqdStartup(self.machine.getBootReport()) + if sendUpdate: + self.sendStatusReport() - self.updateRssThread = threading.Timer(rqd.rqconstants.RSS_UPDATE_INTERVAL, self.updateRss) - self.updateRssThread.start() + def unlock(self, reqUnlock): + """Unlocks the requested number of cores. + Also resets reboot/shutdown/restart when idle requests. + If a locked status changes, a status report is sent to the cuebot. 
+ @type reqUnlock: int + @param reqUnlock: Number of cores to unlock, 100 = 1 physical core""" - self.onIntervalThread = threading.Timer(self.intervalSleepTime, self.onInterval) - self.intervalStartTime = time.time() - self.onIntervalThread.start() + sendUpdate = False - log.warning('RQD Started') + if (self.__whenIdle or self.__reboot or + self.machine.state != rqd.compiled_proto.host_pb2.UP): + sendUpdate = True - def onInterval(self, sleepTime=None): + self.__whenIdle = False + self.__reboot = False + self.machine.state = rqd.compiled_proto.host_pb2.UP - """This is called by self.grpcConnected as a timer thread to execute - every interval""" - if sleepTime is None: - self.intervalSleepTime = random.randint( - rqd.rqconstants.RQD_MIN_PING_INTERVAL_SEC, - rqd.rqconstants.RQD_MAX_PING_INTERVAL_SEC) - else: - self.intervalSleepTime = sleepTime - try: - self.onIntervalThread = threading.Timer(self.intervalSleepTime, self.onInterval) - self.intervalStartTime = time.time() - self.onIntervalThread.start() - # pylint: disable=broad-except - except Exception as e: - log.critical( - 'Unable to schedule a ping due to %s at %s', - e, traceback.extract_tb(sys.exc_info()[2])) + with self.__threadLock: + # pylint: disable=no-member + numUnlock = min(self.cores.locked_cores, reqUnlock) + if numUnlock > 0: + self.cores.locked_cores -= numUnlock + self.cores.idle_cores += numUnlock + sendUpdate = True + # pylint: enable=no-member - try: - if self.__whenIdle and not self.__cache: - if not self.machine.isUserLoggedIn(): - self.shutdownRqdNow() - else: - log.warning('Shutdown requested but a user is logged in.') - # pylint: disable=broad-except - except Exception as e: - log.warning( - 'Unable to shutdown due to %s at %s', e, traceback.extract_tb(sys.exc_info()[2])) + log.debug(self.cores) - try: + if sendUpdate: self.sendStatusReport() - # pylint: disable=broad-except - except Exception as e: - log.critical( - 'Unable to send status report due to %s at %s', - e, 
traceback.extract_tb(sys.exc_info()[2])) - def updateRss(self): - """Triggers and schedules the updating of rss information""" - if self.__cache: - try: - self.machine.rssUpdate(self.__cache) - finally: - self.updateRssThread = threading.Timer( - rqd.rqconstants.RSS_UPDATE_INTERVAL, self.updateRss) - self.updateRssThread.start() + def unlockAll(self): + """"Unlocks all cores on the machine. + Also resets reboot/shutdown/restart when idle requests. + If a locked status changes, a status report is sent.""" - def getFrame(self, frameId): - """Gets a frame from the cache based on frameId - @type frameId: string - @param frameId: A frame's unique Id - @rtype: rqd.rqnetwork.RunningFrame - @return: rqd.rqnetwork.RunningFrame object""" - return self.__cache[frameId] + sendUpdate = False - def getFrameKeys(self): - """Gets a list of all keys from the cache - @rtype: list - @return: List of all frameIds running on host""" - return list(self.__cache.keys()) + if (self.__whenIdle or self.__reboot + or self.machine.state != rqd.compiled_proto.host_pb2.UP): + sendUpdate = True - def storeFrame(self, frameId, runningFrame): - """Stores a frame in the cache and adds the network adapter - @type frameId: string - @param frameId: A frame's unique Id - @type runningFrame: rqd.rqnetwork.RunningFrame - @param runningFrame: rqd.rqnetwork.RunningFrame object""" - with self.__threadLock: - if frameId in self.__cache: - raise rqd.rqexceptions.RqdException( - "frameId " + frameId + " is already running on this machine") - self.__cache[frameId] = runningFrame + self.__whenIdle = False + self.__reboot = False + self.machine.state = rqd.compiled_proto.host_pb2.UP - def deleteFrame(self, frameId): - """Deletes a frame from the cache - @type frameId: string - @param frameId: A frame's unique Id""" with self.__threadLock: - if frameId in self.__cache: - del self.__cache[frameId] - # pylint: disable=no-member - if not self.__cache and self.cores.reserved_cores: - # pylint: disable=no-member - 
log.error( - 'No running frames but reserved_cores is not empty: %s', - self.cores.reserved_cores) - # pylint: disable=no-member - self.cores.reserved_cores.clear() - log.info("Successfully delete frame with Id: %s", frameId) - else: - log.warning("Frame with Id: %s not found in cache", frameId) - - def killAllFrame(self, reason): - """Will execute .kill() on every frame in cache until no frames remain - @type reason: string - @param reason: Reason for requesting all frames to be killed""" + # pylint: disable=no-member + if self.cores.locked_cores > 0: + if not self.nimby.locked: + self.cores.idle_cores += self.cores.locked_cores + self.cores.locked_cores = 0 + sendUpdate = True + # pylint: enable=no-member - if self.__cache: - log.warning( - "killAllFrame called due to: %s\n%s", reason, ",".join(self.getFrameKeys())) + log.debug(self.cores) - while self.__cache: - if reason.startswith("NIMBY"): - # Since this is a nimby kill, ignore any frames that are ignoreNimby - frameKeys = [ - frame.frameId for frame in list(self.__cache.values()) if not frame.ignoreNimby] - else: - frameKeys = list(self.__cache.keys()) + if sendUpdate: + self.sendStatusReport() - if not frameKeys: - # No frames left to kill - return + def sendStatusReport(self): + """Sends the current host report to Cuebot.""" + self.network.reportStatus(self.machine.getHostReport()) - for frameKey in frameKeys: - try: - self.__cache[frameKey].kill(reason) - except KeyError: - pass - time.sleep(1) + def isWaitingForIdle(self): + """Returns whether the host is waiting until idle to take some action.""" + return self.__whenIdle - def releaseCores(self, reqRelease, releaseHT=None, releaseGpus=None): - """The requested number of cores are released - @type reqRelease: int - @param reqRelease: Number of cores to release, 100 = 1 physical core""" - with self.__threadLock: + def sendFrameCompleteReport(self, runningFrame): + """Send a frameCompleteReport to Cuebot""" + if not runningFrame.completeReportSent: + 
report = rqd.compiled_proto.report_pb2.FrameCompleteReport() # pylint: disable=no-member - self.cores.booked_cores -= reqRelease - maxRelease = (self.cores.total_cores - - self.cores.locked_cores - - self.cores.idle_cores - - self.cores.booked_cores) - - if maxRelease > 0: - self.cores.idle_cores += min(maxRelease, reqRelease) + report.host.CopyFrom(self.machine.getHostInfo()) + report.frame.CopyFrom(runningFrame.runningFrameInfo()) # pylint: enable=no-member - if releaseHT: - self.machine.releaseHT(releaseHT) + if runningFrame.exitStatus is None: + report.exit_status = 1 + else: + report.exit_status = runningFrame.exitStatus - if releaseGpus: - self.machine.releaseGpus(releaseGpus) + report.exit_signal = runningFrame.exitSignal + report.run_time = int(runningFrame.runTime) - # pylint: disable=no-member - if self.cores.idle_cores > self.cores.total_cores: + # If nimby is active, then frame must have been killed by nimby + # Set the exitSignal to indicate this event + if self.nimby.locked and not runningFrame.ignoreNimby: + report.exit_status = rqd.rqconstants.EXITSTATUS_FOR_NIMBY_KILL + + self.network.reportRunningFrameCompletion(report) + runningFrame.completeReportSent = True + + def sanitizeFrames(self): + """ + Iterate over the cache and update the status of frames that might have + completed but never reported back to cuebot. 
+ """ + for frameId in list(self.__cache.keys()): + runningFrame = self.__cache[frameId] + # If the frame was marked as completed (exitStatus) and a report has not been sent + # try to file the report again + if runningFrame.exitStatus is not None and not runningFrame.completeReportSent: + try: + self.sendFrameCompleteReport(runningFrame) + self.deleteFrame(frameId) + log.info("Successfully deleted frame from cache for %s/%s (%s)", + runningFrame.runFrame.job_name, + runningFrame.runFrame.frame_name, + frameId) + # pylint: disable=broad-except + except Exception: + log.exception("Failed to sanitize frame %s/%s", + runningFrame.runFrame.job_name, + runningFrame.runFrame.frame_name) + + def handleFrameImages(self): + """ + Download docker images to be used by frames running on this host + """ + if self.docker: + docker_client = self.docker.from_env() + for image in self.docker_images.values(): + log.info("Downloading frame image: %s", image) + try: + name, tag = image.split(":") + docker_client.images.pull(name, tag) + except (ImageNotFound, APIError) as e: + raise RuntimeError("Failed to download frame docker image for %s:%s - %s" % + (name, tag, e)) + log.info("Finished downloading frame images") + + +class FrameAttendantThread(threading.Thread): + """Once a frame has been received and checked by RQD, this class handles + the launching, waiting on, and cleanup work related to running the + frame.""" + def __init__(self, rqCore: RqCore, runFrame, frameInfo): + """FrameAttendantThread class initialization + @type rqCore: RqCore + @param rqCore: Main RQD Object + @type runFrame: RunFrame + @param runFrame: rqd_pb2.RunFrame + @type frameInfo: rqd.rqnetwork.RunningFrame + @param frameInfo: Servant for running frame + """ + threading.Thread.__init__(self) + self.rqCore = rqCore + self.frameId = runFrame.frame_id + self.runFrame = runFrame + self.startTime = 0 + self.endTime = 0 + self.frameInfo = frameInfo + self._tempLocations = [] + self.rqlog = None + + def 
__createEnvVariables(self): + """Define the environmental variables for the frame""" + # If linux specific, they need to move into self.runLinux() + # pylint: disable=attribute-defined-outside-init + self.frameEnv = {} + self.frameEnv["PATH"] = self.rqCore.machine.getPathEnv() + self.frameEnv["TERM"] = "unknown" + self.frameEnv["TZ"] = self.rqCore.machine.getTimezone() + self.frameEnv["USER"] = self.runFrame.user_name + self.frameEnv["LOGNAME"] = self.runFrame.user_name + self.frameEnv["mcp"] = "1" + self.frameEnv["show"] = self.runFrame.show + self.frameEnv["shot"] = self.runFrame.shot + self.frameEnv["jobid"] = self.runFrame.job_name + self.frameEnv["jobhost"] = self.rqCore.machine.getHostname() + self.frameEnv["frame"] = self.runFrame.frame_name + self.frameEnv["zframe"] = self.runFrame.frame_name + self.frameEnv["logfile"] = self.runFrame.log_file + self.frameEnv["maxframetime"] = "0" + self.frameEnv["minspace"] = "200" + self.frameEnv["CUE3"] = "True" + self.frameEnv["CUE_GPU_MEMORY"] = str(self.rqCore.machine.getGpuMemoryFree()) + self.frameEnv["SP_NOMYCSHRC"] = "1" + + if rqd.rqconstants.RQD_CUSTOM_HOME_PREFIX: + self.frameEnv["HOME"] = "%s/%s" % ( + rqd.rqconstants.RQD_CUSTOM_HOME_PREFIX, + self.runFrame.user_name) + + if rqd.rqconstants.RQD_CUSTOM_MAIL_PREFIX: + self.frameEnv["MAIL"] = "%s/%s" % ( + rqd.rqconstants.RQD_CUSTOM_MAIL_PREFIX, + self.runFrame.user_name) + + if platform.system() == "Windows": + for variable in ["SYSTEMROOT", "APPDATA", "TMP", "COMMONPROGRAMFILES", "SYSTEMDRIVE"]: + if variable in os.environ: + self.frameEnv[variable] = os.environ[variable] + for variable in rqd.rqconstants.RQD_HOST_ENV_VARS: + # Fallback to empty string, easy to spot what is missing in the log + self.frameEnv[variable] = os.environ.get(variable, '') + + + if platform.system() == "Windows": + for variable in ["SYSTEMROOT", "APPDATA", "TMP", "COMMONPROGRAMFILES", "SYSTEMDRIVE"]: + if variable in os.environ: + self.frameEnv[variable] = os.environ[variable] + for 
variable in rqd.rqconstants.RQD_HOST_ENV_VARS: + # Fallback to empty string, easy to spot what is missing in the log + self.frameEnv[variable] = os.environ.get(variable, '') + + for key, value in self.runFrame.environment.items(): + if key == 'PATH': + self.frameEnv[key] += os.pathsep + value + else: + self.frameEnv[key] = value + + # Add threads to use all assigned hyper-threading cores + if 'CPU_LIST' in self.runFrame.attributes and 'CUE_THREADS' in self.frameEnv: + self.frameEnv['CUE_THREADS'] = str(max( + int(self.frameEnv['CUE_THREADS']), + len(self.runFrame.attributes['CPU_LIST'].split(',')))) + self.frameEnv['CUE_HT'] = "True" + + # Add GPU's to use all assigned GPU cores + if 'GPU_LIST' in self.runFrame.attributes: + self.frameEnv['CUE_GPU_CORES'] = self.runFrame.attributes['GPU_LIST'] + + # pylint: disable=inconsistent-return-statements + def _createCommandFile(self, command): + """Creates a file that subprocess. Popen then executes. + @type command: string + @param command: The command specified in the runFrame request + @rtype: string + @return: Command file location""" + commandFile = "" + try: + if platform.system() == "Windows": + rqd_tmp_dir = os.path.join(tempfile.gettempdir(), 'rqd') + try: + os.mkdir(rqd_tmp_dir) + except OSError: + pass # okay, already exists + + # Windows Batch needs some characters escaped: + command = command.replace('%', '%%') + for char in '^&<>|': + command = command.replace(char, '^' + char) + + commandFile = os.path.join( + rqd_tmp_dir, + 'cmd-%s-%s.bat' % (self.runFrame.frame_id, time.time())) + else: + commandFile = os.path.join(tempfile.gettempdir(), + 'rqd-cmd-%s-%s' % (self.runFrame.frame_id, time.time())) + with open(commandFile, "w", encoding='utf-8') as rqexe: + self._tempLocations.append(commandFile) + rqexe.write(command) + rqexe.close() + os.chmod(commandFile, 0o777) + return commandFile + # pylint: disable=broad-except + except Exception as e: log.critical( - "idle_cores (%d) have become greater than 
total_cores (%d): %s at %s", - self.cores.idle_cores, self.cores.total_cores, sys.exc_info()[0], - traceback.extract_tb(sys.exc_info()[2])) - # pylint: enable=no-member + "Unable to make command file: %s due to %s at %s", + commandFile, e, traceback.extract_tb(sys.exc_info()[2])) + raise e - def shutdown(self): - """Shuts down all rqd systems""" - self.nimbyOff() - if self.onIntervalThread is not None: - self.onIntervalThread.cancel() - if self.updateRssThread is not None: - self.updateRssThread.cancel() - elif self.__reboot: - log.warning("Rebooting machine by request") - self.machine.reboot() - else: - log.warning("Shutting down RQD by request. pid(%s)", os.getpid()) - self.network.stopGrpc() - # Using sys.exit would raise SystemExit, giving exception handlers a chance - # to block this - # pylint: disable=protected-access - os._exit(0) + def __writeHeader(self): + """Writes the frame's log header""" - def handleExit(self, signalnum, flag): - """Shutdown threads and exit RQD.""" - del signalnum - del flag - self.shutdown() + self.startTime = time.time() + + try: + print("="*59, file=self.rqlog) + print("RenderQ JobSpec %s" % time.ctime(self.startTime), "\n", file=self.rqlog) + print("proxy rqd.rqnetwork.RunningFrame/%s -t:tcp -h %s -p 10021" % ( + self.runFrame.frame_id, + self.rqCore.machine.getHostname()), file=self.rqlog) + print("%-21s%s" % ("command", self.runFrame.command), file=self.rqlog) + print("%-21s%s" % ("uid", self.runFrame.uid), file=self.rqlog) + print("%-21s%s" % ("gid", self.runFrame.gid), file=self.rqlog) + print("%-21s%s" % ("logDestination", + self.runFrame.log_dir_file), file=self.rqlog) + print("%-21s%s" % ("cwd", self.runFrame.frame_temp_dir), file=self.rqlog) + print("%-21s%s" % ("renderHost", + self.rqCore.machine.getHostname()), file=self.rqlog) + print("%-21s%s" % ("jobId", self.runFrame.job_id), file=self.rqlog) + print("%-21s%s" % ("frameId", self.runFrame.frame_id), file=self.rqlog) + for env in sorted(self.frameEnv): + 
print("%-21s%s=%s" % ("env", env, self.frameEnv[env]), file=self.rqlog) + print("="*59, file=self.rqlog) + + if 'CPU_LIST' in self.runFrame.attributes: + print('Hyper-threading enabled', file=self.rqlog) + + # pylint: disable=broad-except + except Exception as e: + log.critical( + "Unable to write header to rqlog: %s due to %s at %s", + self.runFrame.log_dir_file, e, traceback.extract_tb(sys.exc_info()[2])) + + def __writeFooter(self): + """Writes frame's log footer""" + + self.endTime = time.time() + self.frameInfo.runTime = int(self.endTime - self.startTime) + try: + print("", file=self.rqlog) + print("="*59, file=self.rqlog) + print("RenderQ Job Complete\n", file=self.rqlog) + print("%-20s%s" % ("exitStatus", self.frameInfo.exitStatus), file=self.rqlog) + print("%-20s%s" % ("exitSignal", self.frameInfo.exitSignal), file=self.rqlog) + if self.frameInfo.killMessage: + print("%-20s%s" % ("killMessage", self.frameInfo.killMessage), file=self.rqlog) + print("%-20s%s" % ("startTime", + time.ctime(self.startTime)), file=self.rqlog) + print("%-20s%s" % ("endTime", + time.ctime(self.endTime)), file=self.rqlog) + print("%-20s%s" % ("maxrss", self.frameInfo.maxRss), file=self.rqlog) + print("%-20s%s" % ("maxUsedGpuMemory", + self.frameInfo.maxUsedGpuMemory), file=self.rqlog) + print("%-20s%s" % ("utime", self.frameInfo.utime), file=self.rqlog) + print("%-20s%s" % ("stime", self.frameInfo.stime), file=self.rqlog) + print("%-20s%s" % ("renderhost", self.rqCore.machine.getHostname()), file=self.rqlog) + + print("%-20s%s" % ("maxrss (KB)", self.frameInfo.maxRss), file=self.rqlog) + for child in sorted(self.frameInfo.childrenProcs.items(), + key=lambda item: item[1]['start_time']): + print("\t%-20s%s" % (child[1]['name'], child[1]['rss']), file=self.rqlog) + print("\t%-20s%s" % ("start_time", + datetime.timedelta(seconds=child[1]["start_time"])), + file=self.rqlog) + print("\t%-20s%s" % ("cmdline", " ".join(child[1]["cmd_line"])), file=self.rqlog) + + print("="*59, 
file=self.rqlog) + + # pylint: disable=broad-except + except Exception as e: + log.critical( + "Unable to write footer: %s due to %s at %s", + self.runFrame.log_dir_file, e, traceback.extract_tb(sys.exc_info()[2])) + + def __cleanup(self): + """Cleans up temporary files""" + rqd.rqutil.permissionsHigh() + try: + for location in self._tempLocations: + if os.path.isfile(location): + try: + os.remove(location) + # pylint: disable=broad-except + except Exception as e: + log.warning( + "Unable to delete file: %s due to %s at %s", + location, e, traceback.extract_tb(sys.exc_info()[2])) + finally: + rqd.rqutil.permissionsLow() + + # Close log file + try: + self.rqlog.close() + # pylint: disable=broad-except + except Exception as e: + log.warning( + "Unable to close file: %s due to %s at %s", + self.runFrame.log_file, e, traceback.extract_tb(sys.exc_info()[2])) + + def runLinux(self): + """The steps required to handle a frame under linux""" + frameInfo = self.frameInfo + runFrame = self.runFrame + + self.__createEnvVariables() + self.__writeHeader() + + tempStatFile = "%srqd-stat-%s-%s" % (self.rqCore.machine.getTempPath(), + frameInfo.frameId, + time.time()) + self._tempLocations.append(tempStatFile) + tempCommand = [] + if self.rqCore.machine.isDesktop(): + tempCommand += ["/bin/nice"] + tempCommand += ["/usr/bin/time", "-p", "-o", tempStatFile] + + if 'CPU_LIST' in runFrame.attributes: + tempCommand += ['taskset', '-c', runFrame.attributes['CPU_LIST']] - def launchFrame(self, runFrame): - """This will setup for the launch the frame specified in the arguments. - If a problem is encountered, a CueException will be thrown. 
- @type runFrame: RunFrame - @param runFrame: rqd_pb2.RunFrame""" - log.info("Running command %s for %s", runFrame.command, runFrame.frame_id) - log.debug(runFrame) + rqd.rqutil.permissionsHigh() + try: + if rqd.rqconstants.RQD_BECOME_JOB_USER: + tempCommand += ["/bin/su", runFrame.user_name, rqd.rqconstants.SU_ARGUMENT, + '"' + self._createCommandFile(runFrame.command) + '"'] + else: + tempCommand += [self._createCommandFile(runFrame.command)] - # - # Check for reasons to abort launch - # + # pylint: disable=subprocess-popen-preexec-fn,consider-using-with + frameInfo.forkedCommand = subprocess.Popen(tempCommand, + env=self.frameEnv, + cwd=self.rqCore.machine.getTempPath(), + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + close_fds=True, + preexec_fn=os.setsid) + finally: + rqd.rqutil.permissionsLow() - if self.machine.state != rqd.compiled_proto.host_pb2.UP: - err = "Not launching, rqd HardwareState is not Up" - log.info(err) - raise rqd.rqexceptions.CoreReservationFailureException(err) + frameInfo.pid = frameInfo.forkedCommand.pid - if self.__whenIdle: - err = "Not launching, rqd is waiting for idle to shutdown" - log.info(err) - raise rqd.rqexceptions.CoreReservationFailureException(err) + if not self.rqCore.updateRssThread.is_alive(): + self.rqCore.updateRssThread = threading.Timer(rqd.rqconstants.RSS_UPDATE_INTERVAL, + self.rqCore.updateRss) + self.rqCore.updateRssThread.start() - if self.nimby.locked and not runFrame.ignore_nimby: - err = "Not launching, rqd is lockNimby and not Ignore Nimby" - log.info(err) - raise rqd.rqexceptions.CoreReservationFailureException(err) + poller = select.poll() + poller.register(frameInfo.forkedCommand.stdout, select.POLLIN) + poller.register(frameInfo.forkedCommand.stderr, select.POLLIN) + while True: + for fd, event in poller.poll(): + if event & select.POLLIN: + if fd == frameInfo.forkedCommand.stdout.fileno(): + line = frameInfo.forkedCommand.stdout.readline() + elif fd == 
frameInfo.forkedCommand.stderr.fileno(): + line = frameInfo.forkedCommand.stderr.readline() + else: + continue + if not line: + break + self.rqlog.write(line, prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP) + if frameInfo.forkedCommand.poll() is not None: + break - if rqd.rqconstants.OVERRIDE_NIMBY and self.nimby.isNimbyActive(): - err = "Not launching, rqd is lockNimby and User is Active" - log.info(err) - raise rqd.rqexceptions.CoreReservationFailureException(err) + returncode = frameInfo.forkedCommand.wait() - if runFrame.frame_id in self.__cache: - err = "Not launching, frame is already running on this proc %s" % runFrame.frame_id - log.critical(err) - raise rqd.rqexceptions.DuplicateFrameViolationException(err) + # Find exitStatus and exitSignal + if returncode < 0: + # Exited with a signal + frameInfo.exitStatus = 1 + frameInfo.exitSignal = -returncode + else: + frameInfo.exitStatus = returncode + frameInfo.exitSignal = 0 - if runFrame.HasField("uid") and runFrame.uid <= 0: - err = "Not launching, will not run frame as uid=%d" % runFrame.uid - log.warning(err) - raise rqd.rqexceptions.InvalidUserException(err) + try: + with open(tempStatFile, "r", encoding='utf-8') as statFile: + frameInfo.realtime = statFile.readline().split()[1] + frameInfo.utime = statFile.readline().split()[1] + frameInfo.stime = statFile.readline().split()[1] + statFile.close() + # pylint: disable=broad-except + except Exception: + pass # This happens when frames are killed - if runFrame.num_cores <= 0: - err = "Not launching, numCores must be > 0" - log.warning(err) - raise rqd.rqexceptions.CoreReservationFailureException(err) + self.__writeFooter() + self.__cleanup() - # See if all requested cores are available - with self.__threadLock: - # pylint: disable=no-member - if self.cores.idle_cores < runFrame.num_cores: - err = "Not launching, insufficient idle cores" - log.critical(err) - raise rqd.rqexceptions.CoreReservationFailureException(err) - # pylint: enable=no-member + def 
runDocker(self): + """The steps required to handle a frame under a docker container""" + frameInfo = self.frameInfo + runFrame = self.runFrame - if runFrame.environment.get('CUE_THREADABLE') == '1': - reserveHT = self.machine.reserveHT(runFrame.num_cores) - if reserveHT: - runFrame.attributes['CPU_LIST'] = reserveHT + # Ensure Nullable attributes have been initialized + if not self.rqlog: + raise RuntimeError("Invalid state. rqlog has not been initialized") + if not self.rqCore.docker: + raise RuntimeError("Invalid state: docker_client must have been initialized.") - if runFrame.num_gpus: - reserveGpus = self.machine.reserveGpus(runFrame.num_gpus) - if reserveGpus: - runFrame.attributes['GPU_LIST'] = reserveGpus + try: + image = self.__getFrameImage(runFrame.os) + except RuntimeError as e: + self.__writeHeader() + self.rqlog.write(str(e), prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP) + raise e - # They must be available at this point, reserve them - # pylint: disable=no-member - self.cores.idle_cores -= runFrame.num_cores - self.cores.booked_cores += runFrame.num_cores - # pylint: enable=no-member + self.__createEnvVariables() + self.__writeHeader() - runningFrame = rqd.rqnetwork.RunningFrame(self, runFrame) - runningFrame.frameAttendantThread = FrameAttendantThread(self, runFrame, runningFrame) - runningFrame.frameAttendantThread.start() + tempStatFile = "%srqd-stat-%s-%s" % (self.rqCore.machine.getTempPath(), + frameInfo.frameId, + time.time()) + self._tempLocations.append(tempStatFile) - def getRunningFrame(self, frameId): - """Gets the currently running frame.""" + # Prevent frame from attempting to run as ROOT + if runFrame.gid <= 0: + gid = rqd.rqconstants.LAUNCH_FRAME_USER_GID + else: + gid = runFrame.gid + + # Never give frame ROOT permissions + if runFrame.uid == 0 or gid == 0: + msg = ("Frame %s cannot run as ROOT" % frameInfo.frameId) + self.rqlog.write(msg, prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP) + raise RuntimeError(msg) + + # 
Thread affinity + tasksetCmd = "" + if runFrame.attributes['CPU_LIST']: + tasksetCmd = "taskset -c %s" % runFrame.attributes['CPU_LIST'] + + # A temporary password for the user created inside of the frame container. + # This user is only valid inside the container, meaning a leakage would only + # be harmful if the perpetrator gains access to run docker commands. + tempPassword = str(uuid.uuid4()) + # Command wrapper + command = r"""#!/bin/sh +useradd -u %s -g %s -p %s %s >& /dev/null || true; +exec su -s %s %s -c "echo \$$; /bin/nice /usr/bin/time -p -o %s %s %s" +""" % ( + runFrame.uid, + gid, + tempPassword, + runFrame.user_name, + rqd.rqconstants.DOCKER_SHELL_PATH, + runFrame.user_name, + tempStatFile, + tasksetCmd, + runFrame.command.replace('"', r"""\"""") + ) + + # Log entrypoint on frame log to simplify replaying frames + self.rqlog.write("DOCKER_ENTRYPOINT = %s" % + # Mask password + command.replace(tempPassword, "[password]").replace(";", "\n"), + prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP) + + # Write command to a file on the job tmpdir to simplify replaying a frame + command = self._createCommandFile(command) + docker_client = self.rqCore.docker.from_env() + container = None + container_id = "00000000" + frameInfo.pid = -1 try: - return self.__cache[frameId] - except KeyError: - log.info("frameId %s is not running on this machine", frameId) - return None + log_stream = None + with self.rqCore.docker_lock: + container = docker_client.containers.run(image=image, + detach=True, + environment=self.frameEnv, + working_dir=self.rqCore.machine.getTempPath(), + mounts=self.rqCore.docker_mounts, + privileged=True, + pid_mode="host", + network="host", + stderr=True, + hostname=self.frameEnv["jobhost"], + entrypoint=command) + + log_stream = container.logs(stream=True) + + if not container or not log_stream: + raise RuntimeError("Container failed to start for %s.%s(%s)" % ( + runFrame.job_name, + runFrame.frame_name, + frameInfo.frameId)) + + # Try to 
get the cmd pid from top if the container is still running. + # If that fails the pid can be acquired from the first line of the log + try: + # Docker SDK type hint states that `top` returns an str + # when in reality it returns a Dict {"Processes": [[]], "Columns": [[]]} + container_top: dict = container.top() + frameInfo.pid = int(container_top["Processes"][0][1]) + except (APIError, TypeError): + for first_line in log_stream: + frameInfo.pid = int(first_line) + break + + # Log frame start info + msg = "Container %s started for %s.%s(%s) with pid %s" % ( + container.short_id, + runFrame.job_name, + runFrame.frame_name, + frameInfo.frameId, + frameInfo.pid) + + log.info(msg) + self.rqlog.write(msg, prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP) + + # Ping rss thread on rqCore + if self.rqCore.updateRssThread and not self.rqCore.updateRssThread.is_alive(): + self.rqCore.updateRssThread = threading.Timer(rqd.rqconstants.RSS_UPDATE_INTERVAL, + self.rqCore.updateRss) + self.rqCore.updateRssThread.start() + + # Atatch to the job and follow the logs + for line in log_stream: + self.rqlog.write(line, prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP) + + output = container.wait() + returncode = output["StatusCode"] + except StopIteration: + # This exception can happen when a container is interrupted + # If frame pid is set it means the container has started successfully + if frameInfo.pid and container: + output = container.wait() + returncode = output["StatusCode"] + else: + returncode = -1 + container_id = container.short_id if container else -1 + msg = "Failed to read frame container logs on %s for %s.%s(%s)" % ( + container_id, + runFrame.job_name, + runFrame.frame_name, + frameInfo.frameId) + logging.error(msg) + self.rqlog.write(msg, prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP) + # pylint: disable=broad-except + except Exception as e: + returncode = -1 + msg = "Failed to launch frame container" + logging.exception(msg) + 
self.rqlog.write("%s - %s" % (msg, e), + prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP) + finally: + # Clear up container after if finishes + if container: + container_id = container.short_id + container.remove() + docker_client.close() - def getCoreInfo(self): - """Gets the core info report.""" - return self.cores + # Find exitStatus and exitSignal + if returncode < 0: + # Exited with a signal + frameInfo.exitStatus = 1 + frameInfo.exitSignal = -returncode + else: + frameInfo.exitStatus = returncode + frameInfo.exitSignal = 0 - def reportStatus(self): - """Replies with hostReport""" - return self.machine.getHostReport() + # Log frame start info + log.warning("Frame %s.%s(%s) with pid %s finished on container %s with exitStatus %s %s ", + runFrame.job_name, + runFrame.frame_name, + frameInfo.frameId, + frameInfo.pid, + container_id, + frameInfo.exitStatus, + "" if frameInfo.exitStatus == 0 else " - " + runFrame.log_dir_file) - def shutdownRqdNow(self): - """Kill all running frames and shutdown RQD""" - self.machine.state = rqd.compiled_proto.host_pb2.DOWN try: - self.lockAll() - self.killAllFrame("shutdownRqdNow Command") + with open(tempStatFile, "r", encoding='utf-8') as statFile: + frameInfo.realtime = statFile.readline().split()[1] + frameInfo.utime = statFile.readline().split()[1] + frameInfo.stime = statFile.readline().split()[1] + statFile.close() # pylint: disable=broad-except except Exception: - log.exception("Failed to kill frames, stopping service anyways") - if not self.__cache: - self.shutdown() + pass # This happens when frames are killed - def shutdownRqdIdle(self): - """When machine is idle, shutdown RQD""" - log.info("shutdownRqdIdle") - self.lockAll() - self.__whenIdle = True - self.sendStatusReport() - if not self.__cache: - self.shutdownRqdNow() + self.__writeFooter() + self.__cleanup() - def rebootNow(self): - """Kill all running frames and reboot machine. 
- This is not available when a user is logged in""" - log.warning('Requested to reboot now') - if self.machine.isUserLoggedIn(): - err = ('Rebooting via RQD is not supported for a desktop machine ' - 'when a user is logged in') - log.warning(err) - raise rqd.rqexceptions.RqdException(err) - self.__reboot = True - self.shutdownRqdNow() + def __getFrameImage(self, frame_os=None): + """ + Get the pre-configured image for the given frame_os. - def rebootIdle(self): - """When machine is idle, reboot it""" - log.warning('Requested to reboot machine when idle') - self.lockAll() - self.__whenIdle = True - self.__reboot = True - self.sendStatusReport() - if not self.__cache and not self.machine.isUserLoggedIn(): - self.shutdownRqdNow() + Raises: + RuntimeError - if a suitable image cannot be found + """ + if frame_os: + image = self.rqCore.docker_images.get(frame_os) + if image is None: + raise RuntimeError("This rqd is not configured to run an image " + "for this frame OS: %s. Check the [docker.images] " + "section of rqd.conf for more information." % frame_os) + return image + if self.rqCore.docker_images: + # If a frame doesn't require an specic OS, default to the first configured OS on + # [docker.images] + return list(self.rqCore.docker_images.values())[0] + + raise RuntimeError("Misconfigured rqd. RUN_ON_DOCKER=True requires at " + "least one image on DOCKER_IMAGES ([docker.images] section of rqd.conf)") - def nimbyOn(self): - """Activates nimby, does not kill any running frames until next nimby - event. 
Also does not unlock until sufficient idle time is reached.""" - if self.nimby and not self.nimby.active: - try: - self.nimby.run() - log.warning("Nimby has been activated") - # pylint: disable=broad-except - except Exception: - self.nimby.locked = False - err = "Nimby is in the process of shutting down" - log.exception(err) - raise rqd.rqexceptions.RqdException(err) + def runWindows(self): + """The steps required to handle a frame under windows""" + frameInfo = self.frameInfo + runFrame = self.runFrame - def nimbyOff(self): - """Deactivates nimby and unlocks any nimby lock""" - if self.nimby.active: - self.nimby.stop() - log.info("Nimby has been deactivated") + self.__createEnvVariables() + self.__writeHeader() - def onNimbyLock(self): - """This is called by nimby when it locks the machine. - All running frames are killed. - A new report is sent to the cuebot.""" - self.killAllFrame("NIMBY Triggered") - self.sendStatusReport() + try: + runFrame.command = runFrame.command.replace('%{frame}', self.frameEnv['CUE_IFRAME']) + tempCommand = [self._createCommandFile(runFrame.command)] - def onNimbyUnlock(self, asOf=None): - """This is called by nimby when it unlocks the machine due to sufficient - idle. A new report is sent to the cuebot. - @param asOf: Time when idle state began, if known.""" - del asOf - self.sendStatusReport() + # pylint: disable=consider-using-with + frameInfo.forkedCommand = subprocess.Popen(tempCommand, + env=self.frameEnv, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + # pylint: disable=broad-except + except Exception: + log.critical( + "Failed subprocess.Popen: Due to: \n%s", + ''.join(traceback.format_exception(*sys.exc_info()))) + + frameInfo.pid = frameInfo.forkedCommand.pid - def lock(self, reqLock): - """Locks the requested core. - If a locked status changes, a status report is sent to the cuebot. 
- @type reqLock: int - @param reqLock: Number of cores to lock, 100 = 1 physical core""" - sendUpdate = False - with self.__threadLock: - # pylint: disable=no-member - numLock = min(self.cores.total_cores - self.cores.locked_cores, - reqLock) - if numLock > 0: - self.cores.locked_cores += numLock - self.cores.idle_cores -= min(numLock, self.cores.idle_cores) - sendUpdate = True - # pylint: enable=no-member + if not self.rqCore.updateRssThread.is_alive(): + self.rqCore.updateRssThread = threading.Timer(rqd.rqconstants.RSS_UPDATE_INTERVAL, + self.rqCore.updateRss) + self.rqCore.updateRssThread.start() - log.debug(self.cores) + while True: + output = frameInfo.forkedCommand.stdout.readline() + if not output and frameInfo.forkedCommand.poll() is not None: + break + if output: + self.rqlog.write(output, prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP) - if sendUpdate: - self.sendStatusReport() + frameInfo.forkedCommand.wait() - def lockAll(self): - """"Locks all cores on the machine. - If a locked status changes, a status report is sent.""" - sendUpdate = False - with self.__threadLock: - # pylint: disable=no-member - if self.cores.locked_cores < self.cores.total_cores: - self.cores.locked_cores = self.cores.total_cores - self.cores.idle_cores = 0 - sendUpdate = True - # pylint: enable=no-member + # Find exitStatus and exitSignal + returncode = frameInfo.forkedCommand.returncode + if returncode < INT32_MIN: + returncode = 303 + if returncode > INT32_MAX: + returncode = 304 + frameInfo.exitStatus = returncode + frameInfo.exitSignal = returncode - log.debug(self.cores) + frameInfo.realtime = 0 + frameInfo.utime = 0 + frameInfo.stime = 0 - if sendUpdate: - self.sendStatusReport() + self.__writeFooter() + self.__cleanup() - def unlock(self, reqUnlock): - """Unlocks the requested number of cores. - Also resets reboot/shutdown/restart when idle requests. - If a locked status changes, a status report is sent to the cuebot. 
- @type reqUnlock: int - @param reqUnlock: Number of cores to unlock, 100 = 1 physical core""" + def runDarwin(self): + """The steps required to handle a frame under mac""" + frameInfo = self.frameInfo - sendUpdate = False + self.__createEnvVariables() + self.__writeHeader() - if (self.__whenIdle or self.__reboot or - self.machine.state != rqd.compiled_proto.host_pb2.UP): - sendUpdate = True + rqd.rqutil.permissionsHigh() + try: + tempCommand = ["/usr/bin/su", frameInfo.runFrame.user_name, "-c", '"' + + self._createCommandFile(frameInfo.runFrame.command) + '"'] - self.__whenIdle = False - self.__reboot = False - self.machine.state = rqd.compiled_proto.host_pb2.UP + # pylint: disable=subprocess-popen-preexec-fn,consider-using-with + frameInfo.forkedCommand = subprocess.Popen(tempCommand, + env=self.frameEnv, + cwd=self.rqCore.machine.getTempPath(), + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + preexec_fn=os.setsid) + finally: + rqd.rqutil.permissionsLow() - with self.__threadLock: - # pylint: disable=no-member - numUnlock = min(self.cores.locked_cores, reqUnlock) - if numUnlock > 0: - self.cores.locked_cores -= numUnlock - self.cores.idle_cores += numUnlock - sendUpdate = True - # pylint: enable=no-member + frameInfo.pid = frameInfo.forkedCommand.pid - log.debug(self.cores) + if not self.rqCore.updateRssThread.is_alive(): + self.rqCore.updateRssThread = threading.Timer(rqd.rqconstants.RSS_UPDATE_INTERVAL, + self.rqCore.updateRss) + self.rqCore.updateRssThread.start() - if sendUpdate: - self.sendStatusReport() + while True: + output = frameInfo.forkedCommand.stdout.readline() + if not output and frameInfo.forkedCommand.poll() is not None: + break + if output: + self.rqlog.write(output, prependTimestamp=rqd.rqconstants.RQD_PREPEND_TIMESTAMP) - def unlockAll(self): - """"Unlocks all cores on the machine. - Also resets reboot/shutdown/restart when idle requests. 
- If a locked status changes, a status report is sent.""" + frameInfo.forkedCommand.wait() - sendUpdate = False + # Find exitStatus and exitSignal + returncode = frameInfo.forkedCommand.returncode + if os.WIFEXITED(returncode): + frameInfo.exitStatus = os.WEXITSTATUS(returncode) + else: + frameInfo.exitStatus = 1 + if os.WIFSIGNALED(returncode): + frameInfo.exitSignal = os.WTERMSIG(returncode) - if (self.__whenIdle or self.__reboot - or self.machine.state != rqd.compiled_proto.host_pb2.UP): - sendUpdate = True + self.__writeFooter() + self.__cleanup() - self.__whenIdle = False - self.__reboot = False - self.machine.state = rqd.compiled_proto.host_pb2.UP + def runUnknown(self): + """The steps required to handle a frame under an unknown OS.""" - with self.__threadLock: - # pylint: disable=no-member - if self.cores.locked_cores > 0: - if not self.nimby.locked: - self.cores.idle_cores += self.cores.locked_cores - self.cores.locked_cores = 0 - sendUpdate = True - # pylint: enable=no-member + def run(self): + """Thread initialization""" + log.info("Monitor frame started for frameId=%s", self.frameId) - log.debug(self.cores) + runFrame = self.runFrame + run_on_docker = self.rqCore.docker is not None - if sendUpdate: - self.sendStatusReport() + # pylint: disable=too-many-nested-blocks + try: + runFrame.job_temp_dir = os.path.join(self.rqCore.machine.getTempPath(), + runFrame.job_name) + runFrame.frame_temp_dir = os.path.join(runFrame.job_temp_dir, + runFrame.frame_name) + runFrame.log_file = "%s.%s.rqlog" % (runFrame.job_name, + runFrame.frame_name) + runFrame.log_dir_file = os.path.join(runFrame.log_dir, runFrame.log_file) - def sendStatusReport(self): - """Sends the current host report to Cuebot.""" - self.network.reportStatus(self.machine.getHostReport()) + try: # Exception block for all exceptions + # Ensure permissions return to Low after this block + try: + if rqd.rqconstants.RQD_CREATE_USER_IF_NOT_EXISTS and runFrame.HasField("uid"): + 
rqd.rqutil.checkAndCreateUser(runFrame.user_name, + runFrame.uid, + runFrame.gid) + if not run_on_docker: + # Do everything as launching user: + runFrame.gid = rqd.rqconstants.LAUNCH_FRAME_USER_GID + rqd.rqutil.permissionsUser(runFrame.uid, runFrame.gid) - def isWaitingForIdle(self): - """Returns whether the host is waiting until idle to take some action.""" - return self.__whenIdle + # Setup frame logging + try: + self.rqlog = rqd.rqlogging.RqdLogger(runFrame.log_dir_file) + self.rqlog.waitForFile() + # pylint: disable=broad-except + except Exception as e: + err = "Unable to write to %s due to %s" % (runFrame.log_dir_file, e) + raise RuntimeError(err) - def sendFrameCompleteReport(self, runningFrame): - """Send a frameCompleteReport to Cuebot""" - if not runningFrame.completeReportSent: - report = rqd.compiled_proto.report_pb2.FrameCompleteReport() - # pylint: disable=no-member - report.host.CopyFrom(self.machine.getHostInfo()) - report.frame.CopyFrom(runningFrame.runningFrameInfo()) - # pylint: enable=no-member + finally: + rqd.rqutil.permissionsLow() - if runningFrame.exitStatus is None: - report.exit_status = 1 - else: - report.exit_status = runningFrame.exitStatus + # Store frame in cache and register servant + self.rqCore.storeFrame(runFrame.frame_id, self.frameInfo) - report.exit_signal = runningFrame.exitSignal - report.run_time = int(runningFrame.runTime) + if run_on_docker: + self.runDocker() + elif platform.system() == "Linux": + self.runLinux() + elif platform.system() == "Windows": + self.runWindows() + elif platform.system() == "Darwin": + self.runDarwin() + else: + self.runUnknown() - # If nimby is active, then frame must have been killed by nimby - # Set the exitSignal to indicate this event - if self.nimby.locked and not runningFrame.ignoreNimby: - report.exit_status = rqd.rqconstants.EXITSTATUS_FOR_NIMBY_KILL + # pylint: disable=broad-except + except Exception: + log.critical( + "Failed launchFrame: For %s due to: \n%s", + runFrame.frame_id, 
''.join(traceback.format_exception(*sys.exc_info()))) + # Notifies the cuebot that there was an error launching + self.frameInfo.exitStatus = rqd.rqconstants.EXITSTATUS_FOR_FAILED_LAUNCH + # Delay keeps the cuebot from spamming failing booking requests + time.sleep(10) + finally: + self.rqCore.releaseCores(self.runFrame.num_cores, runFrame.attributes.get('CPU_LIST'), + runFrame.attributes.get('GPU_LIST') + if 'GPU_LIST' in self.runFrame.attributes else None) - self.network.reportRunningFrameCompletion(report) - runningFrame.completeReportSent = True + self.rqCore.deleteFrame(self.runFrame.frame_id) - def sanitizeFrames(self): - """ - Iterate over the cache and update the status of frames that might have - completed but never reported back to cuebot. - """ - for frameId in list(self.__cache.keys()): - runningFrame = self.__cache[frameId] - # If the frame was marked as completed (exitStatus) and a report has not been sent - # try to file the report again - if runningFrame.exitStatus is not None and not runningFrame.completeReportSent: - try: - self.sendFrameCompleteReport(runningFrame) - self.deleteFrame(frameId) - log.info("Successfully deleted frame from cache for %s/%s (%s)", - runningFrame.runFrame.job_name, - runningFrame.runFrame.frame_name, - frameId) - # pylint: disable=broad-except - except Exception: - log.exception("Failed to sanitize frame %s/%s", - runningFrame.runFrame.job_name, - runningFrame.runFrame.frame_name) + self.rqCore.sendFrameCompleteReport(self.frameInfo) + time_till_next = ( + (self.rqCore.intervalStartTime + self.rqCore.intervalSleepTime) - time.time()) + if time_till_next > (2 * rqd.rqconstants.RQD_MIN_PING_INTERVAL_SEC): + self.rqCore.onIntervalThread.cancel() + self.rqCore.onInterval(rqd.rqconstants.RQD_MIN_PING_INTERVAL_SEC) + + log.info("Monitor frame ended for frameId=%s", + self.runFrame.frame_id) diff --git a/rqd/rqd/rqmachine.py b/rqd/rqd/rqmachine.py index 0687858c7..a6d9a6317 100644 --- a/rqd/rqd/rqmachine.py +++ 
b/rqd/rqd/rqmachine.py @@ -302,7 +302,7 @@ def rssUpdate(self, frames): if re.search(r"\d+", child_statm_fields[1]) else -1 # pylint: disable=broad-except - except (OSError, IOError): + except (OSError, IOError, psutil.ZombieProcess): # Many Linux processes are ephemeral and will disappear before we're able # to read them. This is not typically indicative of a problem. log.debug('Failed to read stat/statm file for pid %s', pid) @@ -315,7 +315,7 @@ def rssUpdate(self, frames): values = list(frames.values()) for frame in values: - if frame.pid > 0: + if frame.pid is not None and frame.pid > 0: session = str(frame.pid) rss = 0 vsize = 0 @@ -870,7 +870,7 @@ def reserveHT(self, frameCores): if frameCores % 100: log.warning('Taskset: Can not reserveHT with fractional cores') return None - log.warning('Taskset: Requesting reserve of %d', (frameCores // 100)) + log.info('Taskset: Requesting reserve of %d', (frameCores // 100)) # Look for the most idle physical cpu. # Prefer to assign cores from the same physical cpu. diff --git a/rqd/rqd/rqutil.py b/rqd/rqd/rqutil.py index 3c11e75ff..3d8abc964 100644 --- a/rqd/rqd/rqutil.py +++ b/rqd/rqd/rqutil.py @@ -157,6 +157,9 @@ def checkAndCreateUser(username, uid=None, gid=None): cmd.append(username) log.info("Frame's username not found on host. Adding user with: %s", cmd) subprocess.check_call(cmd) + # pylint: disable=broad-except + except Exception: + logging.info("useradd failed to add user: %s. 
User possibly already exists.", username) finally: permissionsLow() diff --git a/rqd/tests/rqcore_test.py b/rqd/tests/rqcore_test.py index fc155a3be..82e585147 100644 --- a/rqd/tests/rqcore_test.py +++ b/rqd/tests/rqcore_test.py @@ -637,6 +637,7 @@ def test_runLinux( rqCore.machine.isDesktop.return_value = True rqCore.machine.getHostInfo.return_value = renderHost rqCore.nimby.locked = False + rqCore.docker = None children = rqd.compiled_proto.report_pb2.ChildrenProcStats() runFrame = rqd.compiled_proto.rqd_pb2.RunFrame( @@ -686,6 +687,91 @@ def test_runLinux( frameInfo ) + @mock.patch('platform.system', new=mock.Mock(return_value='Linux')) + @mock.patch('tempfile.gettempdir') + def test_runDocker(self, getTempDirMock, permsUser, timeMock, popenMock): + # given + currentTime = 1568070634.3 + jobTempPath = '/job/temp/path/' + logDir = '/path/to/log/dir/' + tempDir = '/some/random/temp/dir' + frameId = 'arbitrary-frame-id' + jobName = 'arbitrary-job-name' + frameName = 'arbitrary-frame-name' + frameUid = 928 + frameUsername = 'my-random-user' + returnCode = 0 + renderHost = rqd.compiled_proto.report_pb2.RenderHost(name='arbitrary-host-name') + logFile = os.path.join(logDir, '%s.%s.rqlog' % (jobName, frameName)) + + self.fs.create_dir(tempDir) + + timeMock.return_value = currentTime + getTempDirMock.return_value = tempDir + + rqCore = mock.MagicMock() + rqCore.intervalStartTime = 20 + rqCore.intervalSleepTime = 40 + rqCore.machine.getTempPath.return_value = jobTempPath + rqCore.machine.isDesktop.return_value = True + rqCore.machine.getHostInfo.return_value = renderHost + rqCore.nimby.locked = False + + # Setup mock docker client + rqCore.docker.from_env.return_value.\ + containers.run.return_value.wait.return_value = {"StatusCode": returnCode} + rqCore.docker_images = { + "centos7": "centos7_image", + "rocky9": "rocky9_image", + } + rqCore.docker_mounts = { + "vol1": "/vol1/mount", + "vol2": "/vol2/mount", + } + + children = 
rqd.compiled_proto.report_pb2.ChildrenProcStats() + + runFrame = rqd.compiled_proto.rqd_pb2.RunFrame( + frame_id=frameId, + job_name=jobName, + frame_name=frameName, + uid=frameUid, + user_name=frameUsername, + log_dir=logDir, + children=children, + environment={"ENVVAR": "env_value"}, + os="centos7") + frameInfo = rqd.rqnetwork.RunningFrame(rqCore, runFrame) + + # when + attendantThread = rqd.rqcore.FrameAttendantThread(rqCore, runFrame, frameInfo) + attendantThread.start() + attendantThread.join() + + # then + cmd_file = os.path.join(tempDir, 'rqd-cmd-%s-%s' % (runFrame.frame_id, currentTime)) + rqCore.docker.from_env.return_value.containers.run.assert_called_with( + image="centos7_image", + detach=True, + environment=mock.ANY, + working_dir=jobTempPath, + mounts=rqCore.docker_mounts, + privileged=True, + pid_mode="host", + network="host", + stderr=True, + hostname=mock.ANY, + entrypoint=cmd_file + ) + + self.assertTrue(os.path.exists(logDir)) + self.assertTrue(os.path.isfile(logFile)) + + rqCore.sendFrameCompleteReport.assert_called_with( + frameInfo + ) + + # TODO(bcipriano) Re-enable this test once Windows is supported. The main sticking point here # is that the log directory is always overridden on Windows which makes mocking difficult. @mock.patch("platform.system", new=mock.Mock(return_value="Windows")) @@ -789,6 +875,7 @@ def test_runDarwin(self, getTempDirMock, permsUser, timeMock, popenMock): rqCore.machine.isDesktop.return_value = True rqCore.machine.getHostInfo.return_value = renderHost rqCore.nimby.locked = False + rqCore.docker = None children = rqd.compiled_proto.report_pb2.ChildrenProcStats() runFrame = rqd.compiled_proto.rqd_pb2.RunFrame(