refactor(analytics-core): implement a Heartbeat class#1837
refactor(analytics-core): implement a Heartbeat class#1837daniel-graham-amplitude wants to merge 12 commits into
Conversation
6f48e2c to
3267a7a
Compare
size-limit report 📦
|
98d1898 to
5a4d443
Compare
28fc40f to
2dad03e
Compare
5a4d443 to
f033e3c
Compare
981cd51 to
5f19257
Compare
4b4fb2e to
d74674b
Compare
5f19257 to
2e3d833
Compare
d74674b to
05a12b8
Compare
2e3d833 to
f21c2e6
Compare
a870427 to
52701ea
Compare
4794393 to
8baca8d
Compare
111b793 to
c72e911
Compare
… AMP3-151283-delayed-events-types
7d350a6 to
8036427
Compare
…de/Amplitude-TypeScript into AMP3-151283-heartbeat-class
c72e911 to
9c2fba5
Compare
…de/Amplitude-TypeScript into AMP3-151283-heartbeat-class
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
There are 3 total unresolved issues (including 2 from previous reviews).
Autofix Details
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: Coalesced reset skips new events
- Heartbeat now drains events added while an in-flight reset batch is awaiting, so coalesced callers receive results for newly queued events.
Or push these changes by commenting:
@cursor push ee91217966
Preview (ee91217966)
diff --git a/packages/analytics-core/src/heartbeat.ts b/packages/analytics-core/src/heartbeat.ts
--- a/packages/analytics-core/src/heartbeat.ts
+++ b/packages/analytics-core/src/heartbeat.ts
@@ -15,7 +15,7 @@
private delayId: string;
private interval: NodeJS.Timeout | null = null;
private resetPromise: Promise<Result[]> | null = null;
-
+
constructor(
private client: CoreClient,
private pulse: number,
@@ -27,32 +27,41 @@
}
private async heartbeat() {
- // stop sending heartbeats if no events are queued
- if (this.events.size === 0) {
- this.stop();
- return [];
- }
+ const trackedResults: Result[] = [];
+ const trackedEventIds = new Set<string>();
+ let events = [...this.events.entries()];
- // track all of the delayed events
- const trackedEvents = [];
- for (const event of this.events.values()) {
- const { event_type, event_properties, ...eventOptions } = event;
- const eventPromise = this.client.track(event_type, event_properties, eventOptions).promise;
+ while (events.length > 0) {
+ // track all of the delayed events
+ const trackedEvents = [];
+ for (const [insertId, event] of events) {
+ trackedEventIds.add(insertId);
+ const { event_type, event_properties, ...eventOptions } = event;
+ const eventPromise = this.client.track(event_type, event_properties, eventOptions).promise;
- // if the event has no timeout, then it's instant ingested and we can
- // delete it after it was ingested
- if (this.isInstantEvent(event)) {
- eventPromise.finally(() => this.events.delete(event.insert_id!));
+ // if the event has no timeout, then it's instant ingested and we can
+ // delete it after it was ingested
+ if (this.isInstantEvent(event)) {
+ eventPromise.finally(() => this.events.delete(insertId));
+ }
+ trackedEvents.push(eventPromise);
}
- trackedEvents.push(eventPromise);
+
+ trackedResults.push(...(await Promise.all(trackedEvents)));
+ events = [...this.events.entries()].filter(([insertId]) => !trackedEventIds.has(insertId));
}
- return await Promise.all(trackedEvents);
+
+ // stop sending heartbeats if no events are queued
+ if (trackedEventIds.size === 0) {
+ this.stop();
+ }
+ return trackedResults;
}
private async resetHeartbeat() {
// if a reset is already in progress, return the existing promise
if (this.resetPromise) return await this.resetPromise;
-
+
// reset the heartbeat interval
if (this.interval) clearInterval(this.interval);
this.interval = setInterval(() => void this.heartbeat(), this.pulse);
@@ -63,7 +72,7 @@
resolve(this.heartbeat());
}, 0);
});
- this.resetPromise.finally(() => this.resetPromise = null);
+ this.resetPromise.finally(() => (this.resetPromise = null));
return await this.resetPromise;
}
diff --git a/packages/analytics-core/test/heartbeat.test.ts b/packages/analytics-core/test/heartbeat.test.ts
--- a/packages/analytics-core/test/heartbeat.test.ts
+++ b/packages/analytics-core/test/heartbeat.test.ts
@@ -325,6 +325,50 @@
await Promise.all([res1, res2]);
expect(heartbeatMock).toHaveBeenCalledTimes(1);
});
+
+ test('should flush events added while a reset heartbeat is in flight', async () => {
+ const result1 = { event: { insert_id: '1', event_id: 1 }, code: 200, message: 'OK' };
+ const result2 = { event: { insert_id: '2', event_id: 2 }, code: 200, message: 'OK' };
+ let resolve1!: (result: typeof result1) => void;
+ let resolve2!: (result: typeof result2) => void;
+ trackMock.mockImplementation((...args) => {
+ const eventOptions = args[2];
+ return {
+ promise: new Promise((resolve) => {
+ if (eventOptions.insert_id === '1') {
+ resolve1 = resolve;
+ } else {
+ resolve2 = resolve;
+ }
+ }),
+ };
+ });
+
+ const res1 = heartbeat.trackNoDelay({ insert_id: '1', event_type: 'test', event_properties: { test: 'test' } });
+ await jest.advanceTimersByTimeAsync(0);
+ expect(trackMock).toHaveBeenCalledTimes(1);
+
+ const res2 = heartbeat.trackNoDelay({ insert_id: '2', event_type: 'test', event_properties: { test: 'test' } });
+ expect(trackMock).toHaveBeenCalledTimes(1);
+
+ resolve1(result1);
+ await Promise.resolve();
+ await Promise.resolve();
+ expect(trackMock).toHaveBeenCalledTimes(2);
+
+ resolve2(result2);
+ await expect(res1).resolves.toEqual(result1);
+ await expect(res2).resolves.toEqual(result2);
+ expect(trackMock).toHaveBeenNthCalledWith(
+ 2,
+ 'test',
+ { test: 'test' },
+ {
+ insert_id: '2',
+ delay: { id: expect.any(String) },
+ },
+ );
+ });
});
describe('kitchen sink', () => {You can send follow-ups to the cloud agent here.
… AMP3-151283-delayed-events-types
8036427 to
4e44458
Compare
…de/Amplitude-TypeScript into AMP3-151283-heartbeat-class
55eb654 to
0fe30a2
Compare
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
There are 3 total unresolved issues (including 2 from previous reviews).
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: Overlapping heartbeat interval runs
- Added heartbeat in-flight serialization so interval ticks skip active runs while reset-triggered heartbeats queue safely without concurrent tracking.
Or push these changes by commenting:
@cursor push b5ba9aaee7
Preview (b5ba9aaee7)
diff --git a/packages/analytics-core/src/heartbeat.ts b/packages/analytics-core/src/heartbeat.ts
--- a/packages/analytics-core/src/heartbeat.ts
+++ b/packages/analytics-core/src/heartbeat.ts
@@ -14,8 +14,10 @@
private events: Map<string, DelayedEvent>;
private delayId: string;
private interval: NodeJS.Timeout | null = null;
+ private heartbeatPromise: Promise<Result[]> | null = null;
+ private queuedHeartbeatPromise: Promise<Result[]> | null = null;
private resetPromise: Promise<Result[]> | null = null;
-
+
constructor(
private client: CoreClient,
private pulse: number,
@@ -26,7 +28,7 @@
this.delayId = UUID();
}
- private async heartbeat(): Promise<Result[]> {
+ private async runHeartbeat(): Promise<Result[]> {
// stop sending heartbeats if no events are queued
if (this.events.size === 0) {
this.stop();
@@ -49,10 +51,38 @@
return await Promise.all(trackedEvents);
}
+ private async executeHeartbeat(): Promise<Result[]> {
+ const heartbeatPromise = this.runHeartbeat();
+ this.heartbeatPromise = heartbeatPromise;
+ try {
+ return await heartbeatPromise;
+ } finally {
+ if (this.heartbeatPromise === heartbeatPromise) {
+ this.heartbeatPromise = null;
+ }
+ }
+ }
+
+ private async heartbeat(queueIfBusy = false): Promise<Result[]> {
+ if (this.heartbeatPromise) {
+ if (!queueIfBusy) return [];
+ if (!this.queuedHeartbeatPromise) {
+ this.queuedHeartbeatPromise = this.heartbeatPromise
+ .catch(() => undefined)
+ .then(() => {
+ this.queuedHeartbeatPromise = null;
+ return this.executeHeartbeat();
+ });
+ }
+ return await this.queuedHeartbeatPromise;
+ }
+ return await this.executeHeartbeat();
+ }
+
private async resetHeartbeat() {
// if a reset is already in progress, return the existing promise
if (this.resetPromise) return await this.resetPromise;
-
+
// reset the heartbeat interval
if (this.interval) clearInterval(this.interval);
this.interval = setInterval(() => void this.heartbeat(), this.pulse);
@@ -60,10 +90,10 @@
// invoke heartbeat on the next macrotask tick
this.resetPromise = new Promise((resolve) => {
setTimeout(() => {
- resolve(this.heartbeat());
+ resolve(this.heartbeat(true));
}, 0);
});
- this.resetPromise.finally(() => this.resetPromise = null);
+ this.resetPromise.finally(() => (this.resetPromise = null));
return await this.resetPromise;
}
diff --git a/packages/analytics-core/test/heartbeat.test.ts b/packages/analytics-core/test/heartbeat.test.ts
--- a/packages/analytics-core/test/heartbeat.test.ts
+++ b/packages/analytics-core/test/heartbeat.test.ts
@@ -333,9 +333,40 @@
await jest.advanceTimersByTimeAsync(0);
const [response1, response2] = await Promise.all([res1, res2]);
expect(heartbeatMock).toHaveBeenCalledTimes(1);
- expect(response1).toEqual({ event: { insert_id: '1', event_type: 'test', event_properties: { test: 'test1' } }, code: 200, message: 'success' });
- expect(response2).toEqual({ event: { insert_id: '2', event_type: 'test', event_properties: { test: 'test2' } }, code: 200, message: 'success' });
+ expect(response1).toEqual({
+ event: { insert_id: '1', event_type: 'test', event_properties: { test: 'test1' } },
+ code: 200,
+ message: 'success',
+ });
+ expect(response2).toEqual({
+ event: { insert_id: '2', event_type: 'test', event_properties: { test: 'test2' } },
+ code: 200,
+ message: 'success',
+ });
});
+
+ test('should not start another interval heartbeat while one is in progress', async () => {
+ let resolveTrack: () => void = () => undefined;
+ trackMock.mockReturnValue({
+ promise: new Promise((resolve) => {
+ resolveTrack = () => resolve({ event: { insert_id: '1' }, code: 200 });
+ }),
+ });
+
+ const resultPromise = heartbeat.trackNoDelay({
+ insert_id: '1',
+ event_type: 'test',
+ event_properties: { test: 'test' },
+ });
+ await jest.advanceTimersByTimeAsync(0);
+ expect(trackMock).toHaveBeenCalledTimes(1);
+
+ await jest.advanceTimersByTimeAsync(1000);
+ expect(trackMock).toHaveBeenCalledTimes(1);
+
+ resolveTrack();
+ await resultPromise;
+ });
});
describe('kitchen sink', () => {
@@ -350,7 +381,7 @@
}
expect(trackMock).toHaveBeenCalledTimes(events.length * 2);
jest.clearAllMocks();
- jest.advanceTimersByTime(1000);
+ await jest.advanceTimersByTimeAsync(1000);
expect(trackMock).toHaveBeenCalledTimes(events.length);
// update the properties on event 2
@@ -359,7 +390,7 @@
// advance to the next heartbeat
jest.clearAllMocks();
- jest.advanceTimersByTime(1000);
+ await jest.advanceTimersByTimeAsync(1000);
// check that the event was updated
expect(trackMock).toHaveBeenCalledTimes(events.length);
@@ -404,7 +435,7 @@
delay: { id: expect.any(String), timeout: 1000 },
},
);
- jest.advanceTimersByTime(1000);
+ await jest.advanceTimersByTimeAsync(1000);
expect(trackMock).toHaveBeenCalledTimes(5);
});
});You can send follow-ups to the cloud agent here.
Reviewed by Cursor Bugbot for commit 0fe30a2. Configure here.
… AMP3-151283-delayed-events-types
4e44458 to
b97d312
Compare
f261b83 to
0bcde65
Compare
…de/Amplitude-TypeScript into AMP3-151283-heartbeat-class
0bcde65 to
0743d2c
Compare
5184c66 to
8da6d5b
Compare
21272af to
e7204f6
Compare
| delay: Delay; | ||
| }; | ||
|
|
||
| const EVENTS_SIZE_LIMIT = 4 * 10_000; // 4KB |
There was a problem hiding this comment.
could you explain more about this? Looks like we take the json string length as a rough estimation of the size. standard English letters and numbers are 1 byte, some other language characters are 2 or 3 bytes and emojis are 4 bytes. I'd take const EVENTS_SIZE_LIMIT = 4 * 1_000 or const EVENTS_SIZE_LIMIT = 4 * 1_000 / 4; // 4KB to be safer
There was a problem hiding this comment.
That was a typo, it should be 4* 10_0000... and yeah it's not really 4kb just a string length of 4k which should be close enough (even in worst case, it could only reach 16kb which would still be under the limit).
…de/Amplitude-TypeScript into AMP3-151283-heartbeat-class
e7204f6 to
c2ee17e
Compare


Summary
Example usage
Checklist
Note
Medium Risk
Changes core event-ingestion timing and batching semantics; the 2s default interval marked DO NOT MERGE could cause excessive traffic if shipped accidentally.
Overview
Adds a
Heartbeathelper inanalytics-corethat queues analytics events (byinsert_id), attaches shareddelaymetadata, and periodically re-sends the whole queue throughCoreClient.trackso delayed ingestion can stay atomic across updates.trackrequiresinsert_id, applies default delay timeout, kicks off an immediate heartbeat (with de-dupedresetHeartbeat), and enforces a ~4KB in-memory queue cap.updatemutates queued events;trackNoDelaystrips the timeout for immediate ingest while still using the delay group id;stopclears the timer and queue.getHeartbeatInstanceexposes one heartbeat per client and is re-exported from the package entry.Note: the default pulse is temporarily
2_000ms with an explicit “DO NOT MERGE” TODO (intended production value appears to be 60s). Broad Jest coverage documents track/update/flush/stop behavior.Reviewed by Cursor Bugbot for commit e7204f6. Bugbot is set up for automated code reviews on this repo. Configure here.