Skip to content

Commit 17ea5ff

Browse files
authored
Implement a Sink interface for hedwig publishing (#25)
This introduces [`Sink`][1] implementations which validate and publish messages to hedwig. See the docs for details.

The underlying implementation supports retries, but does not actually perform them. I plan to add a retrying Sink wrapper at some point in the future.

I also took the liberty of adding a type alias for topics, because `&'static str` is not particularly descriptive for a type.

[1]: https://docs.rs/futures/0.3.13/futures/sink/trait.Sink.html
1 parent 4ebcf0d commit 17ea5ff

File tree

5 files changed

+961
-36
lines changed

5 files changed

+961
-36
lines changed

Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,27 @@ categories = ["asynchronous", "web-programming"]
1818
maintenance = { status = "actively-developed" }
1919

2020
[features]
21-
default = []
21+
default = ["sink"]
2222
# Publishers
2323
google = ["base64", "yup-oauth2", "hyper", "http", "serde_json", "serde", "serde/derive", "uuid/serde"]
2424
# Validators
2525
json-schema = ["valico", "serde_json", "serde"]
2626
protobuf = ["prost"]
2727

28+
sink = ["futures-util/sink", "either"]
29+
2830
[[example]]
2931
name = "publish"
3032
required-features = ["google", "json-schema"]
3133

3234
[dependencies]
3335
futures-util = { version = "0.3", features = ["std"], default-features = false }
36+
pin-project = "1"
3437
thiserror = { version = "1", default-features = false }
3538
url = { version = "2", default-features = false }
3639
uuid = { version = "^0.8", features = ["v4"], default-features = false }
3740

41+
either = { version = "1", optional = true, default-features = false }
3842
serde = { version = "^1.0", optional = true, default-features = false }
3943
serde_json = { version = "^1", features = ["std"], optional = true, default-features = false }
4044
valico = { version = "^3.2", optional = true, default-features = false }

src/lib.rs

Lines changed: 102 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858
//! impl<'a> hedwig::Message for &'a UserCreatedMessage {
5959
//! type Error = hedwig::validators::JsonSchemaValidatorError;
6060
//! type Validator = hedwig::validators::JsonSchemaValidator;
61-
//! fn topic(&self) -> &'static str { "user.created" }
61+
//! fn topic(&self) -> hedwig::Topic { "user.created" }
6262
//! fn encode(self, validator: &Self::Validator)
6363
//! -> Result<hedwig::ValidatedMessage, Self::Error> {
6464
//! validator.validate(
@@ -99,21 +99,38 @@
9999
broken_intra_doc_links,
100100
clippy::all,
101101
unsafe_code,
102-
unreachable_pub,
103-
unused
102+
unreachable_pub
104103
)]
104+
#![allow(clippy::unknown_clippy_lints)]
105+
#![cfg_attr(not(test), deny(unused))]
105106
#![cfg_attr(docsrs, feature(doc_cfg))]
106107

107-
use std::{collections::BTreeMap, time::SystemTime};
108+
use std::{
109+
collections::BTreeMap,
110+
pin::Pin,
111+
task::{Context, Poll},
112+
time::SystemTime,
113+
};
108114

109-
use futures_util::stream::{self, Stream, StreamExt};
115+
use futures_util::{
116+
ready,
117+
stream::{self, Stream},
118+
};
119+
use pin_project::pin_project;
110120
use uuid::Uuid;
111121

112122
pub mod publishers;
113123
#[cfg(test)]
114124
mod tests;
115125
pub mod validators;
116126

