Skip to content

Commit f88d401

Browse files
authored
fix(webapp): dedupe realtimeStreams array push on stream create (#3653)
## Summary The PUT handler at `/realtime/v1/streams/:runId/:target/:streamId` ran `taskRun.update({ realtimeStreams: { push: streamId } })` on every call, even when the `streamId` was already present. SDK call patterns that re-initialize the same stream key on every chunk produce a per-write row UPDATE, duplicate entries pile up in the array, and the row-lock + TOAST rewrite cost grows unbounded on long-running stream sessions. ## Fix Mirror the sibling append handler: read the array first and only push when the `streamId` isn't already present. Identical behavior for first-time stream creation; repeat creates short-circuit to a single indexed read. The dashboard's per-run stream listing keeps working because the first create still records the entry. ## Test plan - [ ] A fresh PUT for a new `(run, streamId)` adds the entry to the array - [ ] A repeat PUT for the same pair leaves the array unchanged - [ ] 404 is returned when the run doesn't exist; 400 when the run is completed
1 parent 9623e88 commit f88d401

2 files changed

Lines changed: 25 additions & 9 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: fix
4+
---
5+
6+
Dedupe the `realtimeStreams` array push on `PUT /realtime/v1/streams/:runId/:target/:streamId` so repeat stream-init calls for the same `(run, streamId)` skip the row UPDATE, mirroring the existing append handler.

apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -62,39 +62,49 @@ const { action } = createActionApiRoute(
6262

6363
if (request.method === "PUT") {
6464
// This is the "create" endpoint
65-
const updatedRun = await prisma.taskRun.update({
65+
const target = await prisma.taskRun.findFirst({
6666
where: {
6767
friendlyId: targetId,
6868
runtimeEnvironmentId: authentication.environment.id,
6969
},
70-
data: {
71-
realtimeStreams: {
72-
push: params.streamId,
73-
},
74-
},
7570
select: {
71+
id: true,
72+
realtimeStreams: true,
7673
realtimeStreamsVersion: true,
7774
completedAt: true,
7875
},
7976
});
8077

81-
if (updatedRun.completedAt) {
78+
if (!target) {
79+
return new Response("Run not found", { status: 404 });
80+
}
81+
82+
if (target.completedAt) {
8283
return new Response("Cannot initialize a realtime stream on a completed run", {
8384
status: 400,
8485
});
8586
}
8687

88+
if (!target.realtimeStreams.includes(params.streamId)) {
89+
await prisma.taskRun.update({
90+
where: { id: target.id },
91+
data: {
92+
realtimeStreams: { push: params.streamId },
93+
},
94+
});
95+
}
96+
8797
const realtimeStream = getRealtimeStreamInstance(
8898
authentication.environment,
89-
updatedRun.realtimeStreamsVersion,
99+
target.realtimeStreamsVersion,
90100
basinContext
91101
);
92102

93103
const { responseHeaders } = await realtimeStream.initializeStream(targetId, params.streamId);
94104

95105
return json(
96106
{
97-
version: updatedRun.realtimeStreamsVersion,
107+
version: target.realtimeStreamsVersion,
98108
},
99109
{ status: 202, headers: responseHeaders }
100110
);

0 commit comments

Comments
 (0)