Skip to content

Commit d6458d9

Browse files
committed
Handle realtime with large payloads or outputs #1451
1 parent a95153f commit d6458d9

File tree

8 files changed

+52
-36
lines changed

8 files changed

+52
-36
lines changed

apps/webapp/app/routes/api.v1.packets.$.ts

+21-21
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import type { ActionFunctionArgs } from "@remix-run/server-runtime";
22
import { json } from "@remix-run/server-runtime";
33
import { z } from "zod";
44
import { authenticateApiRequest } from "~/services/apiAuth.server";
5+
import { createLoaderApiRoute } from "~/services/routeBuiilders/apiBuilder.server";
56
import { generatePresignedUrl } from "~/v3/r2.server";
67

78
const ParamsSchema = z.object({
@@ -39,28 +40,27 @@ export async function action({ request, params }: ActionFunctionArgs) {
3940
return json({ presignedUrl });
4041
}
4142

42-
export async function loader({ request, params }: ActionFunctionArgs) {
43-
// Next authenticate the request
44-
const authenticationResult = await authenticateApiRequest(request);
43+
export const loader = createLoaderApiRoute(
44+
{
45+
params: ParamsSchema,
46+
allowJWT: true,
47+
corsStrategy: "all",
48+
},
49+
async ({ params, authentication }) => {
50+
const filename = params["*"];
4551

46-
if (!authenticationResult) {
47-
return json({ error: "Invalid or Missing API key" }, { status: 401 });
48-
}
52+
const presignedUrl = await generatePresignedUrl(
53+
authentication.environment.project.externalRef,
54+
authentication.environment.slug,
55+
filename,
56+
"GET"
57+
);
4958

50-
const parsedParams = ParamsSchema.parse(params);
51-
const filename = parsedParams["*"];
52-
53-
const presignedUrl = await generatePresignedUrl(
54-
authenticationResult.environment.project.externalRef,
55-
authenticationResult.environment.slug,
56-
filename,
57-
"GET"
58-
);
59+
if (!presignedUrl) {
60+
return json({ error: "Failed to generate presigned URL" }, { status: 500 });
61+
}
5962

60-
if (!presignedUrl) {
61-
return json({ error: "Failed to generate presigned URL" }, { status: 500 });
63+
// Caller can now use this URL to fetch that object.
64+
return json({ presignedUrl });
6265
}
63-
64-
// Caller can now use this URL to fetch that object.
65-
return json({ presignedUrl });
66-
}
66+
);

packages/core/src/v3/apiClient/index.ts

+3
Original file line numberDiff line numberDiff line change
@@ -598,6 +598,7 @@ export class ApiClient {
598598
return runShapeStream<TRunTypes>(`${this.baseUrl}/realtime/v1/runs/${runId}`, {
599599
closeOnComplete: true,
600600
headers: this.#getRealtimeHeaders(),
601+
client: this,
601602
});
602603
}
603604

@@ -611,6 +612,7 @@ export class ApiClient {
611612
{
612613
closeOnComplete: false,
613614
headers: this.#getRealtimeHeaders(),
615+
client: this,
614616
}
615617
);
616618
}
@@ -619,6 +621,7 @@ export class ApiClient {
619621
return runShapeStream<TRunTypes>(`${this.baseUrl}/realtime/v1/batches/${batchId}`, {
620622
closeOnComplete: false,
621623
headers: this.#getRealtimeHeaders(),
624+
client: this,
622625
});
623626
}
624627

packages/core/src/v3/apiClient/runStream.ts

+4-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
IOPacket,
99
parsePacket,
1010
} from "../utils/ioSerialization.js";
11+
import { ApiClient } from "./index.js";
1112
import { AsyncIterableStream, createAsyncIterableStream, zodShapeStream } from "./stream.js";
1213

1314
export type RunShape<TRunTypes extends AnyRunTypes> = TRunTypes extends AnyRunTypes
@@ -50,6 +51,7 @@ export type RunShapeStreamOptions = {
5051
fetchClient?: typeof fetch;
5152
closeOnComplete?: boolean;
5253
signal?: AbortSignal;
54+
client?: ApiClient;
5355
};
5456

5557
export type StreamPartResult<TRun, TStreams extends Record<string, any>> = {
@@ -84,6 +86,7 @@ export function runShapeStream<TRunTypes extends AnyRunTypes>(
8486
signal: options?.signal,
8587
}
8688
),
89+
...options,
8790
};
8891

8992
return new RunSubscription<TRunTypes>($options);
@@ -306,7 +309,7 @@ export class RunSubscription<TRunTypes extends AnyRunTypes> {
306309
return cachedResult;
307310
}
308311

309-
const result = await conditionallyImportAndParsePacket(packet);
312+
const result = await conditionallyImportAndParsePacket(packet, this.options.client);
310313
this.packetCache.set(`${row.friendlyId}/${key}`, result);
311314

312315
return result;

packages/core/src/v3/utils/ioSerialization.ts

