Skip to content

Commit e611582

Browse files
committed
Switch to stdin for config object transmission
1 parent e7d18aa commit e611582

File tree

2 files changed

+11
-9
lines changed

2 files changed

+11
-9
lines changed

parsl/executors/high_throughput/executor.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import base64
21
import logging
32
import math
43
import pickle
@@ -543,18 +542,22 @@ def _start_local_interchange_process(self) -> None:
543542
"cert_dir": self.cert_dir,
544543
}
545544

546-
encoded = base64.b64encode(pickle.dumps(interchange_config))
545+
config_pickle = pickle.dumps(interchange_config)
547546

548-
cmd: List[bytes] = [b"interchange.py",
549-
encoded
550-
]
551-
self.interchange_proc = subprocess.Popen(cmd)
547+
self.interchange_proc = subprocess.Popen(b"interchange.py", stdin=subprocess.PIPE)
548+
stdin = self.interchange_proc.stdin
549+
assert stdin is not None, "Popen should have created an IO object (vs default None) because of PIPE mode"
552550

551+
logger.debug("Popened interchange process. Writing config object")
552+
stdin.write(config_pickle)
553+
stdin.flush()
554+
logger.debug("Sent config object. Requesting worker ports")
553555
try:
554556
(self.worker_task_port, self.worker_result_port) = self.command_client.run("WORKER_PORTS", timeout_s=120)
555557
except CommandClientTimeoutError:
556-
logger.error("Interchange has not completed initialization in 120s. Aborting")
558+
logger.error("Interchange has not completed initialization. Aborting")
557559
raise Exception("Interchange failed to start")
560+
logger.debug("Got worker ports")
558561

559562
def _start_queue_management_thread(self):
560563
"""Method to start the management thread as a daemon.

parsl/executors/high_throughput/interchange.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
#!/usr/bin/env python
2-
import base64
32
import datetime
43
import json
54
import logging
@@ -676,7 +675,7 @@ def start_file_logger(filename: str, level: int = logging.DEBUG, format_string:
676675
if __name__ == "__main__":
677676
setproctitle("parsl: HTEX interchange")
678677

679-
config = pickle.loads(base64.b64decode(sys.argv[1]))
678+
config = pickle.load(sys.stdin.buffer)
680679

681680
ic = Interchange(**config)
682681
ic.start()

0 commit comments

Comments
 (0)