feat(analytics-browser): add heartbeat service to video tracker#1839
Draft
daniel-graham-amplitude wants to merge 14 commits into
Draft
feat(analytics-browser): add heartbeat service to video tracker#1839daniel-graham-amplitude wants to merge 14 commits into
daniel-graham-amplitude wants to merge 14 commits into
Conversation
Contributor
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 2 potential issues.
Bugbot Autofix prepared fixes for both issues found in the latest run.
- ✅ Fixed: Heartbeat survives capture stop
- VideoCapture.stop now flushes and clears the heartbeat so pending stop events are finalized and the interval is disposed.
- ✅ Fixed: Debug heartbeat interval shipped
- The heartbeat interval is restored to the intended 60,000 ms production pulse.
Or push these changes by commenting:
@cursor push d3969ced8b
Preview (d3969ced8b)
diff --git a/packages/analytics-browser/src/video-capture/video-capture.ts b/packages/analytics-browser/src/video-capture/video-capture.ts
--- a/packages/analytics-browser/src/video-capture/video-capture.ts
+++ b/packages/analytics-browser/src/video-capture/video-capture.ts
@@ -5,14 +5,19 @@
EmbeddedVideoPlayer,
VideoVendor,
UUID,
+ BaseEvent,
+ Heartbeat,
} from '@amplitude/analytics-core';
+const DEFAULT_HEARTBEAT_INTERVAL = 60_000;
+const DEFAULT_HEARTBEAT_DELAY_TIMEOUT = 3_600_000; // 1 hour
export class VideoCapture {
private videoEl: HTMLVideoElement | null = null;
+ private heartbeat: Heartbeat | null = null;
private embeddedVideoPlayer: EmbeddedVideoPlayer | null = null;
private vendor?: VideoVendor;
private extraEventProperties: Record<string, string | number | boolean> = {};
-
+ private stopEvent: BaseEvent | null = null;
private listeners: ((previousState: VideoState, nextState: VideoState) => void)[] = [];
private onRemoveListeners: (() => void)[] = [];
@@ -68,10 +73,37 @@
captureVideoStarted(): VideoCapture {
this.listeners.push((previousState, nextState) => {
if (previousState.playbackState !== 'playing' && nextState.playbackState === 'playing') {
- // TODO: placeholder for Heartbeat Start Event
- this.amplitude.track('Video Content Started', {
- ...nextState.lastEvent,
- ...this.extraEventProperties,
+ // Queue up a stop event to go along with the start event
+ const startEvent = {
+ event_type: 'Video Content Started',
+ event_properties: {
+ ...nextState.lastEvent,
+ ...this.extraEventProperties,
+ },
+ };
+ this.stopEvent = {
+ ...startEvent,
+ event_type: 'Video Content Stopped',
+ event_properties: {
+ ...nextState.lastEvent,
+ watch_duration: nextState.watchTime,
+ position: nextState.position,
+ percent_completed: ((nextState.position ?? 0) / (nextState.lastEvent?.duration ?? 0)) * 100,
+ ...this.extraEventProperties,
+ },
+ };
+ const start = this.heartbeat?.trackNoDelay(startEvent);
+ const stop = this.heartbeat?.track(this.stopEvent);
+
+ // if either start or stop fails, stop capturing
+ void Promise.all([start, stop]).then((results) => {
+ results.forEach((result) => {
+ if (result && (result.code < 200 || result.code >= 400)) {
+ // delayed event service is down, stop capturing
+ // TODO: add a logging event here
+ this.stop();
+ }
+ });
});
}
});
@@ -84,12 +116,26 @@
*/
captureVideoStopped(): VideoCapture {
this.listeners.push((previousState, nextState) => {
- if (previousState.playbackState === 'playing' && nextState.playbackState !== 'playing') {
- // placeholder for Heartbeat Stop Event
- this.amplitude.track('Video Content Stopped', {
- ...nextState.lastEvent,
+ // update the delayed event properties to have
+ // the most up-to-date values
+ if (this.stopEvent) {
+ this.stopEvent.event_properties = {
+ ...this.stopEvent.event_properties,
watch_duration: nextState.watchTime,
+ position: nextState.position,
+ percent_completed: ((nextState.position ?? 0) / (nextState.lastEvent?.duration ?? 0)) * 100,
...this.extraEventProperties,
+ };
+ }
+ if (previousState.playbackState === 'playing' && nextState.playbackState !== 'playing') {
+ void this.heartbeat?.flush().then((results) => {
+ results.forEach((result) => {
+ if (result.code < 200 || result.code >= 400) {
+ // delayed event service is down, stop capturing
+ // TODO: add a logging event here
+ this.stop();
+ }
+ });
});
}
});
@@ -106,6 +152,7 @@
* @throws An error if the video element is not specified.
*/
start(): VideoCapture {
+ this.heartbeat = new Heartbeat(this.amplitude, DEFAULT_HEARTBEAT_INTERVAL, DEFAULT_HEARTBEAT_DELAY_TIMEOUT);
const videoEl = this.videoEl ?? this.embeddedVideoPlayer;
if (!videoEl) {
throw new Error(
@@ -135,6 +182,9 @@
stop() {
this.onRemoveListeners.forEach((listener) => listener());
this.onRemoveListeners = [];
+ void this.heartbeat?.flush();
+ this.heartbeat = null;
+ this.stopEvent = null;
}
}
diff --git a/packages/analytics-browser/test/video-capture/video-capture.test.ts b/packages/analytics-browser/test/video-capture/video-capture.test.ts
--- a/packages/analytics-browser/test/video-capture/video-capture.test.ts
+++ b/packages/analytics-browser/test/video-capture/video-capture.test.ts
@@ -4,6 +4,18 @@
import { VideoCapture, trackVideo } from '../../src/video-capture/video-capture';
import { currentVideoObserver, resetMockVideoObserver } from './mock-video-observer';
+const createTrackResult = (...args: unknown[]) => {
+ const event =
+ typeof args[0] === 'string'
+ ? {
+ event_type: args[0],
+ event_properties: args[1],
+ ...((args[2] as Record<string, unknown>) ?? {}),
+ }
+ : args[0];
+ return { promise: Promise.resolve({ code: 200, event }) };
+};
+
jest.mock('@amplitude/analytics-core', () => {
const actual = jest.requireActual<typeof import('@amplitude/analytics-core')>('@amplitude/analytics-core');
const { MockVideoObserver } = jest.requireActual<typeof import('./mock-video-observer')>('./mock-video-observer');
@@ -19,7 +31,7 @@
beforeEach(() => {
resetMockVideoObserver();
mockAmplitude = {
- track: jest.fn(),
+ track: jest.fn(createTrackResult),
} as unknown as AmplitudeBrowser;
});
@@ -39,23 +51,34 @@
let previousState: VideoState = { playbackState: 'paused', lastEvent: undefined };
let nextState: VideoState = { playbackState: 'playing', lastEvent: { duration: 10, last_position: undefined } };
currentVideoObserver!.emitStateChange(previousState, nextState);
- expect(mockAmplitude.track).toHaveBeenCalledWith('Video Content Started', {
- duration: 10,
- hello: 'world',
- number: 123,
- });
+ expect(mockAmplitude.track).toHaveBeenCalledWith(
+ expect.objectContaining({
+ event_type: 'Video Content Started',
+ event_properties: {
+ duration: 10,
+ hello: 'world',
+ number: 123,
+ },
+ }),
+ );
// mock a pause event
previousState = nextState;
- nextState = { playbackState: 'paused', lastEvent: { duration: 10, last_position: 5 } };
+ nextState = { playbackState: 'paused', lastEvent: { duration: 10, last_position: 5 }, watchTime: 5, position: 5 };
currentVideoObserver!.emitStateChange(previousState, nextState);
- expect(mockAmplitude.track).toHaveBeenCalledWith('Video Content Stopped', {
- duration: 10,
- last_position: 5,
- hello: 'world',
- number: 123,
- });
- expect(mockAmplitude.track).toHaveBeenCalledTimes(2);
+ expect(mockAmplitude.track).toHaveBeenCalledWith(
+ 'Video Content Stopped',
+ expect.objectContaining({
+ duration: 10,
+ hello: 'world',
+ number: 123,
+ watch_duration: 5,
+ position: 5,
+ percent_completed: 50,
+ }),
+ expect.objectContaining({ delay_timeout: 0 }),
+ );
+ expect(mockAmplitude.track).toHaveBeenCalledTimes(3);
// stop the capture
capture.stop();
@@ -66,7 +89,60 @@
currentVideoObserver!.emitStateChange(previousState, nextState);
// assert that the track method was not called again
+ expect(mockAmplitude.track).toHaveBeenCalledTimes(3);
+ });
+
+ it('should flush the pending stop event when capture stops while playing', () => {
+ jest.useFakeTimers();
+ const capture = new VideoCapture(mockAmplitude)
+ .withVideoElement(document.createElement('video'))
+ .captureVideoStarted()
+ .captureVideoStopped()
+ .start();
+
+ currentVideoObserver!.emitStateChange(
+ { playbackState: 'paused', lastEvent: undefined },
+ { playbackState: 'playing', lastEvent: { duration: 10, last_position: undefined }, watchTime: 5, position: 5 },
+ );
+ capture.stop();
+
+ expect(mockAmplitude.track).toHaveBeenCalledWith(
+ 'Video Content Stopped',
+ expect.objectContaining({
+ watch_duration: 5,
+ position: 5,
+ percent_completed: 50,
+ }),
+ expect.objectContaining({ delay_timeout: 0 }),
+ );
+ expect(mockAmplitude.track).toHaveBeenCalledTimes(3);
+
+ jest.advanceTimersByTime(60_000);
+ expect(mockAmplitude.track).toHaveBeenCalledTimes(3);
+ jest.useRealTimers();
+ });
+
+ it('should pulse the delayed stop event once per minute', () => {
+ jest.useFakeTimers();
+ const capture = new VideoCapture(mockAmplitude)
+ .withVideoElement(document.createElement('video'))
+ .captureVideoStarted()
+ .start();
+
+ currentVideoObserver!.emitStateChange(
+ { playbackState: 'paused', lastEvent: undefined },
+ { playbackState: 'playing', lastEvent: { duration: 10, last_position: undefined } },
+ );
expect(mockAmplitude.track).toHaveBeenCalledTimes(2);
+
+ jest.advanceTimersByTime(500);
+ expect(mockAmplitude.track).toHaveBeenCalledTimes(2);
+
+ jest.advanceTimersByTime(59_500);
+ expect(mockAmplitude.track).toHaveBeenCalledTimes(3);
+
+ capture.stop();
+ jest.useRealTimers();
});
});
@@ -117,29 +193,40 @@
{ playbackState: 'paused', lastEvent: undefined },
{ playbackState: 'playing', lastEvent: { duration: 10, last_position: undefined } },
);
- expect(mockAmplitude.track).toHaveBeenCalledWith('Video Content Started', {
- duration: 10,
- hello: 'world',
- number: 123,
- view_session_id: expect.any(String),
- });
+ expect(mockAmplitude.track).toHaveBeenCalledWith(
+ expect.objectContaining({
+ event_type: 'Video Content Started',
+ event_properties: {
+ duration: 10,
+ hello: 'world',
+ number: 123,
+ view_session_id: expect.any(String),
+ },
+ }),
+ );
currentVideoObserver!.emitStateChange(
{ playbackState: 'playing', lastEvent: { duration: 10, last_position: undefined } },
- { playbackState: 'paused', lastEvent: { duration: 10, last_position: 5 } },
+ { playbackState: 'paused', lastEvent: { duration: 10, last_position: 5 }, watchTime: 5, position: 5 },
);
- expect(mockAmplitude.track).toHaveBeenCalledWith('Video Content Stopped', {
- duration: 10,
- last_position: 5,
- hello: 'world',
- number: 123,
- view_session_id: expect.any(String),
- });
+ expect(mockAmplitude.track).toHaveBeenCalledWith(
+ 'Video Content Stopped',
+ expect.objectContaining({
+ duration: 10,
+ hello: 'world',
+ number: 123,
+ view_session_id: expect.any(String),
+ watch_duration: 5,
+ position: 5,
+ percent_completed: 50,
+ }),
+ expect.objectContaining({ delay_timeout: 0 }),
+ );
typeof stopVideoCapture === 'function' && stopVideoCapture();
currentVideoObserver!.emitStateChange(
{ playbackState: 'paused', lastEvent: { duration: 10, last_position: 5 } },
{ playbackState: 'playing', lastEvent: { duration: 10, last_position: undefined } },
);
- expect(mockAmplitude.track).toHaveBeenCalledTimes(2);
+ expect(mockAmplitude.track).toHaveBeenCalledTimes(3);
});
it('should capture start and stop events with embedded video player', () => {
@@ -153,19 +240,30 @@
{ playbackState: 'paused', lastEvent: undefined },
{ playbackState: 'playing', lastEvent: { duration: 10, last_position: undefined } },
);
- expect(mockAmplitude.track).toHaveBeenCalledWith('Video Content Started', {
- duration: 10,
- view_session_id: expect.any(String),
- });
+ expect(mockAmplitude.track).toHaveBeenCalledWith(
+ expect.objectContaining({
+ event_type: 'Video Content Started',
+ event_properties: {
+ duration: 10,
+ view_session_id: expect.any(String),
+ },
+ }),
+ );
currentVideoObserver!.emitStateChange(
{ playbackState: 'playing', lastEvent: { duration: 10, last_position: undefined } },
- { playbackState: 'paused', lastEvent: { duration: 10, last_position: 5 } },
+ { playbackState: 'paused', lastEvent: { duration: 10, last_position: 5 }, watchTime: 5, position: 5 },
);
- expect(mockAmplitude.track).toHaveBeenCalledWith('Video Content Stopped', {
- duration: 10,
- last_position: 5,
- view_session_id: expect.any(String),
- });
+ expect(mockAmplitude.track).toHaveBeenCalledWith(
+ 'Video Content Stopped',
+ expect.objectContaining({
+ duration: 10,
+ view_session_id: expect.any(String),
+ watch_duration: 5,
+ position: 5,
+ percent_completed: 50,
+ }),
+ expect.objectContaining({ delay_timeout: 0 }),
+ );
typeof stopVideoCapture === 'function' && stopVideoCapture();
});
diff --git a/packages/analytics-core/src/heartbeat.ts b/packages/analytics-core/src/heartbeat.ts
new file mode 100644
--- /dev/null
+++ b/packages/analytics-core/src/heartbeat.ts
@@ -1,0 +1,70 @@
+import { CoreClient } from './types/client/core-client';
+import { BaseEvent } from './types/event/base-event';
+import { UUID } from './utils/uuid';
+
+export default class Heartbeat {
+ private events: Map<string, BaseEvent>;
+ private delayId: string;
+ private interval: NodeJS.Timeout | null = null;
+
+ constructor(private client: CoreClient, private pulse: number, private delayTimeout: number) {
+ this.events = new Map<string, BaseEvent>();
+ this.delayId = UUID();
+ }
+
+ private async resetHeartbeat() {
+ if (this.interval) clearInterval(this.interval);
+ this.interval = setInterval(() => void this.heartbeat(), this.pulse);
+ return await this.heartbeat();
+ }
+
+ private async heartbeat() {
+ const trackedEvents = [];
+ for (const event of this.events.values()) {
+ const { event_type, event_properties, ...eventOptions } = event;
+ const eventPromise = this.client.track(event_type, event_properties, eventOptions).promise;
+ trackedEvents.push(eventPromise);
+ }
+ return await Promise.all(trackedEvents);
+ }
+
+ async track(event: BaseEvent) {
+ event.insert_id = event.insert_id || UUID();
+ event.delay_id = event.delay_id || this.delayId;
+ event.delay_timeout = event.delay_timeout || this.delayTimeout;
+ this.events.set(event.insert_id, event);
+
+ // emit a heartbeat and restart the interval
+ const heartbeatResult = await this.resetHeartbeat();
+
+ // return the result for the event that was just tracked
+ return heartbeatResult.find((result) => result.event.insert_id === event.insert_id);
+ }
+
+ async trackNoDelay(event: BaseEvent) {
+ event.insert_id = event.insert_id || UUID();
+ event.delay_id = event.delay_id || this.delayId;
+ delete event.delay_timeout;
+ return this.client.track(event).promise;
+ }
+
+ async flush() {
+ const trackedEvents = [];
+ for (const event of this.events.values()) {
+ const { event_type, event_properties, ...eventOptions } = event;
+ eventOptions.delay_timeout = 0;
+ const eventPromise = this.client.track(event_type, event_properties, eventOptions).promise;
+ trackedEvents.push(eventPromise);
+ }
+ this.interval && clearInterval(this.interval);
+ this.interval = null;
+ this.events.clear();
+ return Promise.all(trackedEvents);
+ }
+
+ async update(event: BaseEvent) {
+ if (event.insert_id && this.events.has(event.insert_id)) {
+ this.events.set(event.insert_id, event);
+ }
+ }
+}
diff --git a/packages/analytics-core/src/index.ts b/packages/analytics-core/src/index.ts
--- a/packages/analytics-core/src/index.ts
+++ b/packages/analytics-core/src/index.ts
@@ -171,4 +171,5 @@
export { VideoObserver, State as VideoState, type VideoObserverParams } from './observers/video';
export { EmbeddedVideoPlayer, type Vendor as VideoVendor } from './video-analytics/types';
+export { default as Heartbeat } from './heartbeat';
export { isChromeExtension } from './utils/environment';
diff --git a/packages/analytics-core/src/plugins/destination.ts b/packages/analytics-core/src/plugins/destination.ts
--- a/packages/analytics-core/src/plugins/destination.ts
+++ b/packages/analytics-core/src/plugins/destination.ts
@@ -32,6 +32,7 @@
import { IDiagnosticsClient } from '../diagnostics/diagnostics-client';
import { isSuccessStatusCode } from '../utils/status-code';
import { getStacktrace } from '../utils/debug';
+import { DelayedPayload, Payload } from '../types/payload';
export interface Context {
event: Event;
@@ -198,30 +199,52 @@
this.resetSchedule();
const list: Context[] = [];
+ const delayed: Record<string, Context[]> = {};
const later: Context[] = [];
- this.queue.forEach((context) => (context.timeout === 0 ? list.push(context) : later.push(context)));
+ this.queue.forEach((context) => {
+ if (context.timeout !== 0) {
+ later.push(context);
+ } else if (context.event.delay_id) {
+ delayed[context.event.delay_id] = delayed[context.event.delay_id] || [];
+ delayed[context.event.delay_id].push(context);
+ } else {
+ list.push(context);
+ }
+ });
const batches = chunk(list, this.config.flushQueueSize);
// Promise.all() doesn't guarantee resolve order.
// Sequentially resolve to make sure backend receives events in order
- await batches.reduce(async (promise, batch) => {
+ const regularEventBatch = batches.reduce(async (promise, batch) => {
await promise;
return await this.send(batch, useRetry);
}, Promise.resolve());
+ const eventPromises = [regularEventBatch];
+ for (const delayId of Object.keys(delayed)) {
+ const delayedBatches = chunk(delayed[delayId], this.config.flushQueueSize);
+ const delayedEventBatch = delayedBatches.reduce(async (promise, batch) => {
+ await promise;
+ return await this.send(batch, useRetry, delayId);
+ }, Promise.resolve());
+ eventPromises.push(delayedEventBatch);
+ }
+
+ await Promise.all(eventPromises);
+
// Mark current flush is done
this.flushId = null;
this.scheduleEvents(this.queue);
}
- async send(list: Context[], useRetry = true) {
+ async send(list: Context[], useRetry = true, delayId?: string) {
if (!this.config.apiKey) {
return this.fulfillRequest(list, 400, MISSING_API_KEY_MESSAGE);
}
- const payload = {
+ const payload: Payload = {
api_key: this.config.apiKey,
events: list.map((context) => {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
@@ -237,11 +260,21 @@
this.config.requestMetadata = new RequestMetadata();
try {
- const { serverUrl } = createServerConfig(this.config.serverUrl, this.config.serverZone, this.config.useBatch);
+ let { serverUrl } = createServerConfig(this.config.serverUrl, this.config.serverZone, this.config.useBatch);
const shouldCompressUploadBody = shouldCompressUploadBodyForRequest(
serverUrl,
this.config.enableRequestBodyCompression,
);
+ if (delayId) {
+ serverUrl = `${serverUrl}/delayed`;
+ const delayedPayload = payload as DelayedPayload;
+ delayedPayload.id = delayId;
+ const instantEvents = payload.events.filter((event) => !event.delay_timeout);
+ const delayedEvents = payload.events.filter((event) => event.delay_timeout);
+ delayedPayload.events = delayedEvents;
+ delayedPayload.instant_events = instantEvents;
+ delayedPayload.timeout = delayedEvents[0]?.delay_timeout || 0;
+ }
const res = await this.config.transportProvider.send(serverUrl, payload, shouldCompressUploadBody);
if (res === null) {
this.fulfillRequest(list, 0, UNEXPECTED_ERROR_MESSAGE);
diff --git a/packages/analytics-core/src/types/event/base-event.ts b/packages/analytics-core/src/types/event/base-event.ts
--- a/packages/analytics-core/src/types/event/base-event.ts
+++ b/packages/analytics-core/src/types/event/base-event.ts
@@ -52,4 +52,6 @@
android_app_set_id?: string;
extra?: { [key: string]: any };
groups?: { [key: string]: any } | undefined;
+ delay_id?: string;
+ delay_timeout?: number;
}
diff --git a/packages/analytics-core/src/types/payload.ts b/packages/analytics-core/src/types/payload.ts
--- a/packages/analytics-core/src/types/payload.ts
+++ b/packages/analytics-core/src/types/payload.ts
@@ -12,3 +12,9 @@
client_upload_time?: string;
request_metadata?: RequestMetadata;
}
+
+export interface DelayedPayload extends Payload {
+ id: string;
+ timeout: number;
+ instant_events?: readonly Event[];
+}
diff --git a/packages/analytics-core/test/heartbeat.test.ts b/packages/analytics-core/test/heartbeat.test.ts
new file mode 100644
--- /dev/null
+++ b/packages/analytics-core/test/heartbeat.test.ts
@@ -1,0 +1,197 @@
+import Heartbeat from '../src/heartbeat';
+import { CoreClient } from '../src/types/client/core-client';
+
+describe('heartbeat', () => {
+ let mockClient: CoreClient;
+ let trackMock: jest.Mock;
+ let heartbeat: Heartbeat;
+
+ beforeEach(() => {
+ jest.useFakeTimers();
+ trackMock = jest.fn().mockReturnValue({
+ promise: Promise.resolve({ event: { event_id: 1 } }),
+ });
+ mockClient = {
+ track: trackMock,
+ } as unknown as CoreClient;
+ heartbeat = new Heartbeat(mockClient, 1000, 1000);
+ });
+
+ afterEach(() => {
+ jest.useRealTimers();
+ });
+
+ describe('track, update and flush', () => {
+ test('should track an event', async () => {
+ const event = {
+ event_type: 'test',
+ event_properties: { test: 'test' },
+ };
+ await heartbeat.track(event);
+ expect(trackMock).toHaveBeenCalledWith(event.event_type, event.event_properties, {
+ insert_id: expect.any(String),
+ delay_id: expect.any(String),
+ delay_timeout: 1000,
+ });
+ expect(trackMock).toHaveBeenCalledTimes(1);
+ });
+
+ test('should be able to update a previously tracked event', async () => {
+ const event = {
+ insert_id: '12345',
+ event_type: 'test',
+ event_properties: { test: 'stale' },
+ };
+ await heartbeat.track(event);
+ expect(trackMock).toHaveBeenCalledWith(
+ event.event_type,
+ { test: 'stale' },
+ {
+ insert_id: '12345',
+ delay_id: expect.any(String),
+ delay_timeout: 1000,
+ },
+ );
+ expect(trackMock).toHaveBeenCalledTimes(1);
+ jest.advanceTimersByTime(10);
+ event.event_properties.test = 'updated';
+ await heartbeat.update(event);
+ jest.advanceTimersByTime(1001);
+ expect(trackMock).toHaveBeenCalledTimes(2);
+ expect(trackMock).toHaveBeenNthCalledWith(
+ 2,
+ event.event_type,
+ { test: 'updated' },
+ {
+ insert_id: '12345',
+ delay_id: expect.any(String) as string,
+ delay_timeout: 1000,
+ },
+ );
+ });
+
+ test('does nothing if updating an event that is not tracked', async () => {
+ const event = {
+ insert_id: '12345',
+ event_type: 'test',
+ event_properties: { test: 'test' },
+ };
+ await heartbeat.update(event);
+ expect(trackMock).not.toHaveBeenCalled();
+ });
+
+ test('should preserve existing insert_id, delay_id and delay_timeout on track', async () => {
+ const event = {
+ insert_id: 'existing-id',
+ delay_id: 'existing-delay',
+ delay_timeout: 2000,
+ event_type: 'test',
+ event_properties: { test: 'test' },
+ };
+ await heartbeat.track(event);
+ expect(trackMock).toHaveBeenCalledWith(event.event_type, event.event_properties, {
+ insert_id: 'existing-id',
+ delay_id: 'existing-delay',
+ delay_timeout: 2000,
+ });
+ });
+
+ test('should return the track result for the tracked event', async () => {
+ const event = {
+ insert_id: 'abc',
+ event_type: 'test',
+ event_properties: { test: 'test' },
+ };
+ const trackResult = { event: { insert_id: 'abc', event_id: 1 }, code: 200 };
+ trackMock.mockReturnValue({ promise: Promise.resolve(trackResult) });
+ const result = await heartbeat.track(event);
+ expect(result).toEqual(trackResult);
+ });
+
+ test('should track an event without delay via trackNoDelay', async () => {
+ const event = {
+ event_type: 'instant',
+ event_properties: { test: 'test' },
+ };
+ await heartbeat.trackNoDelay(event);
+ expect(trackMock).toHaveBeenCalledWith({
+ insert_id: expect.any(String),
+ delay_id: expect.any(String),
+ event_type: 'instant',
+ event_properties: { test: 'test' },
+ });
+ });
+
+ test('should preserve existing insert_id and delay_id on trackNoDelay', async () => {
+ const event = {
+ insert_id: 'instant-1',
+ delay_id: 'delay-1',
+ delay_timeout: 1000,
+ event_type: 'instant',
+ event_properties: { test: 'test' },
+ };
+ await heartbeat.trackNoDelay(event);
+ expect(event.delay_timeout).toBeUndefined();
+ expect(trackMock).toHaveBeenCalledWith({
+ insert_id: 'instant-1',
+ delay_id: 'delay-1',
+ event_type: 'instant',
+ event_properties: { test: 'test' },
+ });
+ });
+
+ test('should handle flush when no events are tracked', async () => {
+ const result = await heartbeat.flush();
+ expect(result).toEqual([]);
+ expect(trackMock).not.toHaveBeenCalled();
+ });
+ });
+
+ describe('kitchen sink', () => {
+ test('should be able to track, update and flush a series of events', async () => {
+ const events = [
+ { insert_id: '1', event_type: 'test1', event_properties: { test: 'test1' } },
+ { insert_id: '2', event_type: 'test2', event_properties: { test: 'test2' } },
+ { insert_id: '3', event_type: 'test3', event_properties: { test: 'test3' } },
+ ];
+ for (const event of events) {
+ await heartbeat.track(event);
+ }
+ expect(trackMock).toHaveBeenCalledTimes(events.length * 2);
+ jest.clearAllMocks();
+ jest.advanceTimersByTime(1000);
+ expect(trackMock).toHaveBeenCalledTimes(events.length);
+
+ // update event 2
+ events[1].event_properties.test = 'test2-updated';
+ await heartbeat.update(events[1]);
+ expect(trackMock).toHaveBeenNthCalledWith(
+ 2,
+ events[1].event_type,
+ { test: 'test2-updated' },
+ {
+ insert_id: '2',
+ delay_id: expect.any(String) as string,
+ delay_timeout: 1000,
+ },
+ );
+
+ // flush all events with delay_timeout 0
+ jest.clearAllMocks();
+ await heartbeat.flush();
+ expect(trackMock).toHaveBeenCalledTimes(events.length);
+ for (const event of events) {
+ expect(trackMock).toHaveBeenCalledWith(
+ event.event_type,
+ event.event_properties,
+ expect.objectContaining({
+ insert_id: event.insert_id,
+ delay_timeout: 0,
+ }),
+ );
+ }
+ jest.advanceTimersByTime(1000);
+ expect(trackMock).toHaveBeenCalledTimes(events.length);
+ });
+ });
+});
diff --git a/packages/analytics-core/test/plugins/destination.test.ts b/packages/analytics-core/test/plugins/destination.test.ts
--- a/packages/analytics-core/test/plugins/destination.test.ts
+++ b/packages/analytics-core/test/plugins/destination.test.ts
@@ -1700,4 +1700,207 @@
expect(result).toBe('');
});
});
+
+ describe('delayed events', () => {
+ const successResponse = {
+ status: Status.Success,
+ statusCode: 200,
+ body: {
+ eventsIngested: 1,
+ payloadSizeBytes: 1,
+ serverUploadTime: 1,
+ },
+ };
+ const delayedUrl = `${AMPLITUDE_SERVER_URL}/delayed`;
+
+ let destination: Destination;
+ let transportProvider: { send: jest.Mock };
+
+ const createContext = (
+ event: { event_type: string; delay_id?: string; delay_timeout?: number },
+ callback = jest.fn(),
+ timeout = 0,
+ ): Context => ({
+ attempts: 0,
+ callback,
+ event,
+ timeout,
+ });
+
+ const flushQueue = async (contexts: Context[]) => {
+ destination.queue = contexts;
+ await destination.flush(true);
+ };
+
+ const expectSuccess = (callback: jest.Mock, event: Context['event']) => {
... diff truncated: showing 800 of 1002 linesYou can send follow-ups to the cloud agent here.
Reviewed by Cursor Bugbot for commit 853e9fe. Configure here.
3267a7a to
28fc40f
Compare
853e9fe to
2de386b
Compare
28fc40f to
2dad03e
Compare
0502fd1 to
92df290
Compare
size-limit report 📦
|
c76fffa to
18f080e
Compare
981cd51 to
5f19257
Compare
18f080e to
de1608d
Compare
5f19257 to
2e3d833
Compare
f70b8f5 to
89020fd
Compare
2e3d833 to
f21c2e6
Compare
6dc08af to
ee7c6d5
Compare
f21c2e6 to
f68171a
Compare
ee7c6d5 to
aae8455
Compare
f68171a to
c1de51a
Compare
8a56440 to
5b75600
Compare
c1de51a to
d7f8723
Compare
5b75600 to
0e753d2
Compare
3338cfe to
835f95a
Compare
feb5de5 to
9aef82d
Compare
835f95a to
78ea4e3
Compare
…de/Amplitude-TypeScript into AMP3-151283-heartbeat-class
…plitude-TypeScript into AMP3-151283-add-heartbeats-to-track-video
26cb91c to
f6d3e67
Compare
724bfcb to
59f8fdb
Compare
111b793 to
c72e911
Compare
…plitude-TypeScript into AMP3-151283-add-heartbeats-to-track-video
59f8fdb to
c581647
Compare
c72e911 to
9c2fba5
Compare
…plitude-TypeScript into AMP3-151283-add-heartbeats-to-track-video
…de/Amplitude-TypeScript into AMP3-151283-heartbeat-class
…plitude-TypeScript into AMP3-151283-add-heartbeats-to-track-video
55eb654 to
0fe30a2
Compare
…de/Amplitude-TypeScript into AMP3-151283-heartbeat-class
…plitude-TypeScript into AMP3-151283-add-heartbeats-to-track-video
0bcde65 to
0743d2c
Compare
…plitude-TypeScript into AMP3-151283-add-heartbeats-to-track-video
…de/Amplitude-TypeScript into AMP3-151283-heartbeat-class
e7204f6 to
c2ee17e
Compare
…plitude-TypeScript into AMP3-151283-add-heartbeats-to-track-video
236b5fd to
77e4e73
Compare
…plitude-TypeScript into AMP3-151283-add-heartbeats-to-track-video
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.


Summary
Checklist
Note
Medium Risk
Changes event upload shape for the /delayed endpoint and replaces Heartbeat.cancel with flush (constructor now requires delayTimeout). Video capture depends on delayed-ingestion success codes to keep running.
Overview
Wires browser video tracking through the shared Heartbeat helper so Video Content Started sends immediately while Video Content Stopped is held as a delayed event, refreshed on playback updates and flushed when playback ends. Failed delayed-service responses stop capture.
Heartbeat now takes a delay timeout, sets
delay_timeouton queued events, addstrackNoDelayandflush(replacingcancel), and returns the track result for the event just added.The destination plugin splits
/delayeduploads intoevents(with timeout),instant_events(nodelay_timeout), and a payloadtimeout.delay_timeoutis added on event options andDelayedPayload.Tests cover the new heartbeat and delayed-payload behavior; the HTML video test page tweaks flush interval and comments out Mux monitor.
Reviewed by Cursor Bugbot for commit 853e9fe. Bugbot is set up for automated code reviews on this repo. Configure here.