Skip to content

Commit 8f76fbe

Browse files
committed
Moving, renaming, comments
Signed-off-by: itowlson <[email protected]>
1 parent bad6c40 commit 8f76fbe

File tree

13 files changed

+472
-502
lines changed

13 files changed

+472
-502
lines changed

crates/blobstore-azure/src/store.rs

+7-14
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,6 @@ impl ContainerManager for AzureContainerManager {
5353
fn is_defined(&self, _store_name: &str) -> bool {
5454
true
5555
}
56-
57-
fn summary(&self, _store_name: &str) -> Option<String> {
58-
Some(format!(
59-
"Azure blob storage account {}",
60-
self.client.account()
61-
))
62-
}
6356
}
6457

6558
struct AzureContainer {
@@ -144,18 +137,18 @@ impl Container for AzureContainer {
144137
Ok(Box::new(AzureIncomingData::new(client, range)))
145138
}
146139

147-
async fn connect_stm(
140+
async fn write_data(
148141
&self,
149142
name: &str,
150-
stm: tokio::io::ReadHalf<tokio::io::SimplexStream>,
143+
data: tokio::io::ReadHalf<tokio::io::SimplexStream>,
151144
finished_tx: tokio::sync::mpsc::Sender<anyhow::Result<()>>,
152145
) -> anyhow::Result<()> {
153146
let client = self.client.blob_client(name);
154147

155148
tokio::spawn(async move {
156-
let result = Self::connect_stm_core(stm, client).await;
149+
let write_result = Self::write_data_core(data, client).await;
157150
finished_tx
158-
.send(result)
151+
.send(write_result)
159152
.await
160153
.expect("should sent finish tx");
161154
});
@@ -170,8 +163,8 @@ impl Container for AzureContainer {
170163
}
171164

172165
impl AzureContainer {
173-
async fn connect_stm_core(
174-
mut stm: tokio::io::ReadHalf<tokio::io::SimplexStream>,
166+
async fn write_data_core(
167+
mut data: tokio::io::ReadHalf<tokio::io::SimplexStream>,
175168
client: azure_storage_blobs::prelude::BlobClient,
176169
) -> anyhow::Result<()> {
177170
use tokio::io::AsyncReadExt;
@@ -186,7 +179,7 @@ impl AzureContainer {
186179
'put_blocks: loop {
187180
let mut bytes = Vec::with_capacity(BLOCK_SIZE);
188181
loop {
189-
let read = stm.read_buf(&mut bytes).await?;
182+
let read = data.read_buf(&mut bytes).await?;
190183
let len = bytes.len();
191184

192185
if read == 0 {

crates/blobstore-fs/src/lib.rs

+26-13
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,13 @@ impl spin_factor_blobstore::Container for FileSystemContainer {
102102
self.name.clone()
103103
}
104104
async fn info(&self) -> anyhow::Result<spin_factor_blobstore::ContainerMetadata> {
105-
todo!()
105+
let meta = self.path.metadata()?;
106+
let created_at = created_at_nanos(&meta)?;
107+
108+
Ok(spin_factor_blobstore::ContainerMetadata {
109+
name: self.name.to_owned(),
110+
created_at,
111+
})
106112
}
107113
async fn clear(&self) -> anyhow::Result<()> {
108114
let entries = std::fs::read_dir(&self.path)?.collect::<Vec<_>>();
@@ -140,11 +146,7 @@ impl spin_factor_blobstore::Container for FileSystemContainer {
140146
name: &str,
141147
) -> anyhow::Result<spin_factor_blobstore::ObjectMetadata> {
142148
let meta = tokio::fs::metadata(self.object_path(name)?).await?;
143-
let created_at = meta
144-
.created()?
145-
.duration_since(std::time::SystemTime::UNIX_EPOCH)?
146-
.as_nanos()
147-
.try_into()?;
149+
let created_at = created_at_nanos(&meta)?;
148150
Ok(spin_factor_blobstore::ObjectMetadata {
149151
name: name.to_string(),
150152
container: self.name.to_string(),
@@ -168,10 +170,10 @@ impl spin_factor_blobstore::Container for FileSystemContainer {
168170
}))
169171
}
170172

171-
async fn connect_stm(
173+
async fn write_data(
172174
&self,
173175
name: &str,
174-
stm: tokio::io::ReadHalf<tokio::io::SimplexStream>,
176+
data: tokio::io::ReadHalf<tokio::io::SimplexStream>,
175177
finished_tx: tokio::sync::mpsc::Sender<anyhow::Result<()>>,
176178
) -> anyhow::Result<()> {
177179
let path = self.object_path(name)?;
@@ -181,9 +183,9 @@ impl spin_factor_blobstore::Container for FileSystemContainer {
181183
let file = tokio::fs::File::create(&path).await?;
182184

183185
tokio::spawn(async move {
184-
let result = Self::connect_stm_core(stm, file).await;
186+
let write_result = Self::write_data_core(data, file).await;
185187
finished_tx
186-
.send(result)
188+
.send(write_result)
187189
.await
188190
.expect("shoulda sent finished_tx");
189191
});
@@ -203,8 +205,8 @@ impl spin_factor_blobstore::Container for FileSystemContainer {
203205
}
204206

205207
impl FileSystemContainer {
206-
async fn connect_stm_core(
207-
mut stm: tokio::io::ReadHalf<tokio::io::SimplexStream>,
208+
async fn write_data_core(
209+
mut data: tokio::io::ReadHalf<tokio::io::SimplexStream>,
208210
mut file: tokio::fs::File,
209211
) -> anyhow::Result<()> {
210212
use tokio::io::AsyncReadExt;
@@ -214,8 +216,9 @@ impl FileSystemContainer {
214216

215217
loop {
216218
let mut buf = vec![0; BUF_SIZE];
217-
let count = stm.read(&mut buf).await?;
219+
let count = data.read(&mut buf).await?;
218220
if count == 0 {
221+
_ = file.flush().await;
219222
break;
220223
}
221224
file.write_all(&buf[0..count]).await?;
@@ -363,3 +366,13 @@ impl spin_factor_blobstore::ObjectNames for BlobNames {
363366
Ok((count, at_end))
364367
}
365368
}
369+
370+
fn created_at_nanos(meta: &std::fs::Metadata) -> anyhow::Result<u64> {
371+
let time_nanos = meta
372+
.created()?
373+
.duration_since(std::time::SystemTime::UNIX_EPOCH)?
374+
.as_nanos()
375+
.try_into()
376+
.unwrap_or_default();
377+
Ok(time_nanos)
378+
}

crates/blobstore-s3/src/store.rs

+7-11
Original file line numberDiff line numberDiff line change
@@ -134,10 +134,6 @@ impl ContainerManager for BlobStoreS3 {
134134
fn is_defined(&self, _store_name: &str) -> bool {
135135
true
136136
}
137-
138-
fn summary(&self, _store_name: &str) -> Option<String> {
139-
Some("AWS S3 blob storage".to_owned())
140-
}
141137
}
142138

143139
struct S3Container {
@@ -257,19 +253,19 @@ impl Container for S3Container {
257253
Ok(Box::new(S3IncomingData::new(resp)))
258254
}
259255

260-
async fn connect_stm(
256+
async fn write_data(
261257
&self,
262258
name: &str,
263-
stm: tokio::io::ReadHalf<tokio::io::SimplexStream>,
259+
data: tokio::io::ReadHalf<tokio::io::SimplexStream>,
264260
finished_tx: tokio::sync::mpsc::Sender<anyhow::Result<()>>,
265261
) -> anyhow::Result<()> {
266262
let store = self.store.clone();
267263
let path = object_store::path::Path::from(name);
268264

269265
tokio::spawn(async move {
270-
let conn_result = Self::connect_stm_core(stm, store, path).await;
266+
let write_result = Self::write_data_core(data, store, path).await;
271267
finished_tx
272-
.send(conn_result)
268+
.send(write_result)
273269
.await
274270
.expect("should sent finish tx");
275271
});
@@ -289,8 +285,8 @@ impl Container for S3Container {
289285
}
290286

291287
impl S3Container {
292-
async fn connect_stm_core(
293-
mut stm: tokio::io::ReadHalf<tokio::io::SimplexStream>,
288+
async fn write_data_core(
289+
mut data: tokio::io::ReadHalf<tokio::io::SimplexStream>,
294290
store: object_store::aws::AmazonS3,
295291
path: object_store::path::Path,
296292
) -> anyhow::Result<()> {
@@ -301,7 +297,7 @@ impl S3Container {
301297
loop {
302298
use tokio::io::AsyncReadExt;
303299
let mut buf = vec![0; 5 * 1024 * 1024];
304-
let read_amount = stm.read(&mut buf).await?;
300+
let read_amount = data.read(&mut buf).await?;
305301
if read_amount == 0 {
306302
break;
307303
}

crates/factor-blobstore/Cargo.toml

-2
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ wasmtime-wasi = { workspace = true }
2323

2424
[dev-dependencies]
2525
spin-factors-test = { path = "../factors-test" }
26-
# spin-key-value-redis = { path = "../key-value-redis" }
27-
# spin-key-value-spin = { path = "../key-value-spin" }
2826
tempfile = { workspace = true }
2927
tokio = { workspace = true, features = ["macros", "rt"] }
3028

0 commit comments

Comments
 (0)