Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: multiple space diffs #456

Merged
merged 3 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions billing/lib/ucan-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ export const findSpaceUsageDeltas = messages => {
let resource
/** @type {number|undefined} */
let size
if (isReceiptForCapability(message, ServiceBlobCaps.allocate) && isServiceBlobAllocateSuccess(message.out)) {
if (isReceiptForCapability(message, ServiceBlobCaps.accept) && isServiceBlobAcceptSuccess(message.out)) {
resource = message.value.att[0].nb?.space
size = message.out.ok.size
size = message.value.att[0].nb?.blob.size
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, and this works because value points to the original invocation for blob accept, which does include the size.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only problem with this approach is that it doesn't take into account whether the blob is stored in the space already or not. The receipt for blob/allocatewill have size: 0 when the blob already exists in the space.

It's probably better than the current situation though...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If size: 0, we are not adding it to the diff table.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but I'm saying we'll never receive that. We will add a new record to the space diff table every time the user attempts to store bafyxyz, no matter how many times.

e.g. bafyxyz is 100 bytes

Client store bafyxyz -> insert space diff -> space size = 100
Client store bafyxyz -> insert space diff -> space size = 200
Client store bafyxyz -> insert space diff -> space size = 300
...

but the behaviour we want is:

Client store bafyxyz -> insert space diff -> space size = 100
Client store bafyxyz -> no space diff insert -> space size = 100

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hannahhoward WDYT shall we merge and release this? IMHO it's better than the status quo but still needs more work.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the blob already exists, we return success for blob/allocate with a size of 0 and out.ok.address set to null. However, if this occurs, we immediately return the receipt for http/put, and if it has succeeded, we execute blob/accept, leading to two receipts in the stream.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in that case we should just not generate those receipts early.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would love to catch up on this one - if it doesn't come up at sprint planning or we don't have enough time to get into it I'd love to sync briefly after that if y'all are free!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK - in our chat this morning we decided to ship this change since it will significantly decrease the impact of the duplicate diff issue, and to deprecate the ucan stream entirely as part of the upload-service work - @alanshaw should we create one more item under that upload service epic for this work? I'm happy to go file that and fill in some basic details if so...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I summarized the plan moving forward here: storacha/RFC#39

} else if (isReceiptForCapability(message, BlobCaps.remove) && isBlobRemoveSuccess(message.out)) {
resource = /** @type {import('@ucanto/interface').DID} */ (message.value.att[0].with)
size = -message.out.ok.size
Expand Down Expand Up @@ -118,14 +118,14 @@ const isReceipt = m => m.type === 'receipt'

/**
* @param {import('@ucanto/interface').Result} r
* @returns {r is { ok: import('@web3-storage/capabilities/types').BlobAllocateSuccess }}
* @returns {r is { ok: import('@web3-storage/capabilities/types').BlobAcceptSuccess }}
*/
const isServiceBlobAllocateSuccess = r =>
const isServiceBlobAcceptSuccess = (r) =>
!r.error &&
r.ok != null &&
typeof r.ok === 'object' &&
'size' in r.ok &&
(typeof r.ok.size === 'number')
'site' in r.ok &&
typeof r.ok.site === 'object'

/**
* @param {import('@ucanto/interface').Result} r
Expand Down
17 changes: 11 additions & 6 deletions billing/test/lib/ucan-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export const test = {

/**
* @type {import('../../lib/api.js').UcanReceiptMessage<[
* | import('@web3-storage/capabilities/types').BlobAllocate
* | import('@web3-storage/capabilities/types').BlobAccept
* | import('@web3-storage/capabilities/types').BlobRemove
* | import('@web3-storage/capabilities/types').StoreAdd
* | import('@web3-storage/capabilities/types').StoreRemove
Expand All @@ -42,20 +42,25 @@ export const test = {
value: {
att: [{
with: await randomDIDKey(),
can: ServiceBlobCaps.allocate.can,
can: ServiceBlobCaps.accept.can,
nb: {
_put: {
"ucan/await": [
".out.ok",
randomLink()
]
},
blob: {
digest: randomLink().multihash.bytes,
size: 138
},
cause: randomLink(),
space: await randomDIDKey()
}
}],
aud: await randomDID(),
cid: randomLink()
},
out: { ok: { size: 138 } },
out: { ok: { site: randomLink() } },
ts: new Date()
}, {
type: 'receipt',
Expand Down Expand Up @@ -118,8 +123,8 @@ export const test = {
for (const r of receipts) {
assert.ok(deltas.some(d => (
d.cause.toString() === r.invocationCid.toString() &&
// resource for blob allocate is found in the caveats
(r.value.att[0].can === ServiceBlobCaps.allocate.can
// resource for blob accept is found in the caveats
(r.value.att[0].can === ServiceBlobCaps.accept.can
? d.resource === r.value.att[0].nb.space
: d.resource === r.value.att[0].with)
)))
Expand Down
66 changes: 34 additions & 32 deletions upload-api/stores/agent/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,22 +86,22 @@ export const assert = async (message, { stream, store }) => {
for (const member of message.index) {
if (member.invocation) {
const { task, invocation, message } = member.invocation
const data = JSON.stringify({
// This is bad naming but not worth a breaking change
carCid: message.toString(),
task: task.toString(),
value: {
att: invocation.capabilities,
aud: invocation.audience.did(),
iss: invocation.issuer.did(),
cid: invocation.cid.toString(),
},
ts: Date.now(),
type: stream.workflow.type,
})

records.push({
Data: UTF8.fromString(
JSON.stringify({
// This is bad naming but not worth a breaking change
carCid: message.toString(),
task: task.toString(),
value: {
att: invocation.capabilities,
aud: invocation.audience.did(),
iss: invocation.issuer.did(),
cid: invocation.cid.toString(),
},
ts: Date.now(),
type: stream.workflow.type,
})
),
Data: UTF8.fromString(data),
PartitionKey: partitionKey(member),
})
}
Expand Down Expand Up @@ -133,24 +133,26 @@ export const assert = async (message, { stream, store }) => {
console.warn("receipt will not serialize to JSON", "receipt", receipt.out, "error", error)
}

const data = JSON.stringify(
{
carCid: message.toString(),
invocationCid: invocation.cid.toString(),
task: task.toString(),
value: {
att: invocation.capabilities,
aud: invocation.audience.did(),
iss: invocation.issuer.did(),
cid: invocation.cid.toString(),
},
out: receipt.out,
ts: Date.now(),
type: stream.receipt.type,
},
(_, value) => (typeof value === 'bigint' ? Number(value) : value)
)

records.push({
Data: UTF8.fromString(
JSON.stringify({
// This is bad naming but not worth a breaking change
carCid: message.toString(),
invocationCid: invocation.cid.toString(),
task: task.toString(),
value: {
att: invocation.capabilities,
aud: invocation.audience.did(),
iss: invocation.issuer.did(),
cid: invocation.cid.toString(),
},
out: receipt.out,
ts: Date.now(),
type: stream.receipt.type,
}, (_, value) => typeof value === "bigint" ? Number(value) : value )
),
Data: UTF8.fromString(data),
PartitionKey: partitionKey(member),
})
}
Expand Down
Loading