Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 56 additions & 46 deletions sentry_streams/src/broadcaster.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::committable::clone_committable;
use crate::routes::{Route, RoutedValue};
use sentry_arroyo::processing::strategies::{
CommitRequest, ProcessingStrategy, StrategyError, SubmitError,
CommitRequest, MessageRejected, ProcessingStrategy, StrategyError, SubmitError,
};
use sentry_arroyo::types::{Message, Partition};
use std::collections::{BTreeMap, HashMap};
Expand Down Expand Up @@ -67,8 +67,8 @@ impl Broadcaster {
}
}

/// Attempts to re-submit a pending message, if successful deletes it from the pending buffer.
fn retry_pending_message(
/// Attempts to submit a pending message, if successful deletes it from the pending buffer.
fn submit_pending_message(
&mut self,
message: Message<RoutedValue>,
identifier: &MessageIdentifier,
Expand All @@ -85,48 +85,14 @@ impl Broadcaster {
let msg = self.pending_messages.get(&identifier).unwrap();
// we only need to take action here if the returned error is `InvalidMessage`
if let Err(SubmitError::InvalidMessage(e)) =
self.retry_pending_message(msg.clone(), &identifier)
self.submit_pending_message(msg.clone(), &identifier)
{
self.pending_messages.remove(&identifier);
return Err(e.into());
}
}
Ok(())
}

/// Attempts to submit a message to the next step, if backpressure
/// is raised then adds the message to the pending buffer.
fn submit_to_next_step(
&mut self,
message: Message<RoutedValue>,
identifier: MessageIdentifier,
) -> Result<(), SubmitError<RoutedValue>> {
let msg_clone = message.clone();
match self.next_step.submit(message) {
Ok(..) => Ok(()),
Err(e) => {
self.pending_messages.insert(identifier, msg_clone);
Err(e)
}
}
}

// If the message is in the pending buffer, attempts to submit it to the
/// next step
fn handle_submit(
&mut self,
message: Message<RoutedValue>,
) -> Result<(), SubmitError<RoutedValue>> {
let identifier = MessageIdentifier {
route: message.payload().route.clone(),
committable: clone_committable(&message),
};
if self.pending_messages.contains_key(&identifier) {
self.retry_pending_message(message, &identifier)
} else {
self.submit_to_next_step(message, identifier)
}
}
}

