Skip to content

Commit e350185

Browse files
author
Sylvestre
authored
Add query stream, describe tables and columns for databricks client. (#55)
* Add basic query stream support * Make error handling avoid crashing the application * Scope * Describe tables and columns utility functions (#57) * Naive parameter implementation (#59) * Fix exclusion-character edge case * Write the schema to the response even if no rows are found * Update the error message to be more explicit * Move runAsync to the operation definition
1 parent a3d967d commit e350185

File tree

2 files changed

+309
-4
lines changed

2 files changed

+309
-4
lines changed

lib/databricks.js

Lines changed: 299 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,16 @@
1+
import {json} from "micro";
2+
import {Readable} from "node:stream";
3+
import JSONStream from "JSONStream";
4+
5+
import {
6+
validateDescribeColumnsPayload,
7+
validateQueryPayload,
8+
} from "./validate.js";
9+
import {badRequest} from "./errors.js";
10+
111
export class DatabricksSingleton {
212
static instance = null;
3-
13+
static types = new Map();
414
constructor() {
515
throw new Error(
616
"Do not use new DatabricksSingleton(). Call DatabricksSingleton.initialize() instead."
@@ -10,7 +20,65 @@ export class DatabricksSingleton {
1020
if (!DatabricksSingleton.instance) {
1121
try {
1222
DatabricksSingleton.instance = import("@databricks/sql").then(
13-
(module) => module.default
23+
(module) => {
24+
const databricks = module.default;
25+
const {TCLIService_types} = databricks.thrift;
26+
27+
const boolean = ["null", "boolean"],
28+
integer = ["null", "integer"],
29+
number = ["null", "number"],
30+
object = ["null", "object"],
31+
string = ["null", "string"];
32+
33+
DatabricksSingleton.types
34+
.set(TCLIService_types.TTypeId.BIGINT_TYPE, {
35+
type: integer,
36+
})
37+
.set(TCLIService_types.TTypeId.BINARY_TYPE, {
38+
type: object,
39+
buffer: true,
40+
})
41+
.set(TCLIService_types.TTypeId.BOOLEAN_TYPE, {
42+
type: boolean,
43+
})
44+
.set(TCLIService_types.TTypeId.TINYINT_TYPE, {
45+
type: integer,
46+
})
47+
.set(TCLIService_types.TTypeId.SMALLINT_TYPE, {
48+
type: integer,
49+
})
50+
.set(TCLIService_types.TTypeId.INT_TYPE, {
51+
type: integer,
52+
})
53+
.set(TCLIService_types.TTypeId.DECIMAL_TYPE, {
54+
type: number,
55+
decimal: true,
56+
})
57+
.set(TCLIService_types.TTypeId.DOUBLE_TYPE, {
58+
type: number,
59+
})
60+
.set(TCLIService_types.TTypeId.FLOAT_TYPE, {
61+
type: number,
62+
})
63+
.set(TCLIService_types.TTypeId.TIMESTAMP_TYPE, {
64+
type: string,
65+
date: true,
66+
})
67+
.set(TCLIService_types.TTypeId.DATE_TYPE, {
68+
type: string,
69+
date: true,
70+
})
71+
.set(TCLIService_types.TTypeId.INTERVAL_DAY_TIME_TYPE, {
72+
type: string,
73+
date: true,
74+
})
75+
.set(TCLIService_types.TTypeId.INTERVAL_YEAR_MONTH_TYPE, {
76+
type: string,
77+
date: true,
78+
});
79+
80+
return databricks;
81+
}
1482
);
1583
} catch (err) {
1684
console.error(err);
@@ -24,6 +92,214 @@ export class DatabricksSingleton {
2492
);
2593
return DatabricksSingleton.instance;
2694
}
95+
96+
get types() {
97+
if (DatabricksSingleton.types.size === 0)
98+
throw new Error(
99+
"DatabricksSingleton not initialized. Call DatabricksSingleton.initialize() first."
100+
);
101+
return DatabricksSingleton.types;
102+
}
103+
}
104+
105+
export async function describeTables(req, res, connection) {
106+
const tablesData = {};
107+
const session = await new Promise((resolve, reject) => {
108+
connection.on("error", reject).openSession().then(resolve);
109+
});
110+
111+
try {
112+
const catalogOperation = await session.getCatalogs({runAsync: true});
113+
const catalogs = await catalogOperation.fetchAll();
114+
115+
for (const catalog of catalogs) {
116+
const tableOperation = await session.getTables({
117+
catalogName: catalog.TABLE_CAT,
118+
runAsync: true,
119+
});
120+
const tables = await tableOperation.fetchAll();
121+
tables.forEach(
122+
(table) =>
123+
(tablesData[
124+
`${table.TABLE_CAT}.${table.TABLE_SCHEM}.${table.TABLE_NAME}`
125+
] = table)
126+
);
127+
}
128+
} catch (error) {
129+
if (!error.statusCode) error.statusCode = 400;
130+
throw error;
131+
} finally {
132+
if (session) {
133+
try {
134+
await session.close();
135+
} catch (err) {
136+
console.error(err);
137+
}
138+
}
139+
140+
if (connection) {
141+
try {
142+
await connection.close();
143+
} catch (err) {
144+
console.error(err);
145+
}
146+
}
147+
}
148+
149+
return tablesData;
150+
}
151+
152+
export async function describeColumns(req, res, connection) {
153+
let columns = [];
154+
const session = await new Promise((resolve, reject) => {
155+
connection.on("error", reject).openSession().then(resolve);
156+
});
157+
158+
const body = await json(req);
159+
160+
if (!validateDescribeColumnsPayload(body)) throw badRequest();
161+
162+
const {table, catalog, schema} = body;
163+
164+
try {
165+
const columnsOperation = await session.getColumns({
166+
catalogName: catalog,
167+
schemaName: schema,
168+
tableName: table,
169+
runAsync: true,
170+
});
171+
columns = await columnsOperation.fetchAll();
172+
} catch (error) {
173+
if (!error.statusCode) error.statusCode = 400;
174+
throw error;
175+
} finally {
176+
if (session) {
177+
try {
178+
await session.close();
179+
} catch (err) {
180+
console.error(err);
181+
}
182+
}
183+
184+
if (connection) {
185+
try {
186+
await connection.close();
187+
} catch (err) {
188+
console.error(err);
189+
}
190+
}
191+
}
192+
193+
return columns;
194+
}
195+
196+
/*
197+
* This function is running a given query and streams the results back to the client.
198+
* */
199+
export async function queryStream(req, res, connection) {
200+
let query;
201+
const session = await new Promise((resolve, reject) => {
202+
connection.on("error", reject).openSession().then(resolve);
203+
});
204+
205+
const body = await json(req);
206+
207+
if (!validateQueryPayload(body)) throw badRequest();
208+
209+
res.setHeader("Content-Type", "text/plain");
210+
const keepAlive = setInterval(() => res.write("\n"), 25e3);
211+
212+
let {sql, params = []} = body;
213+
214+
if (params.length) {
215+
sql = params.reduce((sql, p, idx) => {
216+
if (typeof p === "string") {
217+
// If the parameter is a string that includes the ' (exclusion character),
218+
// we need to add the r tag to treat it as raw-literal string.
219+
p = p.match(/'/g) ? `r'${p}'` : `'${p}'`;
220+
}
221+
222+
return sql.replace(`:${idx + 1}`, p);
223+
}, sql);
224+
}
225+
226+
try {
227+
query = await session.executeStatement(sql, {runAsync: true});
228+
const rows = await query.fetchAll();
229+
const schema = await query.getSchema();
230+
231+
const responseSchema = {
232+
type: "array",
233+
items: {
234+
type: "object",
235+
properties: schema.columns.reduce((schema, col, idx) => {
236+
return {
237+
...schema,
238+
...{
239+
[col.columnName]: dataTypeSchema(
240+
col.typeDesc.types[0].primitiveEntry.type
241+
),
242+
},
243+
};
244+
}, {}),
245+
},
246+
};
247+
res.write(`${JSON.stringify(responseSchema)}`);
248+
res.write("\n");
249+
250+
await new Promise(async (resolve, reject) => {
251+
const stream = new Readable.from(rows);
252+
253+
stream.once("data", () => {
254+
clearInterval(keepAlive);
255+
});
256+
257+
stream.on("close", (error) => {
258+
resolve();
259+
stream.destroy();
260+
});
261+
262+
stream.on("error", reject);
263+
264+
stream.pipe(JSONStream.stringify("", "\n", "\n")).pipe(res);
265+
});
266+
} catch (error) {
267+
if (!error.statusCode) error.statusCode = 400;
268+
// The default error message is too generic.
269+
// We want to show a more explicit error message, such as "Table not found" ...etc.
270+
if (error?.response?.displayMessage)
271+
error.message = error.response.displayMessage;
272+
273+
throw error;
274+
} finally {
275+
clearInterval(keepAlive);
276+
277+
if (query) {
278+
try {
279+
await query.close();
280+
} catch (err) {
281+
console.error(err);
282+
}
283+
}
284+
285+
if (session) {
286+
try {
287+
await session.close();
288+
} catch (err) {
289+
console.error(err);
290+
}
291+
}
292+
293+
if (connection) {
294+
try {
295+
await connection.close();
296+
} catch (err) {
297+
console.error(err);
298+
}
299+
}
300+
}
301+
302+
res.end();
27303
}
28304

29305
/*
@@ -58,7 +334,26 @@ export default ({token, host, path}) => {
58334
const client = new databricks.DBSQLClient();
59335
const connection = await client.connect({token, host, path});
60336

61-
// TODO: replace with queryStream method when implemented
62-
return check(req, res, connection);
337+
if (req.method === "POST") {
338+
if (req.url === "/describe-tables") {
339+
return describeTables(req, res, connection);
340+
}
341+
342+
if (req.url === "/describe-columns") {
343+
return describeColumns(req, res, connection);
344+
}
345+
}
346+
347+
return queryStream(req, res, connection);
63348
};
64349
};
350+
351+
// See https://github.com/databricks/databricks-sql-nodejs/blob/main/tests/unit/result/JsonResult.test.js
352+
function dataTypeSchema(type) {
353+
const types = DatabricksSingleton.types;
354+
if (types.has(type)) {
355+
return types.get(type);
356+
}
357+
358+
return {type: ["null", "string"]};
359+
}

lib/validate.js

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,13 @@ export const validateQueryPayload = ajv.compile({
1111
params: {type: ["object", "array"]},
1212
},
1313
});
14+
export const validateDescribeColumnsPayload = ajv.compile({
15+
type: "object",
16+
additionalProperties: false,
17+
required: ["table"],
18+
properties: {
19+
catalog: {type: "string", minLength: 1, maxLength: 32 * 1000},
20+
schema: {type: "string", minLength: 1, maxLength: 32 * 1000},
21+
table: {type: "string", minLength: 1, maxLength: 32 * 1000}
22+
},
23+
})

0 commit comments

Comments
 (0)