@@ -77,9 +77,10 @@ class ChangeStream < Aggregation
77
77
# on new documents to satisfy a change stream query.
78
78
# @option options [ Integer ] :batch_size The number of documents to return per batch.
79
79
# @option options [ BSON::Document, Hash ] :collation The collation to use.
80
- # @option options [ BSON::Timestamp ] :start_at_cluster_time Only return changes that occurred
81
- # after the specified timestamp. Any command run against the server will return a cluster time
82
- # that can be used here. Only valid in server versions 4.0+.
80
+ # @option options [ BSON::Timestamp ] :start_at_operation_time Only
81
+ # return changes that occurred at or after the specified timestamp. Any
82
+ # command run against the server will return a cluster time that can
83
+ # be used here. Only recognized by server versions 4.0+.
83
84
#
84
85
# @since 2.5.0
85
86
def initialize ( view , pipeline , changes_for , options = { } )
@@ -88,11 +89,19 @@ def initialize(view, pipeline, changes_for, options = {})
88
89
@change_stream_filters = pipeline && pipeline . dup
89
90
@options = options && options . dup . freeze
90
91
@resume_token = @options [ :resume_after ]
91
- read_with_one_retry { create_cursor! }
92
+ create_cursor!
93
+
94
+ # We send different parameters when we resume a change stream
95
+ # compared to when we send the first query
96
+ @resuming = true
92
97
end
93
98
94
99
# Iterate through documents returned by the change stream.
95
100
#
101
+ # This method retries once per error on resumable errors
102
+ # (two consecutive errors result in the second error being raised,
103
+ # an error which is recovered from resets the error count to zero).
104
+ #
96
105
# @example Iterate through the stream of documents.
97
106
# stream.each do |document|
98
107
# p document
@@ -105,20 +114,82 @@ def initialize(view, pipeline, changes_for, options = {})
105
114
# @yieldparam [ BSON::Document ] Each change stream document.
106
115
def each
  raise StopIteration if closed?

  # True while a resume has been attempted and no document has been
  # received since that attempt.
  retried = false

  begin
    @cursor.each do |doc|
      cache_resume_token(doc)
      # A document arrived, so any earlier resume succeeded. Clear the flag
      # so that only two consecutive resumable errors are fatal; without
      # this reset the second resumable error over the whole lifetime of
      # the iteration would be raised even after a successful recovery.
      retried = false
      yield doc
    end if block_given?
    @cursor.to_enum
  rescue Mongo::Error => e
    # Re-raise errors that are not resumable, and resumable errors that
    # occur right after a resume attempt that delivered no document.
    raise if retried || !e.change_stream_resumable?

    retried = true
    # Rerun the initial aggregation. Errors raised by close or
    # create_cursor! are not rescued here; they propagate to the caller
    # and end the iteration.
    close
    create_cursor!
    retry
  end
end
138
+
139
+ # Return one document from the change stream, if one is available.
140
+ #
141
+ # Retries once on a resumable error.
142
+ #
143
+ # Raises StopIteration if the change stream is closed.
144
+ #
145
+ # This method will wait up to max_await_time_ms milliseconds
146
+ # for changes from the server, and if no changes are received
147
+ # it will return nil.
148
+ #
149
+ # @note This method is experimental and subject to change.
150
+ #
151
+ # @return [ BSON::Document | nil ] A change stream document.
152
+ # @api private
153
def try_next
  raise StopIteration.new if closed?

  # Whether the current cursor has already been asked again after a
  # resumable failure.
  attempted_again = false

  begin
    document = @cursor.try_next
  rescue Mongo::Error => error
    raise unless error.change_stream_resumable?

    unless attempted_again
      # First resumable failure: call try_next on the same cursor once more.
      attempted_again = true
      retry
    end

    # The second attempt failed as well: rerun the initial aggregation.
    # Errors raised by close or create_cursor! are not rescued here and
    # propagate to the caller. No document was fetched on this path, so
    # the method falls through and returns nil.
    close
    create_cursor!
    attempted_again = false
  end

  cache_resume_token(document) if document
  document
end
183
+
184
# Builds the enumerator provided by the superclass implementation and adds
# a try_next method to that single enumerator object.
#
# The enumerator keeps a reference to this object in @obj, and its
# try_next forwards to this object's try_next.
#
# @return [ Enumerator ] The enumerator, extended with try_next.
def to_enum
  super.tap do |enumerator|
    enumerator.instance_variable_set(:@obj, self)
    # The block runs with the enumerator as self, so @obj below is the
    # instance variable assigned on the previous line.
    enumerator.define_singleton_method(:try_next) { @obj.try_next }
  end
end
123
194
124
195
# Close the change stream.
@@ -176,15 +247,30 @@ def for_collection?
176
247
end
177
248
178
249
# Stores a copy of the given change document's _id as the resume token.
#
# The instance variable is overwritten before the presence check, so a
# document without an _id leaves @resume_token set to nil when the error
# is raised.
#
# @param [ BSON::Document, Hash ] doc A change stream document.
#
# @raise [ Error::MissingResumeToken ] If the document has no _id.
def cache_resume_token(doc)
  token = doc[:_id]
  @resume_token = token && token.dup
  raise Error::MissingResumeToken unless @resume_token
