
Commit 618bb4a

Implement fieldReader, rawReader, rowReader using a shared genericReader
Add jsonb type support
1 parent 9007e55 commit 618bb4a

14 files changed: +657 -159 lines

README.md (+90 -41)
@@ -2,15 +2,15 @@

[![Build Status](https://travis-ci.org/jeromew/node-pg-copy-streams-binary.svg)](https://travis-ci.org/jeromew/node-pg-copy-streams-binary)

-Streams for parsing and deparsing the PostgreSQL COPY binary format.
+This module contains helper streams for decoding and encoding the PostgreSQL COPY binary format.
Ingest streaming data into PostgreSQL or export data from PostgreSQL and transform it into a stream, using the COPY BINARY format.

## what are you talking about ?

-Well first you have to know that PostgreSQL has not-so-well-known mechanism that helps when importing into PostgreSQL from a source (_copy-in_)
+Well, first you have to know that PostgreSQL has a not-so-well-known mechanism that helps when importing into PostgreSQL from a source (_copy-in_)
or exporting to a sink from PostgreSQL (_copy-out_)

-You should first go and get familiar with the [pg-copy-streams](https://github.com/brianc/node-pg-copy-streams) module that does
+Before using this module, you should make sure to get familiar with the [pg-copy-streams](https://github.com/brianc/node-pg-copy-streams) module that does
the heavy lifting of handling the COPY part of the protocol flow.

## what does this module do ?
@@ -21,17 +21,87 @@ The text and csv formats are interesting but they have some limitations due to t

The PostgreSQL documentation states : Many programs produce strange and occasionally perverse CSV files, so the file format is more a convention than a standard. Thus you might encounter some files that cannot be imported using this mechanism, and COPY might produce files that other programs cannot process.

-Do you want to go there ? If you take the blue pill, then this module might be for you.
+Do you want to go there ? If you choose to go down the BINARY road, this module can help.

-It can be used to parse and deparse the PostgreSQL binary streams that are made available by the `pg-copy-streams` module.
+It can be used to decode and encode the PostgreSQL binary streams that are made available by the `pg-copy-streams` module.
+
+There are currently 5 helper Streams provided :
+
+- rowReader
+- fieldReader
+- rawReader
+- rowWriter
+- transform

The main API is called `transform` and tries to hide many of those details. It can be used to easily do non-trivial things like :

- transforming rows
- expanding on the number of rows
- forking rows into several databases at the same time, with the same or different structures

-## Example
+## rowReader
+
+A rowReader is a Transform stream that takes a copyOut stream as input and outputs a sequence of rows.
+The fields in each row are decoded according to the `options.mapping` definition.
+
+### options.mapping
+
+default: false
+This option can be used to describe the rows that are being exported from PostgreSQL. Each and every field MUST be described.
+For each index in the array, you MUST put an object `{ key: name, type: type }` where name will be the name given to the field at the corresponding index in the export. Note that it does not need to be equal to the database field name. The COPY protocol does not include the field names.
+The type MUST correspond to the type of the column in the database. It must be a type that is implemented in the library.
+
+The parser will push rows with the corresponding keys.
+
+When `mapping` is not given, the parser will push rows as arrays of Buffers.
+
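As a minimal sketch of how such a rowReader could be wired up (the table, the column names and the surrounding `pg` / `pg-copy-streams` plumbing are illustrative assumptions, not part of this diff):

```js
const { Client } = require('pg')
const copyTo = require('pg-copy-streams').to
const { rowReader } = require('pg-copy-streams-binary')

async function exportRows() {
  const client = new Client()
  await client.connect()

  // COPY ... TO STDOUT BINARY produces the copyOut stream that rowReader decodes
  const copyOut = client.query(copyTo('COPY my_table (id, body) TO STDOUT BINARY'))
  const rows = copyOut.pipe(
    rowReader({
      mapping: [
        { key: 'id', type: 'int4' }, // exact type names, no aliases (int4, not integer)
        { key: 'body', type: 'text' },
      ],
    })
  )

  rows.on('data', (row) => console.log(row.id, row.body))
  rows.on('end', () => client.end())
}

exportRows()
```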
+## fieldReader
+
+A fieldReader is a Transform stream that takes a copyOut stream as input and outputs a sequence of fields.
+The fields are decoded according to the `options.mapping` definition.
+
+Note that in fieldReader, each field can define a `mode = sync / async` attribute. When `mode = async`, the field output will be a Readable stream.
+This can help in scenarios where you do not want to gather a big field in memory, but you will need to make sure that you read the field stream: if you do not read it, backpressure will kick in and you will not receive more fields.
+
+### options.mapping
+
+default: false
+This option can be used to describe the rows that are being exported from PostgreSQL. Each and every field MUST be described.
+For each index in the array, you MUST put an object `{ key: name, type: type, mode: mode }` where name will be the name given to the field at the corresponding index in the export. Note that it does not need to be equal to the database field name. The COPY protocol does not include the field names.
+The type MUST correspond to the type of the column in the database. It must be a type that is implemented in the library.
+The mode can be 'sync' or 'async'. Default is 'sync'.
+
+The parser will push fields with the corresponding keys.
+
+When `mapping` is not given, the parser will push fields as arrays of Buffers.
+
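A sketch of the async mode, assuming the pushed field objects look as described above (the decoded value arrives under the mapping key, async values as Readable streams); the table and columns are hypothetical:

```js
const fs = require('fs')
const { Client } = require('pg')
const copyTo = require('pg-copy-streams').to
const { fieldReader } = require('pg-copy-streams-binary')

async function exportFields() {
  const client = new Client()
  await client.connect()

  const fields = client
    .query(copyTo('COPY documents (id, content) TO STDOUT BINARY'))
    .pipe(
      fieldReader({
        mapping: [
          { key: 'id', type: 'int4', mode: 'sync' },
          { key: 'content', type: 'text', mode: 'async' }, // delivered as a Readable stream
        ],
      })
    )

  let n = 0
  fields.on('data', (field) => {
    if (field.content) {
      // async fields must be consumed, otherwise backpressure stops the whole export
      field.content.pipe(fs.createWriteStream(`/tmp/content-${n++}.txt`))
    } else {
      console.log('id:', field.id)
    }
  })
  fields.on('end', () => client.end())
}

exportFields()
```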
+## rawReader
+
+A rawReader is a Transform stream that takes a copyOut stream as input and outputs raw field bytes.
+
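A short sketch, assuming rawReader is constructed like the other readers and that `copyOut` is a `COPY ... TO STDOUT BINARY` stream obtained as in the previous sketches:

```js
const { rawReader } = require('pg-copy-streams-binary')

// `copyOut` would come from client.query(copyTo('COPY my_table TO STDOUT BINARY'))
const raw = copyOut.pipe(rawReader())
raw.on('data', (chunk) => {
  // chunk is a Buffer of raw, undecoded field bytes
  console.log('received', chunk.length, 'raw bytes')
})
```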
+## rowWriter
+
+The rowWriter (deparser) is usually used without arguments. It is a Transform stream (always in object mode) that receives a stream of arrays and outputs their PostgreSQL binary representation.
+
+Each array is a sequence of `{ type: .., value: .. }` pairs, where `type` is a PostgreSQL type (cf. the supported types section) and `value` is the value that needs to be deparsed.
+
+Currently, make sure the value is not the JavaScript `undefined`, because this case is not handled in the deparser. The value can be `null`, but don't forget that the target table field should be nullable or the database will complain.
+
+Usually, you would want to use a through2 stream to prepare the arrays and pipe this into the deparser.
+
+### options.COPY_sendHeader
+
+default: true
+This option can be used to not send the header that PostgreSQL expects at the beginning of a COPY session.
+You could use this if you want to pipe this stream into an already opened COPY session.
+
+### options.COPY_sendTrailer
+
+default: true
+This option can be used to not send the trailer that PostgreSQL expects at the end of a COPY session.
+You could use this if you want to unpipe this stream and pipe in another one that will send more data and maybe finish the COPY session.
+
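A sketch of a full encode-and-load pipeline built around this rowWriter, using through2 to prepare the `{ type, value }` arrays and the `from` helper of `pg-copy-streams` for the COPY FROM STDIN BINARY side (table and column names are illustrative):

```js
const through2 = require('through2')
const { Client } = require('pg')
const copyFrom = require('pg-copy-streams').from
const { rowWriter } = require('pg-copy-streams-binary')

async function importRows(records) {
  const client = new Client()
  await client.connect()

  const copyIn = client.query(copyFrom('COPY my_table (id, body) FROM STDIN BINARY'))

  // prepare one array of { type, value } pairs per row
  const prepare = through2.obj(function (record, _enc, cb) {
    this.push([
      { type: 'int4', value: record.id },
      { type: 'text', value: record.body !== undefined ? record.body : null }, // never undefined
    ])
    cb()
  })

  prepare.pipe(rowWriter()).pipe(copyIn)

  for (const record of records) prepare.write(record)
  prepare.end()

  await new Promise((resolve, reject) => {
    copyIn.on('finish', resolve)
    copyIn.on('error', reject)
  })
  await client.end()
}

importRows([{ id: 1, body: 'hello' }, { id: 2, body: null }])
```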
+## Example of `transform`

This library is mostly interesting for ETL operations (Extract, Transform, Load). When you just need Extract+Load, `pg-copy-streams` does the job and you don't need this library.

@@ -192,41 +262,6 @@ The Writable Stream will emit a `close` event, following the node.js documentati

Not all Streams emit a `close` event but this one does because it is necessary to wait for the end of all the underlying COPY FROM STDIN BINARY commands on the targets. `close` is emitted when all the underlying COPY commands have emitted their respective `finish` event.

-## API for deparser
-
-the deparser is usually used without arguments. It is a Transform Stream (always in object mode) that receives a stream of arrays, and outputs their PostgreSQL binary representation.
-
-Each array is a sequence of { type:.. , value: ..} pairs, where `type` is a PostgreSQL type (cf section supported types) and `value` is the value that need to be deparsed.
-
-Currently, make sure sure value is not the javascript `undefined` because this case is not handled in the deparser. The value can be `null` but don't forget that the target table field should be nullable or the database will complain.
-
-Usually, you would want to use a through2 stream to prepare the arrays, and pipe this into the deparser.
-
-### options.COPY_sendHeader
-
-default: true
-This option can be used to not send the header that PostgreSQL expects at the beginning of a COPY session.
-You could use this if you want to pipe this stream to an already opened COPY session.
-
-### options.COPY_sendTrailer
-
-default: true
-This option can be used to not send the header that PostgreSQL expects at the end of COPY session.
-You could use this if you want to unpipe this stream pipe another one that will send more data and maybe finish the COPY session.
-
-## API for Parser
-
-### options.mapping
-
-default: false
-This option can be used to describe the rows that are beeing exported from PostgreSQL. Each and every field MUST be described.
-For each index in the array, you MUST put an object `{ key: name, type: type }` where name will be the name given to the field at the corresponding index in the export. Note that it does not need to be equal to the database field name. The COPY protocol does not include the field names.
-The type MUST correspond to the type of the column in the database. It must be a type that is implemented in the library.
-
-the Parser will push rows with the corresponding keys.
-
-When `mapping` is not given, the Parser will push rows as arrays of Buffers.
-
## Currently supported types

For all supported types, their corresponding array version is also supported.
@@ -237,6 +272,7 @@ For all supported types, their corresponding array version is also supported.
- float4, float8
- text
- json
+- jsonb
- timestamptz

Note that when types are mentioned in the `mapping` option, they should be strictly equal to one of these types. pgAdmin might sometimes mention aliases (like integer instead of int4) and you should not use these aliases.
@@ -245,6 +281,19 @@ The types for array (one or more dimensions) corresponds to the type prefixed wi

## changelog

+### version 2.0.0 - not yet published
+
+This is a breaking version because it was decided to rename some exported variables.
+
+- Rename exported objects for improved readability
+  pg_types parse => decode
+  pg_types deparse => encode
+  parser => rowReader
+  deparser => rowWriter
+- Implement fieldReader with async support
+- Implement rawReader
+- Add jsonb type support
+
### version 1.2.1 - published 2020-05-29

- Fix a compatibility bug introduced via `pg-copy-streams` 3.0. The parser can now handle rows that span across several stream chunks

index.js (+2)

@@ -1,4 +1,6 @@
module.exports = {
+  fieldReader: require('./lib/fieldReader'),
+  rawReader: require('./lib/rawReader'),
  rowReader: require('./lib/rowReader'),
  rowWriter: require('./lib/rowWriter'),
  transform: require('./lib/transform'),
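
With these two lines, all five helpers are reachable from the package entry point; a consumer would typically destructure the ones it needs (assuming the published package name):

```js
const {
  fieldReader,
  rawReader,
  rowReader,
  rowWriter,
  transform,
} = require('pg-copy-streams-binary')
```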

lib/fieldReader.js (+31)

@@ -0,0 +1,31 @@
+module.exports = function (txt, options) {
+  return new rowReader(txt, options)
+}
+
+const binaryReader = require('./genericReader')
+
+class rowReader extends binaryReader {
+  constructor(options = {}) {
+    options.readableObjectMode = true
+    super(options)
+    this.mapping = options.mapping || false
+  }
+
+  fieldReady() {
+    const o = {}
+    o._fieldIndex = this._fieldIndex
+    o._fieldCount = this._fieldCount
+    o._fieldLength = this._fieldLength
+    const key = this.mapping ? this.mapping[this._fieldIndex].key : 'value'
+    o[key] = this._fieldHolder
+    this.push(o)
+  }
+
+  fieldMode() {
+    return this.mapping ? this.mapping[this._fieldIndex].mode : 'sync'
+  }
+
+  fieldType() {
+    return this.mapping ? this.mapping[this._fieldIndex].type : null
+  }
+}
