Skip to content

Commit b3695ec

Browse files
committed
Reflect user types in retries and responses
This change adds mechanisms to 1) inform users when a (possibly buffered) message has been published, and 2) retry operations, both now in the user's message type. Previously retries were only possible in an opaque pubsub grpc type, and the publish listening wasn't available.
1 parent d8a78fc commit b3695ec

File tree

10 files changed

+1192
-285
lines changed

10 files changed

+1192
-285
lines changed

Cargo.toml

+9-5
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,16 @@ maintenance = { status = "actively-developed" }
2323
default = []
2424

2525
# Backends
26-
google = ["ya-gcp", "tracing"]
27-
mock = ["async-channel"]
26+
google = ["ya-gcp", "tracing", "parking_lot"]
27+
mock = ["async-channel", "parking_lot"]
2828

2929
# Validators
3030
json-schema = ["valico", "serde_json", "serde"]
3131
protobuf = ["prost"]
3232

3333
[[example]]
3434
name = "googlepubsub"
35-
required-features = ["google", "json-schema"]
35+
required-features = ["google", "protobuf"]
3636

3737
[dependencies]
3838
async-trait = { version = "0.1" }
@@ -47,17 +47,21 @@ uuid = { version = "^0.8", features = ["v4"], default-features = false }
4747
async-channel = { version = "1.6", optional = true }
4848
serde = { version = "^1.0", optional = true, default-features = false }
4949
serde_json = { version = "^1", features = ["std"], optional = true, default-features = false }
50-
ya-gcp = { version = "0.6", features = ["pubsub"], optional = true }
50+
parking_lot = { version = "0.11", optional = true }
5151
prost = { version = "0.8", optional = true, features = ["std"], default-features = false }
5252
tracing = { version = "0.1.26", optional = true }
5353
valico = { version = "^3.2", optional = true, default-features = false }
54+
ya-gcp = { version = "0.6.3", features = ["pubsub"], optional = true }
5455

5556
[dev-dependencies]
5657
async-channel = { version = "1.6" }
58+
futures-channel = "0.3.17"
59+
parking_lot = { version = "0.11" }
5760
prost = { version = "0.8", features = ["std", "prost-derive"] }
5861
tokio = { version = "1", features = ["macros", "rt"] }
62+
tonic = "0.5"
5963
serde = { version = "1", features = ["derive"] }
60-
ya-gcp = { version = "0.6", features = ["pubsub", "emulators"] }
64+
ya-gcp = { version = "0.6.3", features = ["pubsub", "emulators"] }
6165
structopt = "0.3"
6266

6367
[package.metadata.docs.rs]

examples/googlepubsub.rs

+140-47
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,41 @@
1-
use futures_util::{SinkExt, StreamExt};
1+
//! An example of ingesting messages from a PubSub subscription, applying a
2+
//! transformation, then submitting those transformations to another PubSub topic.
3+
4+
use futures_util::{SinkExt, StreamExt, TryFutureExt};
25
use hedwig::{
36
googlepubsub::{
4-
AuthFlow, ClientBuilder, ClientBuilderConfig, PubSubConfig, ServiceAccountAuth,
5-
StreamSubscriptionConfig, SubscriptionConfig, SubscriptionName, TopicConfig, TopicName,
7+
AcknowledgeToken, AuthFlow, ClientBuilder, ClientBuilderConfig, PubSubConfig,
8+
PubSubMessage, PublishError, ServiceAccountAuth, StreamSubscriptionConfig,
9+
SubscriptionConfig, SubscriptionName, TopicConfig, TopicName,
610
},
711
validators, Consumer, DecodableMessage, EncodableMessage, Headers, Publisher,
812
};
9-
use std::time::SystemTime;
13+
use std::{error::Error as StdError, time::SystemTime};
1014
use structopt::StructOpt;
1115

12-
#[derive(Clone, PartialEq, Eq, prost::Message)]
16+
const USER_CREATED_TOPIC: &str = "user.created";
17+
const USER_UPDATED_TOPIC: &str = "user.updated";
18+
19+
/// The input data, representing some user being created with the given name
20+
#[derive(PartialEq, Eq, prost::Message)]
1321
struct UserCreatedMessage {
1422
#[prost(string, tag = "1")]
15-
payload: String,
23+
name: String,
1624
}
1725

