Skip to content

Commit c252a18

Browse files
authored
Implement MultiPartStore for InMemory (#5495)
1 parent 6fd3a16 commit c252a18

File tree

1 file changed

+91
-1
lines changed

1 file changed

+91
-1
lines changed

object_store/src/memory.rs

Lines changed: 91 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
// under the License.
1717

1818
//! An in-memory object store implementation
19+
use crate::multipart::{MultiPartStore, PartId};
1920
use crate::util::InvalidGetRange;
2021
use crate::{
2122
path::Path, GetRange, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore,
@@ -28,8 +29,8 @@ use chrono::{DateTime, Utc};
2829
use futures::{stream::BoxStream, StreamExt};
2930
use parking_lot::RwLock;
3031
use snafu::{OptionExt, ResultExt, Snafu};
31-
use std::collections::BTreeMap;
3232
use std::collections::BTreeSet;
33+
use std::collections::{BTreeMap, HashMap};
3334
use std::io;
3435
use std::ops::Range;
3536
use std::pin::Pin;
@@ -52,6 +53,12 @@ enum Error {
5253

5354
#[snafu(display("ETag required for conditional update"))]
5455
MissingETag,
56+
57+
#[snafu(display("MultipartUpload not found: {id}"))]
58+
UploadNotFound { id: String },
59+
60+
#[snafu(display("Missing part at index: {part}"))]
61+
MissingPart { part: usize },
5562
}
5663

5764
impl From<Error> for super::Error {
@@ -101,6 +108,12 @@ impl Entry {
101108
struct Storage {
102109
next_etag: usize,
103110
map: BTreeMap<Path, Entry>,
111+
uploads: HashMap<usize, PartStorage>,
112+
}
113+
114+
#[derive(Debug, Default, Clone)]
115+
struct PartStorage {
116+
parts: Vec<Option<Bytes>>,
104117
}
105118

106119
type SharedStorage = Arc<RwLock<Storage>>;
@@ -154,6 +167,24 @@ impl Storage {
154167
}
155168
}
156169
}
170+
171+
fn upload_mut(&mut self, id: &MultipartId) -> Result<&mut PartStorage> {
172+
let parts = id
173+
.parse()
174+
.ok()
175+
.and_then(|x| self.uploads.get_mut(&x))
176+
.context(UploadNotFoundSnafu { id })?;
177+
Ok(parts)
178+
}
179+
180+
fn remove_upload(&mut self, id: &MultipartId) -> Result<PartStorage> {
181+
let parts = id
182+
.parse()
183+
.ok()
184+
.and_then(|x| self.uploads.remove(&x))
185+
.context(UploadNotFoundSnafu { id })?;
186+
Ok(parts)
187+
}
157188
}
158189

159190
impl std::fmt::Display for InMemory {
@@ -359,6 +390,64 @@ impl ObjectStore for InMemory {
359390
}
360391
}
361392

393+
#[async_trait]
394+
impl MultiPartStore for InMemory {
395+
async fn create_multipart(&self, _path: &Path) -> Result<MultipartId> {
396+
let mut storage = self.storage.write();
397+
let etag = storage.next_etag;
398+
storage.next_etag += 1;
399+
storage.uploads.insert(etag, Default::default());
400+
Ok(etag.to_string())
401+
}
402+
403+
async fn put_part(
404+
&self,
405+
_path: &Path,
406+
id: &MultipartId,
407+
part_idx: usize,
408+
data: Bytes,
409+
) -> Result<PartId> {
410+
let mut storage = self.storage.write();
411+
let upload = storage.upload_mut(id)?;
412+
if part_idx <= upload.parts.len() {
413+
upload.parts.resize(part_idx + 1, None);
414+
}
415+
upload.parts[part_idx] = Some(data);
416+
Ok(PartId {
417+
content_id: Default::default(),
418+
})
419+
}
420+
421+
async fn complete_multipart(
422+
&self,
423+
path: &Path,
424+
id: &MultipartId,
425+
_parts: Vec<PartId>,
426+
) -> Result<PutResult> {
427+
let mut storage = self.storage.write();
428+
let upload = storage.remove_upload(id)?;
429+
430+
let mut cap = 0;
431+
for (part, x) in upload.parts.iter().enumerate() {
432+
cap += x.as_ref().context(MissingPartSnafu { part })?.len();
433+
}
434+
let mut buf = Vec::with_capacity(cap);
435+
for x in &upload.parts {
436+
buf.extend_from_slice(x.as_ref().unwrap())
437+
}
438+
let etag = storage.insert(path, buf.into());
439+
Ok(PutResult {
440+
e_tag: Some(etag.to_string()),
441+
version: None,
442+
})
443+
}
444+
445+
async fn abort_multipart(&self, _path: &Path, id: &MultipartId) -> Result<()> {
446+
self.storage.write().remove_upload(id)?;
447+
Ok(())
448+
}
449+
}
450+
362451
impl InMemory {
363452
/// Create new in-memory storage.
364453
pub fn new() -> Self {
@@ -444,6 +533,7 @@ mod tests {
444533
copy_if_not_exists(&integration).await;
445534
stream_get(&integration).await;
446535
put_opts(&integration, true).await;
536+
multipart(&integration, &integration).await;
447537
}
448538

449539
#[tokio::test]

0 commit comments

Comments
 (0)