Skip to content

Commit ff16892

Browse files
authored
Refactor Publishing (#34)
* Refactor Publishing This changeset completely refactors publishing to be based on async Sinks, and to port the googlepubsub publisher to use gRPC instead of the JSON REST api. Together, these should make the library easier to use, compose better with the broader rust async ecosystem, and provide better performance for publishing. This additionally simplifies the crate hierarchy and some of the crate features, as the divisions only made much sense when publishing and consuming were separate implementations. Now backends are expected to provide publishing and consuming together at the same time. Most of this code is adapted from the private crate standard-hedwig, which has been a proving ground for the publishing API before it could be incorporated into this open-source crate, and has seen effective operation in production. * Bump major version to 5
1 parent d834cbc commit ff16892

26 files changed

+2983
-2501
lines changed

.github/workflows/hedwig.yml

+4-6
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,9 @@ on:
1212

1313
jobs:
1414
lint:
15-
runs-on: ${{ matrix.os }}
15+
runs-on: ubuntu-latest
1616
strategy:
1717
fail-fast: false
18-
matrix:
19-
os: [ubuntu-latest, windows-latest, macOS-latest]
2018
timeout-minutes: 10
2119
steps:
2220
- uses: actions/checkout@v2
@@ -31,7 +29,7 @@ jobs:
3129
uses: actions-rs/cargo@v1
3230
with:
3331
command: clippy
34-
args: -- -Dclippy::correctness -Dclippy::complexity -Dclippy::perf -Dunsafe_code -Dunreachable_pub -Dunused
32+
args: --all-features -- -Dclippy::correctness -Dclippy::complexity -Dclippy::perf -Dunsafe_code -Dunreachable_pub -Dunused
3533

3634
doc:
3735
runs-on: ubuntu-latest
@@ -50,14 +48,14 @@ jobs:
5048
command: doc
5149
args: --all-features --manifest-path=Cargo.toml
5250
env:
53-
RUSTDOCFLAGS: --cfg docsrs -Dmissing_docs -Dbroken_intra_doc_links
51+
RUSTDOCFLAGS: --cfg docsrs -Dmissing_docs -Drustdoc::broken_intra_doc_links
5452

5553
test:
5654
runs-on: ${{ matrix.os }}
5755
strategy:
5856
fail-fast: false
5957
matrix:
60-
rust_toolchain: [nightly, stable, 1.49.0]
58+
rust_toolchain: [nightly, stable, 1.53.0]
6159
os: [ubuntu-latest, windows-latest, macOS-latest]
6260
timeout-minutes: 20
6361
steps:

Cargo.toml

+24-24
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
[package]
22
name = "hedwig"
33
# TODO: When bumping to next major version, make sure to clean up the MRV and lints we allow in CI.
4-
version = "4.1.0"
4+
version = "5.0.0"
55
authors = [
66
"Aniruddha Maru <[email protected]>",
7-
"Simonas Kazlauskas <[email protected]>"
7+
"Simonas Kazlauskas <[email protected]>",
8+
"Renar Narubin <[email protected]>",
89
]
910
edition = "2018"
1011
repository = "https://github.com/standard-ai/hedwig-rust.git"
@@ -19,50 +20,49 @@ categories = ["asynchronous", "web-programming"]
1920
maintenance = { status = "actively-developed" }
2021

2122
[features]
22-
default = ["consume", "sink"]
23+
default = []
2324

24-
# Whether publishing/consuming is enabled
25-
publish = []
26-
consume = ["async-trait", "either"]
27-
28-
# Publishers
29-
google = ["base64", "yup-oauth2", "hyper", "http", "serde_json", "serde", "serde/derive", "uuid/serde"]
25+
# Backends
26+
google = ["ya-gcp", "tracing", "parking_lot"]
27+
mock = ["async-channel", "parking_lot"]
3028

3129
# Validators
3230
json-schema = ["valico", "serde_json", "serde"]
3331
protobuf = ["prost"]
3432

35-
# Convenience API
36-
sink = ["futures-util/sink", "either", "publish"]
37-
3833
[[example]]
39-
name = "publish"
40-
required-features = ["google", "json-schema"]
34+
name = "googlepubsub"
35+
required-features = ["google", "protobuf"]
4136

4237
[dependencies]
38+
async-trait = { version = "0.1" }
4339
bytes = "1"
44-
futures-util = { version = "0.3", features = ["std"], default-features = false }
40+
either = { version = "1", features = ["use_std"], default-features = false }
41+
futures-util = { version = "0.3.17", features = ["std", "sink"], default-features = false }
4542
pin-project = "1"
4643
thiserror = { version = "1", default-features = false }
4744
url = { version = "2", default-features = false }
4845
uuid = { version = "^0.8", features = ["v4"], default-features = false }
4946

50-
async-trait = { version = "0.1", optional = true }
51-
either = { version = "1", optional = true, features = ["use_std"], default-features = false }
47+
async-channel = { version = "1.6", optional = true }
5248
serde = { version = "^1.0", optional = true, default-features = false }
5349
serde_json = { version = "^1", features = ["std"], optional = true, default-features = false }
50+
parking_lot = { version = "0.11", optional = true }
51+
prost = { version = "0.8", optional = true, features = ["std"], default-features = false }
52+
tracing = { version = "0.1.26", optional = true }
5453
valico = { version = "^3.2", optional = true, default-features = false }
55-
base64 = { version = "^0.13", optional = true, default-features = false }
56-
http = { version = "^0.2", optional = true, default-features = false }
57-
hyper = { version = "^0.14.4", optional = true, features = ["client", "stream"], default-features = false }
58-
yup-oauth2 = { version = "5.1", optional = true, features = ["hyper-rustls"], default-features = false }
59-
prost = { version = "0.7", optional = true, features = ["std"], default-features = false }
54+
ya-gcp = { version = "0.6.3", features = ["pubsub"], optional = true }
6055

6156
[dev-dependencies]
62-
hyper-tls = "0.5.0"
63-
prost = { version = "0.7", features = ["std", "prost-derive"] }
57+
async-channel = { version = "1.6" }
58+
futures-channel = "0.3.17"
59+
parking_lot = { version = "0.11" }
60+
prost = { version = "0.8", features = ["std", "prost-derive"] }
6461
tokio = { version = "1", features = ["macros", "rt"] }
62+
tonic = "0.5"
6563
serde = { version = "1", features = ["derive"] }
64+
ya-gcp = { version = "0.6.3", features = ["pubsub", "emulators"] }
65+
structopt = "0.3"
6666

6767
[package.metadata.docs.rs]
6868
all-features = true

examples/googlepubsub.rs

+238
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
//! An example of ingesting messages from a PubSub subscription, applying a
2+
//! transformation, then submitting those transformations to another PubSub topic.
3+
4+
use futures_util::{SinkExt, StreamExt, TryFutureExt};
5+
use hedwig::{
6+
googlepubsub::{
7+
AuthFlow, ClientBuilder, ClientBuilderConfig, PubSubConfig, PubSubMessage, PublishError,
8+
ServiceAccountAuth, StreamSubscriptionConfig, SubscriptionConfig, SubscriptionName,
9+
TopicConfig, TopicName,
10+
},
11+
validators, Consumer, DecodableMessage, EncodableMessage, Headers, Publisher,
12+
};
13+
use std::{error::Error as StdError, time::SystemTime};
14+
use structopt::StructOpt;
15+
16+
const USER_CREATED_TOPIC: &str = "user.created";
17+
const USER_UPDATED_TOPIC: &str = "user.updated";
18+
19+
/// The input data, representing some user being created with the given name
20+
#[derive(PartialEq, Eq, prost::Message)]
21+
struct UserCreatedMessage {
22+
#[prost(string, tag = "1")]
23+
name: String,
24+
}
25+
26+
impl EncodableMessage for UserCreatedMessage {
27+
type Error = validators::ProstValidatorError;
28+
type Validator = validators::ProstValidator;
29+
fn topic(&self) -> hedwig::Topic {
30+
USER_CREATED_TOPIC.into()
31+
}
32+
fn encode(&self, validator: &Self::Validator) -> Result<hedwig::ValidatedMessage, Self::Error> {
33+
Ok(validator.validate(
34+
uuid::Uuid::new_v4(),
35+
SystemTime::now(),
36+
"user.created/1.0",
37+
Headers::new(),
38+
self,
39+
)?)
40+
}
41+
}
42+
43+
impl DecodableMessage for UserCreatedMessage {
44+
type Error = validators::ProstDecodeError<validators::prost::SchemaMismatchError>;
45+
type Decoder =
46+
validators::ProstDecoder<validators::prost::ExactSchemaMatcher<UserCreatedMessage>>;
47+
48+
fn decode(msg: hedwig::ValidatedMessage, decoder: &Self::Decoder) -> Result<Self, Self::Error> {
49+
decoder.decode(msg)
50+
}
51+
}
52+
53+
/// The output data, where the given user has now been assigned an ID and some metadata
54+
#[derive(PartialEq, Eq, prost::Message)]
55+
struct UserUpdatedMessage {
56+
#[prost(string, tag = "1")]
57+
name: String,
58+
59+
#[prost(int64, tag = "2")]
60+
id: i64,
61+
62+
#[prost(string, tag = "3")]
63+
metadata: String,
64+
}
65+
66+
/// The output message will carry an ack token from the input message, to ack when the output is
67+
/// successfully published, or nack on failure
68+
#[derive(Debug)]
69+
struct TransformedMessage(PubSubMessage<UserUpdatedMessage>);
70+
71+
impl EncodableMessage for TransformedMessage {
72+
type Error = validators::ProstValidatorError;
73+
type Validator = validators::ProstValidator;
74+
75+
fn topic(&self) -> hedwig::Topic {
76+
USER_UPDATED_TOPIC.into()
77+
}
78+
79+
fn encode(&self, validator: &Self::Validator) -> Result<hedwig::ValidatedMessage, Self::Error> {
80+
Ok(validator.validate(
81+
uuid::Uuid::new_v4(),
82+
SystemTime::now(),
83+
"user.updated/1.0",
84+
Headers::new(),
85+
&self.0.message,
86+
)?)
87+
}
88+
}
89+
90+
#[derive(Debug, StructOpt)]
91+
struct Args {
92+
/// The name of the pubsub project
93+
#[structopt(long)]
94+
project_name: String,
95+
}
96+
97+
#[tokio::main(flavor = "current_thread")]
98+
async fn main() -> Result<(), Box<dyn StdError>> {
99+
let args = Args::from_args();
100+
101+
println!("Building PubSub clients");
102+
103+
let builder = ClientBuilder::new(
104+
ClientBuilderConfig::new().auth_flow(AuthFlow::ServiceAccount(ServiceAccountAuth::EnvVar)),
105+
PubSubConfig::default(),
106+
)
107+
.await?;
108+
109+
let input_topic_name = TopicName::new(USER_CREATED_TOPIC);
110+
let subscription_name = SubscriptionName::new("user-metadata-updaters");
111+
112+
let output_topic_name = TopicName::new(USER_UPDATED_TOPIC);
113+
const APP_NAME: &str = "user-metadata-updater";
114+
115+
let mut publisher_client = builder
116+
.build_publisher(&args.project_name, APP_NAME)
117+
.await?;
118+
let mut consumer_client = builder.build_consumer(&args.project_name, APP_NAME).await?;
119+
120+
for topic_name in [&input_topic_name, &output_topic_name] {
121+
println!("Creating topic {:?}", topic_name);
122+
123+
publisher_client
124+
.create_topic(TopicConfig {
125+
name: topic_name.clone(),
126+
..TopicConfig::default()
127+
})
128+
.await?;
129+
}
130+
131+
println!("Creating subscription {:?}", &subscription_name);
132+
133+
consumer_client
134+
.create_subscription(SubscriptionConfig {
135+
topic: input_topic_name.clone(),
136+
name: subscription_name.clone(),
137+
..SubscriptionConfig::default()
138+
})
139+
.await?;
140+
141+
println!(
142+
"Synthesizing input messages for topic {:?}",
143+
&input_topic_name
144+
);
145+
146+
{
147+
let validator = validators::ProstValidator::new();
148+
let mut input_sink =
149+
Publisher::<UserCreatedMessage>::publish_sink(publisher_client.publisher(), validator);
150+
151+
for i in 1..=10 {
152+
let message = UserCreatedMessage {
153+
name: format!("Example Name #{}", i),
154+
};
155+
156+
input_sink.feed(message).await?;
157+
}
158+
input_sink.flush().await?;
159+
}
160+
161+
println!("Ingesting input messages, applying transformations, and publishing to destination");
162+
163+
let mut read_stream = consumer_client
164+
.stream_subscription(
165+
subscription_name.clone(),
166+
StreamSubscriptionConfig::default(),
167+
)
168+
.consume::<UserCreatedMessage>(hedwig::validators::ProstDecoder::new(
169+
hedwig::validators::prost::ExactSchemaMatcher::new("user.created/1.0"),
170+
));
171+
172+
let mut output_sink = Publisher::<TransformedMessage, _>::publish_sink_with_responses(
173+
publisher_client.publisher(),
174+
validators::ProstValidator::new(),
175+
futures_util::sink::unfold((), |_, message: TransformedMessage| async move {
176+
// if the output is successfully sent, ack the input to mark it as processed
177+
message.0.ack().await.map(|_success| ())
178+
}),
179+
);
180+
181+
for i in 1..=10 {
182+
let PubSubMessage { ack_token, message } = read_stream
183+
.next()
184+
.await
185+
.expect("stream should have 10 elements")?;
186+
187+
assert_eq!(&message.name, &format!("Example Name #{}", i));
188+
189+
let transformed = TransformedMessage(PubSubMessage {
190+
ack_token,
191+
message: UserUpdatedMessage {
192+
name: message.name,
193+
id: random_id(),
194+
metadata: "some metadata".into(),
195+
},
196+
});
197+
198+
output_sink
199+
.feed(transformed)
200+
.or_else(|publish_error| async move {
201+
// if publishing fails, nack the failed messages to allow later retries
202+
Err(match publish_error {
203+
PublishError::Publish { cause, messages } => {
204+
for failed_transform in messages {
205+
failed_transform.0.nack().await?;
206+
}
207+
Box::<dyn StdError>::from(cause)
208+
}
209+
err => Box::<dyn StdError>::from(err),
210+
})
211+
})
212+
.await?
213+
}
214+
output_sink.flush().await?;
215+
216+
println!("All messages matched and published successfully!");
217+
218+
println!("Deleting subscription {:?}", &subscription_name);
219+
220+
consumer_client
221+
.delete_subscription(subscription_name)
222+
.await?;
223+
224+
for topic_name in [input_topic_name, output_topic_name] {
225+
println!("Deleting topic {:?}", &topic_name);
226+
227+
publisher_client.delete_topic(topic_name).await?;
228+
}
229+
230+
println!("Done");
231+
232+
Ok(())
233+
}
234+
235+
fn random_id() -> i64 {
236+
4 // chosen by fair dice roll.
237+
// guaranteed to be random.
238+
}

0 commit comments

Comments
 (0)