Skip to content

Commit acb197a

Browse files
authored
fix in current rust version (rustc 1.86.0) (#7)
* update bindgen && ucx version to 1.18 * fix context * fix MaybeInit to pass test in Release mode for input params : MaybeUninit::uninit().assume_init() -> MaybeUninit::zeroed().assume_init() for output: use explicit type MaybeUninit::<T>::uninit() make sure no remain MaybeUninit::uninit() causes UB * fix rma example * add doc * fmt * fix example bench * use AmProto::Rndv in AM. otherwise it will crash * format
1 parent d03836a commit acb197a

File tree

15 files changed

+82
-63
lines changed

15 files changed

+82
-63
lines changed

examples/bench-multi-thread.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ async fn main() -> Result<()> {
1717
} else {
1818
local.run_until(server()).await;
1919
}
20-
Ok(())
2120
}
2221

2322
async fn client(server_addr: String) -> ! {
@@ -72,7 +71,7 @@ async fn server() -> ! {
7271
loop {
7372
ep.worker().tag_recv(tag, &mut buf).await.unwrap();
7473
// ep.tag_send(tag, &[0]).await;
75-
unsafe { *(&*counter as *const AtomicUsize as *mut usize) += 1 };
74+
counter.fetch_add(1, Ordering::Relaxed);
7675
}
7776
});
7877
});

examples/bench.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ async fn main() -> Result<()> {
1212
} else {
1313
local.run_until(server()).await;
1414
}
15-
Ok(())
1615
}
1716

