Skip to content
This repository was archived by the owner on Jun 21, 2020. It is now read-only.

Tokio 0.2 #43

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ members = ["http-service-hyper", "http-service-lambda", "http-service-mock"]

[dependencies]
bytes = "0.4.12"
http = "0.1.17"
http = "0.1.18"

[dev-dependencies]
http-service-hyper = { version = "0.3.1", path = "./http-service-hyper" }
tokio = "0.2.0-alpha.4"

[dependencies.futures-preview]
version = "0.3.0-alpha.16"
version = "0.3.0-alpha.18"

[patch.crates-io]
http-service = { path = "." }
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ This crate uses the latest [Futures](https://github.com/rust-lang-nursery/future
[dependencies]
http-service = "0.3.1"
http-service-hyper = "0.3.1"
futures-preview = "0.3.0-alpha.16"
futures-preview = "0.3.0-alpha.18"
```

**main.rs**
Expand Down
9 changes: 5 additions & 4 deletions examples/simple_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ impl Server {
Server { message }
}

pub fn run(s: Server) {
pub async fn run(s: Server) {
let a = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8088);
http_service_hyper::run(s, a);
http_service_hyper::serve(s, a).await.unwrap();
}
}

Expand All @@ -32,7 +32,8 @@ impl HttpService for Server {
}
}

fn main() {
#[tokio::main]
async fn main() {
let s = Server::create(String::from("Hello, World").into_bytes());
Server::run(s);
Server::run(s).await;
}
13 changes: 5 additions & 8 deletions http-service-hyper/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,12 @@ version = "0.3.1"
[dependencies]
http = "0.1"
http-service = { version = "0.3.1", path = ".." }
hyper = { version = "0.12.27", default-features = false }
hyper = { version = "0.13.0-alpha.4", default-features = false, features = ["runtime", "unstable-stream"] }
tokio = "0.2.0-alpha"
tower-service = "0.3.0-alpha"

[dependencies.futures-preview]
features = ["compat", "io-compat"]
version = "0.3.0-alpha.16"

[features]
default = ["runtime"]
runtime = ["hyper/runtime"]
version = "0.3.0-alpha.18"

[dev-dependencies]
romio = "0.3.0-alpha.8"
romio = "0.3.0-alpha.9"
258 changes: 49 additions & 209 deletions http-service-hyper/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,219 +5,59 @@
#![warn(missing_docs, missing_doc_code_examples)]
#![cfg_attr(test, deny(warnings))]

#[cfg(feature = "runtime")]
use futures::compat::Future01CompatExt;
use futures::{
compat::{Compat, Compat01As03},
future::BoxFuture,
prelude::*,
stream,
task::Spawn,
};
use http_service::{Body, HttpService};
use hyper::server::{Builder as HyperBuilder, Server as HyperServer};
#[cfg(feature = "runtime")]
use futures::prelude::*;
use http::Response;
use http_service::HttpService;
use hyper::service::{make_service_fn, service_fn};
use std::net::SocketAddr;
use std::{
pin::Pin,
sync::Arc,
task::{self, Poll},
};
use std::sync::Arc;

// Wrapper type to allow us to provide a blanket `MakeService` impl
struct WrapHttpService<H> {
service: Arc<H>,
}

// Wrapper type to allow us to provide a blanket `Service` impl
struct WrapConnection<H: HttpService> {
service: Arc<H>,
connection: H::Connection,
}

impl<H, Ctx> hyper::service::MakeService<Ctx> for WrapHttpService<H>
where
H: HttpService,
{
type ReqBody = hyper::Body;
type ResBody = hyper::Body;
type Error = std::io::Error;
type Service = WrapConnection<H>;
type Future = Compat<BoxFuture<'static, Result<Self::Service, Self::Error>>>;
type MakeError = std::io::Error;

fn make_service(&mut self, _ctx: Ctx) -> Self::Future {
let service = self.service.clone();
let error = std::io::Error::from(std::io::ErrorKind::Other);
async move {
let connection = service.connect().into_future().await.map_err(|_| error)?;
Ok(WrapConnection {
service,
connection,
})
}
.boxed()
.compat()
}
}

impl<H> hyper::service::Service for WrapConnection<H>
/// Start hyper server and return a future.
pub async fn serve<CO, CE, RE, S>(s: S, addr: SocketAddr) -> Result<(), hyper::Error>
where
H: HttpService,
CO: Send + 'static,
CE: std::error::Error + Send + Sync + 'static,
RE: std::error::Error + Send + Sync + 'static,
S: HttpService<Connection = CO> + Send + 'static,
<S as http_service::HttpService>::ConnectionFuture:
Future<Output = Result<CO, CE>> + Send + 'static,
<S as http_service::HttpService>::ResponseFuture:
Future<Output = Result<Response<http_service::Body>, RE>> + Send + 'static,
{
type ReqBody = hyper::Body;
type ResBody = hyper::Body;
type Error = std::io::Error;
type Future = Compat<BoxFuture<'static, Result<http::Response<hyper::Body>, Self::Error>>>;

fn call(&mut self, req: http::Request<hyper::Body>) -> Self::Future {
let error = std::io::Error::from(std::io::ErrorKind::Other);
let req = req.map(|hyper_body| {
let stream = Compat01As03::new(hyper_body).map(|c| match c {
Ok(chunk) => Ok(chunk.into_bytes()),
Err(e) => Err(std::io::Error::new(std::io::ErrorKind::Other, e)),
});
Body::from_stream(stream)
});
let fut = self.service.respond(&mut self.connection, req);

async move {
let res: http::Response<_> = fut.into_future().await.map_err(|_| error)?;
Ok(res.map(|body| hyper::Body::wrap_stream(body.compat())))
}
.boxed()
.compat()
}
}

/// A listening HTTP server that accepts connections in both HTTP1 and HTTP2 by default.
///
/// [`Server`] is a [`Future`] mapping a bound listener with a set of service handlers. It is built
/// using the [`Builder`], and the future completes when the server has been shutdown. It should be
/// run by an executor.
#[allow(clippy::type_complexity)] // single-use type with many compat layers
pub struct Server<I: TryStream, S, Sp> {
inner: Compat01As03<
HyperServer<
Compat<stream::MapOk<I, fn(I::Ok) -> Compat<I::Ok>>>,
WrapHttpService<S>,
Compat<Sp>,
>,
>,
}

impl<I: TryStream, S, Sp> std::fmt::Debug for Server<I, S, Sp> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Server").finish()
}
}

/// A builder for a [`Server`].
#[allow(clippy::type_complexity)] // single-use type with many compat layers
pub struct Builder<I: TryStream, Sp> {
inner: HyperBuilder<Compat<stream::MapOk<I, fn(I::Ok) -> Compat<I::Ok>>>, Compat<Sp>>,
}

impl<I: TryStream, Sp> std::fmt::Debug for Builder<I, Sp> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Builder").finish()
}
}

impl<I: TryStream> Server<I, (), ()> {
/// Starts a [`Builder`] with the provided incoming stream.
pub fn builder(incoming: I) -> Builder<I, ()> {
Builder {
inner: HyperServer::builder(Compat::new(incoming.map_ok(Compat::new as _)))
.executor(Compat::new(())),
}
}
}

impl<I: TryStream, Sp> Builder<I, Sp> {
/// Sets the [`Spawn`] to deal with starting connection tasks.
pub fn with_spawner<Sp2>(self, new_spawner: Sp2) -> Builder<I, Sp2> {
Builder {
inner: self.inner.executor(Compat::new(new_spawner)),
}
}

/// Consume this [`Builder`], creating a [`Server`].
///
/// # Examples
///
/// ```no_run
/// use http_service::{Response, Body};
/// use http_service_hyper::Server;
/// use romio::TcpListener;
///
/// // Construct an executor to run our tasks on
/// let mut pool = futures::executor::ThreadPool::new()?;
///
/// // And an HttpService to handle each connection...
/// let service = |req| {
/// futures::future::ok::<_, ()>(Response::new(Body::from("Hello World")))
/// };
///
/// // Then bind, configure the spawner to our pool, and serve...
/// let addr = "127.0.0.1:3000".parse()?;
/// let mut listener = TcpListener::bind(&addr)?;
/// let server = Server::builder(listener.incoming())
/// .with_spawner(pool.clone())
/// .serve(service);
///
/// // Finally, spawn `server` onto our executor...
/// pool.run(server)?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
pub fn serve<S: HttpService>(self, service: S) -> Server<I, S, Sp>
where
I: TryStream + Unpin,
I::Ok: AsyncRead + AsyncWrite + Send + Unpin + 'static,
I::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
Sp: Clone + Send + 'static,
for<'a> &'a Sp: Spawn,
{
Server {
inner: Compat01As03::new(self.inner.serve(WrapHttpService {
service: Arc::new(service),
})),
let s = Arc::new(s);

let make_svc = make_service_fn({
let s = s.clone();
move |_| {
let s = s.clone();
async move {
Ok::<_, Box<dyn std::error::Error + Send + Sync>>(service_fn({
move |req: http::Request<hyper::Body>| {
let s = s.clone();
async move {
let mut conn = s.connect().await?;
let (parts, body) = req.into_parts();
let body = http_service::Body::from_stream(
body.map_ok(|b| b.into()).map_err(|e: hyper::Error| {
std::io::Error::new(std::io::ErrorKind::Other, e)
}),
);

let rsp = s
.respond(&mut conn, http::Request::from_parts(parts, body))
.await?;

let (parts, body) = rsp.into_parts();
let body: hyper::Body = body.into_vec().await?.into();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the body be wrapped into a compatible stream for hyper::Body instead of converting it into a Vec? I see for that we need the unstable-stream feature from hyper.

Ok::<_, Box<dyn std::error::Error + Send + Sync>>(
http::Response::from_parts(parts, body),
)
}
}
}))
}
}
}
}

impl<I, S, Sp> Future for Server<I, S, Sp>
where
I: TryStream + Unpin,
I::Ok: AsyncRead + AsyncWrite + Send + Unpin + 'static,
I::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
S: HttpService,
Sp: Clone + Send + 'static,
for<'a> &'a Sp: Spawn,
{
type Output = hyper::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<hyper::Result<()>> {
self.inner.poll_unpin(cx)
}
}

/// Serve the given `HttpService` at the given address, using `hyper` as backend, and return a
/// `Future` that can be `await`ed on.
#[cfg(feature = "runtime")]
pub fn serve<S: HttpService>(
s: S,
addr: SocketAddr,
) -> impl Future<Output = Result<(), hyper::Error>> {
let service = WrapHttpService {
service: Arc::new(s),
};
hyper::Server::bind(&addr).serve(service).compat()
}
});

/// Run the given `HttpService` at the given address on the default runtime, using `hyper` as
/// backend.
#[cfg(feature = "runtime")]
pub fn run<S: HttpService>(s: S, addr: SocketAddr) {
let server = serve(s, addr).map(|_| Result::<_, ()>::Ok(())).compat();
hyper::rt::run(server);
hyper::Server::bind(&addr).serve(make_svc).await
}
2 changes: 1 addition & 1 deletion http-service-lambda/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ tokio = "0.1.21"

[dependencies.futures-preview]
features = ["compat"]
version = "0.3.0-alpha.16"
version = "0.3.0-alpha.18"

[dev-dependencies]
log = "0.4.6"
Expand Down
2 changes: 1 addition & 1 deletion http-service-mock/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ version = "0.3.1"
http-service = { version = "0.3.1", path = ".." }

[dependencies.futures-preview]
version = "0.3.0-alpha.16"
version = "0.3.0-alpha.18"
Loading