Skip to content

Commit 983c48a

Browse files
authored
Wait until 0MQ sockets are created before starting R (#43)
1 parent 73ff451 commit 983c48a

File tree

5 files changed

+32
-3
lines changed

5 files changed

+32
-3
lines changed

crates/amalthea/src/kernel.rs

+9
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ impl Kernel {
105105
control_handler: Arc<Mutex<dyn ControlHandler>>,
106106
lsp_handler: Option<Arc<Mutex<dyn LspHandler>>>,
107107
stream_behavior: StreamBehavior,
108+
conn_init_tx: Option<Sender<bool>>,
108109
) -> Result<(), Error> {
109110
let ctx = zmq::Context::new();
110111

@@ -205,6 +206,14 @@ impl Kernel {
205206
self.connection.endpoint(self.connection.control_port),
206207
)?;
207208

209+
// 0MQ sockets are now initialised. We can start the kernel runtime
210+
// with relative multithreading safety. See
211+
// https://github.com/rstudio/positron/issues/720
212+
if let Some(tx) = conn_init_tx {
213+
tx.send(true).unwrap();
214+
drop(tx);
215+
}
216+
208217
// TODO: thread/join thread? Exiting this thread will cause the whole
209218
// kernel to exit.
210219
Self::control_thread(control_socket, control_handler);

crates/amalthea/tests/client.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ fn test_kernel() {
5454

5555
// Create the thread that will run the Amalthea kernel
5656
thread::spawn(
57-
move || match kernel.connect(shell, control, None, StreamBehavior::None) {
57+
move || match kernel.connect(shell, control, None, StreamBehavior::None, None) {
5858
Ok(_) => {
5959
info!("Kernel connection initiated");
6060
},

crates/ark/src/main.rs

+10-1
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ fn start_kernel(connection_file: ConnectionFile, capture_streams: bool) {
6060
kernel_init_tx.add_rx(),
6161
)));
6262

63+
let (conn_init_tx, conn_init_rx) = bounded::<bool>(0);
64+
6365
// Create the shell.
6466
let kernel_init_rx = kernel_init_tx.add_rx();
6567
let shell = Shell::new(
@@ -68,6 +70,7 @@ fn start_kernel(connection_file: ConnectionFile, capture_streams: bool) {
6870
shell_request_rx,
6971
kernel_init_tx,
7072
kernel_init_rx,
73+
conn_init_rx,
7174
);
7275

7376
// Create the control handler; this is used to handle shutdown/interrupt and
@@ -83,7 +86,13 @@ fn start_kernel(connection_file: ConnectionFile, capture_streams: bool) {
8386

8487
// Create the kernel
8588
let shell = Arc::new(Mutex::new(shell));
86-
match kernel.connect(shell, control, Some(lsp), stream_behavior) {
89+
match kernel.connect(
90+
shell,
91+
control,
92+
Some(lsp),
93+
stream_behavior,
94+
Some(conn_init_tx),
95+
) {
8796
Ok(()) => {
8897
println!("R Kernel exiting.");
8998
},

crates/ark/src/shell.rs

+11
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,20 @@ impl Shell {
6767
shell_request_rx: Receiver<Request>,
6868
kernel_init_tx: Bus<KernelInfo>,
6969
kernel_init_rx: BusReader<KernelInfo>,
70+
conn_init_rx: Receiver<bool>,
7071
) -> Self {
7172
let iopub_tx = iopub_tx.clone();
7273
spawn!("ark-r-main-thread", move || {
74+
// Block until 0MQ is initialised before starting R to avoid
75+
// thread-safety issues. See https://github.com/rstudio/positron/issues/720
76+
if let Err(err) = conn_init_rx.recv_timeout(std::time::Duration::from_secs(3)) {
77+
warn!(
78+
"Failed to get init notification from main thread: {:?}",
79+
err
80+
);
81+
}
82+
drop(conn_init_rx);
83+
7384
Self::execution_thread(iopub_tx, kernel_init_tx, shell_request_rx);
7485
});
7586

crates/echo/src/main.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ fn start_kernel(connection_file: ConnectionFile) {
3737
let shell = Arc::new(Mutex::new(Shell::new(shell_tx)));
3838
let control = Arc::new(Mutex::new(Control {}));
3939

40-
match kernel.connect(shell, control, None, StreamBehavior::None) {
40+
match kernel.connect(shell, control, None, StreamBehavior::None, None) {
4141
Ok(()) => {
4242
let mut s = String::new();
4343
println!("Kernel activated, press Ctrl+C to end ");

0 commit comments

Comments
 (0)