From 51c5b4f1c65989e755f4cca95b18e9468d406dc7 Mon Sep 17 00:00:00 2001 From: Sreekanth Date: Wed, 19 Feb 2025 08:51:10 +0530 Subject: [PATCH] Rename store trait method Signed-off-by: Sreekanth --- rust/serving/src/app.rs | 4 ++++ rust/serving/src/app/callback/state.rs | 4 ++-- rust/serving/src/app/callback/store.rs | 9 +++++---- rust/serving/src/app/callback/store/memstore.rs | 2 +- rust/serving/src/app/callback/store/redisstore.rs | 4 ++-- rust/serving/src/app/jetstream_proxy.rs | 2 +- rust/serving/src/config.rs | 2 ++ 7 files changed, 17 insertions(+), 10 deletions(-) diff --git a/rust/serving/src/app.rs b/rust/serving/src/app.rs index 2d4bee982b..9e904ba949 100644 --- a/rust/serving/src/app.rs +++ b/rust/serving/src/app.rs @@ -86,6 +86,10 @@ where // We don't need request ID for these endpoints return info_span!("request", method=?req.method(), path=req_path); } + + // Generate a tid with good enough randomness and not too long + // Example of a UUID v7: 01951b72-d0f4-711e-baba-4efe03d9cb76 + // We use the characters representing timestamp in milliseconds (without '-'), and last 5 characters for randomness. let uuid = Uuid::now_v7().to_string(); let tid = format!("{}{}{}", &uuid[..8], &uuid[10..13], &uuid[uuid.len() - 5..]); diff --git a/rust/serving/src/app/callback/state.rs b/rust/serving/src/app/callback/state.rs index 5f3b031a9f..a818803d88 100644 --- a/rust/serving/src/app/callback/state.rs +++ b/rust/serving/src/app/callback/state.rs @@ -50,7 +50,7 @@ where ) -> StoreResult<(String, oneshot::Receiver>)> { // TODO: add an entry in Redis to note that the entry has been registered - let id = self.store.register(id).await?; // FIXME: + let id = self.store.register(id).await?; let (tx, rx) = oneshot::channel(); { @@ -197,7 +197,7 @@ where )); }; - self.store.deregister(id.to_string()).await.unwrap(); // FIXME: + self.store.done(id.to_string()).await?; state .tx diff --git a/rust/serving/src/app/callback/store.rs b/rust/serving/src/app/callback/store.rs index 39813e01a9..9d1a5f4dab 100644 --- a/rust/serving/src/app/callback/store.rs +++ b/rust/serving/src/app/callback/store.rs @@ -55,11 +55,12 @@ pub(crate) type Result = std::result::Result; #[trait_variant::make(Store: Send)] #[allow(dead_code)] pub(crate) trait LocalStore { - /// Register a request id in the store. If user provides a request id, the same will be returned - /// if the same doesn't already exist in the store. An error is returned if the user-specified request id - /// already exists in the store. If the `id` is `None`, the store will generate a new unique request id. + /// Register a request id in the store. If user provides a request id, the same should be returned + /// if it doesn't already exist in the store. An error should be returned if the user-specified request id + /// already exists in the store. If the `id` is `None`, the store should generate a new unique request id. async fn register(&mut self, id: Option) -> Result; - async fn deregister(&mut self, id: String) -> Result<()>; + /// This method will be called when processing is completed for a request id. + async fn done(&mut self, id: String) -> Result<()>; async fn save(&mut self, messages: Vec) -> Result<()>; /// retrieve the callback payloads async fn retrieve_callbacks(&mut self, id: &str) -> Result>>; diff --git a/rust/serving/src/app/callback/store/memstore.rs b/rust/serving/src/app/callback/store/memstore.rs index 591c2dfac1..ef97bed502 100644 --- a/rust/serving/src/app/callback/store/memstore.rs +++ b/rust/serving/src/app/callback/store/memstore.rs @@ -30,7 +30,7 @@ impl super::Store for InMemoryStore { async fn register(&mut self, id: Option) -> StoreResult { Ok(id.unwrap_or_else(|| Uuid::now_v7().to_string())) } - async fn deregister(&mut self, _id: String) -> StoreResult<()> { + async fn done(&mut self, _id: String) -> StoreResult<()> { Ok(()) } /// Saves a vector of `PayloadToSave` into the `HashMap`. diff --git a/rust/serving/src/app/callback/store/redisstore.rs b/rust/serving/src/app/callback/store/redisstore.rs index fcd2cc8338..c2fc33b7f3 100644 --- a/rust/serving/src/app/callback/store/redisstore.rs +++ b/rust/serving/src/app/callback/store/redisstore.rs @@ -164,7 +164,7 @@ impl super::Store for RedisConnection { } } } - async fn deregister(&mut self, id: String) -> StoreResult<()> { + async fn done(&mut self, id: String) -> StoreResult<()> { let key = format!("request:{id}:status"); let status: bool = redis::cmd("SET") .arg(&key) @@ -365,7 +365,7 @@ mod tests { assert_eq!(datums.unwrap(), PipelineResult::Processing); - redis_conn.deregister(key.clone()).await.unwrap(); + redis_conn.done(key.clone()).await.unwrap(); let datums = redis_conn.retrieve_datum(&key).await.unwrap(); let PipelineResult::Completed(datums) = datums else { panic!("Expected completed results"); diff --git a/rust/serving/src/app/jetstream_proxy.rs b/rust/serving/src/app/jetstream_proxy.rs index 192f86a407..5c85061912 100644 --- a/rust/serving/src/app/jetstream_proxy.rs +++ b/rust/serving/src/app/jetstream_proxy.rs @@ -321,7 +321,7 @@ mod tests { async fn register(&mut self, id: Option) -> StoreResult { Ok(id.unwrap_or_else(|| Uuid::now_v7().to_string())) } - async fn deregister(&mut self, _id: String) -> StoreResult<()> { + async fn done(&mut self, _id: String) -> StoreResult<()> { Ok(()) } async fn save(&mut self, _messages: Vec) -> StoreResult<()> { diff --git a/rust/serving/src/config.rs b/rust/serving/src/config.rs index 8c04c1d6cb..92bd768409 100644 --- a/rust/serving/src/config.rs +++ b/rust/serving/src/config.rs @@ -53,6 +53,8 @@ impl Default for RedisConfig { #[derive(Debug, Deserialize, Clone, PartialEq)] pub struct Settings { + /// The HTTP header used to communicate to the client about the unique id assigned for a request in the store + /// The client may also set the value of this header when sending the payload. pub tid_header: String, pub app_listen_port: u16, pub metrics_server_listen_port: u16,