Skip to content
Open
5 changes: 5 additions & 0 deletions .changeset/add-onerror-retry-backoff.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@electric-sql/client': patch
---

Add exponential backoff to onError-driven retries to prevent tight loops on persistent 4xx errors (e.g. expired auth tokens returning 403). The backoff uses jitter with a 100ms base and 30s cap, and is abort-aware so stream teardown remains responsive.
37 changes: 37 additions & 0 deletions packages/typescript-client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,10 @@ export class ShapeStream<T extends Row<unknown> = Row>
#fastLoopBackoffMaxMs = 5_000
#fastLoopConsecutiveCount = 0
#fastLoopMaxCount = 5
// onError retry backoff: prevent tight loops when onError always returns {}
#onErrorRetryCount = 0
#onErrorBackoffBaseMs = 100
#onErrorBackoffMaxMs = 30_000
#pendingRequestShapeCacheBuster?: string
#maxSnapshotRetries = 5

Expand Down Expand Up @@ -770,6 +774,39 @@ export class ShapeStream<T extends Row<unknown> = Row>
this.#fastLoopConsecutiveCount = 0
this.#recentRequestEntries = []

// Apply exponential backoff with jitter to prevent tight retry loops
// (e.g., when onError always returns {} on persistent 4xx errors)
const retryCount = ++this.#onErrorRetryCount
const maxDelay = Math.min(
this.#onErrorBackoffMaxMs,
this.#onErrorBackoffBaseMs * Math.pow(2, retryCount - 1)
)
const delayMs = Math.floor(Math.random() * maxDelay)
if (retryCount > 1) {
console.warn(
`[Electric] onError retry backoff: waiting ${Math.round(delayMs / 1000)}s before retry ` +
`(attempt ${retryCount}). ` +
`Previous error: ${(err as Error)?.message ?? err}`
)
}
await new Promise<void>((resolve) => {
const signal = this.options.signal
const onAbort = () => {
clearTimeout(timer)
resolve()
}
const timer = setTimeout(() => {
signal?.removeEventListener(`abort`, onAbort)
resolve()
}, delayMs)
signal?.addEventListener(`abort`, onAbort, { once: true })
})

if (this.options.signal?.aborted) {
this.#teardown()
return
}

// Restart from current offset
this.#started = false
await this.#start()
Expand Down
205 changes: 205 additions & 0 deletions packages/typescript-client/test/stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -706,4 +706,209 @@ describe(`ShapeStream`, () => {

warnSpy.mockRestore()
})

it(`should apply exponential backoff on onError retries for persistent 4xx errors`, async () => {
  // When onError always returns {} on a persistent 4xx error, the retry
  // delay should increase exponentially rather than retrying immediately.
  //
  // Pin Math.random for the duration of this test: the client uses full
  // jitter (random() * maxDelay), so without pinning an unlucky run can
  // draw a large early delay and a small late one, making the
  // lateSum > earlySum assertion flaky. A fixed 0.5 yields strictly
  // doubling gaps (~50ms, 100ms, 200ms, ...), which also keeps total
  // wall-clock time well under the waitFor timeout.
  const randomSpy = vi.spyOn(Math, `random`).mockReturnValue(0.5)
  const requestTimestamps: number[] = []
  const warnSpy = vi.spyOn(console, `warn`).mockImplementation(() => {})

  const fetchMock = (
    ..._args: Parameters<typeof fetch>
  ): Promise<Response> => {
    requestTimestamps.push(Date.now())
    return Promise.resolve(new Response(`Forbidden`, { status: 403 }))
  }

  const stream = new ShapeStream({
    url: shapeUrl,
    params: { table: `test` },
    signal: aborter.signal,
    fetchClient: fetchMock,
    subscribe: false,
    onError: () => ({}),
  })

  stream.subscribe(() => {})

  // Wait for enough retries so we can compare early vs late gaps
  await vi.waitFor(
    () => {
      expect(requestTimestamps.length).toBeGreaterThanOrEqual(6)
    },
    { timeout: 15_000 }
  )

  // Verify gaps between requests grow over time (exponential backoff).
  // With the pinned jitter the late gaps are strictly larger, so the
  // half-vs-half sum comparison is deterministic.
  const gaps = requestTimestamps
    .slice(1)
    .map((t, i) => t - requestTimestamps[i]!)

  const mid = Math.floor(gaps.length / 2)
  const earlySum = gaps.slice(0, mid).reduce((a, b) => a + b, 0)
  const lateSum = gaps.slice(mid).reduce((a, b) => a + b, 0)
  expect(lateSum).toBeGreaterThan(earlySum)

  randomSpy.mockRestore()
  warnSpy.mockRestore()
})

it(`should tear down immediately when aborted during onError backoff`, async () => {
  // Aborting while a backoff delay is pending must tear the stream down
  // promptly instead of letting the timer fire and issue another request.
  let fetchCalls = 0
  const warnSpy = vi.spyOn(console, `warn`).mockImplementation(() => {})

  // Every request fails with a 403 so the stream stays in the
  // onError-retry loop for the whole test.
  const alwaysForbidden = (
    ..._args: Parameters<typeof fetch>
  ): Promise<Response> => {
    fetchCalls += 1
    return Promise.resolve(new Response(`Forbidden`, { status: 403 }))
  }

  const controller = new AbortController()
  const stream = new ShapeStream({
    url: shapeUrl,
    params: { table: `test` },
    signal: controller.signal,
    fetchClient: alwaysForbidden,
    subscribe: false,
    onError: () => ({}),
  })

  stream.subscribe(() => {})

  // Seeing a second request proves at least one backoff cycle has run.
  await vi.waitFor(
    () => {
      expect(fetchCalls).toBeGreaterThanOrEqual(2)
    },
    { timeout: 5_000 }
  )

  const countAtAbort = fetchCalls
  controller.abort()

  // Give a tick for teardown
  await resolveInMacrotask(undefined)

  // The abort must have prevented any further fetches.
  expect(fetchCalls).toBe(countAtAbort)

  warnSpy.mockRestore()
})

