Skip to content

feat: Add filters to Contract Subscriptions #543

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/db/contractSubscriptions/createContractSubscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,30 @@ interface CreateContractSubscriptionParams {
chainId: number;
contractAddress: string;
webhookId?: number;
processEventLogs: boolean;
filterEvents: string[];
processTransactionReceipts: boolean;
filterFunctions: string[];
}

export const createContractSubscription = async ({
chainId,
contractAddress,
webhookId,
processEventLogs,
filterEvents,
processTransactionReceipts,
filterFunctions,
}: CreateContractSubscriptionParams) => {
return prisma.contractSubscriptions.create({
data: {
chainId,
contractAddress,
webhookId,
processEventLogs,
filterEvents,
processTransactionReceipts,
filterFunctions,
},
include: {
webhook: true,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- AlterTable
ALTER TABLE "contract_subscriptions" ADD COLUMN "filterEventLogs" TEXT[] DEFAULT ARRAY[]::TEXT[],
ADD COLUMN "parseEventLogs" BOOLEAN NOT NULL DEFAULT true,
ADD COLUMN "parseTransactionReceipts" BOOLEAN NOT NULL DEFAULT true;
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
Warnings:

- You are about to drop the column `filterEventLogs` on the `contract_subscriptions` table. All the data in the column will be lost.
- You are about to drop the column `parseEventLogs` on the `contract_subscriptions` table. All the data in the column will be lost.
- You are about to drop the column `parseTransactionReceipts` on the `contract_subscriptions` table. All the data in the column will be lost.

*/
-- AlterTable
ALTER TABLE "contract_subscriptions" DROP COLUMN "filterEventLogs",
DROP COLUMN "parseEventLogs",
DROP COLUMN "parseTransactionReceipts",
ADD COLUMN "filterEvents" TEXT[] DEFAULT ARRAY[]::TEXT[],
ADD COLUMN "processEventLogs" BOOLEAN NOT NULL DEFAULT true,
ADD COLUMN "processTransactionReceipts" BOOLEAN NOT NULL DEFAULT true;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "contract_subscriptions" ADD COLUMN "filterFunctions" TEXT[] DEFAULT ARRAY[]::TEXT[];
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "contract_transaction_receipts" ADD COLUMN "functionName" TEXT;
43 changes: 24 additions & 19 deletions src/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,26 @@ generator client {
}

model Configuration {
id String @id @default("default") @map("id")
id String @id @default("default") @map("id")
// Chains
chainOverrides String? @map("chainOverrides")
chainOverrides String? @map("chainOverrides")
// Tx Processing
minTxsToProcess Int @map("minTxsToProcess")
maxTxsToProcess Int @map("maxTxsToProcess")
minTxsToProcess Int @map("minTxsToProcess")
maxTxsToProcess Int @map("maxTxsToProcess")
// Tx Updates
minedTxListenerCronSchedule String? @map("minedTxsCronSchedule")
maxTxsToUpdate Int @map("maxTxsToUpdate")
minedTxListenerCronSchedule String? @map("minedTxsCronSchedule")
maxTxsToUpdate Int @map("maxTxsToUpdate")
// Tx Retries
retryTxListenerCronSchedule String? @map("retryTxsCronSchedule")
minEllapsedBlocksBeforeRetry Int @map("minEllapsedBlocksBeforeRetry")
maxFeePerGasForRetries String @map("maxFeePerGasForRetries")
maxPriorityFeePerGasForRetries String @map("maxPriorityFeePerGasForRetries")
maxRetriesPerTx Int @map("maxRetriesPerTx")
retryTxListenerCronSchedule String? @map("retryTxsCronSchedule")
minEllapsedBlocksBeforeRetry Int @map("minEllapsedBlocksBeforeRetry")
maxFeePerGasForRetries String @map("maxFeePerGasForRetries")
maxPriorityFeePerGasForRetries String @map("maxPriorityFeePerGasForRetries")
maxRetriesPerTx Int @map("maxRetriesPerTx")
// Contract Indexer Updates
indexerListenerCronSchedule String? @map("indexerListenerCronSchedule")
maxBlocksToIndex Int @default(25) @map("maxBlocksToIndex")
cursorDelaySeconds Int @default(2) @map("cursorDelaySeconds")
contractSubscriptionsRetryDelaySeconds String @default("10") @map("contractSubscriptionsRetryDelaySeconds")
indexerListenerCronSchedule String? @map("indexerListenerCronSchedule")
maxBlocksToIndex Int @default(25) @map("maxBlocksToIndex")
cursorDelaySeconds Int @default(2) @map("cursorDelaySeconds")
contractSubscriptionsRetryDelaySeconds String @default("10") @map("contractSubscriptionsRetryDelaySeconds")

// AWS
awsAccessKeyId String? @map("awsAccessKeyId")
Expand Down Expand Up @@ -188,10 +188,14 @@ model Relayers {
}

model ContractSubscriptions {
id String @id @default(uuid()) @map("id")
chainId Int
contractAddress String
webhookId Int?
id String @id @default(uuid()) @map("id")
chainId Int
contractAddress String
webhookId Int?
processEventLogs Boolean @default(true)
filterEvents String[] @default([]) // empty array = no filter
processTransactionReceipts Boolean @default(true)
filterFunctions String[] @default([]) // empty array = no filter

createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
Expand Down Expand Up @@ -242,6 +246,7 @@ model ContractTransactionReceipts {
blockHash String
timestamp DateTime
data String
functionName String?

to String
from String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,26 @@ const bodySchema = Type.Object({
examples: ["https://example.com/webhook"],
}),
),
processEventLogs: Type.Boolean({
description: "If true, parse event logs for this contract.",
}),
filterEvents: Type.Optional(
Type.Array(Type.String(), {
description:
"A case-sensitive list of event names to filter event logs. Parses all event logs by default.",
examples: ["Transfer"],
}),
),
processTransactionReceipts: Type.Boolean({
description: "If true, parse transaction receipts for this contract.",
}),
filterFunctions: Type.Optional(
Type.Array(Type.String(), {
description:
"A case-sensitive list of function names to filter transaction receipts. Parses all transaction receipts by default.",
examples: ["mintTo"],
}),
),
});

const responseSchema = Type.Object({
Expand Down Expand Up @@ -63,10 +83,26 @@ export async function addContractSubscription(fastify: FastifyInstance) {
},
},
handler: async (request, reply) => {
const { chain, contractAddress, webhookUrl } = request.body;
const {
chain,
contractAddress,
webhookUrl,
processEventLogs,
filterEvents = [],
processTransactionReceipts,
filterFunctions = [],
} = request.body;

const chainId = await getChainIdFromChain(chain);
const standardizedContractAddress = contractAddress.toLowerCase();

// Must parse logs or receipts.
if (!processEventLogs && !processTransactionReceipts) {
throw createCustomError(
"Contract Subscriptions must parse event logs and/or receipts.",
StatusCodes.BAD_REQUEST,
"BAD_REQUEST",
);
}

// If not currently indexed, upsert the latest block number.
const subscribedChainIds = await getContractSubscriptionsUniqueChainIds();
Expand Down Expand Up @@ -103,8 +139,12 @@ export async function addContractSubscription(fastify: FastifyInstance) {
// Create the contract subscription.
const contractSubscription = await createContractSubscription({
chainId,
contractAddress: standardizedContractAddress,
contractAddress: contractAddress.toLowerCase(),
webhookId,
processEventLogs,
filterEvents,
processTransactionReceipts,
filterFunctions,
});

reply.status(StatusCodes.OK).send({
Expand Down
2 changes: 1 addition & 1 deletion src/server/routes/system/health.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ const getFeatures = (): EngineFeature[] => {
const features: EngineFeature[] = [];

if (env.ENABLE_KEYPAIR_AUTH) features.push("KEYPAIR_AUTH");
// Contract subscriptions requires Redis.
// Contract Subscriptions requires Redis.
if (redis) features.push("CONTRACT_SUBSCRIPTIONS");

return features;
Expand Down
8 changes: 8 additions & 0 deletions src/server/schemas/contractSubscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ export const contractSubscriptionSchema = Type.Object({
chainId: Type.Number(),
contractAddress: Type.String(),
webhook: Type.Optional(WebhookSchema),
processEventLogs: Type.Boolean(),
filterEvents: Type.Array(Type.String()),
processTransactionReceipts: Type.Boolean(),
filterFunctions: Type.Array(Type.String()),
createdAt: Type.Unsafe<Date>({
type: "string",
format: "date",
Expand All @@ -22,5 +26,9 @@ export const toContractSubscriptionSchema = (
webhook: contractSubscription.webhook
? toWebhookSchema(contractSubscription.webhook)
: undefined,
processEventLogs: contractSubscription.processEventLogs,
filterEvents: contractSubscription.filterEvents,
processTransactionReceipts: contractSubscription.processTransactionReceipts,
filterFunctions: contractSubscription.filterFunctions,
createdAt: contractSubscription.createdAt,
});
7 changes: 6 additions & 1 deletion src/worker/queues/processEventLogsQueue.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Queue } from "bullmq";
import SuperJSON from "superjson";
import { Address } from "thirdweb";
import { getConfig } from "../../utils/cache/getConfig";
import { redis } from "../../utils/redis/redis";
import { defaultJobOptions } from "./queues";
Expand All @@ -17,9 +18,13 @@ const _queue = redis
})
: null;

// Each job handles a block range for a given chain, filtered by addresses + events.
export type EnqueueProcessEventLogsData = {
chainId: number;
contractAddresses: string[];
filters: {
address: Address;
events: string[];
}[];
fromBlock: number; // inclusive
toBlock: number; // inclusive
};
Expand Down
7 changes: 6 additions & 1 deletion src/worker/queues/processTransactionReceiptsQueue.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Queue } from "bullmq";
import SuperJSON from "superjson";
import { Address } from "thirdweb";
import { getConfig } from "../../utils/cache/getConfig";
import { redis } from "../../utils/redis/redis";
import { defaultJobOptions } from "./queues";
Expand All @@ -18,9 +19,13 @@ const _queue = redis
})
: null;

// Each job handles a block range for a given chain, filtered by addresses + events.
export type EnqueueProcessTransactionReceiptsData = {
chainId: number;
contractAddresses: string[];
filters: {
address: Address;
functions: string[];
}[];
fromBlock: number; // inclusive
toBlock: number; // inclusive
};
Expand Down
67 changes: 41 additions & 26 deletions src/worker/tasks/chainIndexer.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { StaticJsonRpcBatchProvider } from "@thirdweb-dev/sdk";
import { Address } from "thirdweb";
import { getBlockForIndexing } from "../../db/chainIndexers/getChainIndexer";
import { upsertChainIndexer } from "../../db/chainIndexers/upsertChainIndexer";
import { prisma } from "../../db/client";
Expand Down Expand Up @@ -32,7 +33,7 @@ export const createChainIndexerTask = async (args: {
const currentBlockNumber =
(await provider.getBlockNumber()) - toBlockOffset;

// Limit toBlock to avoid hitting rate or file size limits when querying logs.
// Limit toBlock to avoid hitting rate or block range limits when querying logs.
const toBlock = Math.min(
currentBlockNumber,
fromBlock + maxBlocksToIndex,
Expand All @@ -43,11 +44,9 @@ export const createChainIndexerTask = async (args: {
return;
}

// Ensuring that the block data exists.
// Sometimes the RPC providers nodes are aware of the latest block
// but the block data is not available yet.
// Ensure that the block data exists.
// Sometimes the RPC nodes do not yet return data for the latest block.
const block = await provider.getBlockWithTransactions(toBlock);

if (!block) {
logger({
service: "worker",
Expand All @@ -59,32 +58,48 @@ export const createChainIndexerTask = async (args: {
return;
}

// Get contract addresses to filter event logs and transaction receipts by.
const contractSubscriptions = await getContractSubscriptionsByChainId(
chainId,
true,
);
const contractAddresses = [
...new Set<string>(
contractSubscriptions.map(
(subscription) => subscription.contractAddress,
),
),
];

await enqueueProcessEventLogs({
chainId,
fromBlock,
toBlock,
contractAddresses,
});
// Identify contract addresses + event names to parse event logs, if any.
const eventLogFilters: {
address: Address;
events: string[];
}[] = contractSubscriptions
.filter((c) => c.processEventLogs)
.map((c) => ({
address: c.contractAddress as Address,
events: c.filterEvents,
}));
if (eventLogFilters.length > 0) {
await enqueueProcessEventLogs({
chainId,
fromBlock,
toBlock,
filters: eventLogFilters,
});
}

await enqueueProcessTransactionReceipts({
chainId,
fromBlock,
toBlock,
contractAddresses,
});
// Identify addresses + function names to parse transaction receipts, if any.
const transactionReceiptFilters: {
address: Address;
functions: string[];
}[] = contractSubscriptions
.filter((c) => c.processTransactionReceipts)
.map((c) => ({
address: c.contractAddress as Address,
functions: c.filterFunctions,
}));
if (transactionReceiptFilters.length > 0) {
await enqueueProcessTransactionReceipts({
chainId,
fromBlock,
toBlock,
filters: transactionReceiptFilters,
});
}

// Update the latest block number.
try {
Expand All @@ -103,7 +118,7 @@ export const createChainIndexerTask = async (args: {
}
},
{
timeout: 5 * 60000, // 3 minutes timeout
timeout: 60 * 1000, // 1 minute timeout
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lowering timeout since this worker simply enqueues jobs now.

},
);
} catch (err: any) {
Expand Down
Loading
Loading