-
Notifications
You must be signed in to change notification settings - Fork 93
feat: Move transaction webhooks to Redis #549
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,30 +1,12 @@ | ||
import { Static } from "@sinclair/typebox"; | ||
import { PrismaTransaction } from "../../schema/prisma"; | ||
import { transactionResponseSchema } from "../../server/schemas/transaction"; | ||
import { Transactions } from "@prisma/client"; | ||
import { prisma } from "../client"; | ||
import { cleanTxs } from "./cleanTxs"; | ||
interface GetTxByIdsParams { | ||
queueIds: string[]; | ||
pgtx?: PrismaTransaction; | ||
} | ||
|
||
export const getTxByIds = async ({ | ||
queueIds, | ||
}: GetTxByIdsParams): Promise< | ||
Static<typeof transactionResponseSchema>[] | null | ||
> => { | ||
const tx = await prisma.transactions.findMany({ | ||
export const getTransactionsByQueueIds = async ( | ||
queueIds: string[], | ||
): Promise<Transactions[]> => { | ||
return await prisma.transactions.findMany({ | ||
where: { | ||
id: { | ||
in: queueIds, | ||
}, | ||
id: { in: queueIds }, | ||
}, | ||
}); | ||
|
||
if (!tx || tx.length === 0) { | ||
return null; | ||
} | ||
|
||
const cleanedTx = cleanTxs(tx); | ||
return cleanedTx; | ||
}; |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,9 @@ import { prisma } from "../client"; | |
|
||
export const getAllWebhooks = async (): Promise<Webhooks[]> => { | ||
return await prisma.webhooks.findMany({ | ||
where: { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We never care about deleted webhooks so omit it in the query to simplify business logic. |
||
revokedAt: null, | ||
}, | ||
orderBy: { | ||
id: "asc", | ||
}, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,7 @@ | ||
import { Type } from "@sinclair/typebox"; | ||
import { Transactions } from "@prisma/client"; | ||
import { Static, Type } from "@sinclair/typebox"; | ||
|
||
// @TODO: rename to TransactionSchema | ||
export const transactionResponseSchema = Type.Object({ | ||
queueId: Type.Union([ | ||
Type.String({ | ||
|
@@ -198,3 +200,23 @@ export enum TransactionStatus { | |
// Tx was cancelled and will not be re-attempted. | ||
Cancelled = "cancelled", | ||
} | ||
|
||
export const toTransactionSchema = ( | ||
transaction: Transactions, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maps the internal DB "transaction" to the external "transaction" schema. |
||
): Static<typeof transactionResponseSchema> => ({ | ||
...transaction, | ||
queueId: transaction.id, | ||
queuedAt: transaction.queuedAt.toISOString(), | ||
sentAt: transaction.sentAt?.toISOString() || null, | ||
minedAt: transaction.minedAt?.toISOString() || null, | ||
cancelledAt: transaction.cancelledAt?.toISOString() || null, | ||
status: transaction.errorMessage | ||
? TransactionStatus.Errored | ||
: transaction.minedAt | ||
? TransactionStatus.Mined | ||
: transaction.cancelledAt | ||
? TransactionStatus.Cancelled | ||
: transaction.sentAt | ||
? TransactionStatus.Sent | ||
: TransactionStatus.Queued, | ||
}); |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,11 +1,13 @@ | ||
import { | ||
ContractEventLogs, | ||
ContractTransactionReceipts, | ||
Transactions, | ||
Webhooks, | ||
} from "@prisma/client"; | ||
import { Queue } from "bullmq"; | ||
import SuperJSON from "superjson"; | ||
import { WebhooksEventTypes } from "../../schema/webhooks"; | ||
import { getWebhooksByEventType } from "../../utils/cache/getWebhook"; | ||
import { logger } from "../../utils/logger"; | ||
import { redis } from "../../utils/redis/redis"; | ||
import { defaultJobOptions } from "./queues"; | ||
|
@@ -26,8 +28,22 @@ export type EnqueueContractSubscriptionWebhookData = { | |
eventLog?: ContractEventLogs; | ||
transactionReceipt?: ContractTransactionReceipts; | ||
}; | ||
|
||
export type EnqueueTransactionWebhookData = { | ||
type: | ||
| WebhooksEventTypes.ALL_TX | ||
| WebhooksEventTypes.QUEUED_TX | ||
| WebhooksEventTypes.SENT_TX | ||
| WebhooksEventTypes.MINED_TX | ||
| WebhooksEventTypes.ERRORED_TX | ||
| WebhooksEventTypes.CANCELLED_TX; | ||
transaction: Transactions; | ||
}; | ||
|
||
// TODO: Add other webhook event types here. | ||
type EnqueueWebhookData = EnqueueContractSubscriptionWebhookData; | ||
type EnqueueWebhookData = | ||
| EnqueueContractSubscriptionWebhookData | ||
| EnqueueTransactionWebhookData; | ||
|
||
export interface WebhookJob { | ||
data: EnqueueWebhookData; | ||
|
@@ -38,15 +54,26 @@ export const enqueueWebhook = async (data: EnqueueWebhookData) => { | |
switch (data.type) { | ||
case WebhooksEventTypes.CONTRACT_SUBSCRIPTION: | ||
return enqueueContractSubscriptionWebhook(data); | ||
case WebhooksEventTypes.ALL_TX: | ||
case WebhooksEventTypes.QUEUED_TX: | ||
case WebhooksEventTypes.SENT_TX: | ||
case WebhooksEventTypes.MINED_TX: | ||
case WebhooksEventTypes.ERRORED_TX: | ||
case WebhooksEventTypes.CANCELLED_TX: | ||
return enqueueTransactionWebhook(data); | ||
default: | ||
logger({ | ||
service: "worker", | ||
level: "warn", | ||
message: `Unexpected webhook type: ${data.type}`, | ||
message: `Unexpected webhook type: ${(data as any).type}`, | ||
}); | ||
} | ||
}; | ||
|
||
/** | ||
* Contract Subscriptions webhooks | ||
*/ | ||
|
||
const enqueueContractSubscriptionWebhook = async ( | ||
data: EnqueueContractSubscriptionWebhookData, | ||
) => { | ||
|
@@ -88,3 +115,36 @@ const getContractSubscriptionWebhookIdempotencyKey = (args: { | |
} | ||
throw 'Must provide "eventLog" or "transactionReceipt".'; | ||
}; | ||
|
||
/** | ||
* Transaction webhooks | ||
*/ | ||
|
||
const enqueueTransactionWebhook = async ( | ||
data: EnqueueTransactionWebhookData, | ||
) => { | ||
if (!_queue) return; | ||
|
||
const webhooks = [ | ||
...(await getWebhooksByEventType(WebhooksEventTypes.ALL_TX)), | ||
...(await getWebhooksByEventType(data.type)), | ||
]; | ||
Comment on lines
+128
to
+131
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Get all webhooks that match the exact type or |
||
|
||
for (const webhook of webhooks) { | ||
const job: WebhookJob = { data, webhook }; | ||
const serialized = SuperJSON.stringify(job); | ||
await _queue.add(`${data.type}:${webhook.id}`, serialized, { | ||
jobId: getTransactionWebhookIdempotencyKey({ | ||
webhook, | ||
eventType: data.type, | ||
queueId: data.transaction.id, | ||
}), | ||
}); | ||
} | ||
}; | ||
|
||
const getTransactionWebhookIdempotencyKey = (args: { | ||
webhook: Webhooks; | ||
eventType: WebhooksEventTypes; | ||
queueId: string; | ||
}) => `${args.webhook.url}:${args.eventType}:${args.queueId}`; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,12 @@ | ||
import { Static } from "@sinclair/typebox"; | ||
import { Job, Processor, Worker } from "bullmq"; | ||
import superjson from "superjson"; | ||
import { WebhooksEventTypes } from "../../schema/webhooks"; | ||
import { toEventLogSchema } from "../../server/schemas/eventLog"; | ||
import { | ||
toTransactionSchema, | ||
transactionResponseSchema, | ||
} from "../../server/schemas/transaction"; | ||
import { toTransactionReceiptSchema } from "../../server/schemas/transactionReceipt"; | ||
import { redis } from "../../utils/redis/redis"; | ||
import { WebhookResponse, sendWebhookRequest } from "../../utils/webhook"; | ||
|
@@ -11,33 +16,46 @@ import { | |
WebhookJob, | ||
} from "../queues/sendWebhookQueue"; | ||
|
||
interface WebhookBody { | ||
type: "event-log" | "transaction-receipt"; | ||
data: any; | ||
} | ||
|
||
const handler: Processor<any, void, string> = async (job: Job<string>) => { | ||
const { data, webhook } = superjson.parse<WebhookJob>(job.data); | ||
|
||
let resp: WebhookResponse | undefined; | ||
if (data.type === WebhooksEventTypes.CONTRACT_SUBSCRIPTION) { | ||
let webhookBody: WebhookBody; | ||
if (data.eventLog) { | ||
webhookBody = { | ||
type: "event-log", | ||
data: toEventLogSchema(data.eventLog), | ||
switch (data.type) { | ||
case WebhooksEventTypes.CONTRACT_SUBSCRIPTION: { | ||
let webhookBody: { | ||
type: "event-log" | "transaction-receipt"; | ||
data: any; | ||
}; | ||
} else if (data.transactionReceipt) { | ||
webhookBody = { | ||
type: "transaction-receipt", | ||
data: toTransactionReceiptSchema(data.transactionReceipt), | ||
}; | ||
} else { | ||
throw new Error( | ||
'Missing "eventLog" or "transactionReceipt" for CONTRACT_SUBSCRIPTION webhook.', | ||
); | ||
if (data.eventLog) { | ||
webhookBody = { | ||
type: "event-log", | ||
data: toEventLogSchema(data.eventLog), | ||
}; | ||
} else if (data.transactionReceipt) { | ||
webhookBody = { | ||
type: "transaction-receipt", | ||
data: toTransactionReceiptSchema(data.transactionReceipt), | ||
}; | ||
} else { | ||
throw new Error( | ||
'Missing "eventLog" or "transactionReceipt" for CONTRACT_SUBSCRIPTION webhook.', | ||
); | ||
} | ||
resp = await sendWebhookRequest(webhook, webhookBody); | ||
break; | ||
} | ||
|
||
case WebhooksEventTypes.ALL_TX: | ||
case WebhooksEventTypes.QUEUED_TX: | ||
case WebhooksEventTypes.SENT_TX: | ||
case WebhooksEventTypes.MINED_TX: | ||
case WebhooksEventTypes.ERRORED_TX: | ||
case WebhooksEventTypes.CANCELLED_TX: { | ||
const webhookBody: Static<typeof transactionResponseSchema> = | ||
toTransactionSchema(data.transaction); | ||
resp = await sendWebhookRequest(webhook, webhookBody); | ||
break; | ||
} | ||
resp = await sendWebhookRequest(webhook, webhookBody); | ||
} | ||
|
||
if (resp && !resp.ok) { | ||
|
@@ -52,7 +70,7 @@ const handler: Processor<any, void, string> = async (job: Job<string>) => { | |
let _worker: Worker | null = null; | ||
if (redis) { | ||
_worker = new Worker(SEND_WEBHOOK_QUEUE_NAME, handler, { | ||
concurrency: 1, | ||
concurrency: 10, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Increase concurrency now that there's more webhooks to send. Webhook calls are independent. |
||
connection: redis, | ||
}); | ||
logWorkerEvents(_worker); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Simplified this to just return DB results.