Skip to content

Commit

Permalink
feat: more progress
Browse files Browse the repository at this point in the history
  • Loading branch information
alanshaw committed Jan 15, 2025
1 parent 127c19e commit ebe532e
Show file tree
Hide file tree
Showing 33 changed files with 856 additions and 396 deletions.
14 changes: 9 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,17 @@ DID of the filecoin aggregator service.

URL of the filecoin aggregator service.

#### `CONTENT_CLAIMS_DID`
#### `INDEXING_SERVICE_DID`

DID of the [content claims service](https://github.com/web3-storage/content-claims).
DID of the [indexing service](https://github.com/storacha/indexing-service).

#### `CONTENT_CLAIMS_URL`
#### `INDEXING_SERVICE_URL`

URL of the [content claims service](https://github.com/web3-storage/content-claims).
URL of the [indexing service](https://github.com/storacha/indexing-service).

#### `INDEXING_SERVICE_PROOF`

Proof that the upload service can publish claims to the [indexing service](https://github.com/storacha/indexing-service).

#### `DEAL_TRACKER_DID`

Expand Down Expand Up @@ -253,7 +257,7 @@ To set a fallback value for `staging` or an ephmeral PR build use [`sst secrets

```sh
# set `PRIVATE_KEY` for any stage in us-east-2
$ npx sst secrets set-fallback --region us-east-2 PRIVATE_KEY "MgCZG7...="
$ npx sst secrets set --fallback --region us-east-2 PRIVATE_KEY "MgCZG7...="
```

**Note**: The fallback value can only be inherited by stages deployed in the same AWS account and region.
Expand Down
15 changes: 11 additions & 4 deletions billing/lib/ucan-stream.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as ServiceBlobCaps from '@storacha/capabilities/web3.storage/blob'
import * as BlobCaps from '@storacha/capabilities/blob'
import * as SpaceBlobCaps from '@storacha/capabilities/space/blob'
import * as StoreCaps from '@storacha/capabilities/store'
import * as DID from '@ipld/dag-ucan/did'

/**
* Filters UCAN stream messages that are receipts for invocations that alter
Expand All @@ -19,8 +20,10 @@ export const findSpaceUsageDeltas = messages => {
let resource
/** @type {number|undefined} */
let size
if (isReceiptForCapability(message, ServiceBlobCaps.allocate) && isServiceBlobAllocateSuccess(message.out)) {
resource = message.value.att[0].nb?.space
if (isReceiptForCapability(message, BlobCaps.allocate) && isBlobAllocateSuccess(message.out)) {
const spaceDigestBytes = message.value.att[0].nb?.space
if (!spaceDigestBytes) throw new Error('missing space in allocate caveats')
resource = DID.decode(spaceDigestBytes).did()
size = message.out.ok.size
} else if (isReceiptForCapability(message, SpaceBlobCaps.remove) && isSpaceBlobRemoveSuccess(message.out)) {
resource = /** @type {import('@ucanto/interface').DID} */ (message.value.att[0].with)
Expand Down Expand Up @@ -106,6 +109,10 @@ export const storeSpaceUsageDeltas = async (deltas, ctx) => {
spaceDiffs.push(...res.ok)
}

if (spaceDiffs.length === 0) {
return { ok: 'no space diffs to store', error: undefined }
}

console.log(`Total space diffs to store: ${spaceDiffs.length}`)
return ctx.spaceDiffStore.batchPut(spaceDiffs)
}
Expand All @@ -120,7 +127,7 @@ const isReceipt = m => m.type === 'receipt'
* @param {import('@ucanto/interface').Result} r
* @returns {r is { ok: import('@storacha/capabilities/types').BlobAllocateSuccess }}
*/
const isServiceBlobAllocateSuccess = r =>
const isBlobAllocateSuccess = r =>
!r.error &&
r.ok != null &&
typeof r.ok === 'object' &&
Expand Down
2 changes: 2 additions & 0 deletions billing/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
"@aws-sdk/client-dynamodb": "^3.515.0",
"@aws-sdk/client-sqs": "^3.515.0",
"@aws-sdk/util-dynamodb": "^3.515.0",
"@ipld/dag-ucan": "^3.4.0",
"@sentry/serverless": "^7.74.1",
"@storacha/capabilities": "^1.2.0",
"@ucanto/interface": "^10.0.1",
"@ucanto/server": "^10.0.0",
"big.js": "^6.2.1",
Expand Down
2 changes: 1 addition & 1 deletion billing/test/helpers/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { randomInteger } from './math.js'
*/
export const randomConsumer = async (base = {}) => ({
consumer: await randomDID(),
provider: Schema.did({ method: 'web' }).from(['did:web:web3.storage', 'did:web:nft.storage'][randomInteger(0, 2)]),
provider: Schema.did({ method: 'web' }).from(['did:web:upload.storacha.network', 'did:web:web3.storage', 'did:web:nft.storage'][randomInteger(0, 2)]),
subscription: randomLink().toString(),
customer: randomCustomer().customer,
cause: randomLink(),
Expand Down
2 changes: 1 addition & 1 deletion billing/test/helpers/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { randomInteger } from './math.js'
*/
export const randomSubscription = async (base = {}) => ({
customer: randomDIDMailto(),
provider: Schema.did({ method: 'web' }).from(['did:web:web3.storage', 'did:web:nft.storage'][randomInteger(0, 2)]),
provider: Schema.did({ method: 'web' }).from(['did:web:upload.storacha.network', 'did:web:web3.storage', 'did:web:nft.storage'][randomInteger(0, 2)]),
subscription: randomLink().toString(),
cause: randomLink(),
insertedAt: new Date(),
Expand Down
2 changes: 1 addition & 1 deletion billing/test/lib/customer-billing-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { randomSubscription } from '../helpers/subscription.js'
export const test = {
'should queue all spaces for a customer': async (/** @type {import('entail').assert} */ assert, ctx) => {
const customer = randomCustomer()
const provider = 'did:web:web3.storage'
const provider = 'did:web:upload.storacha.network'

const consumers = await Promise.all([
randomConsumer({ provider }),
Expand Down
2 changes: 1 addition & 1 deletion billing/test/lib/ucan-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ export const test = {

const deltas = findSpaceUsageDeltas(receipts)
const storeDeltasRes = await storeSpaceUsageDeltas(deltas, ctx)
assert.equal(storeDeltasRes.error?.name, 'InsufficientRecords')
assert.equal(storeDeltasRes.ok, 'no space diffs to store')

const res = await ctx.spaceDiffStore.list({
provider: consumer.provider,
Expand Down
10 changes: 5 additions & 5 deletions filecoin/functions/handle-cron-tick.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Sentry.AWSLambda.init({
const AWS_REGION = process.env.AWS_REGION || 'us-west-2'

export async function handleCronTick () {
const { did, pieceTableName, workflowBucketName, invocationBucketName, aggregatorDid, storefrontProof } = getEnv()
const { did, pieceTableName, agentMessageBucketName, agentIndexBucketName, aggregatorDid, storefrontProof } = getEnv()
const { PRIVATE_KEY: privateKey } = Config

// create context
Expand All @@ -40,8 +40,8 @@ export async function handleCronTick () {
const context = {
id,
pieceStore: createPieceTable(AWS_REGION, pieceTableName),
taskStore: createTaskStore(AWS_REGION, invocationBucketName, workflowBucketName),
receiptStore: createReceiptStore(AWS_REGION, invocationBucketName, workflowBucketName),
taskStore: createTaskStore(AWS_REGION, agentIndexBucketName, agentMessageBucketName),
receiptStore: createReceiptStore(AWS_REGION, agentIndexBucketName, agentMessageBucketName),
aggregatorId: DID.parse(aggregatorDid),
}

Expand All @@ -64,8 +64,8 @@ function getEnv () {
return {
did: mustGetEnv('DID'),
pieceTableName: mustGetEnv('PIECE_TABLE_NAME'),
workflowBucketName: mustGetEnv('WORKFLOW_BUCKET_NAME'),
invocationBucketName: mustGetEnv('INVOCATION_BUCKET_NAME'),
agentMessageBucketName: mustGetEnv('AGENT_MESSAGE_BUCKET_NAME'),
agentIndexBucketName: mustGetEnv('AGENT_INDEX_BUCKET_NAME'),
aggregatorDid: mustGetEnv('AGGREGATOR_DID'),
storefrontProof: process.env.PROOF,
}
Expand Down
41 changes: 15 additions & 26 deletions filecoin/functions/handle-piece-insert-to-content-claim.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ import * as Sentry from '@sentry/serverless'
import { Config } from 'sst/node/config'
import { unmarshall } from '@aws-sdk/util-dynamodb'
import * as Delegation from '@ucanto/core/delegation'
import { fromString } from 'uint8arrays/from-string'
import * as DID from '@ipld/dag-ucan/did'

import * as Link from 'multiformats/link'
import { base64 } from 'multiformats/bases/base64'
import * as storefrontEvents from '@storacha/filecoin-api/storefront/events'

import { decodeRecord } from '../store/piece.js'
import { getServiceConnection, getServiceSigner } from '../service.js'
import { mustGetEnv } from '../../lib/env.js'
Expand All @@ -25,8 +23,8 @@ Sentry.AWSLambda.init({
* @param {import('aws-lambda').DynamoDBStreamEvent} event
*/
async function pieceCidReport (event) {
const { contentClaimsDid, contentClaimsUrl, contentClaimsProof } = getEnv()
const { CONTENT_CLAIMS_PRIVATE_KEY: contentClaimsPrivateKey } = Config
const { indexingServiceDid, indexingServiceUrl, indexingServiceProof } = getEnv()
const { PRIVATE_KEY: privateKey } = Config

const records = parseDynamoDbEvent(event)
if (records.length > 1) {
Expand All @@ -39,30 +37,21 @@ async function pieceCidReport (event) {
const record = decodeRecord(storeRecord)

const connection = getServiceConnection({
did: contentClaimsDid,
url: contentClaimsUrl
did: indexingServiceDid,
url: indexingServiceUrl
})
let claimsIssuer = getServiceSigner({
privateKey: contentClaimsPrivateKey
})
const claimsProofs = []
if (contentClaimsProof) {
const proof = await Delegation.extract(fromString(contentClaimsProof, 'base64pad'))
if (!proof.ok) throw new Error('failed to extract proof', { cause: proof.error })
claimsProofs.push(proof.ok)
} else {
// if no proofs, we must be using the service private key to sign
claimsIssuer = claimsIssuer.withDID(DID.parse(contentClaimsDid).did())
}
const cid = Link.parse(indexingServiceProof, base64)
const proof = await Delegation.extract(cid.multihash.digest)
if (!proof.ok) throw new Error('failed to extract proof', { cause: proof.error })

const context = {
claimsService: {
connection,
invocationConfig: {
issuer: claimsIssuer,
issuer: getServiceSigner({ privateKey }),
audience: connection.id,
with: claimsIssuer.did(),
proofs: claimsProofs
with: connection.id.did(),
proofs: [proof.ok]
},
},
}
Expand All @@ -89,9 +78,9 @@ export const main = Sentry.AWSLambda.wrapHandler(pieceCidReport)
*/
function getEnv() {
return {
contentClaimsDid: mustGetEnv('CONTENT_CLAIMS_DID'),
contentClaimsUrl: mustGetEnv('CONTENT_CLAIMS_URL'),
contentClaimsProof: process.env.CONTENT_CLAIMS_PROOF,
indexingServiceDid: mustGetEnv('INDEXING_SERVICE_DID'),
indexingServiceUrl: mustGetEnv('INDEXING_SERVICE_URL'),
indexingServiceProof: mustGetEnv('INDEXING_SERVICE_PROOF'),
}
}

Expand Down
12 changes: 6 additions & 6 deletions filecoin/functions/metrics-aggregate-offer-and-accept-total.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ async function handler(event) {
const ucanInvocations = parseKinesisEvent(event)
const {
metricsTableName,
invocationBucketName,
workflowBucketName,
agentMessageBucketName,
agentIndexBucketName,
startEpochMs
} = getLambdaEnv()

const filecoinMetricsStore = createFilecoinMetricsTable(AWS_REGION, metricsTableName)
const workflowStore = createWorkflowStore(AWS_REGION, workflowBucketName)
const invocationStore = createInvocationStore(AWS_REGION, invocationBucketName)
const workflowStore = createWorkflowStore(AWS_REGION, agentMessageBucketName)
const invocationStore = createInvocationStore(AWS_REGION, agentIndexBucketName)

await Promise.all([
updateAggregateOfferTotal(ucanInvocations, {
Expand All @@ -50,8 +50,8 @@ async function handler(event) {
function getLambdaEnv () {
return {
metricsTableName: mustGetEnv('METRICS_TABLE_NAME'),
workflowBucketName: mustGetEnv('WORKFLOW_BUCKET_NAME'),
invocationBucketName: mustGetEnv('INVOCATION_BUCKET_NAME'),
agentMessageBucketName: mustGetEnv('AGENT_MESSAGE_BUCKET_NAME'),
agentIndexBucketName: mustGetEnv('AGENT_INDEX_BUCKET_NAME'),
startEpochMs: process.env.START_FILECOIN_METRICS_EPOCH_MS ? parseInt(process.env.START_FILECOIN_METRICS_EPOCH_MS) : undefined
}
}
Expand Down
18 changes: 9 additions & 9 deletions filecoin/store/receipt.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,31 @@ import { getS3Client } from '../../lib/aws/s3.js'
* handled receipts.
*
* @param {string} region
* @param {string} invocationBucketName
* @param {string} workflowBucketName
* @param {string} agentIndexBucketName
* @param {string} agentMessageBucketName
* @param {import('@aws-sdk/client-s3').ServiceInputTypes} [options]
*/
export function createReceiptStore(region, invocationBucketName, workflowBucketName, options = {}) {
export function createReceiptStore(region, agentIndexBucketName, agentMessageBucketName, options = {}) {
const s3client = getS3Client({
region,
...options,
})
return useReceiptStore(s3client, invocationBucketName, workflowBucketName)
return useReceiptStore(s3client, agentIndexBucketName, agentMessageBucketName)
}

/**
* @param {import('@aws-sdk/client-s3').S3Client} s3client
* @param {string} invocationBucketName
* @param {string} workflowBucketName
* @param {string} agentIndexBucketName
* @param {string} agentMessageBucketName
* @returns {import('@storacha/filecoin-api/storefront/api').ReceiptStore}
*/
export const useReceiptStore = (s3client, invocationBucketName, workflowBucketName) => {
export const useReceiptStore = (s3client, agentIndexBucketName, agentMessageBucketName) => {
const store = Store.open({
connection: { channel: s3client },
region: typeof s3client.config.region === 'string' ? s3client.config.region : 'us-west-2',
buckets: {
index: { name: invocationBucketName },
message: { name: workflowBucketName },
index: { name: agentIndexBucketName },
message: { name: agentMessageBucketName },
}
})

Expand Down
18 changes: 9 additions & 9 deletions filecoin/store/task.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,31 @@ import { getS3Client } from '../../lib/aws/s3.js'
* handled Tasks and their indexes.
*
* @param {string} region
* @param {string} invocationBucketName
* @param {string} workflowBucketName
* @param {string} agentIndexBucketName
* @param {string} agentMessageBucketName
* @param {import('@aws-sdk/client-s3').ServiceInputTypes} [options]
*/
export function createTaskStore(region, invocationBucketName, workflowBucketName, options = {}) {
export function createTaskStore(region, agentIndexBucketName, agentMessageBucketName, options = {}) {
const s3client = getS3Client({
region,
...options,
})
return useTaskStore(s3client, invocationBucketName, workflowBucketName)
return useTaskStore(s3client, agentIndexBucketName, agentMessageBucketName)
}

/**
* @param {import('@aws-sdk/client-s3').S3Client} s3client
* @param {string} invocationBucketName
* @param {string} workflowBucketName
* @param {string} agentIndexBucketName
* @param {string} agentMessageBucketName
* @returns {import('@storacha/filecoin-api/storefront/api').TaskStore}
*/
export const useTaskStore = (s3client, invocationBucketName, workflowBucketName) => {
export const useTaskStore = (s3client, agentIndexBucketName, agentMessageBucketName) => {
const store = Store.open({
connection: { channel: s3client },
region: typeof s3client.config.region === 'string' ? s3client.config.region : 'us-west-2',
buckets: {
index: { name: invocationBucketName },
message: { name: workflowBucketName },
index: { name: agentIndexBucketName },
message: { name: agentMessageBucketName },
}
})

Expand Down
Loading

0 comments on commit ebe532e

Please sign in to comment.