refactor(analytics-core): add Delayed Event type#1836
refactor(analytics-core): add Delayed Event type#1836daniel-graham-amplitude wants to merge 2 commits into
Conversation
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Autofix Details
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: Delayed events ignore flush backoff
- Delayed events are now only batched for immediate upload when their timeout has expired, preserving retry and throttle backoff behavior.
Or push these changes by commenting:
@cursor push 30149af37d
Preview (30149af37d)
diff --git a/packages/analytics-core/src/plugins/destination.ts b/packages/analytics-core/src/plugins/destination.ts
--- a/packages/analytics-core/src/plugins/destination.ts
+++ b/packages/analytics-core/src/plugins/destination.ts
@@ -32,6 +32,7 @@
import { IDiagnosticsClient } from '../diagnostics/diagnostics-client';
import { isSuccessStatusCode } from '../utils/status-code';
import { getStacktrace } from '../utils/debug';
+import { DelayedPayload, Payload } from '../types/payload';
export interface Context {
event: Event;
@@ -198,8 +199,18 @@
this.resetSchedule();
const list: Context[] = [];
+ const delayed: Record<string, Context[]> = {};
const later: Context[] = [];
- this.queue.forEach((context) => (context.timeout === 0 ? list.push(context) : later.push(context)));
+ this.queue.forEach((context) => {
+ if (context.timeout === 0 && context.event.delay_id) {
+ delayed[context.event.delay_id] = delayed[context.event.delay_id] || [];
+ delayed[context.event.delay_id].push(context);
+ } else if (context.timeout === 0) {
+ list.push(context);
+ } else {
+ later.push(context);
+ }
+ });
const batches = chunk(list, this.config.flushQueueSize);
@@ -210,18 +221,26 @@
return await this.send(batch, useRetry);
}, Promise.resolve());
+ for (const delayId of Object.keys(delayed)) {
+ const delayedBatches = chunk(delayed[delayId], this.config.flushQueueSize);
+ await delayedBatches.reduce(async (promise, batch) => {
+ await promise;
+ return await this.send(batch, useRetry, delayId);
+ }, Promise.resolve());
+ }
+
// Mark current flush is done
this.flushId = null;
this.scheduleEvents(this.queue);
}
- async send(list: Context[], useRetry = true) {
+ async send(list: Context[], useRetry = true, delayId?: string) {
if (!this.config.apiKey) {
return this.fulfillRequest(list, 400, MISSING_API_KEY_MESSAGE);
}
- const payload = {
+ const payload: Payload = {
api_key: this.config.apiKey,
events: list.map((context) => {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
@@ -237,11 +256,15 @@
this.config.requestMetadata = new RequestMetadata();
try {
- const { serverUrl } = createServerConfig(this.config.serverUrl, this.config.serverZone, this.config.useBatch);
+ let { serverUrl } = createServerConfig(this.config.serverUrl, this.config.serverZone, this.config.useBatch);
const shouldCompressUploadBody = shouldCompressUploadBodyForRequest(
serverUrl,
this.config.enableRequestBodyCompression,
);
+ if (delayId) {
+ serverUrl = `${serverUrl}/delayed`;
+ (payload as DelayedPayload).id = delayId;
+ }
const res = await this.config.transportProvider.send(serverUrl, payload, shouldCompressUploadBody);
if (res === null) {
this.fulfillRequest(list, 0, UNEXPECTED_ERROR_MESSAGE);
diff --git a/packages/analytics-core/src/types/event/base-event.ts b/packages/analytics-core/src/types/event/base-event.ts
--- a/packages/analytics-core/src/types/event/base-event.ts
+++ b/packages/analytics-core/src/types/event/base-event.ts
@@ -52,4 +52,5 @@
android_app_set_id?: string;
extra?: { [key: string]: any };
groups?: { [key: string]: any } | undefined;
+ delay_id?: string;
}
diff --git a/packages/analytics-core/src/types/payload.ts b/packages/analytics-core/src/types/payload.ts
--- a/packages/analytics-core/src/types/payload.ts
+++ b/packages/analytics-core/src/types/payload.ts
@@ -12,3 +12,7 @@
client_upload_time?: string;
request_metadata?: RequestMetadata;
}
+
+export interface DelayedPayload extends Payload {
+ id: string;
+}
diff --git a/packages/analytics-core/test/plugins/destination.test.ts b/packages/analytics-core/test/plugins/destination.test.ts
--- a/packages/analytics-core/test/plugins/destination.test.ts
+++ b/packages/analytics-core/test/plugins/destination.test.ts
@@ -1700,4 +1700,198 @@
expect(result).toBe('');
});
});
+
+ describe('delayed events', () => {
+ const successResponse = {
+ status: Status.Success,
+ statusCode: 200,
+ body: {
+ eventsIngested: 1,
+ payloadSizeBytes: 1,
+ serverUploadTime: 1,
+ },
+ };
+
+ test('should send delayed events to /delayed endpoint', async () => {
+ const destination = new Destination();
+ const callback = jest.fn();
+ const delayId = 'delay-123';
+ const event = {
+ event_type: 'delayed_event',
+ delay_id: delayId,
+ };
+ const context = {
+ attempts: 0,
+ callback,
+ event,
+ timeout: 0,
+ };
+ const transportProvider = {
+ send: jest.fn().mockResolvedValue(successResponse),
+ };
+ await destination.setup({
+ ...useDefaultConfig(),
+ transportProvider,
+ apiKey: API_KEY,
+ serverUrl: AMPLITUDE_SERVER_URL,
+ });
+ destination.queue = [context];
+ await destination.flush(true);
+
+ expect(transportProvider.send).toHaveBeenCalledTimes(1);
+ expect(transportProvider.send).toHaveBeenCalledWith(
+ `${AMPLITUDE_SERVER_URL}/delayed`,
+ expect.objectContaining({
+ id: delayId,
+ events: [expect.objectContaining({ event_type: 'delayed_event', delay_id: delayId })],
+ }),
+ true,
+ );
+ expect(callback).toHaveBeenCalledWith({
+ event,
+ code: 200,
+ message: SUCCESS_MESSAGE,
+ });
+ });
+
+ test('should send delayed events later when timeout is non-zero', async () => {
+ const destination = new Destination();
+ const delayId = 'delay-123';
+ destination.config = {
+ ...useDefaultConfig(),
+ };
+ destination.queue = [
+ {
+ attempts: 0,
+ callback: jest.fn(),
+ event: { event_type: 'delayed_event', delay_id: delayId },
+ timeout: 1000,
+ },
+ ];
+ const send = jest.spyOn(destination, 'send').mockReturnValueOnce(Promise.resolve());
+ const schedule = jest.spyOn(destination, 'schedule').mockImplementation(jest.fn);
+
+ await destination.flush(true);
+
+ expect(send).toHaveBeenCalledTimes(0);
+ expect(schedule).toHaveBeenCalledWith(1000);
+ });
+
+ test('should send /delayed and regular events on same flush', async () => {
+ const destination = new Destination();
+ const regularCallback = jest.fn();
+ const delayedCallback = jest.fn();
+ const delayId = 'delay-123';
+ const regularContext = {
+ attempts: 0,
+ callback: regularCallback,
+ event: { event_type: 'regular_event' },
+ timeout: 0,
+ };
+ const delayedContext = {
+ attempts: 0,
+ callback: delayedCallback,
+ event: { event_type: 'delayed_event', delay_id: delayId },
+ timeout: 0,
+ };
+ const transportProvider = {
+ send: jest.fn().mockResolvedValue(successResponse),
+ };
+ await destination.setup({
+ ...useDefaultConfig(),
+ transportProvider,
+ apiKey: API_KEY,
+ serverUrl: AMPLITUDE_SERVER_URL,
+ });
+ destination.queue = [regularContext, delayedContext];
+ await destination.flush(true);
+
+ expect(transportProvider.send).toHaveBeenNthCalledWith(
+ 1,
+ AMPLITUDE_SERVER_URL,
+ expect.objectContaining({
+ events: [expect.objectContaining({ event_type: 'regular_event' })],
+ }),
+ true,
+ );
+ expect(transportProvider.send).toHaveBeenNthCalledWith(
+ 2,
+ `${AMPLITUDE_SERVER_URL}/delayed`,
+ expect.objectContaining({
+ id: delayId,
+ events: [expect.objectContaining({ event_type: 'delayed_event', delay_id: delayId })],
+ }),
+ true,
+ );
+ expect(regularCallback).toHaveBeenCalledWith({
+ event: regularContext.event,
+ code: 200,
+ message: SUCCESS_MESSAGE,
+ });
+ expect(delayedCallback).toHaveBeenCalledWith({
+ event: delayedContext.event,
+ code: 200,
+ message: SUCCESS_MESSAGE,
+ });
+ });
+
+ test('should send delayed events with different delay_ids on same flush', async () => {
+ const destination = new Destination();
+ const callbackA = jest.fn();
+ const callbackB = jest.fn();
+ const delayedContextA = {
+ attempts: 0,
+ callback: callbackA,
+ event: { event_type: 'delayed_event_a', delay_id: 'delay-a' },
+ timeout: 0,
+ };
+ const delayedContextB = {
+ attempts: 0,
+ callback: callbackB,
+ event: { event_type: 'delayed_event_b', delay_id: 'delay-b' },
+ timeout: 0,
+ };
+ const transportProvider = {
+ send: jest.fn().mockResolvedValue(successResponse),
+ };
+ await destination.setup({
+ ...useDefaultConfig(),
+ transportProvider,
+ apiKey: API_KEY,
+ serverUrl: AMPLITUDE_SERVER_URL,
+ });
+ destination.queue = [delayedContextA, delayedContextB];
+ await destination.flush(true);
+
+ expect(transportProvider.send).toHaveBeenCalledTimes(2);
+ expect(transportProvider.send).toHaveBeenNthCalledWith(
+ 1,
+ `${AMPLITUDE_SERVER_URL}/delayed`,
+ expect.objectContaining({
+ id: 'delay-a',
+ events: [expect.objectContaining({ event_type: 'delayed_event_a', delay_id: 'delay-a' })],
+ }),
+ true,
+ );
+ expect(transportProvider.send).toHaveBeenNthCalledWith(
+ 2,
+ `${AMPLITUDE_SERVER_URL}/delayed`,
+ expect.objectContaining({
+ id: 'delay-b',
+ events: [expect.objectContaining({ event_type: 'delayed_event_b', delay_id: 'delay-b' })],
+ }),
+ true,
+ );
+ expect(callbackA).toHaveBeenCalledWith({
+ event: delayedContextA.event,
+ code: 200,
+ message: SUCCESS_MESSAGE,
+ });
+ expect(callbackB).toHaveBeenCalledWith({
+ event: delayedContextB.event,
+ code: 200,
+ message: SUCCESS_MESSAGE,
+ });
+ });
+ });
});
diff --git a/test-server/analytics-snippet/index.html b/test-server/analytics-snippet/index.html
--- a/test-server/analytics-snippet/index.html
+++ b/test-server/analytics-snippet/index.html
@@ -16,6 +16,8 @@
const config = {
autocapture: true,
}
+ amplitude.track('DUMMY EVENT', { event_properties: { name: 'HTML' } })
+ .promise.then((evt) => console.log(`DUMMY_EVENT: `, evt));
amplitude.init(import.meta.env.VITE_AMPLITUDE_API_KEY, config);
amplitude.track('DUMMY EVENT', { name: 'HTML' });
amplitude.identify(new amplitude.Identify().set('role', 'engineer'));
diff --git a/test-server/autocapture/element-interactions.html b/test-server/autocapture/element-interactions.html
--- a/test-server/autocapture/element-interactions.html
+++ b/test-server/autocapture/element-interactions.html
@@ -273,7 +273,7 @@
import.meta.env.VITE_AMPLITUDE_API_KEY,
import.meta.env.VITE_AMPLITUDE_USER_ID || 'amplitude-typescript test user',
{
- fetchRemoteConfig: false,
+ fetchRemoteConfig: true,
logLevel: 'debug',
autocapture: {
attribution: {You can send follow-ups to the cloud agent here.
8043945 to
cd89127
Compare
Session Replay Browser E2E ResultsDetails
Failed testschromium › e2e/trc-url-rule.spec.ts › TRC URL rule — happy path › starts recording after SPA navigation to a matching URL Flaky testschromium › e2e/idb.spec.ts › IDB transaction abort handling › getSequencesToSend cursor abort: AbortError logged at debug not warn |
size-limit report 📦
|
05a12b8 to
a870427
Compare
a870427 to
52701ea
Compare
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 3 potential issues.
Autofix Details
Bugbot Autofix prepared fixes for all 3 issues found in the latest run.
- ✅ Fixed: Shared request metadata race
- Flush now awaits regular and delayed sends sequentially so each request captures and resets metadata without overlap.
- ✅ Fixed: Dedup drops non-delayed queue items
- Delayed-event deduplication now filters only queued delayed events with the same insert_id, preserving regular queued events.
- ✅ Fixed: Silent empty catch block
- The delayed batch helper no longer catches and ignores errors while building delayed batches.
Or push these changes by commenting:
@cursor push ab8fa3d1ff
Preview (ab8fa3d1ff)
diff --git a/packages/analytics-core/src/plugins/destination.ts b/packages/analytics-core/src/plugins/destination.ts
--- a/packages/analytics-core/src/plugins/destination.ts
+++ b/packages/analytics-core/src/plugins/destination.ts
@@ -129,7 +129,9 @@
};
// remove any delayed events with the same insert_id
if (event.delay?.id) {
- this.queue = this.queue.filter((queuedContext) => queuedContext.event.insert_id !== event.insert_id);
+ this.queue = this.queue.filter(
+ (queuedContext) => !queuedContext.event.delay?.id || queuedContext.event.insert_id !== event.insert_id,
+ );
}
this.queue.push(context);
this.schedule(this.config.flushIntervalMillis);
@@ -220,38 +222,32 @@
const batches = chunk(list, this.config.flushQueueSize);
- // Promise.all() doesn't guarantee resolve order.
// Sequentially resolve to make sure backend receives events in order
- const regularEventBatch = batches.reduce(async (promise, batch) => {
- await promise;
- return await this.send(batch, useRetry);
- }, Promise.resolve());
- const eventPromises = [regularEventBatch];
+ // and each request gets its own metadata snapshot.
+ for (const batch of batches) {
+ await this.send(batch, useRetry);
+ }
- const delayedEventBatches = this.getDelayedEventsBatches(delayed, useRetry);
- eventPromises.push(...delayedEventBatches);
+ const delayedEventBatches = this.getDelayedEventsBatches(delayed);
+ for (const [delay, batch] of delayedEventBatches) {
+ await this.send(batch, useRetry, delay);
+ }
- await Promise.all(eventPromises);
-
// Mark current flush is done
this.flushId = null;
this.scheduleEvents(this.queue);
}
- getDelayedEventsBatches(delayed: Record<string, [Delay, Context[]]>, useRetry: boolean) {
- const eventPromises = [];
- try {
- for (const [delay, contexts] of Object.values(delayed)) {
- const delayedBatches = chunk(contexts, this.config.flushQueueSize);
- const delayedEventBatch = delayedBatches.reduce(async (promise, batch) => {
- await promise;
- return await this.send(batch, useRetry, delay);
- }, Promise.resolve());
- eventPromises.push(delayedEventBatch);
- }
- } catch (e) {}
- return eventPromises;
+ getDelayedEventsBatches(delayed: Record<string, [Delay, Context[]]>) {
+ const eventBatches: [Delay, Context[]][] = [];
+ for (const [delay, contexts] of Object.values(delayed)) {
+ const delayedBatches = chunk(contexts, this.config.flushQueueSize);
+ delayedBatches.forEach((batch) => {
+ eventBatches.push([delay, batch]);
+ });
+ }
+ return eventBatches;
}
translatePayloadToDelayedPayload(payload: Payload, list: Context[], delay: Delay): DelayedPayload {
diff --git a/packages/analytics-core/test/plugins/destination.test.ts b/packages/analytics-core/test/plugins/destination.test.ts
--- a/packages/analytics-core/test/plugins/destination.test.ts
+++ b/packages/analytics-core/test/plugins/destination.test.ts
@@ -136,6 +136,27 @@
expect(destination.queue.length).toBe(1);
expect(destination.queue[0].event).toEqual(expectedEvent);
});
+
+ test('should keep regular events when deduplicating delayed events with the same insert_id', async () => {
+ const destination = new Destination();
+ destination.config = useDefaultConfig();
+ const regularEvent = {
+ event_type: 'regular',
+ insert_id: '123',
+ };
+ const delayedEvent = {
+ event_type: 'delayed',
+ insert_id: '123',
+ delay: { id: 'delay-123' },
+ };
+
+ void destination.execute(regularEvent);
+ void destination.execute(delayedEvent);
+
+ expect(destination.queue.length).toBe(2);
+ expect(destination.queue[0].event).toEqual(regularEvent);
+ expect(destination.queue[1].event).toEqual(delayedEvent);
+ });
});
describe('removeEventsExceedFlushMaxRetries', () => {
@@ -1727,7 +1748,7 @@
});
describe('delayed events', () => {
- const successResponse = {
+ const successResponse: Response = {
status: Status.Success,
statusCode: 200,
body: {
@@ -1870,6 +1891,36 @@
expectSuccess(delayedCallback, delayedContext.event);
});
+ test('should wait for regular event upload before sending delayed events', async () => {
+ const delayId = 'delay-123';
+ const regularContext = createContext({ event_type: 'regular_event' });
+ const delayedContext = createContext({ event_type: 'delayed_event', delay: { id: delayId, timeout: 5000 } });
+ let resolveFirstSend: (response: Response) => void = (_response: Response) => {
+ throw new Error('First send was not started.');
+ };
+ let firstSendResolved = false;
+ transportProvider.send.mockImplementation(() => {
+ if (transportProvider.send.mock.calls.length === 1) {
+ return new Promise<Response>((resolve) => {
+ resolveFirstSend = (response) => {
+ firstSendResolved = true;
+ resolve(response);
+ };
+ });
+ }
+ expect(firstSendResolved).toBe(true);
+ return Promise.resolve(successResponse);
+ });
+
+ const flushPromise = flushQueue([regularContext, delayedContext]);
+ await Promise.resolve();
+
+ expect(transportProvider.send).toHaveBeenCalledTimes(1);
+ resolveFirstSend(successResponse);
+ await flushPromise;
+ expect(transportProvider.send).toHaveBeenCalledTimes(2);
+ });
+
test('should send delayed events with different delay_ids on same flush', async () => {
const delayTimeout = 5000;
const callbackA = jest.fn();You can send follow-ups to the cloud agent here.
258b70f to
0b71439
Compare
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
There are 3 total unresolved issues (including 2 from previous reviews).
Autofix Details
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: Deduped execute promises never resolve
- Deduplicated delayed contexts now resolve their execute promises with a skipped result before being removed from the queue.
Or push these changes by commenting:
@cursor push 0ab8ae497c
Preview (0ab8ae497c)
diff --git a/packages/analytics-core/src/plugins/destination.ts b/packages/analytics-core/src/plugins/destination.ts
--- a/packages/analytics-core/src/plugins/destination.ts
+++ b/packages/analytics-core/src/plugins/destination.ts
@@ -129,11 +129,17 @@
};
// remove any delayed events with the same insert_id
if (event.delay?.id) {
- /* istanbul ignore next */
- this.queue = this.queue.filter((queuedContext) => (
- queuedContext.event.insert_id !== event.insert_id &&
- queuedContext.event.delay?.id !== event.delay?.id
- ));
+ const dedupedContexts = this.queue.filter(
+ (queuedContext) =>
+ queuedContext.event.insert_id === event.insert_id || queuedContext.event.delay?.id === event.delay?.id,
+ );
+ this.queue = this.queue.filter(
+ (queuedContext) =>
+ queuedContext.event.insert_id !== event.insert_id && queuedContext.event.delay?.id !== event.delay?.id,
+ );
+ dedupedContexts.forEach((queuedContext) =>
+ queuedContext.callback(buildResult(queuedContext.event, 0, Status.Skipped)),
+ );
}
this.queue.push(context);
this.schedule(this.config.flushIntervalMillis);
diff --git a/packages/analytics-core/test/plugins/destination.test.ts b/packages/analytics-core/test/plugins/destination.test.ts
--- a/packages/analytics-core/test/plugins/destination.test.ts
+++ b/packages/analytics-core/test/plugins/destination.test.ts
@@ -130,11 +130,16 @@
insert_id: '123',
delay: { id: 'delay-123' },
};
- void destination.execute(event1);
+ const result = destination.execute(event1);
void destination.execute(event2);
expect(destination.queue.length).toBe(1);
expect(destination.queue[0].event).toEqual(expectedEvent);
+ await expect(result).resolves.toEqual({
+ event: event1,
+ code: 0,
+ message: Status.Skipped,
+ });
});
test('should deduplicate events with the same delay_id but different insert_id', async () => {
@@ -155,11 +160,16 @@
insert_id: '456',
delay: { id: 'delay-123' },
};
- void destination.execute(event1);
+ const result = destination.execute(event1);
void destination.execute(event2);
expect(destination.queue.length).toBe(1);
expect(destination.queue[0].event).toEqual(expectedEvent);
+ await expect(result).resolves.toEqual({
+ event: event1,
+ code: 0,
+ message: Status.Skipped,
+ });
});
});You can send follow-ups to the cloud agent here.
0b71439 to
a9cc1b8
Compare
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
There are 3 total unresolved issues (including 2 from previous reviews).
Autofix Details
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: Parallel flush breaks event order
- Regular batches are now awaited before delayed batch promises are created, preserving regular-before-delayed flush ordering.
Or push these changes by commenting:
@cursor push 3b32ae82ce
Preview (3b32ae82ce)
diff --git a/packages/analytics-core/src/plugins/destination.ts b/packages/analytics-core/src/plugins/destination.ts
--- a/packages/analytics-core/src/plugins/destination.ts
+++ b/packages/analytics-core/src/plugins/destination.ts
@@ -237,13 +237,11 @@
await promise;
return await this.send(batch, useRetry);
}, Promise.resolve());
- const eventPromises = [regularEventBatch];
+ await regularEventBatch;
const delayedEventBatches = this.getDelayedEventsBatches(delayed, useRetry);
- eventPromises.push(...delayedEventBatches);
+ await Promise.all(delayedEventBatches);
- await Promise.all(eventPromises);
-
// Mark current flush is done
this.flushId = null;
diff --git a/packages/analytics-core/test/plugins/destination.test.ts b/packages/analytics-core/test/plugins/destination.test.ts
--- a/packages/analytics-core/test/plugins/destination.test.ts
+++ b/packages/analytics-core/test/plugins/destination.test.ts
@@ -1759,7 +1759,7 @@
});
describe('delayed events', () => {
- const successResponse = {
+ const successResponse: Response = {
status: Status.Success,
statusCode: 200,
body: {
@@ -1902,6 +1902,50 @@
expectSuccess(delayedCallback, delayedContext.event);
});
+ test('should wait for regular events before sending delayed events on same flush', async () => {
+ const delayId = 'delay-123';
+ const delayTimeout = 5000;
+ const regularContext = createContext({ event_type: 'regular_event' });
+ const delayedContext = createContext({
+ event_type: 'delayed_event',
+ delay: { id: delayId, timeout: delayTimeout },
+ });
+ let resolveRegularSend: (response: Response) => void = () => undefined;
+ const regularSend = new Promise<Response>((resolve) => {
+ resolveRegularSend = resolve;
+ });
+ transportProvider.send.mockImplementationOnce(() => regularSend).mockResolvedValueOnce(successResponse);
+
+ const flushPromise = flushQueue([regularContext, delayedContext]);
+ await Promise.resolve();
+
+ expect(transportProvider.send).toHaveBeenCalledTimes(1);
+ expect(transportProvider.send).toHaveBeenNthCalledWith(
+ 1,
+ AMPLITUDE_SERVER_URL,
+ expect.objectContaining({
+ events: [expect.objectContaining({ event_type: 'regular_event' })],
+ }),
+ true,
+ );
+
+ resolveRegularSend(successResponse);
+ await flushPromise;
+
+ expect(transportProvider.send).toHaveBeenCalledTimes(2);
+ expect(transportProvider.send).toHaveBeenNthCalledWith(
+ 2,
+ delayedUrl,
+ expect.objectContaining({
+ id: delayId,
+ timeout: delayTimeout,
+ events: [expect.objectContaining({ event_type: 'delayed_event' })],
+ instant_events: [],
+ }),
+ true,
+ );
+ });
+
test('should send delayed events with different delay_ids on same flush', async () => {
const delayTimeout = 5000;
const callbackA = jest.fn();You can send follow-ups to the cloud agent here.
a9cc1b8 to
0a18292
Compare
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
There are 3 total unresolved issues (including 2 from previous reviews).
Autofix Details
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: Queue dedupe ignores in-flight sends
- Tracked in-flight and overwritten contexts so delayed dedupe no longer marks active sends as cancelled, skips stale queued batches, and drops overwritten retries safely.
Or push these changes by commenting:
@cursor push 9bf99a5132
Preview (9bf99a5132)
diff --git a/packages/analytics-core/src/plugins/destination.ts b/packages/analytics-core/src/plugins/destination.ts
--- a/packages/analytics-core/src/plugins/destination.ts
+++ b/packages/analytics-core/src/plugins/destination.ts
@@ -1,5 +1,6 @@
import { DestinationPlugin } from '../types/plugin';
import { Event } from '../types/event/event';
+import { Delay } from '../types/event/base-event';
import { Result } from '../types/result';
import { Status } from '../types/status';
import {
@@ -32,6 +33,7 @@
import { IDiagnosticsClient } from '../diagnostics/diagnostics-client';
import { isSuccessStatusCode } from '../utils/status-code';
import { getStacktrace } from '../utils/debug';
+import { DelayedPayload, Payload } from '../types/payload';
export interface Context {
event: Event;
@@ -94,6 +96,8 @@
// When flush resolves, set `flushId` to null
flushId: ReturnType<typeof setTimeout> | null = null;
queue: Context[] = [];
+ inFlightContexts = new WeakSet<Context>();
+ overwrittenContexts = new WeakSet<Context>();
diagnosticsClient: IDiagnosticsClient | undefined;
constructor(context?: { diagnosticsClient: IDiagnosticsClient }) {
@@ -125,6 +129,26 @@
callback: (result: Result) => resolve(result),
timeout: 0,
};
+ // remove delayed events with the same insert_id and the same delay.id
+ if (event.delay?.id) {
+ const duplicatedEvents: Context[] = [];
+ const queue: Context[] = [];
+ /* istanbul ignore next */
+ this.queue.forEach((queuedContext) => {
+ if (queuedContext.event.insert_id === event.insert_id || queuedContext.event.delay?.id === event.delay?.id) {
+ this.overwrittenContexts.add(queuedContext);
+ if (this.inFlightContexts.has(queuedContext)) {
+ queue.push(queuedContext);
+ } else {
+ duplicatedEvents.push(queuedContext);
+ }
+ } else {
+ queue.push(queuedContext);
+ }
+ });
+ duplicatedEvents.forEach((context) => context.callback(buildResult(context.event, 0, Status.Overwritten)));
+ this.queue = queue;
+ }
this.queue.push(context);
this.schedule(this.config.flushIntervalMillis);
this.saveEvents();
@@ -133,6 +157,10 @@
removeEventsExceedFlushMaxRetries(list: Context[]) {
return list.filter((context) => {
+ if (this.overwrittenContexts.has(context)) {
+ void this.fulfillRequest([context], 0, Status.Overwritten);
+ return false;
+ }
context.attempts += 1;
if (context.attempts < this.config.flushMaxRetries) {
return true;
@@ -198,50 +226,107 @@
this.resetSchedule();
const list: Context[] = [];
+ const delayed: Record<string, [Delay, Context[]]> = {};
const later: Context[] = [];
- this.queue.forEach((context) => (context.timeout === 0 ? list.push(context) : later.push(context)));
+ this.queue.forEach((context) => {
+ if (context.timeout !== 0) {
+ later.push(context);
+ } else if (context.event.delay?.id) {
+ const delay = context.event.delay;
+ delayed[delay.id] = delayed[delay.id] || [delay, []];
+ delayed[delay.id][1].push(context);
+ } else {
+ list.push(context);
+ }
+ });
const batches = chunk(list, this.config.flushQueueSize);
// Promise.all() doesn't guarantee resolve order.
// Sequentially resolve to make sure backend receives events in order
- await batches.reduce(async (promise, batch) => {
+ const regularEventBatch = batches.reduce(async (promise, batch) => {
await promise;
return await this.send(batch, useRetry);
}, Promise.resolve());
+ const eventPromises = [regularEventBatch];
+ const delayedEventBatches = this.getDelayedEventsBatches(delayed, useRetry);
+ eventPromises.push(...delayedEventBatches);
+
+ await Promise.all(eventPromises);
+
// Mark current flush is done
this.flushId = null;
this.scheduleEvents(this.queue);
}
- async send(list: Context[], useRetry = true) {
+ getDelayedEventsBatches(delayed: Record<string, [Delay, Context[]]>, useRetry: boolean) {
+ const eventPromises = [];
+ try {
+ for (const [delay, contexts] of Object.values(delayed)) {
+ const delayedBatches = chunk(contexts, this.config.flushQueueSize);
+ const delayedEventBatch = delayedBatches.reduce(async (promise, batch) => {
+ await promise;
+ return await this.send(batch, useRetry, delay);
+ }, Promise.resolve());
+ eventPromises.push(delayedEventBatch);
+ }
+ } catch (e) {}
+ return eventPromises;
+ }
+
+ translatePayloadToDelayedPayload(payload: Payload, list: Context[], delay: Delay): DelayedPayload {
+ const delayedPayload: DelayedPayload = {
+ ...payload,
+ id: delay.id,
+ timeout: list.reduce((max, context) => Math.max(max, context.event.delay!.timeout || 0), 0),
+ instant_events: payload.events.filter((_event, index) => !list[index].event.delay!.timeout),
+ events: payload.events.filter((_event, index) => list[index].event.delay!.timeout),
+ };
+ return delayedPayload;
+ }
+
+ async send(list: Context[], useRetry = true, delay?: Delay) {
+ list = list.filter((context) => !this.overwrittenContexts.has(context));
+ if (list.length === 0) {
+ return;
+ }
+
if (!this.config.apiKey) {
return this.fulfillRequest(list, 400, MISSING_API_KEY_MESSAGE);
}
- const payload = {
+ list.forEach((context) => this.inFlightContexts.add(context));
+
+ let payload: Payload = {
api_key: this.config.apiKey,
events: list.map((context) => {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
- const { extra, ...eventWithoutExtra } = context.event;
+ const { extra, delay, ...eventWithoutExtra } = context.event;
return eventWithoutExtra;
}),
options: {
min_id_length: this.config.minIdLength,
},
client_upload_time: new Date().toISOString(),
- request_metadata: this.config.requestMetadata,
+ request_metadata: delay ? undefined : this.config.requestMetadata,
};
- this.config.requestMetadata = new RequestMetadata();
+ if (!delay) {
+ this.config.requestMetadata = new RequestMetadata();
+ }
try {
- const { serverUrl } = createServerConfig(this.config.serverUrl, this.config.serverZone, this.config.useBatch);
+ let { serverUrl } = createServerConfig(this.config.serverUrl, this.config.serverZone, this.config.useBatch);
const shouldCompressUploadBody = shouldCompressUploadBodyForRequest(
serverUrl,
this.config.enableRequestBodyCompression,
);
+ if (delay) {
+ serverUrl = `${serverUrl}/delayed`;
+ payload = this.translatePayloadToDelayedPayload(payload, list, delay);
+ // TODO: if /delayed can't handle compression, then turn it off here
+ }
const res = await this.config.transportProvider.send(serverUrl, payload, shouldCompressUploadBody);
if (res === null) {
this.fulfillRequest(list, 0, UNEXPECTED_ERROR_MESSAGE);
@@ -267,6 +352,8 @@
});
this.handleResponse({ status: Status.Failed, statusCode: 0 }, list);
+ } finally {
+ list.forEach((context) => this.inFlightContexts.delete(context));
}
}
@@ -437,9 +524,8 @@
* This is called on response comes back for a request
*/
removeEvents(eventsToRemove: Context[]) {
- this.queue = this.queue.filter(
- (queuedContext) => !eventsToRemove.some((context) => context.event.insert_id === queuedContext.event.insert_id),
- );
+ const eventsToRemoveSet = new Set(eventsToRemove);
+ this.queue = this.queue.filter((queuedContext) => !eventsToRemoveSet.has(queuedContext));
this.saveEvents();
}
diff --git a/packages/analytics-core/src/types/event/base-event.ts b/packages/analytics-core/src/types/event/base-event.ts
--- a/packages/analytics-core/src/types/event/base-event.ts
+++ b/packages/analytics-core/src/types/event/base-event.ts
@@ -1,6 +1,11 @@
import { Plan } from './plan';
import { IngestionMetadataEventProperty } from './ingestion-metadata';
+export interface Delay {
+ id: string;
+ timeout?: number;
+}
+
export interface BaseEvent extends EventOptions {
event_type: string;
event_properties?: { [key: string]: any } | undefined;
@@ -52,4 +57,5 @@
android_app_set_id?: string;
extra?: { [key: string]: any };
groups?: { [key: string]: any } | undefined;
+ delay?: Delay;
}
diff --git a/packages/analytics-core/src/types/payload.ts b/packages/analytics-core/src/types/payload.ts
--- a/packages/analytics-core/src/types/payload.ts
+++ b/packages/analytics-core/src/types/payload.ts
@@ -12,3 +12,9 @@
client_upload_time?: string;
request_metadata?: RequestMetadata;
}
+
+export interface DelayedPayload extends Payload {
+ id: string;
+ timeout: number;
+ instant_events?: readonly Event[];
+}
diff --git a/packages/analytics-core/src/types/status.ts b/packages/analytics-core/src/types/status.ts
--- a/packages/analytics-core/src/types/status.ts
+++ b/packages/analytics-core/src/types/status.ts
@@ -18,4 +18,6 @@
Timeout = 'Timeout',
/** NodeJS runtime environment error.. E.g. disconnected from network */
SystemError = 'SystemError',
+ /** The event was overwritten by a new event with the same insert_id and delay.id. */
+ Overwritten = 'overwritten',
}
diff --git a/packages/analytics-core/test/plugins/destination.test.ts b/packages/analytics-core/test/plugins/destination.test.ts
--- a/packages/analytics-core/test/plugins/destination.test.ts
+++ b/packages/analytics-core/test/plugins/destination.test.ts
@@ -111,6 +111,63 @@
expect(schedule).toHaveBeenCalledTimes(1);
expect(saveEvents).toHaveBeenCalledTimes(1);
});
+
+ test('should deduplicate events with the same insert_id and delay_id', async () => {
+ const destination = new Destination();
+ destination.config = useDefaultConfig();
+ const event1 = {
+ event_type: 'before',
+ insert_id: '123',
+ delay: { id: 'delay-123' },
+ };
+ const event2 = {
+ event_type: 'after',
+ insert_id: '123',
+ delay: { id: 'delay-123' },
+ };
+ const expectedEvent = {
+ event_type: 'after',
+ insert_id: '123',
+ delay: { id: 'delay-123' },
+ };
+ const staleResult = destination.execute(event1);
+ void destination.execute(event2);
+
+ expect(destination.queue.length).toBe(1);
+ expect(destination.queue[0].event).toEqual(expectedEvent);
+
+ await staleResult;
+ expect(staleResult).resolves.toEqual({
+ event: event1,
+ code: 0,
+ message: Status.Overwritten,
+ });
+ });
+
+ test('should deduplicate events with the same delay_id but different insert_id', async () => {
+ const destination = new Destination();
+ destination.config = useDefaultConfig();
+ const event1 = {
+ event_type: 'before',
+ insert_id: '123',
+ delay: { id: 'delay-123' },
+ };
+ const event2 = {
+ event_type: 'after',
+ insert_id: '456',
+ delay: { id: 'delay-123' },
+ };
+ const expectedEvent = {
+ event_type: 'after',
+ insert_id: '456',
+ delay: { id: 'delay-123' },
+ };
+ void destination.execute(event1);
+ void destination.execute(event2);
+
+ expect(destination.queue.length).toBe(1);
+ expect(destination.queue[0].event).toEqual(expectedEvent);
+ });
});
describe('removeEventsExceedFlushMaxRetries', () => {
@@ -1700,4 +1757,309 @@
expect(result).toBe('');
});
});
+
+ describe('delayed events', () => {
+ const successResponse = {
+ status: Status.Success,
+ statusCode: 200,
+ body: {
+ eventsIngested: 1,
+ payloadSizeBytes: 1,
+ serverUploadTime: 1,
+ },
+ };
+ const delayedUrl = `${AMPLITUDE_SERVER_URL}/delayed`;
+
+ let destination: Destination;
+ let transportProvider: { send: jest.Mock };
+
+ const createContext = (event: Context['event'], callback = jest.fn(), timeout = 0): Context => ({
+ attempts: 0,
+ callback,
+ event,
+ timeout,
+ });
+
+ const flushQueue = async (contexts: Context[]) => {
+ destination.queue = contexts;
+ await destination.flush(true);
+ };
+
+ const expectSuccess = (callback: jest.Mock, event: Context['event']) => {
+ expect(callback).toHaveBeenCalledWith({
+ event,
+ code: 200,
+ message: SUCCESS_MESSAGE,
+ });
+ };
+
+ beforeEach(async () => {
+ destination = new Destination();
+ transportProvider = {
+ send: jest.fn().mockResolvedValue(successResponse),
+ };
+ await destination.setup({
+ ...useDefaultConfig(),
+ transportProvider,
+ apiKey: API_KEY,
+ serverUrl: AMPLITUDE_SERVER_URL,
+ });
+ });
+
+ test('should send delayed events to /delayed endpoint', async () => {
+ const delayId = 'delay-123';
+ const delayTimeout = 5000;
+ const event = { event_type: 'delayed_event', delay: { id: delayId, timeout: delayTimeout } };
+ const callback = jest.fn();
+ await flushQueue([createContext(event, callback)]);
+
+ expect(transportProvider.send).toHaveBeenCalledTimes(1);
+ expect(transportProvider.send).toHaveBeenCalledWith(
+ delayedUrl,
+ expect.objectContaining({
+ id: delayId,
+ timeout: delayTimeout,
+ events: [expect.objectContaining({ event_type: 'delayed_event' })],
+ instant_events: [],
+ }),
+ true,
+ );
+ expectSuccess(callback, event);
+ });
+
+ test('should not include delay in the sent events', async () => {
+ const delayId = 'delay-123';
+ const delayTimeout = 5000;
+ const event = { event_type: 'delayed_event', delay: { id: delayId, timeout: delayTimeout } };
+ const callback = jest.fn();
+ await flushQueue([createContext(event, callback)]);
+
+ expect(transportProvider.send).toHaveBeenCalledTimes(1);
+ const sentPayload = transportProvider.send.mock.calls[0][1] as Payload;
+ const sentEvent = sentPayload.events[0];
+ expect(sentEvent).not.toHaveProperty('delay');
+ expectSuccess(callback, event);
+ });
+
+ test('should send instant_events to /delayed endpoint when delay_timeout is not set', async () => {
+ const delayId = 'delay-123';
+ const event = { event_type: 'instant_event', delay: { id: delayId } };
+ const callback = jest.fn();
+ await flushQueue([createContext(event, callback)]);
+
+ expect(transportProvider.send).toHaveBeenCalledTimes(1);
+ expect(transportProvider.send).toHaveBeenCalledWith(
+ delayedUrl,
+ expect.objectContaining({
+ id: delayId,
+ timeout: 0,
+ events: [],
+ instant_events: [expect.objectContaining({ event_type: 'instant_event' })],
+ }),
+ true,
+ );
+ expectSuccess(callback, event);
+ });
+
+ test('should send /delayed and regular events on same flush', async () => {
+ const delayId = 'delay-123';
+ const delayTimeout = 5000;
+ const regularCallback = jest.fn();
+ const delayedCallback = jest.fn();
+ const regularContext = createContext({ event_type: 'regular_event' }, regularCallback);
+ const delayedContext = createContext(
+ { event_type: 'delayed_event', delay: { id: delayId, timeout: delayTimeout } },
+ delayedCallback,
+ );
+
+ await flushQueue([regularContext, delayedContext]);
+
+ expect(transportProvider.send).toHaveBeenCalledTimes(2);
+ expect(transportProvider.send).toHaveBeenNthCalledWith(
+ 1,
+ AMPLITUDE_SERVER_URL,
+ expect.objectContaining({
+ events: [expect.objectContaining({ event_type: 'regular_event' })],
+ }),
+ true,
+ );
+ expect(transportProvider.send).toHaveBeenNthCalledWith(
+ 2,
+ delayedUrl,
+ expect.objectContaining({
+ id: delayId,
+ timeout: delayTimeout,
+ events: [expect.objectContaining({ event_type: 'delayed_event' })],
+ instant_events: [],
+ }),
+ true,
+ );
+ expectSuccess(regularCallback, regularContext.event);
+ expectSuccess(delayedCallback, delayedContext.event);
+ });
+
+ test('should send delayed events with different delay_ids on same flush', async () => {
+ const delayTimeout = 5000;
+ const callbackA = jest.fn();
+ const callbackB = jest.fn();
+ const delayedContextA = createContext(
+ { event_type: 'delayed_event_a', delay: { id: 'delay-a', timeout: delayTimeout } },
+ callbackA,
+ );
+ const delayedContextB = createContext(
+ { event_type: 'delayed_event_b', delay: { id: 'delay-b', timeout: delayTimeout } },
+ callbackB,
+ );
+
+ await flushQueue([delayedContextA, delayedContextB]);
+
+ expect(transportProvider.send).toHaveBeenCalledTimes(2);
+ expect(transportProvider.send).toHaveBeenNthCalledWith(
+ 1,
+ delayedUrl,
+ expect.objectContaining({
+ id: 'delay-a',
+ timeout: delayTimeout,
+ events: [
+ expect.objectContaining({
+ event_type: 'delayed_event_a',
+ }),
+ ],
+ instant_events: [],
+ }),
+ true,
+ );
+ expect(transportProvider.send).toHaveBeenNthCalledWith(
+ 2,
+ delayedUrl,
+ expect.objectContaining({
+ id: 'delay-b',
+ timeout: delayTimeout,
+ events: [
+ expect.objectContaining({
+ event_type: 'delayed_event_b',
+ }),
+ ],
+ instant_events: [],
+ }),
+ true,
+ );
+ expectSuccess(callbackA, delayedContextA.event);
+ expectSuccess(callbackB, delayedContextB.event);
+ });
+
+ test('should not send delayed events while backoff timeout is active', async () => {
+ const delayId = 'delay-123';
+ const event = { event_type: 'delayed_event', delay: { id: delayId } };
+ const callback = jest.fn();
+ const context = createContext(event, callback, 1000);
+
+ destination.queue = [context];
+ await destination.flush(true);
+
+ expect(transportProvider.send).not.toHaveBeenCalled();
+ expect(callback).not.toHaveBeenCalled();
+ expect(destination.queue).toEqual([context]);
+ });
+
+ test('should not overwrite delayed events that are already in flight', async () => {
+ const delayId = 'delay-123';
+ const staleEvent = { event_type: 'before', insert_id: '123', delay: { id: delayId } };
+ const replacementEvent = { event_type: 'after', insert_id: '123', delay: { id: delayId } };
+ const staleCallback = jest.fn();
+ const staleContext = createContext(staleEvent, staleCallback);
+ let resolveSend: (response: typeof successResponse) => void = jest.fn();
+ transportProvider.send.mockReturnValueOnce(
+ new Promise((resolve) => {
+ resolveSend = resolve;
+ }),
+ );
+ jest.spyOn(destination, 'schedule').mockImplementation(jest.fn);
+
+ destination.queue = [staleContext];
+ const flushPromise = destination.flush(true);
+ await Promise.resolve();
+
+ void destination.execute(replacementEvent);
+
+ expect(staleCallback).not.toHaveBeenCalledWith({
+ event: staleEvent,
+ code: 0,
+ message: Status.Overwritten,
+ });
+
+ resolveSend(successResponse);
+ await flushPromise;
+
+ expectSuccess(staleCallback, staleEvent);
+ expect(destination.queue).toHaveLength(1);
+ expect(destination.queue[0].event).toEqual(replacementEvent);
+ });
+
+ test('should skip delayed batch entries overwritten before send starts', async () => {
+ const delayId = 'delay-123';
+ const delayedContextA = createContext({
+ event_type: 'delayed_event_a',
+ delay: { id: delayId },
+ });
+ const delayedContextB = createContext({
+ event_type: 'delayed_event_b',
+ delay: { id: delayId },
+ });
+ let resolveSend: (response: typeof successResponse) => void = jest.fn();
+ transportProvider.send.mockReturnValueOnce(
+ new Promise((resolve) => {
+ resolveSend = resolve;
+ }),
+ );
+ jest.spyOn(destination, 'schedule').mockImplementation(jest.fn);
+ destination.config.flushQueueSize = 1;
+
+ destination.queue = [delayedContextA, delayedContextB];
+ const flushPromise = destination.flush(true);
+ await Promise.resolve();
+
+ void destination.execute({ event_type: 'replacement', delay: { id: delayId } });
+ resolveSend(successResponse);
+ await flushPromise;
+
+ expect(transportProvider.send).toHaveBeenCalledTimes(1);
+ expect(delayedContextB.callback).toHaveBeenCalledWith({
+ event: delayedContextB.event,
+ code: 0,
+ message: Status.Overwritten,
+ });
+ });
+
+ test('should not retry in-flight delayed events that were overwritten', async () => {
+ const delayId = 'delay-123';
+ const staleEvent = { event_type: 'before', delay: { id: delayId } };
+ const replacementEvent = { event_type: 'after', delay: { id: delayId } };
+ const staleCallback = jest.fn();
+ const staleContext = createContext(staleEvent, staleCallback);
+ let resolveSend: (response: Response) => void = jest.fn();
+ transportProvider.send.mockReturnValueOnce(
+ new Promise((resolve) => {
+ resolveSend = resolve;
+ }),
+ );
+ jest.spyOn(destination, 'schedule').mockImplementation(jest.fn);
+
+ destination.queue = [staleContext];
+ const flushPromise = destination.flush(true);
+ await Promise.resolve();
+
+ void destination.execute(replacementEvent);
+ resolveSend({ status: Status.Failed, statusCode: 500 });
+ await flushPromise;
+
+ expect(staleCallback).toHaveBeenCalledWith({
+ event: staleEvent,
+ code: 0,
+ message: Status.Overwritten,
+ });
+ expect(destination.queue).toHaveLength(1);
+ expect(destination.queue[0].event).toEqual(replacementEvent);
+ });
+ });
});
diff --git a/packages/analytics-types/src/status.ts b/packages/analytics-types/src/status.ts
--- a/packages/analytics-types/src/status.ts
+++ b/packages/analytics-types/src/status.ts
@@ -18,4 +18,6 @@
Timeout = 'Timeout',
/** NodeJS runtime environment error.. E.g. disconnected from network */
SystemError = 'SystemError',
+ /** The event was overwritten by a new event with the same insert_id and delay.id. */
+ Overwritten = 'overwritten',
}You can send follow-ups to the cloud agent here.
0a18292 to
a4ef194
Compare
7d350a6 to
8036427
Compare
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 2 potential issues.
There are 4 total unresolved issues (including 2 from previous reviews).
Autofix Details
Bugbot Autofix prepared fixes for both issues found in the latest run.
- ✅ Fixed: Delay dedup ignores delay id
- Delayed queue deduplication now replaces stale queued events by matching delay id regardless of insert id.
- ✅ Fixed: Flush test assumes send order
- The mixed regular and delayed flush test now asserts both sends without relying on concurrent call order.
Or push these changes by commenting:
@cursor push 35a945dcb1
Preview (35a945dcb1)
diff --git a/packages/analytics-core/src/plugins/destination.ts b/packages/analytics-core/src/plugins/destination.ts
--- a/packages/analytics-core/src/plugins/destination.ts
+++ b/packages/analytics-core/src/plugins/destination.ts
@@ -1,5 +1,6 @@
import { DestinationPlugin } from '../types/plugin';
import { Event } from '../types/event/event';
+import { Delay } from '../types/event/base-event';
import { Result } from '../types/result';
import { Status } from '../types/status';
import {
@@ -32,6 +33,7 @@
import { IDiagnosticsClient } from '../diagnostics/diagnostics-client';
import { isSuccessStatusCode } from '../utils/status-code';
import { getStacktrace } from '../utils/debug';
+import { DelayedPayload, Payload } from '../types/payload';
export interface Context {
event: Event;
@@ -125,6 +127,23 @@
callback: (result: Result) => resolve(result),
timeout: 0,
};
+ // remove delayed events with the same delay.id
+ if (event.delay?.id) {
+ const duplicatedEvents: Context[] = [];
+ const queue: Context[] = [];
+ /* istanbul ignore next */
+ this.queue.forEach((queuedContext) => {
+ if (queuedContext.event.delay?.id === event.delay?.id) {
+ duplicatedEvents.push(queuedContext);
+ } else {
+ queue.push(queuedContext);
+ }
+ });
+ duplicatedEvents.forEach((context) =>
+ context.callback(buildResult(context.event, 0, 'Stale event overwritten')),
+ );
+ this.queue = queue;
+ }
this.queue.push(context);
this.schedule(this.config.flushIntervalMillis);
this.saveEvents();
@@ -198,50 +217,102 @@
this.resetSchedule();
const list: Context[] = [];
+ const delayed: Record<string, [Delay, Context[]]> = {};
const later: Context[] = [];
- this.queue.forEach((context) => (context.timeout === 0 ? list.push(context) : later.push(context)));
+ this.queue.forEach((context) => {
+ if (context.timeout !== 0) {
+ later.push(context);
+ } else if (context.event.delay?.id) {
+ const delay = context.event.delay;
+ delayed[delay.id] = delayed[delay.id] || [delay, []];
+ delayed[delay.id][1].push(context);
+ } else {
+ list.push(context);
+ }
+ });
const batches = chunk(list, this.config.flushQueueSize);
// Promise.all() doesn't guarantee resolve order.
// Sequentially resolve to make sure backend receives events in order
- await batches.reduce(async (promise, batch) => {
+ const regularEventBatch = batches.reduce(async (promise, batch) => {
await promise;
return await this.send(batch, useRetry);
}, Promise.resolve());
+ const eventPromises = [regularEventBatch];
+ const delayedEventBatches = this.getDelayedEventsBatches(delayed, useRetry);
+ eventPromises.push(...delayedEventBatches);
+
+ await Promise.all(eventPromises);
+
// Mark current flush is done
this.flushId = null;
this.scheduleEvents(this.queue);
}
- async send(list: Context[], useRetry = true) {
+ getDelayedEventsBatches(delayed: Record<string, [Delay, Context[]]>, useRetry: boolean) {
+ const eventPromises = [];
+ try {
+ for (const [delay, contexts] of Object.values(delayed)) {
+ const delayedBatches = chunk(contexts, this.config.flushQueueSize);
+ const delayedEventBatch = delayedBatches.reduce(async (promise, batch) => {
+ await promise;
+ return await this.send(batch, useRetry, delay);
+ }, Promise.resolve());
+ eventPromises.push(delayedEventBatch);
+ }
+ } catch (e) {
+ // Ignore unexpected grouping errors so regular event flushing can continue.
+ }
+ return eventPromises;
+ }
+
+ translatePayloadToDelayedPayload(payload: Payload, list: Context[], delay: Delay): DelayedPayload {
+ const delayedPayload: DelayedPayload = {
+ ...payload,
+ id: delay.id,
+ timeout: list.reduce((max, context) => Math.max(max, context.event.delay!.timeout || 0), 0),
+ instant_events: payload.events.filter((_event, index) => !list[index].event.delay!.timeout),
+ events: payload.events.filter((_event, index) => list[index].event.delay!.timeout),
+ };
+ return delayedPayload;
+ }
+
+ async send(list: Context[], useRetry = true, delay?: Delay) {
if (!this.config.apiKey) {
return this.fulfillRequest(list, 400, MISSING_API_KEY_MESSAGE);
}
- const payload = {
+ let payload: Payload = {
api_key: this.config.apiKey,
events: list.map((context) => {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
- const { extra, ...eventWithoutExtra } = context.event;
+ const { extra, delay, ...eventWithoutExtra } = context.event;
return eventWithoutExtra;
}),
options: {
min_id_length: this.config.minIdLength,
},
client_upload_time: new Date().toISOString(),
- request_metadata: this.config.requestMetadata,
+ request_metadata: delay ? undefined : this.config.requestMetadata,
};
- this.config.requestMetadata = new RequestMetadata();
+ if (!delay) {
+ this.config.requestMetadata = new RequestMetadata();
+ }
try {
- const { serverUrl } = createServerConfig(this.config.serverUrl, this.config.serverZone, this.config.useBatch);
+ let { serverUrl } = createServerConfig(this.config.serverUrl, this.config.serverZone, this.config.useBatch);
const shouldCompressUploadBody = shouldCompressUploadBodyForRequest(
serverUrl,
this.config.enableRequestBodyCompression,
);
+ if (delay) {
+ serverUrl = `${serverUrl}/delayed`;
+ payload = this.translatePayloadToDelayedPayload(payload, list, delay);
+ // TODO: if /delayed can't handle compression, then turn it off here
+ }
const res = await this.config.transportProvider.send(serverUrl, payload, shouldCompressUploadBody);
if (res === null) {
this.fulfillRequest(list, 0, UNEXPECTED_ERROR_MESSAGE);
diff --git a/packages/analytics-core/src/types/event/base-event.ts b/packages/analytics-core/src/types/event/base-event.ts
--- a/packages/analytics-core/src/types/event/base-event.ts
+++ b/packages/analytics-core/src/types/event/base-event.ts
@@ -1,6 +1,11 @@
import { Plan } from './plan';
import { IngestionMetadataEventProperty } from './ingestion-metadata';
+export interface Delay {
+ id: string;
+ timeout?: number;
+}
+
export interface BaseEvent extends EventOptions {
event_type: string;
event_properties?: { [key: string]: any } | undefined;
@@ -52,4 +57,5 @@
android_app_set_id?: string;
extra?: { [key: string]: any };
groups?: { [key: string]: any } | undefined;
+ delay?: Delay;
}
diff --git a/packages/analytics-core/src/types/payload.ts b/packages/analytics-core/src/types/payload.ts
--- a/packages/analytics-core/src/types/payload.ts
+++ b/packages/analytics-core/src/types/payload.ts
@@ -12,3 +12,9 @@
client_upload_time?: string;
request_metadata?: RequestMetadata;
}
+
+export interface DelayedPayload extends Payload {
+ id: string;
+ timeout: number;
+ instant_events?: readonly Event[];
+}
diff --git a/packages/analytics-core/test/plugins/destination.test.ts b/packages/analytics-core/test/plugins/destination.test.ts
--- a/packages/analytics-core/test/plugins/destination.test.ts
+++ b/packages/analytics-core/test/plugins/destination.test.ts
@@ -111,6 +111,62 @@
expect(schedule).toHaveBeenCalledTimes(1);
expect(saveEvents).toHaveBeenCalledTimes(1);
});
+
+ test('should deduplicate events with the same insert_id and delay_id', async () => {
+ const destination = new Destination();
+ destination.config = useDefaultConfig();
+ const event1 = {
+ event_type: 'before',
+ insert_id: '123',
+ delay: { id: 'delay-123' },
+ };
+ const event2 = {
+ event_type: 'after',
+ insert_id: '123',
+ delay: { id: 'delay-123' },
+ };
+ const expectedEvent = {
+ event_type: 'after',
+ insert_id: '123',
+ delay: { id: 'delay-123' },
+ };
+ const staleResult = destination.execute(event1);
+ void destination.execute(event2);
+
+ expect(destination.queue.length).toBe(1);
+ expect(destination.queue[0].event).toEqual(expectedEvent);
+
+ await expect(staleResult).resolves.toEqual({
+ event: event1,
+ code: 0,
+ message: 'Stale event overwritten',
+ });
+ });
+
+ test('should deduplicate events with the same delay_id but different insert_id', async () => {
+ const destination = new Destination();
+ destination.config = useDefaultConfig();
+ const event1 = {
+ event_type: 'before',
+ insert_id: '123',
+ delay: { id: 'delay-123' },
+ };
+ const event2 = {
+ event_type: 'after',
+ insert_id: '456',
+ delay: { id: 'delay-123' },
+ };
+ const expectedEvent = {
+ event_type: 'after',
+ insert_id: '456',
+ delay: { id: 'delay-123' },
+ };
+ void destination.execute(event1);
+ void destination.execute(event2);
+
+ expect(destination.queue.length).toBe(1);
+ expect(destination.queue[0].event).toEqual(expectedEvent);
+ });
});
describe('removeEventsExceedFlushMaxRetries', () => {
@@ -1700,4 +1756,211 @@
expect(result).toBe('');
});
});
+
+ describe('delayed events', () => {
+ const successResponse = {
+ status: Status.Success,
+ statusCode: 200,
+ body: {
+ eventsIngested: 1,
+ payloadSizeBytes: 1,
+ serverUploadTime: 1,
+ },
+ };
+ const delayedUrl = `${AMPLITUDE_SERVER_URL}/delayed`;
+
+ let destination: Destination;
+ let transportProvider: { send: jest.Mock };
+
+ const createContext = (
+ event: { event_type: string; delay?: { id: string; timeout?: number } },
+ callback = jest.fn(),
+ timeout = 0,
+ ): Context => ({
+ attempts: 0,
+ callback,
+ event,
+ timeout,
+ });
+
+ const flushQueue = async (contexts: Context[]) => {
+ destination.queue = contexts;
+ await destination.flush(true);
+ };
+
+ const expectSuccess = (callback: jest.Mock, event: Context['event']) => {
+ expect(callback).toHaveBeenCalledWith({
+ event,
+ code: 200,
+ message: SUCCESS_MESSAGE,
+ });
+ };
+
+ beforeEach(async () => {
+ destination = new Destination();
+ transportProvider = {
+ send: jest.fn().mockResolvedValue(successResponse),
+ };
+ await destination.setup({
+ ...useDefaultConfig(),
+ transportProvider,
+ apiKey: API_KEY,
+ serverUrl: AMPLITUDE_SERVER_URL,
+ });
+ });
+
+ test('should send delayed events to /delayed endpoint', async () => {
+ const delayId = 'delay-123';
+ const delayTimeout = 5000;
+ const event = { event_type: 'delayed_event', delay: { id: delayId, timeout: delayTimeout } };
+ const callback = jest.fn();
+ await flushQueue([createContext(event, callback)]);
+
+ expect(transportProvider.send).toHaveBeenCalledTimes(1);
+ expect(transportProvider.send).toHaveBeenCalledWith(
+ delayedUrl,
+ expect.objectContaining({
+ id: delayId,
+ timeout: delayTimeout,
+ events: [expect.objectContaining({ event_type: 'delayed_event' })],
+ instant_events: [],
+ }),
+ true,
+ );
+ expectSuccess(callback, event);
+ });
+
+ test('should not include delay in the sent events', async () => {
+ const delayId = 'delay-123';
+ const delayTimeout = 5000;
+ const event = { event_type: 'delayed_event', delay: { id: delayId, timeout: delayTimeout } };
+ const callback = jest.fn();
+ await flushQueue([createContext(event, callback)]);
+
+ expect(transportProvider.send).toHaveBeenCalledTimes(1);
+ const sentPayload = transportProvider.send.mock.calls[0][1] as Payload;
+ const sentEvent = sentPayload.events[0];
+ expect(sentEvent).not.toHaveProperty('delay');
+ expectSuccess(callback, event);
+ });
+
+ test('should send instant_events to /delayed endpoint when delay_timeout is not set', async () => {
+ const delayId = 'delay-123';
+ const event = { event_type: 'instant_event', delay: { id: delayId } };
+ const callback = jest.fn();
+ await flushQueue([createContext(event, callback)]);
+
+ expect(transportProvider.send).toHaveBeenCalledTimes(1);
+ expect(transportProvider.send).toHaveBeenCalledWith(
+ delayedUrl,
+ expect.objectContaining({
+ id: delayId,
+ timeout: 0,
+ events: [],
+ instant_events: [expect.objectContaining({ event_type: 'instant_event' })],
+ }),
+ true,
+ );
+ expectSuccess(callback, event);
+ });
+
+ test('should send /delayed and regular events on same flush', async () => {
+ const delayId = 'delay-123';
+ const delayTimeout = 5000;
+ const regularCallback = jest.fn();
+ const delayedCallback = jest.fn();
+ const regularContext = createContext({ event_type: 'regular_event' }, regularCallback);
+ const delayedContext = createContext(
+ { event_type: 'delayed_event', delay: { id: delayId, timeout: delayTimeout } },
+ delayedCallback,
+ );
+
+ await flushQueue([regularContext, delayedContext]);
+
+ expect(transportProvider.send).toHaveBeenCalledTimes(2);
+ expect(transportProvider.send).toHaveBeenCalledWith(
+ AMPLITUDE_SERVER_URL,
+ expect.objectContaining({
+ events: [expect.objectContaining({ event_type: 'regular_event' })],
+ }),
+ true,
+ );
+ expect(transportProvider.send).toHaveBeenCalledWith(
+ delayedUrl,
+ expect.objectContaining({
+ id: delayId,
+ timeout: delayTimeout,
+ events: [expect.objectContaining({ event_type: 'delayed_event' })],
+ instant_events: [],
+ }),
+ true,
+ );
+ expectSuccess(regularCallback, regularContext.event);
+ expectSuccess(delayedCallback, delayedContext.event);
+ });
+
+ test('should send delayed events with different delay_ids on same flush', async () => {
+ const delayTimeout = 5000;
+ const callbackA = jest.fn();
+ const callbackB = jest.fn();
+ const delayedContextA = createContext(
+ { event_type: 'delayed_event_a', delay: { id: 'delay-a', timeout: delayTimeout } },
+ callbackA,
+ );
+ const delayedContextB = createContext(
+ { event_type: 'delayed_event_b', delay: { id: 'delay-b', timeout: delayTimeout } },
+ callbackB,
+ );
+
+ await flushQueue([delayedContextA, delayedContextB]);
+
+ expect(transportProvider.send).toHaveBeenCalledTimes(2);
+ expect(transportProvider.send).toHaveBeenNthCalledWith(
+ 1,
+ delayedUrl,
+ expect.objectContaining({
+ id: 'delay-a',
+ timeout: delayTimeout,
+ events: [
+ expect.objectContaining({
+ event_type: 'delayed_event_a',
+ }),
+ ],
+ instant_events: [],
+ }),
+ true,
+ );
+ expect(transportProvider.send).toHaveBeenNthCalledWith(
+ 2,
+ delayedUrl,
+ expect.objectContaining({
+ id: 'delay-b',
+ timeout: delayTimeout,
+ events: [
+ expect.objectContaining({
+ event_type: 'delayed_event_b',
+ }),
+ ],
+ instant_events: [],
+ }),
+ true,
+ );
+ expectSuccess(callbackA, delayedContextA.event);
+ expectSuccess(callbackB, delayedContextB.event);
+ });
+
+ test('should not send delayed events while backoff timeout is active', async () => {
+ const delayId = 'delay-123';
+ const event = { event_type: 'delayed_event', delay: { id: delayId } };
+ const callback = jest.fn();
+ const context = createContext(event, callback, 1000);
+
+ destination.queue = [context];
+ await destination.flush(true);
+
+ expect(transportProvider.send).not.toHaveBeenCalled();
+ expect(callback).not.toHaveBeenCalled();
+ expect(destination.queue).toEqual([context]);
+ });
+ });
});You can send follow-ups to the cloud agent here.
4e44458 to
b97d312
Compare
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
There are 5 total unresolved issues (including 4 from previous reviews).
Autofix Details
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: Same delay id chunked overwrites
- Delayed events sharing a delay id are now sent in a single /delayed request so later chunks cannot overwrite earlier events.
Or push these changes by commenting:
@cursor push ed0a2e4762
Preview (ed0a2e4762)
diff --git a/packages/analytics-core/src/plugins/destination.ts b/packages/analytics-core/src/plugins/destination.ts
--- a/packages/analytics-core/src/plugins/destination.ts
+++ b/packages/analytics-core/src/plugins/destination.ts
@@ -256,12 +256,7 @@
const eventPromises = [];
try {
for (const [delay, contexts] of Object.values(delayed)) {
- const delayedBatches = chunk(contexts, this.config.flushQueueSize);
- const delayedEventBatch = delayedBatches.reduce(async (promise, batch) => {
- await promise;
- return await this.send(batch, useRetry, delay);
- }, Promise.resolve());
- eventPromises.push(delayedEventBatch);
+ eventPromises.push(Promise.resolve().then(() => this.send(contexts, useRetry, delay)));
}
} catch (e) {
// swallow error
diff --git a/packages/analytics-core/test/plugins/destination.test.ts b/packages/analytics-core/test/plugins/destination.test.ts
--- a/packages/analytics-core/test/plugins/destination.test.ts
+++ b/packages/analytics-core/test/plugins/destination.test.ts
@@ -1839,6 +1839,40 @@
expectSuccess(callback, event);
});
+ test('should send delayed events with the same delay_id in one request', async () => {
+ const delayId = 'delay-123';
+ const delayTimeout = 5000;
+ destination.config.flushQueueSize = 2;
+ const callbacks = [jest.fn(), jest.fn(), jest.fn()];
+ const contexts = callbacks.map((callback, index) =>
+ createContext(
+ { event_type: `delayed_event_${index}`, delay: { id: delayId, timeout: delayTimeout } },
+ callback,
+ ),
+ );
+
+ await flushQueue(contexts);
+
+ expect(transportProvider.send).toHaveBeenCalledTimes(1);
+ expect(transportProvider.send).toHaveBeenCalledWith(
+ delayedUrl,
+ expect.objectContaining({
+ id: delayId,
+ timeout: delayTimeout,
+ events: [
+ expect.objectContaining({ event_type: 'delayed_event_0' }),
+ expect.objectContaining({ event_type: 'delayed_event_1' }),
+ expect.objectContaining({ event_type: 'delayed_event_2' }),
+ ],
+ instant_events: [],
+ }),
+ true,
+ );
+ contexts.forEach((context, index) => {
+ expectSuccess(callbacks[index], context.event);
+ });
+ });
+
test('should send /delayed and regular events on same flush', async () => {
const delayId = 'delay-123';
const delayTimeout = 5000;You can send follow-ups to the cloud agent here.
b97d312 to
f6bfa19
Compare
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
There are 3 total unresolved issues (including 2 from previous reviews).
Autofix Details
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: In-flight dedupe drops replacement event
- Changed queue removal to drop only the exact fulfilled contexts so delayed replacements sharing an insert_id remain queued and are flushed.
Or push these changes by commenting:
@cursor push 55926db00f
Preview (55926db00f)
diff --git a/packages/analytics-core/src/plugins/destination.ts b/packages/analytics-core/src/plugins/destination.ts
--- a/packages/analytics-core/src/plugins/destination.ts
+++ b/packages/analytics-core/src/plugins/destination.ts
@@ -1,5 +1,6 @@
import { DestinationPlugin } from '../types/plugin';
import { Event } from '../types/event/event';
+import { Delay } from '../types/event/base-event';
import { Result } from '../types/result';
import { Status } from '../types/status';
import {
@@ -32,6 +33,7 @@
import { IDiagnosticsClient } from '../diagnostics/diagnostics-client';
import { isSuccessStatusCode } from '../utils/status-code';
import { getStacktrace } from '../utils/debug';
+import { DelayedPayload, Payload } from '../types/payload';
export interface Context {
event: Event;
@@ -125,6 +127,23 @@
callback: (result: Result) => resolve(result),
timeout: 0,
};
+ // remove delayed events with the same insert_id and the same delay.id
+ if (event.delay?.id) {
+ const duplicatedEvents: Context[] = [];
+ const queue: Context[] = [];
+ /* istanbul ignore next */
+ this.queue.forEach((queuedContext) => {
+ if (queuedContext.event.insert_id === event.insert_id && queuedContext.event.delay?.id === event.delay?.id) {
+ duplicatedEvents.push(queuedContext);
+ } else {
+ queue.push(queuedContext);
+ }
+ });
+ duplicatedEvents.forEach((context) =>
+ context.callback(buildResult(context.event, 0, 'Stale event overwritten')),
+ );
+ this.queue = queue;
+ }
this.queue.push(context);
this.schedule(this.config.flushIntervalMillis);
this.saveEvents();
@@ -198,50 +217,97 @@
this.resetSchedule();
const list: Context[] = [];
+ const delayed: Record<string, [Delay, Context[]]> = {};
const later: Context[] = [];
- this.queue.forEach((context) => (context.timeout === 0 ? list.push(context) : later.push(context)));
+ this.queue.forEach((context) => {
+ if (context.timeout !== 0) {
+ later.push(context);
+ } else if (context.event.delay?.id) {
+ const delay = context.event.delay;
+ delayed[delay.id] = delayed[delay.id] || [delay, []];
+ delayed[delay.id][1].push(context);
+ } else {
+ list.push(context);
+ }
+ });
const batches = chunk(list, this.config.flushQueueSize);
// Promise.all() doesn't guarantee resolve order.
// Sequentially resolve to make sure backend receives events in order
- await batches.reduce(async (promise, batch) => {
+ const regularEventBatch = batches.reduce(async (promise, batch) => {
await promise;
return await this.send(batch, useRetry);
}, Promise.resolve());
+ const eventPromises = [regularEventBatch];
+ const delayedEventsBatches = this.getDelayedEventsBatches(delayed, useRetry);
+ eventPromises.push(...delayedEventsBatches);
+
+ await Promise.all(eventPromises);
+
// Mark current flush is done
this.flushId = null;
this.scheduleEvents(this.queue);
}
- async send(list: Context[], useRetry = true) {
+ getDelayedEventsBatches(delayed: Record<string, [Delay, Context[]]>, useRetry: boolean) {
+ const eventPromises = [];
+ try {
+ for (const [delay, contexts] of Object.values(delayed)) {
+ eventPromises.push(this.send(contexts, useRetry, delay));
+ }
+ } catch (e) {
+ // swallow error
+ }
+ return eventPromises;
+ }
+
+ translatePayloadToDelayedPayload(payload: Payload, list: Context[], delay: Delay): DelayedPayload {
+ const delayedPayload: DelayedPayload = {
+ ...payload,
+ id: delay.id,
+ timeout: list.reduce((max, context) => Math.max(max, context.event.delay!.timeout || 0), 0),
+ instant_events: payload.events.filter((_event, index) => !list[index].event.delay!.timeout),
+ events: payload.events.filter((_event, index) => list[index].event.delay!.timeout),
+ };
+ return delayedPayload;
+ }
+
+ async send(list: Context[], useRetry = true, delay?: Delay) {
if (!this.config.apiKey) {
return this.fulfillRequest(list, 400, MISSING_API_KEY_MESSAGE);
}
- const payload = {
+ let payload: Payload = {
api_key: this.config.apiKey,
events: list.map((context) => {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
- const { extra, ...eventWithoutExtra } = context.event;
+ const { extra, delay, ...eventWithoutExtra } = context.event;
return eventWithoutExtra;
}),
options: {
min_id_length: this.config.minIdLength,
},
client_upload_time: new Date().toISOString(),
- request_metadata: this.config.requestMetadata,
+ request_metadata: delay ? undefined : this.config.requestMetadata,
};
- this.config.requestMetadata = new RequestMetadata();
+ if (!delay) {
+ this.config.requestMetadata = new RequestMetadata();
+ }
try {
- const { serverUrl } = createServerConfig(this.config.serverUrl, this.config.serverZone, this.config.useBatch);
+ let { serverUrl } = createServerConfig(this.config.serverUrl, this.config.serverZone, this.config.useBatch);
const shouldCompressUploadBody = shouldCompressUploadBodyForRequest(
serverUrl,
this.config.enableRequestBodyCompression,
);
+ if (delay) {
+ serverUrl = `${serverUrl}/delayed`;
+ payload = this.translatePayloadToDelayedPayload(payload, list, delay);
+ // TODO: if /delayed can't handle compression, then turn it off here
+ }
const res = await this.config.transportProvider.send(serverUrl, payload, shouldCompressUploadBody);
if (res === null) {
this.fulfillRequest(list, 0, UNEXPECTED_ERROR_MESSAGE);
@@ -437,9 +503,8 @@
* This is called on response comes back for a request
*/
removeEvents(eventsToRemove: Context[]) {
- this.queue = this.queue.filter(
- (queuedContext) => !eventsToRemove.some((context) => context.event.insert_id === queuedContext.event.insert_id),
- );
+ const eventsToRemoveSet = new Set(eventsToRemove);
+ this.queue = this.queue.filter((queuedContext) => !eventsToRemoveSet.has(queuedContext));
this.saveEvents();
}
diff --git a/packages/analytics-core/src/types/event/base-event.ts b/packages/analytics-core/src/types/event/base-event.ts
--- a/packages/analytics-core/src/types/event/base-event.ts
+++ b/packages/analytics-core/src/types/event/base-event.ts
@@ -1,6 +1,11 @@
import { Plan } from './plan';
import { IngestionMetadataEventProperty } from './ingestion-metadata';
+export interface Delay {
+ id: string;
+ timeout?: number;
+}
+
export interface BaseEvent extends EventOptions {
event_type: string;
event_properties?: { [key: string]: any } | undefined;
@@ -52,4 +57,5 @@
android_app_set_id?: string;
extra?: { [key: string]: any };
groups?: { [key: string]: any } | undefined;
+ delay?: Delay;
}
diff --git a/packages/analytics-core/src/types/payload.ts b/packages/analytics-core/src/types/payload.ts
--- a/packages/analytics-core/src/types/payload.ts
+++ b/packages/analytics-core/src/types/payload.ts
@@ -12,3 +12,9 @@
client_upload_time?: string;
request_metadata?: RequestMetadata;
}
+
+export interface DelayedPayload extends Payload {
+ id: string;
+ timeout: number;
+ instant_events?: readonly Event[];
+}
diff --git a/packages/analytics-core/test/plugins/destination.test.ts b/packages/analytics-core/test/plugins/destination.test.ts
--- a/packages/analytics-core/test/plugins/destination.test.ts
+++ b/packages/analytics-core/test/plugins/destination.test.ts
@@ -111,6 +111,37 @@
expect(schedule).toHaveBeenCalledTimes(1);
expect(saveEvents).toHaveBeenCalledTimes(1);
});
+
+ test('should deduplicate events with the same insert_id and delay_id', async () => {
+ const destination = new Destination();
+ destination.config = useDefaultConfig();
+ const event1 = {
+ event_type: 'before',
+ insert_id: '123',
+ delay: { id: 'delay-123' },
+ };
+ const event2 = {
+ event_type: 'after',
+ insert_id: '123',
+ delay: { id: 'delay-123' },
+ };
+ const expectedEvent = {
+ event_type: 'after',
+ insert_id: '123',
+ delay: { id: 'delay-123' },
+ };
+ const staleResult = destination.execute(event1);
+ void destination.execute(event2);
+
+ expect(destination.queue.length).toBe(1);
+ expect(destination.queue[0].event).toEqual(expectedEvent);
+
+ await expect(staleResult).resolves.toEqual({
+ event: event1,
+ code: 0,
+ message: 'Stale event overwritten',
+ });
+ });
});
describe('removeEventsExceedFlushMaxRetries', () => {
@@ -1700,4 +1731,247 @@
expect(result).toBe('');
});
});
+
+ describe('delayed events', () => {
+ const successResponse = {
+ status: Status.Success,
+ statusCode: 200,
+ body: {
+ eventsIngested: 1,
+ payloadSizeBytes: 1,
+ serverUploadTime: 1,
+ },
+ };
+ const delayedUrl = `${AMPLITUDE_SERVER_URL}/delayed`;
+
+ let destination: Destination;
+ let transportProvider: { send: jest.Mock };
+
+ const createContext = (
+ event: { event_type: string; delay?: { id: string; timeout?: number } },
+ callback = jest.fn(),
+ timeout = 0,
+ ): Context => ({
+ attempts: 0,
+ callback,
+ event,
+ timeout,
+ });
+
+ const flushQueue = async (contexts: Context[]) => {
+ destination.queue = contexts;
+ await destination.flush(true);
+ };
+
+ const expectSuccess = (callback: jest.Mock, event: Context['event']) => {
+ expect(callback).toHaveBeenCalledWith({
+ event,
+ code: 200,
+ message: SUCCESS_MESSAGE,
+ });
+ };
+
+ beforeEach(async () => {
+ destination = new Destination();
+ transportProvider = {
+ send: jest.fn().mockResolvedValue(successResponse),
+ };
+ await destination.setup({
+ ...useDefaultConfig(),
+ transportProvider,
+ apiKey: API_KEY,
+ serverUrl: AMPLITUDE_SERVER_URL,
+ });
+ });
+
+ test('should send delayed events to /delayed endpoint', async () => {
+ const delayId = 'delay-123';
+ const delayTimeout = 5000;
+ const event = { event_type: 'delayed_event', delay: { id: delayId, timeout: delayTimeout } };
+ const callback = jest.fn();
+ await flushQueue([createContext(event, callback)]);
+
+ expect(transportProvider.send).toHaveBeenCalledTimes(1);
+ expect(transportProvider.send).toHaveBeenCalledWith(
+ delayedUrl,
+ expect.objectContaining({
+ id: delayId,
+ timeout: delayTimeout,
+ events: [expect.objectContaining({ event_type: 'delayed_event' })],
+ instant_events: [],
+ }),
+ true,
+ );
+ expectSuccess(callback, event);
+ });
+
+ test('should not include delay in the sent events', async () => {
+ const delayId = 'delay-123';
+ const delayTimeout = 5000;
+ const event = { event_type: 'delayed_event', delay: { id: delayId, timeout: delayTimeout } };
+ const callback = jest.fn();
+ await flushQueue([createContext(event, callback)]);
+
+ expect(transportProvider.send).toHaveBeenCalledTimes(1);
+ const sentPayload = transportProvider.send.mock.calls[0][1] as Payload;
+ const sentEvent = sentPayload.events[0];
+ expect(sentEvent).not.toHaveProperty('delay');
+ expectSuccess(callback, event);
+ });
+
+ test('should send instant_events to /delayed endpoint when delay_timeout is not set', async () => {
+ const delayId = 'delay-123';
+ const event = { event_type: 'instant_event', delay: { id: delayId } };
+ const callback = jest.fn();
+ await flushQueue([createContext(event, callback)]);
+
+ expect(transportProvider.send).toHaveBeenCalledTimes(1);
+ expect(transportProvider.send).toHaveBeenCalledWith(
+ delayedUrl,
+ expect.objectContaining({
+ id: delayId,
+ timeout: 0,
+ events: [],
+ instant_events: [expect.objectContaining({ event_type: 'instant_event' })],
+ }),
+ true,
+ );
+ expectSuccess(callback, event);
+ });
+
+ test('should keep delayed replacement queued when overwritten event is in flight', async () => {
+ const delayId = 'delay-123';
+ const event1 = { event_type: 'before', insert_id: '123', delay: { id: delayId } };
+ const event2 = { event_type: 'after', insert_id: '123', delay: { id: delayId } };
+ jest.spyOn(destination, 'schedule').mockImplementation(jest.fn);
+ let resolveSend!: (response: typeof successResponse) => void;
+ transportProvider.send.mockReturnValueOnce(
+ new Promise<typeof successResponse>((resolve) => {
+ resolveSend = resolve;
+ }),
+ );
+
+ const staleResult = destination.execute(event1);
+ const flushPromise = destination.flush(true);
+ const replacementResult = destination.execute(event2);
+
+ await expect(staleResult).resolves.toEqual({
+ event: event1,
+ code: 0,
+ message: 'Stale event overwritten',
+ });
+
+ const replacementContext = destination.queue[0];
+ resolveSend(successResponse);
+ await flushPromise;
+
+ expect(destination.queue).toEqual([replacementContext]);
+
+ await destination.flush(true);
+ await expect(replacementResult).resolves.toEqual({
+ event: event2,
+ code: 200,
+ message: SUCCESS_MESSAGE,
+ });
+ });
+
+ test('should send /delayed and regular events on same flush', async () => {
+ const delayId = 'delay-123';
+ const delayTimeout = 5000;
+ const regularCallback = jest.fn();
+ const delayedCallback = jest.fn();
+ const regularContext = createContext({ event_type: 'regular_event' }, regularCallback);
+ const delayedContext = createContext(
+ { event_type: 'delayed_event', delay: { id: delayId, timeout: delayTimeout } },
+ delayedCallback,
+ );
+
+ await flushQueue([regularContext, delayedContext]);
+
+ expect(transportProvider.send).toHaveBeenCalledTimes(2);
+ expect(transportProvider.send).toHaveBeenCalledWith(
+ AMPLITUDE_SERVER_URL,
+ expect.objectContaining({
+ events: [expect.objectContaining({ event_type: 'regular_event' })],
+ }),
+ true,
+ );
+ expect(transportProvider.send).toHaveBeenCalledWith(
+ delayedUrl,
+ expect.objectContaining({
+ id: delayId,
+ timeout: delayTimeout,
+ events: [expect.objectContaining({ event_type: 'delayed_event' })],
+ instant_events: [],
+ }),
+ true,
+ );
+ expectSuccess(regularCallback, regularContext.event);
+ expectSuccess(delayedCallback, delayedContext.event);
+ });
+
+ test('should send delayed events with different delay_ids on same flush', async () => {
+ const delayTimeout = 5000;
+ const callbackA = jest.fn();
+ const callbackB = jest.fn();
+ const delayedContextA = createContext(
+ { event_type: 'delayed_event_a', delay: { id: 'delay-a', timeout: delayTimeout } },
+ callbackA,
+ );
+ const delayedContextB = createContext(
+ { event_type: 'delayed_event_b', delay: { id: 'delay-b', timeout: delayTimeout } },
+ callbackB,
+ );
+
+ await flushQueue([delayedContextA, delayedContextB]);
+
+ expect(transportProvider.send).toHaveBeenCalledTimes(2);
+ expect(transportProvider.send).toHaveBeenNthCalledWith(
+ 1,
+ delayedUrl,
+ expect.objectContaining({
+ id: 'delay-a',
+ timeout: delayTimeout,
+ events: [
+ expect.objectContaining({
+ event_type: 'delayed_event_a',
+ }),
+ ],
+ instant_events: [],
+ }),
+ true,
+ );
+ expect(transportProvider.send).toHaveBeenNthCalledWith(
+ 2,
+ delayedUrl,
+ expect.objectContaining({
+ id: 'delay-b',
+ timeout: delayTimeout,
+ events: [
+ expect.objectContaining({
+ event_type: 'delayed_event_b',
+ }),
+ ],
+ instant_events: [],
+ }),
+ true,
+ );
+ expectSuccess(callbackA, delayedContextA.event);
+ expectSuccess(callbackB, delayedContextB.event);
+ });
+
+ test('should not send delayed events while backoff timeout is active', async () => {
+ const delayId = 'delay-123';
+ const event = { event_type: 'delayed_event', delay: { id: delayId } };
+ const callback = jest.fn();
+ const context = createContext(event, callback, 1000);
+
+ destination.queue = [context];
+ await destination.flush(true);
+
+ expect(transportProvider.send).not.toHaveBeenCalled();
+ expect(callback).not.toHaveBeenCalled();
+ expect(destination.queue).toEqual([context]);
+ });
+ });
});You can send follow-ups to the cloud agent here.
98cf6ee to
5184c66
Compare
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
There are 4 total unresolved issues (including 3 from previous reviews).
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: Delayed flush ignores payload splitting
- Delayed event groups now use flushQueueSize chunking before sending so 413 retries resend smaller delayed batches.
Or push these changes by commenting:
@cursor push eeabfda9f3
Preview (eeabfda9f3)
diff --git a/packages/analytics-core/src/plugins/destination.ts b/packages/analytics-core/src/plugins/destination.ts
--- a/packages/analytics-core/src/plugins/destination.ts
+++ b/packages/analytics-core/src/plugins/destination.ts
@@ -1,5 +1,6 @@
import { DestinationPlugin } from '../types/plugin';
import { Event } from '../types/event/event';
+import { Delay } from '../types/event/base-event';
import { Result } from '../types/result';
import { Status } from '../types/status';
import {
@@ -32,6 +33,7 @@
import { IDiagnosticsClient } from '../diagnostics/diagnostics-client';
import { isSuccessStatusCode } from '../utils/status-code';
import { getStacktrace } from '../utils/debug';
+import { DelayedPayload, Payload } from '../types/payload';
export interface Context {
event: Event;
@@ -40,6 +42,13 @@
timeout: number;
}
+type DelayedEventGroup = {
+ delay: Delay;
+ contexts: Context[];
+};
+
+type DelayedEventsById = Record<string, DelayedEventGroup>;
+
const DEFAULT_AMPLITUDE_SERVER_URLS = new Set([
AMPLITUDE_SERVER_URL,
EU_AMPLITUDE_SERVER_URL,
@@ -125,12 +134,31 @@
callback: (result: Result) => resolve(result),
timeout: 0,
};
+ this.removeStaleDelayedEvents(event);
this.queue.push(context);
this.schedule(this.config.flushIntervalMillis);
this.saveEvents();
});
}
+ private removeStaleDelayedEvents(event: Event) {
+ if (!event.delay?.id) {
+ return;
+ }
+ const duplicatedEvents: Context[] = [];
+ const queue: Context[] = [];
+ /* istanbul ignore next */
+ this.queue.forEach((queuedContext) => {
+ if (queuedContext.event.insert_id === event.insert_id && queuedContext.event.delay?.id === event.delay?.id) {
+ duplicatedEvents.push(queuedContext);
+ } else {
+ queue.push(queuedContext);
+ }
+ });
+ duplicatedEvents.forEach((context) => context.callback(buildResult(context.event, 0, 'Stale event overwritten')));
+ this.queue = queue;
+ }
+
removeEventsExceedFlushMaxRetries(list: Context[]) {
return list.filter((context) => {
context.attempts += 1;
@@ -198,50 +226,105 @@
this.resetSchedule();
const list: Context[] = [];
+ const delayed: DelayedEventsById = {};
const later: Context[] = [];
- this.queue.forEach((context) => (context.timeout === 0 ? list.push(context) : later.push(context)));
+ this.queue.forEach((context) => {
+ if (context.timeout !== 0) {
+ later.push(context);
+ } else if (context.event.delay?.id) {
+ const delay = context.event.delay;
+ delayed[delay.id] = delayed[delay.id] || { delay, contexts: [] };
+ delayed[delay.id].contexts.push(context);
+ } else {
+ list.push(context);
+ }
+ });
const batches = chunk(list, this.config.flushQueueSize);
// Promise.all() doesn't guarantee resolve order.
// Sequentially resolve to make sure backend receives events in order
- await batches.reduce(async (promise, batch) => {
+ const regularEventBatch = batches.reduce(async (promise, batch) => {
await promise;
return await this.send(batch, useRetry);
}, Promise.resolve());
+ const eventPromises = [regularEventBatch];
+ if (Object.keys(delayed).length > 0) {
+ eventPromises.push(
+ this.getDelayedEventsBatches(delayed, useRetry).reduce(async (promise, batch) => {
+ await promise;
+ return await batch;
+ }, Promise.resolve()),
+ );
+ }
+
+ await Promise.all(eventPromises);
+
// Mark current flush is done
this.flushId = null;
this.scheduleEvents(this.queue);
}
- async send(list: Context[], useRetry = true) {
+ getDelayedEventsBatches(delayed: DelayedEventsById, useRetry: boolean) {
+ const eventPromises: Promise<void>[] = [];
+ try {
+ for (const { delay, contexts } of Object.values(delayed)) {
+ chunk(contexts, this.config.flushQueueSize).forEach((batch) => {
+ eventPromises.push(this.send(batch, useRetry, delay));
+ });
+ }
+ } catch (e) {
+ // swallow error
+ }
+ return eventPromises;
+ }
+
+ translatePayloadToDelayedPayload(payload: Payload, list: Context[], delay: Delay): DelayedPayload {
+ const delayedPayload: DelayedPayload = {
+ ...payload,
+ id: delay.id,
+ timeout: list.reduce((max, context) => Math.max(max, context.event.delay!.timeout || 0), 0),
+ instant_events: payload.events.filter((_event, index) => !list[index].event.delay!.timeout),
+ events: payload.events.filter((_event, index) => list[index].event.delay!.timeout),
+ };
+ return delayedPayload;
+ }
+
+ async send(list: Context[], useRetry = true, delay?: Delay) {
if (!this.config.apiKey) {
return this.fulfillRequest(list, 400, MISSING_API_KEY_MESSAGE);
}
- const payload = {
+ let payload: Payload = {
api_key: this.config.apiKey,
events: list.map((context) => {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
- const { extra, ...eventWithoutExtra } = context.event;
+ const { extra, delay, ...eventWithoutExtra } = context.event;
return eventWithoutExtra;
}),
options: {
min_id_length: this.config.minIdLength,
},
client_upload_time: new Date().toISOString(),
- request_metadata: this.config.requestMetadata,
+ request_metadata: delay ? undefined : this.config.requestMetadata,
};
- this.config.requestMetadata = new RequestMetadata();
+ if (!delay) {
+ this.config.requestMetadata = new RequestMetadata();
+ }
try {
- const { serverUrl } = createServerConfig(this.config.serverUrl, this.config.serverZone, this.config.useBatch);
+ let { serverUrl } = createServerConfig(this.config.serverUrl, this.config.serverZone, this.config.useBatch);
const shouldCompressUploadBody = shouldCompressUploadBodyForRequest(
serverUrl,
this.config.enableRequestBodyCompression,
);
+ if (delay) {
+ serverUrl = `${serverUrl}/delayed`;
+ payload = this.translatePayloadToDelayedPayload(payload, list, delay);
+ // TODO: if /delayed can't handle compression, then turn it off here
+ }
const res = await this.config.transportProvider.send(serverUrl, payload, shouldCompressUploadBody);
if (res === null) {
this.fulfillRequest(list, 0, UNEXPECTED_ERROR_MESSAGE);
diff --git a/packages/analytics-core/src/types/event/base-event.ts b/packages/analytics-core/src/types/event/base-event.ts
--- a/packages/analytics-core/src/types/event/base-event.ts
+++ b/packages/analytics-core/src/types/event/base-event.ts
@@ -1,6 +1,11 @@
import { Plan } from './plan';
import { IngestionMetadataEventProperty } from './ingestion-metadata';
+export interface Delay {
+ id: string;
+ timeout?: number;
+}
+
export interface BaseEvent extends EventOptions {
event_type: string;
event_properties?: { [key: string]: any } | undefined;
@@ -52,4 +57,5 @@
android_app_set_id?: string;
extra?: { [key: string]: any };
groups?: { [key: string]: any } | undefined;
+ delay?: Delay;
}
diff --git a/packages/analytics-core/src/types/payload.ts b/packages/analytics-core/src/types/payload.ts
--- a/packages/analytics-core/src/types/payload.ts
+++ b/packages/analytics-core/src/types/payload.ts
@@ -12,3 +12,9 @@
client_upload_time?: string;
request_metadata?: RequestMetadata;
}
+
+export interface DelayedPayload extends Payload {
+ id: string;
+ timeout: number;
+ instant_events?: readonly Event[];
+}
diff --git a/packages/analytics-core/test/plugins/destination.test.ts b/packages/analytics-core/test/plugins/destination.test.ts
--- a/packages/analytics-core/test/plugins/destination.test.ts
+++ b/packages/analytics-core/test/plugins/destination.test.ts
@@ -111,6 +111,37 @@
expect(schedule).toHaveBeenCalledTimes(1);
expect(saveEvents).toHaveBeenCalledTimes(1);
});
+
+ test('should deduplicate events with the same insert_id and delay_id', async () => {
+ const destination = new Destination();
+ destination.config = useDefaultConfig();
+ const event1 = {
+ event_type: 'before',
+ insert_id: '123',
+ delay: { id: 'delay-123' },
+ };
+ const event2 = {
+ event_type: 'after',
+ insert_id: '123',
+ delay: { id: 'delay-123' },
+ };
+ const expectedEvent = {
+ event_type: 'after',
+ insert_id: '123',
+ delay: { id: 'delay-123' },
+ };
+ const staleResult = destination.execute(event1);
+ void destination.execute(event2);
+
+ expect(destination.queue.length).toBe(1);
+ expect(destination.queue[0].event).toEqual(expectedEvent);
+
+ await expect(staleResult).resolves.toEqual({
+ event: event1,
+ code: 0,
+ message: 'Stale event overwritten',
+ });
+ });
});
describe('removeEventsExceedFlushMaxRetries', () => {
@@ -1700,4 +1731,246 @@
expect(result).toBe('');
});
});
+
+ describe('delayed events', () => {
+ const successResponse = {
+ status: Status.Success,
+ statusCode: 200,
+ body: {
+ eventsIngested: 1,
+ payloadSizeBytes: 1,
+ serverUploadTime: 1,
+ },
+ };
+ const delayedUrl = `${AMPLITUDE_SERVER_URL}/delayed`;
+
+ let destination: Destination;
+ let transportProvider: { send: jest.Mock };
+
+ const createContext = (
+ event: { event_type: string; delay?: { id: string; timeout?: number } },
+ callback = jest.fn(),
+ timeout = 0,
+ ): Context => ({
+ attempts: 0,
+ callback,
+ event,
+ timeout,
+ });
+
+ const flushQueue = async (contexts: Context[]) => {
+ destination.queue = contexts;
+ await destination.flush(true);
+ };
+
+ const expectSuccess = (callback: jest.Mock, event: Context['event']) => {
+ expect(callback).toHaveBeenCalledWith({
+ event,
+ code: 200,
+ message: SUCCESS_MESSAGE,
+ });
+ };
+
+ beforeEach(async () => {
+ destination = new Destination();
+ transportProvider = {
+ send: jest.fn().mockResolvedValue(successResponse),
+ };
+ await destination.setup({
+ ...useDefaultConfig(),
+ transportProvider,
+ apiKey: API_KEY,
+ serverUrl: AMPLITUDE_SERVER_URL,
+ });
+ });
+
+ test('should send delayed events to /delayed endpoint', async () => {
+ const delayId = 'delay-123';
+ const delayTimeout = 5000;
+ const event = { event_type: 'delayed_event', delay: { id: delayId, timeout: delayTimeout } };
+ const callback = jest.fn();
+ await flushQueue([createContext(event, callback)]);
+
+ expect(transportProvider.send).toHaveBeenCalledTimes(1);
+ expect(transportProvider.send).toHaveBeenCalledWith(
+ delayedUrl,
+ expect.objectContaining({
+ id: delayId,
+ timeout: delayTimeout,
+ events: [expect.objectContaining({ event_type: 'delayed_event' })],
+ instant_events: [],
+ }),
+ true,
+ );
+ expectSuccess(callback, event);
+ });
+
+ test('should not include delay in the sent events', async () => {
+ const delayId = 'delay-123';
+ const delayTimeout = 5000;
+ const event = { event_type: 'delayed_event', delay: { id: delayId, timeout: delayTimeout } };
+ const callback = jest.fn();
+ await flushQueue([createContext(event, callback)]);
+
+ expect(transportProvider.send).toHaveBeenCalledTimes(1);
+ const sentPayload = transportProvider.send.mock.calls[0][1] as Payload;
+ const sentEvent = sentPayload.events[0];
+ expect(sentEvent).not.toHaveProperty('delay');
+ expectSuccess(callback, event);
+ });
+
+ test('should send instant_events to /delayed endpoint when delay_timeout is not set', async () => {
+ const delayId = 'delay-123';
+ const event = { event_type: 'instant_event', delay: { id: delayId } };
+ const callback = jest.fn();
+ await flushQueue([createContext(event, callback)]);
+
+ expect(transportProvider.send).toHaveBeenCalledTimes(1);
+ expect(transportProvider.send).toHaveBeenCalledWith(
+ delayedUrl,
+ expect.objectContaining({
+ id: delayId,
+ timeout: 0,
+ events: [],
+ instant_events: [expect.objectContaining({ event_type: 'instant_event' })],
+ }),
+ true,
+ );
+ expectSuccess(callback, event);
+ });
+
+ test('should send /delayed and regular events on same flush', async () => {
+ const delayId = 'delay-123';
+ const delayTimeout = 5000;
+ const regularCallback = jest.fn();
+ const delayedCallback = jest.fn();
+ const regularContext = createContext({ event_type: 'regular_event' }, regularCallback);
+ const delayedContext = createContext(
+ { event_type: 'delayed_event', delay: { id: delayId, timeout: delayTimeout } },
+ delayedCallback,
+ );
+
+ await flushQueue([regularContext, delayedContext]);
+
+ expect(transportProvider.send).toHaveBeenCalledTimes(2);
+ expect(transportProvider.send).toHaveBeenCalledWith(
+ AMPLITUDE_SERVER_URL,
+ expect.objectContaining({
+ events: [expect.objectContaining({ event_type: 'regular_event' })],
+ }),
+ true,
+ );
+ expect(transportProvider.send).toHaveBeenCalledWith(
+ delayedUrl,
+ expect.objectContaining({
+ id: delayId,
+ timeout: delayTimeout,
+ events: [expect.objectContaining({ event_type: 'delayed_event' })],
+ instant_events: [],
+ }),
+ true,
+ );
+ expectSuccess(regularCallback, regularContext.event);
+ expectSuccess(delayedCallback, delayedContext.event);
+ });
+
+ test('should send delayed events with different delay_ids on same flush', async () => {
+ const delayTimeout = 5000;
+ const callbackA = jest.fn();
+ const callbackB = jest.fn();
+ const delayedContextA = createContext(
+ { event_type: 'delayed_event_a', delay: { id: 'delay-a', timeout: delayTimeout } },
+ callbackA,
+ );
+ const delayedContextB = createContext(
+ { event_type: 'delayed_event_b', delay: { id: 'delay-b', timeout: delayTimeout } },
+ callbackB,
+ );
+
+ await flushQueue([delayedContextA, delayedContextB]);
+
+ expect(transportProvider.send).toHaveBeenCalledTimes(2);
+ expect(transportProvider.send).toHaveBeenNthCalledWith(
+ 1,
+ delayedUrl,
+ expect.objectContaining({
+ id: 'delay-a',
+ timeout: delayTimeout,
+ events: [
+ expect.objectContaining({
+ event_type: 'delayed_event_a',
+ }),
+ ],
+ instant_events: [],
+ }),
+ true,
+ );
+ expect(transportProvider.send).toHaveBeenNthCalledWith(
+ 2,
+ delayedUrl,
+ expect.objectContaining({
+ id: 'delay-b',
+ timeout: delayTimeout,
+ events: [
+ expect.objectContaining({
+ event_type: 'delayed_event_b',
+ }),
+ ],
+ instant_events: [],
+ }),
+ true,
+ );
+ expectSuccess(callbackA, delayedContextA.event);
+ expectSuccess(callbackB, delayedContextB.event);
+ });
+
+ test('should split delayed events by flushQueueSize', async () => {
+ const delayId = 'delay-123';
+ const delayTimeout = 5000;
+ destination.config.flushQueueSize = 2;
+
+ await flushQueue([
+ createContext({ event_type: 'delayed_event_a', delay: { id: delayId, timeout: delayTimeout } }),
+ createContext({ event_type: 'delayed_event_b', delay: { id: delayId, timeout: delayTimeout } }),
+ createContext({ event_type: 'delayed_event_c', delay: { id: delayId, timeout: delayTimeout } }),
+ ]);
+
+ expect(transportProvider.send).toHaveBeenCalledTimes(2);
+ expect(transportProvider.send).toHaveBeenNthCalledWith(
+ 1,
+ delayedUrl,
+ expect.objectContaining({
+ id: delayId,
+ events: [
+ expect.objectContaining({ event_type: 'delayed_event_a' }),
+ expect.objectContaining({ event_type: 'delayed_event_b' }),
+ ],
+ }),
+ true,
+ );
+ expect(transportProvider.send).toHaveBeenNthCalledWith(
+ 2,
+ delayedUrl,
+ expect.objectContaining({
+ id: delayId,
+ events: [expect.objectContaining({ event_type: 'delayed_event_c' })],
+ }),
+ true,
+ );
+ });
+
+ test('should not send delayed events while backoff timeout is active', async () => {
+ const delayId = 'delay-123';
+ const event = { event_type: 'delayed_event', delay: { id: delayId } };
+ const callback = jest.fn();
+ const context = createContext(event, callback, 1000);
+
+ destination.queue = [context];
+ await destination.flush(true);
+
+ expect(transportProvider.send).not.toHaveBeenCalled();
+ expect(callback).not.toHaveBeenCalled();
+ expect(destination.queue).toEqual([context]);
+ });
+ });
});You can send follow-ups to the cloud agent here.
Reviewed by Cursor Bugbot for commit 5184c66. Configure here.
| const eventPromises = []; | ||
| try { | ||
| for (const { delay, contexts } of Object.values(delayed)) { | ||
| eventPromises.push(this.send(contexts, useRetry, delay)); |
There was a problem hiding this comment.
Delayed flush ignores payload splitting
Medium Severity
Delayed events grouped by delay.id are sent in one /delayed request with no flushQueueSize chunking, unlike regular events. On a 413 response, handlePayloadTooLargeResponse halves flushQueueSize but retries still resend the full delayed group, so recovery can fail until max retries drop the events.
Additional Locations (1)
Reviewed by Cursor Bugbot for commit 5184c66. Configure here.
There was a problem hiding this comment.
This is by design. We can't split payloads in the delayed events endpoint. The heartbeat.ts class (next PR) will manage the payload size to ensure that it's well under the limit.
… AMP3-151283-delayed-events-types
5184c66 to
8da6d5b
Compare
|
|
||
| export interface Delay { | ||
| id: string; | ||
| timeout?: number; |
There was a problem hiding this comment.
Do I understand it right that timeout here is optional because
- if event.delay.id && !event.delay.timeout, it's an instant event
- if event.delay.id && event.delay.timeout, it's a delayed event
There was a problem hiding this comment.
That's correct yes
| eventPromises.push(this.send(contexts, useRetry, delay)); | ||
| } | ||
| } catch (e) { | ||
| // swallow error |
There was a problem hiding this comment.
what error can we catch here?
There was a problem hiding this comment.
There's no errors that can be caught, this is just to be extra safe in case a future change introduces an exception.
| type DelayedEventGroup = { | ||
| delay: Delay; | ||
| contexts: Context[]; | ||
| }; | ||
|
|
||
| type DelayedEventsById = Record<string, DelayedEventGroup>; |
There was a problem hiding this comment.
How about this?
export interface Context {
event: Event;
attempts: number;
callback: EventCallback;
timeout: number;
+ delay: Delay; // add delay here
}
// delayed id: context
type DelayedEventsById = Record<string, Context>;


Summary
Checklist
Note
Medium Risk
Changes the core destination flush/send path and introduces a new
/delayedAPI contract, but behavior is additive behind an optional field and is covered by extensive tests.Overview
Adds optional
delaymetadata on events (id, optionaltimeout) and teaches the destination plugin to handle delayed ingestion separately from normal uploads.On flush, ready events are split into regular batches (unchanged path) and delayed groups keyed by
delay.id. Delayed groups POST to{serverUrl}/delayedas aDelayedPayload(id, aggregatedtimeout, timedeventsvsinstant_eventswhentimeoutis unset). Thedelayfield is stripped from wire events;request_metadatais omitted and not reset for delayed sends. Re-enqueueing the sameinsert_id+delay.idreplaces the queued copy and resolves the prior promise with "Stale event overwritten".Tests cover deduplication, payload shape, mixed flush, multiple delay IDs, and backoff
timeoutbehavior.Reviewed by Cursor Bugbot for commit 8da6d5b. Bugbot is set up for automated code reviews on this repo. Configure here.