Skip to content

Commit

Permalink
Parse vertex object spec env variable and remove the need for serving…
Browse files Browse the repository at this point in the history
… source spec as env variable

Signed-off-by: Sreekanth <[email protected]>
  • Loading branch information
BulkBeing committed Jan 31, 2025
1 parent 5a54cb6 commit 5114b4b
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 38 deletions.
13 changes: 1 addition & 12 deletions pkg/apis/numaflow/v1alpha1/vertex_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,13 +349,6 @@ func (v Vertex) GetPodSpec(req GetVertexPodSpecReq) (*corev1.PodSpec, error) {
}

if v.IsASource() && v.Spec.Source.Serving != nil {
servingSource := v.Spec.Source.Serving
servingSourceBytes, err := json.Marshal(servingSource)
if err != nil {
return nil, errors.New("failed to marshal serving source spec")
}
encodedServingSourceSpec := base64.StdEncoding.EncodeToString(servingSourceBytes)

// Create a SimplifiedPipelineSpec and populate it with the vertex names and edges
simplifiedPipelineSpec := PipelineSpec{
Vertices: req.PipelineSpec.Vertices,
Expand All @@ -372,7 +365,6 @@ func (v Vertex) GetPodSpec(req GetVertexPodSpecReq) (*corev1.PodSpec, error) {
containers[0].Env,
// set the serving source stream name in the environment because the numa container will be reading from it
corev1.EnvVar{Name: EnvServingJetstreamStream, Value: req.ServingSourceStreamName},
corev1.EnvVar{Name: EnvServingObject, Value: encodedServingSourceSpec},
corev1.EnvVar{Name: EnvServingMinPipelineSpec, Value: encodedPipelineSpec},
corev1.EnvVar{Name: EnvServingPort, Value: strconv.Itoa(VertexHTTPSPort)},
corev1.EnvVar{
Expand All @@ -383,12 +375,9 @@ func (v Vertex) GetPodSpec(req GetVertexPodSpecReq) (*corev1.PodSpec, error) {
},
},
},
corev1.EnvVar{
Name: EnvServingStoreTTL,
Value: strconv.Itoa(int(servingSource.Store.GetTTL().Seconds())),
},
)

servingSource := v.Spec.Source.Serving
// if auth is configured, set the auth token in the environment
if servingSource.Auth != nil && servingSource.Auth.Token != nil {
containers[0].Env = append(containers[0].Env,
Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/numaflow/v1alpha1/vertex_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,8 +362,8 @@ func TestGetPodSpec(t *testing.T) {
}
assert.ElementsMatch(t, envNames, []string{
"test-env", EnvNamespace, EnvPod, EnvPipelineName, EnvVertexName, EnvVertexObject, EnvReplica,
EnvCallbackEnabled, EnvServingAuthToken, EnvServingObject, EnvServingMinPipelineSpec,
EnvServingHostIP, EnvServingPort, EnvServingStoreTTL, EnvServingJetstreamStream,
EnvCallbackEnabled, EnvServingAuthToken, EnvServingMinPipelineSpec,
EnvServingHostIP, EnvServingPort, EnvServingJetstreamStream,
})

assert.Contains(t, s.Containers[0].Args, "processor")
Expand Down
2 changes: 1 addition & 1 deletion rust/serving/src/app/callback/store/redisstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ mod tests {
// if the key exists, the TTL should be set to 1 second
if exists {
let ttl: isize = conn_manager.ttl(&key).await.expect("Failed to check TTL");
assert_eq!(ttl, 1, "TTL should be set to 1 second");
assert_eq!(ttl, 86400, "TTL should be set to 1 second");
}
}
}
54 changes: 31 additions & 23 deletions rust/serving/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@ use crate::{
Error::{self, ParseConfig},
};

const ENV_NUMAFLOW_SERVING_SOURCE_OBJECT: &str = "NUMAFLOW_SERVING_SOURCE_OBJECT";
const ENV_NUMAFLOW_SERVING_STORE_TTL: &str = "NUMAFLOW_SERVING_STORE_TTL";
const ENV_NUMAFLOW_SERVING_HOST_IP: &str = "NUMAFLOW_SERVING_HOST_IP";
const ENV_NUMAFLOW_SERVING_APP_PORT: &str = "NUMAFLOW_SERVING_APP_LISTEN_PORT";
const ENV_NUMAFLOW_SERVING_AUTH_TOKEN: &str = "NUMAFLOW_SERVING_AUTH_TOKEN";
const ENV_MIN_PIPELINE_SPEC: &str = "NUMAFLOW_SERVING_MIN_PIPELINE_SPEC";
const ENV_VERTEX_OBJ: &str = "NUMAFLOW_VERTEX_OBJECT";

pub const DEFAULT_ID_HEADER: &str = "X-Numaflow-Id";
pub const DEFAULT_CALLBACK_URL_HEADER_KEY: &str = "X-Numaflow-Callback-Url";
Expand Down Expand Up @@ -44,7 +43,7 @@ impl Default for RedisConfig {
retries: 5,
retries_duration_millis: 100,
// TODO: we might need an option type here. Zero value of u32 can be used instead of None
ttl_secs: Some(1),
ttl_secs: Some(86400),
}
}
}
Expand Down Expand Up @@ -138,32 +137,42 @@ impl TryFrom<HashMap<String, String>> for Settings {
})?;
}