127+
#[cfg(feature = "sink")]
128+
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
129+
pub mod sink;
130+
131+
/// A message queue topic name to which messages can be published
132+
pub type Topic = &'static str;
133+
117134
/// All errors that may be returned when operating top level APIs.
118135
#[derive(Debug, thiserror::Error)]
119136
#[non_exhaustive]
@@ -141,7 +158,7 @@ pub trait Publisher {
141158
/// Publish a batch of messages.
142159
///
143160
/// The output stream shall return a result for each message in `messages`, in order.
144-
fn publish<'a, I>(&self, topic: &'static str, messages: I) -> Self::PublishStream
161+
fn publish<'a, I>(&self, topic: Topic, messages: I) -> Self::PublishStream
145162
where
146163
I: Iterator<Item = &'a ValidatedMessage> + DoubleEndedIterator + ExactSizeIterator;
147164
}
@@ -157,7 +174,7 @@ pub trait Message {
157174
type Validator;
158175

159176
/// Topic into which this message shall be published.
160-
fn topic(&self) -> &'static str;
177+
fn topic(&self) -> Topic;
161178

162179
/// Encode the message payload.
163180
fn encode(self, validator: &Self::Validator) -> Result<ValidatedMessage, Self::Error>;
@@ -170,6 +187,8 @@ pub type Headers = BTreeMap<String, String>;
170187
///
171188
/// The only way to construct this is via a validator.
172189
#[derive(Debug, Clone)]
190+
// derive Eq only in tests so that users can't accidentally run an expensive `==` over message data
191+
#[cfg_attr(test, derive(PartialEq, Eq))]
173192
pub struct ValidatedMessage {
174193
/// Unique message identifier.
175194
id: Uuid,
@@ -221,7 +240,7 @@ impl ValidatedMessage {
221240
/// A convenience builder for publishing in batches.
222241
#[derive(Default, Debug)]
223242
pub struct PublishBatch {
224-
messages: BTreeMap<&'static str, Vec<ValidatedMessage>>,
243+
messages: BTreeMap<Topic, Vec<ValidatedMessage>>,
225244
}
226245

227246
impl PublishBatch {
@@ -241,7 +260,7 @@ impl PublishBatch {
241260
}
242261

243262
/// Add an already validated message to be published in this batch.
244-
pub fn push(&mut self, topic: &'static str, validated: ValidatedMessage) -> &mut Self {
263+
pub fn push(&mut self, topic: Topic, validated: ValidatedMessage) -> &mut Self {
245264
self.messages.entry(topic).or_default().push(validated);
246265
self
247266
}
@@ -272,28 +291,83 @@ impl PublishBatch {
272291
/// Publisher-specific error types may have methods to make a decision easier.
273292
///
274293
/// [`GooglePubSubPublisher`]: publishers::GooglePubSubPublisher
275-
pub fn publish<P>(
276-
self,
277-
publisher: &P,
278-
) -> impl Stream<
279-
Item = (
280-
Result<P::MessageId, P::MessageError>,
281-
&'static str,
282-
ValidatedMessage,
283-
),
284-
>
294+
pub fn publish<P>(self, publisher: &P) -> PublishBatchStream<P::PublishStream>
285295
where
286296
P: Publisher,
287297
P::PublishStream: Unpin,
288298
{
289-
self.messages
290-
.into_iter()
291-
.map(|(topic, msgs)| {
292-
publisher
293-
.publish(topic, msgs.iter())
294-
.zip(stream::iter(msgs.into_iter()))
295-
.map(move |(r, m)| (r, topic, m))
296-
})
297-
.collect::<stream::SelectAll<_>>()
299+
PublishBatchStream(
300+
self.messages
301+
.into_iter()
302+
.map(|(topic, msgs)| TopicPublishStream::new(topic, msgs, publisher))
303+
.collect::<stream::SelectAll<_>>(),
304+
)
305+
}
306+
}
307+
308+
/// The stream returned by the method [`PublishBatch::publish`](PublishBatch::publish)
309+
// This stream and TopicPublishStream are made explicit types instead of combinators like
310+
// map/zip/etc so that callers can refer to a concrete return type instead of `impl Stream`
311+
#[pin_project]
312+
#[derive(Debug)]
313+
pub struct PublishBatchStream<P>(#[pin] stream::SelectAll<TopicPublishStream<P>>);
314+
315+
impl<P> Stream for PublishBatchStream<P>
316+
where
317+
P: Stream + Unpin,
318+
{
319+
type Item = (P::Item, Topic, ValidatedMessage);
320+
321+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
322+
self.project().0.poll_next(cx)
323+
}
324+
}
325+
326+
#[pin_project]
327+
#[derive(Debug)]
328+
struct TopicPublishStream<P> {
329+
topic: Topic,
330+
messages: std::vec::IntoIter<ValidatedMessage>,
331+
332+
#[pin]
333+
publish_stream: P,
334+
}
335+
336+
impl<P> TopicPublishStream<P> {
337+
fn new<Pub>(topic: Topic, messages: Vec<ValidatedMessage>, publisher: &Pub) -> Self
338+
where
339+
Pub: Publisher<PublishStream = P>,
340+
P: Stream<Item = Result<Pub::MessageId, Pub::MessageError>>,
341+
{
342+
let publish_stream = publisher.publish(topic, messages.iter());
343+
Self {
344+
topic,
345+
messages: messages.into_iter(),
346+
publish_stream,
347+
}
348+
}
349+
}
350+
351+
impl<P> Stream for TopicPublishStream<P>
352+
where
353+
P: Stream,
354+
{
355+
type Item = (P::Item, Topic, ValidatedMessage);
356+
357+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
358+
let this = self.project();
359+
360+
// `map` has lifetime constraints that aren't nice here
361+
#[allow(clippy::manual_map)]
362+
Poll::Ready(match ready!(this.publish_stream.poll_next(cx)) {
363+
None => None,
364+
Some(stream_item) => Some((
365+
stream_item,
366+
this.topic,
367+
this.messages
368+
.next()
369+
.expect("should be as many messages as publishes"),
370+
)),
371+
})
298372
}
299373
}

src/publishers/googlepubsub.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use futures_util::stream::{Stream, StreamExt, TryStreamExt};
22
use std::{borrow::Cow, pin::Pin, sync::Arc, task, time::SystemTime};
33
use yup_oauth2::authenticator::Authenticator;
44

5-
use crate::ValidatedMessage;
5+
use crate::{Topic, ValidatedMessage};
66

77
const AUTH_SCOPES: [&str; 1] = ["https://www.googleapis.com/auth/pubsub"];
88
const JSON_METATYPE: &str = "application/json";
@@ -282,7 +282,7 @@ where
282282
type MessageError = GooglePubSubError;
283283
type PublishStream = GooglePubSubPublishStream;
284284

285-
fn publish<'a, I>(&self, topic: &'static str, messages: I) -> Self::PublishStream
285+
fn publish<'a, I>(&self, topic: Topic, messages: I) -> Self::PublishStream
286286
where
287287
I: Iterator<Item = &'a ValidatedMessage>,
288288
{

src/publishers/mock.rs

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
use crate::{Publisher, ValidatedMessage};
1+
use crate::{Publisher, Topic, ValidatedMessage};
22

33
use std::{
44
pin::Pin,
5-
sync::{Arc, Mutex},
5+
sync::{Arc, Mutex, MutexGuard},
66
task,
77
};
88

@@ -21,7 +21,7 @@ use uuid::Uuid;
2121
/// let publisher_view = publisher.clone();
2222
/// ```
2323
#[derive(Debug, Default, Clone)]
24-
pub struct MockPublisher(Arc<Mutex<Vec<(&'static str, ValidatedMessage)>>>);
24+
pub struct MockPublisher(Arc<Mutex<Vec<(Topic, ValidatedMessage)>>>);
2525

2626
impl MockPublisher {
2727
/// Create a new mock publisher.
@@ -45,7 +45,7 @@ impl MockPublisher {
4545
/// be published, was indeed published
4646
///
4747
/// Panics if the message was not published.
48-
pub fn assert_message_published(&self, topic: &'static str, uuid: &Uuid) {
48+
pub fn assert_message_published(&self, topic: Topic, uuid: &Uuid) {
4949
{
5050
let lock = self.0.lock().expect("this mutex cannot get poisoned");
5151
for (mt, msg) in &lock[..] {
@@ -59,14 +59,30 @@ impl MockPublisher {
5959
uuid, topic
6060
);
6161
}
62+
63+
/// Get a view over the messages that have been published to this publisher
64+
pub fn messages(&self) -> Messages {
65+
Messages(self.0.lock().expect("lock poisoned!"))
66+
}
67+
}
68+
69+
/// A view over the messages in a `MockPublisher`, returned by
70+
/// [`messages`](MockPublisher::messages)
71+
pub struct Messages<'a>(MutexGuard<'a, Vec<(Topic, ValidatedMessage)>>);
72+
73+
impl<'a> Messages<'a> {
74+
/// Get an iterator over the messages in the `MockPublisher`
75+
pub fn iter(&self) -> impl Iterator<Item = &(Topic, ValidatedMessage)> {
76+
self.0.iter()
77+
}
6278
}
6379

6480
impl Publisher for MockPublisher {
6581
type MessageId = Uuid;
6682
type MessageError = std::convert::Infallible;
6783
type PublishStream = MockPublishStream;
6884

69-
fn publish<'a, I>(&self, topic: &'static str, messages: I) -> Self::PublishStream
85+
fn publish<'a, I>(&self, topic: Topic, messages: I) -> Self::PublishStream
7086
where
7187
I: Iterator<Item = &'a ValidatedMessage> + ExactSizeIterator,
7288
{

0 commit comments

Comments
 (0)