feat(session-replay-browser): custom transport hooks for authenticated proxies (SR-4497)#1817

Open: bravecod wants to merge 7 commits into main from SR-4497-custom-transport-hooks

Conversation

bravecod (Collaborator) commented Jun 8, 2026

Summary

Adds two optional callbacks to the Session Replay Browser SDK that let customers fully own the outbound HTTP call — for example, attach a JWT Authorization header and route through an authenticating proxy — while the SDK keeps batching, retry/backoff, URL resolution, compression, serialization and error handling.

  • handleSendEvents — replaces the SDK's internal fetch for replay event uploads (POST).
  • handleFetchConfig — replaces the SDK's internal fetch for the remote-config fetch (GET).

Both default to the existing internal fetch, so existing integrations are unchanged. The callback receives a fully-formed request (resolved URL respecting trackServerUrl / configServerUrl, default headers, serialized body) and its only job is to execute it and return a Response. The callback is invoked once per attempt; retry stays in the SDK around it.
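To illustrate the "once per attempt, retry stays in the SDK" contract, here is a minimal sketch of a retry loop wrapped around a transport callback. It is not the SDK's actual implementation: `sendWithRetry`, `maxAttempts`, `backoffMs`, the "retry on 5xx" rule, and the `ResponseLike` / `SendEventsRequest` shapes are assumptions made for illustration.

```typescript
// Minimal slice of a Response that this sketch reads (an assumption, not the SDK's type).
interface ResponseLike {
  status: number;
}

// Hypothetical request shape, modeled on the fields destructured in the PR's example.
interface SendEventsRequest {
  url: string;
  method: 'POST';
  headers: Record<string, string>;
  body: string | Uint8Array;
  keepalive?: boolean;
}

type SendEventsHandler = (request: SendEventsRequest) => Promise<ResponseLike>;

// Retry lives outside the callback: the handler runs exactly once per attempt.
// maxAttempts, backoffMs and the "retry on 5xx" rule are illustrative assumptions.
async function sendWithRetry(
  request: SendEventsRequest,
  handler: SendEventsHandler,
  maxAttempts = 3,
  backoffMs = 0,
): Promise<{ status: number; attempts: number }> {
  let status = 0;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const res = await handler(request);
    status = res.status;
    if (status < 500) {
      // Success or a non-retryable client error: stop here.
      return { status, attempts: attempt };
    }
    if (attempt < maxAttempts) {
      // Linear backoff between attempts.
      await new Promise<void>((resolve) => setTimeout(resolve, backoffMs * attempt));
    }
  }
  return { status, attempts: maxAttempts };
}
```

Because the loop owns the retries, a customer callback written this way should not add its own retry logic, or a single failure could multiply the number of outbound requests.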

Closes SR-4497.

Motivation

A customer is blocked deploying Session Replay for SaaS because their security team requires JWT auth on all outbound SR requests. trackServerUrl / configServerUrl can only redirect the endpoint, not modify the request. These hooks let the customer attach auth and forward through their own proxy, which validates the JWT and forwards to Amplitude. Amplitude never validates the JWT — auth happens entirely in the customer's proxy.

API

type SessionReplayOptions = {
  // ...existing options
  handleSendEvents?: (request: SendEventsRequest) => Promise<Response>;
  handleFetchConfig?: (request: FetchConfigRequest) => Promise<Response>;
};

sessionReplay.init(API_KEY, {
  trackServerUrl: 'https://my-proxy.example.com/sr-events',
  handleSendEvents: async ({ url, method, headers, body, keepalive }) =>
    fetch(url, { method, headers: { ...headers, Authorization: `Bearer ${getJwt()}` }, body, keepalive }),
  handleFetchConfig: async ({ url, method, headers, signal }) =>
    fetch(url, { method, headers: { ...headers, Authorization: `Bearer ${getJwt()}` }, signal }),
});

Notable design points

  • Web Worker mode (useWebWorker) — a function can't cross postMessage, so the worker keeps compressing off the main thread and delegates only the network call back to the main thread via a fetch-request / fetch-response protocol. Retry stays inside the worker, so worker and main-thread retry semantics are identical and the callback is still invoked once per attempt.
  • Page exit: navigator.sendBeacon can't carry custom headers, so when handleSendEvents is set the final unload batch is routed through the callback with keepalive: true instead, preserving auth on exit. With no callback, the beacon path is unchanged.
  • handleFetchConfig is wired through RemoteConfigClient via a new optional, instance-scoped customFetch constructor param in analytics-core. Other RemoteConfigClient consumers pass nothing and are unaffected.
  • Compressed body — when transport compression is active, body is the gzipped Uint8Array and headers already includes Content-Encoding: gzip; the callback forwards both unchanged.
  • Plugin pass-through — added to plugin-session-replay-browser; the segment plugin inherits it via the standalone options type (spread).
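The Web Worker delegation described above can be sketched as a small request/response protocol keyed by a request id. This is a simplified model under stated assumptions, not the PR's code: the message fields, `createWorkerSide`, `createMainSide`, and the use of status 0 for a thrown transport are hypothetical, and plain function calls stand in for `postMessage`.

```typescript
// Message shapes loosely based on the 'fetch-request' / 'fetch-response' names in the PR;
// the exact fields here are assumptions for illustration.
interface FetchRequestMessage {
  type: 'fetch-request';
  requestId: string;
  url: string;
  headers: Record<string, string>;
  body: string;
}

interface FetchResponseMessage {
  type: 'fetch-response';
  requestId: string;
  status: number;
}

type Transport = (req: {
  url: string;
  headers: Record<string, string>;
  body: string;
}) => Promise<{ status: number }>;

// Worker side: post a request, then wait for the response carrying the same requestId.
function createWorkerSide(postToMain: (msg: FetchRequestMessage) => void) {
  let counter = 0;
  const pending = new Map<string, (status: number) => void>();
  return {
    delegate(url: string, headers: Record<string, string>, body: string): Promise<number> {
      const requestId = `${++counter}`;
      return new Promise<number>((resolve) => {
        pending.set(requestId, resolve);
        postToMain({ type: 'fetch-request', requestId, url, headers, body });
      });
    },
    onMessage(msg: FetchResponseMessage): void {
      const resolve = pending.get(msg.requestId);
      if (resolve) {
        pending.delete(msg.requestId);
        resolve(msg.status);
      }
    },
  };
}

// Main-thread side: run the customer's transport once and post the status back.
function createMainSide(transport: Transport, postToWorker: (msg: FetchResponseMessage) => void) {
  return async (msg: FetchRequestMessage): Promise<void> => {
    let status = 0; // 0 stands in for "transport threw" in this sketch
    try {
      status = (await transport({ url: msg.url, headers: msg.headers, body: msg.body })).status;
    } catch {
      status = 0;
    }
    postToWorker({ type: 'fetch-response', requestId: msg.requestId, status });
  };
}
```

Each `delegate` call maps to exactly one transport invocation on the main thread, which is what lets a retry loop inside the worker keep the once-per-attempt contract.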

Backward compatibility

  • All new options are optional and default to the verbatim existing fetch path.
  • New RemoteConfigClient constructor param is appended and optional.
  • The full existing test suites pass. The only test edit adds the new optional 5th constructor arg to two RemoteConfigClient call assertions; no other assertions changed.

Testing

  • Unit — new custom-transport.test.ts (callback substitution, retry contract, page-exit path, worker delegation main-thread side), worker delegation protocol tests, and RemoteConfigClient customFetch tests. Both packages pass at 100% coverage (session-replay-browser: 1029 tests; analytics-core: 799 tests). Plugin suites green.
  • End-to-end — added test-server/session-replay-browser/sr-custom-transport.html and a /api/sr-echo echo endpoint in mock-api.js. Verified in a real browser that both the config GET and event-track POST carry the injected Authorization: Bearer <jwt> header, in both main-thread and useWebWorker: true modes (42/42 outbound requests carried the JWT, 0 missing).

Out of scope (follow-ups)

  • Same hooks in @amplitude/unified, the Node SDK, and other SR transports.
  • A getHeaders() convenience wrapper on top of handleSendEvents.

🤖 Generated with Claude Code


Note

Medium Risk
Changes replay upload, remote-config fetch, and unload/beacon paths when callbacks are set; default behavior is unchanged but mistakes in custom transports or proxy forwarding could drop events or break config.

Overview
Adds optional handleSendEvents and handleFetchConfig callbacks so customers can run replay uploads and remote-config fetches through their own HTTP layer (e.g. JWT + proxy) while the SDK still handles batching, compression, retries, and URL resolution.

@amplitude/session-replay-browser — New SendEventsRequest / FetchConfigRequest types and handlers on options/config; main-thread sends and BeaconTransport interaction beacons use the callback instead of fetch/beacon when set. Page-exit batches use keepalive through the callback so auth survives unload. With useWebWorker, the worker posts fetch-request / fetch-response to the main thread for each network attempt. README documents proxy forwarding and the API-key header gotcha.

@amplitude/analytics-core: RemoteConfigClient accepts an optional customFetch; the config GET uses it when provided (same retry loop as before). Types are exported from the package index.

@amplitude/plugin-session-replay-browser — Forwards handleSendEvents / handleFetchConfig into standalone SR init and documents them on plugin options.

Also adds custom-transport.test.ts, worker delegation tests, /api/sr-echo in the test server, and sr-custom-transport.html for manual verification. Monorepo packages get a prerelease version bump and CDN snippet integrity updates.

Reviewed by Cursor Bugbot for commit 11697df.

…d proxies (SR-4497)

Add two optional callbacks to the Session Replay Browser SDK so customers can
fully own the outbound HTTP call (e.g. attach a JWT Authorization header and
route through an authenticating proxy) while the SDK keeps batching, retry,
URL resolution, compression and serialization:

- handleSendEvents  — replaces the internal fetch for replay event uploads
- handleFetchConfig — replaces the internal fetch for the remote-config GET

Both default to the existing internal fetch, so existing integrations are
unchanged. The resolved URL (respecting trackServerUrl / configServerUrl) is
passed into the callback. The callback is invoked once per attempt; retry stays
in the SDK around it.

- Web Worker mode: when useWebWorker is set, the worker keeps compressing off
  the main thread and delegates only the network call back to the main thread
  (functions can't cross postMessage) via a fetch-request/fetch-response
  protocol, so retry semantics stay identical to the main-thread path.
- Page exit: navigator.sendBeacon can't carry custom headers, so when
  handleSendEvents is set the final batch is routed through the callback with
  keepalive: true instead, preserving auth on unload.
- handleFetchConfig is wired through RemoteConfigClient via a new optional,
  instance-scoped customFetch constructor param (analytics-core); other
  RemoteConfigClient consumers are unaffected.
- Pass-through added to plugin-session-replay-browser; segment plugin inherits
  it via the standalone options type.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@bravecod bravecod requested a review from a team as a code owner June 8, 2026 22:22
linear-code Bot (Contributor) commented Jun 8, 2026

SR-4497

cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 2 potential issues.

Autofix Details

Bugbot Autofix prepared fixes for both issues found in the latest run.

  • ✅ Fixed: Delegated fetch lacks rejection handler
    • Added a logger-backed catch handler to the fire-and-forget delegated fetch path so postMessage failures no longer become unhandled rejections.
  • ✅ Fixed: Exit transport swallows errors silently
    • Changed the custom page-exit transport catch paths to warn through loggerProvider instead of silently ignoring failures.

Or push these changes by commenting:

@cursor push 78c93e34b3
Preview (78c93e34b3)
diff --git a/packages/session-replay-browser/src/track-destination.ts b/packages/session-replay-browser/src/track-destination.ts
--- a/packages/session-replay-browser/src/track-destination.ts
+++ b/packages/session-replay-browser/src/track-destination.ts
@@ -174,7 +174,9 @@
           } else if (msg.type === 'fetch-request') {
             // Worker is delegating one network attempt to the custom transport (which lives on
             // the main thread). Run it and post the result back into the worker's retry loop.
-            void this.handleDelegatedFetch(worker, msg);
+            void this.handleDelegatedFetch(worker, msg).catch((e) => {
+              loggerProvider.warn('Failed to handle delegated session replay fetch:', e);
+            });
           }
         };
         this.worker = worker;
@@ -255,16 +257,17 @@
         Accept: '*/*',
         Authorization: `Bearer ${apiKey}`,
       };
+      const warnExitSendFailure = (e: unknown) => {
+        this.loggerProvider.warn('Failed to send session replay events on page exit:', e);
+      };
       try {
         // Fire-and-forget: we cannot await during unload. The request is well under 64 KB
         // (trimmed above) and keepalive: true is requested so it survives page teardown.
         void this.handleSendEvents({ url: exitUrl, method: 'POST', headers, body: payload, keepalive: true }).catch(
-          () => {
-            // best effort on exit
-          },
+          warnExitSendFailure,
         );
-      } catch {
-        // best effort on exit
+      } catch (e) {
+        warnExitSendFailure(e);
       }
       return;
     }

diff --git a/packages/session-replay-browser/test/track-destination.test.ts b/packages/session-replay-browser/test/track-destination.test.ts
--- a/packages/session-replay-browser/test/track-destination.test.ts
+++ b/packages/session-replay-browser/test/track-destination.test.ts
@@ -1640,6 +1640,37 @@
       expect((trackDestination as any).pendingWorkerRequests.size).toBe(0);
     });
 
+    test('worker delegated fetch logs rejected fire-and-forget handler', async () => {
+      const trackDestination = new SessionReplayTrackDestination({
+        loggerProvider: mockLoggerProvider,
+        workerScript: 'self.onmessage = () => {}',
+      });
+      const error = new Error('worker terminated');
+      const delegatedFetch = jest
+        .spyOn(
+          trackDestination as unknown as { handleDelegatedFetch: (worker: Worker, msg: unknown) => Promise<void> },
+          'handleDelegatedFetch',
+        )
+        .mockRejectedValueOnce(error);
+
+      mockWorker.onmessage?.({
+        data: {
+          type: 'fetch-request',
+          requestId: 'req-1',
+          url: 'https://example.com',
+          method: 'POST',
+          headers: {},
+          body: 'payload',
+          keepalive: false,
+        },
+      } as MessageEvent);
+      await Promise.resolve();
+
+      expect(delegatedFetch).toHaveBeenCalled();
+      // eslint-disable-next-line @typescript-eslint/unbound-method
+      expect(mockLoggerProvider.warn).toHaveBeenCalledWith('Failed to handle delegated session replay fetch:', error);
+    });
+
     test('send routes to worker when worker is present', async () => {
       const trackDestination = new SessionReplayTrackDestination({
         loggerProvider: mockLoggerProvider,
@@ -1750,5 +1781,26 @@
 
       expect(mockSendBeacon).not.toHaveBeenCalled();
     });
+
+    test('custom transport page-exit failures are logged', async () => {
+      const error = new Error('exit transport failed');
+      const handleSendEvents = jest.fn().mockRejectedValue(error);
+      const trackDestination = new SessionReplayTrackDestination({
+        loggerProvider: mockLoggerProvider,
+        handleSendEvents,
+      });
+
+      trackDestination.sendBeacon({ ...beaconArgs, events: ['e1'] });
+      await Promise.resolve();
+
+      expect(handleSendEvents).toHaveBeenCalledWith(
+        expect.objectContaining({
+          keepalive: true,
+          headers: expect.objectContaining({ Authorization: 'Bearer key-abc' }),
+        }),
+      );
+      // eslint-disable-next-line @typescript-eslint/unbound-method
+      expect(mockLoggerProvider.warn).toHaveBeenCalledWith('Failed to send session replay events on page exit:', error);
+    });
   });
 });

Comment thread packages/session-replay-browser/src/track-destination.ts Outdated
Comment thread packages/session-replay-browser/src/track-destination.ts
github-actions Bot commented Jun 8, 2026

Session Replay Browser E2E Results

140 passed

stats: 140 tests across 13 suites
duration: 3 minutes, 17 seconds
commit: d77ed33

github-actions Bot commented Jun 8, 2026

size-limit report 📦

| Path | Size |
| --- | --- |
| packages/analytics-browser/lib/scripts/amplitude-min.js.gz | 58.48 KB (+0.14% 🔺) |
| packages/session-replay-browser/lib/scripts/session-replay-browser-min.js.gz | 132.75 KB (+0.56% 🔺) |
| packages/unified/lib/scripts/amplitude-min.umd.js.gz | 210.47 KB (+0.49% 🔺) |
| @amplitude/element-selector (gzipped esm) | 1.75 KB (0%) |

Comment thread packages/session-replay-browser/src/track-destination.ts
Comment thread packages/session-replay-browser/src/track-destination.ts Outdated
Comment thread packages/session-replay-browser/src/track-destination.ts Outdated
Comment thread packages/session-replay-browser/src/worker/track-destination.ts
Comment thread packages/session-replay-browser/src/worker/track-destination.ts Outdated
Comment thread packages/session-replay-browser/src/track-destination.ts
Comment thread packages/session-replay-browser/src/config/types.ts Outdated
Comment thread packages/plugin-session-replay-browser/src/typings/session-replay.ts Outdated
Comment thread packages/session-replay-browser/test/custom-transport.test.ts
Comment thread packages/session-replay-browser/src/track-destination.ts Outdated

/**
* Custom transport for replay event uploads. Invoked by the SDK in place of its internal
* `fetch`. Must return a `Response` (or Response-like with `ok`, `status`, `text()`). Called

Collaborator

Thinking out loud - Does needing to return a Response tightly couple us to fetch()? What if they want to use axios or another library?

Collaborator Author

Good question. Today the contract asks for a Response (or a Response-like object), but the SDK only ever reads three things from it: status, headers.get("X-Session-Replay-Event-Skipped") on a 2xx, and text() on a 413. So it is not strictly tied to fetch — a customer using axios (or any client) can return a thin adapter, e.g. { status: r.status, headers: { get: (n) => r.headers[n] ?? null }, text: async () => JSON.stringify(r.data) }. The README documents the "Response-like with ok/status/text()" shape for exactly this reason.

That said, requiring callers to shim does couple the ergonomics to the WHATWG Response. If we want to fully decouple, we could accept a structured result instead (e.g. { status, skipHeader?, body? }) and reconstruct internally. That is a broader API change, so I left it out of this PR — happy to track it as a follow-up if you think the axios/other-client case is common enough to warrant a first-class shape. Leaving this thread open for your call.
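For reference, a thin adapter along the lines described in this thread might look like the sketch below. It is an assumption-laden illustration rather than the documented API: `ClientResult` and `toResponseLike` are hypothetical names, and the result shape is only modeled loosely on what a non-fetch client returns. Two details go beyond the inline example above: header lookup is case-insensitive (some clients expose lowercased header names, so a direct lookup with the mixed-case name may miss), and `ok` is populated, since the remote-config path was reported elsewhere in this PR as reading it.

```typescript
// The slice of Response the thread says the SDK reads, plus `ok`.
interface ResponseLike {
  ok: boolean;
  status: number;
  headers: { get(name: string): string | null };
  text(): Promise<string>;
}

// Hypothetical result shape of a non-fetch HTTP client.
interface ClientResult {
  status: number;
  headers: Record<string, string | undefined>;
  data: unknown;
}

function toResponseLike(result: ClientResult): ResponseLike {
  // Normalize header names once so lookups are case-insensitive, as with fetch's Headers.
  const lowered = new Map<string, string>();
  Object.keys(result.headers).forEach((name) => {
    const value = result.headers[name];
    if (value !== undefined) {
      lowered.set(name.toLowerCase(), value);
    }
  });
  return {
    ok: result.status >= 200 && result.status < 300,
    status: result.status,
    headers: { get: (name: string) => lowered.get(name.toLowerCase()) ?? null },
    // Strings pass through unchanged; anything else is serialized as JSON.
    text: async () => (typeof result.data === 'string' ? result.data : JSON.stringify(result.data ?? null)),
  };
}
```

A custom handler could then return `toResponseLike(...)` applied to whatever its HTTP client resolves with, instead of a real `Response`.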

bravecod and others added 5 commits June 8, 2026 20:57
…nsport hooks (SR-4497)

- Add .catch + logger.warn to the fire-and-forget delegated-fetch call so a
  failed postMessage-back can't become an unhandled rejection (Bugbot).
- Surface exit-path custom-transport failures via loggerProvider.warn instead
  of swallowing them silently (Bugbot).
- Narrow the reconstructed worker Response headers.get to only answer
  EVENT_SKIPPED_HEADER (self-documenting; other branch marked unreachable).
- Document the defensive built-in-fetch fallback in handleDelegatedFetch (only
  reachable when a transport is configured).
- Comment that the worker's module-level delegation state is per-Worker scope,
  so requestIds can't collide across SPA shutdown/re-init.
- Tighten SendEventsRequest.body (and the worker request types) from BodyInit
  to string | Uint8Array to match the postMessage structured-clone constraint.
- Import SessionReplaySendEventsHandler / SessionReplayFetchConfigHandler in the
  plugin typings instead of indexed access, so a rename surfaces as an error.
- Add a test asserting handleFetchConfig wires through
  createSessionReplayJoinedConfigGenerator to RemoteConfigClient, plus tests for
  the new exit-path and delegated-fetch error-logging paths (100% coverage).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…send paths (SR-4497)

Per review: add loggerProvider.debug on the page-exit custom-transport route and
on the worker-delegated fetch (status + failure), to aid customer-side
troubleshooting of proxy/auth setups.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… custom transport (SR-4497)

Documents what a customer's authenticating proxy must do: which Amplitude
endpoints to forward to (US/EU), preserve path/query + Content-Encoding + body,
and the API-key header gotcha (recommend sending the JWT in a separate header so
the SDK's Authorization/api-key survives forwarding).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
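The API-key header gotcha mentioned in this commit can be illustrated with a short sketch: the JWT travels in its own header so the SDK-provided Authorization value is left intact for the proxy to forward. The header name `X-Proxy-Authorization`, the helper names, and the injected `FetchLike` function are hypothetical choices for this example and are not taken from the README.

```typescript
type HeaderMap = Record<string, string>;

// Hypothetical request shape, modeled on the fields destructured in the PR's example.
interface SendEventsRequest {
  url: string;
  method: 'POST';
  headers: HeaderMap;
  body: string | Uint8Array;
  keepalive?: boolean;
}

// Hypothetical header name; use whatever your proxy is configured to read.
const PROXY_AUTH_HEADER = 'X-Proxy-Authorization';

// Keep every SDK-provided header (including Authorization) and add the JWT separately.
function withProxyAuth(headers: HeaderMap, jwt: string): HeaderMap {
  return { ...headers, [PROXY_AUTH_HEADER]: `Bearer ${jwt}` };
}

// A fetch-like function is injected so the sketch does not depend on a global fetch.
type FetchLike = (
  url: string,
  init: { method: string; headers: HeaderMap; body: string | Uint8Array; keepalive?: boolean },
) => Promise<{ status: number }>;

// Builds a handleSendEvents-style callback that forwards the request unchanged
// apart from the extra proxy auth header.
function makeHandleSendEvents(getJwt: () => string, doFetch: FetchLike) {
  return ({ url, method, headers, body, keepalive }: SendEventsRequest) =>
    doFetch(url, { method, headers: withProxyAuth(headers, getJwt()), body, keepalive });
}
```

In a browser, the injected function would typically be a thin wrapper around `fetch`; the proxy would validate the separate header, strip it, and forward the rest of the request as received.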
…unified (SR-4497)

The callbacks are identical across integrations; only where you pass them in
differs. Added plugin (sessionReplayPlugin) and unified (initAll) examples next
to the standalone one so customers on any path know how to wire it up.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
 - @amplitude/analytics-browser@2.43.1-SR-4497-custom-transport-hooks.0
 - @amplitude/analytics-client-common@2.4.50-SR-4497-custom-transport-hooks.0
 - @amplitude/analytics-core@2.50.0-SR-4497-custom-transport-hooks.0
 - @amplitude/analytics-node@1.5.60-SR-4497-custom-transport-hooks.0
 - @amplitude/analytics-react-native@1.5.57-SR-4497-custom-transport-hooks.0
 - @amplitude/plugin-autocapture-browser@1.27.4-SR-4497-custom-transport-hooks.0
 - @amplitude/plugin-custom-enrichment-browser@0.1.11-SR-4497-custom-transport-hooks.0
 - @amplitude/plugin-event-property-attribution-browser@0.2.3-SR-4497-custom-transport-hooks.0
 - @amplitude/plugin-experiment-browser@1.0.0-SR-4497-custom-transport-hooks.0
 - @amplitude/plugin-network-capture-browser@1.10.3-SR-4497-custom-transport-hooks.0
 - @amplitude/plugin-page-url-enrichment-browser@0.7.13-SR-4497-custom-transport-hooks.0
 - @amplitude/plugin-page-view-tracking-browser@2.11.3-SR-4497-custom-transport-hooks.0
 - @amplitude/plugin-session-replay-browser@1.32.0-SR-4497-custom-transport-hooks.0
 - @amplitude/plugin-web-attribution-browser@2.2.13-SR-4497-custom-transport-hooks.0
 - @amplitude/plugin-web-vitals-browser@1.1.35-SR-4497-custom-transport-hooks.0
 - @amplitude/segment-session-replay-plugin@0.0.27-SR-4497-custom-transport-hooks.0
 - @amplitude/session-replay-browser@1.45.0-SR-4497-custom-transport-hooks.0
 - @amplitude/unified@1.1.13-SR-4497-custom-transport-hooks.0

cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Autofix Details

Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: Config fetch requires ok property
    • Remote config fetch success detection now falls back to 2xx status when a custom transport response omits ok, with regression test coverage.

Or push these changes by commenting:

@cursor push fda17046a4
Preview (fda17046a4)
diff --git a/packages/analytics-core/src/remote-config/remote-config.ts b/packages/analytics-core/src/remote-config/remote-config.ts
--- a/packages/analytics-core/src/remote-config/remote-config.ts
+++ b/packages/analytics-core/src/remote-config/remote-config.ts
@@ -386,8 +386,10 @@
               signal: abortController.signal,
             });
 
+        const ok = (res as { ok?: boolean }).ok ?? (res.status >= 200 && res.status < 300);
+
         // Handle unsuccessful fetch
-        if (!res.ok) {
+        if (!ok) {
           const body = await res.text();
           this.logger.debug(`Remote config client fetch with retry time ${retries} failed with ${res.status}: ${body}`);
 

diff --git a/packages/analytics-core/test/remote-config/remote-config.test.ts b/packages/analytics-core/test/remote-config/remote-config.test.ts
--- a/packages/analytics-core/test/remote-config/remote-config.test.ts
+++ b/packages/analytics-core/test/remote-config/remote-config.test.ts
@@ -1143,6 +1143,19 @@
       expect(result.remoteConfig).toEqual({ key: 'value' });
     });
 
+    test('treats custom transport responses without ok as successful for 2xx status', async () => {
+      global.fetch = jest.fn();
+      const customFetch = jest.fn(() =>
+        Promise.resolve({ status: 200, json: () => Promise.resolve({ key: 'value' }) } as Response),
+      );
+      const customClient = new RemoteConfigClient(mockApiKey, mockLogger, 'US', undefined, customFetch);
+
+      const result = await customClient.fetch();
+
+      expect(customFetch).toHaveBeenCalledTimes(1);
+      expect(result.remoteConfig).toEqual({ key: 'value' });
+    });
+
     test('retry stays in the client around the custom transport (5xx then 200)', async () => {
       global.fetch = jest.fn();
       const customFetch = jest

method: 'GET',
headers,
signal: abortController.signal,
});

Config fetch requires ok property

Medium Severity

RemoteConfigClient treats custom handleFetchConfig responses as failures when res.ok is missing, even if status is 2xx. Event uploads use status only, so the same Response-like adapter can succeed for handleSendEvents while remote config never applies and capture may stay on defaults.

Reviewed by Cursor Bugbot for commit d77ed33.

…handleSendEvents (SR-4497)

The scroll/interaction BeaconTransport sent via navigator.sendBeacon/XHR with
api_key in the URL and no custom auth header, bypassing handleSendEvents — so
those events left unauthenticated and an authenticating proxy would reject them.

When handleSendEvents is set, route the interaction beacon through it as a
keepalive fetch (survives page unload, can carry headers): api_key moves to the
Authorization header, matching the replay event/exit paths. Falls back to the
native beacon/XHR when no custom transport is configured.

This closes the last unauthenticated outbound Session Replay path, so the custom
transport now covers replay uploads, the replay page-exit beacon, and
interaction/scroll beacons.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

There are 2 total unresolved issues (including 1 from previous review).

Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: Worker crash duplicates delegated sends
    • Delegated worker fetches now carry the send id so the main thread can complete successful custom-transport sends after a worker crash while leaving failed sends stored for retry.

Or push these changes by commenting:

@cursor push c3f762e50f
Preview (c3f762e50f)
diff --git a/packages/session-replay-browser/src/track-destination.ts b/packages/session-replay-browser/src/track-destination.ts
--- a/packages/session-replay-browser/src/track-destination.ts
+++ b/packages/session-replay-browser/src/track-destination.ts
@@ -52,6 +52,7 @@
 // 'fetch-response' message posted back into the worker.
 interface WorkerFetchRequestMessage {
   type: 'fetch-request';
+  id: string;
   requestId: string;
   url: string;
   method: 'POST';
@@ -66,6 +67,14 @@
   | WorkerPayloadTooLargeMessage
   | WorkerFetchRequestMessage;
 
+interface PendingWorkerRequest {
+  context: SessionReplayDestinationContext;
+  resolve: () => void;
+  delegatedFetchInFlight?: boolean;
+  delegatedFetchStatus?: number;
+  delegatedFetchSkipCode: string | null;
+}
+
 export type PayloadBatcher = ({ version, events }: { version: number; events: string[] }) => {
   version: number;
   events: unknown[];
@@ -89,7 +98,7 @@
   queue: SessionReplayDestinationContext[] = [];
   private worker?: Worker;
   private sendIdCounter = 0;
-  private pendingWorkerRequests = new Map<string, { context: SessionReplayDestinationContext; resolve: () => void }>();
+  private pendingWorkerRequests = new Map<string, PendingWorkerRequest>();
   // Server back-pressure state, fed by the X-Session-Replay-Event-Skipped header on 200s.
   // The server uses this header (instead of 4xx) to signal a deliberate no-retry drop so SDKs
   // don't retry-storm. We honor it here by slowing or stopping our flush schedule.
@@ -141,13 +150,17 @@
           worker.terminate();
           this.worker = undefined;
           // Resolve pending promises so flush() doesn't hang. Do NOT call completeRequest
-          // here — the events were never delivered, so onComplete must not fire and the
-          // IDB/memory store entries must remain intact for recovery by sendStoredEvents.
-          for (const [, pending] of this.pendingWorkerRequests) {
+          // for the worker's own fetch path — the events were never delivered, so onComplete
+          // must not fire and the IDB/memory store entries must remain intact for recovery by
+          // sendStoredEvents. Delegated custom-transport fetches can outlive the worker; when
+          // one already succeeded, finish cleanup on the main thread to avoid resending it.
+          for (const [id, pending] of this.pendingWorkerRequests) {
             loggerProvider.warn(`Session replay event send failed due to worker crash: ${e.message}`);
-            pending.resolve();
+            if (pending.delegatedFetchInFlight) {
+              continue;
+            }
+            this.resolveCrashedWorkerRequest(id, pending);
           }
-          this.pendingWorkerRequests.clear();
         };
         worker.onmessage = (e: MessageEvent<WorkerMessage>) => {
           const msg = e.data;
@@ -178,6 +191,12 @@
             // Guard the fire-and-forget call: if posting the result back fails (e.g. the worker
             // was terminated mid-flight), surface it to the logger instead of leaving an
             // unhandled rejection.
+            const pending = this.pendingWorkerRequests.get(msg.id);
+            if (pending) {
+              pending.delegatedFetchInFlight = true;
+              pending.delegatedFetchStatus = undefined;
+              pending.delegatedFetchSkipCode = null;
+            }
             void this.handleDelegatedFetch(worker, msg).catch((e) => {
               loggerProvider.warn('Failed to handle delegated session replay fetch:', e);
             });
@@ -499,7 +518,7 @@
   ): Promise<void> {
     const id = `${++this.sendIdCounter}`;
     return new Promise<void>((resolve) => {
-      this.pendingWorkerRequests.set(id, { context, resolve });
+      this.pendingWorkerRequests.set(id, { context, resolve, delegatedFetchSkipCode: null });
       worker.postMessage({
         type: 'send',
         id,
@@ -534,7 +553,8 @@
   // 413 (for WAF detection). A thrown/rejected transport is reported as an error so the worker
   // surfaces it the same way a thrown fetch would (no retry) — matching the main-thread path.
   private async handleDelegatedFetch(worker: Worker, msg: WorkerFetchRequestMessage): Promise<void> {
-    const { requestId, url, method, headers, body, keepalive } = msg;
+    const { id, requestId, url, method, headers, body, keepalive } = msg;
+    const pending = this.pendingWorkerRequests.get(id);
     try {
       // In practice this.handleSendEvents is always set here: the worker only emits
       // 'fetch-request' when useCustomTransport (= !!this.handleSendEvents) was true. The
@@ -556,10 +576,28 @@
           // best effort
         }
       }
+      if (pending) {
+        pending.delegatedFetchInFlight = false;
+        pending.delegatedFetchStatus = status;
+        pending.delegatedFetchSkipCode = skipHeader;
+      }
       this.loggerProvider.debug(`Delegated session replay fetch (request ${requestId}) returned status ${status}.`);
+      if (pending && this.worker !== worker) {
+        this.resolveCrashedWorkerRequest(id, pending);
+        return;
+      }
       worker.postMessage({ type: 'fetch-response', requestId, status, skipHeader, body: responseBody });
     } catch (e) {
       this.loggerProvider.debug(`Delegated session replay fetch (request ${requestId}) failed:`, e);
+      if (pending) {
+        pending.delegatedFetchInFlight = false;
+        pending.delegatedFetchStatus = 0;
+        pending.delegatedFetchSkipCode = null;
+      }
+      if (pending && this.worker !== worker) {
+        this.resolveCrashedWorkerRequest(id, pending);
+        return;
+      }
       worker.postMessage({
         type: 'fetch-response',
         requestId,
@@ -571,6 +609,19 @@
     }
   }
 
+  private resolveCrashedWorkerRequest(id: string, pending: PendingWorkerRequest): void {
+    if (
+      pending.delegatedFetchStatus !== undefined &&
+      pending.delegatedFetchStatus >= 200 &&
+      pending.delegatedFetchStatus < 300
+    ) {
+      this.applyServerDirective(pending.context.sessionId, pending.delegatedFetchSkipCode);
+      this.completeRequest({ context: pending.context });
+    }
+    pending.resolve();
+    this.pendingWorkerRequests.delete(id);
+  }
+
   private async sendOnMainThread(
     apiKey: string,
     deviceId: string,

diff --git a/packages/session-replay-browser/src/worker/track-destination.ts b/packages/session-replay-browser/src/worker/track-destination.ts
--- a/packages/session-replay-browser/src/worker/track-destination.ts
+++ b/packages/session-replay-browser/src/worker/track-destination.ts
@@ -83,35 +83,38 @@
 // Posts a fetch-request to the main thread and resolves once the matching fetch-response
 // arrives. Rejects if the main-thread transport errored, so doFetch's catch surfaces it the
 // same way a thrown fetch would (no retry) — matching the non-worker custom-transport path.
-const delegateRequest: DoRequest = (url, options) => {
-  const requestId = `${++delegationCounter}`;
-  return new Promise<ResponseLike>((resolve, reject) => {
-    pendingDelegations.set(requestId, (resp) => {
-      if (resp.error) {
-        reject(new Error(resp.body));
-        return;
-      }
-      // Reconstruct just the slice of Response that doFetch consumes. get() is only ever
-      // queried for EVENT_SKIPPED_HEADER today; the name check keeps it self-documenting and
-      // safe if a future reader adds another header lookup (the other branch is unreachable now).
-      resolve({
-        status: resp.status,
-        headers: {
-          get: (name: string) => (name === EVENT_SKIPPED_HEADER ? resp.skipHeader : /* istanbul ignore next */ null),
-        },
-        text: () => Promise.resolve(resp.body),
+const createDelegateRequest = (id: string): DoRequest => {
+  return (url, options) => {
+    const requestId = `${++delegationCounter}`;
+    return new Promise<ResponseLike>((resolve, reject) => {
+      pendingDelegations.set(requestId, (resp) => {
+        if (resp.error) {
+          reject(new Error(resp.body));
+          return;
+        }
+        // Reconstruct just the slice of Response that doFetch consumes. get() is only ever
+        // queried for EVENT_SKIPPED_HEADER today; the name check keeps it self-documenting and
+        // safe if a future reader adds another header lookup (the other branch is unreachable now).
+        resolve({
+          status: resp.status,
+          headers: {
+            get: (name: string) => (name === EVENT_SKIPPED_HEADER ? resp.skipHeader : /* istanbul ignore next */ null),
+          },
+          text: () => Promise.resolve(resp.body),
+        });
       });
+      postMessage({
+        type: 'fetch-request',
+        id,
+        requestId,
+        url,
+        method: 'POST',
+        headers: options.headers,
+        body: options.body,
+        keepalive: options.keepalive,
+      });
     });
-    postMessage({
-      type: 'fetch-request',
-      requestId,
-      url,
-      method: 'POST',
-      headers: options.headers,
-      body: options.body,
-      keepalive: options.keepalive,
-    });
-  });
+  };
 };
 
 async function doFetch(
@@ -259,7 +262,7 @@
     const payloadJson = JSON.stringify(payload);
     // Only enter the delegation protocol when a custom transport is configured; otherwise the
     // worker keeps doing its own fetch with no round-trip to the main thread (unchanged path).
-    const doRequest = useCustomTransport ? delegateRequest : defaultRequest;
+    const doRequest = useCustomTransport ? createDelegateRequest(id) : defaultRequest;
     await sendWithRetry(id, payloadJson, context, useRetry, doRequest);
   }
 };

diff --git a/packages/session-replay-browser/test/custom-transport.test.ts b/packages/session-replay-browser/test/custom-transport.test.ts
--- a/packages/session-replay-browser/test/custom-transport.test.ts
+++ b/packages/session-replay-browser/test/custom-transport.test.ts
@@ -245,6 +245,7 @@
     const EVENT_SKIPPED_HEADER = 'X-Session-Replay-Event-Skipped';
     const makeMsg = (over: Record<string, unknown> = {}) => ({
       type: 'fetch-request',
+      id: '1',
       requestId: 'r1',
       url: 'https://api-sr.amplitude.com/sessions/v2/track',
       method: 'POST',
@@ -374,6 +375,156 @@
       expect(spy).toHaveBeenCalledTimes(1);
     });
 
+    test('cleans up a delegated send when the worker crashes before reporting completion', async () => {
+      const mockWorker = {
+        postMessage: jest.fn(),
+        terminate: jest.fn(),
+        onerror: null as ((e: ErrorEvent) => void) | null,
+        onmessage: null as ((e: MessageEvent) => void) | null,
+      };
+      global.Worker = jest.fn(() => mockWorker) as unknown as typeof Worker;
+      global.URL.createObjectURL = jest.fn().mockReturnValue('blob:mock');
+
+      let resolveTransport!: (response: Response) => void;
+      const handleSendEvents = jest.fn(
+        () =>
+          new Promise<Response>((resolve) => {
+            resolveTransport = resolve;
+          }),
+      );
+      const onComplete = jest.fn();
+      const trackDestination = new SessionReplayTrackDestination({
+        loggerProvider: mockLoggerProvider,
+        workerScript: 'self.onmessage = () => {}',
+        handleSendEvents,
+      });
+
+      const sendPromise = trackDestination.send({ ...baseContext(), onComplete }, true);
+      const sendMessage = mockWorker.postMessage.mock.calls[0][0] as { id: string };
+      mockWorker.onmessage?.({
+        data: {
+          type: 'fetch-request',
+          id: sendMessage.id,
+          requestId: 'r9',
+          url: 'u',
+          method: 'POST',
+          headers: {},
+          body: '{}',
+          keepalive: true,
+        },
+      } as MessageEvent);
+
+      mockWorker.onerror?.({
+        preventDefault: jest.fn(),
+        message: 'worker crashed',
+        filename: 'blob:mock',
+        lineno: 1,
+      } as unknown as ErrorEvent);
+      expect(onComplete).not.toHaveBeenCalled();
+
+      resolveTransport({ status: 200, headers: { get: () => null } } as unknown as Response);
+      await sendPromise;
+
+      expect(onComplete).toHaveBeenCalledTimes(1);
+      const pendingWorkerRequests = (trackDestination as unknown as { pendingWorkerRequests: Map<string, unknown> })
+        .pendingWorkerRequests;
+      expect(pendingWorkerRequests.size).toBe(0);
+    });
+
+    test('posts a delegated transport error back to a live worker', async () => {
+      const mockWorker = {
+        postMessage: jest.fn(),
+        terminate: jest.fn(),
+        onerror: null as ((e: ErrorEvent) => void) | null,
+        onmessage: null as ((e: MessageEvent) => void) | null,
+      };
+      global.Worker = jest.fn(() => mockWorker) as unknown as typeof Worker;
+      global.URL.createObjectURL = jest.fn().mockReturnValue('blob:mock');
+
+      const trackDestination = new SessionReplayTrackDestination({
+        loggerProvider: mockLoggerProvider,
+        workerScript: 'self.onmessage = () => {}',
+        handleSendEvents: jest.fn(() => Promise.reject(new Error('boom'))),
+      });
+
+      const sendPromise = trackDestination.send(baseContext(), true);
+      const sendMessage = mockWorker.postMessage.mock.calls[0][0] as { id: string };
+      mockWorker.onmessage?.({
+        data: {
+          type: 'fetch-request',
+          id: sendMessage.id,
+          requestId: 'r10',
+          url: 'u',
+          method: 'POST',
+          headers: {},
+          body: '{}',
+          keepalive: true,
+        },
+      } as MessageEvent);
+      await Promise.resolve();
+
+      expect(mockWorker.postMessage).toHaveBeenCalledWith(
+        expect.objectContaining({ type: 'fetch-response', requestId: 'r10', status: 0, error: true }),
+      );
+
+      mockWorker.onmessage?.({ data: { type: 'complete', id: sendMessage.id } } as MessageEvent);
+      await sendPromise;
+    });
+
+    test('keeps a delegated send stored when the worker crashes before a transport error', async () => {
+      const mockWorker = {
+        postMessage: jest.fn(),
+        terminate: jest.fn(),
+        onerror: null as ((e: ErrorEvent) => void) | null,
+        onmessage: null as ((e: MessageEvent) => void) | null,
+      };
+      global.Worker = jest.fn(() => mockWorker) as unknown as typeof Worker;
+      global.URL.createObjectURL = jest.fn().mockReturnValue('blob:mock');
+
+      let rejectTransport!: (error: Error) => void;
+      const handleSendEvents = jest.fn(
+        () =>
+          new Promise<Response>((_resolve, reject) => {
+            rejectTransport = reject;
+          }),
+      );
+      const onComplete = jest.fn();
+      const trackDestination = new SessionReplayTrackDestination({
+        loggerProvider: mockLoggerProvider,
+        workerScript: 'self.onmessage = () => {}',
+        handleSendEvents,
+      });
+
+      const sendPromise = trackDestination.send({ ...baseContext(), onComplete }, true);
+      const sendMessage = mockWorker.postMessage.mock.calls[0][0] as { id: string };
+      mockWorker.onmessage?.({
+        data: {
+          type: 'fetch-request',
+          id: sendMessage.id,
+          requestId: 'r11',
+          url: 'u',
+          method: 'POST',
+          headers: {},
+          body: '{}',
+          keepalive: true,
+        },
+      } as MessageEvent);
+      mockWorker.onerror?.({
+        preventDefault: jest.fn(),
+        message: 'worker crashed',
+        filename: 'blob:mock',
+        lineno: 1,
+      } as unknown as ErrorEvent);
+
+      rejectTransport(new Error('boom'));
+      await sendPromise;
+
+      expect(onComplete).not.toHaveBeenCalled();
+      const pendingWorkerRequests = (trackDestination as unknown as { pendingWorkerRequests: Map<string, unknown> })
+        .pendingWorkerRequests;
+      expect(pendingWorkerRequests.size).toBe(0);
+    });
+
     test('worker.onmessage logs (does not leave an unhandled rejection) when handleDelegatedFetch rejects', async () => {
       const mockWorker = {
         postMessage: jest.fn(),

diff --git a/packages/session-replay-browser/test/worker/track-destination.test.ts b/packages/session-replay-browser/test/worker/track-destination.test.ts
--- a/packages/session-replay-browser/test/worker/track-destination.test.ts
+++ b/packages/session-replay-browser/test/worker/track-destination.test.ts
@@ -409,7 +409,14 @@
     // No fake timers in this suite, so a real macrotask reliably flushes the microtasks that
     // run between the worker receiving 'send' and posting its 'fetch-request'.
     const flush = () => new Promise((resolve) => setTimeout(resolve, 0));
-    type Msg = { type: string; requestId?: string; url?: string; method?: string; headers?: Record<string, string> };
+    type Msg = {
+      type: string;
+      id?: string;
+      requestId?: string;
+      url?: string;
+      method?: string;
+      headers?: Record<string, string>;
+    };
     const postedOfType = (type: string): Msg[] =>
       mockPostMessage.mock.calls.map((c) => c[0] as Msg).filter((m) => m.type === type);
 
@@ -428,6 +435,7 @@
       expect(mockFetch).not.toHaveBeenCalled();
       const [request] = postedOfType('fetch-request');
       expect(request).toBeDefined();
+      expect(request.id).toBe('d1');
       expect(request.url).toContain('device_id=device-123');
       expect(request.method).toBe('POST');
       expect((request.headers as Record<string, string>).Authorization).toBe('Bearer test-api-key');

Reviewed by Cursor Bugbot for commit 11697df. Configure here.

this.payloadBatcher = payloadBatcher ? payloadBatcher : (payload) => payload;
this.trackServerUrl = trackServerUrl;
this.enableTransportCompression = enableTransportCompression ?? true;
this.handleSendEvents = handleSendEvents;

Worker crash duplicates delegated sends

Medium Severity

With useWebWorker and handleSendEvents, the worker can post a fetch-request and then crash. worker.onerror resolves pending sends without calling completeRequest, assuming events were never delivered. The main thread may still finish handleDelegatedFetch and upload the batch, leaving store cleanup undone so the same events can be sent again.
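
A minimal sketch of the bookkeeping this comment asks for, assuming a simplified model of the main thread: `PendingSketch`, `settleAfterCrash`, and the `Set`-based store are hypothetical stand-ins for illustration, not the SDK's actual types. It only shows the decision the diff's `resolveCrashedWorkerRequest` appears to make: when a delegated fetch settles after the worker that requested it has crashed, a 2xx status means the batch was delivered and should be cleared from the store, while any other outcome should leave the batch stored for a later retry.

```typescript
// Hypothetical, simplified model; field names loosely mirror the diff above.
type PendingSketch = {
  batchId: string;
  delegatedFetchStatus?: number; // undefined until the custom transport settles
  resolved: boolean;
};

const isSuccess = (status: number | undefined): boolean =>
  status !== undefined && status >= 200 && status < 300;

// Called when the delegated fetch settles and the requesting worker is already gone.
function settleAfterCrash(pending: PendingSketch, store: Set<string>): void {
  if (isSuccess(pending.delegatedFetchStatus)) {
    // The upload landed, so drop the batch; otherwise it would be sent again later.
    store.delete(pending.batchId);
  }
  // Resolve in every case so the caller awaiting send() is not left hanging.
  pending.resolved = true;
}

// Two batches in flight when the worker crashes: one upload succeeds, one fails.
const store = new Set<string>(['batch-1', 'batch-2']);
const delivered: PendingSketch = { batchId: 'batch-1', delegatedFetchStatus: 200, resolved: false };
const failed: PendingSketch = { batchId: 'batch-2', delegatedFetchStatus: 0, resolved: false };

settleAfterCrash(delivered, store);
settleAfterCrash(failed, store);
```

After both calls only `batch-2` should remain in the store, which is the behavior the new crash tests in this diff assert through `onComplete` and `pendingWorkerRequests.size`.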

Additional Locations (1)

4 participants