Skip to content

Commit

Permalink
chore: tracker changes to support serving
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Feb 20, 2025
1 parent ae6c2a6 commit 1923822
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 55 deletions.
37 changes: 24 additions & 13 deletions rust/numaflow-core/src/mapper/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,15 +422,17 @@ impl MapHandle {
match result {
Ok(Ok(mapped_messages)) => {
// update the tracker with the number of messages sent and send the mapped messages
for message in mapped_messages.iter() {
tracker_handle
.update(offset.clone(), message.tags.clone())
.await
.expect("failed to update tracker");
}
tracker_handle
.update(
offset.clone(),
mapped_messages.iter().map(|m| m.tags.clone()).collect(),
)
.await
.expect("failed to update tracker");

// done with the batch
tracker_handle
.update_eof(offset)
.eof(offset)
.await
.expect("failed to update eof");
// send messages downstream
Expand Down Expand Up @@ -504,12 +506,15 @@ impl MapHandle {
if offset.is_none() {
offset = Some(message.offset.clone());
}
tracker_handle
.update(message.offset.clone(), message.tags.clone())
.await?;
}
if let Some(offset) = offset {
tracker_handle.update_eof(offset).await?;
tracker_handle
.update(
offset.clone(),
mapped_messages.iter().map(|m| m.tags.clone()).collect(),
)
.await?;
tracker_handle.eof(offset).await?;
}
for mapped_message in mapped_messages {
output_tx
Expand Down Expand Up @@ -570,13 +575,19 @@ impl MapHandle {
return;
}

// we need update the tracker with no responses, because unlike unary and batch, we cannot update the
// responses here we will have to append the responses.
tracker_handle
.update(read_msg.offset.clone(), vec![])
.await
.expect("failed to update tracker");
loop {
tokio::select! {
result = receiver.recv() => {
match result {
Some(Ok(mapped_message)) => {
tracker_handle
.update(mapped_message.offset.clone(), mapped_message.tags.clone())
.append(mapped_message.offset.clone(), mapped_message.tags.clone())
.await
.expect("failed to update tracker");
output_tx.send(mapped_message).await.expect("failed to send response");
Expand Down Expand Up @@ -607,7 +618,7 @@ impl MapHandle {
}

tracker_handle
.update_eof(read_msg.offset)
.eof(read_msg.offset)
.await
.expect("failed to update eof");
});
Expand Down
2 changes: 0 additions & 2 deletions rust/numaflow-core/src/source/serving.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ impl super::SourceReader for ServingSource {

impl super::SourceAcker for ServingSource {
/// HTTP response is sent only once we have confirmation that the message has been written to the ISB.
// TODO: Current implementation only works for `/v1/process/async` endpoint.
// For `/v1/process/{sync,sync_serve}` endpoints: https://github.com/numaproj/numaflow/issues/2308
async fn ack(&mut self, offsets: Vec<Offset>) -> Result<()> {
let mut serving_offsets = vec![];
for offset in offsets {
Expand Down
93 changes: 60 additions & 33 deletions rust/numaflow-core/src/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,19 @@ enum ActorMessage {
},
Update {
offset: Offset,
responses: Option<Vec<String>>,
responses: Vec<Option<Vec<String>>>,
},
Append {
offset: Offset,
response: Option<Vec<String>>,
},
Delete {
offset: Offset,
},
Discard {
offset: Offset,
},
UpdateEOF {
EOF {
offset: Offset,
},
#[cfg(test)]
Expand Down Expand Up @@ -93,8 +97,8 @@ impl TryFrom<&Message> for ServingCallbackInfo {
.get(DEFAULT_CALLBACK_URL_HEADER_KEY)
.ok_or_else(|| {
Error::Source(format!(
"{DEFAULT_CALLBACK_URL_HEADER_KEY} header is not present in the message headers",
))
"{DEFAULT_CALLBACK_URL_HEADER_KEY} header is not present in the message headers",
))
})?
.to_owned();
let uuid = message
Expand All @@ -114,17 +118,11 @@ impl TryFrom<&Message> for ServingCallbackInfo {
.previous_vertex
.clone();

let mut msg_tags = None;
if let Some(ref tags) = message.tags {
if !tags.is_empty() {
msg_tags = Some(tags.iter().cloned().collect());
}
};
Ok(ServingCallbackInfo {
id: uuid,
callback_url,
from_vertex,
responses: vec![msg_tags],
responses: vec![None],
})
}
}
Expand Down Expand Up @@ -174,15 +172,18 @@ impl Tracker {
ActorMessage::Update { offset, responses } => {
self.handle_update(offset, responses);
}
ActorMessage::UpdateEOF { offset } => {
self.handle_update_eof(offset).await;
ActorMessage::EOF { offset } => {
self.handle_eof(offset).await;
}
ActorMessage::Delete { offset } => {
self.handle_delete(offset).await;
}
ActorMessage::Discard { offset } => {
self.handle_discard(offset).await;
}
ActorMessage::Append { offset, response } => {
self.handle_append(offset, response);
}
#[cfg(test)]
ActorMessage::IsEmpty { respond_to } => {
let is_empty = self.entries.is_empty();
Expand Down Expand Up @@ -215,19 +216,31 @@ impl Tracker {
}

/// Updates an existing entry in the tracker with the number of expected messages for this offset.
fn handle_update(&mut self, offset: Offset, responses: Option<Vec<String>>) {
fn handle_update(&mut self, offset: Offset, responses: Vec<Option<Vec<String>>>) {
let Some(entry) = self.entries.get_mut(&offset) else {
return;
};

entry.count += responses.len();
if let Some(cb) = entry.serving_callback_info.as_mut() {
cb.responses = responses;
}
}

/// Appends a response to the serving callback info for the given offset.
fn handle_append(&mut self, offset: Offset, response: Option<Vec<String>>) {
let Some(entry) = self.entries.get_mut(&offset) else {
return;
};

entry.count += 1;
if let Some(cb) = entry.serving_callback_info.as_mut() {
cb.responses.push(responses);
cb.responses.push(response);
}
}

/// Update whether we have seen the eof (end of stream) for this offset.
async fn handle_update_eof(&mut self, offset: Offset) {
async fn handle_eof(&mut self, offset: Offset) {
let Some(entry) = self.entries.get_mut(&offset) else {
return;
};
Expand Down Expand Up @@ -368,21 +381,35 @@ impl TrackerHandle {
/// Informs the tracker that a new message has been generated. The tracker should contain
/// and entry for this message's offset.
pub(crate) async fn update(
&self,
offset: Offset,
response_tags: Vec<Option<Arc<[String]>>>,
) -> Result<()> {
let responses = response_tags
.into_iter()
.map(|tags| tags.map(|tags| tags.iter().map(|tag| tag.to_string()).collect()))
.collect();
let message = ActorMessage::Update { offset, responses };
self.sender
.send(message)
.await
.map_err(|e| Error::Tracker(format!("{:?}", e)))?;
Ok(())
}

pub(crate) async fn append(
&self,
offset: Offset,
message_tags: Option<Arc<[String]>>,
) -> Result<()> {
let responses: Option<Vec<String>> = match (self.enable_callbacks, message_tags) {
(true, Some(tags)) => {
if !tags.is_empty() {
Some(tags.iter().cloned().collect::<Vec<String>>())
} else {
None
}
}
_ => None,
let tags: Option<Vec<String>> = match message_tags {
Some(tags) => Some(tags.to_vec()),
None => None,
};
let message = ActorMessage::Append {
offset,
response: tags,
};
let message = ActorMessage::Update { offset, responses };
self.sender
.send(message)
.await
Expand All @@ -391,8 +418,8 @@ impl TrackerHandle {
}

/// Updates the EOF status for an offset in the Tracker
pub(crate) async fn update_eof(&self, offset: Offset) -> Result<()> {
let message = ActorMessage::UpdateEOF { offset };
pub(crate) async fn eof(&self, offset: Offset) -> Result<()> {
let message = ActorMessage::EOF { offset };
self.sender
.send(message)
.await
Expand Down Expand Up @@ -523,10 +550,10 @@ mod tests {

// Update the message
handle
.update(message.offset.clone(), message.tags.clone())
.update(message.offset.clone(), vec![message.tags.clone()])
.await
.unwrap();
handle.update_eof(message.offset.clone()).await.unwrap();
handle.eof(message.offset.clone()).await.unwrap();

// Delete the message
handle.delete(message.offset).await.unwrap();
Expand Down Expand Up @@ -566,7 +593,7 @@ mod tests {
// Update the message with a count of 3
for message in messages {
handle
.update(message.offset.clone(), message.tags.clone())
.update(message.offset.clone(), vec![message.tags.clone()])
.await
.unwrap();
}
Expand Down Expand Up @@ -646,7 +673,7 @@ mod tests {
let messages: Vec<Message> = std::iter::repeat(message.clone()).take(3).collect();
for message in messages {
handle
.update(message.offset.clone(), message.tags.clone())
.update(message.offset.clone(), vec![message.tags.clone()])
.await
.unwrap();
}
Expand Down Expand Up @@ -744,7 +771,7 @@ mod tests {

// Insert a new message
handle.insert(&message, ack_send).await.unwrap();
handle.update_eof(offset).await.unwrap();
handle.eof(offset).await.unwrap();

// Verify that the message was discarded and Ack was received
let result = timeout(Duration::from_secs(1), ack_recv).await.unwrap();
Expand Down
16 changes: 10 additions & 6 deletions rust/numaflow-core/src/transformer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,16 @@ impl Transformer {
.await?;

// update the tracker with the number of responses for each message
for message in transformed_messages.iter() {
tracker_handle
.update(read_msg.offset.clone(), message.tags.clone())
.await?;
}
tracker_handle.update_eof(read_msg.offset.clone()).await?;
tracker_handle
.update(
read_msg.offset.clone(),
transformed_messages
.iter()
.map(|m| m.tags.clone())
.collect(),
)
.await?;
tracker_handle.eof(read_msg.offset.clone()).await?;

Ok::<Vec<Message>, Error>(transformed_messages)
})
Expand Down
3 changes: 2 additions & 1 deletion rust/serving/src/app/callback.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use axum::{body::Bytes, extract::State, http::HeaderMap, routing, Json, Router};
use tracing::error;
use tracing::{error, info};

use self::store::Store;
use crate::app::response::ApiError;
Expand Down Expand Up @@ -61,6 +61,7 @@ async fn callback<T: Send + Sync + Clone + Store>(
State(app_state): State<CallbackAppState<T>>,
Json(payload): Json<Vec<Callback>>,
) -> Result<(), ApiError> {
info!(?payload, "Received callback request");
app_state
.callback_state
.clone()
Expand Down

0 comments on commit 1923822

Please sign in to comment.