Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,20 @@ jobs:
toolchain: ${{ matrix.channel }}
targets: ${{ matrix.target.toolchain }}
- uses: swatinem/rust-cache@v2
- name: cargo test (all features)
- name: cargo test (workspace, all features)
run: cargo test --locked --workspace --all-features --bins --tests --examples
- name: cargo test (default features)
- name: cargo test (workspace, default features)
run: cargo test --locked --workspace --bins --tests --examples
- name: cargo test (no default features)
- name: cargo test (workspace, no default features)
run: cargo test --locked --workspace --no-default-features --bins --tests --examples
- name: cargo check (feature message_spans)
run: cargo check --no-default-features --features message_spans
- name: cargo check (feature rpc)
run: cargo check --no-default-features --features rpc
- name: cargo check (irpc, no default features)
run: cargo check --locked --no-default-features --bins --tests --examples
- name: cargo check (irpc, feature derive)
run: cargo check --locked --no-default-features --features derive --bins --tests --examples
- name: cargo check (irpc, feature spans)
run: cargo check --locked --no-default-features --features spans --bins --tests --examples
- name: cargo check (irpc, feature rpc)
run: cargo check --locked --no-default-features --features rpc --bins --tests --examples

test-release:
runs-on: ${{ matrix.target.os }}
Expand Down
22 changes: 19 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,26 @@ rpc = ["dep:quinn", "dep:postcard", "dep:anyhow", "dep:smallvec", "dep:tracing",
# add test utilities
quinn_endpoint_setup = ["rpc", "dep:rustls", "dep:rcgen", "dep:anyhow", "dep:futures-buffered", "quinn/rustls-ring"]
# pick up parent span when creating channel messages
message_spans = ["dep:tracing"]
spans = ["dep:tracing"]
stream = ["dep:futures-util"]
derive = ["dep:irpc-derive"]
default = ["rpc", "quinn_endpoint_setup", "message_spans", "stream", "derive"]
default = ["rpc", "quinn_endpoint_setup", "spans", "stream", "derive"]

[[example]]
name = "derive"
required-features = ["rpc", "derive", "quinn_endpoint_setup"]

[[example]]
name = "compute"
required-features = ["rpc", "derive", "quinn_endpoint_setup"]

[[example]]
name = "local"
required-features = ["derive"]

[[example]]
name = "storage"
required-features = ["rpc", "quinn_endpoint_setup"]

[workspace]
members = ["irpc-derive", "irpc-iroh"]
Expand All @@ -84,7 +100,7 @@ unexpected_cfgs = { level = "warn", check-cfg = ["cfg(quicrpc_docsrs)"] }
anyhow = { version = "1.0.98" }
tokio = { version = "1.44", default-features = false }
postcard = { version = "1.1.1", default-features = false }
serde = { version = "1", default-features = false }
serde = { version = "1", default-features = false, features = ["derive"] }
tracing = { version = "0.1.41", default-features = false }
n0-future = { version = "0.1.2", default-features = false }
tracing-subscriber = { version = "0.3.19" }
Expand Down
54 changes: 19 additions & 35 deletions examples/compute.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
use std::{
io::{self, Write},
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
sync::Arc,
};

use anyhow::bail;
use futures_buffered::BufferedStreamExt;
use irpc::{
channel::{mpsc, oneshot},
rpc::{listen, Handler},
rpc::{listen, RemoteService},
rpc_requests,
util::{make_client_endpoint, make_server_endpoint},
Client, LocalSender, Request, Service, WithChannels,
Client, Request, WithChannels,
};
use n0_future::{
stream::StreamExt,
Expand All @@ -21,11 +20,19 @@ use serde::{Deserialize, Serialize};
use thousands::Separable;
use tracing::trace;

// Define the ComputeService
#[derive(Debug, Clone, Copy)]
struct ComputeService;

impl Service for ComputeService {}
// Define the protocol and message enums using the macro
#[rpc_requests(message = ComputeMessage)]
#[derive(Serialize, Deserialize, Debug)]
enum ComputeProtocol {
#[rpc(tx=oneshot::Sender<u128>)]
Sqr(Sqr),
#[rpc(rx=mpsc::Receiver<i64>, tx=oneshot::Sender<i64>)]
Sum(Sum),
#[rpc(tx=mpsc::Sender<u64>)]
Fibonacci(Fibonacci),
#[rpc(rx=mpsc::Receiver<u64>, tx=mpsc::Sender<u64>)]
Multiply(Multiply),
}

// Define ComputeRequest sub-messages
#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -55,20 +62,6 @@ enum ComputeRequest {
Multiply(Multiply),
}

// Define the protocol and message enums using the macro
#[rpc_requests(ComputeService, message = ComputeMessage)]
#[derive(Serialize, Deserialize)]
enum ComputeProtocol {
#[rpc(tx=oneshot::Sender<u128>)]
Sqr(Sqr),
#[rpc(rx=mpsc::Receiver<i64>, tx=oneshot::Sender<i64>)]
Sum(Sum),
#[rpc(tx=mpsc::Sender<u64>)]
Fibonacci(Fibonacci),
#[rpc(rx=mpsc::Receiver<u64>, tx=mpsc::Sender<u64>)]
Multiply(Multiply),
}

// The actor that processes requests
struct ComputeActor {
recv: tokio::sync::mpsc::Receiver<ComputeMessage>,
Expand All @@ -79,9 +72,8 @@ impl ComputeActor {
let (tx, rx) = tokio::sync::mpsc::channel(128);
let actor = Self { recv: rx };
n0_future::task::spawn(actor.run());
let local = LocalSender::<ComputeMessage, ComputeService>::from(tx);
ComputeApi {
inner: local.into(),
inner: Client::local(tx),
}
}

Expand Down Expand Up @@ -157,7 +149,7 @@ impl ComputeActor {
// The API for interacting with the ComputeService
#[derive(Clone)]
struct ComputeApi {
inner: Client<ComputeMessage, ComputeProtocol, ComputeService>,
inner: Client<ComputeProtocol>,
}

impl ComputeApi {
Expand All @@ -168,18 +160,10 @@ impl ComputeApi {
}

pub fn listen(&self, endpoint: quinn::Endpoint) -> anyhow::Result<AbortOnDropHandle<()>> {
let Some(local) = self.inner.local() else {
let Some(local) = self.inner.as_local() else {
bail!("cannot listen on a remote service");
};
let handler: Handler<ComputeProtocol> = Arc::new(move |msg, rx, tx| {
let local = local.clone();
Box::pin(match msg {
ComputeProtocol::Sqr(msg) => local.send((msg, tx)),
ComputeProtocol::Sum(msg) => local.send((msg, tx, rx)),
ComputeProtocol::Fibonacci(msg) => local.send((msg, tx)),
ComputeProtocol::Multiply(msg) => local.send((msg, tx, rx)),
})
});
let handler = ComputeProtocol::remote_handler(local);
Ok(AbortOnDropHandle::new(task::spawn(listen(
endpoint, handler,
))))
Expand Down
39 changes: 14 additions & 25 deletions examples/derive.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,21 @@
use std::{
collections::BTreeMap,
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
sync::Arc,
};

use anyhow::{Context, Result};
use irpc::{
channel::{mpsc, oneshot},
rpc::Handler,
rpc::RemoteService,
rpc_requests,
util::{make_client_endpoint, make_server_endpoint},
Client, LocalSender, Service, WithChannels,
Client, WithChannels,
};
// Import the macro
use n0_future::task::{self, AbortOnDropHandle};
use serde::{Deserialize, Serialize};
use tracing::info;

/// A simple storage service, just to try it out
#[derive(Debug, Clone, Copy)]
struct StorageService;

impl Service for StorageService {}

#[derive(Debug, Serialize, Deserialize)]
struct Get {
key: String,
Expand All @@ -48,8 +41,8 @@ struct SetMany;

// Use the macro to generate both the StorageProtocol and StorageMessage enums
// plus implement Channels for each type
#[rpc_requests(StorageService, message = StorageMessage)]
#[derive(Serialize, Deserialize)]
#[rpc_requests(message = StorageMessage)]
#[derive(Serialize, Deserialize, Debug)]
enum StorageProtocol {
#[rpc(tx=oneshot::Sender<Option<String>>)]
Get(Get),
Expand All @@ -74,9 +67,8 @@ impl StorageActor {
state: BTreeMap::new(),
};
n0_future::task::spawn(actor.run());
let local = LocalSender::<StorageMessage, StorageService>::from(tx);
StorageApi {
inner: local.into(),
inner: Client::local(tx),
}
}

Expand Down Expand Up @@ -123,7 +115,7 @@ impl StorageActor {
}

struct StorageApi {
inner: Client<StorageMessage, StorageProtocol, StorageService>,
inner: Client<StorageProtocol>,
}

impl StorageApi {
Expand All @@ -134,17 +126,14 @@ impl StorageApi {
}

pub fn listen(&self, endpoint: quinn::Endpoint) -> Result<AbortOnDropHandle<()>> {
let local = self.inner.local().context("cannot listen on remote API")?;
let handler: Handler<StorageProtocol> = Arc::new(move |msg, rx, tx| {
let local = local.clone();
Box::pin(match msg {
StorageProtocol::Get(msg) => local.send((msg, tx)),
StorageProtocol::Set(msg) => local.send((msg, tx)),
StorageProtocol::SetMany(msg) => local.send((msg, tx, rx)),
StorageProtocol::List(msg) => local.send((msg, tx)),
})
});
let join_handle = task::spawn(irpc::rpc::listen(endpoint, handler));
let local = self
.inner
.as_local()
.context("cannot listen on remote API")?;
let join_handle = task::spawn(irpc::rpc::listen(
endpoint,
StorageProtocol::remote_handler(local),
));
Ok(AbortOnDropHandle::new(join_handle))
}

Expand Down
105 changes: 105 additions & 0 deletions examples/local.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
//! This demonstrates using irpc with the derive macro but without the rpc feature
//! for local-only use. Run with:
//! ```
//! cargo run --example local --no-default-features --features derive
//! ```

use std::collections::BTreeMap;

use irpc::{channel::oneshot, rpc_requests, Client, WithChannels};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct Get {
key: String,
}

#[derive(Debug, Serialize, Deserialize)]
struct List;

#[derive(Debug, Serialize, Deserialize)]
struct Set {
key: String,
value: String,
}

impl From<(String, String)> for Set {
fn from((key, value): (String, String)) -> Self {
Self { key, value }
}
}

#[derive(Debug, Serialize, Deserialize)]
struct SetMany;

#[rpc_requests(message = StorageMessage, no_rpc, no_spans)]
#[derive(Serialize, Deserialize, Debug)]
enum StorageProtocol {
#[rpc(tx=oneshot::Sender<Option<String>>)]
Get(Get),
#[rpc(tx=oneshot::Sender<()>)]
Set(Set),
}

struct Actor {
recv: tokio::sync::mpsc::Receiver<StorageMessage>,
state: BTreeMap<String, String>,
}

impl Actor {
async fn run(mut self) {
while let Some(msg) = self.recv.recv().await {
self.handle(msg).await;
}
}

async fn handle(&mut self, msg: StorageMessage) {
match msg {
StorageMessage::Get(get) => {
let WithChannels { tx, inner, .. } = get;
tx.send(self.state.get(&inner.key).cloned()).await.ok();
}
StorageMessage::Set(set) => {
let WithChannels { tx, inner, .. } = set;
self.state.insert(inner.key, inner.value);
tx.send(()).await.ok();
}
}
}
}

struct StorageApi {
inner: Client<StorageProtocol>,
}

impl StorageApi {
pub fn spawn() -> StorageApi {
let (tx, rx) = tokio::sync::mpsc::channel(1);
let actor = Actor {
recv: rx,
state: BTreeMap::new(),
};
n0_future::task::spawn(actor.run());
StorageApi {
inner: Client::local(tx),
}
}

pub async fn get(&self, key: String) -> irpc::Result<Option<String>> {
self.inner.rpc(Get { key }).await
}

pub async fn set(&self, key: String, value: String) -> irpc::Result<()> {
self.inner.rpc(Set { key, value }).await
}
}

#[tokio::main]
async fn main() -> irpc::Result<()> {
tracing_subscriber::fmt::init();
let api = StorageApi::spawn();
api.set("hello".to_string(), "world".to_string()).await?;
let value = api.get("hello".to_string()).await?;
println!("get: hello = {value:?}");
Ok(())
}
Loading
Loading