|
| 1 | +--- |
| 2 | +title: Async with Tokio |
| 3 | +date: "`r Sys.Date()`" |
| 4 | +--- |
| 5 | + |
| 6 | + |
| 7 | +```{r include = FALSE} |
| 8 | +
|
| 9 | +``` |
| 10 | + |
| 11 | + |
| 12 | +Asynchronous programming enables efficient handling of I/O-bound operations like network requests, file system operations, and database queries without blocking the main program thread. This is crucial for building performant applications that can handle many concurrent operations. |
| 13 | + |
| 14 | +Many crates in the Rust ecosystem use an **asynchronous** runtime through the [tokio](https://crates.io/crates/tokio) crate. Tokio provides a **multithreaded** runtime and is used by popular libraries like [reqwest](https://crates.io/crates/reqwest), [axum](https://crates.io/crates/axum), [DataFusion](https://crates.io/crates/datafusion), [sqlx](https://crates.io/crates/sqlx), and many more. |
| 15 | + |
| 16 | +extendr doesn't provide an async function interface because there is not a true async runtime for R let alone any C API infrastructure for it. But that does not mean we cannot harness the vast async ecosystem in Rust. |
| 17 | + |
| 18 | +## Add tokio |
| 19 | + |
| 20 | + |
| 21 | +To utilize tokio with extendr we will need to bump the msrv of our package to `1.70` as that is the MSRV of `tokio` and we will be using a langauge feature called [`OnceLock`](https://doc.rust-lang.org/std/sync/struct.OnceLock.html) that wasn't stabilized until 1.70. |
| 22 | + |
| 23 | + |
| 24 | +```r |
| 25 | +rextendr::use_msrv("1.70") |
| 26 | +rextendr::use_crate("tokio", features = "rt-multi-thread") |
| 27 | +``` |
| 28 | + |
| 29 | +The `use_msrv()` function will bump the MSRV specified in your package's `DESCRIPTION` file. The `use_crate()` function will call `cargo add` on your behalf and add the crate to your `Cargo.toml`. |
| 30 | + |
| 31 | +## Creating your runtime |
| 32 | + |
| 33 | +Your R package should share one runtime across all function calls. This approach: |
| 34 | + |
| 35 | +- Creates a global static variable using `OnceLock<>` (thread-safe, write-once) |
| 36 | +- Uses lazy initialization—runtime is created only when first needed |
| 37 | +- Returns a reference to the same runtime on subsequent calls |
| 38 | + |
| 39 | +::: callout-note |
| 40 | +## Futures |
| 41 | + |
| 42 | +See [Futures and the Async Syntax](https://doc.rust-lang.org/book/ch17-01-futures-and-syntax.html) section of The Book™. |
| 43 | + |
| 44 | +::: |
| 45 | + |
| 46 | +In your `lib.rs` we define a static called `TOKIO_RUNTIME` which contains a `Runtime`. |
| 47 | + |
| 48 | +The function `get_rt()` will create a new `Runtime` the first time it is called. Each subsequent call returns a reference to that created runtime. |
| 49 | + |
| 50 | +```rust |
| 51 | +use extendr_api::prelude::*; |
| 52 | +use std::sync::OnceLock; |
| 53 | +use tokio::runtime::{Builder, Runtime}; |
| 54 | + |
| 55 | +// Initialize a shared tokio runtime for the package |
| 56 | +static TOKIO_RUNTIME: OnceLock<Runtime> = OnceLock::new(); |
| 57 | + |
| 58 | +// Helper function to get a tokio runtime |
| 59 | +fn get_rt() -> &'static Runtime { |
| 60 | + TOKIO_RUNTIME.get_or_init(|| { |
| 61 | + Builder::new_multi_thread() |
| 62 | + .enable_all() |
| 63 | + .build() |
| 64 | + .expect("Failed to create tokio runtime") |
| 65 | + }) |
| 66 | +} |
| 67 | +``` |
| 68 | + |
| 69 | +Now, in any function we want to use the tokio run time can first call `get_rt()` to get a reference to it. |
| 70 | + |
| 71 | +## Blocking on `async` futures |
| 72 | + |
| 73 | +For a motivating example we can use the async file reader from tokio using our new runtime. |
| 74 | + |
| 75 | + |
| 76 | +```rust |
| 77 | +#[extendr] |
| 78 | +fn read_file_async(path: &str) -> String { |
| 79 | + // get the tokio runtime |
| 80 | + let rt = get_rt(); |
| 81 | + |
| 82 | + // define a future, typically we would `.await` |
| 83 | + let file_content_fut = tokio::fs::read_to_string(path); |
| 84 | + |
| 85 | + // use `.block_on()` to await the future |
| 86 | + rt.block_on(file_content_fut).expect("failed to read file") |
| 87 | +} |
| 88 | +``` |
| 89 | + |
| 90 | +The first step is to get the tokio runtime. Then we call the async function, which typically we would `.await` to get the result. Instead, we call `.block_on()` to execute the future and get the result. |
| 91 | + |
| 92 | +## Example: read many files async |
| 93 | + |
| 94 | +For a more complete / complex example we can create a function that reads multiple files in parallel and awaits all of the futures asynchronously. |
| 95 | + |
| 96 | + |
| 97 | +```rust |
| 98 | +#[extendr] |
| 99 | +fn read_files_async(paths: Vec<String>) -> Strings { |
| 100 | + // get the tokio runtime |
| 101 | + let rt = get_rt(); |
| 102 | + |
| 103 | + // create a joinset to await multiple futures asynchronously |
| 104 | + let mut set = tokio::task::JoinSet::new(); |
| 105 | + |
| 106 | + // spawn each future in the join set |
| 107 | + for p in paths { |
| 108 | + set.spawn(tokio::fs::read_to_string(p)); |
| 109 | + } |
| 110 | + |
| 111 | + // wait for all futures to resolve |
| 112 | + let all_file_bodies = rt.block_on(set.join_all()); |
| 113 | + |
| 114 | + // filter out any files that failed to read |
| 115 | + // return the contents as a character vector |
| 116 | + all_file_bodies |
| 117 | + .into_iter() |
| 118 | + .filter_map(|contents| contents.ok().map(Rstr::from)) |
| 119 | + .collect::<Strings>() |
| 120 | +} |
| 121 | +``` |
| 122 | + |
| 123 | +## What this unlocks |
| 124 | + |
| 125 | +With tokio working in extendr, we now have access to the entire async Rust ecosystem. This means we can build R packages using: |
| 126 | + |
| 127 | +- [DataFusion](https://github.com/apache/datafusion) - high-performance SQL query engine |
| 128 | +- [DataFusion Comet](https://github.com/apache/datafusion-comet) - Spark accelerator |
| 129 | +- [lancedb](https://github.com/lancedb/lancedb) - vector database for AI applications |
| 130 | +- [qdrant](https://github.com/qdrant/qdrant) - vector search engine for next-gen AI |
| 131 | +- [burn](https://github.com/tracel-ai/burn) - deep learning framework |
| 132 | +- [sail](https://github.com/lakehq/sail) - unified batch and stream processing |
| 133 | + |
| 134 | +The async ecosystem is massive and growing. Now R can be part of it. |
0 commit comments