18-
impl<'a> EncodableMessage for UserCreatedMessage {
26+
impl EncodableMessage for UserCreatedMessage {
1927
type Error = validators::ProstValidatorError;
2028
type Validator = validators::ProstValidator;
2129
fn topic(&self) -> hedwig::Topic {
22-
"user.created".into()
30+
USER_CREATED_TOPIC.into()
2331
}
24-
fn encode(self, validator: &Self::Validator) -> Result<hedwig::ValidatedMessage, Self::Error> {
32+
fn encode(&self, validator: &Self::Validator) -> Result<hedwig::ValidatedMessage, Self::Error> {
2533
Ok(validator.validate(
2634
uuid::Uuid::new_v4(),
2735
SystemTime::now(),
2836
"user.created/1.0",
2937
Headers::new(),
30-
&self,
38+
self,
3139
)?)
3240
}
3341
}
@@ -42,6 +50,46 @@ impl DecodableMessage for UserCreatedMessage {
4250
}
4351
}
4452

53+
/// The output data, where the given user has now been assigned an ID and some metadata
54+
#[derive(PartialEq, Eq, prost::Message)]
55+
struct UserUpdatedMessage {
56+
#[prost(string, tag = "1")]
57+
name: String,
58+
59+
#[prost(int64, tag = "2")]
60+
id: i64,
61+
62+
#[prost(string, tag = "3")]
63+
metadata: String,
64+
}
65+
66+
#[derive(Debug)]
67+
struct TransformedMessage {
68+
// keep the input's ack token to ack when the output is successfully published, or nack on
69+
// failure
70+
input_token: AcknowledgeToken,
71+
output: UserUpdatedMessage,
72+
}
73+
74+
impl EncodableMessage for TransformedMessage {
75+
type Error = validators::ProstValidatorError;
76+
type Validator = validators::ProstValidator;
77+
78+
fn topic(&self) -> hedwig::Topic {
79+
USER_UPDATED_TOPIC.into()
80+
}
81+
82+
fn encode(&self, validator: &Self::Validator) -> Result<hedwig::ValidatedMessage, Self::Error> {
83+
Ok(validator.validate(
84+
uuid::Uuid::new_v4(),
85+
SystemTime::now(),
86+
"user.updated/1.0",
87+
Headers::new(),
88+
&self.output,
89+
)?)
90+
}
91+
}
92+
4593
#[derive(Debug, StructOpt)]
4694
struct Args {
4795
/// The name of the pubsub project
@@ -50,7 +98,7 @@ struct Args {
5098
}
5199

52100
#[tokio::main(flavor = "current_thread")]
53-
async fn main() -> Result<(), Box<dyn std::error::Error + 'static>> {
101+
async fn main() -> Result<(), Box<dyn StdError>> {
54102
let args = Args::from_args();
55103

56104
println!("Building PubSub clients");
@@ -61,49 +109,59 @@ async fn main() -> Result<(), Box<dyn std::error::Error + 'static>> {
61109
)
62110
.await?;
63111

64-
let topic_name = TopicName::new("pubsub_example_topic");
65-
let subscription_name = SubscriptionName::new("pubsub_example_subscription");
112+
let input_topic_name = TopicName::new(USER_CREATED_TOPIC);
113+
let subscription_name = SubscriptionName::new("user-metadata-updaters");
114+
115+
let output_topic_name = TopicName::new(USER_UPDATED_TOPIC);
116+
const APP_NAME: &str = "user-metadata-updater";
66117

67118
let mut publisher_client = builder
68-
.build_publisher(&args.project_name, "myapp_publisher")
69-
.await?;
70-
let mut consumer_client = builder
71-
.build_consumer(&args.project_name, "myapp_consumer")
119+
.build_publisher(&args.project_name, APP_NAME)
72120
.await?;
121+
let mut consumer_client = builder.build_consumer(&args.project_name, APP_NAME).await?;
73122

74-
println!("Creating topic {:?}", &topic_name);
123+
for topic_name in [&input_topic_name, &output_topic_name] {
124+
println!("Creating topic {:?}", topic_name);
75125

76-
publisher_client
77-
.create_topic(TopicConfig {
78-
name: topic_name.clone(),
79-
..TopicConfig::default()
80-
})
81-
.await?;
126+
publisher_client
127+
.create_topic(TopicConfig {
128+
name: topic_name.clone(),
129+
..TopicConfig::default()
130+
})
131+
.await?;
132+
}
82133

83134
println!("Creating subscription {:?}", &subscription_name);
84135

85136
consumer_client
86137
.create_subscription(SubscriptionConfig {
87-
topic: topic_name.clone(),
138+
topic: input_topic_name.clone(),
88139
name: subscription_name.clone(),
89140
..SubscriptionConfig::default()
90141
})
91142
.await?;
92143

93-
println!("Publishing message to topic");
144+
println!(
145+
"Synthesizing input messages for topic {:?}",
146+
&input_topic_name
147+
);
94148

95-
let validator = hedwig::validators::ProstValidator::new();
96-
let mut publisher = publisher_client.publisher().publish_sink(validator);
149+
{
150+
let validator = validators::ProstValidator::new();
151+
let mut input_sink =
152+
Publisher::<UserCreatedMessage>::publish_sink(publisher_client.publisher(), validator);
97153

98-
for i in 1..=10 {
99-
let message = UserCreatedMessage {
100-
payload: format!("this is message #{}", i),
101-
};
154+
for i in 1..=10 {
155+
let message = UserCreatedMessage {
156+
name: format!("Example Name #{}", i),
157+
};
102158

103-
publisher.send(message).await?;
159+
input_sink.feed(message).await?;
160+
}
161+
input_sink.flush().await?;
104162
}
105163

106-
println!("Reading back published message");
164+
println!("Ingesting input messages, applying transformations, and publishing to destination");
107165

108166
let mut read_stream = consumer_client
109167
.stream_subscription(
@@ -114,35 +172,70 @@ async fn main() -> Result<(), Box<dyn std::error::Error + 'static>> {
114172
hedwig::validators::prost::ExactSchemaMatcher::new("user.created/1.0"),
115173
));
116174

175+
let mut output_sink = Publisher::<TransformedMessage, _>::publish_sink_with_responses(
176+
publisher_client.publisher(),
177+
validators::ProstValidator::new(),
178+
futures_util::sink::unfold((), |_, message: TransformedMessage| async move {
179+
// if the output is successfully sent, ack the input to mark it as processed
180+
message.input_token.ack().await
181+
}),
182+
);
183+
117184
for i in 1..=10 {
118-
let message = read_stream
185+
let PubSubMessage { ack_token, message } = read_stream
119186
.next()
120187
.await
121-
.ok_or("unexpected end of stream")??
122-
.ack()
123-
.await?;
188+
.expect("stream should have 10 elements")?;
124189

125-
assert_eq!(
126-
message,
127-
UserCreatedMessage {
128-
payload: format!("this is message #{}", i)
129-
}
130-
);
190+
assert_eq!(&message.name, &format!("Example Name #{}", i));
191+
192+
let transformed = TransformedMessage {
193+
output: UserUpdatedMessage {
194+
name: message.name,
195+
id: random_id(),
196+
metadata: "some metadata".into(),
197+
},
198+
input_token: ack_token,
199+
};
200+
201+
output_sink
202+
.feed(transformed)
203+
.or_else(|publish_error| async move {
204+
// if publishing fails, nack the failed messages to allow later retries
205+
Err(match publish_error {
206+
PublishError::Publish { cause, messages } => {
207+
for failed_transform in messages {
208+
failed_transform.input_token.nack().await?;
209+
}
210+
Box::<dyn StdError>::from(cause)
211+
}
212+
err => Box::<dyn StdError>::from(err),
213+
})
214+
})
215+
.await?
131216
}
217+
output_sink.flush().await?;
132218

133-
println!("All messages matched!");
219+
println!("All messages matched and published successfully!");
134220

135221
println!("Deleting subscription {:?}", &subscription_name);
136222

137223
consumer_client
138224
.delete_subscription(subscription_name)
139225
.await?;
140226

141-
println!("Deleting topic {:?}", &topic_name);
227+
for topic_name in [input_topic_name, output_topic_name] {
228+
println!("Deleting topic {:?}", &topic_name);
142229

143-
publisher_client.delete_topic(topic_name).await?;
230+
publisher_client.delete_topic(topic_name).await?;
231+
}
144232

145233
println!("Done");
146234

147235
Ok(())
148236
}
237+
238+
fn random_id() -> i64 {
239+
4 // chosen by fair dice roll.
240+
// guaranteed to be random.
241+
}

src/backends/googlepubsub/mod.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@ use std::{borrow::Cow, fmt::Display};
77
pub use ya_gcp::{
88
grpc::StatusCodeSet,
99
pubsub::{
10-
AcknowledgeError, BuildError, Error as PubSubError, MakeConnection, ModifyAcknowledgeError,
11-
PubSubConfig, SinkError, StreamSubscriptionConfig, Uri, DEFAULT_RETRY_CODES,
10+
AcknowledgeError, AcknowledgeToken, BuildError, Error as PubSubError, MakeConnection,
11+
ModifyAcknowledgeError, PubSubConfig, SinkError, StreamSubscriptionConfig, Uri,
12+
DEFAULT_RETRY_CODES,
1213
},
1314
retry_policy, AuthFlow, ClientBuilderConfig, Connect, CreateBuilderError, DefaultConnector,
1415
ServiceAccountAuth,

0 commit comments

Comments
 (0)