|
1 | 1 | import {json} from "micro";
|
2 |
| -import {Readable} from "node:stream"; |
| 2 | +import {Readable, Transform} from "node:stream"; |
3 | 3 | import JSONStream from "JSONStream";
|
4 | 4 |
|
5 | 5 | import {
|
@@ -226,13 +226,19 @@ export async function queryStream(req, res, connection) {
|
226 | 226 | try {
|
227 | 227 | query = await session.executeStatement(sql, {runAsync: true});
|
228 | 228 | const rows = await query.fetchAll();
|
| 229 | + const timestampFieldMap = new Map(); |
229 | 230 | const schema = await query.getSchema();
|
230 | 231 |
|
231 | 232 | const responseSchema = {
|
232 | 233 | type: "array",
|
233 | 234 | items: {
|
234 | 235 | type: "object",
|
235 |
| - properties: schema.columns.reduce((schema, col, idx) => { |
| 236 | + properties: schema.columns.reduce((schema, col) => { |
| 237 | + // If the column is a timestamp, we need keep track of its key. |
| 238 | + // We will use this later to convert the timestamp to a UTC ISO-96801 format. |
| 239 | + if (col.typeDesc.types[0].primitiveEntry.type === 8) { |
| 240 | + timestampFieldMap.set(col.columnName, "TIMESTAMP"); |
| 241 | + } |
236 | 242 | return {
|
237 | 243 | ...schema,
|
238 | 244 | ...{
|
@@ -261,7 +267,32 @@ export async function queryStream(req, res, connection) {
|
261 | 267 |
|
262 | 268 | stream.on("error", reject);
|
263 | 269 |
|
264 |
| - stream.pipe(JSONStream.stringify("", "\n", "\n")).pipe(res); |
| 270 | + stream |
| 271 | + .pipe( |
| 272 | + new Transform({ |
| 273 | + objectMode: true, |
| 274 | + transform(chunk, encoding, callback) { |
| 275 | + let row = null; |
| 276 | + try { |
| 277 | + row = Object.entries(chunk).reduce((row, [key, value]) => { |
| 278 | + return { |
| 279 | + ...row, |
| 280 | + ...{ |
| 281 | + [key]: timestampFieldMap.has(key) ? `${value}Z` : value, |
| 282 | + }, |
| 283 | + }; |
| 284 | + }, {}); |
| 285 | + } catch (e) { |
| 286 | + console.error("row has unexpected format"); |
| 287 | + // TODO: Add error handling once server supports handling error for in flight streamed response |
| 288 | + // cb(new Error(e)); |
| 289 | + } |
| 290 | + callback(null, row); |
| 291 | + }, |
| 292 | + }) |
| 293 | + ) |
| 294 | + .pipe(JSONStream.stringify("", "\n", "\n")) |
| 295 | + .pipe(res); |
265 | 296 | });
|
266 | 297 | } catch (error) {
|
267 | 298 | if (!error.statusCode) error.statusCode = 400;
|
|
0 commit comments