Skip to content

Commit bef038b

Browse files
authored
Merge pull request #558 from posit-dev/bugfix/test-hangs
Fix hanging issues in integration tests
2 parents 76ddd9c + 83b8e6d commit bef038b

File tree

10 files changed

+156
-107
lines changed

10 files changed

+156
-107
lines changed

Cargo.lock

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/amalthea/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ anyhow = "1.0.80"
3434
serde_with = "3.0.0"
3535
serde_repr = "0.1.17"
3636
tracing = "0.1.40"
37+
assert_matches = "1.5.0"
3738

3839
[dev-dependencies]
3940
env_logger = "0.10.0"

crates/amalthea/src/fixtures/dummy_frontend.rs

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
*
66
*/
77

8+
use assert_matches::assert_matches;
89
use serde_json::Value;
9-
use stdext::assert_match;
1010

1111
use crate::connection_file::ConnectionFile;
1212
use crate::session::Session;
@@ -155,17 +155,41 @@ impl DummyFrontend {
155155
message.send(&self.stdin_socket).unwrap();
156156
}
157157

158+
pub fn recv(&self, socket: &Socket) -> Message {
159+
// It's important to wait with a timeout because the kernel thread might
160+
// have panicked, preventing it from sending the expected message. The
161+
// tests would then hang indefinitely.
162+
//
163+
// Note that the panic hook will still have run to record the panic, so
164+
// we'll get expected panic information in the test output.
165+
if socket.poll_incoming(1000).unwrap() {
166+
return Message::read_from_socket(socket).unwrap();
167+
}
168+
169+
panic!("Timeout while expecting message on socket {}", socket.name);
170+
}
171+
158172
/// Receives a Jupyter message from the Shell socket
159173
pub fn recv_shell(&self) -> Message {
160-
Message::read_from_socket(&self.shell_socket).unwrap()
174+
self.recv(&self.shell_socket)
175+
}
176+
177+
/// Receives a Jupyter message from the IOPub socket
178+
pub fn recv_iopub(&self) -> Message {
179+
self.recv(&self.iopub_socket)
180+
}
181+
182+
/// Receives a Jupyter message from the Stdin socket
183+
pub fn recv_stdin(&self) -> Message {
184+
self.recv(&self.stdin_socket)
161185
}
162186