impl ProcessingStrategy<RoutedValue> for Broadcaster {
Expand All @@ -135,13 +101,24 @@ impl ProcessingStrategy<RoutedValue> for Broadcaster {
self.next_step.poll()
}

/// Instead of submitting messages to the next step, `submit` puts
/// the routed message clones into an in-memory buffer that gets flushed on `poll` and `join`.
fn submit(&mut self, message: Message<RoutedValue>) -> Result<(), SubmitError<RoutedValue>> {
// To preserve ordering we block new messages even if they're not routed for this branch
if !self.pending_messages.is_empty() {
return Err(SubmitError::MessageRejected(MessageRejected { message }));
}
if self.route != message.payload().route {
return self.next_step.submit(message);
}

let unfolded_messages = generate_broadcast_messages(&self.downstream_branches, message);
for msg in unfolded_messages {
self.handle_submit(msg)?;
let msg_key = MessageIdentifier {
route: msg.payload().route.clone(),
committable: clone_committable(&msg),
};
self.pending_messages.insert(msg_key, msg);
}
Ok(())
}
Expand All @@ -159,8 +136,11 @@ impl ProcessingStrategy<RoutedValue> for Broadcaster {
#[cfg(test)]
mod tests {
use super::*;
use crate::fake_strategy::FakeStrategy;
use crate::fake_strategy::{assert_messages_match, assert_watermarks_match};
use crate::fake_strategy::{
assert_messages_match, assert_routes_match, assert_watermarks_match, submitted_payloads,
submitted_watermark_payloads,
};
use crate::fake_strategy::{submitted_routes, FakeStrategy};
use crate::messages::{RoutedValuePayload, Watermark, WatermarkMessage};
use crate::routes::Route;
use crate::testutils::{build_routed_value, make_committable};
Expand All @@ -187,10 +167,10 @@ mod tests {
let mut step = Broadcaster::new(
Box::new(next_step),
Route {
source: String::from("source"),
source: "source".to_string(),
waypoints: vec![],
},
vec![String::from("branch1"), String::from("branch2")],
vec!["branch1".to_string(), "branch2".to_string()],
);

// Assert MessageRejected adds message to pending
Expand All @@ -210,7 +190,7 @@ mod tests {
let watermark = Message::new_any_message(
RoutedValue {
route: Route {
source: String::from("source"),
source: "source".to_string(),
waypoints: vec![],
},
payload: RoutedValuePayload::WatermarkMessage(WatermarkMessage::Watermark(
Expand Down Expand Up @@ -240,22 +220,52 @@ mod tests {
let _ = step.submit(message.clone());
assert_eq!(step.pending_messages.len(), 1);
let actual_messages = submitted_messages_clone.lock().unwrap();
let actual_payloads = submitted_payloads(actual_messages.deref());
assert_messages_match(
py,
vec![
"test_message".into_py_any(py).unwrap(),
"test_message".into_py_any(py).unwrap(),
],
actual_messages.deref(),
&actual_payloads,
);
let actual_routes = submitted_routes(actual_messages.deref());
assert_routes_match(
vec![
Route {
source: "source".to_string(),
waypoints: vec!["branch1".to_string()],
},
Route {
source: "source".to_string(),
waypoints: vec!["branch2".to_string()],
},
],
&actual_routes,
);

// Assert poll clears remaining pending messages
let _ = step.poll();
assert_eq!(step.pending_messages.len(), 0);
let actual_watermarks = submitted_watermarks_clone.lock().unwrap();
let watermark_payloads = submitted_watermark_payloads(actual_watermarks.deref());
assert_watermarks_match(
vec![Watermark::new(make_committable(2, 0))],
actual_watermarks.deref(),
&watermark_payloads,
);
let actual_routes = submitted_routes(actual_watermarks.deref());
assert_routes_match(
vec![
Route {
source: "source".to_string(),
waypoints: vec!["branch1".to_string()],
},
Route {
source: "source".to_string(),
waypoints: vec!["branch2".to_string()],
},
],
&actual_routes,
);
})
}
Expand Down
5 changes: 3 additions & 2 deletions sentry_streams/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,8 @@ impl ProcessingStrategyFactory<KafkaPayload> for ArroyoStreamingFactory {
#[cfg(test)]
mod tests {
use super::*;
use crate::fake_strategy::assert_messages_match;
use crate::fake_strategy::FakeStrategy;
use crate::fake_strategy::{assert_messages_match, submitted_payloads};
use crate::operators::RuntimeOperator;
use crate::routes::Route;
use crate::testutils::make_lambda;
Expand Down Expand Up @@ -379,8 +379,9 @@ mod tests {
.unwrap();
let expected_messages = vec![value];
let actual_messages = submitted_messages_clone.lock().unwrap();
let actual_payloads = submitted_payloads(actual_messages.deref());

assert_messages_match(py, expected_messages, actual_messages.deref());
assert_messages_match(py, expected_messages, &actual_payloads);
})
}
}
88 changes: 64 additions & 24 deletions sentry_streams/src/fake_strategy.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use super::*;
use crate::messages::{PyStreamingMessage, RoutedValuePayload, Watermark, WatermarkMessage};
#[cfg(test)]
use crate::routes::Route;
use crate::routes::RoutedValue;
use crate::utils::traced_with_gil;

Expand All @@ -13,16 +15,16 @@ use std::sync::{Arc, Mutex};
use std::time::Duration;

pub struct FakeStrategy {
pub submitted: Arc<Mutex<Vec<Py<PyAny>>>>,
pub submitted_watermarks: Arc<Mutex<Vec<Watermark>>>,
pub submitted: Arc<Mutex<Vec<RoutedValue>>>,
pub submitted_watermarks: Arc<Mutex<Vec<RoutedValue>>>,
pub reject_message: bool,
commit_request: Option<CommitRequest>,
}

impl FakeStrategy {
pub fn new(
submitted: Arc<Mutex<Vec<Py<PyAny>>>>,
submitted_watermarks: Arc<Mutex<Vec<Watermark>>>,
submitted: Arc<Mutex<Vec<RoutedValue>>>,
submitted_watermarks: Arc<Mutex<Vec<RoutedValue>>>,
reject_message: bool,
) -> Self {
Self {
Expand Down Expand Up @@ -70,26 +72,14 @@ impl ProcessingStrategy<RoutedValue> for FakeStrategy {
Some(build_commit_request(&message)),
);

let msg_payload = message.into_payload().payload;
match msg_payload {
RoutedValuePayload::WatermarkMessage(msg) => match msg {
WatermarkMessage::Watermark(watermark) => {
self.submitted_watermarks.lock().unwrap().push(watermark)
}
WatermarkMessage::PyWatermark(..) => (),
},
RoutedValuePayload::PyStreamingMessage(py_payload) => {
traced_with_gil!(|py| {
let msg = match py_payload {
PyStreamingMessage::PyAnyMessage { content } => {
content.bind(py).getattr("payload").unwrap()
}
PyStreamingMessage::RawMessage { content } => {
content.bind(py).getattr("payload").unwrap()
}
};
self.submitted.lock().unwrap().push(msg.unbind());
});
match message.payload().payload {
RoutedValuePayload::WatermarkMessage(..) => self
.submitted_watermarks
.lock()
.unwrap()
.push(message.into_payload()),
RoutedValuePayload::PyStreamingMessage(..) => {
self.submitted.lock().unwrap().push(message.into_payload())
}
}
Ok(())
Expand Down Expand Up @@ -140,3 +130,53 @@ pub fn assert_watermarks_match(expected_messages: Vec<Watermark>, actual_message
assert_eq!(actual, expected);
}
}

#[cfg(test)]
pub fn assert_routes_match(expected_routes: Vec<Route>, actual_routes: &[Route]) {
for (actual, expected) in actual_routes.iter().zip(expected_routes.iter()) {
assert_eq!(actual, expected);
}
}

#[cfg(test)]
pub fn submitted_payloads(messages: &Vec<RoutedValue>) -> Vec<Py<PyAny>> {
let mut ret = vec![];
traced_with_gil!(|py| {
for msg in messages {
if let RoutedValuePayload::PyStreamingMessage(py_msg) = msg.payload.clone() {
let payload = match py_msg {
PyStreamingMessage::PyAnyMessage { content } => {
content.bind(py).getattr("payload").unwrap()
}
PyStreamingMessage::RawMessage { content } => {
content.bind(py).getattr("payload").unwrap()
}
};
ret.push(payload.unbind());
}
}
});
ret
}

#[cfg(test)]
pub fn submitted_watermark_payloads(watermarks: &Vec<RoutedValue>) -> Vec<Watermark> {
let mut ret = vec![];
for msg in watermarks {
if let RoutedValuePayload::WatermarkMessage(WatermarkMessage::Watermark(watermark)) =
msg.payload.clone()
{
ret.push(watermark);
}
}
ret
}

#[cfg(test)]
pub fn submitted_routes(messages: &Vec<RoutedValue>) -> Vec<Route> {
let mut ret = vec![];
for msg in messages {
ret.push(msg.route.clone())
}
ret
}
8 changes: 6 additions & 2 deletions sentry_streams/src/filter_step.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ impl ProcessingStrategy<RoutedValue> for Filter {
mod tests {
use super::*;
use crate::fake_strategy::assert_messages_match;
use crate::fake_strategy::submitted_payloads;
use crate::fake_strategy::submitted_watermark_payloads;
use crate::fake_strategy::FakeStrategy;
use crate::messages::Watermark;
use crate::routes::Route;
Expand Down Expand Up @@ -270,8 +272,9 @@ mod tests {
"test_message".into_py_any(py).unwrap(),
];
let actual_messages = submitted_messages_clone.lock().unwrap();
let message_payloads = submitted_payloads(actual_messages.deref());

assert_messages_match(py, expected_messages, actual_messages.deref());
assert_messages_match(py, expected_messages, &message_payloads);

let watermark_val = RoutedValue {
route: Route::new(String::from("source"), vec![]),
Expand All @@ -281,7 +284,8 @@ mod tests {
let watermark_res = strategy.submit(watermark_msg);
assert!(watermark_res.is_ok());
let watermark_messages = submitted_watermarks_clone.lock().unwrap();
assert_eq!(watermark_messages[0], Watermark::new(BTreeMap::new()));
let watermark_payloads = submitted_watermark_payloads(watermark_messages.deref());
assert_eq!(watermark_payloads[0], Watermark::new(BTreeMap::new()));
});
}
}
Loading
Loading