Skip to content
This repository was archived by the owner on Jun 21, 2020. It is now read-only.

Read trait #48

Merged
merged 8 commits into from
Nov 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ members = ["http-service-hyper", "http-service-lambda", "http-service-mock"]
bytes = "0.4.12"
futures = "0.3.1"
http = "0.1.17"
async-std = { version = "1.0.1", default-features = false, features = ["std"] }
pin-project-lite = "0.1.0"

[dev-dependencies]
http-service-hyper = { version = "0.3.1", path = "./http-service-hyper" }
Expand Down
82 changes: 14 additions & 68 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,91 +37,37 @@
</h3>
</div>

<div align="center">
<sub>Built with ⛵ by <a href="https://github.com/rustasync">The Rust Async Ecosystem WG</a>
</div>

## About

The crate `http-service` provides the necessary types and traits to implement your own HTTP Server. It uses `hyper` for the lower level TCP abstraction.
The crate `http-service` provides the necessary types and traits to implement
your own HTTP Server.

You can use the workspace member [`http-service-hyper`](https://crates.io/crates/http-service-hyper) to run your HTTP Server.
For example you can use the workspace member
[`http-service-hyper`](https://crates.io/crates/http-service-hyper) to run an
HTTP Server.

1. Runs via `http_service_hyper::run(HTTP_SERVICE, ADDRESS);`
2. Returns a future which can be `await`ed via `http_service_hyper::serve(HTTP_SERVICE, ADDRESS);`

This crate uses the latest [Futures](https://github.com/rust-lang-nursery/futures-rs) preview, and therefore needs to be run on Rust Nightly.

## Examples

**Cargo.toml**

```toml
[dependencies]
http-service = "0.3.1"
http-service-hyper = "0.3.1"
futures-preview = "0.3.0-alpha.16"
```

**main.rs**

```rust,no_run
use futures::future::{self, BoxFuture, FutureExt};
use http_service::{HttpService, Response};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

struct Server {
message: Vec<u8>,
}

impl Server {
fn create(message: Vec<u8>) -> Server {
Server { message }
}

pub fn run(s: Server) {
let a = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8088);
http_service_hyper::run(s, a);
}
}

impl HttpService for Server {
type Connection = ();
type ConnectionFuture = future::Ready<Result<(), std::io::Error>>;
type ResponseFuture = BoxFuture<'static, Result<http_service::Response, std::io::Error>>;

fn connect(&self) -> Self::ConnectionFuture {
future::ok(())
}

fn respond(&self, _conn: &mut (), _req: http_service::Request) -> Self::ResponseFuture {
let message = self.message.clone();
async move { Ok(Response::new(http_service::Body::from(message))) }.boxed()
}
}

fn main() {
let s = Server::create(String::from("Hello, World").into_bytes());
Server::run(s);
}
```
2. Returns a future which can be `await`ed via
`http_service_hyper::serve(HTTP_SERVICE, ADDRESS);`

## Contributing

Want to join us? Check out our [The "Contributing" section of the guide][contributing] and take a look at some of these issues:
Want to join us? Check out our [The "Contributing" section of the
guide][contributing] and take a look at some of these issues:

- [Issues labeled "good first issue"][good-first-issue]
- [Issues labeled "help wanted"][help-wanted]

#### Conduct

The http-service project adheres to the [Contributor Covenant Code of Conduct](https://github.com/rustasync/.github/blob/master/CODE_OF_CONDUCT.md).
The http-service project adheres to the [Contributor Covenant Code of
Conduct](https://github.com/http-rs/.github/blob/master/CODE_OF_CONDUCT.md).
This describes the minimum behavior expected from all contributors.

## License

[MIT](./LICENSE-MIT) OR [Apache-2.0](./LICENSE-APACHE)

[contributing]: https://github.com/rustasync/.github/blob/master/CONTRIBUTING.md
[good-first-issue]: https://github.com/rustasync/http-service/labels/good%20first%20issue
[help-wanted]: https://github.com/rustasync/http-service/labels/help%20wanted
[contributing]: https://github.com/http-rs/.github/blob/master/CONTRIBUTING.md
[good-first-issue]: https://github.com/http-rs/http-service/labels/good%20first%20issue
[help-wanted]: https://github.com/http-rs/http-service/labels/help%20wanted
86 changes: 58 additions & 28 deletions http-service-hyper/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,22 @@
#![warn(missing_docs, missing_doc_code_examples)]
#![cfg_attr(test, deny(warnings))]

#[cfg(feature = "runtime")]
use futures::compat::Future01CompatExt;
use futures::{
compat::{Compat, Compat01As03},
future::BoxFuture,
prelude::*,
stream,
task::Spawn,
};
#[cfg(feature = "runtime")]
use futures::compat::{Compat as Compat03As01, Compat01As03};
use futures::future::BoxFuture;
use futures::prelude::*;
use futures::stream;
use futures::task::Spawn;
use http_service::{Body, HttpService};
use hyper::server::{Builder as HyperBuilder, Server as HyperServer};

use std::io;
#[cfg(feature = "runtime")]
use std::net::SocketAddr;
use std::{
pin::Pin,
sync::Arc,
task::{self, Poll},
};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{self, Context, Poll};

// Wrapper type to allow us to provide a blanket `MakeService` impl
struct WrapHttpService<H> {
Expand All @@ -43,7 +41,7 @@ where
type ResBody = hyper::Body;
type Error = std::io::Error;
type Service = WrapConnection<H>;
type Future = Compat<BoxFuture<'static, Result<Self::Service, Self::Error>>>;
type Future = Compat03As01<BoxFuture<'static, Result<Self::Service, Self::Error>>>;
type MakeError = std::io::Error;

fn make_service(&mut self, _ctx: Ctx) -> Self::Future {
Expand All @@ -68,22 +66,28 @@ where
type ReqBody = hyper::Body;
type ResBody = hyper::Body;
type Error = std::io::Error;
type Future = Compat<BoxFuture<'static, Result<http::Response<hyper::Body>, Self::Error>>>;
type Future =
Compat03As01<BoxFuture<'static, Result<http::Response<hyper::Body>, Self::Error>>>;

fn call(&mut self, req: http::Request<hyper::Body>) -> Self::Future {
// Convert Request
let error = std::io::Error::from(std::io::ErrorKind::Other);
let req = req.map(|hyper_body| {
let stream = Compat01As03::new(hyper_body).map(|c| match c {
Ok(chunk) => Ok(chunk.into_bytes()),
Err(e) => Err(std::io::Error::new(std::io::ErrorKind::Other, e)),
});
Body::from_stream(stream)
let req = req.map(|body| {
let body_stream = Compat01As03::new(body)
.map(|chunk| chunk.map(|chunk| chunk.to_vec()))
.map_err(|e| io::Error::new(io::ErrorKind::Other, e));
let body_reader = body_stream.into_async_read();
Body::from_reader(body_reader)
});

let fut = self.service.respond(&mut self.connection, req);

// Convert Request
async move {
let res: http::Response<_> = fut.into_future().await.map_err(|_| error)?;
Ok(res.map(|body| hyper::Body::wrap_stream(body.compat())))
let (parts, body) = res.into_parts();
let body = hyper::Body::wrap_stream(Compat03As01::new(ChunkStream { body }));
Ok(hyper::Response::from_parts(parts, body))
}
.boxed()
.compat()
Expand All @@ -99,9 +103,9 @@ where
pub struct Server<I: TryStream, S, Sp> {
inner: Compat01As03<
HyperServer<
Compat<stream::MapOk<I, fn(I::Ok) -> Compat<I::Ok>>>,
Compat03As01<stream::MapOk<I, fn(I::Ok) -> Compat03As01<I::Ok>>>,
WrapHttpService<S>,
Compat<Sp>,
Compat03As01<Sp>,
>,
>,
}
Expand All @@ -115,7 +119,10 @@ impl<I: TryStream, S, Sp> std::fmt::Debug for Server<I, S, Sp> {
/// A builder for a [`Server`].
#[allow(clippy::type_complexity)] // single-use type with many compat layers
pub struct Builder<I: TryStream, Sp> {
inner: HyperBuilder<Compat<stream::MapOk<I, fn(I::Ok) -> Compat<I::Ok>>>, Compat<Sp>>,
inner: HyperBuilder<
Compat03As01<stream::MapOk<I, fn(I::Ok) -> Compat03As01<I::Ok>>>,
Compat03As01<Sp>,
>,
}

impl<I: TryStream, Sp> std::fmt::Debug for Builder<I, Sp> {
Expand All @@ -128,8 +135,8 @@ impl<I: TryStream> Server<I, (), ()> {
/// Starts a [`Builder`] with the provided incoming stream.
pub fn builder(incoming: I) -> Builder<I, ()> {
Builder {
inner: HyperServer::builder(Compat::new(incoming.map_ok(Compat::new as _)))
.executor(Compat::new(())),
inner: HyperServer::builder(Compat03As01::new(incoming.map_ok(Compat03As01::new as _)))
.executor(Compat03As01::new(())),
}
}
}
Expand All @@ -138,7 +145,7 @@ impl<I: TryStream, Sp> Builder<I, Sp> {
/// Sets the [`Spawn`] to deal with starting connection tasks.
pub fn with_spawner<Sp2>(self, new_spawner: Sp2) -> Builder<I, Sp2> {
Builder {
inner: self.inner.executor(Compat::new(new_spawner)),
inner: self.inner.executor(Compat03As01::new(new_spawner)),
}
}

Expand Down Expand Up @@ -221,3 +228,26 @@ pub fn run<S: HttpService>(s: S, addr: SocketAddr) {
let server = serve(s, addr).map(|_| Result::<_, ()>::Ok(())).compat();
hyper::rt::run(server);
}

/// A type that wraps an `AsyncRead` into a `Stream` of `hyper::Chunk`. Used for writing data to a
/// Hyper response.
struct ChunkStream<R: AsyncRead> {
body: R,
}

impl<R: AsyncRead + Unpin> futures::Stream for ChunkStream<R> {
type Item = Result<hyper::Chunk, Box<dyn std::error::Error + Send + Sync + 'static>>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// This is not at all efficient, but that's okay for now.
let mut buf = vec![0; 1024];
let read = futures::ready!(Pin::new(&mut self.body).poll_read(cx, &mut buf))?;
if read == 0 {
return Poll::Ready(None);
} else {
buf.truncate(read);
let chunk = hyper::Chunk::from(buf);
Poll::Ready(Some(Ok(chunk)))
}
}
}
Loading