Skip to content

Commit 6ed6fae

Browse files
committed
Fix parser for rows spanning several chunks / compatibility with pg-copy-streams 3.0+
1 parent 03ff765 commit 6ed6fae

File tree

1 file changed

+79
-50
lines changed

1 file changed

+79
-50
lines changed

lib/parser.js

Lines changed: 79 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,14 @@ module.exports = function (txt, options) {
1111
const { Transform } = require('stream')
1212
const BP = require('bufferput')
1313
const { parse } = require('./pg_types')
14+
const BufferList = require('bl/BufferList')
15+
16+
const PG_HEADER = 0
17+
const PG_ROW_START = 1
18+
const PG_FIELD_START = 2
19+
const PG_FIELD_DATA = 3
20+
const PG_FIELD_END = 4
21+
const PG_TRAILER = 5
1422

1523
class CopyStream extends Transform {
1624
constructor(options) {
@@ -24,75 +32,96 @@ class CopyStream extends Transform {
2432
.word32be(0)
2533
.buffer()
2634

27-
this.COPYTrailer = Buffer.from([0xff, 0xff])
28-
29-
this._headerReceived = false
30-
this._trailerReceived = false
31-
this._remainder = false
35+
this._buffer = new BufferList()
36+
this._state = PG_HEADER
37+
this._row = null
38+
this._fieldCount = null
39+
this._fieldIndex = null
40+
this._fieldLength = null
41+
this._fieldBuffer = null
3242

3343
this.mapping = options.mapping || false
3444
}
3545

3646
_transform(chunk, enc, cb) {
37-
if (this._remainder && chunk) {
38-
chunk = Buffer.concat([this._remainder, chunk])
39-
}
47+
this._buffer.append(chunk)
48+
while (this._buffer.length > 0) {
49+
if (PG_HEADER === this._state) {
50+
if (this._buffer.length < this.COPYHeaderFull.length) break
51+
if (!this.COPYHeaderFull.equals(this._buffer.slice(0, this.COPYHeaderFull.length))) {
52+
return cb(new Error('COPY BINARY Header mismatch'))
53+
}
54+
this._buffer.consume(this.COPYHeaderFull.length)
55+
this._state = PG_ROW_START
56+
}
4057

41-
let offset = 0
42-
if (!this._headerReceived && chunk.length >= this.COPYHeaderFull.length) {
43-
if (this.COPYHeaderFull.equals(chunk.slice(0, this.COPYHeaderFull.length))) {
44-
this._headerReceived = true
45-
offset += this.COPYHeaderFull.length
58+
if (PG_ROW_START === this._state) {
59+
if (this._buffer.length < 2) break
60+
this._fieldCount = this._buffer.readUInt16BE(0)
61+
this._buffer.consume(2)
62+
const UInt16_0xffff = 65535
63+
if (this._fieldCount === UInt16_0xffff) {
64+
this._state = PG_TRAILER
65+
} else {
66+
this._row = this.mapping ? {} : []
67+
this._state = PG_FIELD_START
68+
this._fieldIndex = -1
69+
}
4670
}
47-
}
4871

49-
// Copy-out mode (data transfer from the server) is initiated when the backend executes a COPY TO STDOUT SQL statement.
50-
// The backend sends a CopyOutResponse message to the frontend, followed by zero or more CopyData messages (always one per row)
51-
const UInt16Len = 2
52-
while (this._headerReceived && chunk.length - offset >= UInt16Len) {
53-
const fieldCount = chunk.readUInt16BE(offset)
54-
offset += 2
55-
const UInt32Len = 4
56-
const UInt16_0xff = 65535
57-
const UInt32_0xffffffff = 4294967295
58-
if (fieldCount === UInt16_0xff) {
59-
this._trailerReceived = true
72+
if (PG_TRAILER === this._state) {
6073
this.push(null)
74+
this._row = null
75+
this._fieldBuffer = null
6176
return cb()
6277
}
63-
const fields = this.mapping ? {} : []
64-
for (let i = 0; i < fieldCount; i++) {
65-
let v
66-
const fieldLen = chunk.readUInt32BE(offset)
67-
offset += UInt32Len
68-
if (fieldLen === UInt32_0xffffffff) {
69-
v = null
78+
79+
if (PG_FIELD_START === this._state) {
80+
if (this._buffer.length < 4) break
81+
this._fieldIndex++
82+
this._fieldLength = this._buffer.readUInt32BE(0)
83+
this._buffer.consume(4)
84+
const UInt32_0xffffffff = 4294967295 /* Magic value for NULL */
85+
if (this._fieldLength === UInt32_0xffffffff) {
86+
this._fieldBuffer = null
87+
this._fieldLength = 0
88+
this._state = PG_FIELD_END
7089
} else {
71-
v = chunk.slice(offset, offset + fieldLen)
72-
if (this.mapping) {
73-
v = parse(v, this.mapping[i].type)
74-
}
75-
offset += fieldLen
90+
this._fieldBuffer = new BufferList()
91+
this._state = PG_FIELD_DATA
92+
}
93+
}
94+
95+
if (PG_FIELD_DATA === this._state) {
96+
if (this._buffer.length === 0) break
97+
const bl = this._buffer.shallowSlice(0, this._fieldLength)
98+
this._fieldBuffer.append(bl)
99+
this._fieldLength -= bl.length
100+
this._buffer.consume(bl.length)
101+
if (this._fieldLength === 0) {
102+
this._state = PG_FIELD_END
103+
}
104+
}
105+
106+
if (PG_FIELD_END === this._state) {
107+
if (this._fieldBuffer && this.mapping) {
108+
this._fieldBuffer = parse(this._fieldBuffer.slice(), this.mapping[this._fieldIndex].type)
76109
}
77110
if (this.mapping) {
78-
fields[this.mapping[i].key] = v
111+
this._row[this.mapping[this._fieldIndex].key] = this._fieldBuffer
79112
} else {
80-
fields.push(v)
113+
this._row.push(this._fieldBuffer)
81114
}
82-
}
83-
this.push(fields)
84-
}
85115

86-
if (chunk.length - offset) {
87-
const slice = chunk.slice(offset)
88-
this._remainder = slice
89-
} else {
90-
this._remainder = false
116+
this._state = PG_FIELD_START
117+
118+
if (this._fieldIndex === this._fieldCount - 1) {
119+
this.push(this._row)
120+
this._state = PG_ROW_START
121+
}
122+
}
91123
}
92-
cb()
93-
}
94124

95-
_flush(cb) {
96125
cb()
97126
}
98127
}

0 commit comments

Comments
 (0)