Skip to content

Commit df00291

Browse files
authored
Merge pull request #456 from http-rs/sse
Add initial SSE support
2 parents f30a602 + f673a98 commit df00291

File tree

8 files changed

+176
-1
lines changed

8 files changed

+176
-1
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ docs = ["unstable"]
2929
unstable = []
3030

3131
[dependencies]
32+
async-sse = "2.1.0"
3233
http-types = "1.0.1"
3334
http-service = "0.5.0"
3435
http-service-h1 = { version = "0.1.0", optional = true }

examples/sse.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
use tide::sse;
2+
3+
#[async_std::main]
4+
async fn main() -> Result<(), std::io::Error> {
5+
let mut app = tide::new();
6+
app.at("/sse").get(sse::endpoint(|_req, sender| async move {
7+
sender.send("fruit", "banana", None).await;
8+
sender.send("fruit", "apple", None).await;
9+
Ok(())
10+
}));
11+
app.listen("localhost:8080").await?;
12+
Ok(())
13+
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ pub mod security;
201201

202202
pub use endpoint::Endpoint;
203203
pub use request::Request;
204+
pub mod sse;
204205

205206
#[doc(inline)]
206207
pub use http_types::{Error, Result, Status};

src/response/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ pub(crate) enum CookieEvent {
2323
/// An HTTP response
2424
#[derive(Debug)]
2525
pub struct Response {
26-
res: http_service::Response,
26+
pub(crate) res: http_service::Response,
2727
// tracking here
2828
pub(crate) cookie_events: Vec<CookieEvent>,
2929
}

src/sse/endpoint.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
use crate::http::{mime, Body, StatusCode};
2+
use crate::log;
3+
use crate::sse::Sender;
4+
use crate::utils::BoxFuture;
5+
use crate::{Endpoint, Request, Response, Result};
6+
7+
use async_std::future::Future;
8+
use async_std::io::BufReader;
9+
use async_std::task;
10+
11+
use std::marker::PhantomData;
12+
use std::sync::Arc;
13+
14+
/// Create an endpoint that can handle SSE connections.
15+
pub fn endpoint<F, Fut, State>(handler: F) -> SseEndpoint<F, Fut, State>
16+
where
17+
State: Send + Sync + 'static,
18+
F: Fn(Request<State>, Sender) -> Fut + Send + Sync + 'static,
19+
Fut: Future<Output = Result<()>> + Send + Sync + 'static,
20+
{
21+
SseEndpoint {
22+
handler: Arc::new(handler),
23+
__state: PhantomData,
24+
__fut: PhantomData,
25+
}
26+
}
27+
28+
/// An endpoint that can handle SSE connections.
29+
#[derive(Debug)]
30+
pub struct SseEndpoint<F, Fut, State>
31+
where
32+
State: Send + Sync + 'static,
33+
F: Fn(Request<State>, Sender) -> Fut + Send + Sync + 'static,
34+
Fut: Future<Output = Result<()>> + Send + Sync + 'static,
35+
{
36+
handler: Arc<F>,
37+
__state: PhantomData<State>,
38+
__fut: PhantomData<Fut>,
39+
}
40+
41+
impl<F, Fut, State> Endpoint<State> for SseEndpoint<F, Fut, State>
42+
where
43+
State: Send + Sync + 'static,
44+
F: Fn(Request<State>, Sender) -> Fut + Send + Sync + 'static,
45+
Fut: Future<Output = Result<()>> + Send + Sync + 'static,
46+
{
47+
fn call<'a>(&'a self, req: Request<State>) -> BoxFuture<'a, Result<Response>> {
48+
let handler = self.handler.clone();
49+
Box::pin(async move {
50+
let (sender, encoder) = async_sse::encode();
51+
task::spawn(async move {
52+
let sender = Sender::new(sender);
53+
if let Err(err) = handler(req, sender).await {
54+
log::error!("SSE handler error: {:?}", err);
55+
}
56+
});
57+
58+
// Perform the handshake as described here:
59+
// https://html.spec.whatwg.org/multipage/server-sent-events.html#sse-processing-model
60+
let mut res = Response::new(StatusCode::Ok);
61+
res.res.insert_header("Cache-Control", "no-cache").unwrap();
62+
res.res.set_content_type(mime::SSE);
63+
64+
let body = Body::from_reader(BufReader::new(encoder), None);
65+
res.set_body(body);
66+
67+
Ok(res)
68+
})
69+
}
70+
}

