Skip to content

Commit

Permalink
Unit tests monovertex config parsing
Browse files Browse the repository at this point in the history
Signed-off-by: Sreekanth <[email protected]>
  • Loading branch information
BulkBeing committed Feb 19, 2025
1 parent 184a4e1 commit 0fbe28a
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 8 deletions.
15 changes: 10 additions & 5 deletions rust/serving/src/app/jetstream_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const NUMAFLOW_RESP_ARRAY_IDX_LEN: &str = "Numaflow-Array-Index-Len";
struct ProxyState<T> {
message: mpsc::Sender<MessageWrapper>,
tid_header: String,
/// Lets the HTTP handlers know whether they are in a Monovertex or a Pipeline
monovertex: bool,
callback: state::State<T>,
}
Expand All @@ -53,14 +54,14 @@ pub(crate) async fn jetstream_proxy<T: Clone + Send + Sync + Store + 'static>(
let proxy_state = Arc::new(ProxyState {
message: state.message.clone(),
tid_header: state.settings.tid_header.clone(),
monovertex: state.settings.pipeline_spec.edges.is_empty(), // FIXME:
monovertex: state.settings.pipeline_spec.edges.is_empty(),
callback: state.callback_state.clone(),
});

let router = Router::new()
.route("/async", post(async_publish))
.route("/sync", post(sync_publish))
.route("/serve", get(serve))
.route("/fetch", get(fetch))
.with_state(proxy_state);
Ok(router)
}
Expand All @@ -70,7 +71,7 @@ struct ServeQueryParams {
id: String,
}

async fn serve<T: Send + Sync + Clone + Store>(
async fn fetch<T: Send + Sync + Clone + Store>(
State(proxy_state): State<Arc<ProxyState<T>>>,
Query(ServeQueryParams { id }): Query<ServeQueryParams>,
) -> Response {
Expand Down Expand Up @@ -267,6 +268,8 @@ async fn async_publish<T: Send + Sync + Clone + Store>(
));
}
};
// A send operation happens at the sender end of the notify channel when all callbacks are received.
// We keep the receiver alive to avoid send failure.
tokio::spawn(notify);

