
Commit eefb96c

matt-aitken and claude authored
fix(webapp): recover from ClickHouse JSON parse failures in runs replication (#3708)
## Summary

On a ClickHouse `Cannot parse JSON object` rejection, `RunsReplicationService` now sanitizes lone UTF-16 surrogates across the failing batch via the existing `sanitizeRows` helper and retries once. If the sanitizer found nothing or the retry also fails, the batch is dropped loudly with a counter increment, so the surrounding `#insertWithRetry` layer doesn't spin three more times on a deterministic failure. Non-parse errors propagate unchanged.

Mirrors the pattern from #3659 (for `ClickhouseEventRepository`): same root cause (lone UTF-16 surrogates in user-provided JSON), same recovery shape, **reusing the same shared helpers** (`sanitizeRows`, `isClickHouseJsonParseError`, `parseRowNumberFromError`).

Fixes the customer-facing symptom from [TRI-9755](https://linear.app/triggerdotdev/issue/TRI-9755): a single row's poisoned `output` JSON used to take down the `COMPLETED_SUCCESSFULLY` UPDATE events for its 50+ batch-mates, stranding them in `EXECUTING` in ClickHouse forever and inflating "Running" counts on the Tasks page.

Confirmed in production that this is ongoing: ~120k stale rows accumulated in a single 5-hour burst on 2026-05-18, with a smaller continuous leak before and after.

## What changed

`apps/webapp/app/services/runsReplicationService.server.ts`:

- Imports the three helpers from `~/v3/eventRepository/sanitizeRowsOnParseError.server` (no duplication; no move).
- New private `#insertWithJsonParseRecovery<T>(rows, doInsert, contextLabel, attempt)`, generic over `TaskRunInsertArray[]` and `PayloadInsertArray[]`, structurally identical to `ClickhouseEventRepository.#insertWithJsonParseRecovery`. Try → on parse error sanitize the whole batch (the `at row N` hint is logged but not used to slice, because its semantics under `input_format_parallel_parsing` aren't stable) → retry once → drop with loud log if the sanitizer found nothing OR the retry still fails.
- `#insertTaskRunInserts` and `#insertPayloadInserts` extract a `doInsert` closure and hand it to the wrapper. Existing error logging, span recording, and `recordSpanError` are preserved inside the closure.
- New `private _permanentlyDroppedBatches = 0` counter with a public getter, for ops dashboards and tests (matches the events-repo convention). One shared counter for both insert sites; granularity comes from the `contextLabel` (`task_runs_v2` / `raw_task_runs_payload_v1`) on every log line.

`.server-changes/runs-replication-utf16-recovery.md`: release notes entry.

## Why no new tests

The shared helpers already have full unit + real-ClickHouse contract coverage from #3659 (`apps/webapp/test/sanitizeRowsOnParseError.test.ts`, `apps/webapp/test/otlpUtf16Sanitization.integration.test.ts`). The new wrapper is a line-for-line structural port. Adding a parallel integration test would require synthesizing bad data that *escapes* the preemptive `detectBadJsonStrings` check in `#prepareJson` but still trips ClickHouse, which is non-trivial without hand-crafted fixtures and wouldn't cover any new logic.

## What this does NOT do

- Doesn't touch the ~120k existing stale `EXECUTING` rows in production. That needs a reconciliation/backfill sweep (separate ticket: TRI-9755 fix #3).
- Doesn't sanitize the `error` column path (`runsReplicationService.server.ts:932 const errorData = { data: run.error };`). Reactive recovery will catch it if it ever poisons a batch, but feeding it through `#prepareJson` like `output` is a cheap follow-up.

## Test plan

- [x] `pnpm run typecheck --filter webapp`: clean
- [ ] Post-deploy: confirm the `permanentlyDroppedBatches` counter stays at zero (or near-zero) in `/stp/trigger-app-prod/ecs/replication/service-container/process-logs`, and watch for `Sanitizing batch after ClickHouse JSON parse error` warns to confirm recovery is firing on real traffic
- [ ] Post-deploy: confirm the rate of new "EXECUTING-but-actually-COMPLETED" zombies in ClickHouse flattens (current rate ≈ tens-to-hundreds per hour platform-wide)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 9f64bf4 commit eefb96c

2 files changed

Lines changed: 189 additions & 38 deletions


.server-changes/runs-replication-utf16-recovery.md

Lines changed: 20 additions & 0 deletions

```diff
@@ -0,0 +1,20 @@
+---
+area: webapp
+type: fix
+---
+
+Recover from ClickHouse `JSONEachRow` parse failures in the runs
+replication path. `RunsReplicationService` now wraps its task-run and
+payload inserts with the same reactive-sanitisation pattern used by
+`ClickhouseEventRepository` since #3659: on `Cannot parse JSON object`,
+sanitize lone UTF-16 surrogates across the batch (via the shared
+`sanitizeRows` helper) and retry once. If the sanitiser found nothing
+or the retry also fails, the batch is dropped, `permanentlyDroppedBatches`
+increments, and a loud error log is emitted — preventing the surrounding
+`#insertWithRetry` layer from spinning on the same deterministic
+failure. Non-parse errors propagate unchanged.
+
+Stops the bleeding behind the customer-visible "Tasks page shows a huge
+Running count" symptom: one row with bad output JSON used to take down
+the COMPLETED updates for its 50+ batch-mates, leaving every one of
+them stranded in `EXECUTING` in ClickHouse forever (Postgres unaffected).
```
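The release note leans on the shared `sanitizeRows` helper, whose source is not part of this commit. As a rough illustration only, a sanitizer of this kind might look like the sketch below. The names `hasLoneSurrogate`, `sanitizeValue`, and `sanitizeRowsSketch` are hypothetical and the real helper may differ; only the `"[invalid-utf16]"` placeholder and the `{ rowsTouched, fieldsSanitized }` result shape are taken from the diff.

```typescript
// Hypothetical sketch; NOT the real `sanitizeRows` from sanitizeRowsOnParseError.server.ts.
const PLACEHOLDER = "[invalid-utf16]";

/** True if `s` contains a high surrogate without a following low one, or a stray low one. */
function hasLoneSurrogate(s: string): boolean {
  for (let i = 0; i < s.length; i++) {
    const code = s.charCodeAt(i);
    if (code >= 0xd800 && code <= 0xdbff) {
      // High surrogate: must be followed by a low surrogate.
      const next = i + 1 < s.length ? s.charCodeAt(i + 1) : 0;
      if (next >= 0xdc00 && next <= 0xdfff) {
        i++; // valid pair, skip the low half
      } else {
        return true;
      }
    } else if (code >= 0xdc00 && code <= 0xdfff) {
      // Low surrogate with no preceding high surrogate.
      return true;
    }
  }
  return false;
}

/** Replaces offending strings in place, recursing into arrays and objects. */
function sanitizeValue(container: any, key: string | number): number {
  const value = container[key];
  if (typeof value === "string") {
    if (!hasLoneSurrogate(value)) return 0;
    container[key] = PLACEHOLDER;
    return 1;
  }
  if (Array.isArray(value)) {
    let fixed = 0;
    for (let i = 0; i < value.length; i++) fixed += sanitizeValue(value, i);
    return fixed;
  }
  if (value !== null && typeof value === "object") {
    let fixed = 0;
    for (const k of Object.keys(value)) fixed += sanitizeValue(value, k);
    return fixed;
  }
  return 0;
}

/** Walks every row and reports how many rows and fields were changed. */
function sanitizeRowsSketch(rows: unknown[]): { rowsTouched: number; fieldsSanitized: number } {
  let rowsTouched = 0;
  let fieldsSanitized = 0;
  for (let i = 0; i < rows.length; i++) {
    const fixed = sanitizeValue(rows, i);
    if (fixed > 0) {
      rowsTouched++;
      fieldsSanitized += fixed;
    }
  }
  return { rowsTouched, fieldsSanitized };
}
```

The sketch mutates the batch in place, which matters for this change: the `doInsert` closures take no arguments, so a retry can only see sanitized data if the same array was modified.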

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 169 additions & 38 deletions
```diff
@@ -38,6 +38,11 @@ import EventEmitter from "node:events";
 import pLimit from "p-limit";
 import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings";
 import { calculateErrorFingerprint } from "~/utils/errorFingerprinting";
+import {
+  isClickHouseJsonParseError,
+  parseRowNumberFromError,
+  sanitizeRows,
+} from "~/v3/eventRepository/sanitizeRowsOnParseError.server";
 
 interface TransactionEvent<T = any> {
   tag: "insert" | "update" | "delete";
```
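The two classification helpers imported here are only visible through their call sites in this commit. The following is a minimal sketch of what they could look like, inferred from how they are used. The function names with the `Sketch` suffix, the `number | undefined` return type, and the exact substring and `at row N` pattern being matched are assumptions, not the real implementation.

```typescript
// Hypothetical stand-ins for the imported helpers; the real ones may differ.

/** Pulls a message string out of an arbitrary thrown value. */
function messageOf(err: unknown): string {
  if (typeof err === "object" && err !== null && "message" in err) {
    return String((err as { message?: unknown }).message ?? "");
  }
  return String(err);
}

/** Assumed check: the error text contains ClickHouse's JSON parse rejection phrase. */
function isJsonParseErrorSketch(err: unknown): boolean {
  return messageOf(err).includes("Cannot parse JSON object");
}

/** Assumed parser: extracts N from an "at row N" hint, or undefined if absent. */
function parseRowNumberSketch(message: string): number | undefined {
  const match = /at row (\d+)/.exec(message);
  return match ? Number(match[1]) : undefined;
}
```

Keeping classification separate from recovery is what lets non-parse errors fall straight through to the existing transient-retry path.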
```diff
@@ -129,6 +134,15 @@ export class RunsReplicationService {
   private _disablePayloadInsert: boolean;
   private _disableErrorFingerprinting: boolean;
 
+  /**
+   * Counts batches that hit a ClickHouse `Cannot parse JSON object` failure
+   * that survived one sanitize-retry. These batches are dropped on the floor
+   * (returning success-ish to the caller so the retry layer doesn't spin on
+   * the same deterministic failure), and we track the drop count for
+   * observability. Counter only — does not gate behaviour.
+   */
+  private _permanentlyDroppedBatches = 0;
+
   // Metrics
   private _replicationLagHistogram: Histogram;
   private _batchesFlushedCounter: Counter;
```
```diff
@@ -283,6 +297,11 @@ export class RunsReplicationService {
     this._insertMaxDelayMs = options.insertMaxDelayMs ?? 2000;
   }
 
+  /** Exposed for tests and metrics — total batches lost to unrecoverable parse errors. */
+  get permanentlyDroppedBatches() {
+    return this._permanentlyDroppedBatches;
+  }
+
   public async shutdown() {
     if (this._isShuttingDown) return;
 
```
```diff
@@ -658,7 +677,7 @@ export class RunsReplicationService {
         combinedTaskRunInserts.push(...group.taskRunInserts);
         combinedPayloadInserts.push(...group.payloadInserts);
 
-        const [trErr] = await this.#insertWithRetry(
+        const [trErr, trOutcome] = await this.#insertWithRetry(
           (attempt) => this.#insertTaskRunInserts(clickhouse, group.taskRunInserts, attempt),
           "task run inserts",
           flushId
@@ -667,7 +686,7 @@ export class RunsReplicationService {
           taskRunError = trErr;
         }
 
-        const [plErr] = await this.#insertWithRetry(
+        const [plErr, plOutcome] = await this.#insertWithRetry(
           (attempt) => this.#insertPayloadInserts(clickhouse, group.payloadInserts, attempt),
           "payload inserts",
           flushId
@@ -676,10 +695,14 @@ export class RunsReplicationService {
           payloadError = plErr;
         }
 
-        if (!trErr) {
+        // Only count rows that actually landed in ClickHouse. `kind: "dropped"`
+        // means the recovery wrapper bailed (sanitizer no-op or sanitize-retry
+        // still failed) — those rows never made it, so they must not show up
+        // as successful inserts in the per-batch counter.
+        if (!trErr && trOutcome?.kind !== "dropped") {
           this._taskRunsInsertedCounter.add(group.taskRunInserts.length);
         }
-        if (!plErr) {
+        if (!plErr && plOutcome?.kind !== "dropped") {
           this._payloadsInsertedCounter.add(group.payloadInserts.length);
         }
       }
```
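The counter gating in this hunk can be read as a small pure decision, sketched below under stated assumptions. The `InsertOutcome` union copies the return type of `#insertWithJsonParseRecovery` from this commit; `rowsToCount` is a hypothetical helper that does not exist in the codebase and is here only to make the three cases explicit.

```typescript
// Mirrors the union returned by #insertWithJsonParseRecovery in this commit.
type InsertOutcome =
  | { kind: "inserted"; insertResult: unknown }
  | { kind: "sanitized"; insertResult: unknown }
  | { kind: "dropped" };

/** Hypothetical helper: how many rows to add to the "inserted" counter. */
function rowsToCount(
  err: unknown,
  outcome: InsertOutcome | undefined,
  batchSize: number
): number {
  if (err) return 0; // the retry layer gave up with an error
  if (outcome?.kind === "dropped") return 0; // recovery wrapper bailed, rows never landed
  return batchSize; // inserted, sanitized, or no outcome reported
}
```

Note that an undefined outcome still counts, matching the `?.kind !== "dropped"` check in the diff, which only excludes an explicit drop.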
```diff
@@ -837,24 +860,28 @@ export class RunsReplicationService {
       return;
     }
     return await startSpan(this._tracer, "insertTaskRunsInserts", async (span) => {
-      const [insertError, insertResult] =
-        await clickhouse.taskRuns.insertCompactArrays(taskRunInserts, {
-          params: {
-            clickhouse_settings: this.#getClickhouseInsertSettings(),
-          },
-        });
-
-      if (insertError) {
-        this.logger.error("Error inserting task run inserts attempt", {
-          error: insertError,
-          attempt,
-        });
-
-        recordSpanError(span, insertError);
-        throw insertError;
-      }
-
-      return insertResult;
+      const doInsert = async () => {
+        const [insertError, insertResult] = await clickhouse.taskRuns.insertCompactArrays(
+          taskRunInserts,
+          { params: { clickhouse_settings: this.#getClickhouseInsertSettings() } }
+        );
+        if (insertError) {
+          this.logger.error("Error inserting task run inserts attempt", {
+            error: insertError,
+            attempt,
+          });
+          recordSpanError(span, insertError);
+          throw insertError;
+        }
+        return insertResult;
+      };
+
+      return await this.#insertWithJsonParseRecovery(
+        taskRunInserts,
+        doInsert,
+        "task_runs_v2",
+        attempt
+      );
     });
   }
 
```
```diff
@@ -867,25 +894,129 @@ export class RunsReplicationService {
       return;
     }
     return await startSpan(this._tracer, "insertPayloadInserts", async (span) => {
-      const [insertError, insertResult] =
-        await clickhouse.taskRuns.insertPayloadsCompactArrays(payloadInserts, {
-          params: {
-            clickhouse_settings: this.#getClickhouseInsertSettings(),
-          },
-        });
-
-      if (insertError) {
-        this.logger.error("Error inserting payload inserts attempt", {
-          error: insertError,
-          attempt,
-        });
+      const doInsert = async () => {
+        const [insertError, insertResult] = await clickhouse.taskRuns.insertPayloadsCompactArrays(
+          payloadInserts,
+          { params: { clickhouse_settings: this.#getClickhouseInsertSettings() } }
+        );
+        if (insertError) {
+          this.logger.error("Error inserting payload inserts attempt", {
+            error: insertError,
+            attempt,
+          });
+          recordSpanError(span, insertError);
+          throw insertError;
+        }
+        return insertResult;
+      };
+
+      return await this.#insertWithJsonParseRecovery(
+        payloadInserts,
+        doInsert,
+        "raw_task_runs_payload_v1",
+        attempt
+      );
+    });
+  }
 
-        recordSpanError(span, insertError);
-        throw insertError;
+  /**
+   * Wraps a ClickHouse insert with reactive UTF-16 sanitization for
+   * `Cannot parse JSON object` rejections. Mirrors the pattern from
+   * `ClickhouseEventRepository.#insertWithJsonParseRecovery` introduced
+   * in #3659 — same root cause (lone UTF-16 surrogates in user-provided
+   * JSON), same recovery shape:
+   *
+   *   1. Try the insert. Healthy batches pay zero scan cost.
+   *   2. On parse error, walk the whole batch via `sanitizeRows` and
+   *      replace any lone-surrogate string with `"[invalid-utf16]"`.
+   *   3. Retry once. If the sanitizer found nothing or the retry also
+   *      fails with the same error class, drop the batch loudly and
+   *      return — do NOT rethrow, otherwise the surrounding
+   *      `#insertWithRetry` layer would spin three more times on the
+   *      same deterministic failure.
+   *   4. Non-parse errors propagate unchanged so the existing
+   *      transient-retry path still handles them.
+   *
+   * The whole-batch scan (rather than slicing on the `at row N` hint) is
+   * deliberate: `at row N` semantics under `input_format_parallel_parsing`
+   * aren't stable enough to safely skip rows. The cost is bounded because
+   * `detectBadJsonStrings` exits in O(1) for clean strings.
+   */
+  async #insertWithJsonParseRecovery<T extends object>(
+    rows: T[],
+    doInsert: () => Promise<unknown>,
+    contextLabel: string,
+    attempt: number
+  ): Promise<
+    | { kind: "inserted"; insertResult: unknown }
+    | { kind: "sanitized"; insertResult: unknown }
+    | { kind: "dropped" }
+  > {
+    try {
+      return { kind: "inserted", insertResult: await doInsert() };
+    } catch (firstError) {
+      if (!isClickHouseJsonParseError(firstError)) throw firstError;
+
+      const firstMessage =
+        typeof firstError === "object" && firstError !== null && "message" in firstError
+          ? String((firstError as { message?: unknown }).message ?? "")
+          : String(firstError);
+
+      const rowHint = parseRowNumberFromError(firstMessage);
+      const { rowsTouched, fieldsSanitized } = sanitizeRows(rows);
+
+      if (fieldsSanitized === 0) {
+        this._permanentlyDroppedBatches += 1;
+        this.logger.error(
+          "Dropped batch — ClickHouse JSON parse error but sanitizer found nothing to fix",
+          {
+            contextLabel,
+            attempt,
+            batchSize: rows.length,
+            clickhouseRowHint: rowHint,
+            permanentlyDroppedBatches: this._permanentlyDroppedBatches,
+            sampleRow: JSON.stringify(rows[0] ?? null).slice(0, 1024),
+            clickhouseError: firstMessage.split("\n")[0],
+          }
+        );
+        return { kind: "dropped" };
       }
 
-      return insertResult;
-    });
+      this.logger.warn("Sanitizing batch after ClickHouse JSON parse error", {
+        contextLabel,
+        attempt,
+        batchSize: rows.length,
+        clickhouseRowHint: rowHint,
+        rowsTouched,
+        fieldsSanitized,
+        clickhouseError: firstMessage.split("\n")[0],
+      });
+
+      try {
+        return { kind: "sanitized", insertResult: await doInsert() };
+      } catch (retryError) {
+        if (!isClickHouseJsonParseError(retryError)) throw retryError;
+
+        this._permanentlyDroppedBatches += 1;
+        const retryMessage =
+          typeof retryError === "object" && retryError !== null && "message" in retryError
+            ? String((retryError as { message?: unknown }).message ?? "")
+            : String(retryError);
+        this.logger.error(
+          "Dropped batch after sanitize-retry still hit ClickHouse JSON parse error",
+          {
+            contextLabel,
+            attempt,
+            batchSize: rows.length,
+            permanentlyDroppedBatches: this._permanentlyDroppedBatches,
+            sampleRow: JSON.stringify(rows[0] ?? null).slice(0, 1024),
+            firstError: firstMessage.split("\n")[0],
+            retryError: retryMessage.split("\n")[0],
+          }
+        );
+        return { kind: "dropped" };
+      }
+    }
   }
 
   async #prepareRunInserts(
```
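To see the wrapper's control flow without a ClickHouse instance, here is a self-contained simulation under simplifying assumptions: rows are plain strings, the fake insert rejects any batch that still contains a lone surrogate (or a made-up `<<unparseable>>` marker standing in for a failure the sanitizer cannot fix), and logging and the drop counter are omitted. `makeFakeInsert`, `sanitize`, and `insertWithRecovery` are hypothetical names, not the service's code.

```typescript
type Outcome = "inserted" | "sanitized" | "dropped";

const PARSE_ERROR = "Cannot parse JSON object";
// Matches a high surrogate with no low half after it, or a low half with no high before it.
const LONE_SURROGATE =
  /[\ud800-\udbff](?![\udc00-\udfff])|(?<![\ud800-\udbff])[\udc00-\udfff]/;

/** Fake insert: rejects the batch while any row is still unparseable. */
function makeFakeInsert(rows: string[], calls: { count: number }) {
  return async (): Promise<void> => {
    calls.count++;
    if (rows.some((r) => LONE_SURROGATE.test(r) || r === "<<unparseable>>")) {
      throw new Error(`${PARSE_ERROR}: (at row 1)`);
    }
  };
}

/** Replaces rows with lone surrogates in place; returns how many were replaced. */
function sanitize(rows: string[]): number {
  let fixed = 0;
  rows.forEach((r, i) => {
    if (LONE_SURROGATE.test(r)) {
      rows[i] = "[invalid-utf16]";
      fixed++;
    }
  });
  return fixed;
}

const isParseError = (e: unknown): boolean =>
  e instanceof Error && e.message.includes(PARSE_ERROR);

async function insertWithRecovery(
  rows: string[],
  doInsert: () => Promise<void>
): Promise<Outcome> {
  try {
    await doInsert();
    return "inserted";
  } catch (first) {
    if (!isParseError(first)) throw first; // transient errors go to the outer retry
    if (sanitize(rows) === 0) return "dropped"; // nothing to fix, a retry is pointless
    try {
      await doInsert();
      return "sanitized";
    } catch (retry) {
      if (!isParseError(retry)) throw retry;
      return "dropped"; // still a parse error after sanitizing
    }
  }
}
```

Because both drop paths return instead of throwing, an outer retry loop sees a resolved promise and stops, which is the behaviour the doc comment on `#insertWithJsonParseRecovery` describes.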
