Skip to content

Commit 08af471

Browse files
authored
Add BufWriter::with_attributes and ::with_tags in object_store (#5693)
* Add `BufWriter::with_attributes` Signed-off-by: netthier <[email protected]> * Add `BufWriter::with_tags` Signed-off-by: netthier <[email protected]> --------- Signed-off-by: netthier <[email protected]>
1 parent 7feb542 commit 08af471

File tree

4 files changed

+99
-19
lines changed

4 files changed

+99
-19
lines changed

object_store/src/aws/mod.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -437,10 +437,16 @@ mod tests {
437437

438438
// Object tagging is not supported by S3 Express One Zone
439439
if config.session_provider.is_none() {
440-
tagging(&integration, !config.disable_tagging, |p| {
441-
let client = Arc::clone(&integration.client);
442-
async move { client.get_object_tagging(&p).await }
443-
})
440+
tagging(
441+
Arc::new(AmazonS3 {
442+
client: Arc::clone(&integration.client),
443+
}),
444+
!config.disable_tagging,
445+
|p| {
446+
let client = Arc::clone(&integration.client);
447+
async move { client.get_object_tagging(&p).await }
448+
},
449+
)
444450
.await;
445451
}
446452

object_store/src/azure/mod.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -296,10 +296,16 @@ mod tests {
296296
signing(&integration).await;
297297

298298
let validate = !integration.client.config().disable_tagging;
299-
tagging(&integration, validate, |p| {
300-
let client = Arc::clone(&integration.client);
301-
async move { client.get_blob_tagging(&p).await }
302-
})
299+
tagging(
300+
Arc::new(MicrosoftAzure {
301+
client: Arc::clone(&integration.client),
302+
}),
303+
validate,
304+
|p| {
305+
let client = Arc::clone(&integration.client);
306+
async move { client.get_blob_tagging(&p).await }
307+
},
308+
)
303309
.await;
304310

305311
// Azurite doesn't support attributes properly

object_store/src/buffered.rs

Lines changed: 69 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818
//! Utilities for performing tokio-style buffered IO
1919
2020
use crate::path::Path;
21-
use crate::{ObjectMeta, ObjectStore, PutPayloadMut, WriteMultipart};
21+
use crate::{
22+
Attributes, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayloadMut, TagSet,
23+
WriteMultipart,
24+
};
2225
use bytes::Bytes;
2326
use futures::future::{BoxFuture, FutureExt};
2427
use futures::ready;
@@ -217,6 +220,8 @@ impl AsyncBufRead for BufReader {
217220
pub struct BufWriter {
218221
capacity: usize,
219222
max_concurrency: usize,
223+
attributes: Option<Attributes>,
224+
tags: Option<TagSet>,
220225
state: BufWriterState,
221226
store: Arc<dyn ObjectStore>,
222227
}
@@ -252,6 +257,8 @@ impl BufWriter {
252257
capacity,
253258
store,
254259
max_concurrency: 8,
260+
attributes: None,
261+
tags: None,
255262
state: BufWriterState::Buffer(path, PutPayloadMut::new()),
256263
}
257264
}
@@ -266,6 +273,22 @@ impl BufWriter {
266273
}
267274
}
268275

276+
/// Set the attributes of the uploaded object
277+
pub fn with_attributes(self, attributes: Attributes) -> Self {
278+
Self {
279+
attributes: Some(attributes),
280+
..self
281+
}
282+
}
283+
284+
/// Set the tags of the uploaded object
285+
pub fn with_tags(self, tags: TagSet) -> Self {
286+
Self {
287+
tags: Some(tags),
288+
..self
289+
}
290+
}
291+
269292
/// Abort this writer, cleaning up any partially uploaded state
270293
///
271294
/// # Panic
@@ -306,9 +329,13 @@ impl AsyncWrite for BufWriter {
306329
if b.content_length().saturating_add(buf.len()) >= cap {
307330
let buffer = std::mem::take(b);
308331
let path = std::mem::take(path);
332+
let opts = PutMultipartOpts {
333+
attributes: self.attributes.take().unwrap_or_default(),
334+
tags: self.tags.take().unwrap_or_default(),
335+
};
309336
let store = Arc::clone(&self.store);
310337
self.state = BufWriterState::Prepare(Box::pin(async move {
311-
let upload = store.put_multipart(&path).await?;
338+
let upload = store.put_multipart_opts(&path, opts).await?;
312339
let mut chunked = WriteMultipart::new_with_chunk_size(upload, cap);
313340
for chunk in buffer.freeze() {
314341
chunked.put(chunk);
@@ -346,9 +373,14 @@ impl AsyncWrite for BufWriter {
346373
BufWriterState::Buffer(p, b) => {
347374
let buf = std::mem::take(b);
348375
let path = std::mem::take(p);
376+
let opts = PutOptions {
377+
attributes: self.attributes.take().unwrap_or_default(),
378+
tags: self.tags.take().unwrap_or_default(),
379+
..Default::default()
380+
};
349381
let store = Arc::clone(&self.store);
350382
self.state = BufWriterState::Flush(Box::pin(async move {
351-
store.put(&path, buf.into()).await?;
383+
store.put_opts(&path, buf.into(), opts).await?;
352384
Ok(())
353385
}));
354386
}
@@ -383,6 +415,7 @@ mod tests {
383415
use super::*;
384416
use crate::memory::InMemory;
385417
use crate::path::Path;
418+
use crate::{Attribute, GetOptions};
386419
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
387420

388421
#[tokio::test]
@@ -464,26 +497,54 @@ mod tests {
464497
}
465498
}
466499

500+
// Note: `BufWriter::with_tags` functionality is tested in `crate::tests::tagging`
467501
#[tokio::test]
468502
async fn test_buf_writer() {
469503
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
470504
let path = Path::from("file.txt");
505+
let attributes = Attributes::from_iter([
506+
(Attribute::ContentType, "text/html"),
507+
(Attribute::CacheControl, "max-age=604800"),
508+
]);
471509

472510
// Test put
473-
let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30);
511+
let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30)
512+
.with_attributes(attributes.clone());
474513
writer.write_all(&[0; 20]).await.unwrap();
475514
writer.flush().await.unwrap();
476515
writer.write_all(&[0; 5]).await.unwrap();
477516
writer.shutdown().await.unwrap();
478-
assert_eq!(store.head(&path).await.unwrap().size, 25);
517+
let response = store
518+
.get_opts(
519+
&path,
520+
GetOptions {
521+
head: true,
522+
..Default::default()
523+
},
524+
)
525+
.await
526+
.unwrap();
527+
assert_eq!(response.meta.size, 25);
528+
assert_eq!(response.attributes, attributes);
479529

480530
// Test multipart
481-
let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30);
531+
let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30)
532+
.with_attributes(attributes.clone());
482533
writer.write_all(&[0; 20]).await.unwrap();
483534
writer.flush().await.unwrap();
484535
writer.write_all(&[0; 20]).await.unwrap();
485536
writer.shutdown().await.unwrap();
486-
487-
assert_eq!(store.head(&path).await.unwrap().size, 40);
537+
let response = store
538+
.get_opts(
539+
&path,
540+
GetOptions {
541+
head: true,
542+
..Default::default()
543+
},
544+
)
545+
.await
546+
.unwrap();
547+
assert_eq!(response.meta.size, 40);
548+
assert_eq!(response.attributes, attributes);
488549
}
489550
}

object_store/src/lib.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1314,12 +1314,14 @@ mod test_util {
13141314
#[cfg(test)]
13151315
mod tests {
13161316
use super::*;
1317+
use crate::buffered::BufWriter;
13171318
use crate::multipart::MultipartStore;
13181319
use crate::test_util::flatten_list_stream;
13191320
use chrono::TimeZone;
13201321
use futures::stream::FuturesUnordered;
13211322
use rand::distributions::Alphanumeric;
13221323
use rand::{thread_rng, Rng};
1324+
use tokio::io::AsyncWriteExt;
13231325

13241326
pub(crate) async fn put_get_delete_list(storage: &DynObjectStore) {
13251327
delete_fixtures(storage).await;
@@ -2365,7 +2367,7 @@ mod tests {
23652367
}
23662368

23672369
#[cfg(any(feature = "aws", feature = "azure"))]
2368-
pub(crate) async fn tagging<F, Fut>(storage: &dyn ObjectStore, validate: bool, get_tags: F)
2370+
pub(crate) async fn tagging<F, Fut>(storage: Arc<dyn ObjectStore>, validate: bool, get_tags: F)
23692371
where
23702372
F: Fn(Path) -> Fut + Send + Sync,
23712373
Fut: std::future::Future<Output = Result<reqwest::Response>> + Send,
@@ -2415,19 +2417,24 @@ mod tests {
24152417

24162418
let multi_path = Path::from("tag_test_multi");
24172419
let mut write = storage
2418-
.put_multipart_opts(&multi_path, tag_set.into())
2420+
.put_multipart_opts(&multi_path, tag_set.clone().into())
24192421
.await
24202422
.unwrap();
24212423

24222424
write.put_part("foo".into()).await.unwrap();
24232425
write.complete().await.unwrap();
24242426

2427+
let buf_path = Path::from("tag_test_buf");
2428+
let mut buf = BufWriter::new(storage, buf_path.clone()).with_tags(tag_set);
2429+
buf.write_all(b"foo").await.unwrap();
2430+
buf.shutdown().await.unwrap();
2431+
24252432
// Write should always succeed, but certain configurations may simply ignore tags
24262433
if !validate {
24272434
return;
24282435
}
24292436

2430-
for path in [path, multi_path] {
2437+
for path in [path, multi_path, buf_path] {
24312438
let resp = get_tags(path.clone()).await.unwrap();
24322439
let body = resp.bytes().await.unwrap();
24332440

0 commit comments

Comments
 (0)