Skip to content

Commit

Permalink
Rename store trait method
Browse files Browse the repository at this point in the history
Signed-off-by: Sreekanth <[email protected]>
  • Loading branch information
BulkBeing committed Feb 19, 2025
1 parent 9b7dfe2 commit 51c5b4f
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 10 deletions.
4 changes: 4 additions & 0 deletions rust/serving/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ where
// We don't need request ID for these endpoints
return info_span!("request", method=?req.method(), path=req_path);
}

// Generate a tid with good enough randomness and not too long
// Example of a UUID v7: 01951b72-d0f4-711e-baba-4efe03d9cb76
// We use the characters representing timestamp in milliseconds (without '-'), and last 5 characters for randomness.
let uuid = Uuid::now_v7().to_string();
let tid = format!("{}{}{}", &uuid[..8], &uuid[10..13], &uuid[uuid.len() - 5..]);

Expand Down
4 changes: 2 additions & 2 deletions rust/serving/src/app/callback/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ where
) -> StoreResult<(String, oneshot::Receiver<Result<String, Error>>)> {
// TODO: add an entry in Redis to note that the entry has been registered

let id = self.store.register(id).await?; // FIXME:
let id = self.store.register(id).await?;

let (tx, rx) = oneshot::channel();
{
Expand Down Expand Up @@ -197,7 +197,7 @@ where
));
};

self.store.deregister(id.to_string()).await.unwrap(); // FIXME:
self.store.done(id.to_string()).await?;

state
.tx
Expand Down
9 changes: 5 additions & 4 deletions rust/serving/src/app/callback/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,12 @@ pub(crate) type Result<T> = std::result::Result<T, Error>;
#[trait_variant::make(Store: Send)]
#[allow(dead_code)]
pub(crate) trait LocalStore {
/// Register a request id in the store. If user provides a request id, the same will be returned
/// if the same doesn't already exist in the store. An error is returned if the user-specified request id
/// already exists in the store. If the `id` is `None`, the store will generate a new unique request id.
/// Register a request id in the store. If user provides a request id, the same should be returned
/// if it doesn't already exist in the store. An error should be returned if the user-specified request id
/// already exists in the store. If the `id` is `None`, the store should generate a new unique request id.
async fn register(&mut self, id: Option<String>) -> Result<String>;
async fn deregister(&mut self, id: String) -> Result<()>;
/// This method will be called when processing is completed for a request id.
async fn done(&mut self, id: String) -> Result<()>;
async fn save(&mut self, messages: Vec<PayloadToSave>) -> Result<()>;
/// retrieve the callback payloads
async fn retrieve_callbacks(&mut self, id: &str) -> Result<Vec<Arc<Callback>>>;
Expand Down
2 changes: 1 addition & 1 deletion rust/serving/src/app/callback/store/memstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl super::Store for InMemoryStore {
async fn register(&mut self, id: Option<String>) -> StoreResult<String> {
Ok(id.unwrap_or_else(|| Uuid::now_v7().to_string()))
}
async fn deregister(&mut self, _id: String) -> StoreResult<()> {
async fn done(&mut self, _id: String) -> StoreResult<()> {
Ok(())
}
/// Saves a vector of `PayloadToSave` into the `HashMap`.
Expand Down
4 changes: 2 additions & 2 deletions rust/serving/src/app/callback/store/redisstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl super::Store for RedisConnection {
}
}
}
async fn deregister(&mut self, id: String) -> StoreResult<()> {
async fn done(&mut self, id: String) -> StoreResult<()> {
let key = format!("request:{id}:status");
let status: bool = redis::cmd("SET")
.arg(&key)
Expand Down Expand Up @@ -365,7 +365,7 @@ mod tests {

assert_eq!(datums.unwrap(), PipelineResult::Processing);

redis_conn.deregister(key.clone()).await.unwrap();
redis_conn.done(key.clone()).await.unwrap();
let datums = redis_conn.retrieve_datum(&key).await.unwrap();
let PipelineResult::Completed(datums) = datums else {
panic!("Expected completed results");
Expand Down
2 changes: 1 addition & 1 deletion rust/serving/src/app/jetstream_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ mod tests {
async fn register(&mut self, id: Option<String>) -> StoreResult<String> {
Ok(id.unwrap_or_else(|| Uuid::now_v7().to_string()))
}
async fn deregister(&mut self, _id: String) -> StoreResult<()> {
async fn done(&mut self, _id: String) -> StoreResult<()> {
Ok(())
}
async fn save(&mut self, _messages: Vec<PayloadToSave>) -> StoreResult<()> {
Expand Down
2 changes: 2 additions & 0 deletions rust/serving/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ impl Default for RedisConfig {

#[derive(Debug, Deserialize, Clone, PartialEq)]
pub struct Settings {
/// The HTTP header used to communicate to the client about the unique id assigned for a request in the store
/// The client may also set the value of this header when sending the payload.
pub tid_header: String,
pub app_listen_port: u16,
pub metrics_server_listen_port: u16,
Expand Down

0 comments on commit 51c5b4f

Please sign in to comment.