Skip to content

Commit 1c48be0

Browse files
authored
stop trying to use R_Is_Running (#176)
* use more evocative name * switch to crossbeam * use bus for broadcast channel * refactor
1 parent b5f7fee commit 1c48be0

23 files changed

+293
-162
lines changed

extensions/positron-r/amalthea/Cargo.lock

+93-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

extensions/positron-r/amalthea/crates/amalthea/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ uuid = { version = "0.8.2", features = ["v4"] }
2222
zmq = { version = "0.9.2", features = ["vendored"] }
2323
strum = "0.24"
2424
strum_macros = "0.24"
25+
crossbeam = { version = "0.8.2", features = ["crossbeam-channel"] }
2526

2627
[dev-dependencies]
2728
rand = "0.8.5"

extensions/positron-r/amalthea/crates/amalthea/src/kernel.rs

+14-12
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@
55
*
66
*/
77

8+
use std::sync::Arc;
9+
use std::sync::Mutex;
10+
use std::thread;
11+
812
use crate::connection_file::ConnectionFile;
913
use crate::error::Error;
1014
use crate::language::control_handler::ControlHandler;
@@ -19,10 +23,10 @@ use crate::socket::shell::Shell;
1923
use crate::socket::socket::Socket;
2024
use crate::socket::stdin::Stdin;
2125
use crate::stream_capture::StreamCapture;
22-
use std::sync::mpsc::sync_channel;
23-
use std::sync::mpsc::{Receiver, SyncSender};
24-
use std::sync::{Arc, Mutex};
25-
use std::thread;
26+
27+
use crossbeam::channel::Receiver;
28+
use crossbeam::channel::Sender;
29+
use crossbeam::channel::bounded;
2630
use log::{warn, info};
2731

2832
/// A Kernel represents a unique Jupyter kernel session and is the host for all
@@ -35,7 +39,7 @@ pub struct Kernel {
3539
session: Session,
3640

3741
/// Sends messages to the IOPub socket
38-
iopub_sender: SyncSender<IOPubMessage>,
42+
iopub_sender: Sender<IOPubMessage>,
3943

4044
/// Receives message sent to the IOPub socket
4145
iopub_receiver: Option<Receiver<IOPubMessage>>,
@@ -55,12 +59,12 @@ impl Kernel {
5559
pub fn new(file: ConnectionFile) -> Result<Kernel, Error> {
5660
let key = file.key.clone();
5761

58-
let (iopub_sender, iopub_receiver) = sync_channel::<IOPubMessage>(10);
62+
let (iopub_sender, iopub_receiver) = bounded::<IOPubMessage>(10);
5963

6064
Ok(Self {
6165
connection: file,
6266
session: Session::create(key)?,
63-
iopub_sender,
67+
iopub_sender: iopub_sender,
6468
iopub_receiver: Some(iopub_receiver),
6569
})
6670
}
@@ -164,7 +168,7 @@ impl Kernel {
164168
}
165169

166170
/// Returns a copy of the IOPub sending channel.
167-
pub fn create_iopub_sender(&self) -> SyncSender<IOPubMessage> {
171+
pub fn create_iopub_sender(&self) -> Sender<IOPubMessage> {
168172
self.iopub_sender.clone()
169173
}
170174

@@ -177,7 +181,7 @@ impl Kernel {
177181
/// Starts the shell thread.
178182
fn shell_thread(
179183
socket: Socket,
180-
iopub_sender: SyncSender<IOPubMessage>,
184+
iopub_sender: Sender<IOPubMessage>,
181185
shell_handler: Arc<Mutex<dyn ShellHandler>>,
182186
) -> Result<(), Error> {
183187
let mut shell = Shell::new(socket, iopub_sender.clone(), shell_handler);
@@ -210,9 +214,7 @@ impl Kernel {
210214
}
211215

212216
/// Starts the output capture thread.
213-
fn output_capture_thread(
214-
iopub_sender: SyncSender<IOPubMessage>,
215-
) -> Result<(), Error> {
217+
fn output_capture_thread(iopub_sender: Sender<IOPubMessage>) -> Result<(), Error> {
216218
let output_capture = StreamCapture::new(iopub_sender);
217219
output_capture.listen();
218220
Ok(())

extensions/positron-r/amalthea/crates/amalthea/src/language/lsp_handler.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,5 @@ use crate::error::Error;
1515
#[async_trait]
1616
pub trait LspHandler: Send {
1717
/// Starts the LSP server and binds it to the given TCP address.
18-
fn start(&self, tcp_address: String) -> Result<(), Error>;
18+
fn start(&mut self, tcp_address: String) -> Result<(), Error>;
1919
}

extensions/positron-r/amalthea/crates/amalthea/src/language/shell_handler.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,8 @@ use crate::wire::is_complete_request::IsCompleteRequest;
2121
use crate::wire::kernel_info_reply::KernelInfoReply;
2222
use crate::wire::kernel_info_request::KernelInfoRequest;
2323

24-
use std::sync::mpsc::SyncSender;
25-
2624
use async_trait::async_trait;
25+
use crossbeam::channel::Sender;
2726

2827
#[async_trait]
2928
pub trait ShellHandler: Send {
@@ -89,5 +88,5 @@ pub trait ShellHandler: Send {
8988
/// input and deliver it via the `handle_input_reply` method.
9089
///
9190
/// https://jupyter-client.readthedocs.io/en/stable/messaging.html#messages-on-the-stdin-router-dealer-channel
92-
fn establish_input_handler(&mut self, handler: SyncSender<ShellInputRequest>);
91+
fn establish_input_handler(&mut self, handler: Sender<ShellInputRequest>);
9392
}

extensions/positron-r/amalthea/crates/amalthea/src/socket/iopub.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ use crate::wire::jupyter_message::ProtocolMessage;
1818
use crate::wire::status::ExecutionState;
1919
use crate::wire::status::KernelStatus;
2020
use crate::wire::stream::StreamOutput;
21+
use crossbeam::channel::Receiver;
2122
use log::{trace, warn};
22-
use std::sync::mpsc::Receiver;
2323

2424
pub struct IOPub {
2525
/// The underlying IOPub socket

extensions/positron-r/amalthea/crates/amalthea/src/socket/shell.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,13 @@ use crate::wire::kernel_info_reply::KernelInfoReply;
3131
use crate::wire::kernel_info_request::KernelInfoRequest;
3232
use crate::wire::status::ExecutionState;
3333
use crate::wire::status::KernelStatus;
34+
use crossbeam::channel::Sender;
3435
use futures::executor::block_on;
3536
use log::{debug, trace, warn};
3637
use std::collections::HashMap;
37-
use std::sync::mpsc::SyncSender;
38-
use std::sync::{Arc, Mutex};
3938
use std::str::FromStr;
39+
use std::sync::Arc;
40+
use std::sync::Mutex;
4041

4142
/// Wrapper for the Shell socket; receives requests for execution, etc. from the
4243
/// front end and handles them or dispatches them to the execution thread.
@@ -45,7 +46,7 @@ pub struct Shell {
4546
socket: Socket,
4647

4748
/// Sends messages to the IOPub socket (owned by another thread)
48-
iopub_sender: SyncSender<IOPubMessage>,
49+
iopub_sender: Sender<IOPubMessage>,
4950

5051
/// Language-provided shell handler object
5152
handler: Arc<Mutex<dyn ShellHandler>>,
@@ -62,7 +63,7 @@ impl Shell {
6263
/// * `handler` - The language's shell channel handler
6364
pub fn new(
6465
socket: Socket,
65-
iopub_sender: SyncSender<IOPubMessage>,
66+
iopub_sender: Sender<IOPubMessage>,
6667
handler: Arc<Mutex<dyn ShellHandler>>,
6768
) -> Self {
6869
Self {

extensions/positron-r/amalthea/crates/amalthea/src/socket/stdin.rs

+5-3
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,17 @@
55
*
66
*/
77

8+
use std::sync::Arc;
9+
use std::sync::Mutex;
10+
811
use crate::language::shell_handler::ShellHandler;
912
use crate::socket::socket::Socket;
1013
use crate::wire::input_request::ShellInputRequest;
1114
use crate::wire::jupyter_message::JupyterMessage;
1215
use crate::wire::jupyter_message::Message;
16+
use crossbeam::channel::bounded;
1317
use futures::executor::block_on;
1418
use log::{trace, warn};
15-
use std::sync::mpsc::sync_channel;
16-
use std::sync::{Arc, Mutex};
1719

1820
pub struct Stdin {
1921
/// The ZeroMQ stdin socket
@@ -40,7 +42,7 @@ impl Stdin {
4042
/// 1. Wait for
4143
pub fn listen(&self) {
4244
// Create the communication channel for the shell handler and inject it
43-
let (sender, receiver) = sync_channel::<ShellInputRequest>(1);
45+
let (sender, receiver) = bounded::<ShellInputRequest>(1);
4446
{
4547
let mut shell_handler = self.handler.lock().unwrap();
4648
shell_handler.establish_input_handler(sender);

0 commit comments

Comments
 (0)