let (confirm_save_tx, confirm_save_rx) = oneshot::channel();
Expand All @@ -281,6 +284,8 @@ async fn async_publish<T: Send + Sync + Clone + Store>(

proxy_state.message.send(message).await.unwrap(); // FIXME:
if proxy_state.monovertex {
// A send operation happens at the sender end of the confirm_save channel when writing to Sink is successful and ACK is received for the message.
// We keep the receiver alive to avoid send failure.
tokio::spawn(confirm_save_rx);
return Ok(Json(ServeResponse::new(
"Successfully published message".to_string(),
Expand Down Expand Up @@ -620,7 +625,7 @@ mod tests {
// Get result for the request id using /serve endpoint
let req = Request::builder()
.method("GET")
.uri(format!("/serve?id={ID_VALUE}"))
.uri(format!("/fetch?id={ID_VALUE}"))
.body(axum::body::Body::empty())
.unwrap();
let response = app.clone().oneshot(req).await.unwrap();
Expand All @@ -634,7 +639,7 @@ mod tests {
// Request for an id that doesn't exist in the store
let req = Request::builder()
.method("GET")
.uri("/serve?id=unknown")
.uri("/fetch?id=unknown")
.body(axum::body::Body::empty())
.unwrap();
let response = app.oneshot(req).await.unwrap();
Expand Down
49 changes: 46 additions & 3 deletions rust/serving/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::collections::HashMap;
use std::fmt::Debug;
use std::{collections::HashMap, env};

use base64::prelude::BASE64_STANDARD;
use base64::Engine;
Expand Down Expand Up @@ -114,7 +114,7 @@ pub struct CallbackStorageConfig {
impl TryFrom<HashMap<String, String>> for Settings {
type Error = Error;
fn try_from(env_vars: HashMap<String, String>) -> std::result::Result<Self, Self::Error> {
let is_monovertex = env::var("NUMAFLOW_MONO_VERTEX_OBJECT").is_ok();
let is_monovertex = env_vars.contains_key(ENV_MONOVERTEX_OBJ);
let mut settings = Settings {
host_ip: "localhost".to_string(),
pipeline_spec: PipelineDCG::monovertex(),
Expand Down Expand Up @@ -257,7 +257,7 @@ mod tests {
}

#[test]
fn test_config_parse() {
fn test_pipeline_config_parse() {
// Set up the environment variables
let env_vars = [
(ENV_NUMAFLOW_SERVING_HOST_IP, "10.2.3.5"),
Expand Down Expand Up @@ -308,4 +308,47 @@ mod tests {
};
assert_eq!(settings, expected_config);
}

#[test]
fn test_monovertex_config_parse() {
// Set up the environment variables
let env_vars = [
(ENV_NUMAFLOW_SERVING_HOST_IP, "10.2.3.5"),
(ENV_NUMAFLOW_SERVING_APP_PORT, "8443"),
(ENV_MONOVERTEX_OBJ, "eyJtZXRhZGF0YSI6eyJuYW1lIjoidHJhbnNmb3JtZXItbW9uby12ZXJ0ZXgiLCJuYW1lc3BhY2UiOiJkZWZhdWx0IiwiY3JlYXRpb25UaW1lc3RhbXAiOm51bGx9LCJzcGVjIjp7InJlcGxpY2FzIjowLCJzb3VyY2UiOnsic2VydmluZyI6eyJhdXRoIjpudWxsLCJzZXJ2aWNlIjp0cnVlLCJtc2dJREhlYWRlcktleSI6IlgtTnVtYWZsb3ctSWQiLCJzdG9yZSI6eyJ1cmwiOiJyZWRpczovL3JlZGlzOjYzNzkifX19LCJzaW5rIjp7InVkc2luayI6eyJjb250YWluZXIiOnsiaW1hZ2UiOiJzZXJ2ZXNpbms6MC4xIiwiZW52IjpbeyJuYW1lIjoiTlVNQUZMT1dfQ0FMTEJBQ0tfVVJMX0tFWSIsInZhbHVlIjoiWC1OdW1hZmxvdy1DYWxsYmFjay1VcmwifSx7Im5hbWUiOiJOVU1BRkxPV19NU0dfSURfSEVBREVSX0tFWSIsInZhbHVlIjoiWC1OdW1hZmxvdy1JZCJ9XSwicmVzb3VyY2VzIjp7fSwiaW1hZ2VQdWxsUG9saWN5IjoiTmV2ZXIifX0sInJldHJ5U3RyYXRlZ3kiOnt9fSwiY29udGFpbmVyVGVtcGxhdGUiOnsicmVzb3VyY2VzIjp7fSwiZW52IjpbeyJuYW1lIjoiTlVNQUZMT1dfQ0FMTEJBQ0tfRU5BQkxFRCIsInZhbHVlIjoidHJ1ZSJ9XX0sImxpbWl0cyI6eyJyZWFkQmF0Y2hTaXplIjo1MDAsInJlYWRUaW1lb3V0IjoiMXMifSwic2NhbGUiOnt9LCJ1cGRhdGVTdHJhdGVneSI6e30sImxpZmVjeWNsZSI6e319LCJzdGF0dXMiOnsicmVwbGljYXMiOjAsImRlc2lyZWRSZXBsaWNhcyI6MCwibGFzdFVwZGF0ZWQiOm51bGwsImxhc3RTY2FsZWRBdCI6bnVsbH19"),
];

// Call the config method
let settings: Settings = env_vars
.into_iter()
.map(|(key, val)| (key.to_owned(), val.to_owned()))
.collect::<HashMap<String, String>>()
.try_into()
.unwrap();

let expected_config = Settings {
tid_header: "X-Numaflow-Id".into(),
app_listen_port: 8443,
metrics_server_listen_port: 3001,
upstream_addr: "localhost:8888".into(),
drain_timeout_secs: 10,
redis: RedisConfig {
addr: "redis://redis:6379".into(),
max_tasks: 50,
retries: 5,
retries_duration_millis: 100,
ttl_secs: Some(DEFAULT_REDIS_TTL_IN_SECS),
},
host_ip: "localhost".into(),
api_auth_token: None,
pipeline_spec: PipelineDCG {
vertices: vec![Vertex {
name: "source".into(),
}],
edges: vec![],
},
..Default::default()
};
assert_eq!(settings, expected_config);
}
}

0 comments on commit 0fbe28a

Please sign in to comment.