Skip to content

Commit 9b7b122

Browse files
pickfiredjc
authored andcommitted
Use server example based on warp tls
The current server example is not able to process another request while tls is resolving the cert, which prevents acme client from solving acme tls challenges, similarly during that all other requests will be blocked. Although I am not quite sure why but the tls in warp worked out for me.
1 parent 6c9e31b commit 9b7b122

File tree

2 files changed

+119
-26
lines changed

2 files changed

+119
-26
lines changed

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ tokio-rustls = { version = "0.23", default-features = false }
2020
webpki-roots = { version = "0.22", optional = true }
2121

2222
[dev-dependencies]
23-
async-stream = "0.3.0"
2423
futures-util = { version = "0.3.1", default-features = false }
2524
hyper = { version = "0.14", features = ["full"] }
2625
rustls = { version = "0.20.1", default-features = false, features = ["tls12"] }

examples/server.rs

Lines changed: 119 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,19 @@
44
//! Certificate and private key are hardcoded to sample files.
55
//! hyper will automatically use HTTP/2 if a client starts talking HTTP/2,
66
//! otherwise HTTP/1.1 will be used.
7-
use std::{env, fs, io, sync};
8-
9-
use async_stream::stream;
10-
use futures_util::future::TryFutureExt;
11-
use hyper::server::accept;
7+
use core::task::{Context, Poll};
8+
use futures_util::ready;
9+
use hyper::server::accept::Accept;
10+
use hyper::server::conn::{AddrIncoming, AddrStream};
1211
use hyper::service::{make_service_fn, service_fn};
1312
use hyper::{Body, Method, Request, Response, Server, StatusCode};
14-
use tokio::net::TcpListener;
15-
use tokio_rustls::TlsAcceptor;
13+
use std::future::Future;
14+
use std::pin::Pin;
15+
use std::sync::Arc;
16+
use std::vec::Vec;
17+
use std::{env, fs, io, sync};
18+
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
19+
use tokio_rustls::rustls::ServerConfig;
1620

1721
fn main() {
1822
// Serve an echo service over HTTPS, with proper error handling.
@@ -33,7 +37,7 @@ async fn run_server() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
3337
Some(ref p) => p.to_owned(),
3438
None => "1337".to_owned(),
3539
};
36-
let addr = format!("127.0.0.1:{}", port);
40+
let addr = format!("127.0.0.1:{}", port).parse()?;
3741

3842
// Build TLS configuration.
3943
let tls_cfg = {
@@ -53,31 +57,121 @@ async fn run_server() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
5357
};
5458

5559
// Create a TCP listener via tokio.
56-
let tcp = TcpListener::bind(&addr).await?;
57-
let tls_acceptor = TlsAcceptor::from(tls_cfg);
58-
// Prepare a long-running future stream to accept and serve clients.
59-
let incoming_tls_stream = stream! {
60-
loop {
61-
let (socket, _) = tcp.accept().await?;
62-
let stream = tls_acceptor.accept(socket).map_err(|e| {
63-
println!("[!] Voluntary server halt due to client-connection error...");
64-
// Errors could be handled here, instead of server aborting.
65-
// Ok(None)
66-
error(format!("TLS Error: {:?}", e))
67-
});
68-
yield stream.await;
69-
}
70-
};
71-
let acceptor = accept::from_stream(incoming_tls_stream);
60+
let incoming = AddrIncoming::bind(&addr)?;
7261
let service = make_service_fn(|_| async { Ok::<_, io::Error>(service_fn(echo)) });
73-
let server = Server::builder(acceptor).serve(service);
62+
let server = Server::builder(TlsAcceptor::new(tls_cfg, incoming)).serve(service);
7463

7564
// Run the future, keep going until an error occurs.
7665
println!("Starting to serve on https://{}.", addr);
7766
server.await?;
7867
Ok(())
7968
}
8069

