Skip to content

Commit

Permalink
fix: prevent duplicate space diff entries by using the BlobAccept cap…
Browse files Browse the repository at this point in the history
…ability instead of BlobAllocate

- Duplicate usage events occurred because the blob/allocate receipt was added to the UCAN stream multiple times.
- The blob/allocate receipt was embedded within the blob/add receipt.
- The UCAN invocation router extracted receipts from agent messages, adding blob/allocate to the stream initially and again when processing blob/add.
- Resolution: Changed the capability used to track space usage deltas from BlobAllocate to BlobAccept to avoid duplicate events.
  • Loading branch information
BravoNatalie committed Jan 17, 2025
1 parent c09f3ba commit 6387690
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 38 deletions.
13 changes: 7 additions & 6 deletions billing/lib/ucan-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ export const findSpaceUsageDeltas = messages => {
let resource
/** @type {number|undefined} */
let size
if (isReceiptForCapability(message, ServiceBlobCaps.allocate) && isServiceBlobAllocateSuccess(message.out)) {
if (isReceiptForCapability(message, ServiceBlobCaps.accept) && isServiceBlobAcceptSuccess(message.out)) {
resource = message.value.att[0].nb?.space
size = message.out.ok.size
size = message.value.att[0].nb?.blob.size
} else if (isReceiptForCapability(message, BlobCaps.remove) && isBlobRemoveSuccess(message.out)) {
resource = /** @type {import('@ucanto/interface').DID} */ (message.value.att[0].with)
size = -message.out.ok.size
Expand Down Expand Up @@ -82,6 +82,7 @@ export const storeSpaceUsageDeltas = async (deltas, ctx) => {
// could have multiple providers for the same consumer (space).
const consumers = consumerList.ok.results
console.log(`Found ${consumers.length} consumers for ${delta.resource}`)
console.log(`agentMessage (cause): ${delta.cause}`)
for (const consumer of consumers) {
diffs.push({
provider: consumer.provider,
Expand Down Expand Up @@ -118,14 +119,14 @@ const isReceipt = m => m.type === 'receipt'

/**
* @param {import('@ucanto/interface').Result} r
* @returns {r is { ok: import('@web3-storage/capabilities/types').BlobAllocateSuccess }}
* @returns {r is { ok: import('@web3-storage/capabilities/types').BlobAcceptSuccess }}
*/
const isServiceBlobAllocateSuccess = r =>
const isServiceBlobAcceptSuccess = (r) =>
!r.error &&
r.ok != null &&
typeof r.ok === 'object' &&
'size' in r.ok &&
(typeof r.ok.size === 'number')
'site' in r.ok &&
typeof r.ok.site === 'object'

/**
* @param {import('@ucanto/interface').Result} r
Expand Down
66 changes: 34 additions & 32 deletions upload-api/stores/agent/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,22 +86,22 @@ export const assert = async (message, { stream, store }) => {
for (const member of message.index) {
if (member.invocation) {
const { task, invocation, message } = member.invocation
const data = JSON.stringify({
// This is bad naming but not worth a breaking change
carCid: message.toString(),
task: task.toString(),
value: {
att: invocation.capabilities,
aud: invocation.audience.did(),
iss: invocation.issuer.did(),
cid: invocation.cid.toString(),
},
ts: Date.now(),
type: stream.workflow.type,
})

records.push({
Data: UTF8.fromString(
JSON.stringify({
// This is bad naming but not worth a breaking change
carCid: message.toString(),
task: task.toString(),
value: {
att: invocation.capabilities,
aud: invocation.audience.did(),
iss: invocation.issuer.did(),
cid: invocation.cid.toString(),
},
ts: Date.now(),
type: stream.workflow.type,
})
),
Data: UTF8.fromString(data),
PartitionKey: partitionKey(member),
})
}
Expand Down Expand Up @@ -133,24 +133,26 @@ export const assert = async (message, { stream, store }) => {
console.warn("receipt will not serialize to JSON", "receipt", receipt.out, "error", error)
}

const data = JSON.stringify(
{
carCid: message.toString(),
invocationCid: invocation.cid.toString(),
task: task.toString(),
value: {
att: invocation.capabilities,
aud: invocation.audience.did(),
iss: invocation.issuer.did(),
cid: invocation.cid.toString(),
},
out: receipt.out,
ts: Date.now(),
type: stream.receipt.type,
},
(_, value) => (typeof value === 'bigint' ? Number(value) : value)
)

records.push({
Data: UTF8.fromString(
JSON.stringify({
// This is bad naming but not worth a breaking change
carCid: message.toString(),
invocationCid: invocation.cid.toString(),
task: task.toString(),
value: {
att: invocation.capabilities,
aud: invocation.audience.did(),
iss: invocation.issuer.did(),
cid: invocation.cid.toString(),
},
out: receipt.out,
ts: Date.now(),
type: stream.receipt.type,
}, (_, value) => typeof value === "bigint" ? Number(value) : value )
),
Data: UTF8.fromString(data),
PartitionKey: partitionKey(member),
})
}
Expand Down

0 comments on commit 6387690

Please sign in to comment.