1817
async fn client(server_addr: String) -> ! {

examples/rma.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,16 @@ async fn client(server_addr: String) -> Result<()> {
1818
println!("client: connect to {:?}", server_addr);
1919
let context = Context::new().unwrap();
2020
let worker = context.create_worker().unwrap();
21-
let endpoint = worker
22-
.connect_socket(server_addr.parse().unwrap())
23-
.await
24-
.unwrap();
25-
endpoint.print_to_stderr();
2621
#[cfg(not(feature = "event"))]
2722
tokio::task::spawn_local(worker.clone().polling());
2823
#[cfg(feature = "event")]
2924
tokio::task::spawn_local(worker.clone().event_poll());
3025

26+
let endpoint = worker
27+
.connect_socket(server_addr.parse().unwrap())
28+
.await
29+
.unwrap();
30+
endpoint.print_to_stderr();
3131
// register memory region
3232
let mut buf: Vec<u8> = (0..0x1000).map(|x| x as u8).collect();
3333
let mem = MemoryHandle::register(&context, &mut buf);
@@ -62,13 +62,13 @@ async fn server() -> Result<()> {
6262
println!("accept");
6363
endpoint.print_to_stderr();
6464

65-
let mut vaddr_buf = [MaybeUninit::uninit(); 8];
65+
let mut vaddr_buf = [MaybeUninit::<u8>::uninit(); 8];
6666
let len = endpoint.stream_recv(&mut vaddr_buf).await.unwrap();
6767
assert_eq!(len, 8);
6868
let vaddr = u64::from_ne_bytes(unsafe { transmute(vaddr_buf) });
6969
println!("recv: vaddr={:#x}", vaddr);
7070

71-
let mut rkey_buf = [MaybeUninit::uninit(); 100];
71+
let mut rkey_buf = [MaybeUninit::<u8>::uninit(); 100];
7272
let len = endpoint.stream_recv(&mut rkey_buf).await.unwrap();
7373
println!("recv rkey: len={}", len);
7474

examples/stream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ async fn server() -> Result<()> {
5353
println!("accept");
5454
endpoint.print_to_stderr();
5555

56-
let mut buf = [MaybeUninit::uninit(); 10];
56+
let mut buf = [MaybeUninit::<u8>::uninit(); 10];
5757
let len = endpoint.stream_recv(&mut buf).await.unwrap();
5858
let msg = std::str::from_utf8(unsafe { transmute(&buf[..len]) });
5959
println!("recv: {:?}", msg);

examples/tag.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ async fn server() -> Result<()> {
5555
let _endpoint = worker.accept(connection).await.unwrap();
5656
println!("accept");
5757

58-
let mut buf = [MaybeUninit::uninit(); 0x1005];
58+
let mut buf = [MaybeUninit::<u8>::uninit(); 0x1005];
5959
let len = worker.tag_recv(100, &mut buf).await.unwrap();
6060
let msg = std::str::from_utf8(unsafe { transmute(&buf[..len]) }).unwrap();
6161
println!("recv: {:?}", msg);

src/ucp/endpoint/am.rs

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,20 @@ use std::{
88
sync::atomic::AtomicBool,
99
};
1010

11+
//// Active message protocol.
12+
/// Active message protocol is a mechanism for sending and receiving messages
13+
/// between processes in a distributed system.
14+
/// It allows a process to send a message to another process, which can then
15+
/// handle the message and perform some action based on its contents.
16+
/// Active messages are typically used in high-performance computing (HPC)
17+
/// applications, where low-latency communication is critical.
1118
#[derive(Debug, PartialEq, Eq)]
1219
pub enum AmDataType {
20+
/// Eager message
1321
Eager,
22+
/// Data message
1423
Data,
24+
/// Rendezvous message
1525
Rndv,
1626
}
1727

@@ -88,6 +98,7 @@ impl RawMsg {
8898
}
8999
}
90100

101+
/// Active message message.
91102
pub struct AmMsg<'a> {
92103
worker: &'a Worker,
93104
msg: RawMsg,
@@ -98,35 +109,44 @@ impl<'a> AmMsg<'a> {
98109
AmMsg { worker, msg }
99110
}
100111

112+
/// Get the message ID.
101113
#[inline]
102114
pub fn id(&self) -> u16 {
103115
self.msg.id
104116
}
105117

118+
/// Get the message header.
106119
#[inline]
107120
pub fn header(&self) -> &[u8] {
108121
self.msg.header.as_ref()
109122
}
110123

124+
/// Get the message header length.
111125
#[inline]
112126
pub fn contains_data(&self) -> bool {
113127
self.data_type().is_some()
114128
}
115129

130+
/// Get the message data type.
116131
pub fn data_type(&self) -> Option<AmDataType> {
117132
self.msg.data.as_ref().map(|data| data.data_type())
118133
}
119134

135+
/// Get the message data.
136+
/// Returns `None` if the message doesn't contain data.
120137
#[inline]
121138
pub fn get_data(&self) -> Option<&[u8]> {
122139
self.msg.data.as_ref().and_then(|data| data.data())
123140
}
124141

142+
/// Get the message data length.
143+
/// Returns `0` if the message doesn't contain data.
125144
#[inline]
126145
pub fn data_len(&self) -> usize {
127146
self.msg.data.as_ref().map_or(0, |data| data.len())
128147
}
129148

149+
/// Receive the message data.
130150
pub async fn recv_data(&mut self) -> Result<Vec<u8>, Error> {
131151
match self.msg.data.take() {
132152
None => Ok(Vec::new()),
@@ -144,6 +164,12 @@ impl<'a> AmMsg<'a> {
144164
}
145165
}
146166

167+
/// Receive the message data.
168+
/// Returns `0` if the message doesn't contain data.
169+
/// Returns the number of bytes received.
170+
/// # Safety
171+
/// User needs to ensure that the buffer is large enough to hold the data.
172+
/// Otherwise, it will cause memory corruption.
147173
pub async fn recv_data_single(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
148174
if !self.contains_data() {
149175
Ok(0)
@@ -153,6 +179,7 @@ impl<'a> AmMsg<'a> {
153179
}
154180
}
155181

182+
/// Receive the message data.
156183
pub async fn recv_data_vectored(&mut self, iov: &[IoSliceMut<'_>]) -> Result<usize, Error> {
157184
let data = self.msg.data.take();
158185
if let Some(data) = data {
@@ -192,7 +219,7 @@ impl<'a> AmMsg<'a> {
192219
unsafe extern "C" fn callback(
193220
request: *mut c_void,
194221
status: ucs_status_t,
195-
_length: u64,
222+
_length: usize,
196223
_data: *mut c_void,
197224
) {
198225
// todo: handle error & fix real data length
@@ -255,6 +282,7 @@ impl<'a> AmMsg<'a> {
255282
}
256283
}
257284

285+
/// Check if the message needs a reply.
258286
#[inline]
259287
pub fn need_reply(&self) -> bool {
260288
self.msg.attr & (ucp_am_recv_attr_t::UCP_AM_RECV_ATTR_FIELD_REPLY_EP as u64) != 0
@@ -309,6 +337,7 @@ impl<'a> Drop for AmMsg<'a> {
309337
}
310338
}
311339

340+
/// Active message stream.
312341
#[derive(Clone)]
313342
pub struct AmStream<'a> {
314343
worker: &'a Worker,
@@ -383,9 +412,9 @@ impl Worker {
383412
unsafe extern "C" fn callback(
384413
arg: *mut c_void,
385414
header: *const c_void,
386-
header_len: u64,
415+
header_len: usize,
387416
data: *mut c_void,
388-
data_len: u64,
417+
data_len: usize,
389418
param: *const ucp_am_recv_param_t,
390419
) -> ucs_status_t {
391420
let handler = &*(arg as *const AmStreamInner);
@@ -442,7 +471,9 @@ impl Worker {
442471
}
443472
}
444473

474+
/// Active message endpoint.
445475
impl Endpoint {
476+
/// Send active message.
446477
pub async fn am_send(
447478
&self,
448479
id: u32,
@@ -456,6 +487,7 @@ impl Endpoint {
456487
.await
457488
}
458489

490+
/// Send active message.
459491
pub async fn am_send_vectorized(
460492
&self,
461493
id: u32,
@@ -469,8 +501,11 @@ impl Endpoint {
469501
}
470502
}
471503

504+
/// Active message protocol
472505
pub enum AmProto {
506+
/// Eager protocol
473507
Eager,
508+
/// Rendezvous protocol
474509
Rndv,
475510
}
476511

@@ -601,7 +636,7 @@ mod tests {
601636
header.as_slice(),
602637
data.as_slice(),
603638
true,
604-
Some(AmProto::Eager),
639+
Some(AmProto::Rndv),
605640
)
606641
.await;
607642
assert!(result.is_ok());
@@ -627,7 +662,10 @@ mod tests {
627662
tokio::join!(
628663
async {
629664
// send reply
630-
let result = unsafe { msg.reply(12, &header, &data, false, None).await };
665+
let result = unsafe {
666+
msg.reply(12, &header, &data, false, Some(AmProto::Rndv))
667+
.await
668+
};
631669
assert!(result.is_ok());
632670
},
633671
async {

src/ucp/endpoint/mod.rs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@ mod tag;
1616
#[cfg(feature = "am")]
1717
pub use self::am::*;
1818
pub use self::rma::*;
19-
pub use self::stream::*;
20-
pub use self::tag::*;
2119

2220
// State associate with ucp_ep_h
2321
// todo: Add a `get_user_data` to UCX
@@ -111,7 +109,7 @@ impl Endpoint {
111109
arg: std::ptr::null_mut(), // override by user_data
112110
};
113111

114-
let mut handle = MaybeUninit::uninit();
112+
let mut handle = MaybeUninit::<*mut ucp_ep>::uninit();
115113
let status = unsafe { ucp_ep_create(worker.handle, &params, handle.as_mut_ptr()) };
116114
if let Err(err) = Error::from_status(status) {
117115
// error happened, drop reference
@@ -142,7 +140,7 @@ impl Endpoint {
142140
addrlen: sockaddr.len(),
143141
},
144142
err_mode: ucp_err_handling_mode_t::UCP_ERR_HANDLING_MODE_PEER,
145-
..unsafe { MaybeUninit::uninit().assume_init() }
143+
..unsafe { MaybeUninit::zeroed().assume_init() }
146144
};
147145
let endpoint = Endpoint::create(worker, params)?;
148146

@@ -157,15 +155,13 @@ impl Endpoint {
157155
worker: &Rc<Worker>,
158156
addr: *const ucp_address_t,
159157
) -> Result<Self, Error> {
160-
#[allow(invalid_value)]
161-
#[allow(clippy::uninit_assumed_init)]
162158
let params = ucp_ep_params {
163159
field_mask: (ucp_ep_params_field::UCP_EP_PARAM_FIELD_REMOTE_ADDRESS
164160
| ucp_ep_params_field::UCP_EP_PARAM_FIELD_ERR_HANDLING_MODE)
165161
.0 as u64,
166162
address: addr,
167163
err_mode: ucp_err_handling_mode_t::UCP_ERR_HANDLING_MODE_PEER,
168-
..unsafe { MaybeUninit::uninit().assume_init() }
164+
..unsafe { MaybeUninit::zeroed().assume_init() }
169165
};
170166
Endpoint::create(worker, params)
171167
}
@@ -174,12 +170,10 @@ impl Endpoint {
174170
worker: &Rc<Worker>,
175171
connection: ConnectionRequest,
176172
) -> Result<Self, Error> {
177-
#[allow(invalid_value)]
178-
#[allow(clippy::uninit_assumed_init)]
179173
let params = ucp_ep_params {
180174
field_mask: ucp_ep_params_field::UCP_EP_PARAM_FIELD_CONN_REQUEST.0 as u64,
181175
conn_request: connection.handle,
182-
..unsafe { MaybeUninit::uninit().assume_init() }
176+
..unsafe { MaybeUninit::zeroed().assume_init() }
183177
};
184178
let endpoint = Endpoint::create(worker, params)?;
185179

src/ucp/endpoint/rma.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ impl MemoryHandle {
1919
.0 as u64,
2020
address: region.as_ptr() as _,
2121
length: region.len() as _,
22-
..unsafe { MaybeUninit::uninit().assume_init() }
22+
..unsafe { MaybeUninit::zeroed().assume_init() }
2323
};
24-
let mut handle = MaybeUninit::uninit();
24+
let mut handle = MaybeUninit::<*mut ucp_mem>::uninit();
2525
let status = unsafe { ucp_mem_map(context.handle, &params, handle.as_mut_ptr()) };
2626
assert_eq!(status, ucs_status_t::UCS_OK);
2727
MemoryHandle {
@@ -32,8 +32,8 @@ impl MemoryHandle {
3232

3333
/// Packs into the buffer a remote access key (RKEY) object.
3434
pub fn pack(&self) -> RKeyBuffer {
35-
let mut buf = MaybeUninit::uninit();
36-
let mut len = MaybeUninit::uninit();
35+
let mut buf = MaybeUninit::<*mut c_void>::uninit();
36+
let mut len = MaybeUninit::<usize>::uninit();
3737
let status = unsafe {
3838
ucp_rkey_pack(
3939
self.context.handle,
@@ -60,7 +60,7 @@ impl Drop for MemoryHandle {
6060
#[derive(Debug)]
6161
pub struct RKeyBuffer {
6262
buf: *mut c_void,
63-
len: u64,
63+
len: usize,
6464
}
6565

6666
impl AsRef<[u8]> for RKeyBuffer {
@@ -87,7 +87,7 @@ unsafe impl Sync for RKey {}
8787
impl RKey {
8888
/// Create remote access key from packed buffer.
8989
pub fn unpack(endpoint: &Endpoint, rkey_buffer: &[u8]) -> Self {
90-
let mut handle = MaybeUninit::uninit();
90+
let mut handle = MaybeUninit::<*mut ucp_rkey>::uninit();
9191
let status = unsafe {
9292
ucp_ep_rkey_unpack(
9393
endpoint.handle,

src/ucp/endpoint/stream.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ impl Endpoint {
4040
/// Receives data from stream.
4141
pub async fn stream_recv(&self, buf: &mut [MaybeUninit<u8>]) -> Result<usize, Error> {
4242
trace!("stream_recv: endpoint={:?} len={}", self.handle, buf.len());
43-
unsafe extern "C" fn callback(request: *mut c_void, status: ucs_status_t, length: u64) {
43+
unsafe extern "C" fn callback(request: *mut c_void, status: ucs_status_t, length: usize) {
4444
trace!(
4545
"stream_recv: complete. req={:?}, status={:?}, len={}",
4646
request,
@@ -50,7 +50,7 @@ impl Endpoint {
5050
let request = &mut *(request as *mut Request);
5151
request.waker.wake();
5252
}
53-
let mut length = MaybeUninit::uninit();
53+
let mut length = MaybeUninit::<usize>::uninit();
5454
let status = unsafe {
5555
ucp_stream_recv_nb(
5656
self.get_handle()?,

src/ucp/endpoint/tag.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ mod tests {
238238
},
239239
async {
240240
// recv
241-
let mut buf = vec![MaybeUninit::uninit(); msg_size];
241+
let mut buf = vec![MaybeUninit::<u8>::uninit(); msg_size];
242242
worker1.tag_recv(1, &mut buf).await.unwrap();
243243
println!("tag recved");
244244
}

0 commit comments

Comments
 (0)