1
- var DB = require ( ' sharedb' ) . DB ;
2
- var pg = require ( 'pg' ) ;
1
+ var DB = require ( " sharedb" ) . DB ;
2
+ var pg = require ( "pg" ) ;
3
3
4
4
// Postgres-backed ShareDB database
5
5
@@ -15,22 +15,17 @@ module.exports = PostgresDB;
15
15
16
16
PostgresDB . prototype = Object . create ( DB . prototype ) ;
17
17
18
- PostgresDB . prototype . close = function ( callback ) {
18
+ PostgresDB . prototype . close = function ( callback ) {
19
19
this . closed = true ;
20
20
this . pool . end ( ) ;
21
-
21
+
22
22
if ( callback ) callback ( ) ;
23
23
} ;
24
24
25
- function rollback ( client , done ) {
26
- client . query ( 'ROLLBACK' , function ( err ) {
27
- return done ( err ) ;
28
- } )
29
- }
30
25
31
26
// Persists an op and snapshot if it is for the next version. Calls back with
32
27
// callback(err, succeeded)
33
- PostgresDB . prototype . commit = function ( collection , id , op , snapshot , options , callback ) {
28
+ PostgresDB . prototype . commit = function ( collection , id , op , snapshot , options , callback ) {
34
29
/*
35
30
* op: CreateOp {
36
31
* src: '24545654654646',
@@ -41,121 +36,109 @@ PostgresDB.prototype.commit = function(collection, id, op, snapshot, options, ca
41
36
* }
42
37
* snapshot: PostgresSnapshot
43
38
*/
44
- this . pool . connect ( function ( err , client , done ) {
39
+ this . pool . connect ( ( err , client , done ) => {
45
40
if ( err ) {
46
41
done ( client ) ;
47
42
callback ( err ) ;
48
43
return ;
49
44
}
50
- function commit ( ) {
51
- client . query ( 'COMMIT' , function ( err ) {
52
- done ( err ) ;
53
- if ( err ) {
54
- callback ( err ) ;
55
- } else {
56
- callback ( null , true ) ;
57
- }
58
- } )
59
- }
60
- // ZW: Op versions start from 0, snapshot versions start from 1. To get
61
- // the next snapshot version, we should query the snapshots table, not
62
- // the ops table.
63
- // (cf: the MemoryDB adapter, which uses the number of op rows, rather
64
- // than the highest op version)
65
- client . query (
66
- 'SELECT max(version) AS max_version FROM snapshots WHERE collection = $1 AND doc_id = $2' ,
67
- [ collection , id ] ,
68
- function ( err , res ) {
69
- var max_version = res . rows [ 0 ] . max_version ;
70
- if ( max_version == null )
71
- max_version = 0 ;
72
- if ( snapshot . v !== max_version + 1 ) {
73
- return callback ( null , false ) ;
74
- }
75
- client . query ( 'BEGIN' , function ( err ) {
76
- // ZW: Snapshot version is always 1 above op version. We should
77
- // use op.v here, not snapshot.v
78
- client . query (
79
- 'INSERT INTO ops (collection, doc_id, version, operation) VALUES ($1, $2, $3, $4)' ,
80
- [ collection , id , op . v , op ] ,
81
- function ( err , res ) {
82
- if ( err ) {
83
- // TODO: if err is "constraint violation", callback(null, false) instead
84
- rollback ( client , done ) ;
85
- callback ( err ) ;
86
- return ;
87
- }
88
- if ( snapshot . v === 1 ) {
89
- client . query (
90
- 'INSERT INTO snapshots (collection, doc_id, doc_type, version, data) VALUES ($1, $2, $3, $4, $5)' ,
91
- [ collection , id , snapshot . type , snapshot . v , snapshot . data ] ,
92
- function ( err , res ) {
93
- // TODO:
94
- // if the insert was successful and did insert, callback(null, true)
95
- // if the insert was successful and did not insert, callback(null, false)
96
- // if there was an error, rollback and callback(error)
97
- if ( err ) {
98
- rollback ( client , done ) ;
99
- callback ( err ) ;
100
- return ;
101
- }
102
- commit ( ) ;
103
- }
104
- )
105
- } else {
106
- client . query (
107
- 'UPDATE snapshots SET doc_type = $3, version = $4, data = $5 WHERE collection = $1 AND doc_id = $2 AND version = ($4 - 1)' ,
108
- [ collection , id , snapshot . type , snapshot . v , snapshot . data ] ,
109
- function ( err , res ) {
110
- // TODO:
111
- // if any rows were updated, success
112
- // if 0 rows were updated, rollback and not success
113
- // if error, rollback and not success
114
- if ( err ) {
115
- rollback ( client , done ) ;
116
- callback ( err ) ;
117
- return ;
118
- }
119
- commit ( ) ;
120
- }
121
- )
122
- }
123
- }
124
- )
125
- } )
45
+
46
+ /*
47
+ * This query uses common table expression to upsert the snapshot table
48
+ * (iff the new version is exactly 1 more than the latest table or if
49
+ * the document id does not exists)
50
+ *
51
+ * It will then insert into the ops table if it is exactly 1 more than the
52
+ * latest table or it the first operation and iff the previous insert into
53
+ * the snapshot table is successful.
54
+ *
55
+ * This result of this query the version of the newly inserted operation
56
+ * If either the ops or the snapshot insert fails then 0 rows are returned
57
+ *
58
+ * If 0 zeros are return then the callback must return false
59
+ *
60
+ * Casting is required as postgres thinks that collection and doc_id are
61
+ * not varchar
62
+ */
63
+ const query = {
64
+ name : "sdb-commit-op-and-snap" ,
65
+ text : `WITH snapshot_id AS (
66
+ INSERT INTO snapshots (collection, doc_id, doc_type, version, data)
67
+ SELECT $1::varchar collection, $2::varchar doc_id, $4 doc_type, $3 v, $5 d
68
+ WHERE $3 = (
69
+ SELECT version+1 v
70
+ FROM snapshots
71
+ WHERE collection = $1 AND doc_id = $2
72
+ FOR UPDATE
73
+ ) OR NOT EXISTS (
74
+ SELECT 1
75
+ FROM snapshots
76
+ WHERE collection = $1 AND doc_id = $2
77
+ FOR UPDATE
78
+ )
79
+ ON CONFLICT (collection, doc_id) DO UPDATE SET version = $3, data = $5, doc_type = $4
80
+ RETURNING version
81
+ )
82
+ INSERT INTO ops (collection, doc_id, version, operation)
83
+ SELECT $1::varchar collection, $2::varchar doc_id, $3 v, $6 operation
84
+ WHERE (
85
+ $3 = (
86
+ SELECT max(version)+1
87
+ FROM ops
88
+ WHERE collection = $1 AND doc_id = $2
89
+ ) OR NOT EXISTS (
90
+ SELECT 1
91
+ FROM ops
92
+ WHERE collection = $1 AND doc_id = $2
93
+ )
94
+ ) AND EXISTS (SELECT 1 FROM snapshot_id)
95
+ RETURNING version` ,
96
+ values : [ collection , id , snapshot . v , snapshot . type , snapshot . data , op ]
97
+ } ;
98
+ client . query ( query , ( err , res ) => {
99
+ if ( err ) {
100
+ callback ( err ) ;
101
+ } else if ( res . rows . length === 0 ) {
102
+ done ( client ) ;
103
+ callback ( null , false ) ;
104
+ }
105
+ else {
106
+ done ( client ) ;
107
+ callback ( null , true ) ;
126
108
}
127
- )
128
- } )
109
+ } ) ;
110
+
111
+ } ) ;
129
112
} ;
130
113
131
114
// Get the named document from the database. The callback is called with (err,
132
115
// snapshot). A snapshot with a version of zero is returned if the docuemnt
133
116
// has never been created in the database.
134
- PostgresDB . prototype . getSnapshot = function ( collection , id , fields , options , callback ) {
135
- this . pool . connect ( function ( err , client , done ) {
117
+ PostgresDB . prototype . getSnapshot = function ( collection , id , fields , options , callback ) {
118
+ this . pool . connect ( function ( err , client , done ) {
136
119
if ( err ) {
137
120
done ( client ) ;
138
121
callback ( err ) ;
139
122
return ;
140
123
}
141
124
client . query (
142
- ' SELECT version, data, doc_type FROM snapshots WHERE collection = $1 AND doc_id = $2 LIMIT 1' ,
125
+ " SELECT version, data, doc_type FROM snapshots WHERE collection = $1 AND doc_id = $2 LIMIT 1" ,
143
126
[ collection , id ] ,
144
- function ( err , res ) {
127
+ function ( err , res ) {
145
128
done ( ) ;
146
129
if ( err ) {
147
130
callback ( err ) ;
148
131
return ;
149
132
}
150
133
if ( res . rows . length ) {
151
- var row = res . rows [ 0 ]
134
+ var row = res . rows [ 0 ] ;
152
135
var snapshot = new PostgresSnapshot (
153
136
id ,
154
137
row . version ,
155
138
row . doc_type ,
156
139
row . data ,
157
140
undefined // TODO: metadata
158
- )
141
+ ) ;
159
142
callback ( null , snapshot ) ;
160
143
} else {
161
144
var snapshot = new PostgresSnapshot (
@@ -164,12 +147,12 @@ PostgresDB.prototype.getSnapshot = function(collection, id, fields, options, cal
164
147
null ,
165
148
undefined ,
166
149
undefined
167
- )
150
+ ) ;
168
151
callback ( null , snapshot ) ;
169
152
}
170
153
}
171
- )
172
- } )
154
+ ) ;
155
+ } ) ;
173
156
} ;
174
157
175
158
// Get operations between [from, to) noninclusively. (Ie, the range should
@@ -181,8 +164,8 @@ PostgresDB.prototype.getSnapshot = function(collection, id, fields, options, cal
181
164
// The version will be inferred from the parameters if it is missing.
182
165
//
183
166
// Callback should be called as callback(error, [list of ops]);
184
- PostgresDB . prototype . getOps = function ( collection , id , from , to , options , callback ) {
185
- this . pool . connect ( function ( err , client , done ) {
167
+ PostgresDB . prototype . getOps = function ( collection , id , from , to , options , callback ) {
168
+ this . pool . connect ( function ( err , client , done ) {
186
169
if ( err ) {
187
170
done ( client ) ;
188
171
callback ( err ) ;
@@ -191,21 +174,21 @@ PostgresDB.prototype.getOps = function(collection, id, from, to, options, callba
191
174
192
175
// ZW: Add explicit row ordering here
193
176
client . query (
194
- ' SELECT version, operation FROM ops WHERE collection = $1 AND doc_id =' +
195
- ' $2 AND version >= $3 AND version < $4 ORDER BY version ASC' ,
177
+ " SELECT version, operation FROM ops WHERE collection = $1 AND doc_id =" +
178
+ " $2 AND version >= $3 AND version < $4 ORDER BY version ASC" ,
196
179
[ collection , id , from , to ] ,
197
- function ( err , res ) {
180
+ function ( err , res ) {
198
181
done ( ) ;
199
182
if ( err ) {
200
183
callback ( err ) ;
201
184
return ;
202
185
}
203
- callback ( null , res . rows . map ( function ( row ) {
186
+ callback ( null , res . rows . map ( function ( row ) {
204
187
return row . operation ;
205
188
} ) ) ;
206
189
}
207
- )
208
- } )
190
+ ) ;
191
+ } ) ;
209
192
} ;
210
193
211
194
function PostgresSnapshot ( id , version , type , data , meta ) {
0 commit comments