1
- ## QuestDB Node.js Client
1
+ # QuestDB Node.js Client
2
+
3
+ ## Requirements
4
+
5
+ The client requires Node.js v16 or newer version.
2
6
3
7
## Installation
4
8
``` shell
5
- npm install @questdb/nodejs-client
9
+ npm i -s @questdb/nodejs-client
6
10
```
7
11
8
12
## Examples
9
13
10
14
### Basic API usage
15
+
11
16
``` javascript
12
- const { Sender } = require (" @questdb/nodejs-client" );
17
+ const { Sender } = require (' @questdb/nodejs-client' );
13
18
14
19
async function run () {
15
- // create a sender with a 4k buffer
16
- // it is important to size the buffer correctly so messages can fit
17
- const sender = new Sender ({bufferSize: 4096 });
20
+ const sender = new Sender ();
18
21
19
22
// connect to QuestDB
20
23
// host and port are required in connect options
21
- await sender .connect ({port: 9009 , host: " localhost" });
24
+ await sender .connect ({port: 9009 , host: ' localhost' });
22
25
23
26
// add rows to the buffer of the sender
24
- sender .table (" prices" ).symbol (" instrument" , " EURUSD" )
25
- .floatColumn (" bid" , 1.0195 ).floatColumn (" ask" , 1.0221 ).atNow ();
26
- sender .table (" prices" ).symbol (" instrument" , " GBPUSD" )
27
- .floatColumn (" bid" , 1.2076 ).floatColumn (" ask" , 1.2082 ).atNow ();
27
+ sender .table (' prices' ).symbol (' instrument' , ' EURUSD' )
28
+ .floatColumn (' bid' , 1.0195 ).floatColumn (' ask' , 1.0221 )
29
+ .at (Date .now (), ' ms' );
30
+ sender .table (' prices' ).symbol (' instrument' , ' GBPUSD' )
31
+ .floatColumn (' bid' , 1.2076 ).floatColumn (' ask' , 1.2082 )
32
+ .at (Date .now (), ' ms' );
28
33
29
34
// flush the buffer of the sender, sending the data to QuestDB
30
35
// the buffer is cleared after the data is sent and the sender is ready to accept new data
31
36
await sender .flush ();
32
37
33
38
// add rows to the buffer again and send it to the server
34
- sender .table (" prices" ).symbol (" instrument" , " EURUSD" )
35
- .floatColumn (" bid" , 1.0197 ).floatColumn (" ask" , 1.0224 ).atNow ();
39
+ sender .table (' prices' ).symbol (' instrument' , ' EURUSD' )
40
+ .floatColumn (' bid' , 1.0197 ).floatColumn (' ask' , 1.0224 )
41
+ .at (Date .now (), ' ms' );
36
42
await sender .flush ();
37
43
38
44
// close the connection after all rows ingested
39
45
await sender .close ();
40
46
return new Promise (resolve => resolve (0 ));
41
47
}
42
48
43
- run ().then (value => console .log (value)).catch (err => console .log (err));
49
+ run ()
50
+ .then (console .log )
51
+ .catch (console .error );
44
52
```
45
53
46
54
### Authentication and secure connection
55
+
47
56
``` javascript
48
- const { Sender } = require (" @questdb/nodejs-client" );
57
+ const { Sender } = require (' @questdb/nodejs-client' );
49
58
50
59
async function run () {
51
60
// construct a JsonWebKey
52
- const CLIENT_ID = " testapp" ;
53
- const PRIVATE_KEY = " 9b9x5WhJywDEuo1KGQWSPNxtX-6X6R2BRCKhYMMY6n8" ;
61
+ const CLIENT_ID = ' testapp' ;
62
+ const PRIVATE_KEY = ' 9b9x5WhJywDEuo1KGQWSPNxtX-6X6R2BRCKhYMMY6n8' ;
54
63
const PUBLIC_KEY = {
55
- x: " aultdA0PjhD_cWViqKKyL5chm6H1n-BiZBo_48T-uqc" ,
56
- y: " __ptaol41JWSpTTL525yVEfzmY8A6Vi_QrW1FjKcHMg"
64
+ x: ' aultdA0PjhD_cWViqKKyL5chm6H1n-BiZBo_48T-uqc' ,
65
+ y: ' __ptaol41JWSpTTL525yVEfzmY8A6Vi_QrW1FjKcHMg'
57
66
};
58
67
const JWK = {
59
68
... PUBLIC_KEY,
60
69
d: PRIVATE_KEY ,
61
70
kid: CLIENT_ID ,
62
- kty: " EC " ,
63
- crv: " P-256" ,
71
+ kty: ' EC ' ,
72
+ crv: ' P-256' ,
64
73
};
65
74
66
75
// pass the JsonWebKey to the sender
67
76
// will use it for authentication
68
- const sender = new Sender ({bufferSize : 4096 , jwk: JWK });
77
+ const sender = new Sender ({jwk: JWK });
69
78
70
79
// connect() takes an optional second argument
71
80
// if 'true' passed the connection is secured with TLS encryption
72
- await sender .connect ({port: 9009 , host: " localhost" }, true );
81
+ await sender .connect ({port: 9009 , host: ' localhost' }, true );
73
82
74
83
// send the data over the authenticated and secure connection
75
- sender .table (" prices" ).symbol (" instrument" , " EURUSD" )
76
- .floatColumn (" bid" , 1.0197 ).floatColumn (" ask" , 1.0224 ).atNow ();
84
+ sender .table (' prices' ).symbol (' instrument' , ' EURUSD' )
85
+ .floatColumn (' bid' , 1.0197 ).floatColumn (' ask' , 1.0224 )
86
+ .at (Date .now (), ' ms' );
77
87
await sender .flush ();
78
88
79
89
// close the connection after all rows ingested
80
90
await sender .close ();
81
- return new Promise (resolve => resolve (0 ));
82
91
}
83
92
84
- run ().then ( value => console . log (value)). catch (err => console .log (err) );
93
+ run ().catch (console .error );
85
94
```
86
95
87
96
### TypeScript example
97
+
88
98
``` typescript
89
- import { Sender } from " @questdb/nodejs-client" ;
99
+ import { Sender } from ' @questdb/nodejs-client' ;
90
100
91
101
async function run(): Promise <number > {
92
102
// construct a JsonWebKey
93
- const CLIENT_ID: string = " testapp" ;
94
- const PRIVATE_KEY: string = " 9b9x5WhJywDEuo1KGQWSPNxtX-6X6R2BRCKhYMMY6n8" ;
103
+ const CLIENT_ID: string = ' testapp' ;
104
+ const PRIVATE_KEY: string = ' 9b9x5WhJywDEuo1KGQWSPNxtX-6X6R2BRCKhYMMY6n8' ;
95
105
const PUBLIC_KEY: { x: string , y: string } = {
96
- x: " aultdA0PjhD_cWViqKKyL5chm6H1n-BiZBo_48T-uqc" ,
97
- y: " __ptaol41JWSpTTL525yVEfzmY8A6Vi_QrW1FjKcHMg"
106
+ x: ' aultdA0PjhD_cWViqKKyL5chm6H1n-BiZBo_48T-uqc' ,
107
+ y: ' __ptaol41JWSpTTL525yVEfzmY8A6Vi_QrW1FjKcHMg'
98
108
};
99
109
const JWK: { x: string , y: string , kid: string , kty: string , d: string , crv: string } = {
100
110
... PUBLIC_KEY ,
101
111
d: PRIVATE_KEY ,
102
112
kid: CLIENT_ID ,
103
- kty: " EC " ,
104
- crv: " P-256" ,
113
+ kty: ' EC ' ,
114
+ crv: ' P-256' ,
105
115
};
106
116
107
117
// pass the JsonWebKey to the sender
108
118
// will use it for authentication
109
- const sender: Sender = new Sender ({bufferSize: 4096 , jwk: JWK });
119
+ const sender: Sender = new Sender ({jwk: JWK });
110
120
111
121
// connect() takes an optional second argument
112
122
// if 'true' passed the connection is secured with TLS encryption
113
- await sender .connect ({port: 9009 , host: " localhost" }, true );
123
+ await sender .connect ({port: 9009 , host: ' localhost' }, true );
114
124
115
125
// send the data over the authenticated and secure connection
116
- sender .table (" prices" ).symbol (" instrument" , " EURUSD" )
117
- .floatColumn (" bid" , 1.0197 ).floatColumn (" ask" , 1.0224 ).atNow ( );
126
+ sender .table (' prices' ).symbol (' instrument' , ' EURUSD' )
127
+ .floatColumn (' bid' , 1.0197 ).floatColumn (' ask' , 1.0224 ).at ( Date . now (), ' ms ' );
118
128
await sender .flush ();
119
129
120
130
// close the connection after all rows ingested
121
131
await sender .close ();
122
- return new Promise (resolve => resolve (0 ));
123
132
}
124
133
125
- run ().then ( value => console . log ( value )). catch (err => console .log ( err ) );
134
+ run ().catch (console .error );
126
135
```
127
136
128
137
### Worker threads example
138
+
129
139
``` javascript
130
- const { Sender } = require (" @questdb/nodejs-client" );
140
+ const { Sender } = require (' @questdb/nodejs-client' );
131
141
const { Worker , isMainThread , parentPort , workerData } = require (' worker_threads' );
132
142
133
143
// fake venue
@@ -136,7 +146,7 @@ function* venue(ticker) {
136
146
let end = false ;
137
147
setTimeout (() => { end = true ; }, rndInt (5000 ));
138
148
while (! end) {
139
- yield {" ticker" : ticker, " price" : Math .random ()};
149
+ yield {' ticker' : ticker, ' price' : Math .random ()};
140
150
}
141
151
}
142
152
@@ -153,35 +163,37 @@ async function subscribe(ticker, onTick) {
153
163
154
164
async function run () {
155
165
if (isMainThread) {
156
- const tickers = [" t1 " , " t2 " , " t3 " , " t4 " ];
166
+ const tickers = [' t1 ' , ' t2 ' , ' t3 ' , ' t4 ' ];
157
167
// main thread to start a worker thread for each ticker
158
168
for (let ticker in tickers) {
159
169
const worker = new Worker (__filename , { workerData: { ticker: ticker } })
160
170
.on (' error' , (err ) => { throw err; })
161
171
.on (' exit' , () => { console .log (` ${ ticker} thread exiting...` ); })
162
- .on (' message' , (msg ) => { console .log (" Ingested " + msg .count + " prices for ticker " + msg .ticker ); });
172
+ .on (' message' , (msg ) => {
173
+ console .log (` Ingested ${ msg .count } prices for ticker ${ msg .ticker } ` );
174
+ });
163
175
}
164
176
} else {
165
177
// it is important that each worker has a dedicated sender object
166
178
// threads cannot share the sender because they would write into the same buffer
167
- const sender = new Sender ({ bufferSize : 4096 } );
168
- await sender .connect ({ port: 9009 , host: " localhost" });
179
+ const sender = new Sender ();
180
+ await sender .connect ({ port: 9009 , host: ' localhost' });
169
181
170
182
// subscribe for the market data of the ticker assigned to the worker
171
183
// ingest each price update into the database using the sender
172
184
let count = 0 ;
173
185
await subscribe (workerData .ticker , async (tick ) => {
174
186
sender
175
- .table (" prices" )
176
- .symbol (" ticker" , tick .ticker )
177
- .floatColumn (" price" , tick .price )
178
- .atNow ( );
187
+ .table (' prices' )
188
+ .symbol (' ticker' , tick .ticker )
189
+ .floatColumn (' price' , tick .price )
190
+ .at ( Date . now (), ' ms ' );
179
191
await sender .flush ();
180
192
count++ ;
181
193
});
182
194
183
195
// let the main thread know how many prices were ingested
184
- parentPort .postMessage ({" ticker" : workerData .ticker , " count" : count});
196
+ parentPort .postMessage ({' ticker' : workerData .ticker , ' count' : count});
185
197
186
198
// close the connection to the database
187
199
await sender .close ();
@@ -196,5 +208,7 @@ function rndInt(limit) {
196
208
return Math .floor ((Math .random () * limit) + 1 );
197
209
}
198
210
199
- run ().catch ((err ) => console .log (err));
211
+ run ()
212
+ .then (console .log )
213
+ .catch (console .error );
200
214
```
0 commit comments