Skip to content

Commit 167a060

Browse files
kravets-levkofjakobs
authored andcommitted
Merge branch 'main' into custom-auth-provider
Signed-off-by: Fabian Jakobs <[email protected]>
2 parents 8a809a5 + 0332513 commit 167a060

31 files changed

+2361
-199
lines changed

.github/workflows/main.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ jobs:
1212
lint:
1313
runs-on: ubuntu-latest
1414
steps:
15-
- uses: actions/checkout@v2
15+
- uses: actions/checkout@v3
1616
- name: Cache node modules
17-
uses: actions/cache@v1
17+
uses: actions/cache@v3
1818
env:
1919
cache-name: cache-node-modules
2020
with:
@@ -33,9 +33,9 @@ jobs:
3333
test:
3434
runs-on: ubuntu-latest
3535
steps:
36-
- uses: actions/checkout@v2
36+
- uses: actions/checkout@v3
3737
- name: Cache node modules
38-
uses: actions/cache@v1
38+
uses: actions/cache@v3
3939
env:
4040
cache-name: cache-node-modules
4141
with:

.github/workflows/release.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@ jobs:
88
build:
99
runs-on: ubuntu-latest
1010
steps:
11-
- uses: actions/checkout@v2
11+
- uses: actions/checkout@v3
1212
- uses: actions/setup-node@v1
1313
with:
14-
node-version: 12
14+
node-version: 16
1515
registry-url: https://registry.npmjs.org/
1616
- run: npm ci
1717
- run: npm publish --access public

lib/DBSQLOperation/SchemaHelper.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
import { TOperationHandle, TGetResultSetMetadataResp, TSparkRowSetType } from '../../thrift/TCLIService_types';
1+
import { TGetResultSetMetadataResp, TOperationHandle, TSparkRowSetType } from '../../thrift/TCLIService_types';
22
import HiveDriver from '../hive/HiveDriver';
33
import StatusFactory from '../factory/StatusFactory';
44
import IOperationResult from '../result/IOperationResult';
55
import JsonResult from '../result/JsonResult';
6+
import ArrowResult from '../result/ArrowResult';
67
import HiveDriverError from '../errors/HiveDriverError';
78
import { definedOrError } from '../utils';
89

