@@ -55,6 +55,23 @@ unsafe impl<T, const N: usize> Send for Channel<T, N> {}
55
55
56
56
unsafe impl < T , const N : usize > Sync for Channel < T , N > { }
57
57
58
+ macro_rules! cs_access {
59
+ ( $name: ident, $type: ty) => {
60
+ /// Access the value mutably.
61
+ ///
62
+ /// SAFETY: this function must not be called recursively within `f`.
63
+ unsafe fn $name<F , R >( & self , _cs: critical_section:: CriticalSection , f: F ) -> R
64
+ where
65
+ F : FnOnce ( & mut $type) -> R ,
66
+ {
67
+ self . $name. with_mut( |v| {
68
+ let v = unsafe { & mut * v } ;
69
+ f( v)
70
+ } )
71
+ }
72
+ } ;
73
+ }
74
+
58
75
impl < T , const N : usize > Channel < T , N > {
59
76
const _CHECK: ( ) = assert ! ( N < 256 , "This queue support a maximum of 255 entries" ) ;
60
77
@@ -113,45 +130,10 @@ impl<T, const N: usize> Channel<T, N> {
113
130
( Sender ( self ) , Receiver ( self ) )
114
131
}
115
132
116
- fn freeq < F , R > ( & self , _cs : critical_section:: CriticalSection , f : F ) -> R
117
- where
118
- F : FnOnce ( & mut Deque < u8 , N > ) -> R ,
119
- {
120
- self . freeq . with_mut ( |freeq| {
121
- let queue = unsafe { & mut * freeq } ;
122
- f ( queue)
123
- } )
124
- }
125
-
126
- fn readyq < F , R > ( & self , _cs : critical_section:: CriticalSection , f : F ) -> R
127
- where
128
- F : FnOnce ( & mut Deque < u8 , N > ) -> R ,
129
- {
130
- self . readyq . with_mut ( |readyq| {
131
- let queue = unsafe { & mut * readyq } ;
132
- f ( queue)
133
- } )
134
- }
135
-
136
- fn receiver_dropped < F , R > ( & self , _cs : critical_section:: CriticalSection , f : F ) -> R
137
- where
138
- F : FnOnce ( & mut bool ) -> R ,
139
- {
140
- self . receiver_dropped . with_mut ( |receiver_dropped| {
141
- let receiver_dropped = unsafe { & mut * receiver_dropped } ;
142
- f ( receiver_dropped)
143
- } )
144
- }
145
-
146
- fn num_senders < F , R > ( & self , _cs : critical_section:: CriticalSection , f : F ) -> R
147
- where
148
- F : FnOnce ( & mut usize ) -> R ,
149
- {
150
- self . num_senders . with_mut ( |num_senders| {
151
- let num_senders = unsafe { & mut * num_senders } ;
152
- f ( num_senders)
153
- } )
154
- }
133
+ cs_access ! ( freeq, Deque <u8 , N >) ;
134
+ cs_access ! ( readyq, Deque <u8 , N >) ;
135
+ cs_access ! ( receiver_dropped, bool ) ;
136
+ cs_access ! ( num_senders, usize ) ;
155
137
}
156
138
157
139
/// Creates a split channel with `'static` lifetime.R
@@ -276,7 +258,10 @@ impl<T, const N: usize> Sender<'_, T, N> {
276
258
277
259
// Write the value into the ready queue.
278
260
critical_section:: with ( |cs| {
279
- assert ! ( !self . 0 . readyq( cs, |q| q. is_full( ) ) ) ;
261
+ assert ! ( unsafe {
262
+ // SAFETY: the closure does not call `readyq`
263
+ !self . 0 . readyq( cs, |q| q. is_full( ) )
264
+ } ) ;
280
265
unsafe { self . 0 . readyq ( cs, |q| q. push_back_unchecked ( idx) ) }
281
266
} ) ;
282
267
@@ -298,12 +283,14 @@ impl<T, const N: usize> Sender<'_, T, N> {
298
283
return Err ( TrySendError :: NoReceiver ( val) ) ;
299
284
}
300
285
301
- let idx =
302
- if let Some ( idx) = critical_section:: with ( |cs| self . 0 . freeq ( cs, |q| q. pop_front ( ) ) ) {
303
- idx
304
- } else {
305
- return Err ( TrySendError :: Full ( val) ) ;
306
- } ;
286
+ // SAFETY: the closure does not call `freeq`
287
+ let idx = if let Some ( idx) =
288
+ critical_section:: with ( |cs| unsafe { self . 0 . freeq ( cs, |q| q. pop_front ( ) ) } )
289
+ {
290
+ idx
291
+ } else {
292
+ return Err ( TrySendError :: Full ( val) ) ;
293
+ } ;
307
294
308
295
self . send_footer ( idx, val) ;
309
296
@@ -337,7 +324,10 @@ impl<T, const N: usize> Sender<'_, T, N> {
337
324
// Do all this in one critical section, else there can be race conditions
338
325
let queue_idx = critical_section:: with ( |cs| {
339
326
let wq_empty = self . 0 . wait_queue . is_empty ( ) ;
340
- let fq_empty = self . 0 . freeq ( cs, |q| q. is_empty ( ) ) ;
327
+ let fq_empty = unsafe {
328
+ // SAFETY: the closure does not call `freeq`
329
+ self . 0 . freeq ( cs, |q| q. is_empty ( ) )
330
+ } ;
341
331
342
332
if !wq_empty || fq_empty {
343
333
// SAFETY: This pointer is only dereferenced here and on drop of the future
@@ -365,7 +355,11 @@ impl<T, const N: usize> Sender<'_, T, N> {
365
355
}
366
356
}
367
357
368
- assert ! ( !self . 0 . freeq( cs, |q| q. is_empty( ) ) ) ;
358
+ assert ! ( unsafe {
359
+ // SAFETY: the closure does not call `freeq`
360
+ !self . 0 . freeq( cs, |q| q. is_empty( ) )
361
+ } ) ;
362
+
369
363
// Get index as the queue is guaranteed not empty and the wait queue is empty
370
364
let idx = unsafe { self . 0 . freeq ( cs, |q| q. pop_front_unchecked ( ) ) } ;
371
365
@@ -395,28 +389,40 @@ impl<T, const N: usize> Sender<'_, T, N> {
395
389
396
390
/// Returns true if there is no `Receiver`s.
397
391
pub fn is_closed ( & self ) -> bool {
398
- critical_section:: with ( |cs| self . 0 . receiver_dropped ( cs, |v| * v) )
392
+ critical_section:: with ( |cs| unsafe {
393
+ // SAFETY: the closure does not call `receiver_dropped`
394
+ self . 0 . receiver_dropped ( cs, |v| * v)
395
+ } )
399
396
}
400
397
401
398
/// Is the queue full.
402
399
pub fn is_full ( & self ) -> bool {
403
- critical_section:: with ( |cs| self . 0 . freeq ( cs, |q| q. is_empty ( ) ) )
400
+ critical_section:: with ( |cs| unsafe {
401
+ // SAFETY: the closure does not call `freeq`
402
+ self . 0 . freeq ( cs, |q| q. is_empty ( ) )
403
+ } )
404
404
}
405
405
406
406
/// Is the queue empty.
407
407
pub fn is_empty ( & self ) -> bool {
408
- critical_section:: with ( |cs| self . 0 . freeq ( cs, |q| q. is_full ( ) ) )
408
+ critical_section:: with ( |cs| unsafe {
409
+ // SAFETY: the closure does not call `freeq`
410
+ self . 0 . freeq ( cs, |q| q. is_full ( ) )
411
+ } )
409
412
}
410
413
}
411
414
412
415
impl < T , const N : usize > Drop for Sender < ' _ , T , N > {
413
416
fn drop ( & mut self ) {
414
417
// Count down the reference counter
415
418
let num_senders = critical_section:: with ( |cs| {
416
- self . 0 . num_senders ( cs, |v| {
417
- * v -= 1 ;
418
- * v
419
- } )
419
+ unsafe {
420
+ // SAFETY: the closure does not call `num_senders`
421
+ self . 0 . num_senders ( cs, |v| {
422
+ * v -= 1 ;
423
+ * v
424
+ } )
425
+ }
420
426
} ) ;
421
427
422
428
// If there are no senders, wake the receiver to do error handling.
@@ -429,7 +435,10 @@ impl<T, const N: usize> Drop for Sender<'_, T, N> {
429
435
impl < T , const N : usize > Clone for Sender < ' _ , T , N > {
430
436
fn clone ( & self ) -> Self {
431
437
// Count up the reference counter
432
- critical_section:: with ( |cs| self . 0 . num_senders ( cs, |v| * v += 1 ) ) ;
438
+ critical_section:: with ( |cs| unsafe {
439
+ // SAFETY: the closure does not call `num_senders`
440
+ self . 0 . num_senders ( cs, |v| * v += 1 )
441
+ } ) ;
433
442
434
443
Self ( self . 0 )
435
444
}
@@ -469,7 +478,12 @@ impl<T, const N: usize> Receiver<'_, T, N> {
469
478
/// Receives a value if there is one in the channel, non-blocking.
470
479
pub fn try_recv ( & mut self ) -> Result < T , ReceiveError > {
471
480
// Try to get a ready slot.
472
- let ready_slot = critical_section:: with ( |cs| self . 0 . readyq ( cs, |q| q. pop_front ( ) ) ) ;
481
+ let ready_slot = critical_section:: with ( |cs| {
482
+ unsafe {
483
+ // SAFETY: the closure does not call `readyq`.
484
+ self . 0 . readyq ( cs, |q| q. pop_front ( ) )
485
+ }
486
+ } ) ;
473
487
474
488
if let Some ( rs) = ready_slot {
475
489
// Read the value from the slots, note; this memcpy is not under a critical section.
@@ -480,10 +494,14 @@ impl<T, const N: usize> Receiver<'_, T, N> {
480
494
481
495
// Return the index to the free queue after we've read the value.
482
496
critical_section:: with ( |cs| {
483
- self . 0 . freeq ( cs, |freeq| {
484
- assert ! ( !freeq. is_full( ) ) ;
485
- unsafe { freeq. push_back_unchecked ( rs) }
486
- } ) ;
497
+ unsafe {
498
+ // SAFETY: the closure does not call `freeq`
499
+ self . 0 . freeq ( cs, |freeq| {
500
+ assert ! ( !freeq. is_full( ) ) ;
501
+ // SAFETY: `freeq` is not full.
502
+ freeq. push_back_unchecked ( rs)
503
+ } ) ;
504
+ }
487
505
488
506
fence ( Ordering :: SeqCst ) ;
489
507
@@ -528,24 +546,44 @@ impl<T, const N: usize> Receiver<'_, T, N> {
528
546
529
547
/// Returns true if there are no `Sender`s.
530
548
pub fn is_closed ( & self ) -> bool {
531
- critical_section:: with ( |cs| self . 0 . num_senders ( cs, |v| * v == 0 ) )
549
+ critical_section:: with ( |cs| {
550
+ unsafe {
551
+ // SAFETY: the closure does not call `num_senders`
552
+ self . 0 . num_senders ( cs, |v| * v == 0 )
553
+ }
554
+ } )
532
555
}
533
556
534
557
/// Is the queue full.
535
558
pub fn is_full ( & self ) -> bool {
536
- critical_section:: with ( |cs| self . 0 . readyq ( cs, |q| q. is_full ( ) ) )
559
+ critical_section:: with ( |cs| {
560
+ unsafe {
561
+ // SAFETY: the closure does not call `readyq`
562
+ self . 0 . readyq ( cs, |q| q. is_full ( ) )
563
+ }
564
+ } )
537
565
}
538
566
539
567
/// Is the queue empty.
540
568
pub fn is_empty ( & self ) -> bool {
541
- critical_section:: with ( |cs| self . 0 . readyq ( cs, |q| q. is_empty ( ) ) )
569
+ critical_section:: with ( |cs| {
570
+ unsafe {
571
+ // SAFETY: the closure does not call `readyq`
572
+ self . 0 . readyq ( cs, |q| q. is_empty ( ) )
573
+ }
574
+ } )
542
575
}
543
576
}
544
577
545
578
impl < T , const N : usize > Drop for Receiver < ' _ , T , N > {
546
579
fn drop ( & mut self ) {
547
580
// Mark the receiver as dropped and wake all waiters
548
- critical_section:: with ( |cs| self . 0 . receiver_dropped ( cs, |v| * v = true ) ) ;
581
+ critical_section:: with ( |cs| {
582
+ unsafe {
583
+ // SAFETY: the closure does not call `receiver_dropped`
584
+ self . 0 . receiver_dropped ( cs, |v| * v = true )
585
+ }
586
+ } ) ;
549
587
550
588
while let Some ( waker) = self . 0 . wait_queue . pop ( ) {
551
589
waker. wake ( ) ;
0 commit comments