1
1
/*jslint bitwise: true, eqeqeq: true, immed: true, newcap: true, nomen: true, onevar: true, plusplus: true, regexp: true, undef: true, white: true, indent: 2 */
2
2
/*globals include md5 node exports */
3
3
4
- process . mixin ( require ( './postgres-js/bits' ) ) ;
5
4
process . mixin ( require ( './postgres-js/md5' ) ) ;
5
+ var bits = require ( './postgres-js/bits' ) ;
6
6
var tcp = require ( "tcp" ) ;
7
7
var sys = require ( "sys" ) ;
8
8
9
9
exports . DEBUG = 0 ;
10
10
11
- String . prototype . add_header = function ( code ) {
12
- if ( code === undefined ) {
13
- code = "" ;
14
- }
15
- return code . add_int32 ( this . length + 4 ) + this ;
16
- } ;
17
-
18
- Object . prototype . map_pairs = function ( func ) {
19
- var result = [ ] ;
20
- for ( var k in this ) {
21
- if ( this . hasOwnProperty ( k ) ) {
22
- result . push ( func . call ( this , k , this [ k ] ) ) ;
23
- }
24
- }
25
- return result ;
26
- }
27
-
28
11
// http://www.postgresql.org/docs/8.3/static/protocol-message-formats.html
29
12
var formatter = {
30
13
CopyData : function ( ) {
@@ -34,62 +17,65 @@ var formatter = {
34
17
// TODO: implement
35
18
} ,
36
19
Describe : function ( name , type ) {
37
- var stream = [ type . charCodeAt ( 0 ) ] . add_cstring ( name ) ;
38
- return stream . add_header ( 'D' ) ;
20
+ return ( new bits . Encoder ( 'D' ) )
21
+ . push_raw_string ( type )
22
+ . push_cstring ( name ) ;
39
23
} ,
40
24
Execute : function ( name , max_rows ) {
41
- var stream = [ ]
42
- . add_cstring ( name )
43
- . add_int32 ( max_rows ) ;
44
- return stream . add_header ( 'E' ) ;
25
+ return ( new bits . Encoder ( 'E' ) )
26
+ . push_cstring ( name )
27
+ . push_int32 ( max_rows ) ;
45
28
} ,
46
29
Flush : function ( ) {
47
- return [ ] . add_header ( 'H' ) ;
30
+ return new bits . Encoder ( 'H' ) ;
48
31
} ,
49
32
FunctionCall : function ( ) {
50
33
// TODO: implement
51
34
} ,
52
35
Parse : function ( name , query , var_types ) {
53
- var stream = [ ]
54
- . add_cstring ( name )
55
- . add_cstring ( query )
56
- . add_int16 ( var_types . length ) ;
36
+ var builder = ( new bits . Encoder ( 'P' ) )
37
+ . push_cstring ( name )
38
+ . push_cstring ( query )
39
+ . push_int16 ( var_types . length ) ;
57
40
var_types . each ( function ( var_type ) {
58
- stream . add_int32 ( var_type ) ;
41
+ stream . push_int32 ( var_type ) ;
59
42
} ) ;
60
- return stream . add_header ( 'P' ) ;
43
+ return builder ;
61
44
} ,
62
45
PasswordMessage : function ( password ) {
63
- return "" . add_cstring ( password ) . add_header ( 'p' ) ;
46
+ return ( new bits . Encoder ( 'p' ) )
47
+ . push_cstring ( password ) ;
64
48
} ,
65
49
Query : function ( query ) {
66
- return "" . add_cstring ( query ) . add_header ( 'Q' ) ;
50
+ return ( new bits . Encoder ( 'Q' ) )
51
+ . push_cstring ( query ) ;
67
52
} ,
68
53
SSLRequest : function ( ) {
69
- return "" . add_int32 ( 0x4D2162F ) . add_header ( ) ;
54
+ return ( new bits . Encoder ( ) )
55
+ . push_int32 ( 0x4D2162F ) ;
70
56
} ,
71
57
StartupMessage : function ( options ) {
72
58
// Protocol version number 3
73
- return ( "" . add_int32 ( 0x30000 ) +
74
- options . map_pairs ( function ( key , value ) {
75
- return "" . add_cstring ( key ) . add_cstring ( value ) ;
76
- } ) . join ( "" ) + "0" ) . add_header ( ) ;
59
+ return ( new bits . Encoder ( ) )
60
+ . push_int32 ( 0x30000 )
61
+ . push_hash ( options ) ;
77
62
} ,
78
63
Sync : function ( ) {
79
- return [ ] . add_header ( 'S' ) ;
64
+ return new bits . Encoder ( 'S' ) ;
80
65
} ,
81
66
Terminate : function ( ) {
82
- return [ ] . add_header ( 'X' ) ;
67
+ return new bits . Encoder ( 'X' ) ;
83
68
}
84
69
} ;
85
70
86
71
// Parse response streams from the server
87
72
function parse_response ( code , stream ) {
73
+ var input = new bits . Decoder ( stream ) ;
88
74
var type , args ;
89
75
args = [ ] ;
90
76
switch ( code ) {
91
77
case 'R' :
92
- switch ( stream . parse_int32 ( ) [ 1 ] ) {
78
+ switch ( stream . shift_int32 ( ) ) {
93
79
case 0 :
94
80
type = "AuthenticationOk" ;
95
81
break ;
@@ -101,11 +87,11 @@ function parse_response(code, stream) {
101
87
break ;
102
88
case 4 :
103
89
type = "AuthenticationCryptPassword" ;
104
- args = stream . substr ( 4 ) . parse ( [ "raw_string" , 2 ] ) [ 1 ] ;
90
+ args = [ stream . shift_raw_string ( 2 ) ] ;
105
91
break ;
106
92
case 5 :
107
93
type = "AuthenticationMD5Password" ;
108
- args = stream . substr ( 4 ) . parse ( [ "raw_string" , 4 ] ) [ 1 ] ;
94
+ args = [ stream . shift_raw_string ( 4 ) ] ;
109
95
break ;
110
96
case 6 :
111
97
type = "AuthenticationSCMCredential" ;
@@ -122,62 +108,56 @@ function parse_response(code, stream) {
122
108
case 'E' :
123
109
type = "ErrorResponse" ;
124
110
args = [ { } ] ;
125
- stream . parse ( "multi_cstring" ) [ 1 ] [ 0 ] . forEach ( function ( field ) {
111
+ stream . shift_multi_cstring ( ) . forEach ( function ( field ) {
126
112
args [ 0 ] [ field [ 0 ] ] = field . substr ( 1 ) ;
127
113
} ) ;
128
114
break ;
129
115
case 'S' :
130
116
type = "ParameterStatus" ;
131
- args = stream . parse ( "cstring" , "cstring" ) [ 1 ] ;
117
+ args = [ stream . shift_cstring ( ) , stream . shift_cstring ( ) ] ;
132
118
break ;
133
119
case 'K' :
134
120
type = "BackendKeyData" ;
135
- args = stream . parse ( "int32" , "int32" ) [ 1 ] ;
121
+ args = [ stream . shift_int32 ( ) , stream . shift_int32 ( ) ] ;
136
122
break ;
137
123
case 'Z' :
138
124
type = "ReadyForQuery" ;
139
- args = stream . parse ( [ "raw_string" , 1 ] ) [ 1 ] ;
125
+ args = [ stream . shift_raw_string ( 1 ) ] ;
140
126
break ;
141
127
case 'T' :
142
128
type = "RowDescription" ;
143
- var num_fields = stream . parse_int16 ( ) [ 1 ] ;
144
- stream = stream . substr ( 2 ) ;
129
+ var num_fields = stream . shift_int16 ( ) ;
145
130
var row = [ ] ;
146
131
for ( var i = 0 ; i < num_fields ; i += 1 ) {
147
- var parts = stream . parse ( "cstring" , "int32" , "int16" , "int32" , "int16" , "int32" , "int16" ) ;
148
132
row . push ( {
149
- field : parts [ 1 ] [ 0 ] ,
150
- table_id : parts [ 1 ] [ 1 ] ,
151
- column_id : parts [ 1 ] [ 2 ] ,
152
- type_id : parts [ 1 ] [ 3 ] ,
153
- type_size : parts [ 1 ] [ 4 ] ,
154
- type_modifier : parts [ 1 ] [ 5 ] ,
155
- format_code : parts [ 1 ] [ 6 ]
133
+ field : stream . shift_cstring ( ) ,
134
+ table_id : stream . shift_int32 ( ) ,
135
+ column_id : stream . shift_int16 ( ) ,
136
+ type_id : stream . shift_int32 ( ) ,
137
+ type_size : stream . shift_int16 ( ) ,
138
+ type_modifier : stream . shift_int32 ( ) ,
139
+ format_code : stream . shift_int16 ( )
156
140
} ) ;
157
- stream = stream . substr ( parts [ 0 ] ) ;
158
141
}
159
142
args = [ row ] ;
160
143
break ;
161
144
case 'D' :
162
145
type = "DataRow" ;
163
146
var data = [ ] ;
164
- var num_cols = stream . parse_int16 ( ) [ 1 ] ;
165
- stream = stream . substr ( 2 ) ;
147
+ var num_cols = stream . shift_int16 ( ) ;
166
148
for ( i = 0 ; i < num_cols ; i += 1 ) {
167
- var size = stream . parse_int32 ( ) [ 1 ] ;
168
- stream = stream . substr ( 4 ) ;
149
+ var size = stream . shift_int32 ( ) ;
169
150
if ( size === - 1 ) {
170
151
data . push ( null ) ;
171
152
} else {
172
- data . push ( stream . parse_raw_string ( size ) [ 1 ] ) ;
173
- stream = stream . substr ( size ) ;
153
+ data . push ( stream . shift_raw_string ( size ) ) ;
174
154
}
175
155
}
176
156
args = [ data ] ;
177
157
break ;
178
158
case 'C' :
179
159
type = "CommandComplete" ;
180
- args = stream . parse ( "cstring" ) [ 1 ] ;
160
+ args = [ stream . shift_cstring ( ) ] ;
181
161
break ;
182
162
}
183
163
if ( ! type ) {
@@ -205,7 +185,7 @@ exports.Connection = function (database, username, password, port) {
205
185
206
186
// Sends a message to the postgres server
207
187
function sendMessage ( type , args ) {
208
- var stream = formatter [ type ] . apply ( this , args ) ;
188
+ var stream = ( formatter [ type ] . apply ( this , args ) ) . toString ( ) ;
209
189
if ( exports . DEBUG > 0 ) {
210
190
sys . debug ( "Sending " + type + ": " + JSON . stringify ( args ) ) ;
211
191
if ( exports . DEBUG > 2 ) {
@@ -221,16 +201,15 @@ exports.Connection = function (database, username, password, port) {
221
201
sendMessage ( 'StartupMessage' , [ { user : username , database : database } ] ) ;
222
202
} ) ;
223
203
connection . addListener ( "receive" , function ( data ) {
224
-
204
+ var input = new bits . Decoder ( data ) ;
225
205
if ( exports . DEBUG > 2 ) {
226
206
sys . debug ( "<-" + JSON . stringify ( data ) ) ;
227
207
}
228
208
229
- while ( data . length > 0 ) {
230
- var code = data [ 0 ] ;
231
- var len = data . substr ( 1 , 4 ) . parse_int32 ( ) [ 1 ] ;
232
- var stream = data . substr ( 5 , len - 4 ) ;
233
- data = data . substr ( len + 1 ) ;
209
+ while ( input . data . length > 0 ) {
210
+ var code = input . shift_code ( ) ;
211
+ var len = input . shift_int32 ( ) ;
212
+ var stream = new bits . Decoder ( input . shift_raw_string ( len - 4 ) ) ;
234
213
if ( exports . DEBUG > 1 ) {
235
214
sys . debug ( "stream: " + code + " " + JSON . stringify ( stream ) ) ;
236
215
}
0 commit comments