70+
enum State {
71+
Handshaking(tokio_rustls::Accept<AddrStream>),
72+
Streaming(tokio_rustls::server::TlsStream<AddrStream>),
73+
}
74+
75+
// tokio_rustls::server::TlsStream doesn't expose constructor methods,
76+
// so we have to TlsAcceptor::accept and handshake to have access to it
77+
// TlsStream implements AsyncRead/AsyncWrite handshaking tokio_rustls::Accept first
78+
pub struct TlsStream {
79+
state: State,
80+
}
81+
82+
impl TlsStream {
83+
fn new(stream: AddrStream, config: Arc<ServerConfig>) -> TlsStream {
84+
let accept = tokio_rustls::TlsAcceptor::from(config).accept(stream);
85+
TlsStream {
86+
state: State::Handshaking(accept),
87+
}
88+
}
89+
}
90+
91+
impl AsyncRead for TlsStream {
92+
fn poll_read(
93+
self: Pin<&mut Self>,
94+
cx: &mut Context,
95+
buf: &mut ReadBuf,
96+
) -> Poll<io::Result<()>> {
97+
let pin = self.get_mut();
98+
match pin.state {
99+
State::Handshaking(ref mut accept) => match ready!(Pin::new(accept).poll(cx)) {
100+
Ok(mut stream) => {
101+
let result = Pin::new(&mut stream).poll_read(cx, buf);
102+
pin.state = State::Streaming(stream);
103+
result
104+
}
105+
Err(err) => Poll::Ready(Err(err)),
106+
},
107+
State::Streaming(ref mut stream) => Pin::new(stream).poll_read(cx, buf),
108+
}
109+
}
110+
}
111+
112+
impl AsyncWrite for TlsStream {
113+
fn poll_write(
114+
self: Pin<&mut Self>,
115+
cx: &mut Context<'_>,
116+
buf: &[u8],
117+
) -> Poll<io::Result<usize>> {
118+
let pin = self.get_mut();
119+
match pin.state {
120+
State::Handshaking(ref mut accept) => match ready!(Pin::new(accept).poll(cx)) {
121+
Ok(mut stream) => {
122+
let result = Pin::new(&mut stream).poll_write(cx, buf);
123+
pin.state = State::Streaming(stream);
124+
result
125+
}
126+
Err(err) => Poll::Ready(Err(err)),
127+
},
128+
State::Streaming(ref mut stream) => Pin::new(stream).poll_write(cx, buf),
129+
}
130+
}
131+
132+
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
133+
match self.state {
134+
State::Handshaking(_) => Poll::Ready(Ok(())),
135+
State::Streaming(ref mut stream) => Pin::new(stream).poll_flush(cx),
136+
}
137+
}
138+
139+
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
140+
match self.state {
141+
State::Handshaking(_) => Poll::Ready(Ok(())),
142+
State::Streaming(ref mut stream) => Pin::new(stream).poll_shutdown(cx),
143+
}
144+
}
145+
}
146+
147+
pub struct TlsAcceptor {
148+
config: Arc<ServerConfig>,
149+
incoming: AddrIncoming,
150+
}
151+
152+
impl TlsAcceptor {
153+
pub fn new(config: Arc<ServerConfig>, incoming: AddrIncoming) -> TlsAcceptor {
154+
TlsAcceptor { config, incoming }
155+
}
156+
}
157+
158+
impl Accept for TlsAcceptor {
159+
type Conn = TlsStream;
160+
type Error = io::Error;
161+
162+
fn poll_accept(
163+
self: Pin<&mut Self>,
164+
cx: &mut Context<'_>,
165+
) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
166+
let pin = self.get_mut();
167+
match ready!(Pin::new(&mut pin.incoming).poll_accept(cx)) {
168+
Some(Ok(sock)) => Poll::Ready(Some(Ok(TlsStream::new(sock, pin.config.clone())))),
169+
Some(Err(e)) => Poll::Ready(Some(Err(e))),
170+
None => Poll::Ready(None),
171+
}
172+
}
173+
}
174+
81175
// Custom echo service, handling two different routes and a
82176
// catch-all 404 responder.
83177
async fn echo(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {

0 commit comments

Comments
 (0)