163187
/// Receive from Shell and assert `ExecuteReply` message.
164188
/// Returns `execution_count`.
165189
pub fn recv_shell_execute_reply(&self) -> u32 {
166190
let msg = self.recv_shell();
167191

168-
assert_match!(msg, Message::ExecuteReply(data) => {
192+
assert_matches!(msg, Message::ExecuteReply(data) => {
169193
assert_eq!(data.content.status, Status::Ok);
170194
data.content.execution_count
171195
})
@@ -176,22 +200,17 @@ impl DummyFrontend {
176200
pub fn recv_shell_execute_reply_exception(&self) -> u32 {
177201
let msg = self.recv_shell();
178202

179-
assert_match!(msg, Message::ExecuteReplyException(data) => {
203+
assert_matches!(msg, Message::ExecuteReplyException(data) => {
180204
assert_eq!(data.content.status, Status::Error);
181205
data.content.execution_count
182206
})
183207
}
184208

185-
/// Receives a Jupyter message from the IOPub socket
186-
pub fn recv_iopub(&self) -> Message {
187-
Message::read_from_socket(&self.iopub_socket).unwrap()
188-
}
189-
190209
/// Receive from IOPub and assert Busy message
191210
pub fn recv_iopub_busy(&self) -> () {
192211
let msg = self.recv_iopub();
193212

194-
assert_match!(msg, Message::Status(data) => {
213+
assert_matches!(msg, Message::Status(data) => {
195214
assert_eq!(data.content.execution_state, ExecutionState::Busy);
196215
});
197216
}
@@ -200,7 +219,7 @@ impl DummyFrontend {
200219
pub fn recv_iopub_idle(&self) -> () {
201220
let msg = self.recv_iopub();
202221

203-
assert_match!(msg, Message::Status(data) => {
222+
assert_matches!(msg, Message::Status(data) => {
204223
assert_eq!(data.content.execution_state, ExecutionState::Idle);
205224
});
206225
}
@@ -209,7 +228,7 @@ impl DummyFrontend {
209228
pub fn recv_iopub_execute_input(&self) -> ExecuteInput {
210229
let msg = self.recv_iopub();
211230

212-
assert_match!(msg, Message::ExecuteInput(data) => {
231+
assert_matches!(msg, Message::ExecuteInput(data) => {
213232
data.content
214233
})
215234
}
@@ -219,9 +238,9 @@ impl DummyFrontend {
219238
pub fn recv_iopub_execute_result(&self) -> String {
220239
let msg = self.recv_iopub();
221240

222-
assert_match!(msg, Message::ExecuteResult(data) => {
223-
assert_match!(data.content.data, Value::Object(map) => {
224-
assert_match!(map["text/plain"], Value::String(ref string) => {
241+
assert_matches!(msg, Message::ExecuteResult(data) => {
242+
assert_matches!(data.content.data, Value::Object(map) => {
243+
assert_matches!(map["text/plain"], Value::String(ref string) => {
225244
string.clone()
226245
})
227246
})
@@ -233,16 +252,11 @@ impl DummyFrontend {
233252
pub fn recv_iopub_execute_error(&self) -> String {
234253
let msg = self.recv_iopub();
235254

236-
assert_match!(msg, Message::ExecuteError(data) => {
255+
assert_matches!(msg, Message::ExecuteError(data) => {
237256
data.content.exception.evalue
238257
})
239258
}
240259

241-
/// Receives a Jupyter message from the Stdin socket
242-
pub fn recv_stdin(&self) -> Message {
243-
Message::read_from_socket(&self.stdin_socket).unwrap()
244-
}
245-
246260
/// Receives a (raw) message from the heartbeat socket
247261
pub fn recv_heartbeat(&self) -> zmq::Message {
248262
let mut msg = zmq::Message::new();

crates/amalthea/src/kernel.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ use crossbeam::channel::unbounded;
1313
use crossbeam::channel::Receiver;
1414
use crossbeam::channel::Select;
1515
use crossbeam::channel::Sender;
16-
use log::error;
1716
use stdext::spawn;
1817
use stdext::unwrap;
1918

@@ -38,6 +37,10 @@ use crate::wire::input_reply::InputReply;
3837
use crate::wire::jupyter_message::Message;
3938
use crate::wire::jupyter_message::OutboundMessage;
4039

40+
macro_rules! report_error {
41+
($($arg:tt)+) => (if cfg!(debug_assertions) { log::error!($($arg)+) } else { panic!($($arg)+) })
42+
}
43+
4144
/// A Kernel represents a unique Jupyter kernel session and is the host for all
4245
/// execution and messaging threads.
4346
pub struct Kernel {
@@ -371,7 +374,7 @@ impl Kernel {
371374
}
372375
// Consume notification
373376
let _ = unwrap!(outbound_notif_socket.socket.recv_bytes(0), Err(err) => {
374-
log::error!("Could not consume outbound notification socket: {}", err);
377+
report_error!("Could not consume outbound notification socket: {}", err);
375378
return false;
376379
});
377380

@@ -424,7 +427,7 @@ impl Kernel {
424427
let n = unwrap!(
425428
zmq::poll(&mut poll_items, -1),
426429
Err(err) => {
427-
error!("While polling 0MQ items: {}", err);
430+
report_error!("While polling 0MQ items: {}", err);
428431
0
429432
}
430433
);
@@ -433,20 +436,20 @@ impl Kernel {
433436
if has_outbound() {
434437
unwrap!(
435438
forward_outbound(),
436-
Err(err) => error!("While forwarding outbound message: {}", err)
439+
Err(err) => report_error!("While forwarding outbound message: {}", err)
437440
);
438441
continue;
439442
}
440443

441444
if has_inbound() {
442445
unwrap!(
443446
forward_inbound(),
444-
Err(err) => error!("While forwarding inbound message: {}", err)
447+
Err(err) => report_error!("While forwarding inbound message: {}", err)
445448
);
446449
continue;
447450
}
448451

449-
log::error!("Could not find readable message");
452+
report_error!("Could not find readable message");
450453
}
451454
}
452455
}
@@ -463,7 +466,7 @@ impl Kernel {
463466
unwrap!(
464467
notif_socket.send(zmq::Message::new()),
465468
Err(err) => {
466-
error!("Couldn't notify 0MQ thread: {}", err);
469+
report_error!("Couldn't notify 0MQ thread: {}", err);
467470
continue;
468471
}
469472
);
@@ -476,7 +479,7 @@ impl Kernel {
476479
notif_socket.recv(&mut msg)
477480
},
478481
Err(err) => {
479-
error!("Couldn't received acknowledgement from 0MQ thread: {}", err);
482+
report_error!("Couldn't received acknowledgement from 0MQ thread: {}", err);
480483
continue;
481484
}
482485
);

crates/amalthea/src/socket/socket.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,8 +180,12 @@ impl Socket {
180180
}
181181
}
182182

183+
pub fn poll_incoming(&self, timeout_ms: i64) -> zmq::Result<bool> {
184+
Ok(self.socket.poll(zmq::PollEvents::POLLIN, timeout_ms)? != 0)
185+
}
186+
183187
pub fn has_incoming_data(&self) -> zmq::Result<bool> {
184-
Ok(self.socket.poll(zmq::PollEvents::POLLIN, 0)? != 0)
188+
self.poll_incoming(0)
185189
}
186190

187191
/// Subscribes a SUB socket to all the published messages from a PUB socket.

crates/ark/src/fixtures/dummy_frontend.rs

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,19 +42,25 @@ impl DummyArkFrontend {
4242
let frontend = DummyFrontend::new();
4343
let connection_file = frontend.get_connection_file();
4444

45+
// Start the kernel in this thread so that panics are propagated
46+
crate::start::start_kernel(
47+
connection_file,
48+
vec![
49+
String::from("--interactive"),
50+
String::from("--vanilla"),
51+
String::from("--no-save"),
52+
String::from("--no-restore"),
53+
],
54+
None,
55+
SessionMode::Console,
56+
false,
57+
);
58+
59+
// Start the REPL in a background thread, does not return and is never joined
4560
stdext::spawn!("dummy_kernel", || {
46-
crate::start::start_kernel(
47-
connection_file,
48-
vec![String::from("--no-save"), String::from("--no-restore")],
49-
None,
50-
SessionMode::Console,
51-
false,
52-
);
61+
RMain::start();
5362
});
5463

55-
// Wait for startup to complete
56-
RMain::wait_r_initialized();
57-
5864
frontend.complete_initialization();
5965
frontend
6066
}

0 commit comments

Comments
 (0)