it(`should warn on 2nd+ onError retry attempt`, async () => {
  // From the second retry onward the stream emits a console.warn so that
  // developers can diagnose persistent error loops.
  const warnSpy = vi.spyOn(console, `warn`).mockImplementation(() => {})
  let attempts = 0

  // Persistent 403 keeps the onError-driven retry loop going.
  const forbiddenFetch = (
    ..._args: Parameters<typeof fetch>
  ): Promise<Response> => {
    attempts += 1
    return Promise.resolve(new Response(`Forbidden`, { status: 403 }))
  }

  const stream = new ShapeStream({
    url: shapeUrl,
    params: { table: `test` },
    signal: aborter.signal,
    fetchClient: forbiddenFetch,
    subscribe: false,
    onError: () => ({}),
  })

  stream.subscribe(() => {})

  // Three requests imply at least two retries, i.e. at least one warning.
  await vi.waitFor(
    () => {
      expect(attempts).toBeGreaterThanOrEqual(3)
    },
    { timeout: 15_000 }
  )

  expect(warnSpy).toHaveBeenCalledWith(
    expect.stringContaining(`onError retry backoff`)
  )

  warnSpy.mockRestore()
})

it(`should clean up abort listeners after onError backoff timer expires`, async () => {
// When the backoff timer expires normally (not via abort), the abort
// listener must be removed to prevent closure accumulation on
// long-lived streams with many recoverable errors.
let requestCount = 0
const warnSpy = vi.spyOn(console, `warn`).mockImplementation(() => {})
const addSpy = vi.fn()
const removeSpy = vi.fn()

// Wrap the signal's add/removeEventListener so every registration and
// de-registration is recorded while still delegating to the real
// implementations. This must happen BEFORE the stream is constructed so
// no listener traffic is missed.
const localAborter = new AbortController()
const originalAdd = localAborter.signal.addEventListener.bind(
localAborter.signal
)
const originalRemove = localAborter.signal.removeEventListener.bind(
localAborter.signal
)
localAborter.signal.addEventListener = (
...args: Parameters<typeof localAborter.signal.addEventListener>
) => {
addSpy(...args)
return originalAdd(...args)
}
localAborter.signal.removeEventListener = (
...args: Parameters<typeof localAborter.signal.removeEventListener>
) => {
removeSpy(...args)
return originalRemove(...args)
}

// Every request fails with a 403 so each cycle goes through
// onError -> backoff -> retry, exercising one timer per cycle.
const fetchMock = (
..._args: Parameters<typeof fetch>
): Promise<Response> => {
requestCount++
return Promise.resolve(new Response(`Forbidden`, { status: 403 }))
}

const stream = new ShapeStream({
url: shapeUrl,
params: { table: `test` },
signal: localAborter.signal,
fetchClient: fetchMock,
subscribe: false,
onError: () => ({}),
})

stream.subscribe(() => {})

// Wait for several retries so multiple backoff timers expire normally
await vi.waitFor(
() => {
expect(requestCount).toBeGreaterThanOrEqual(4)
},
{ timeout: 15_000 }
)

// Stop the loop before asserting so the counts stop moving.
localAborter.abort()

// Each backoff cycle should have added AND removed an abort listener.
// The remove count should match the add count (minus 1 for the final
// cycle that was interrupted by abort, where { once: true } handles cleanup).
// NOTE(review): the counts are deliberately loose (>= rather than ===)
// because other parts of the client may also register `abort` listeners
// on the same signal — confirm if this ever needs tightening.
const abortAdds = addSpy.mock.calls.filter(
(args: unknown[]) => args[0] === `abort`
).length
const abortRemoves = removeSpy.mock.calls.filter(
(args: unknown[]) => args[0] === `abort`
).length
expect(abortAdds).toBeGreaterThanOrEqual(3)
expect(abortRemoves).toBeGreaterThanOrEqual(abortAdds - 1)

warnSpy.mockRestore()
})
})
6 changes: 3 additions & 3 deletions packages/typescript-client/test/wake-detection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -257,10 +257,10 @@ describe(`Wake detection`, () => {
const unsub = stream.subscribe(() => {})

// Let the stream start, hit the 400 error, and retry via onError.
// The error retry path (#start lines 767-769) calls #start() recursively
// WITHOUT calling #teardown() first, so the timer is still alive.
await vi.advanceTimersByTimeAsync(0)
// The error retry path calls #start() recursively after an exponential
// backoff delay, so we need to advance time enough to cover it.
await vi.advanceTimersByTimeAsync(0)
await vi.advanceTimersByTimeAsync(200)
await vi.advanceTimersByTimeAsync(0)

expect(fetchCallCount).toBeGreaterThanOrEqual(2)
Expand Down
Loading