Skip to content
Open
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
f2a4bf5
connect_add_rvec
May 12, 2025
1b56b62
disable build ucx
May 13, 2025
1f1f68a
tag use send/recv_nbx
chnlkw Jun 27, 2025
c9bce46
use RequestParam
chnlkw Jun 27, 2025
fa298d3
stream use nbx and Param
chnlkw Jun 27, 2025
8940e9b
rma use nbx
chnlkw Jun 27, 2025
cdf6c85
add author Li Kaiwei
chnlkw Jun 27, 2025
2808d49
update ucx to 1.18.1
chnlkw Jun 27, 2025
1a64dec
little fix
chnlkw Jun 27, 2025
6a3216a
rename send_cb -> cb_send
chnlkw Jun 27, 2025
67691ca
fmt
chnlkw Jun 27, 2025
5b39369
poll fn are safe
chnlkw Jun 27, 2025
fcad8aa
add stream Extension for AsyncRead and AsyncWrite
chnlkw Jun 28, 2025
8f48e8d
add more test
chnlkw Jun 28, 2025
ad6b300
to io error
chnlkw Jun 29, 2025
1dbbc3e
add utill.rs with feature
chnlkw Jun 29, 2025
bed08b8
Update endpoint tag and util modules
chnlkw Jun 29, 2025
5abf8d4
update readme and changlog
chnlkw Jun 29, 2025
2150e47
fix remote println!
chnlkw Jun 29, 2025
35ce5c4
cargo fmt
chnlkw Jun 29, 2025
09b79fd
feat: improve tag receive implementation with ucp_tag_recv_info
chnlkw Jun 29, 2025
59a0311
impl poll_flush and poll_shutdown
chnlkw Jul 3, 2025
a1fc9b9
endpoint handler
Jul 15, 2025
965c7e8
add reply_ep
Jul 15, 2025
3263c6e
am id is u16
Jul 16, 2025
0107ef0
fix TagWriteStream
chnlkw Jul 21, 2025
5ac7cd1
fix WriteStream
chnlkw Jul 21, 2025
c8ec537
Create rust.yml
chnlkw Sep 24, 2025
62f4afa
cargo clippy --fix
Sep 24, 2025
50f1487
cargo fix
Sep 24, 2025
5b099db
Merge branch 'dev' into v0.2
Sep 24, 2025
b80b161
feat(ucx1-sys): bump version to 0.2.0
Sep 24, 2025
efde911
Merge branch 'dev' into v0.2
Sep 24, 2025
146a73b
fix testing on rust 1.90.0
Sep 25, 2025
60726a1
build(deps): add alloc feature to crossbeam dependency
Sep 30, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
.vscode/

/target
Cargo.lock
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.2.0] - 2025-06-29

### Added

- Added AsyncRead/AsyncWrite support for Tag and Stream (requires `utils` feature flag)
- Added `connect_addr_vec` function to Worker

### Changed

- Updated to UCX 1.18 with latest API compatibility
- Updated multiple dependency versions
- Migrated to Rust 2021 edition

### Fixed

- Fixed various bugs and issues

## [0.1.1] - 2022-09-01

### Changed
Expand Down
8 changes: 5 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "async-ucx"
version = "0.1.1"
authors = ["Runji Wang <[email protected]>", "Yiyuan Liu <[email protected]>"]
version = "0.2.0"
authors = ["Runji Wang <[email protected]>", "Yiyuan Liu <[email protected]>", "Kaiwei Li <[email protected]>"]
edition = "2021"
description = "Asynchronous Rust bindings to UCX."
homepage = "https://github.com/madsys-dev/async-ucx"
Expand All @@ -15,6 +15,7 @@ categories = ["asynchronous", "api-bindings", "network-programming"]
[features]
event = ["tokio"]
am = ["tokio/sync", "crossbeam"]
util = ["tokio"]

[dependencies]
ucx1-sys = { version = "0.1", path = "ucx1-sys" }
Expand All @@ -27,9 +28,10 @@ tokio = { version = "1.0", features = ["net"], optional = true }
crossbeam = { version = "0.8", optional = true }
derivative = "2.2.0"
thiserror = "1.0"
pin-project = "1.1.10"