// Update redis.ttl_secs from environment variable
if let Some(ttl_secs) = env_vars.get(ENV_NUMAFLOW_SERVING_STORE_TTL) {
let ttl_secs: u32 = ttl_secs.parse().map_err(|e| {
ParseConfig(format!("parsing {ENV_NUMAFLOW_SERVING_STORE_TTL}: {e:?}"))
})?;
settings.redis.ttl_secs = Some(ttl_secs);
}

let Some(source_spec_encoded) = env_vars.get(ENV_NUMAFLOW_SERVING_SOURCE_OBJECT) else {
return Ok(settings);
// TODO: When we add support for Serving with monovertex, we should check for NUMAFLOW_MONO_VERTEX_OBJECT variable too.
let Some(source_spec_encoded) = env_vars.get(ENV_VERTEX_OBJ) else {
return Err(ParseConfig(format!(
"Environment variable {ENV_VERTEX_OBJ} is not set"
)));
};

let source_spec_decoded = BASE64_STANDARD
.decode(source_spec_encoded.as_bytes())
.map_err(|e| ParseConfig(format!("decoding NUMAFLOW_SERVING_SOURCE: {e:?}")))?;
.map_err(|e| ParseConfig(format!("decoding {ENV_VERTEX_OBJ}: {e:?}")))?;

#[derive(Deserialize)]
struct Source {
serving: Serving,
}
#[derive(Deserialize)]
struct Spec {
source: Source,
}

#[derive(Deserialize)]
struct VertexObject {
spec: Spec,
}

let source_spec = serde_json::from_slice::<Serving>(&source_spec_decoded)
.map_err(|e| ParseConfig(format!("parsing NUMAFLOW_SERVING_SOURCE: {e:?}")))?;
let vertex_obj = serde_json::from_slice::<VertexObject>(&source_spec_decoded)
.map_err(|e| ParseConfig(format!("parsing {ENV_VERTEX_OBJ}: {e:?}")))?;

// Update tid_header from source_spec
if let Some(msg_id_header_key) = source_spec.msg_id_header_key {
if let Some(msg_id_header_key) = vertex_obj.spec.source.serving.msg_id_header_key {
settings.tid_header = msg_id_header_key;
}

// Update redis.addr from source_spec, currently we only support redis as callback storage
settings.redis.addr = source_spec.callback_storage.url;
settings.redis.addr = vertex_obj.spec.source.serving.callback_storage.url;
settings.redis.ttl_secs = settings.redis.ttl_secs;

Ok(settings)
}
Expand Down Expand Up @@ -196,9 +205,8 @@ mod tests {
(ENV_NUMAFLOW_SERVING_HOST_IP, "10.2.3.5"),
(ENV_NUMAFLOW_SERVING_AUTH_TOKEN, "api-auth-token"),
(ENV_NUMAFLOW_SERVING_APP_PORT, "8443"),
(ENV_NUMAFLOW_SERVING_STORE_TTL, "86400"),
(ENV_NUMAFLOW_SERVING_SOURCE_OBJECT, "eyJhdXRoIjpudWxsLCJzZXJ2aWNlIjp0cnVlLCJtc2dJREhlYWRlcktleSI6IlgtTnVtYWZsb3ctSWQiLCJzdG9yZSI6eyJ1cmwiOiJyZWRpczovL3JlZGlzOjYzNzkifX0="),
(ENV_MIN_PIPELINE_SPEC, "eyJ2ZXJ0aWNlcyI6W3sibmFtZSI6InNlcnZpbmctaW4iLCJzb3VyY2UiOnsic2VydmluZyI6eyJhdXRoIjpudWxsLCJzZXJ2aWNlIjp0cnVlLCJtc2dJREhlYWRlcktleSI6IlgtTnVtYWZsb3ctSWQiLCJzdG9yZSI6eyJ1cmwiOiJyZWRpczovL3JlZGlzOjYzNzkifX19LCJjb250YWluZXJUZW1wbGF0ZSI6eyJyZXNvdXJjZXMiOnt9LCJpbWFnZVB1bGxQb2xpY3kiOiJOZXZlciIsImVudiI6W3sibmFtZSI6IlJVU1RfTE9HIiwidmFsdWUiOiJpbmZvIn1dfSwic2NhbGUiOnsibWluIjoxfSwidXBkYXRlU3RyYXRlZ3kiOnsidHlwZSI6IlJvbGxpbmdVcGRhdGUiLCJyb2xsaW5nVXBkYXRlIjp7Im1heFVuYXZhaWxhYmxlIjoiMjUlIn19fSx7Im5hbWUiOiJzZXJ2aW5nLXNpbmsiLCJzaW5rIjp7InVkc2luayI6eyJjb250YWluZXIiOnsiaW1hZ2UiOiJxdWF5LmlvL251bWFpby9udW1hZmxvdy1ycy9zaW5rLWxvZzpzdGFibGUiLCJlbnYiOlt7Im5hbWUiOiJOVU1BRkxPV19DQUxMQkFDS19VUkxfS0VZIiwidmFsdWUiOiJYLU51bWFmbG93LUNhbGxiYWNrLVVybCJ9LHsibmFtZSI6Ik5VTUFGTE9XX01TR19JRF9IRUFERVJfS0VZIiwidmFsdWUiOiJYLU51bWFmbG93LUlkIn1dLCJyZXNvdXJjZXMiOnt9fX0sInJldHJ5U3RyYXRlZ3kiOnt9fSwiY29udGFpbmVyVGVtcGxhdGUiOnsicmVzb3VyY2VzIjp7fSwiaW1hZ2VQdWxsUG9saWN5IjoiTmV2ZXIifSwic2NhbGUiOnsibWluIjoxfSwidXBkYXRlU3RyYXRlZ3kiOnsidHlwZSI6IlJvbGxpbmdVcGRhdGUiLCJyb2xsaW5nVXBkYXRlIjp7Im1heFVuYXZhaWxhYmxlIjoiMjUlIn19fV0sImVkZ2VzIjpbeyJmcm9tIjoic2VydmluZy1pbiIsInRvIjoic2VydmluZy1zaW5rIiwiY29uZGl0aW9ucyI6bnVsbH1dLCJsaWZlY3ljbGUiOnt9LCJ3YXRlcm1hcmsiOnt9fQ==")
(ENV_MIN_PIPELINE_SPEC, "eyJ2ZXJ0aWNlcyI6W3sibmFtZSI6InNlcnZpbmctaW4iLCJzb3VyY2UiOnsic2VydmluZyI6eyJhdXRoIjpudWxsLCJzZXJ2aWNlIjp0cnVlLCJtc2dJREhlYWRlcktleSI6IlgtTnVtYWZsb3ctSWQiLCJzdG9yZSI6eyJ1cmwiOiJyZWRpczovL3JlZGlzOjYzNzkifX19LCJjb250YWluZXJUZW1wbGF0ZSI6eyJyZXNvdXJjZXMiOnt9LCJpbWFnZVB1bGxQb2xpY3kiOiJOZXZlciJ9LCJzY2FsZSI6eyJtaW4iOjF9LCJ1cGRhdGVTdHJhdGVneSI6eyJ0eXBlIjoiUm9sbGluZ1VwZGF0ZSIsInJvbGxpbmdVcGRhdGUiOnsibWF4VW5hdmFpbGFibGUiOiIyNSUifX19LHsibmFtZSI6InNlcnZlLXNpbmsiLCJzaW5rIjp7InVkc2luayI6eyJjb250YWluZXIiOnsiaW1hZ2UiOiJzZXJ2ZXNpbms6MC4xIiwiZW52IjpbeyJuYW1lIjoiTlVNQUZMT1dfQ0FMTEJBQ0tfVVJMX0tFWSIsInZhbHVlIjoiWC1OdW1hZmxvdy1DYWxsYmFjay1VcmwifSx7Im5hbWUiOiJOVU1BRkxPV19NU0dfSURfSEVBREVSX0tFWSIsInZhbHVlIjoiWC1OdW1hZmxvdy1JZCJ9XSwicmVzb3VyY2VzIjp7fSwiaW1hZ2VQdWxsUG9saWN5IjoiTmV2ZXIifX0sInJldHJ5U3RyYXRlZ3kiOnt9fSwiY29udGFpbmVyVGVtcGxhdGUiOnsicmVzb3VyY2VzIjp7fSwiaW1hZ2VQdWxsUG9saWN5IjoiTmV2ZXIifSwic2NhbGUiOnsibWluIjoxfSwidXBkYXRlU3RyYXRlZ3kiOnsidHlwZSI6IlJvbGxpbmdVcGRhdGUiLCJyb2xsaW5nVXBkYXRlIjp7Im1heFVuYXZhaWxhYmxlIjoiMjUlIn19fV0sImVkZ2VzIjpbeyJmcm9tIjoic2VydmluZy1pbiIsInRvIjoic2VydmUtc2luayIsImNvbmRpdGlvbnMiOm51bGx9XSwibGlmZWN5Y2xlIjp7fSwid2F0ZXJtYXJrIjp7fX0="),
(ENV_VERTEX_OBJ, "eyJtZXRhZGF0YSI6eyJuYW1lIjoic2ltcGxlLXBpcGVsaW5lLXNlcnZpbmctaW4iLCJuYW1lc3BhY2UiOiJkZWZhdWx0IiwiY3JlYXRpb25UaW1lc3RhbXAiOm51bGx9LCJzcGVjIjp7Im5hbWUiOiJzZXJ2aW5nLWluIiwic291cmNlIjp7InNlcnZpbmciOnsiYXV0aCI6bnVsbCwic2VydmljZSI6dHJ1ZSwibXNnSURIZWFkZXJLZXkiOiJYLU51bWFmbG93LUlkIiwic3RvcmUiOnsidXJsIjoicmVkaXM6Ly9yZWRpczo2Mzc5In19fSwiY29udGFpbmVyVGVtcGxhdGUiOnsicmVzb3VyY2VzIjp7fSwiaW1hZ2VQdWxsUG9saWN5IjoiTmV2ZXIifSwibGltaXRzIjp7InJlYWRCYXRjaFNpemUiOjUwMCwicmVhZFRpbWVvdXQiOiIxcyIsImJ1ZmZlck1heExlbmd0aCI6MzAwMDAsImJ1ZmZlclVzYWdlTGltaXQiOjgwfSwic2NhbGUiOnsibWluIjoxfSwidXBkYXRlU3RyYXRlZ3kiOnsidHlwZSI6IlJvbGxpbmdVcGRhdGUiLCJyb2xsaW5nVXBkYXRlIjp7Im1heFVuYXZhaWxhYmxlIjoiMjUlIn19LCJwaXBlbGluZU5hbWUiOiJzaW1wbGUtcGlwZWxpbmUiLCJpbnRlclN0ZXBCdWZmZXJTZXJ2aWNlTmFtZSI6IiIsInJlcGxpY2FzIjowLCJ0b0VkZ2VzIjpbeyJmcm9tIjoic2VydmluZy1pbiIsInRvIjoic2VydmUtc2luayIsImNvbmRpdGlvbnMiOm51bGwsImZyb21WZXJ0ZXhUeXBlIjoiU291cmNlIiwiZnJvbVZlcnRleFBhcnRpdGlvbkNvdW50IjoxLCJmcm9tVmVydGV4TGltaXRzIjp7InJlYWRCYXRjaFNpemUiOjUwMCwicmVhZFRpbWVvdXQiOiIxcyIsImJ1ZmZlck1heExlbmd0aCI6MzAwMDAsImJ1ZmZlclVzYWdlTGltaXQiOjgwfSwidG9WZXJ0ZXhUeXBlIjoiU2luayIsInRvVmVydGV4UGFydGl0aW9uQ291bnQiOjEsInRvVmVydGV4TGltaXRzIjp7InJlYWRCYXRjaFNpemUiOjUwMCwicmVhZFRpbWVvdXQiOiIxcyIsImJ1ZmZlck1heExlbmd0aCI6MzAwMDAsImJ1ZmZlclVzYWdlTGltaXQiOjgwfX1dLCJ3YXRlcm1hcmsiOnsibWF4RGVsYXkiOiIwcyJ9fSwic3RhdHVzIjp7InBoYXNlIjoiIiwicmVwbGljYXMiOjAsImRlc2lyZWRSZXBsaWNhcyI6MCwibGFzdFNjYWxlZEF0IjpudWxsfX0=")
];

// Call the config method
Expand Down Expand Up @@ -230,12 +238,12 @@ mod tests {
name: "serving-in".into(),
},
Vertex {
name: "serving-sink".into(),
name: "serve-sink".into(),
},
],
edges: vec![Edge {
from: "serving-in".into(),
to: "serving-sink".into(),
to: "serve-sink".into(),
conditions: None,
}],
},
Expand Down

0 comments on commit 5114b4b

Please sign in to comment.