
Feat/511 Implement Data Collection and Visualization for Web3.Storage Measurement Batch #560


Open · wants to merge 2 commits into main
common/telemetry.js (10 additions, 1 deletion)
@@ -25,9 +25,17 @@ const networkInfoWriteClient = influx.getWriteApi(
  's' // precision
)

+// Add new write client for batch metrics
Contributor:
Let's not add a new bucket and write client; rather, let's just extend the existing publish metric.
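
A minimal sketch of that direction (illustrative only, not code from this PR; it relies on the `recordTelemetry('publish', …)` call that publish/index.js below already makes): the batch size becomes one more field on the existing publish point, which `publishWriteClient` already flushes every 10 seconds, so no new bucket or write client is needed:

  // In publish/index.js, inside the existing 'publish' point callback:
  recordTelemetry('publish', point => {
    // ...existing fields stay as they are...
    point.intField('batch_size_bytes', batchSizeBytes) // new field, no new bucket or client
  })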

+const batchMetricsWriteClient = influx.getWriteApi(
+  'Filecoin Station', // org
+  'spark-batch-metrics', // bucket
+  'ns' // precision
+)
+
setInterval(() => {
  publishWriteClient.flush().catch(console.error)
  networkInfoWriteClient.flush().catch(console.error)
+  batchMetricsWriteClient.flush().catch(console.error)
Contributor:

Suggested change (delete this line):

  batchMetricsWriteClient.flush().catch(console.error)

We won't need this one if we extend the existing publish metric.

}, 10_000).unref()

const recordFn = (client, name, fn) => {
@@ -43,5 +51,6 @@ export {
  publishWriteClient,
  networkInfoWriteClient,
  recordPublishTelemetry,
-  recordNetworkInfoTelemetry
+  recordNetworkInfoTelemetry,
+  batchMetricsWriteClient
Contributor:

Suggested change:

  - recordNetworkInfoTelemetry,
  - batchMetricsWriteClient
  + recordNetworkInfoTelemetry

We won't need this one if we extend the existing publish metric.

}
publish/index.js (17 additions, 0 deletions)
@@ -53,6 +53,11 @@ export const publish = async ({

  logger.log(`Publishing ${measurements.length} measurements. Total unpublished: ${totalCount}. Batch size: ${maxMeasurements}.`)

+  // Calculate batch size in bytes
+  const batchSizeBytes = Buffer.byteLength(
Contributor:
Have you found some other ways to calculate the batch size without serialising the objects to JSON? Depending on the batch size, that might consume a lot of memory.

Member:
+1

We have the following code a few lines below:

  const file = new File(
    [measurements.map(m => JSON.stringify(m)).join('\n')],
    'measurements.ndjson',
    { type: 'application/json' }
  )

Please refactor it so that we create only one copy of measurements.map(m => JSON.stringify(m)).join('\n').
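
A sketch of the requested refactor (illustrative; the `ndjson` variable name is introduced here, not taken from the PR): serialise the measurements once and reuse that single string for both the size calculation and the uploaded file:

  // Serialise to NDJSON exactly once and reuse the string
  const ndjson = measurements.map(m => JSON.stringify(m)).join('\n')
  const batchSizeBytes = Buffer.byteLength(ndjson)
  const file = new File([ndjson], 'measurements.ndjson', { type: 'application/json' })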

+    measurements.map(m => JSON.stringify(m)).join('\n')
+  )

  // Share measurements
  const start = new Date()
  const file = new File(
@@ -126,7 +131,9 @@ export const publish = async ({

  logger.log('Done!')

+  // Enhanced telemetry recording with separate batch metrics
  recordTelemetry('publish', point => {
+    // Existing metrics
Contributor:
Let's extend this metric with a new point that collects the batch size.

Member:
Let me correct that - we should add new fields to the existing point.
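
A sketch of that corrected suggestion (illustrative; the field names are reused from elsewhere in this diff): the batch metrics become extra fields on the point below, rather than a second `recordTelemetry` call:

  recordTelemetry('publish', point => {
    point.intField('round_index', roundIndex)
    point.intField('measurements', measurements.length)
    point.floatField('load', totalCount / maxMeasurements)
    // Folded in from the proposed batch_metrics point:
    point.intField('batch_size_bytes', batchSizeBytes)
    point.floatField('avg_measurement_size_bytes', batchSizeBytes / measurements.length)
  })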

    point.intField('round_index', roundIndex)
    point.intField('measurements', measurements.length)
    point.floatField('load', totalCount / maxMeasurements)
@@ -136,6 +143,16 @@
    )
    point.intField('add_measurements_duration_ms', ieAddMeasurementsDuration)
  })

+  // Separate batch metrics recording for better organization
+  recordTelemetry('batch_metrics', point => {
Contributor:
Let's delete this new metric.

+    point.intField('batch_size_bytes', batchSizeBytes)
+    point.floatField('avg_measurement_size_bytes', batchSizeBytes / measurements.length)
+    point.intField('measurement_count', measurements.length)
+    point.tag('cid', cid.toString())
+    point.tag('round_index', roundIndex.toString())
Member (@bajtos, Mar 28, 2025):
Storing CIDs and round indexes in tags will cause high cardinality that will degrade performance and/or increase our bill.

Quoting from https://docs.influxdata.com/influxdb/v2/write-data/best-practices/resolve-high-cardinality/

  Tags containing highly variable information like unique IDs, hashes, and random strings lead to a large number of series, also known as high series cardinality. High series cardinality is a primary driver of high memory usage for many database workloads.
  (...)
  Review your tags to ensure each tag does not contain unique values for most entries.

Author:
Thanks for the suggestion, I'll rectify the commit!

Contributor:
+1 to what @bajtos said.
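
For illustration, a hedged sketch using the InfluxDB JavaScript client's `Point` API (not code from this PR): if these values were kept at all, high-cardinality identifiers such as CIDs belong in fields, which are not indexed; tags should be reserved for low-cardinality labels:

  // Bad: each unique tag value creates a new series, exploding cardinality
  point.tag('cid', cid.toString())
  // Cheaper: fields are not indexed, so unique values do not multiply series
  point.stringField('cid', cid.toString())
  point.intField('round_index', roundIndex)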

+    point.timestamp(new Date())
+  })
}

const commitMeasurements = async ({ cid, ieContract, logger, stuckTransactionsCanceller }) => {