Optimize DB Schema & Query for Top-Earning Leaderboard #340

Open · wants to merge 17 commits into main

4 changes: 4 additions & 0 deletions db/migrations/008.do.create-participants-table.sql
@@ -0,0 +1,4 @@
CREATE TABLE participants (
id SERIAL PRIMARY KEY,
participant_address TEXT NOT NULL UNIQUE
);
9 changes: 9 additions & 0 deletions db/migrations/009.do.backfill-participants.sql
@@ -0,0 +1,9 @@
-- Backfill existing participant addresses from daily_scheduled_rewards
INSERT INTO participants (participant_address)
SELECT DISTINCT participant_address FROM daily_scheduled_rewards
ON CONFLICT (participant_address) DO NOTHING;

-- Backfill existing participant addresses from daily_reward_transfers
INSERT INTO participants (participant_address)
SELECT DISTINCT to_address FROM daily_reward_transfers
ON CONFLICT (participant_address) DO NOTHING;
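
As a quick sanity check before applying migration 010 (a sketch only, not part of the migrations), one can confirm that the backfill covered every address referenced by the two source tables; both counts are expected to be zero:

-- Addresses in daily_scheduled_rewards that are missing from participants (expect 0)
SELECT COUNT(*) FROM daily_scheduled_rewards dsr
LEFT JOIN participants p ON p.participant_address = dsr.participant_address
WHERE p.id IS NULL;

-- Addresses in daily_reward_transfers that are missing from participants (expect 0)
SELECT COUNT(*) FROM daily_reward_transfers drt
LEFT JOIN participants p ON p.participant_address = drt.to_address
WHERE p.id IS NULL;
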
46 changes: 46 additions & 0 deletions db/migrations/010.do.migrate-foreign-keys.sql
@@ -0,0 +1,46 @@
-- Step 1: Add foreign key columns
ALTER TABLE daily_scheduled_rewards ADD COLUMN participant_id INT;
ALTER TABLE daily_reward_transfers ADD COLUMN to_address_id INT;

-- Step 2: Populate the new foreign key columns
UPDATE daily_scheduled_rewards dsr
SET participant_id = p.id
FROM participants p
WHERE dsr.participant_address = p.participant_address;

UPDATE daily_reward_transfers drt
SET to_address_id = p.id
FROM participants p
WHERE drt.to_address = p.participant_address;

-- Step 3: Replace Primary Keys
ALTER TABLE daily_scheduled_rewards
DROP CONSTRAINT daily_scheduled_rewards_pkey;
ALTER TABLE daily_scheduled_rewards
ADD PRIMARY KEY (day, participant_id);

ALTER TABLE daily_reward_transfers
DROP CONSTRAINT daily_reward_transfers_pkey;
ALTER TABLE daily_reward_transfers
ADD PRIMARY KEY (day, to_address_id);

-- Step 4: Add Foreign Key Constraints
ALTER TABLE daily_scheduled_rewards
ADD CONSTRAINT fk_dsr_participant FOREIGN KEY (participant_id)
REFERENCES participants(id) ON DELETE CASCADE;

ALTER TABLE daily_reward_transfers
ADD CONSTRAINT fk_drt_to_address FOREIGN KEY (to_address_id)
REFERENCES participants(id) ON DELETE CASCADE;

-- Step 5: Enforce NOT NULL Constraint
ALTER TABLE daily_scheduled_rewards ALTER COLUMN participant_id SET NOT NULL;
ALTER TABLE daily_reward_transfers ALTER COLUMN to_address_id SET NOT NULL;

-- Step 6: Drop old indexes referencing participant_address
DROP INDEX IF EXISTS daily_reward_transfers_to_address_day;
DROP INDEX IF EXISTS idx_daily_scheduled_rewards_participant_address;

-- Step 7: Drop Old participant_address Columns (if they exist)
ALTER TABLE daily_scheduled_rewards DROP COLUMN IF EXISTS participant_address;
ALTER TABLE daily_reward_transfers DROP COLUMN IF EXISTS to_address;
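
With the TEXT address columns gone, address-based lookups now go through a join on participants. A hypothetical sketch of the new access pattern (the actual API queries are not part of this diff):

-- Example only: fetch a participant's scheduled-rewards history by address
SELECT dsr.day, dsr.scheduled_rewards
FROM daily_scheduled_rewards dsr
JOIN participants p ON p.id = dsr.participant_id
WHERE p.participant_address = $1
ORDER BY dsr.day DESC;
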
2 changes: 2 additions & 0 deletions db/migrations/011.do.index-daily-scheduled-rewards.sql
@@ -0,0 +1,2 @@
CREATE INDEX CONCURRENTLY idx_daily_scheduled_rewards_pid_day
ON daily_scheduled_rewards (participant_id, day DESC);
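
Note that CREATE INDEX CONCURRENTLY cannot run inside a transaction block, so this assumes the migration runner applies the file outside an explicit transaction. The composite (participant_id, day DESC) index supports latest-row lookups such as the following sketch (illustrative only):

-- Example only: most recent scheduled rewards for one participant
SELECT scheduled_rewards
FROM daily_scheduled_rewards
WHERE participant_id = $1
ORDER BY day DESC
LIMIT 1;
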
2 changes: 2 additions & 0 deletions db/migrations/012.do.index-daily-reward-transfers.sql
@@ -0,0 +1,2 @@
CREATE INDEX CONCURRENTLY idx_daily_reward_transfers_to_address_day
ON daily_reward_transfers (to_address_id, day DESC);
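
Similarly, the (to_address_id, day DESC) index serves per-participant range scans over recent days, for example (a sketch; the real leaderboard query is not shown in this diff):

-- Example only: total rewards transferred to one participant in the last 30 days
SELECT SUM(amount) AS recent_rewards
FROM daily_reward_transfers
WHERE to_address_id = $1
  AND day >= now() - interval '30 days';
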
68 changes: 56 additions & 12 deletions db/test-helpers.js
@@ -1,20 +1,27 @@
import { mapParticipantsToIds } from '@filecoin-station/spark-evaluate/lib/platform-stats.js'

/**
* @param {import('./typings.js').Queryable} pgPool
* Populate daily participants in spark_evaluate database
*
* @param {import('./typings.js').PgPoolEvaluate} pgPool
* @param {string} day
* @param {string[]} participantAddresses
*/
export const givenDailyParticipants = async (pgPool, day, participantAddresses) => {
export const givenDailyParticipants = async (
pgPool,
day,
participantAddresses
) => {
const ids = await mapParticipantsToIds(pgPool, new Set(participantAddresses))
await pgPool.query(`

await pgPool.query(
`
INSERT INTO daily_participants (day, participant_id)
SELECT $1 as day, UNNEST($2::INT[]) AS participant_id
ON CONFLICT DO NOTHING
`, [
day,
Array.from(ids.values())
])
`,
[day, Array.from(ids.values())]
)
}

/**
@@ -23,12 +30,49 @@ export const givenDailyParticipants = async (pgPool, day, participantAddresses)
* @param {number} count
*/
export const givenDailyDesktopUsers = async (pgPool, day, count) => {
await pgPool.query(`
await pgPool.query(
`
INSERT INTO daily_desktop_users (day, user_count)
VALUES ($1, $2)
ON CONFLICT DO NOTHING
`, [
day,
count
])
`,
[day, count]
)
}

// Map addresses and insert into daily_scheduled_rewards
export const givenScheduledRewards = async (pgClient, day, rewardsMap) => {
const addresses = Array.from(rewardsMap.keys())
const addressMap = await mapParticipantsToIds(pgClient, new Set(addresses))

for (const [address, rewards] of rewardsMap.entries()) {
const id = addressMap.get(address)
await pgClient.query(
`
INSERT INTO daily_scheduled_rewards (day, participant_id, scheduled_rewards)
VALUES ($1, $2, $3)
`,
[day, id, rewards]
)
}
}

// Map address and insert into daily_reward_transfers
export const givenRewardTransfer = async (
pgClient,
day,
address,
amount,
lastCheckedBlock = 0
) => {
const addressMap = await mapParticipantsToIds(pgClient, new Set([address]))
const id = addressMap.get(address)

await pgClient.query(
`
INSERT INTO daily_reward_transfers (day, to_address_id, amount, last_checked_block)
VALUES ($1, $2, $3, $4)
`,
[day, id, amount, lastCheckedBlock]
)
}
73 changes: 73 additions & 0 deletions observer/lib/map-participants-to-ids.js
@@ -0,0 +1,73 @@
// This is a copy of the code from spark-evaluate:
// https://github.com/CheckerNetwork/spark-evaluate/blob/7548057f3c9609c4bc52baf896b0a85d7a7f8197/lib/platform-stats.js#L154-L219

import assert from 'node:assert'
import createDebug from 'debug'
const debug = createDebug('spark:observer:map-participants-to-ids')

/**
* @param {import('@filecoin-station/spark-stats-db').Queryable} pgClient
* @param {Set<string>} participantsSet
* @returns {Promise<Map<string, number>>} A map of participant addresses to ids.
*/
export const mapParticipantsToIds = async (pgClient, participantsSet) => {
debug('Mapping participants to id, count=%s', participantsSet.size)

/** @type {Map<string, number>} */
const participantsMap = new Map()

// TODO: We can further optimise performance of this function by using
// an in-memory LRU cache. Our network currently has ~2k participants and
// we need ~50 bytes for each (address, id) pair, so that's only ~100KB of data.

// TODO: passing the entire list of participants as a single query parameter
// will probably not scale beyond several thousands of addresses. We will
// need to rework the queries to split large arrays into smaller batches.

// In most rounds, we have already seen most of the participant addresses.
// If we use "INSERT...ON CONFLICT", then PG increments the id counter even for
// existing addresses where we end up skipping the insert. This could quickly
// exhaust the space of all 32-bit integers.
// Solution: query the table for known records before running the insert.
//
// Caveat: In my testing, this query was not able to leverage the (unique)
// index on participants.participant_address and performed a full table scan
// after the array grew past ~10 items. If this becomes a problem, we can
// introduce the LRU cache mentioned above.
const { rows: found } = await pgClient.query(
'SELECT * FROM participants WHERE participant_address = ANY($1::TEXT[])',
[Array.from(participantsSet.values())]
)
debug('Known participants count=%s', found.length)

// eslint-disable-next-line camelcase
for (const { id, participant_address } of found) {
participantsMap.set(participant_address, id)
participantsSet.delete(participant_address)
}

debug('New participant addresses count=%s', participantsSet.size)

// Register the new addresses. Use "INSERT...ON CONFLICT" to handle the race condition
// where another client may have registered these addresses between our previous
// SELECT query and the next INSERT query.
const newAddresses = Array.from(participantsSet.values())
debug('Registering new participant addresses, count=%s', newAddresses.length)
const { rows: created } = await pgClient.query(`
INSERT INTO participants (participant_address)
SELECT UNNEST($1::TEXT[]) AS participant_address
ON CONFLICT(participant_address) DO UPDATE
-- this no-op update is needed to populate "RETURNING id, participant_address"
SET participant_address = EXCLUDED.participant_address
RETURNING id, participant_address
`, [
newAddresses
])

assert.strictEqual(created.length, newAddresses.length)
for (const { id, participant_address: participantAddress } of created) {
participantsMap.set(participantAddress, id)
}

return participantsMap
}
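
For context on the no-op DO UPDATE above: with a plain ON CONFLICT ... DO NOTHING, RETURNING only yields rows that were actually inserted, so already-registered addresses would be missing from the result. A minimal sketch of the alternative that would not work here:

-- DO NOTHING: RETURNING yields only newly inserted rows, so addresses that
-- already exist in participants would be silently dropped from the result set.
INSERT INTO participants (participant_address)
SELECT UNNEST($1::TEXT[])
ON CONFLICT (participant_address) DO NOTHING
RETURNING id, participant_address;
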
40 changes: 34 additions & 6 deletions observer/lib/observer.js
@@ -1,6 +1,7 @@
import { updateDailyTransferStats } from './platform-stats.js'
import * as Sentry from '@sentry/node'
import assert from 'node:assert'
import { mapParticipantsToIds } from './map-participants-to-ids.js'

/**
* Observe the transfer events on the Filecoin blockchain
@@ -24,12 +25,32 @@ export const observeTransferEvents = async (pgPoolStats, ieContract, provider) =
const events = await ieContract.queryFilter(ieContract.filters.Transfer(), queryFromBlock)

console.log(`Found ${events.length} Transfer events`)

const filteredEvents = events.filter(isEventLog)

// gather addresses
const addresses = new Set()
for (const event of filteredEvents) {
addresses.add(event.args.to)
}

const addressMap = await mapParticipantsToIds(pgPoolStats, addresses)

// handle events now that every toAddress is guaranteed to have an id
for (const event of filteredEvents) {
const toAddress = event.args.to
const toAddressId = addressMap.get(toAddress)
if (!toAddressId) {
console.warn('Could not find or create participant for address:', toAddress)
continue
}

const transferEvent = {
toAddress,
toAddressId,
amount: event.args.amount
}
console.log('Transfer event:', transferEvent)

// Update the daily transfer stats, now keyed by `to_address_id`
await updateDailyTransferStats(pgPoolStats, transferEvent, currentBlockNumber)
}

@@ -65,6 +86,11 @@ export const observeScheduledRewards = async (pgPools, ieContract, fetch = globa
JOIN daily_participants d ON p.id = d.participant_id
WHERE d.day >= now() - interval '3 days'
`)

// The query above fetched participant addresses from the spark_evaluate database
// Now we need to register those participants in the spark_stats database too
const addressToIdMap = await mapParticipantsToIds(pgPools.stats, new Set(rows.map(r => r.participant_address)))

for (const { participant_address: address } of rows) {
let scheduledRewards
try {
@@ -79,13 +105,15 @@ export const observeScheduledRewards = async (pgPools, ieContract, fetch = globa
continue
}
console.log('Scheduled rewards for', address, scheduledRewards)
const participantId = addressToIdMap.get(address)

await pgPools.stats.query(`
INSERT INTO daily_scheduled_rewards
(day, participant_address, scheduled_rewards)
(day, participant_id, scheduled_rewards)
VALUES (now(), $1, $2)
ON CONFLICT (day, participant_address) DO UPDATE SET
scheduled_rewards = EXCLUDED.scheduled_rewards
`, [address, scheduledRewards])
ON CONFLICT (day, participant_id) DO UPDATE SET
scheduled_rewards = EXCLUDED.scheduled_rewards
`, [participantId, scheduledRewards])
}
}

12 changes: 7 additions & 5 deletions observer/lib/platform-stats.js
@@ -1,16 +1,18 @@
/**
*
* @param {import('@filecoin-station/spark-stats-db').Queryable} pgClient
* @param {Object} transferEvent
* @param {string} transferEvent.toAddress
* @param {number} transferEvent.amount
* @param {BigInt | number | string} transferEvent.amount
* @param {number} transferEvent.toAddressId
* @param {number} currentBlockNumber
*/
export const updateDailyTransferStats = async (pgClient, transferEvent, currentBlockNumber) => {
await pgClient.query(`
INSERT INTO daily_reward_transfers (day, to_address, amount, last_checked_block)
INSERT INTO daily_reward_transfers
(day, to_address_id, amount, last_checked_block)
VALUES (now(), $1, $2, $3)
ON CONFLICT (day, to_address) DO UPDATE SET
ON CONFLICT (day, to_address_id) DO UPDATE SET
amount = daily_reward_transfers.amount + EXCLUDED.amount,
last_checked_block = EXCLUDED.last_checked_block
`, [transferEvent.toAddress, transferEvent.amount, currentBlockNumber])
`, [transferEvent.toAddressId, transferEvent.amount, currentBlockNumber])
}