Skip to content

Commit cc4ae52

Browse files
Zarathustra2keelerm84
authored andcommitted
chore: Emit SSE::Connected event when stream is established
1 parent 4cf3faf commit cc4ae52

File tree

6 files changed

+17
-16
lines changed

6 files changed

+17
-16
lines changed

contract-tests/src/bin/sse-test-api/main.rs

+2
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ struct Config {
5050
#[derive(Serialize, Debug)]
5151
#[serde(tag = "kind", rename_all = "camelCase")]
5252
enum EventType {
53+
Connected {},
5354
Event { event: Event },
5455
Comment { comment: String },
5556
Error { error: String },
@@ -66,6 +67,7 @@ impl From<es::SSE> for EventType {
6667
},
6768
},
6869
es::SSE::Comment(comment) => Self::Comment { comment },
70+
es::SSE::Connected(_) => Self::Connected {},
6971
}
7072
}
7173
}

contract-tests/src/bin/sse-test-api/stream_entity.rs

+4
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ impl Inner {
3636
match stream.try_next().await {
3737
Ok(Some(event)) => {
3838
let event_type: EventType = event.into();
39+
if let EventType::Connected {} = event_type {
40+
continue;
41+
}
42+
3943
if !self.send_message(event_type, &client).await {
4044
break;
4145
}

eventsource-client/examples/tail.rs

+3
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ fn tail_events(client: impl es::Client) -> impl Stream<Item = Result<(), ()>> {
4646
es::SSE::Comment(comment) => {
4747
println!("got a comment: \n{}", comment)
4848
}
49+
es::SSE::Connected(headers) => {
50+
println!("got a connection start with headers: \n{:?}", headers)
51+
}
4952
})
5053
.map_err(|err| eprintln!("error streaming events: {:?}", err))
5154
}

eventsource-client/src/client.rs

+4-14
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,7 @@ where
403403
Poll::Ready(Some(Ok(event)))
404404
}
405405
SSE::Comment(_) => Poll::Ready(Some(Ok(event))),
406+
SSE::Connected(_) => Poll::Ready(Some(Ok(event))),
406407
};
407408
}
408409

@@ -436,13 +437,15 @@ where
436437
debug!("HTTP response: {:#?}", resp);
437438

438439
if resp.status().is_success() {
440+
let reply =
441+
Poll::Ready(Some(Ok(SSE::Connected(resp.headers().to_owned()))));
439442
self.as_mut().project().retry_strategy.reset(Instant::now());
440443
self.as_mut().reset_redirects();
441444
self.as_mut()
442445
.project()
443446
.state
444447
.set(State::Connected(resp.into_body()));
445-
continue;
448+
return reply;
446449
}
447450

448451
if resp.status() == 301 || resp.status() == 307 {
@@ -575,19 +578,6 @@ fn delay(dur: Duration, description: &str) -> Sleep {
575578
tokio::time::sleep(dur)
576579
}
577580

578-
#[derive(Debug)]
579-
struct StatusError {
580-
status: StatusCode,
581-
}
582-
583-
impl Display for StatusError {
584-
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
585-
write!(f, "Invalid status code: {}", self.status)
586-
}
587-
}
588-
589-
impl std::error::Error for StatusError {}
590-
591581
mod private {
592582
use crate::client::ClientImpl;
593583

eventsource-client/src/event_parser.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::{collections::VecDeque, convert::TryFrom, str::from_utf8};
22

3-
use hyper::body::Bytes;
3+
use hyper::{body::Bytes, http::HeaderValue, HeaderMap};
44
use log::{debug, log_enabled, trace};
55
use pin_project::pin_project;
66

@@ -32,6 +32,7 @@ impl EventData {
3232

3333
#[derive(Debug, Eq, PartialEq)]
3434
pub enum SSE {
35+
Connected(HeaderMap<HeaderValue>),
3536
Event(Event),
3637
Comment(String),
3738
}

eventsource-client/src/lib.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
//! let mut stream = Box::pin(client.stream())
1515
//! .map_ok(|event| match event {
1616
//! SSE::Comment(comment) => println!("got a comment event: {:?}", comment),
17-
//! SSE::Event(evt) => println!("got an event: {}", evt.event_type)
17+
//! SSE::Event(evt) => println!("got an event: {}", evt.event_type),
18+
//! SSE::Connected(headers) => println!("got connection start with headers: {:?}", headers)
1819
//! })
1920
//! .map_err(|e| println!("error streaming events: {:?}", e));
2021
//! # while let Ok(Some(_)) = stream.try_next().await {}

0 commit comments

Comments
 (0)