Skip to content

Commit a6e6ece

Browse files
committed
Merge branch 'reunite' of https://github.com/spinda/futures-rs
2 parents 8a10de5 + b90ac1d commit a6e6ece

File tree

5 files changed

+124
-21
lines changed

5 files changed

+124
-21
lines changed

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ matrix:
1212
after_success:
1313
- travis-cargo doc-upload
1414
- os: linux
15-
rust: 1.10.0
15+
rust: 1.17.0
1616
script: cargo test
1717
sudo: false
1818
script:

src/stream/split.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,21 @@
11
use {StartSend, Sink, Stream, Poll, Async, AsyncSink};
2+
use std::error::Error;
3+
use std::fmt;
24
use sync::BiLock;
35

46
/// A `Stream` part of the split pair
57
#[derive(Debug)]
68
pub struct SplitStream<S>(BiLock<S>);
79

10+
impl<S> SplitStream<S> {
11+
/// Attempts to put the two "halves" of a split `Stream + Sink` back
12+
/// together. Succeeds only if the `SplitStream<S>` and `SplitSink<S>` are
13+
/// a matching pair originating from the same call to `Stream::split`.
14+
pub fn reunite(self, other: SplitSink<S>) -> Result<S, ReuniteError<S>> {
15+
other.reunite(self)
16+
}
17+
}
18+
819
impl<S: Stream> Stream for SplitStream<S> {
920
type Item = S::Item;
1021
type Error = S::Error;
@@ -21,6 +32,17 @@ impl<S: Stream> Stream for SplitStream<S> {
2132
#[derive(Debug)]
2233
pub struct SplitSink<S>(BiLock<S>);
2334

35+
impl<S> SplitSink<S> {
36+
/// Attempts to put the two "halves" of a split `Stream + Sink` back
37+
/// together. Succeeds only if the `SplitStream<S>` and `SplitSink<S>` are
38+
/// a matching pair originating from the same call to `Stream::split`.
39+
pub fn reunite(self, other: SplitStream<S>) -> Result<S, ReuniteError<S>> {
40+
self.0.reunite(other.0).map_err(|err| {
41+
ReuniteError(SplitSink(err.0), SplitStream(err.1))
42+
})
43+
}
44+
}
45+
2446
impl<S: Sink> Sink for SplitSink<S> {
2547
type SinkItem = S::SinkItem;
2648
type SinkError = S::SinkError;
@@ -55,3 +77,27 @@ pub fn split<S: Stream + Sink>(s: S) -> (SplitSink<S>, SplitStream<S>) {
5577
let write = SplitSink(b);
5678
(write, read)
5779
}
80+
81+
/// Error indicating a `SplitSink<S>` and `SplitStream<S>` were not two halves
82+
/// of a `Stream + Split`, and thus could not be `reunite`d.
83+
pub struct ReuniteError<T>(pub SplitSink<T>, pub SplitStream<T>);
84+
85+
impl<T> fmt::Debug for ReuniteError<T> {
86+
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
87+
fmt.debug_tuple("ReuniteError")
88+
.field(&"...")
89+
.finish()
90+
}
91+
}
92+
93+
impl<T> fmt::Display for ReuniteError<T> {
94+
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
95+
write!(fmt, "tried to reunite a SplitStream and SplitSink that don't form a pair")
96+
}
97+
}
98+
99+
impl<T> Error for ReuniteError<T> {
100+
fn description(&self) -> &str {
101+
"tried to reunite a SplitStream and SplitSink that don't form a pair"
102+
}
103+
}

src/sync/bilock.rs

Lines changed: 53 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use std::boxed::Box;
22
use std::cell::UnsafeCell;
3+
use std::error::Error;
4+
use std::fmt;
35
use std::mem;
46
use std::ops::{Deref, DerefMut};
57
use std::sync::Arc;
@@ -35,7 +37,7 @@ pub struct BiLock<T> {
3537
#[derive(Debug)]
3638
struct Inner<T> {
3739
state: AtomicUsize,
38-
inner: UnsafeCell<T>,
40+
inner: Option<UnsafeCell<T>>,
3941
}
4042

4143
unsafe impl<T: Send> Send for Inner<T> {}
@@ -50,7 +52,7 @@ impl<T> BiLock<T> {
5052
pub fn new(t: T) -> (BiLock<T>, BiLock<T>) {
5153
let inner = Arc::new(Inner {
5254
state: AtomicUsize::new(0),
53-
inner: UnsafeCell::new(t),
55+
inner: Some(UnsafeCell::new(t)),
5456
});
5557

5658
(BiLock { inner: inner.clone() }, BiLock { inner: inner })
@@ -131,6 +133,21 @@ impl<T> BiLock<T> {
131133
}
132134
}
133135

136+
/// Attempts to put the two "halves" of a `BiLock<T>` back together and
137+
/// recover the original value. Succeeds only if the two `BiLock<T>`s
138+
/// originated from the same call to `BiLock::new`.
139+
pub fn reunite(self, other: Self) -> Result<T, ReuniteError<T>> {
140+
if Arc::ptr_eq(&self.inner, &other.inner) {
141+
drop(other);
142+
let inner = Arc::try_unwrap(self.inner)
143+
.ok()
144+
.expect("futures: try_unwrap failed in BiLock<T>::reunite");
145+
Ok(unsafe { inner.into_inner() })
146+
} else {
147+
Err(ReuniteError(self, other))
148+
}
149+
}
150+
134151
fn unlock(&self) {
135152
match self.inner.state.swap(0, SeqCst) {
136153
// we've locked the lock, shouldn't be possible for us to see an
@@ -149,12 +166,42 @@ impl<T> BiLock<T> {
149166
}
150167
}
151168

169+
impl<T> Inner<T> {
170+
unsafe fn into_inner(mut self) -> T {
171+
mem::replace(&mut self.inner, None).unwrap().into_inner()
172+
}
173+
}
174+
152175
impl<T> Drop for Inner<T> {
153176
fn drop(&mut self) {
154177
assert_eq!(self.state.load(SeqCst), 0);
155178
}
156179
}
157180

181+
/// Error indicating two `BiLock<T>`s were not two halves of a whole, and
182+
/// thus could not be `reunite`d.
183+
pub struct ReuniteError<T>(pub BiLock<T>, pub BiLock<T>);
184+
185+
impl<T> fmt::Debug for ReuniteError<T> {
186+
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
187+
fmt.debug_tuple("ReuniteError")
188+
.field(&"...")
189+
.finish()
190+
}
191+
}
192+
193+
impl<T> fmt::Display for ReuniteError<T> {
194+
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
195+
write!(fmt, "tried to reunite two BiLocks that don't form a pair")
196+
}
197+
}
198+
199+
impl<T> Error for ReuniteError<T> {
200+
fn description(&self) -> &str {
201+
"tried to reunite two BiLocks that don't form a pair"
202+
}
203+
}
204+
158205
/// Returned RAII guard from the `poll_lock` method.
159206
///
160207
/// This structure acts as a sentinel to the data in the `BiLock<T>` itself,
@@ -168,13 +215,13 @@ pub struct BiLockGuard<'a, T: 'a> {
168215
impl<'a, T> Deref for BiLockGuard<'a, T> {
169216
type Target = T;
170217
fn deref(&self) -> &T {
171-
unsafe { &*self.inner.inner.inner.get() }
218+
unsafe { &*self.inner.inner.inner.as_ref().unwrap().get() }
172219
}
173220
}
174221

175222
impl<'a, T> DerefMut for BiLockGuard<'a, T> {
176223
fn deref_mut(&mut self) -> &mut T {
177-
unsafe { &mut *self.inner.inner.inner.get() }
224+
unsafe { &mut *self.inner.inner.inner.as_ref().unwrap().get() }
178225
}
179226
}
180227

@@ -231,13 +278,13 @@ impl<T> BiLockAcquired<T> {
231278
impl<T> Deref for BiLockAcquired<T> {
232279
type Target = T;
233280
fn deref(&self) -> &T {
234-
unsafe { &*self.inner.as_ref().unwrap().inner.inner.get() }
281+
unsafe { &*self.inner.as_ref().unwrap().inner.inner.as_ref().unwrap().get() }
235282
}
236283
}
237284

238285
impl<T> DerefMut for BiLockAcquired<T> {
239286
fn deref_mut(&mut self) -> &mut T {
240-
unsafe { &mut *self.inner.as_mut().unwrap().inner.inner.get() }
287+
unsafe { &mut *self.inner.as_mut().unwrap().inner.inner.as_ref().unwrap().get() }
241288
}
242289
}
243290

tests/bilock.rs

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,25 +15,31 @@ use support::*;
1515
fn smoke() {
1616
let future = future::lazy(|| {
1717
let (a, b) = BiLock::new(1);
18-
let mut lock = match a.poll_lock() {
19-
Async::Ready(l) => l,
20-
Async::NotReady => panic!("poll not ready"),
21-
};
22-
assert_eq!(*lock, 1);
23-
*lock = 2;
2418

25-
assert!(b.poll_lock().is_not_ready());
26-
assert!(a.poll_lock().is_not_ready());
27-
drop(lock);
19+
{
20+
let mut lock = match a.poll_lock() {
21+
Async::Ready(l) => l,
22+
Async::NotReady => panic!("poll not ready"),
23+
};
24+
assert_eq!(*lock, 1);
25+
*lock = 2;
26+
27+
assert!(b.poll_lock().is_not_ready());
28+
assert!(a.poll_lock().is_not_ready());
29+
}
2830

2931
assert!(b.poll_lock().is_ready());
3032
assert!(a.poll_lock().is_ready());
3133

32-
let lock = match b.poll_lock() {
33-
Async::Ready(l) => l,
34-
Async::NotReady => panic!("poll not ready"),
35-
};
36-
assert_eq!(*lock, 2);
34+
{
35+
let lock = match b.poll_lock() {
36+
Async::Ready(l) => l,
37+
Async::NotReady => panic!("poll not ready"),
38+
};
39+
assert_eq!(*lock, 2);
40+
}
41+
42+
assert_eq!(a.reunite(b).expect("bilock/smoke: reunite error"), 2);
3743

3844
Ok::<(), ()>(())
3945
});
@@ -73,6 +79,8 @@ fn concurrent() {
7379
Async::NotReady => panic!("poll not ready"),
7480
}
7581

82+
assert_eq!(a.reunite(b).expect("bilock/concurrent: reunite error"), 2 * N);
83+
7684
struct Increment {
7785
remaining: usize,
7886
a: Option<BiLock<usize>>,

tests/split.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ fn test_split() {
3939
{
4040
let j = Join(iter(vec![Ok(10), Ok(20), Ok(30)]), &mut dest);
4141
let (sink, stream) = j.split();
42+
let j = sink.reunite(stream).expect("test_split: reunite error");
43+
let (sink, stream) = j.split();
4244
sink.send_all(stream).wait().unwrap();
4345
}
4446
assert_eq!(dest, vec![10, 20, 30]);

0 commit comments

Comments
 (0)