## pg-copy-streams-binary

[![Build Status](https://travis-ci.org/jeromew/node-pg-copy-streams-binary.svg)](https://travis-ci.org/jeromew/node-pg-copy-streams-binary)

Streams for parsing and deparsing the PostgreSQL COPY binary format.
Ingest streaming data into PostgreSQL, or export data from PostgreSQL and transform it into a stream, using the COPY BINARY format.

## what are you talking about?

First, you should know that PostgreSQL has a not-so-well-known mechanism that helps when importing data into PostgreSQL from a source (*copy-in*)
or exporting data from PostgreSQL to a sink (*copy-out*).

You should first get familiar with the [pg-copy-streams](https://github.com/brianc/node-pg-copy-streams) module, which does
the heavy lifting of handling the COPY part of the protocol flow.

## what does this module do?

When dealing with the COPY mechanism, you can use different formats for *copy-out* and *copy-in*: text, csv or binary.

The text and csv formats are interesting, but they have limitations due to being text based: they need field separators, escaping, etc. Have you ever been in CSV hell?

The PostgreSQL documentation states: "Many programs produce strange and occasionally perverse CSV files, so the file format is more a convention than a standard. Thus you might encounter some files that cannot be imported using this mechanism, and COPY might produce files that other programs cannot process."

Do you want to go there? If you take the blue pill, then this module might be for you.

It can be used to parse and deparse the PostgreSQL binary streams that are made available by the `pg-copy-streams` module.

## Examples

This library is mostly interesting for ETL operations (Extract, Transform, Load). When you just need Extract+Load, `pg-copy-streams` does the job and you don't need this library.

Here is an example of a transformation where you want to extract data from database A (dsnA) and move it to database B (dsnB). But there is a twist.

In database A, you have a table of items:

```sql
CREATE TABLE item (id serial PRIMARY KEY, ref text, description text);
INSERT INTO item (ref, description) VALUES ('1:CTX', 'A little item');
INSERT INTO item (ref, description) VALUES ('2:CTX', 'A BIG item');
```

Now you realise that the reference (the `ref` column) has historically been composed of a unique id followed by a label. The target table in database B would therefore be

```sql
CREATE TABLE product (code int4 PRIMARY KEY, label text, description text, ts_creation timestamptz, matrix int2[][]);
```

The ref '1:CTX' is split into (code, label).
Moreover, all descriptions are now required to be lowercase, so you would like to clean up this field.
Someone in-the-know has told you that the creation timestamp of the product can be derived from the id! Simply add `id` days to 1999-01-01T00:00:00Z.
You also need an int2 two-dimensional array `matrix` field filled with [[ id, id+1 ], [ id+2, id+3 ]], because that is what the specification says.

Here is code that does just this.

```js
var pg = require('pg');
var copyOut = require('pg-copy-streams').to;
var copyIn = require('pg-copy-streams').from;
var parser = require('pg-copy-streams-binary').parser;
var deparser = require('pg-copy-streams-binary').deparser;

var client = function(dsn) {
  var client = new pg.Client(dsn);
  client.connect();
  return client;
};

var dsnA = null; // configure database A connection parameters
var dsnB = null; // configure database B connection parameters

var clientA = client(dsnA);
var clientB = client(dsnB);

var AStream = clientA.query(copyOut('COPY item TO STDOUT BINARY'));

// the parser mapping MUST describe every field of the source table,
// in order, with types matching the database column types
var Parser = new parser({
  objectMode: true,
  mapping: [
    { key: 'id', type: 'int4' },
    { key: 'ref', type: 'text' },
    { key: 'description', type: 'text' },
]});

// the deparser mapping has one function per target column; each function
// receives the parsed row and returns the { type, value } to send
var Deparser = new deparser({
  objectMode: true,
  mapping: [
    function(row) { return { type: 'int4', value: parseInt(row.ref.split(':')[0]) }; },
    function(row) { return { type: 'text', value: row.ref.split(':')[1] }; },
    function(row) { return { type: 'text', value: row.description.toLowerCase() }; },
    function(row) {
      var d = new Date('1999-01-01T00:00:00Z');
      var numberOfDaysToAdd = parseInt(row.ref.split(':')[0]);
      d.setDate(d.getDate() + numberOfDaysToAdd);
      return { type: 'timestamptz', value: d }; },
    function(row) {
      var id = parseInt(row.ref.split(':')[0]);
      return { type: '_int2', value: [[id, id+1], [id+2, id+3]] };
    }
]});
var BStream = clientB.query(copyIn('COPY product FROM STDIN BINARY'));

var runStream = function(callback) {
  // listen for errors
  AStream.on('error', callback);
  Parser.on('error', callback);
  Deparser.on('error', callback);
  BStream.on('error', callback);

  // listen for the end of ingestion
  BStream.on('finish', callback);
  AStream.pipe(Parser).pipe(Deparser).pipe(BStream);
};

runStream(function(err) {
  // done !!
  clientA.end();
  clientB.end();
});
```

The `test/transform.js` test does something along these lines to check that it works.

## API for Deparser

### options.COPY_sendHeader

default: true
This option can be used to not send the header that PostgreSQL expects at the beginning of a COPY session.
You could use this if you want to pipe this stream into an already opened COPY session (see the sketch after the next option).

### options.COPY_sendTrailer

default: true
This option can be used to not send the trailer that PostgreSQL expects at the end of a COPY session.
You could use this if you want to unpipe this stream and pipe in another one that will send more data and maybe finish the COPY session.

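As an illustration, here is a minimal, hedged sketch of chaining two deparsers into a single COPY session using these two options. The `numbers` table, its single int4 column, and the exact event sequencing are assumptions for the sketch, not verbatim library documentation.

```js
var pg = require('pg');
var copyIn = require('pg-copy-streams').from;
var deparser = require('pg-copy-streams-binary').deparser;

// hypothetical target table for this sketch: CREATE TABLE numbers (n int4)
var client = new pg.Client(null); // connection parameters assumed to be configured
client.connect();
var copyStream = client.query(copyIn('COPY numbers FROM STDIN BINARY'));

// the first deparser sends the COPY header but suppresses the trailer,
// leaving the session open for more data
var first = new deparser({ objectMode: true, COPY_sendTrailer: false });

// the second deparser skips the header (already sent); with the default
// COPY_sendTrailer: true, it finishes the session
var second = new deparser({ objectMode: true, COPY_sendHeader: false });

first.pipe(copyStream, { end: false }); // do not end the COPY stream yet
first.on('end', function() {
  second.pipe(copyStream);
});
copyStream.on('finish', function() { client.end(); });

first.end([{ type: 'int4', value: 1 }]);
second.end([{ type: 'int4', value: 2 }]);
```
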
### options.mapping

default: false
By default, the Deparser expects a stream of arrays. Each array consists of `{ type: type, value: value }` elements. The length of the array MUST be equal to the number of fields in your target database table. The types MUST be identical to the types of the fields in the database (cf. the "currently supported types" section).

This `mapping` option can be used to transform a stream of objects into such an array. `mapping` MUST be an array of Function elements. The prototype of each function is `function(row)`. The function can do what it wants with the row and must return the necessary `{ type: type, value: value }` element.

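For illustration, here is a minimal sketch of the default (no `mapping`) mode, assuming a two-column (int4, text) target table:

```js
var deparser = require('pg-copy-streams-binary').deparser;

// without a mapping, write one array per row, in table column order;
// each element is a { type: type, value: value } pair
var d = new deparser({ objectMode: true });
d.write([
  { type: 'int4', value: 42 },
  { type: 'text', value: 'hello' },
]);
d.end();
// d is a readable stream of COPY BINARY bytes; pipe it into a
// 'COPY ... FROM STDIN BINARY' stream as in the full example above
```
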
## API for Parser

### options.mapping

default: false
This option can be used to describe the rows that are being exported from PostgreSQL. Each and every field MUST be described.
For each index in the array, you MUST put an object `{ key: name, type: type }` where `name` will be the name given to the field at the corresponding index in the export. Note that it does not need to be equal to the database field name; the COPY protocol does not include the field names.
The type MUST correspond to the type of the column in the database. It must be a type that is implemented in the library.

The Parser will push rows with the corresponding keys.

When `mapping` is not given, the Parser will push rows as arrays of Buffers.

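As a sketch of both modes, assuming the same (int4, text) column layout as above (the key names are illustrative):

```js
var parser = require('pg-copy-streams-binary').parser;

// with a mapping, rows are pushed as objects keyed by the given names
var withMapping = new parser({
  objectMode: true,
  mapping: [
    { key: 'code', type: 'int4' },
    { key: 'label', type: 'text' },
  ],
});
withMapping.on('data', function(row) {
  console.log(row.code, row.label);
});

// without a mapping, rows are pushed as arrays of Buffers,
// one raw binary field value per column
var raw = new parser({ objectMode: true });
raw.on('data', function(row) {
  console.log(row.length + ' fields');
});
// pipe a 'COPY ... TO STDOUT BINARY' stream into either parser,
// as in the full example above
```
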
## Currently supported types

For all supported types, their corresponding array version is also supported.

 * bool
 * bytea
 * int2, int4
 * float4, float8
 * text
 * json
 * timestamptz

Note that when types are mentioned in the `mapping` option, they should be strictly equal to one of these types. pgAdmin might sometimes mention aliases (like integer instead of int4) and you should not use these aliases.

The types for arrays (one or more dimensions) correspond to the type prefixed with an underscore. So an array of int4, int4[], needs to be referenced as _int4 without any mention of the dimensions. This is because the dimension information is embedded in the binary format.

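For example, a parser mapping for a table with an int4 column and an int4[] column, whatever its dimensions, would look like this (the key names are illustrative):

```js
var mapping = [
  { key: 'id', type: 'int4' },      // use 'int4', not the 'integer' alias
  { key: 'matrix', type: '_int4' }, // an int4[] of any dimension
];
```
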
## Warnings & Disclaimer

There are many details in the binary protocol, and as usual, the devil is in the details.
 * Currently, operations are considered to happen on tables WITHOUT OIDS. Usage on tables WITH OIDS has not been tested.
 * In arrays, null placeholders are not implemented (no spot in the array can be empty).
 * In arrays, the first element of a dimension is always at index 1.
 * Error handling has not yet been tuned, so do not expect explicit error messages.

The PostgreSQL documentation states it clearly: "a binary-format file is less portable across machine architectures and PostgreSQL versions".
The tests try to discover issues that may appear between PostgreSQL versions, but it might not work in your specific environment.
You are invited to open your debugger and submit a PR!

Use it at your own risk!

## External references

 * [COPY documentation, including binary format](https://www.postgresql.org/docs/current/static/sql-copy.html)
 * [send/recv implementations for types in PostgreSQL](https://github.com/postgres/postgres/tree/master/src/backend/utils/adt)
 * [default type OIDs in PostgreSQL catalog](https://github.com/postgres/postgres/blob/master/src/include/catalog/pg_type.h)

## Acknowledgments

This would not have been possible without the great work of Brian M. Carlson on the `pg` module and his breakthrough on `pg-copy-streams`, showing that Node.js can be used as a mediator between COPY-out and COPY-in. Thanks!

## License

The MIT License (MIT)

Copyright (c) 2016 Jérôme WAGNER

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.