Skip to content

Commit

Permalink
Merge new changes for rust
Browse files Browse the repository at this point in the history
Signed-off-by: Sreekanth <[email protected]>
  • Loading branch information
BulkBeing committed Sep 30, 2024
1 parent ac7b33b commit a04f9cd
Show file tree
Hide file tree
Showing 8 changed files with 232 additions and 97 deletions.
2 changes: 1 addition & 1 deletion rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rust/numaflow-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ log = "0.4.22"

[dev-dependencies]
tempfile = "3.11.0"
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", rev = "0c1682864a4b906fab52e149cfd7cacc679ce688" }
numaflow = { git = "https://github.com/BulkBeing/numaflow-rs.git", rev = "6eb7f3865d42a8ab11ade51622dc4d8feda25b5e" }

[build-dependencies]
tonic-build = "0.12.1"
33 changes: 26 additions & 7 deletions rust/numaflow-core/proto/sourcetransform.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,36 @@ service SourceTransform {
// SourceTransformFn applies a function to each request element.
// In addition to map function, SourceTransformFn also supports assigning a new event time to response.
// SourceTransformFn can be used only at source vertex by source data transformer.
rpc SourceTransformFn(SourceTransformRequest) returns (SourceTransformResponse);
rpc SourceTransformFn(stream SourceTransformRequest) returns (stream SourceTransformResponse);

// IsReady is the heartbeat endpoint for gRPC.
rpc IsReady(google.protobuf.Empty) returns (ReadyResponse);
}

/*
* Handshake message between client and server to indicate the start of transmission.
*/
message Handshake {
// Required field indicating the start of transmission.
bool sot = 1;
}


/**
* SourceTransformerRequest represents a request element.
*/
message SourceTransformRequest {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
map<string, string> headers = 5;
message Request {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
map<string, string> headers = 5;
// This ID is used to uniquely identify a transform request
string id = 6;
}
Request request = 1;
optional Handshake handshake = 2;
}

/**
Expand All @@ -37,11 +52,15 @@ message SourceTransformResponse {
repeated string tags = 4;
}
repeated Result results = 1;
// This ID is used to refer the responses to the request it corresponds to.
string id = 2;
// Handshake message between client and server to indicate the start of transmission.
optional Handshake handshake = 3;
}

/**
* ReadyResponse is the health check result.
*/
message ReadyResponse {
bool ready = 1;
}
}
18 changes: 8 additions & 10 deletions rust/numaflow-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use numaflow_models::models::{Backoff, MonoVertex, RetryStrategy};
use std::env;
use std::fmt::Display;
use std::sync::OnceLock;

const DEFAULT_SOURCE_SOCKET: &str = "/var/run/numaflow/source.sock";
Expand Down Expand Up @@ -53,17 +54,14 @@ impl OnFailureStrategy {
_ => Some(DEFAULT_SINK_RETRY_ON_FAIL_STRATEGY),
}
}
}

/// Converts the `OnFailureStrategy` enum variant to a String.
/// This facilitates situations where the enum needs to be displayed or logged as a string.
///
/// # Returns
/// A string representing the `OnFailureStrategy` enum variant.
fn to_string(&self) -> String {
impl Display for OnFailureStrategy {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match *self {
OnFailureStrategy::Retry => "retry".to_string(),
OnFailureStrategy::Fallback => "fallback".to_string(),
OnFailureStrategy::Drop => "drop".to_string(),
OnFailureStrategy::Retry => write!(f, "retry"),
OnFailureStrategy::Fallback => write!(f, "fallback"),
OnFailureStrategy::Drop => write!(f, "drop"),
}
}
}
Expand Down Expand Up @@ -647,4 +645,4 @@ mod tests {
let drop = OnFailureStrategy::Drop;
assert_eq!(drop.to_string(), "drop");
}
}
}
16 changes: 10 additions & 6 deletions rust/numaflow-core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use chrono::{DateTime, Utc};
use crate::error::Error;
use crate::monovertex::sink_pb::sink_request::Request;
use crate::monovertex::sink_pb::SinkRequest;
use crate::monovertex::source_pb;
use crate::monovertex::{source_pb, sourcetransform_pb};
use crate::monovertex::source_pb::{read_response, AckRequest};
use crate::monovertex::sourcetransform_pb::SourceTransformRequest;
use crate::shared::utils::{prost_timestamp_from_utc, utc_from_timestamp};
Expand Down Expand Up @@ -58,11 +58,15 @@ impl From<Offset> for AckRequest {
impl From<Message> for SourceTransformRequest {
fn from(message: Message) -> Self {
Self {
keys: message.keys,
value: message.value,
event_time: prost_timestamp_from_utc(message.event_time),
watermark: None,
headers: message.headers,
request: Some(sourcetransform_pb::source_transform_request::Request {
id: message.id,
keys: message.keys,
value: message.value,
event_time: prost_timestamp_from_utc(message.event_time),
watermark: None,
headers: message.headers,
}),
handshake: None,
}
}
}
Expand Down
32 changes: 10 additions & 22 deletions rust/numaflow-core/src/monovertex/forwarder.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
use chrono::Utc;
use log::warn;
use std::collections::HashMap;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info};

use crate::config::{config, OnFailureStrategy};
use crate::error;
use crate::error::Error;
Expand All @@ -8,13 +15,6 @@ use crate::monovertex::sink_pb::Status::{Failure, Fallback, Success};
use crate::sink::user_defined::SinkWriter;
use crate::source::user_defined::Source;
use crate::transformer::user_defined::SourceTransformer;
use chrono::Utc;
use log::warn;
use std::collections::HashMap;
use tokio::task::JoinSet;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info};

/// Forwarder is responsible for reading messages from the source, applying transformation if
/// transformer is present, writing the messages to the sink, and then acknowledging the messages
Expand Down Expand Up @@ -193,26 +193,14 @@ impl Forwarder {

// Applies transformation to the messages if transformer is present
// we concurrently apply transformation to all the messages.
async fn apply_transformer(&self, messages: Vec<Message>) -> error::Result<Vec<Message>> {
let Some(transformer_client) = &self.source_transformer else {
async fn apply_transformer(&mut self, messages: Vec<Message>) -> error::Result<Vec<Message>> {
let Some(transformer_client) = &mut self.source_transformer else {
// return early if there is no transformer
return Ok(messages);
};

let start_time = tokio::time::Instant::now();
let mut jh = JoinSet::new();
for message in messages {
let mut transformer_client = transformer_client.clone();
jh.spawn(async move { transformer_client.transform_fn(message).await });
}

let mut results = Vec::new();
while let Some(task) = jh.join_next().await {
let result = task.map_err(|e| Error::TransformerError(format!("{:?}", e)))?;
if let Some(result) = result? {
results.extend(result);
}
}
let results = transformer_client.transform_fn(messages).await?;

debug!(
"Transformer latency - {}ms",
Expand Down
Loading

0 comments on commit a04f9cd

Please sign in to comment.