
Commit a95153f

WIP realtime streams
1 parent c9811be commit a95153f

35 files changed: +1751 -41 lines
Lines changed: 47 additions & 0 deletions
@@ -0,0 +1,47 @@
+import { ActionFunctionArgs } from "@remix-run/server-runtime";
+import { z } from "zod";
+import { $replica } from "~/db.server";
+import { realtimeStreams } from "~/services/realtimeStreamsGlobal.server";
+import { createLoaderApiRoute } from "~/services/routeBuiilders/apiBuilder.server";
+
+const ParamsSchema = z.object({
+  runId: z.string(),
+  streamId: z.string(),
+});
+
+export async function action({ request, params }: ActionFunctionArgs) {
+  const $params = ParamsSchema.parse(params);
+
+  if (!request.body) {
+    return new Response("No body provided", { status: 400 });
+  }
+
+  return realtimeStreams.ingestData(request.body, $params.runId, $params.streamId);
+}
+
+export const loader = createLoaderApiRoute(
+  {
+    params: ParamsSchema,
+    allowJWT: true,
+    corsStrategy: "all",
+    authorization: {
+      action: "read",
+      resource: (params) => ({ runs: params.runId }),
+      superScopes: ["read:runs", "read:all", "admin"],
+    },
+  },
+  async ({ params, authentication, request }) => {
+    const run = await $replica.taskRun.findFirst({
+      where: {
+        friendlyId: params.runId,
+        runtimeEnvironmentId: authentication.environment.id,
+      },
+    });
+
+    if (!run) {
+      return new Response("Run not found", { status: 404 });
+    }
+
+    return realtimeStreams.streamResponse(run.friendlyId, params.streamId, request.signal);
+  }
+);
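
As a rough illustration (not part of this commit), a client could consume the SSE stream this loader returns roughly as sketched below. The URL, port, and bearer token are placeholders, since the route's file path and credentials aren't visible in this hunk; the frame parsing just mirrors the `data: {...}\n\n` format produced by streamResponse.

// Hedged sketch: reading the loader's text/event-stream output with fetch (Node 18+ or browser).
async function readRunStream(url: string, token: string) {
  const response = await fetch(url, {
    headers: { Authorization: `Bearer ${token}`, Accept: "text/event-stream" },
  });

  if (!response.ok || !response.body) {
    throw new Error(`Stream request failed: ${response.status}`);
  }

  const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += value;
    const frames = buffer.split("\n\n");
    buffer = frames.pop() ?? "";

    for (const frame of frames) {
      // streamResponse emits frames shaped like `data: {...}`.
      const payload = frame.replace(/^data: /, "").trim();
      if (payload) console.log("chunk:", JSON.parse(payload));
    }
  }
}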
Lines changed: 43 additions & 0 deletions
@@ -0,0 +1,43 @@
+import { ActionFunctionArgs } from "@remix-run/server-runtime";
+
+export async function action({ request }: ActionFunctionArgs) {
+  if (!request.body) {
+    return new Response("No body provided", { status: 400 });
+  }
+
+  const reader = request.body.getReader();
+  const decoder = new TextDecoder();
+  let buffer = "";
+
+  try {
+    while (true) {
+      const { done, value } = await reader.read();
+
+      if (done) {
+        if (buffer) {
+          const data = JSON.parse(buffer);
+          console.log(`${new Date().toISOString()} Received data at end:`, data);
+          // You can process the data as needed
+        }
+        break;
+      }
+
+      buffer += decoder.decode(value, { stream: true });
+      const lines = buffer.split("\n");
+      buffer = lines.pop() || "";
+
+      for (const line of lines) {
+        if (line.trim()) {
+          const data = JSON.parse(line);
+          console.log(`${new Date().toISOString()} Received data:`, data);
+          // You can process each data chunk as needed
+        }
+      }
+    }
+
+    return new Response(null, { status: 200 });
+  } catch (error) {
+    console.error("Error processing stream:", error);
+    return new Response(null, { status: 500 });
+  }
+}
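
For local testing, an action like this can be fed a newline-delimited JSON body from Node 18+ with a streaming fetch request. The sketch below is an assumption, not part of the diff: the URL is a made-up local placeholder (the route's path isn't shown above), and undici requires half-duplex mode for streaming request bodies.

// Hedged sketch: POSTing newline-delimited JSON as a streaming body.
const encoder = new TextEncoder();

function ndjsonBody(chunks: unknown[]): ReadableStream<Uint8Array> {
  return new ReadableStream({
    start(controller) {
      for (const chunk of chunks) {
        controller.enqueue(encoder.encode(JSON.stringify(chunk) + "\n"));
      }
      controller.close();
    },
  });
}

await fetch("http://localhost:3030/realtime/v1/streams/test", {
  // The URL above is a placeholder for wherever this route is mounted.
  method: "POST",
  headers: { "Content-Type": "application/x-ndjson" },
  body: ndjsonBody([{ step: 1 }, { step: 2, done: true }]),
  // Node's fetch (undici) needs half-duplex mode for streaming request bodies;
  // the field may be missing from older RequestInit typings.
  duplex: "half",
} as RequestInit & { duplex: "half" });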

apps/webapp/app/services/httpAsyncStorage.server.ts

Lines changed: 1 addition & 0 deletions
@@ -4,6 +4,7 @@ export type HttpLocalStorage = {
   requestId: string;
   path: string;
   host: string;
+  method: string;
 };
 
 const httpLocalStorage = new AsyncLocalStorage<HttpLocalStorage>();
Lines changed: 167 additions & 0 deletions
@@ -0,0 +1,167 @@
+import Redis, { RedisOptions } from "ioredis";
+import { logger } from "./logger.server";
+
+export type RealtimeStreamsOptions = {
+  redis: RedisOptions | undefined;
+};
+
+export class RealtimeStreams {
+  constructor(private options: RealtimeStreamsOptions) {}
+
+  async streamResponse(runId: string, streamId: string, signal: AbortSignal): Promise<Response> {
+    const redis = new Redis(this.options.redis ?? {});
+    const streamKey = `stream:${runId}:${streamId}`;
+
+    const stream = new TransformStream({
+      transform(chunk: string, controller) {
+        try {
+          const data = JSON.parse(chunk);
+
+          if (typeof data === "object" && data !== null && "__end" in data && data.__end === true) {
+            controller.terminate();
+            return;
+          }
+          controller.enqueue(`data: ${chunk}\n\n`);
+        } catch (error) {
+          console.error("Invalid JSON in stream:", error);
+        }
+      },
+    });
+
+    const response = new Response(stream.readable, {
+      headers: {
+        "Content-Type": "text/event-stream",
+        "Cache-Control": "no-cache",
+        Connection: "keep-alive",
+      },
+    });
+
+    let isCleanedUp = false;
+
+    async function cleanup() {
+      if (isCleanedUp) return;
+      isCleanedUp = true;
+      await redis.quit();
+      const writer = stream.writable.getWriter();
+      if (writer) await writer.close().catch(() => {}); // Ensure close doesn't error if already closed
+    }
+
+    signal.addEventListener("abort", cleanup);
+
+    (async () => {
+      let lastId = "0";
+      let retryCount = 0;
+      const maxRetries = 3;
+
+      try {
+        while (!signal.aborted) {
+          try {
+            const messages = await redis.xread(
+              "COUNT",
+              100,
+              "BLOCK",
+              5000,
+              "STREAMS",
+              streamKey,
+              lastId
+            );
+
+            retryCount = 0;
+
+            if (messages && messages.length > 0) {
+              const [_key, entries] = messages[0];
+
+              for (const [id, fields] of entries) {
+                lastId = id;
+
+                if (fields && fields.length >= 2 && !stream.writable.locked) {
+                  const writer = stream.writable.getWriter();
+                  try {
+                    await writer.write(fields[1]);
+                  } finally {
+                    writer.releaseLock();
+                  }
+                }
+              }
+            }
+          } catch (error) {
+            console.error("Error reading from Redis stream:", error);
+            retryCount++;
+            if (retryCount >= maxRetries) throw error;
+            await new Promise((resolve) => setTimeout(resolve, 1000 * retryCount));
+          }
+        }
+      } catch (error) {
+        console.error("Fatal error in stream processing:", error);
+      } finally {
+        await cleanup();
+      }
+    })();
+
+    return response;
+  }
+
+  async ingestData(
+    stream: ReadableStream<Uint8Array>,
+    runId: string,
+    streamId: string
+  ): Promise<Response> {
+    const redis = new Redis(this.options.redis ?? {});
+
+    const streamKey = `stream:${runId}:${streamId}`;
+
+    async function cleanup(stream?: TransformStream) {
+      try {
+        await redis.quit();
+        if (stream) {
+          const writer = stream.writable.getWriter();
+          await writer.close(); // Catch in case the stream is already closed
+        }
+      } catch (error) {
+        logger.error("[RealtimeStreams][ingestData] Error in cleanup:", { error });
+      }
+    }
+
+    try {
+      const reader = stream.getReader();
+      const decoder = new TextDecoder();
+      let buffer = "";
+
+      while (true) {
+        const { done, value } = await reader.read();
+
+        logger.debug("[RealtimeStreams][ingestData] Reading data", { streamKey, done });
+
+        if (done) {
+          if (buffer) {
+            const data = JSON.parse(buffer);
+            await redis.xadd(streamKey, "*", "data", JSON.stringify(data));
+          }
+          break;
+        }
+
+        buffer += decoder.decode(value, { stream: true });
+        const lines = buffer.split("\n");
+        buffer = lines.pop() || "";
+
+        for (const line of lines) {
+          if (line.trim()) {
+            const data = JSON.parse(line);
+
+            logger.debug("[RealtimeStreams][ingestData] Ingesting data", { streamKey });
+
+            await redis.xadd(streamKey, "*", "data", JSON.stringify(data));
+          }
+        }
+      }
+
+      await redis.xadd(streamKey, "*", "data", JSON.stringify({ __end: true }));
+      return new Response(null, { status: 200 });
+    } catch (error) {
+      console.error("Error in ingestData:", error);
+      return new Response(null, { status: 500 });
+    } finally {
+      await cleanup();
+    }
+  }
+}
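
A minimal sketch of driving this class directly, for example from a local script or test: ingest one JSON line, then inspect what ended up in the Redis stream. The localhost Redis options, the run/stream ids, and the relative import path are assumptions, not values from this commit.

// Hedged sketch: feed RealtimeStreams.ingestData one JSON line and inspect the result.
import Redis from "ioredis";
import { RealtimeStreams } from "./realtimeStreams.server"; // assumed relative location

const redisOptions = { host: "127.0.0.1", port: 6379 }; // assumed local Redis
const streams = new RealtimeStreams({ redis: redisOptions });

const encoder = new TextEncoder();
const body = new ReadableStream<Uint8Array>({
  start(controller) {
    controller.enqueue(encoder.encode(JSON.stringify({ progress: 0.5 }) + "\n"));
    controller.close();
  },
});

// Writes one XADD per JSON line, then the {"__end":true} sentinel.
await streams.ingestData(body, "run_test", "default");

const redis = new Redis(redisOptions);
const entries = await redis.xrange("stream:run_test:default", "-", "+");
console.log(entries); // [[id, ["data", '{"progress":0.5}']], [id, ["data", '{"__end":true}']]]
await redis.quit();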
Lines changed: 19 additions & 0 deletions
@@ -0,0 +1,19 @@
+import { env } from "~/env.server";
+import { singleton } from "~/utils/singleton";
+import { RealtimeStreams } from "./realtimeStreams.server";
+
+function initializeRealtimeStreams() {
+  return new RealtimeStreams({
+    redis: {
+      port: env.REDIS_PORT,
+      host: env.REDIS_HOST,
+      username: env.REDIS_USERNAME,
+      password: env.REDIS_PASSWORD,
+      enableAutoPipelining: true,
+      ...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
+      keyPrefix: "tr:realtime:streams:",
+    },
+  });
+}
+
+export const realtimeStreams = singleton("realtimeStreams", initializeRealtimeStreams);

apps/webapp/server.ts

Lines changed: 56 additions & 8 deletions
@@ -14,9 +14,9 @@ import { RegistryProxy } from "~/v3/registryProxy.server";
 
 const app = express();
 
-if (process.env.DISABLE_COMPRESSION !== "1") {
-  app.use(compression());
-}
+// if (process.env.DISABLE_COMPRESSION !== "1") {
+//   app.use(compression());
+// }
 
 // http://expressjs.com/en/advanced/best-practice-security.html#at-a-minimum-disable-x-powered-by-header
 app.disable("x-powered-by");
@@ -73,15 +73,63 @@ if (process.env.HTTP_SERVER_DISABLED !== "true") {
     next();
   });
 
-  app.use((req, res, next) => {
-    // Generate a unique request ID for each request
-    const requestId = nanoid();
+  app.post("/realtime/v1/streams/express/test", async (req, res) => {
+    // Ensure the request is a readable stream
+    const { method, headers } = req;
+    console.log("Inside /realtime/v1/streams/express/test");
+
+    if (method !== "POST") {
+      return res.status(405).send("Method Not Allowed");
+    }
 
-    runWithHttpContext({ requestId, path: req.url, host: req.hostname }, next);
+    // Set encoding to UTF-8 to read string data
+    req.setEncoding("utf8");
+
+    let buffer = "";
+
+    try {
+      req.on("data", (chunk) => {
+        buffer += chunk;
+        const lines = buffer.split("\n");
+        buffer = lines.pop() || "";
+
+        for (const line of lines) {
+          if (line.trim()) {
+            const data = JSON.parse(line);
+            console.log(`${new Date().toISOString()} Received data:`, data);
+            // You can process each data chunk as needed
+          }
+        }
+      });
+
+      req.on("end", () => {
+        if (buffer) {
+          const data = JSON.parse(buffer);
+          console.log(`${new Date().toISOString()} Received data at end:`, data);
+          // You can process the remaining data as needed
+        }
+        res.status(200).send(); // Send a success response
+      });
+
+      req.on("error", (error) => {
+        console.error("Error processing stream:", error);
+        res.status(500).send("Internal Server Error");
+      });
+    } catch (error) {
+      console.error("Error processing stream:", error);
+      res.status(500).send("Internal Server Error");
+    }
   });
 
+  // app.use((req, res, next) => {
+  //   // Generate a unique request ID for each request
+  //   const requestId = nanoid();
+
+  //   runWithHttpContext({ requestId, path: req.url, host: req.hostname, method: req.method }, next);
+  // });
+
   if (process.env.DASHBOARD_AND_API_DISABLED !== "true") {
-    app.use(apiRateLimiter);
+    // app.use(apiRateLimiter);
 
     app.all(
       "*",
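The temporary Express test route above can be exercised with something like the sketch below (not part of this commit). The host and port are assumptions for a local dev server, and the short delay between writes just makes it more likely that each JSON line arrives as its own "data" event.

// Hedged sketch: write JSON lines to the test route one at a time.
import http from "node:http";
import { setTimeout as sleep } from "node:timers/promises";

const req = http.request(
  { host: "localhost", port: 3030, path: "/realtime/v1/streams/express/test", method: "POST" },
  (res) => console.log("status:", res.statusCode)
);

for (const chunk of [{ step: 1 }, { step: 2 }, { step: 3, done: true }]) {
  req.write(JSON.stringify(chunk) + "\n");
  await sleep(250); // chunks may still coalesce; the delay just spreads the writes out
}

req.end();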