end
183
257
184
258
# Runs the initial query against a freshly selected server and replaces
# @cursor with a cursor over its result.
#
# Also records the 'operationTime' field of the first document of the
# first reply (nil when there is no such reply, document or field) in
# @start_at_operation_time.
#
# @return [ Cursor ] The newly created cursor.
def create_cursor!
  # Drop the memoized capability flag: the next server may be newer or
  # older than the previous one (rolling upgrades).
  @start_at_operation_time_supported = nil

  session = client.send(:get_session, @options)
  server = server_selector.select_server(cluster)
  result = send_initial_query(server, session)

  first_reply = result.replies.first
  first_document = first_reply && first_reply.documents.first
  @start_at_operation_time = first_document ? first_document['operationTime'] : nil

  @cursor = Cursor.new(view, result, server, disable_retry: true, session: session)
end
190
276
@@ -200,6 +286,32 @@ def aggregate_spec(session)
200
286
201
287
def change_doc
202
288
{ fullDocument : ( @options [ :full_document ] || FULL_DOCUMENT_DEFAULT ) } . tap do |doc |
289
+ if resuming?
290
+ # We have a resume token once we retrieved any documents.
291
+ # However, if the first getMore fails and the user didn't pass
292
+ # a resume token we won't have a resume token to use.
293
+ # Use the cached operation time (startAtOperationTime) in this case.
294
+ if @resume_token
295
+ # Spec says we need to remove startAtOperationTime if
296
+ # one was passed in by user, thus we won't forward it
297
+ elsif start_at_operation_time_supported? && @start_at_operation_time
298
+ # It is crucial to check @start_at_operation_time_supported
299
+ # here - we may have switched to an older server that
300
+ # does not support operation times and therefore shouldn't
301
+ # try to send one to it!
302
+ #
303
+ # @start_at_operation_time is already a BSON::Timestamp
304
+ doc [ :startAtOperationTime ] = @start_at_operation_time
305
+ else
306
+ # Can't resume if we don't have either
307
+ raise Mongo ::Error ::MissingResumeToken
308
+ end
309
+ else
310
+ if options [ :start_at_operation_time ]
311
+ doc [ :startAtOperationTime ] = time_to_bson_timestamp (
312
+ options [ :start_at_operation_time ] )
313
+ end
314
+ end
203
315
doc [ :resumeAfter ] = @resume_token if @resume_token
204
316
doc [ :allChangesForCluster ] = true if for_cluster?
205
317
end
@@ -208,6 +320,29 @@ def change_doc
208
320
# Builds the initial query operation for the session and executes it on
# the given server.
#
# @param server The server to execute the operation on.
# @param session The session passed to initial_query_op.
#
# @return The value returned by the operation's execute call.
def send_initial_query(server, session)
  operation = initial_query_op(session)
  operation.execute(server)
end
323
+
324
# Converts a start time into a BSON::Timestamp.
#
# A Time is mapped to a timestamp whose first argument is the whole
# seconds since the epoch and whose second argument is the microsecond
# part of the time. A BSON::Timestamp is returned unchanged.
#
# @param [ Time, BSON::Timestamp ] time The time to convert.
#
# @return [ BSON::Timestamp ] The corresponding timestamp.
#
# @raise [ ArgumentError ] If time is neither a Time nor a BSON::Timestamp.
def time_to_bson_timestamp(time)
  case time
  when Time
    # Read the integer seconds and microseconds directly from the Time.
    # Deriving them from Time#to_f loses precision: a Float cannot hold
    # microseconds exactly, so the computed fraction could come out one
    # lower than the real value (e.g. 999998 instead of 999999).
    BSON::Timestamp.new(time.to_i, time.usec)
  when BSON::Timestamp
    time
  else
    raise ArgumentError, 'Time must be a Time or a BSON::Timestamp instance'
  end
end
334
+
335
# Reports whether @resuming holds a truthy value.
#
# @return [ true, false ] true if @resuming is truthy, false otherwise.
def resuming?
  @resuming ? true : false
end
338
+
339
# Reports whether a selected server's max wire version is at least 7.
#
# The answer is memoized in @start_at_operation_time_supported; a server
# is selected and inspected only while that variable is nil.
# NOTE(review): wire version 7 presumably corresponds to the "server
# versions 4.0+" requirement mentioned in the option docs - confirm.
#
# @return [ true, false ] Whether the wire version requirement is met.
def start_at_operation_time_supported?
  return @start_at_operation_time_supported unless @start_at_operation_time_supported.nil?

  wire_version = server_selector.select_server(cluster).description.max_wire_version
  @start_at_operation_time_supported = wire_version >= 7
end
211
346
end
212
347
end
213
348
end
0 commit comments