Skip to content

Commit 6a416bc

Browse files
committed
wip
1 parent 3151796 commit 6a416bc

File tree

5 files changed

+1113
-1046
lines changed

5 files changed

+1113
-1046
lines changed

tests/h2-support/src/future_ext.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,10 @@ pub trait FutureExt: Future {
7878
where
7979
Self: Future + Sized + 'static,
8080
{
81-
Box::new(super::util::yield_once().then(move |_| self))
81+
Box::new(async move {
82+
futures::pending!();
83+
self.await
84+
})
8285
}
8386
}
8487

tests/h2-support/src/mock.rs

Lines changed: 47 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -112,19 +112,6 @@ impl Handle {
112112
&mut self.codec
113113
}
114114

115-
/// Send a frame
116-
pub fn send(&mut self, item: SendFrame) -> Result<(), SendError> {
117-
block_on(async {
118-
// Queue the frame
119-
self.codec.send(item).await?;
120-
121-
// Flush the frame
122-
self.codec.flush().await?;
123-
124-
Ok(())
125-
})
126-
}
127-
128115
/// Writes the client preface
129116
pub fn write_preface(&mut self) {
130117
// Write the connnection preface
@@ -156,8 +143,7 @@ impl Handle {
156143
T: Into<frame::Settings>,
157144
{
158145
let settings = settings.into();
159-
self.send(frame::Settings::from(settings).into()).unwrap();
160-
self.expected_settings_acks += 1;
146+
self.send_frame(frame::Settings::from(settings)).await;
161147
self.read_preface().await
162148
}
163149

@@ -176,18 +162,24 @@ impl Handle {
176162
T: Into<frame::Settings>,
177163
{
178164
self.write_preface();
179-
self.send(settings.into().into()).unwrap();
180-
self.expected_settings_acks += 1;
165+
self.send_frame(settings.into()).await;
181166
self
182167
}
183168

184169
pub async fn close(mut self) {
185170
futures::future::poll_fn(move |cx| {
186-
let _ = self.poll_next_unpin(cx);
187-
if self.expected_settings_acks == 0 {
188-
Poll::Ready(())
189-
} else {
190-
Poll::Pending
171+
loop {
172+
match self.poll_next_unpin(cx) {
173+
Poll::Ready(Some(_)) => {},
174+
Poll::Ready(None) => return Poll::Ready(()),
175+
Poll::Pending => {
176+
if self.expected_settings_acks == 0 {
177+
return Poll::Ready(())
178+
} else {
179+
return Poll::Pending
180+
}
181+
}
182+
}
191183
}
192184
}).await
193185
}
@@ -211,7 +203,13 @@ impl Handle {
211203
}
212204

213205
pub async fn send_frame<T: Into<SendFrame>>(&mut self, item: T) {
214-
self.codec.send(item.into()).await.unwrap();
206+
let frame = item.into();
207+
if let Frame::Settings(ref settings) = frame {
208+
if !settings.is_ack() {
209+
self.expected_settings_acks += 1;
210+
}
211+
}
212+
self.codec.send(frame).await.unwrap();
215213
self.codec.flush().await.unwrap();
216214
}
217215
}
@@ -230,7 +228,7 @@ impl Stream for Handle {
230228
}
231229
self.poll_next(cx)
232230
} else {
233-
self.send(frame::Settings::ack().into()).unwrap();
231+
block_on(self.send_frame(frame::Settings::ack()));
234232
Poll::Ready(Some(Ok(settings.into())))
235233
}
236234
}
@@ -285,7 +283,7 @@ impl Drop for Handle {
285283
fn drop(&mut self) {
286284
block_on(self.codec.close()).unwrap();
287285

288-
let mut me = self.codec.get_ref().get_ref().get_ref().inner.try_lock().unwrap();
286+
let mut me = block_on(self.codec.get_ref().get_ref().get_ref().inner.lock());
289287
me.closed = true;
290288

291289
if let Some(task) = me.rx_task.take() {
@@ -311,7 +309,7 @@ impl AsyncRead for Mock {
311309
"attempted read with zero length buffer... wut?"
312310
);
313311

314-
let mut me = self.pipe.inner.try_lock().unwrap();
312+
let mut me = ready!(self.pipe.inner.lock().poll_unpin(cx));
315313

316314
if me.rx.is_empty() {
317315
if me.closed {
@@ -336,7 +334,7 @@ impl AsyncWrite for Mock {
336334
cx: &mut Context,
337335
mut src: &[u8],
338336
) -> Poll<io::Result<usize>> {
339-
let mut me = self.pipe.inner.try_lock().unwrap();
337+
let mut me = ready!(self.pipe.inner.lock().poll_unpin(cx));
340338

341339
if me.closed {
342340
return Poll::Ready(Ok(src.len()));
@@ -378,7 +376,7 @@ impl AsyncWrite for Mock {
378376

379377
impl Drop for Mock {
380378
fn drop(&mut self) {
381-
let mut me = self.pipe.inner.try_lock().unwrap();
379+
let mut me = block_on(self.pipe.inner.lock());
382380
me.closed = true;
383381

384382
if let Some(task) = me.tx_task.take() {
@@ -400,7 +398,7 @@ impl AsyncRead for Pipe {
400398
"attempted read with zero length buffer... wut?"
401399
);
402400

403-
let mut me = self.inner.try_lock().unwrap();
401+
let mut me = ready!(self.inner.lock().poll_unpin(cx));
404402

405403
if me.tx.is_empty() {
406404
if me.closed {
@@ -421,10 +419,10 @@ impl AsyncRead for Pipe {
421419
impl AsyncWrite for Pipe {
422420
fn poll_write(
423421
self: Pin<&mut Self>,
424-
_: &mut Context,
422+
cx: &mut Context,
425423
src: &[u8],
426424
) -> Poll<io::Result<usize>> {
427-
let mut me = self.inner.try_lock().unwrap();
425+
let mut me = ready!(self.inner.lock().poll_unpin(cx));
428426
me.rx.extend(src);
429427

430428
if let Some(task) = me.rx_task.take() {
@@ -456,6 +454,19 @@ pub trait HandleFutureExt: Future<Output = Handle> + Send + Sized + 'static {
456454
self.recv_frame(settings.into())
457455
}
458456

457+
fn ignore_settings(self) -> Pin<Box<dyn Future<Output = Handle> + Send>> {
458+
async {
459+
let mut handle = self.await;
460+
let frame = handle.next().await;
461+
match frame {
462+
Some(Ok(frame::Frame::Settings(_))) => {},
463+
Some(frame) => panic!("Expected settings, got {:?}", frame),
464+
None => panic!("Expected settings, got EOF"),
465+
};
466+
handle
467+
}.boxed()
468+
}
469+
459470
fn recv_frame<T>(self, frame: T) -> Pin<Box<dyn Future<Output = Handle> + Send>>
460471
where
461472
T: Into<Frame>,
@@ -506,23 +517,11 @@ pub trait HandleFutureExt: Future<Output = Handle> + Send + Sized + 'static {
506517
}
507518

508519
fn idle_ms(self, ms: usize) -> Pin<Box<dyn Future<Output = Handle> + Send>> {
509-
use std::thread;
510-
use std::time::Duration;
511-
512-
self.then(move |handle| {
513-
// This is terrible... but oh well
514-
let (tx, rx) = oneshot::channel();
515-
516-
thread::spawn(move || {
517-
thread::sleep(Duration::from_millis(ms as u64));
518-
tx.send(()).unwrap();
519-
});
520-
521-
Idle {
522-
handle: Some(handle),
523-
timeout: rx,
524-
}
525-
}).boxed()
520+
async move {
521+
let res = self.await;
522+
crate::util::idle_ms(ms).await;
523+
res
524+
}.boxed()
526525
}
527526

528527
fn buffer_bytes(self, num: usize) -> Pin<Box<dyn Future<Output = Handle> + Send>> {
@@ -586,25 +585,6 @@ pub trait HandleFutureExt: Future<Output = Handle> + Send + Sized + 'static {
586585
}
587586
}
588587

589-
pub struct Idle {
590-
handle: Option<Handle>,
591-
timeout: oneshot::Receiver<()>,
592-
}
593-
594-
impl Future for Idle {
595-
type Output = Handle;
596-
597-
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
598-
if self.timeout.poll_unpin(cx).is_ready() {
599-
return Poll::Ready(self.handle.take().unwrap());
600-
}
601-
602-
self.handle.as_mut().unwrap().poll_next_unpin(cx).map(|res| {
603-
panic!("Idle received unexpected frame on handle; frame={:?}", res);
604-
})
605-
}
606-
}
607-
608588
impl<T> HandleFutureExt for T
609589
where
610590
T: Future<Output = Handle> + Send + 'static,

tests/h2-support/src/prelude.rs

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -68,36 +68,6 @@ impl MockH2 for super::mock_io::Builder {
6868
}
6969
}
7070

71-
pub trait ClientExt {
72-
fn run<F: Future + Unpin>(&mut self, f: F) -> F::Output;
73-
}
74-
75-
impl<T, B> ClientExt for client::Connection<T, B>
76-
where
77-
T: AsyncRead + AsyncWrite + 'static,
78-
B: IntoBuf + 'static,
79-
{
80-
// FIXME: There must be a proper way to do this...
81-
fn run<F: Future + Unpin>(&mut self, f: F) -> F::Output {
82-
use futures01::Future;
83-
use futures::executor::block_on;
84-
use futures::select;
85-
86-
let mut connection_future = future::poll_fn(|_| {
87-
util::poll_01_to_03(self.poll())
88-
}).fuse();
89-
let mut f = f.fuse();
90-
block_on(async {
91-
loop {
92-
select! {
93-
conn_res = connection_future => conn_res.unwrap(),
94-
v = f => break v,
95-
}
96-
}
97-
})
98-
}
99-
}
100-
10171
pub fn build_large_headers() -> Vec<(&'static str, String)> {
10272
vec![
10373
("one", "hello".to_string()),

tests/h2-support/src/util.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,20 @@ pub fn byte_str(s: &str) -> String<Bytes> {
1111
String::try_from(Bytes::from(s)).unwrap()
1212
}
1313

14-
pub async fn yield_once() {
15-
futures::pending!();
14+
pub async fn idle_ms(ms: usize) {
15+
use futures::channel::oneshot;
16+
use std::thread;
17+
use std::time::Duration;
18+
19+
// This is terrible... but oh well
20+
let (tx, rx) = oneshot::channel();
21+
22+
thread::spawn(move || {
23+
thread::sleep(Duration::from_millis(ms as u64));
24+
tx.send(()).unwrap();
25+
});
26+
27+
rx.await.unwrap();
1628
}
1729

1830
pub fn wait_for_capacity(stream: h2::SendStream<Bytes>, target: usize) -> WaitForCapacity {

0 commit comments

Comments
 (0)