Skip to content

Commit 801c801

Browse files
committed
Simpler approach to implementing timeouts
1 parent f11cc92 commit 801c801

File tree

2 files changed

+19
-23
lines changed

2 files changed

+19
-23
lines changed

crates/amalthea/src/fixtures/dummy_frontend.rs

Lines changed: 14 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -155,41 +155,33 @@ impl DummyFrontend {
155155
message.send(&self.stdin_socket).unwrap();
156156
}
157157

158-
pub fn recv(socket: &Socket) -> Message {
159-
let (tx, rx) = crossbeam::channel::bounded(1);
160-
161-
// There is no timeout variant on our `Socket `API, and `Socket` is not
162-
// Sync, so we need to spawn a thread to handle the timeout
163-
stdext::spawn!("dummy_frontend_timeout", move || {
164-
if let Err(err) = rx.recv_timeout(std::time::Duration::from_secs(1)) {
165-
eprintln!("Timeout while receiving message: {err}");
166-
167-
// Can't panic as this would only poison the thread
168-
std::process::exit(42);
169-
}
170-
});
171-
172-
let out = Message::read_from_socket(socket).unwrap();
173-
174-
// Notify timeout thread we're done
175-
tx.send(()).unwrap();
158+
pub fn recv(&self, socket: &Socket) -> Message {
159+
// It's important to wait with a timeout because the kernel thread might
160+
// have panicked, preventing it from sending the expected message. The
161+
// tests would then hang indefinitely.
162+
//
163+
// Note that the panic hook will still have run to record the panic, so
164+
// we'll get expected panic information in the test output.
165+
if socket.poll_incoming(1000).unwrap() {
166+
return Message::read_from_socket(socket).unwrap();
167+
}
176168

177-
out
169+
panic!("Timeout while expecting message on socket {}", socket.name);
178170
}
179171

180172
/// Receives a Jupyter message from the Shell socket
181173
pub fn recv_shell(&self) -> Message {
182-
Self::recv(&self.shell_socket)
174+
self.recv(&self.shell_socket)
183175
}
184176

185177
/// Receives a Jupyter message from the IOPub socket
186178
pub fn recv_iopub(&self) -> Message {
187-
Self::recv(&self.iopub_socket)
179+
self.recv(&self.iopub_socket)
188180
}
189181

190182
/// Receives a Jupyter message from the Stdin socket
191183
pub fn recv_stdin(&self) -> Message {
192-
Self::recv(&self.stdin_socket)
184+
self.recv(&self.stdin_socket)
193185
}
194186

195187
/// Receive from Shell and assert `ExecuteReply` message.

crates/amalthea/src/socket/socket.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,8 +180,12 @@ impl Socket {
180180
}
181181
}
182182

183+
pub fn poll_incoming(&self, timeout_ms: i64) -> zmq::Result<bool> {
184+
Ok(self.socket.poll(zmq::PollEvents::POLLIN, timeout_ms)? != 0)
185+
}
186+
183187
pub fn has_incoming_data(&self) -> zmq::Result<bool> {
184-
Ok(self.socket.poll(zmq::PollEvents::POLLIN, 0)? != 0)
188+
self.poll_incoming(0)
185189
}
186190

187191
/// Subscribes a SUB socket to all the published messages from a PUB socket.

0 commit comments

Comments
 (0)