Skip to content

Commit 6d57509

Browse files
committed
fix(fair-queue): prevent cardinality explosion when reporting metrics, creating large event loop lag
1 parent 4c3dfac commit 6d57509

File tree

2 files changed

+10
-62
lines changed

2 files changed

+10
-62
lines changed

packages/redis-worker/src/fair-queue/index.ts

Lines changed: 8 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -327,13 +327,7 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
327327
[FairQueueAttributes.SHARD_ID]: shardId.toString(),
328328
});
329329

330-
this.telemetry.recordEnqueue(
331-
this.telemetry.messageAttributes({
332-
queueId: options.queueId,
333-
tenantId: options.tenantId,
334-
messageId,
335-
})
336-
);
330+
this.telemetry.recordEnqueue();
337331

338332
this.logger.debug("Message enqueued", {
339333
queueId: options.queueId,
@@ -431,13 +425,7 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
431425
[FairQueueAttributes.SHARD_ID]: shardId.toString(),
432426
});
433427

434-
this.telemetry.recordEnqueueBatch(
435-
messageIds.length,
436-
this.telemetry.messageAttributes({
437-
queueId: options.queueId,
438-
tenantId: options.tenantId,
439-
})
440-
);
428+
this.telemetry.recordEnqueueBatch(messageIds.length);
441429

442430
this.logger.debug("Batch enqueued", {
443431
queueId: options.queueId,
@@ -1387,14 +1375,7 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
13871375

13881376
// Record queue time
13891377
const queueTime = startTime - storedMessage.timestamp;
1390-
this.telemetry.recordQueueTime(
1391-
queueTime,
1392-
this.telemetry.messageAttributes({
1393-
queueId,
1394-
tenantId: storedMessage.tenantId,
1395-
messageId: storedMessage.id,
1396-
})
1397-
);
1378+
this.telemetry.recordQueueTime(queueTime);
13981379

13991380
// Build handler context
14001381
const handlerContext: MessageHandlerContext<z.infer<TPayloadSchema>> = {
@@ -1410,21 +1391,8 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
14101391
},
14111392
complete: async () => {
14121393
await this.#completeMessage(storedMessage, queueId, queueKey, masterQueueKey, descriptor);
1413-
this.telemetry.recordComplete(
1414-
this.telemetry.messageAttributes({
1415-
queueId,
1416-
tenantId: storedMessage.tenantId,
1417-
messageId: storedMessage.id,
1418-
})
1419-
);
1420-
this.telemetry.recordProcessingTime(
1421-
Date.now() - startTime,
1422-
this.telemetry.messageAttributes({
1423-
queueId,
1424-
tenantId: storedMessage.tenantId,
1425-
messageId: storedMessage.id,
1426-
})
1427-
);
1394+
this.telemetry.recordComplete();
1395+
this.telemetry.recordProcessingTime(Date.now() - startTime);
14281396
},
14291397
release: async () => {
14301398
await this.#releaseMessage(
@@ -1550,14 +1518,7 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
15501518
descriptor: QueueDescriptor,
15511519
error?: Error
15521520
): Promise<void> {
1553-
this.telemetry.recordFailure(
1554-
this.telemetry.messageAttributes({
1555-
queueId,
1556-
tenantId: storedMessage.tenantId,
1557-
messageId: storedMessage.id,
1558-
attempt: storedMessage.attempt,
1559-
})
1560-
);
1521+
this.telemetry.recordFailure();
15611522

15621523
// Check retry strategy
15631524
if (this.retryStrategy) {
@@ -1588,14 +1549,7 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
15881549
await this.concurrencyManager.release(descriptor, storedMessage.id);
15891550
}
15901551

1591-
this.telemetry.recordRetry(
1592-
this.telemetry.messageAttributes({
1593-
queueId,
1594-
tenantId: storedMessage.tenantId,
1595-
messageId: storedMessage.id,
1596-
attempt: storedMessage.attempt + 1,
1597-
})
1598-
);
1552+
this.telemetry.recordRetry();
15991553

16001554
this.logger.debug("Message scheduled for retry", {
16011555
messageId: storedMessage.id,
@@ -1651,14 +1605,7 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
16511605
pipeline.hset(dlqDataKey, storedMessage.id, JSON.stringify(dlqMessage));
16521606
await pipeline.exec();
16531607

1654-
this.telemetry.recordDLQ(
1655-
this.telemetry.messageAttributes({
1656-
queueId: storedMessage.queueId,
1657-
tenantId: storedMessage.tenantId,
1658-
messageId: storedMessage.id,
1659-
attempt: storedMessage.attempt,
1660-
})
1661-
);
1608+
this.telemetry.recordDLQ();
16621609

16631610
this.logger.info("Message moved to DLQ", {
16641611
messageId: storedMessage.id,

packages/redis-worker/src/fair-queue/telemetry.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,8 @@ export class FairQueueTelemetry {
324324
// ============================================================================
325325

326326
/**
327-
* Create standard attributes for a message operation.
327+
* Create standard attributes for a message operation (for spans/traces).
328+
* Use this for span attributes where high cardinality is acceptable.
328329
*/
329330
messageAttributes(params: {
330331
queueId?: string;

0 commit comments

Comments
 (0)