Skip to content

Commit 54bcfcf

Browse files
committed
Add a payload_size operation
Without a blob object to read, if something wants to know the size of a payload it is possible with this to do it without having to read the payload data. Signed-off-by: J Robert Ray <[email protected]>
1 parent 45e3f09 commit 54bcfcf

File tree

12 files changed

+105
-0
lines changed

12 files changed

+105
-0
lines changed

crates/spfs/src/proto/defs/payload.proto

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,16 @@ message HasPayloadResponse{
2323
bool exists =3;
2424
}
2525

26+
message PayloadSizeRequest{
27+
Digest digest = 1;
28+
}
29+
message PayloadSizeResponse{
30+
oneof result {
31+
Error error = 1;
32+
uint64 ok = 2;
33+
}
34+
}
35+
2636
message WritePayloadRequest{}
2737
message WritePayloadResponse{
2838
message UploadOption {
@@ -72,6 +82,7 @@ message RemovePayloadResponse{
7282
service PayloadService {
7383
rpc IterDigests(IterDigestsRequest) returns (stream IterDigestsResponse);
7484
rpc HasPayload(HasPayloadRequest) returns (HasPayloadResponse);
85+
rpc PayloadSize(PayloadSizeRequest) returns (PayloadSizeResponse);
7586
rpc WritePayload(WritePayloadRequest) returns (WritePayloadResponse);
7687
rpc OpenPayload(OpenPayloadRequest) returns (OpenPayloadResponse);
7788
rpc RemovePayload(RemovePayloadRequest) returns (RemovePayloadResponse);

crates/spfs/src/proto/result.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,11 @@ rpc_result!(
127127
bool
128128
);
129129

130+
rpc_result!(
131+
g::PayloadSizeResponse,
132+
g::payload_size_response::Result,
133+
u64
134+
);
130135
rpc_result!(
131136
g::WritePayloadResponse,
132137
g::write_payload_response::Result,

crates/spfs/src/server/payload.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,18 @@ impl proto::payload_service_server::PayloadService for PayloadService {
7070
Ok(Response::new(result))
7171
}
7272

73+
async fn payload_size(
74+
&self,
75+
request: Request<proto::PayloadSizeRequest>,
76+
) -> Result<Response<proto::PayloadSizeResponse>, Status> {
77+
let request = request.into_inner();
78+
let digest = convert_digest(request.digest)
79+
.map_err(|err| Status::invalid_argument(err.to_string()))?;
80+
let size = proto::handle_error!(self.repo.payload_size(digest).await);
81+
let result = proto::PayloadSizeResponse::ok(size);
82+
Ok(Response::new(result))
83+
}
84+
7385
async fn open_payload(
7486
&self,
7587
request: Request<proto::OpenPayloadRequest>,

crates/spfs/src/storage/fallback/repository.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,10 @@ impl PayloadStorage for FallbackProxy {
280280
false
281281
}
282282

283+
async fn payload_size(&self, digest: encoding::Digest) -> Result<u64> {
284+
crate::storage::proxy::payload_size(self, digest).await
285+
}
286+
283287
fn iter_payload_digests(&self) -> Pin<Box<dyn Stream<Item = Result<encoding::Digest>> + Send>> {
284288
self.primary.iter_payload_digests()
285289
}

crates/spfs/src/storage/fs/payloads.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@ impl crate::storage::PayloadStorage for MaybeOpenFsRepository {
2121
opened.has_payload(digest).await
2222
}
2323

24+
async fn payload_size(&self, digest: encoding::Digest) -> Result<u64> {
25+
let opened = self.opened().await?;
26+
opened.payload_size(digest).await
27+
}
28+
2429
fn iter_payload_digests(&self) -> Pin<Box<dyn Stream<Item = Result<encoding::Digest>> + Send>> {
2530
self.opened()
2631
.and_then(|opened| ready(Ok(opened.iter_payload_digests())))
@@ -52,6 +57,17 @@ impl crate::storage::PayloadStorage for OpenFsRepository {
5257
tokio::fs::symlink_metadata(path).await.is_ok()
5358
}
5459

60+
async fn payload_size(&self, digest: encoding::Digest) -> Result<u64> {
61+
let path = self.payloads.build_digest_path(&digest);
62+
tokio::fs::symlink_metadata(&path)
63+
.await
64+
.map(|meta| meta.len())
65+
.map_err(|err| match err.kind() {
66+
ErrorKind::NotFound => Error::UnknownObject(digest),
67+
_ => Error::StorageReadError("symlink_metadata on payload", path, err),
68+
})
69+
}
70+
5571
fn iter_payload_digests(&self) -> Pin<Box<dyn Stream<Item = Result<encoding::Digest>> + Send>> {
5672
Box::pin(self.payloads.iter())
5773
}

crates/spfs/src/storage/handle.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,10 @@ impl PayloadStorage for RepositoryHandle {
257257
each_variant!(self, repo, { repo.has_payload(digest).await })
258258
}
259259

260+
async fn payload_size(&self, digest: encoding::Digest) -> Result<u64> {
261+
each_variant!(self, repo, { repo.payload_size(digest).await })
262+
}
263+
260264
fn iter_payload_digests(&self) -> Pin<Box<dyn Stream<Item = Result<encoding::Digest>> + Send>> {
261265
each_variant!(self, repo, { repo.iter_payload_digests() })
262266
}
@@ -427,6 +431,10 @@ impl PayloadStorage for Arc<RepositoryHandle> {
427431
each_variant!(&**self, repo, { repo.has_payload(digest).await })
428432
}
429433

434+
async fn payload_size(&self, digest: encoding::Digest) -> Result<u64> {
435+
each_variant!(&**self, repo, { repo.payload_size(digest).await })
436+
}
437+
430438
fn iter_payload_digests(&self) -> Pin<Box<dyn Stream<Item = Result<encoding::Digest>> + Send>> {
431439
each_variant!(&**self, repo, { repo.iter_payload_digests() })
432440
}

crates/spfs/src/storage/payload.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ pub trait PayloadStorage: Sync + Send {
2222
/// Return true if the identified payload exists.
2323
async fn has_payload(&self, digest: encoding::Digest) -> bool;
2424

25+
/// Return the payload size if the identified payload exists.
26+
async fn payload_size(&self, digest: encoding::Digest) -> Result<u64>;
27+
2528
/// Store the contents of the given stream, returning its digest and size
2629
async fn write_data(&self, reader: Pin<Box<dyn BlobRead>>) -> Result<(encoding::Digest, u64)>;
2730

@@ -47,6 +50,10 @@ impl<T: PayloadStorage> PayloadStorage for &T {
4750
PayloadStorage::has_payload(&**self, digest).await
4851
}
4952

53+
async fn payload_size(&self, digest: encoding::Digest) -> Result<u64> {
54+
PayloadStorage::payload_size(&**self, digest).await
55+
}
56+
5057
fn iter_payload_digests(&self) -> Pin<Box<dyn Stream<Item = Result<encoding::Digest>> + Send>> {
5158
PayloadStorage::iter_payload_digests(&**self)
5259
}

crates/spfs/src/storage/pinned/repository.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,10 @@ where
127127
self.inner.has_payload(digest).await
128128
}
129129

130+
async fn payload_size(&self, digest: encoding::Digest) -> Result<u64> {
131+
self.inner.payload_size(digest).await
132+
}
133+
130134
fn iter_payload_digests(&self) -> Pin<Box<dyn Stream<Item = Result<encoding::Digest>> + Send>> {
131135
self.inner.iter_payload_digests()
132136
}

crates/spfs/src/storage/proxy/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ pub(crate) use repository::{
1212
find_tags_in_namespace,
1313
iter_tag_streams_in_namespace,
1414
ls_tags_in_namespace,
15+
payload_size,
1516
read_tag_in_namespace,
1617
};
1718

crates/spfs/src/storage/proxy/repository.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,21 @@ impl graph::DatabaseExt for ProxyRepository {
202202
}
203203
}
204204

205+
pub(crate) async fn payload_size<R>(repo: R, digest: encoding::Digest) -> Result<u64>
206+
where
207+
R: ProxyRepositoryExt,
208+
{
209+
if let Ok(size) = repo.primary().payload_size(digest).await {
210+
return Ok(size);
211+
}
212+
for secondary in repo.secondary().iter() {
213+
if let Ok(size) = secondary.payload_size(digest).await {
214+
return Ok(size);
215+
}
216+
}
217+
Err(crate::Error::UnknownObject(digest))
218+
}
219+
205220
#[async_trait::async_trait]
206221
impl PayloadStorage for ProxyRepository {
207222
async fn has_payload(&self, digest: encoding::Digest) -> bool {
@@ -216,6 +231,10 @@ impl PayloadStorage for ProxyRepository {
216231
false
217232
}
218233

234+
async fn payload_size(&self, digest: encoding::Digest) -> Result<u64> {
235+
payload_size(self, digest).await
236+
}
237+
219238
fn iter_payload_digests(&self) -> Pin<Box<dyn Stream<Item = Result<encoding::Digest>> + Send>> {
220239
self.primary.iter_payload_digests()
221240
}

0 commit comments

Comments
 (0)