src/sse/mod.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
//! Server-Sent Events (SSE) types.
2+
//!
3+
//! # Errors
4+
//!
5+
//! Errors originating in the SSE handler will be logged. Errors originating
6+
//! during the encoding of the SSE stream will be handled by the backend engine
7+
//! the way any other IO error is handled.
8+
//!
9+
//! In the future we may introduce a better mechanism to handle errors that
10+
//! originate outside of regular endpoints.
11+
//!
12+
//! # Examples
13+
//!
14+
//! ```no_run
15+
//! # fn main() -> Result<(), std::io::Error> { async_std::task::block_on(async {
16+
//! #
17+
//! use tide::sse;
18+
//!
19+
//! let mut app = tide::new();
20+
//! app.at("/sse").get(sse::endpoint(|_req, sender| async move {
21+
//! sender.send("fruit", "banana", None).await;
22+
//! sender.send("fruit", "apple", None).await;
23+
//! Ok(())
24+
//! }));
25+
//! app.listen("localhost:8080").await?;
26+
//! # Ok(()) }) }
27+
//! ```
28+
29+
mod endpoint;
30+
mod sender;
31+
mod upgrade;
32+
33+
pub use endpoint::{endpoint, SseEndpoint};
34+
pub use sender::Sender;
35+
pub use upgrade::upgrade;

src/sse/sender.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/// An SSE message sender.
2+
#[derive(Debug)]
3+
pub struct Sender {
4+
sender: async_sse::Sender,
5+
}
6+
7+
impl Sender {
8+
/// Create a new instance of `Sender`.
9+
pub(crate) fn new(sender: async_sse::Sender) -> Self {
10+
Self { sender }
11+
}
12+
13+
/// Send data from the SSE channel.
14+
///
15+
/// Each message constists of a "name" and "data".
16+
pub async fn send(&self, name: &str, data: impl AsRef<str>, id: Option<&str>) {
17+
self.sender.send(name, data.as_ref().as_bytes(), id).await;
18+
}
19+
}

src/sse/upgrade.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
use crate::http::{mime, Body, StatusCode};
2+
use crate::log;
3+
use crate::{Request, Response, Result};
4+
5+
use super::Sender;
6+
7+
use async_std::future::Future;
8+
use async_std::io::BufReader;
9+
use async_std::task;
10+
11+
/// Upgrade an existing HTTP connection to an SSE connection.
12+
pub fn upgrade<F, Fut, State>(req: Request<State>, handler: F) -> Response
13+
where
14+
State: Send + Sync + 'static,
15+
F: Fn(Request<State>, Sender) -> Fut + Send + Sync + 'static,
16+
Fut: Future<Output = Result<()>> + Send + Sync + 'static,
17+
{
18+
let (sender, encoder) = async_sse::encode();
19+
task::spawn(async move {
20+
let sender = Sender::new(sender);
21+
if let Err(err) = handler(req, sender).await {
22+
log::error!("SSE handler error: {:?}", err);
23+
}
24+
});
25+
26+
// Perform the handshake as described here:
27+
// https://html.spec.whatwg.org/multipage/server-sent-events.html#sse-processing-model
28+
let mut res = Response::new(StatusCode::Ok);
29+
res.res.insert_header("Cache-Control", "no-cache").unwrap();
30+
res.res.set_content_type(mime::SSE);
31+
32+
let body = Body::from_reader(BufReader::new(encoder), None);
33+
res.set_body(body);
34+
35+
res
36+
}

0 commit comments

Comments
 (0)