@@ -40,12 +41,13 @@ export default class SchemaHelper {
4041

4142
async getResultHandler(): Promise<IOperationResult> {
4243
const metadata = await this.fetchMetadata();
43-
const schema = definedOrError(metadata.schema);
4444
const resultFormat = definedOrError(metadata.resultFormat);
4545

4646
switch (resultFormat) {
4747
case TSparkRowSetType.COLUMN_BASED_SET:
48-
return new JsonResult(schema);
48+
return new JsonResult(metadata.schema);
49+
case TSparkRowSetType.ARROW_BASED_SET:
50+
return new ArrowResult(metadata.schema, metadata.arrowSchema);
4951
default:
5052
throw new HiveDriverError(`Unsupported result format: ${TSparkRowSetType[resultFormat]}`);
5153
}

lib/DBSQLSession.ts

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
import { stringify, NIL, parse } from 'uuid';
2-
import { TSessionHandle, TStatus, TOperationHandle, TSparkDirectResults } from '../thrift/TCLIService_types';
2+
import {
3+
TSessionHandle,
4+
TStatus,
5+
TOperationHandle,
6+
TSparkDirectResults,
7+
TSparkArrowTypes,
8+
} from '../thrift/TCLIService_types';
39
import HiveDriver from './hive/HiveDriver';
410
import { Int64 } from './hive/Types';
511
import IDBSQLSession, {
@@ -21,6 +27,7 @@ import StatusFactory from './factory/StatusFactory';
2127
import InfoValue from './dto/InfoValue';
2228
import { definedOrError } from './utils';
2329
import IDBSQLLogger, { LogLevel } from './contracts/IDBSQLLogger';
30+
import globalConfig from './globalConfig';
2431

2532
const defaultMaxRows = 100000;
2633

@@ -42,6 +49,30 @@ function getDirectResultsOptions(maxRows: number | null = defaultMaxRows) {
4249
};
4350
}
4451

52+
function getArrowOptions(): {
53+
canReadArrowResult: boolean;
54+
useArrowNativeTypes?: TSparkArrowTypes;
55+
} {
56+
const { arrowEnabled = true, useArrowNativeTypes = true } = globalConfig;
57+
58+
if (!arrowEnabled) {
59+
return {
60+
canReadArrowResult: false,
61+
};
62+
}
63+
64+
return {
65+
canReadArrowResult: true,
66+
useArrowNativeTypes: {
67+
timestampAsArrow: useArrowNativeTypes,
68+
decimalAsArrow: useArrowNativeTypes,
69+
complexTypesAsArrow: useArrowNativeTypes,
70+
// TODO: currently unsupported by `apache-arrow` (see https://github.com/streamlit/streamlit/issues/4489)
71+
intervalTypesAsArrow: false,
72+
},
73+
};
74+
}
75+
4576
export default class DBSQLSession implements IDBSQLSession {
4677
private driver: HiveDriver;
4778

@@ -101,6 +132,7 @@ export default class DBSQLSession implements IDBSQLSession {
101132
queryTimeout: options.queryTimeout,
102133
runAsync: options.runAsync || false,
103134
...getDirectResultsOptions(options.maxRows),
135+
...getArrowOptions(),
104136
})
105137
.then((response) => this.createOperation(response));
106138
}

lib/globalConfig.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
interface GlobalConfig {
2+
arrowEnabled?: boolean;
3+
useArrowNativeTypes?: boolean;
4+
}
5+
6+
export default {
7+
arrowEnabled: true,
8+
useArrowNativeTypes: true,
9+
} satisfies GlobalConfig;

lib/result/ArrowResult.ts

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
import { Buffer } from 'buffer';
2+
import {
3+
tableFromIPC,
4+
Schema,
5+
Field,
6+
TypeMap,
7+
DataType,
8+
Type,
9+
StructRow,
10+
MapRow,
11+
Vector,
12+
util as arrowUtils,
13+
} from 'apache-arrow';
14+
import { TRowSet, TTableSchema, TColumnDesc } from '../../thrift/TCLIService_types';
15+
import IOperationResult from './IOperationResult';
16+
import { getSchemaColumns, convertThriftValue } from './utils';
17+
18+
const { isArrowBigNumSymbol, bignumToBigInt } = arrowUtils;
19+
20+
type ArrowSchema = Schema<TypeMap>;
21+
type ArrowSchemaField = Field<DataType<Type, TypeMap>>;
22+
23+
export default class ArrowResult implements IOperationResult {
24+
private readonly schema: Array<TColumnDesc>;
25+
26+
private readonly arrowSchema?: Buffer;
27+
28+
constructor(schema?: TTableSchema, arrowSchema?: Buffer) {
29+
this.schema = getSchemaColumns(schema);
30+
this.arrowSchema = arrowSchema;
31+
}
32+
33+
getValue(data?: Array<TRowSet>) {
34+
if (this.schema.length === 0 || !this.arrowSchema || !data) {
35+
return [];
36+
}
37+
38+
const batches = this.getBatches(data);
39+
if (batches.length === 0) {
40+
return [];
41+
}
42+
43+
const table = tableFromIPC<TypeMap>([this.arrowSchema, ...batches]);
44+
return this.getRows(table.schema, table.toArray());
45+
}
46+
47+
private getBatches(data: Array<TRowSet>): Array<Buffer> {
48+
const result: Array<Buffer> = [];
49+
50+
data.forEach((rowSet) => {
51+
rowSet.arrowBatches?.forEach((arrowBatch) => {
52+
if (arrowBatch.batch) {
53+
result.push(arrowBatch.batch);
54+
}
55+
});
56+
});
57+
58+
return result;
59+
}
60+
61+
private getRows(schema: ArrowSchema, rows: Array<StructRow | MapRow>): Array<any> {
62+
return rows.map((row) => {
63+
// First, convert native Arrow values to corresponding plain JS objects
64+
const record = this.convertArrowTypes(row, undefined, schema.fields);
65+
// Second, cast all the values to original Thrift types
66+
return this.convertThriftTypes(record);
67+
});
68+
}
69+
70+
private convertArrowTypes(value: any, valueType: DataType | undefined, fields: Array<ArrowSchemaField> = []): any {
71+
const fieldsMap: Record<string, ArrowSchemaField> = {};
72+
for (const field of fields) {
73+
fieldsMap[field.name] = field;
74+
}
75+
76+
// Convert structures to plain JS object and process all its fields recursively
77+
if (value instanceof StructRow) {
78+
const result = value.toJSON();
79+
for (const key of Object.keys(result)) {
80+
const field: ArrowSchemaField | undefined = fieldsMap[key];
81+
result[key] = this.convertArrowTypes(result[key], field?.type, field?.type.children || []);
82+
}
83+
return result;
84+
}
85+
if (value instanceof MapRow) {
86+
const result = value.toJSON();
87+
// Map type consists of its key and value types. We need only value type here, key will be cast to string anyway
88+
const field = fieldsMap.entries?.type.children.find((item) => item.name === 'value');
89+
for (const key of Object.keys(result)) {
90+
result[key] = this.convertArrowTypes(result[key], field?.type, field?.type.children || []);
91+
}
92+
return result;
93+
}
94+
95+
// Convert lists to JS array and process items recursively
96+
if (value instanceof Vector) {
97+
const result = value.toJSON();
98+
// Array type contains the only child which defines a type of each array's element
99+
const field = fieldsMap.element;
100+
return result.map((item) => this.convertArrowTypes(item, field?.type, field?.type.children || []));
101+
}
102+
103+
if (DataType.isTimestamp(valueType)) {
104+
return new Date(value);
105+
}
106+
107+
// Convert big number values to BigInt
108+
// Decimals are also represented as big numbers in Arrow, so additionally process them (convert to float)
109+
if (value instanceof Object && value[isArrowBigNumSymbol]) {
110+
const result = bignumToBigInt(value);
111+
if (DataType.isDecimal(valueType)) {
112+
return Number(result) / 10 ** valueType.scale;
113+
}
114+
return result;
115+
}
116+
117+
// Convert binary data to Buffer
118+
if (value instanceof Uint8Array) {
119+
return Buffer.from(value);
120+
}
121+
122+
// Return other values as is
123+
return typeof value === 'bigint' ? Number(value) : value;
124+
}
125+
126+
private convertThriftTypes(record: Record<string, any>): any {
127+
const result: Record<string, any> = {};
128+
129+
this.schema.forEach((column) => {
130+
const typeDescriptor = column.typeDesc.types[0]?.primitiveEntry;
131+
const field = column.columnName;
132+
result[field] = convertThriftValue(typeDescriptor, record[field]);
133+
});
134+
135+
return result;
136+
}
137+
}

lib/result/JsonResult.ts

Lines changed: 8 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,28 @@
1-
import { ColumnCode, ColumnType } from '../hive/Types';
2-
import {
3-
TTypeId,
4-
TRowSet,
5-
TTableSchema,
6-
TColumn,
7-
TColumnDesc,
8-
TPrimitiveTypeEntry,
9-
} from '../../thrift/TCLIService_types';
1+
import { ColumnCode } from '../hive/Types';
2+
import { TRowSet, TTableSchema, TColumn, TColumnDesc } from '../../thrift/TCLIService_types';
103
import IOperationResult from './IOperationResult';
4+
import { getSchemaColumns, convertThriftValue } from './utils';
115

126
export default class JsonResult implements IOperationResult {
13-
private readonly schema?: TTableSchema;
7+
private readonly schema: Array<TColumnDesc>;
148

159
constructor(schema?: TTableSchema) {
16-
this.schema = schema;
10+
this.schema = getSchemaColumns(schema);
1711
}
1812

1913
getValue(data?: Array<TRowSet>): Array<object> {
20-
if (!data) {
14+
if (this.schema.length === 0 || !data) {
2115
return [];
2216
}
2317

24-
const descriptors = this.getSchemaColumns();
25-
2618
return data.reduce((result: Array<any>, rowSet: TRowSet) => {
2719
const columns = rowSet.columns || [];
28-
const rows = this.getRows(columns, descriptors);
20+
const rows = this.getRows(columns, this.schema);
2921

3022
return result.concat(rows);
3123
}, []);
3224
}
3325

34-
private getSchemaColumns(): Array<TColumnDesc> {
35-
if (!this.schema) {
36-
return [];
37-
}
38-
39-
return [...this.schema.columns].sort((c1, c2) => c1.position - c2.position);
40-
}
41-
4226
private getRows(columns: Array<TColumn>, descriptors: Array<TColumnDesc>): Array<any> {
4327
return descriptors.reduce(
4428
(rows, descriptor) =>
@@ -69,67 +53,17 @@ export default class JsonResult implements IOperationResult {
6953
if (columnValue.nulls && this.isNull(columnValue.nulls, i)) {
7054
return null;
7155
}
72-
return this.convertData(typeDescriptor, value);
56+
return convertThriftValue(typeDescriptor, value);
7357
});
7458
}
7559

76-
private convertData(typeDescriptor: TPrimitiveTypeEntry | undefined, value: ColumnType): any {
77-
if (!typeDescriptor) {
78-
return value;
79-
}
80-
81-
switch (typeDescriptor.type) {
82-
case TTypeId.TIMESTAMP_TYPE:
83-
case TTypeId.DATE_TYPE:
84-
case TTypeId.UNION_TYPE:
85-
case TTypeId.USER_DEFINED_TYPE:
86-
return String(value);
87-
case TTypeId.DECIMAL_TYPE:
88-
return Number(value);
89-
case TTypeId.STRUCT_TYPE:
90-
case TTypeId.MAP_TYPE:
91-
return this.toJSON(value, {});
92-
case TTypeId.ARRAY_TYPE:
93-
return this.toJSON(value, []);
94-
case TTypeId.BIGINT_TYPE:
95-
return this.convertBigInt(value);
96-
case TTypeId.NULL_TYPE:
97-
case TTypeId.BINARY_TYPE:
98-
case TTypeId.INTERVAL_YEAR_MONTH_TYPE:
99-
case TTypeId.INTERVAL_DAY_TIME_TYPE:
100-
case TTypeId.FLOAT_TYPE:
101-
case TTypeId.DOUBLE_TYPE:
102-
case TTypeId.INT_TYPE:
103-
case TTypeId.SMALLINT_TYPE:
104-
case TTypeId.TINYINT_TYPE:
105-
case TTypeId.BOOLEAN_TYPE:
106-
case TTypeId.STRING_TYPE:
107-
case TTypeId.CHAR_TYPE:
108-
case TTypeId.VARCHAR_TYPE:
109-
default:
110-
return value;
111-
}
112-
}
113-
11460
private isNull(nulls: Buffer, i: number): boolean {
11561
const byte = nulls[Math.floor(i / 8)];
11662
const ofs = 2 ** (i % 8);
11763

11864
return (byte & ofs) !== 0;
11965
}
12066

121-
private toJSON(value: any, defaultValue: any): any {
122-
try {
123-
return JSON.parse(value);
124-
} catch (e) {
125-
return defaultValue;
126-
}
127-
}
128-
129-
private convertBigInt(value: any): any {
130-
return value.toNumber();
131-
}
132-
13367
private getColumnValue(column: TColumn) {
13468
return (
13569
column[ColumnCode.binaryVal] ||

0 commit comments

Comments
 (0)