+15-8
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { apiClientManager } from "../apiClientManager-api.js";
77
import { zodfetch } from "../zodfetch.js";
88
import { z } from "zod";
99
import type { RetryOptions } from "../schemas/index.js";
10+
import { ApiClient } from "../apiClient/index.js";
1011

1112
export type IOPacket = {
1213
data?: string | undefined;
@@ -36,8 +37,11 @@ export async function parsePacket(value: IOPacket): Promise<any> {
3637
}
3738
}
3839

39-
export async function conditionallyImportAndParsePacket(value: IOPacket): Promise<any> {
40-
const importedPacket = await conditionallyImportPacket(value);
40+
export async function conditionallyImportAndParsePacket(
41+
value: IOPacket,
42+
client?: ApiClient
43+
): Promise<any> {
44+
const importedPacket = await conditionallyImportPacket(value, undefined, client);
4145

4246
return await parsePacket(importedPacket);
4347
}
@@ -159,19 +163,20 @@ async function exportPacket(packet: IOPacket, pathPrefix: string): Promise<IOPac
159163

160164
export async function conditionallyImportPacket(
161165
packet: IOPacket,
162-
tracer?: TriggerTracer
166+
tracer?: TriggerTracer,
167+
client?: ApiClient
163168
): Promise<IOPacket> {
164169
if (packet.dataType !== "application/store") {
165170
return packet;
166171
}
167172

168173
if (!tracer) {
169-
return await importPacket(packet);
174+
return await importPacket(packet, undefined, client);
170175
} else {
171176
const result = await tracer.startActiveSpan(
172177
"store.downloadPayload",
173178
async (span) => {
174-
return await importPacket(packet, span);
179+
return await importPacket(packet, span, client);
175180
},
176181
{
177182
attributes: {
@@ -209,16 +214,18 @@ export async function resolvePresignedPacketUrl(
209214
}
210215
}
211216

212-
async function importPacket(packet: IOPacket, span?: Span): Promise<IOPacket> {
217+
async function importPacket(packet: IOPacket, span?: Span, client?: ApiClient): Promise<IOPacket> {
213218
if (!packet.data) {
214219
return packet;
215220
}
216221

217-
if (!apiClientManager.client) {
222+
const $client = client ?? apiClientManager.client;
223+
224+
if (!$client) {
218225
return packet;
219226
}
220227

221-
const presignedResponse = await apiClientManager.client.getPayloadUrl(packet.data);
228+
const presignedResponse = await $client.getPayloadUrl(packet.data);
222229

223230
const response = await zodfetch(z.any(), presignedResponse.presignedUrl, undefined, {
224231
retry: ioRetryOptions,

references/nextjs-realtime/src/app/runs/[id]/ClientRunDetails.tsx

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22

33
import RunDetails from "@/components/RunDetails";
44
import { Card, CardContent } from "@/components/ui/card";
5-
import { TriggerAuthContext, useRun } from "@trigger.dev/react-hooks";
5+
import { TriggerAuthContext, useRealtimeRun } from "@trigger.dev/react-hooks";
66
import type { exampleTask } from "@/trigger/example";
77

88
function RunDetailsWrapper({ runId }: { runId: string }) {
9-
const { run, error } = useRun<typeof exampleTask>(runId, { refreshInterval: 1000 });
9+
const { run, error } = useRealtimeRun<typeof exampleTask>(runId);
1010

1111
if (error) {
1212
return (

references/nextjs-realtime/src/components/RunDetails.tsx

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { Badge } from "@/components/ui/badge";
22
import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card";
33
import { ScrollArea } from "@/components/ui/scroll-area";
44
import { exampleTask } from "@/trigger/example";
5-
import type { RetrieveRunResult } from "@trigger.dev/sdk/v3";
5+
import type { TaskRunShape } from "@trigger.dev/sdk/v3";
66
import { AlertTriangleIcon, CheckCheckIcon, XIcon } from "lucide-react";
77

88
function formatDate(date: Date | undefined) {
@@ -17,7 +17,7 @@ function JsonDisplay({ data }: { data: any }) {
1717
);
1818
}
1919

20-
export default function RunDetails({ record }: { record: RetrieveRunResult<typeof exampleTask> }) {
20+
export default function RunDetails({ record }: { record: TaskRunShape<typeof exampleTask> }) {
2121
return (
2222
<Card className="w-full max-w-4xl mx-auto bg-gray-800 border-gray-700 text-gray-200">
2323
<CardHeader>

references/nextjs-realtime/src/trigger/example.ts

+4-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ export const exampleTask = schemaTask({
2323

2424
metadata.set("status", { type: "finished", progress: 1.0 });
2525

26-
return { message: "All good here!" };
26+
// Generate a return payload that is more than 128KB
27+
const bigPayload = new Array(100000).fill("a".repeat(10)).join("");
28+
29+
return { message: bigPayload };
2730
},
2831
});
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { defineConfig } from "@trigger.dev/sdk/v3";
22

33
export default defineConfig({
4-
project: "proj_xyxzzpnujsnhjiskihvs",
4+
project: "proj_bzhdaqhlymtuhlrcgbqy",
55
dirs: ["./src/trigger"],
66
});

0 commit comments

Comments
 (0)