[dev-dependencies]
tokio = { version = "1.0", features = ["rt", "time", "macros", "sync"] }
tokio = { version = "1.0", features = ["rt", "time", "macros", "sync", "io-util"] }
env_logger = "0.9"
tracing = { version = "0.1", default-features = false }
tracing-subscriber = { version = "0.2.17", default-features = false, features = ["env-filter", "fmt"] }
Expand Down
88 changes: 85 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,94 @@
[![Docs](https://docs.rs/async-ucx/badge.svg)](https://docs.rs/async-ucx)
[![CI](https://github.com/madsys-dev/async-ucx/workflows/CI/badge.svg?branch=main)](https://github.com/madsys-dev/async-ucx/actions)

Async Rust UCX bindings.
Async Rust UCX bindings providing high-performance networking capabilities for distributed systems and HPC applications.

## Features

- **Asynchronous UCP Operations**: Full async/await support for UCX operations
- **Multiple Communication Models**: Support for RMA, Stream, Tag, and Active Message APIs
- **High Performance**: Optimized for low-latency, high-throughput communication
- **Tokio Integration**: Seamless integration with Tokio async runtime
- **Comprehensive Examples**: Ready-to-use examples for various UCX patterns

## Optional features

- `event`: Enable UCP wakeup mechanism.
- `am`: Enable UCP Active Message API.
- `event`: Enable UCP wakeup mechanism for event-driven applications
- `am`: Enable UCP Active Message API for flexible message handling
- `util`: Enable additional utility functions for UCX integration

## Quick Start

Add to your `Cargo.toml`:

```toml
[dependencies]
async-ucx = "0.2"
tokio = { version = "1.0", features = ["rt", "net"] }
```

Basic usage example:

```rust
use async_ucx::ucp::*;
use std::mem::MaybeUninit;
use std::net::SocketAddr;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create UCP contexts and workers
let context1 = Context::new()?;
let worker1 = context1.create_worker()?;
let context2 = Context::new()?;
let worker2 = context2.create_worker()?;

// Start polling for both workers
tokio::task::spawn_local(worker1.clone().polling());
tokio::task::spawn_local(worker2.clone().polling());

// Create listener on worker1
let mut listener = worker1
.create_listener("0.0.0.0:0".parse().unwrap())?;
let listen_port = listener.socket_addr()?.port();

// Connect worker2 to worker1
let mut addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
addr.set_port(listen_port);

let (endpoint1, endpoint2) = tokio::join!(
async {
let conn1 = listener.next().await;
worker1.accept(conn1).await.unwrap()
},
async { worker2.connect_socket(addr).await.unwrap() },
);

// Send and receive tag message
tokio::join!(
async {
let msg = b"Hello UCX!";
endpoint2.tag_send(1, msg).await.unwrap();
println!("Message sent");
},
async {
let mut buf = vec![MaybeUninit::<u8>::uninit(); 10];
worker1.tag_recv(1, &mut buf).await.unwrap();
println!("Message received");
}
);

Ok(())
}
```

## Examples

Check the `examples/` directory for comprehensive examples:
- `rma.rs`: Remote Memory Access operations
- `stream.rs`: Stream-based communication
- `tag.rs`: Tag-based message matching
- `bench.rs`: Performance benchmarking
- `bench-multi-thread.rs`: Multi-threaded benchmarking

## License

Expand Down
2 changes: 1 addition & 1 deletion examples/bench-multi-thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl WorkerThread {
.build()
.unwrap();
let local = tokio::task::LocalSet::new();
#[cfg(not(event))]
#[cfg(not(feature = "event"))]
local.spawn_local(worker.clone().polling());
#[cfg(feature = "event")]
local.spawn_local(worker.clone().event_poll());
Expand Down
41 changes: 41 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,44 @@ impl Error {
}
}
}

impl Into<std::io::Error> for Error {
fn into(self) -> std::io::Error {
use std::io::ErrorKind::*;
let kind = match self {
Error::Inprogress => WouldBlock,
Error::NoMessage => WouldBlock,
Error::NoReource => WouldBlock,
Error::IoError => Other,
Error::NoMemory => OutOfMemory,
Error::InvalidParam => InvalidInput,
Error::Unreachable => NotConnected,
Error::InvalidAddr => InvalidInput,
Error::NotImplemented => Unsupported,
Error::MessageTruncated => InvalidData,
Error::NoProgress => WouldBlock,
Error::BufferTooSmall => UnexpectedEof,
Error::NoElem => NotFound,
Error::SomeConnectsFailed => ConnectionAborted,
Error::NoDevice => NotFound,
Error::Busy => ResourceBusy,
Error::Canceled => Interrupted,
Error::ShmemSegment => Other,
Error::AlreadyExists => AlreadyExists,
Error::OutOfRange => InvalidInput,
Error::Timeout => TimedOut,
Error::ExceedsLimit => Other,
Error::Unsupported => Unsupported,
Error::Rejected => ConnectionRefused,
Error::NotConnected => NotConnected,
Error::ConnectionReset => ConnectionReset,
Error::FirstLinkFailure => Other,
Error::LastLinkFailure => Other,
Error::FirstEndpointFailure => Other,
Error::LastEndpointFailure => Other,
Error::EndpointTimeout => TimedOut,
Error::Unknown => Other,
};
std::io::Error::new(kind, self)
}
}
84 changes: 27 additions & 57 deletions src/ucp/endpoint/am.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crossbeam::queue::SegQueue;
use tokio::sync::Notify;

use super::param::RequestParam;
use super::*;
use std::{
io::{IoSlice, IoSliceMut},
Expand Down Expand Up @@ -249,22 +250,12 @@ impl<'a> AmMsg<'a> {
self.worker.handle,
iov.len()
);
let mut param = MaybeUninit::<ucp_request_param_t>::uninit();
let (buffer, count) = unsafe {
let param = &mut *param.as_mut_ptr();
param.op_attr_mask = ucp_op_attr_t::UCP_OP_ATTR_FIELD_CALLBACK as u32
| ucp_op_attr_t::UCP_OP_ATTR_FIELD_DATATYPE as u32;
param.cb = ucp_request_param_t__bindgen_ty_1 {
recv_am: Some(callback),
};

if iov.len() == 1 {
param.datatype = ucp_dt_make_contig(1);
(iov[0].as_ptr(), iov[0].len())
} else {
param.datatype = ucp_dt_type::UCP_DATATYPE_IOV as _;
(iov.as_ptr() as _, iov.len())
}

let param = RequestParam::new().cb_recv_am(Some(callback));
let (buffer, count, param) = if iov.len() == 1 {
(iov[0].as_ptr(), iov[0].len(), param)
} else {
(iov.as_ptr() as _, iov.len(), param.iov())
};

let status = unsafe {
Expand All @@ -273,7 +264,7 @@ impl<'a> AmMsg<'a> {
data_desc as _,
buffer as _,
count as _,
param.as_ptr(),
param.as_ref(),
)
};
if status.is_null() {
Expand All @@ -282,9 +273,9 @@ impl<'a> AmMsg<'a> {
} else if UCS_PTR_IS_PTR(status) {
RequestHandle {
ptr: status,
poll_fn: poll_recv,
poll_fn: poll_normal,
}
.await;
.await?;
Ok(data_len)
} else {
Err(Error::from_ptr(status).unwrap_err())
Expand Down Expand Up @@ -546,34 +537,22 @@ async fn am_send(
request.waker.wake();
}

let mut param = MaybeUninit::<ucp_request_param_t>::uninit();
let (buffer, count) = unsafe {
let param = &mut *param.as_mut_ptr();
param.op_attr_mask = ucp_op_attr_t::UCP_OP_ATTR_FIELD_CALLBACK as u32
| ucp_op_attr_t::UCP_OP_ATTR_FIELD_DATATYPE as u32
| ucp_op_attr_t::UCP_OP_ATTR_FIELD_FLAGS as u32;
param.flags = 0;
param.cb = ucp_request_param_t__bindgen_ty_1 {
send: Some(callback),
};

match proto {
Some(AmProto::Eager) => param.flags |= ucp_send_am_flags::UCP_AM_SEND_FLAG_EAGER.0,
Some(AmProto::Rndv) => param.flags |= ucp_send_am_flags::UCP_AM_SEND_FLAG_RNDV.0,
_ => (),
}

if need_reply {
param.flags |= ucp_send_am_flags::UCP_AM_SEND_FLAG_REPLY.0;
}

if data.len() == 1 {
param.datatype = ucp_dt_make_contig(1);
(data[0].as_ptr(), data[0].len())
} else {
param.datatype = ucp_dt_type::UCP_DATATYPE_IOV as _;
(data.as_ptr() as _, data.len())
}
// Use RequestParam builder for send
let param = RequestParam::new().cb_send(Some(callback));
let param = match proto {
Some(AmProto::Eager) => param.set_flag_eager(),
Some(AmProto::Rndv) => param.set_flag_rndv(),
None => param,
};
let param = if need_reply {
param.set_flag_reply()
} else {
param
};
let (buffer, count, param) = if data.len() == 1 {
(data[0].as_ptr(), data[0].len(), param)
} else {
(data.as_ptr() as _, data.len(), param.iov())
};

let status = unsafe {
Expand All @@ -584,7 +563,7 @@ async fn am_send(
header.len() as _,
buffer as _,
count as _,
param.as_mut_ptr(),
param.as_ref(),
)
};
if status.is_null() {
Expand All @@ -601,15 +580,6 @@ async fn am_send(
}
}

unsafe fn poll_recv(ptr: ucs_status_ptr_t) -> Poll<()> {
let status = ucp_request_check_status(ptr as _);
if status == ucs_status_t::UCS_INPROGRESS {
Poll::Pending
} else {
Poll::Ready(())
}
}

#[cfg(test)]
#[cfg(feature = "am")]
mod tests {
Expand Down
Loading
Loading