Merged
7 changes: 4 additions & 3 deletions rust/garbage_collector/src/garbage_collector_orchestrator.rs
@@ -527,6 +527,7 @@ mod tests {
     use chroma_storage::config::{
         ObjectStoreBucketConfig, ObjectStoreConfig, ObjectStoreType, StorageConfig,
     };
+    use chroma_storage::GetOptions;
     use chroma_sysdb::{GrpcSysDbConfig, SysDbConfig};
     use chroma_system::System;
     use std::str::FromStr;
@@ -751,7 +752,7 @@ mod tests {

     async fn get_hnsw_index_ids(storage: &Storage) -> Vec<Uuid> {
         storage
-            .list_prefix("hnsw")
+            .list_prefix("hnsw", GetOptions::default())
             .await
             .unwrap()
             .into_iter()
@@ -935,7 +936,7 @@ mod tests {
             .unwrap();

         let deleted_hnsw_files_before_test: Vec<_> = storage
-            .list_prefix("gc")
+            .list_prefix("gc", GetOptions::default())
             .await
             .unwrap()
             .into_iter()
@@ -1060,7 +1061,7 @@ mod tests {

         // Verify that "deleted" files are renamed with the "gc" prefix
         let deleted_hnsw_files: Vec<_> = storage
-            .list_prefix("gc")
+            .list_prefix("gc", GetOptions::default())
             .await
             .unwrap()
             .into_iter()
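The call-site changes in this file only thread a `GetOptions` through to the new signature. For readers skimming the diff, a minimal sketch of the shape `GetOptions` must have, inferred solely from its use in this PR (`GetOptions::default()` here and `options.priority.as_usize()` in `admissioncontrolleds3.rs` below); the real definition lives in the `rust/storage` crate and may carry more fields:

// Hypothetical shape, inferred from usage in this PR; not the actual definition.
#[derive(Clone, Copy, Debug, Default)]
pub struct GetOptions {
    pub priority: StorageRequestPriority,
}

#[derive(Clone, Copy, Debug, Default)]
pub enum StorageRequestPriority {
    #[default]
    P0, // foreground work; the highest priority in this sketch
    P1, // background work such as garbage collection
}

impl StorageRequestPriority {
    pub fn as_usize(self) -> usize {
        // Unit-only enums can be cast directly to their discriminant.
        self as usize
    }
}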
@@ -48,7 +48,11 @@ impl Drop for GarbageCollectorUnderTest {
         self.runtime.block_on(async {
             self.sysdb.reset().await.unwrap();

-            let files = self.storage.list_prefix("").await.unwrap();
+            let files = self
+                .storage
+                .list_prefix("", GetOptions::default())
+                .await
+                .unwrap();
             if files.is_empty() {
                 return;
             }
@@ -472,7 +476,7 @@ impl StateMachineTest for GarbageCollectorUnderTest {
         let file_ref_counts = ref_state.get_file_ref_counts();
         let files_on_disk = ref_state
             .runtime
-            .block_on(state.storage.list_prefix(""))
+            .block_on(state.storage.list_prefix("", GetOptions::default()))
             .unwrap()
             .into_iter()
             .collect::<HashSet<_>>();
10 changes: 10 additions & 0 deletions rust/storage/src/admissioncontrolleds3.rs
@@ -482,6 +482,16 @@ impl AdmissionControlledS3Storage {
         // Akin to a HEAD request; no AC.
         self.storage.copy(src_key, dst_key).await
     }
+
+    pub async fn list_prefix(
+        &self,
+        prefix: &str,
+        options: GetOptions,
+    ) -> Result<Vec<String>, StorageError> {
+        let atomic_priority = Arc::new(AtomicUsize::new(options.priority.as_usize()));
+        let _permit = self.rate_limiter.enter(atomic_priority, None).await;
@HammadB (Collaborator) commented on May 28, 2025:

    Should list be part of the same rate limiting?

The author (Contributor) replied:

    I think so. It's a request to S3. A new tier might make sense, but I think rate limiting it as a read makes sense absent any other direction.

A reviewer (Contributor) replied:

    The current rate limits assume a read is roughly 8MB and try to saturate the network bandwidth accordingly. Seems like this has different characteristics and might not be OK to use with the same rate limiter?

The author:

    Should we make it separate? All I'm seeing is that we'll underperform during lists. Given that the only lists are offline ops, it seems OK.

The reviewer:

    If lists don't compete with the usual reads and writes for tokens, then it's probably fine to keep it; otherwise I'd make it separate.

+        self.storage.list_prefix(prefix).await
+    }
 }

 #[async_trait]
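The thread above settles on routing lists through the existing read rate limiter, accepting that lists may underperform since they only occur in offline ops. Purely for illustration, a minimal sketch of the alternative that was discussed — a dedicated lane for lists — might look like the following. `ListRateLimiter` is hypothetical and not part of this PR; it bounds request concurrency rather than modeling ~8MB reads:

use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

// Hypothetical: a dedicated token pool so list requests never compete
// with reads and writes for the main rate limiter's tokens.
struct ListRateLimiter {
    permits: Arc<Semaphore>,
}

impl ListRateLimiter {
    fn new(max_concurrent_lists: usize) -> Self {
        Self {
            permits: Arc::new(Semaphore::new(max_concurrent_lists)),
        }
    }

    async fn enter(&self) -> OwnedSemaphorePermit {
        // Lists return small payloads, but each page is a full S3 request,
        // so bounding concurrency (not bytes) is the natural knob here.
        self.permits
            .clone()
            .acquire_owned()
            .await
            .expect("semaphore is never closed")
    }
}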
14 changes: 7 additions & 7 deletions rust/storage/src/lib.rs
@@ -320,16 +320,16 @@ impl Storage {
         }
     }

-    pub async fn list_prefix(&self, prefix: &str) -> Result<Vec<String>, StorageError> {
+    pub async fn list_prefix(
+        &self,
+        prefix: &str,
+        options: GetOptions,
+    ) -> Result<Vec<String>, StorageError> {
         match self {
             Storage::Local(local) => local.list_prefix(prefix).await,
-            Storage::S3(_) => {
-                unimplemented!("list_prefix not implemented for S3")
-            }
+            Storage::S3(s3) => s3.list_prefix(prefix).await,
             Storage::ObjectStore(object_store) => object_store.list_prefix(prefix).await,
-            Storage::AdmissionControlledS3(_) => {
-                unimplemented!("list_prefix not implemented for AdmissionControlledS3")
-            }
+            Storage::AdmissionControlledS3(acs3) => acs3.list_prefix(prefix, options).await,
         }
     }
 }
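With the `unimplemented!` arms replaced, every backend now accepts the same call shape. A hypothetical call site (the "hnsw" prefix mirrors the tests above; the helper itself is illustrative, not from this PR):

// List every HNSW index file at default priority; dispatches uniformly
// to the Local, S3, ObjectStore, or AdmissionControlledS3 backend.
async fn hnsw_keys(storage: &Storage) -> Result<Vec<String>, StorageError> {
    let mut keys = storage.list_prefix("hnsw", GetOptions::default()).await?;
    keys.sort();
    Ok(keys)
}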
55 changes: 55 additions & 0 deletions rust/storage/src/s3.rs
@@ -686,6 +686,33 @@ impl S3Storage {
             }
         }
     }
+
+    pub async fn list_prefix(&self, prefix: &str) -> Result<Vec<String>, StorageError> {
+        let mut outs = self
+            .client
+            .list_objects_v2()
+            .bucket(&self.bucket)
+            .set_max_keys(Some(1000))
+            .prefix(prefix)
+            .into_paginator()
+            .send();
+        let mut paths = vec![];
+        while let Some(result) = outs.next().await {
+            let output = result.map_err(|err| StorageError::Generic {
+                source: Arc::new(err),
+            })?;
+            for object in output.contents() {
+                if let Some(key) = object.key() {
+                    paths.push(key.to_string());
+                } else {
+                    return Err(StorageError::Message {
+                        message: format!("list on prefix {:?} led to empty key", prefix),
+                    });
+                }
+            }
+        }
+        Ok(paths)
+    }
 }

 #[async_trait]
@@ -1131,4 +1158,32 @@ mod tests {
         let bytes = storage.get("test2/00").await.unwrap();
         assert_eq!("ABC123XYZ".as_bytes(), bytes.as_slice());
     }
+
+    #[tokio::test]
+    async fn test_k8s_integration_list_prefix() {
+        let storage = setup_with_bucket(1024 * 1024 * 8, 1024 * 1024 * 8).await;
+        for i in 0..16 {
+            storage
+                .oneshot_upload(
+                    &format!("test/{:02x}", i),
+                    0,
+                    |_| Box::pin(ready(Ok(ByteStream::from(Bytes::new())))) as _,
+                    PutOptions {
+                        if_not_exists: true,
+                        if_match: None,
+                        priority: StorageRequestPriority::P0,
+                    },
+                )
+                .await
+                .unwrap();
+        }
+
+        let mut results = storage.list_prefix("test/").await.unwrap();
+        results.sort();
+        eprintln!("Results of listing (should be 0x00..=0x0f inclusive): {results:#?}");
+        assert_eq!(16, results.len());
+        for (i, result) in results.iter().enumerate() {
+            assert_eq!(format!("test/{:02x}", i), *result, "index = {}", i);
+        }
+    }
 }
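For readers unfamiliar with the AWS SDK for Rust, the `into_paginator().send()` call in `list_prefix` above is shorthand for threading `list_objects_v2`'s continuation token by hand. A rough sketch of the equivalent manual loop, reusing the `client`, `bucket`, and `prefix` bindings from that method (an illustration, not part of the PR):

// Equivalent manual pagination: loop until S3 stops returning a
// next_continuation_token, collecting at most 1000 keys per page.
let mut continuation: Option<String> = None;
let mut paths = Vec::new();
loop {
    let output = client
        .list_objects_v2()
        .bucket(&bucket)
        .prefix(prefix)
        .set_max_keys(Some(1000))
        .set_continuation_token(continuation.take())
        .send()
        .await
        .map_err(|err| StorageError::Generic {
            source: Arc::new(err),
        })?;
    for object in output.contents() {
        if let Some(key) = object.key() {
            paths.push(key.to_string());
        }
    }
    match output.next_continuation_token() {
        Some(token) => continuation = Some(token.to_string()),
        None => break, // final page reached
    }
}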