@@ -13,8 +13,6 @@ import (
13
13
"context"
14
14
"expvar"
15
15
"fmt"
16
- "math"
17
- "time"
18
16
19
17
sgbucket "github.com/couchbase/sg-bucket"
20
18
)
@@ -123,12 +121,7 @@ type LeakyBucketConfig struct {
123
121
DDocDeleteErrorCount int
124
122
DDocGetErrorCount int
125
123
126
- // Emulate TAP/DCP feed de-dupliation behavior, such that within a
127
- // window of # of mutations or a timeout, mutations for a given document
128
- // will be filtered such that only the _latest_ mutation will make it through.
129
- TapFeedDeDuplication bool
130
- TapFeedVbuckets bool // Emulate vbucket numbers on feed
131
- TapFeedMissingDocs []string // Emulate entry not appearing on tap feed
124
+ DCPFeedMissingDocs []string // Emulate entry not appearing on DCP feed
132
125
133
126
ForceErrorSetRawKeys []string // Issuing a SetRaw call with a specified key will return an error
134
127
@@ -161,209 +154,17 @@ type LeakyBucketConfig struct {
161
154
IgnoreClose bool
162
155
}
163
156
164
- func (b * LeakyBucket ) StartTapFeed (args sgbucket.FeedArguments , dbStats * expvar.Map ) (sgbucket.MutationFeed , error ) {
165
-
166
- if b .config .TapFeedDeDuplication {
167
- return b .wrapFeedForDeduplication (args , dbStats )
168
- } else if len (b .config .TapFeedMissingDocs ) > 0 {
169
- callback := func (event * sgbucket.FeedEvent ) bool {
170
- for _ , key := range b .config .TapFeedMissingDocs {
157
+ func (b * LeakyBucket ) StartDCPFeed (ctx context.Context , args sgbucket.FeedArguments , callback sgbucket.FeedEventCallbackFunc , dbStats * expvar.Map ) error {
158
+ if len (b .config .DCPFeedMissingDocs ) > 0 {
159
+ wrappedCallback := func (event sgbucket.FeedEvent ) bool {
160
+ for _ , key := range b .config .DCPFeedMissingDocs {
171
161
if string (event .Key ) == key {
172
162
return false
173
163
}
174
164
}
175
- return true
176
- }
177
- return b .wrapFeed (args , callback , dbStats )
178
- } else if b .config .TapFeedVbuckets {
179
- // kick off the wrapped sgbucket tap feed
180
- walrusTapFeed , err := b .bucket .StartTapFeed (args , dbStats )
181
- if err != nil {
182
- return walrusTapFeed , err
183
- }
184
- // this is the sgbucket.MutationFeed impl we'll return to callers, which
185
- // will add vbucket information
186
- channel := make (chan sgbucket.FeedEvent , 10 )
187
- vbTapFeed := & wrappedTapFeedImpl {
188
- channel : channel ,
189
- wrappedTapFeed : walrusTapFeed ,
165
+ return callback (event )
190
166
}
191
- go func () {
192
- for event := range walrusTapFeed .Events () {
193
- key := string (event .Key )
194
- event .VbNo = uint16 (sgbucket .VBHash (key , 1024 ))
195
- vbTapFeed .channel <- event
196
- }
197
- close (vbTapFeed .channel )
198
- }()
199
- return vbTapFeed , nil
200
-
201
- } else {
202
- return b .bucket .StartTapFeed (args , dbStats )
167
+ return b .bucket .StartDCPFeed (ctx , args , wrappedCallback , dbStats )
203
168
}
204
-
205
- }
206
-
207
- func (b * LeakyBucket ) StartDCPFeed (ctx context.Context , args sgbucket.FeedArguments , callback sgbucket.FeedEventCallbackFunc , dbStats * expvar.Map ) error {
208
169
return b .bucket .StartDCPFeed (ctx , args , callback , dbStats )
209
170
}
210
-
211
- type EventUpdateFunc func (event * sgbucket.FeedEvent ) bool
212
-
213
- func (b * LeakyBucket ) wrapFeed (args sgbucket.FeedArguments , callback EventUpdateFunc , dbStats * expvar.Map ) (sgbucket.MutationFeed , error ) {
214
-
215
- // kick off the wrapped sgbucket tap feed
216
- walrusTapFeed , err := b .bucket .StartTapFeed (args , dbStats )
217
- if err != nil {
218
- return walrusTapFeed , err
219
- }
220
-
221
- // create an output channel
222
- channel := make (chan sgbucket.FeedEvent , 10 )
223
-
224
- // this is the sgbucket.MutationFeed impl we'll return to callers, which
225
- // will have missing entries
226
- wrapperFeed := & wrappedTapFeedImpl {
227
- channel : channel ,
228
- wrappedTapFeed : walrusTapFeed ,
229
- }
230
-
231
- go func () {
232
- for event := range walrusTapFeed .Events () {
233
- // Callback returns false if the event should be skipped
234
- if callback (& event ) {
235
- wrapperFeed .channel <- event
236
- }
237
- }
238
- close (wrapperFeed .channel )
239
- }()
240
- return wrapperFeed , nil
241
- }
242
-
243
- func (b * LeakyBucket ) wrapFeedForDeduplication (args sgbucket.FeedArguments , dbStats * expvar.Map ) (sgbucket.MutationFeed , error ) {
244
- // create an output channel
245
- // start a goroutine which reads off the sgbucket tap feed
246
- // - de-duplicate certain events
247
- // - puts them to output channel
248
-
249
- // the number of changes that it will buffer up before de-duplicating
250
- deDuplicationWindowSize := 5
251
-
252
- // the timeout window in milliseconds after which it will flush to output, even if
253
- // the dedupe buffer has not filled up yet.
254
- deDuplicationTimeoutMs := time .Millisecond * 1000
255
-
256
- // kick off the wrapped sgbucket tap feed
257
- walrusTapFeed , err := b .bucket .StartTapFeed (args , dbStats )
258
- if err != nil {
259
- return walrusTapFeed , err
260
- }
261
-
262
- // create an output channel for de-duplicated events
263
- channel := make (chan sgbucket.FeedEvent , 10 )
264
-
265
- // this is the sgbucket.MutationFeed impl we'll return to callers, which
266
- // will reead from the de-duplicated events channel
267
- dupeTapFeed := & wrappedTapFeedImpl {
268
- channel : channel ,
269
- wrappedTapFeed : walrusTapFeed ,
270
- }
271
-
272
- go func () {
273
- defer close (dupeTapFeed .channel )
274
- // the buffer to hold tap events that are candidates for de-duplication
275
- deDupeBuffer := []sgbucket.FeedEvent {}
276
-
277
- timer := time .NewTimer (math .MaxInt64 )
278
- for {
279
- select {
280
- case tapEvent , ok := <- walrusTapFeed .Events ():
281
- if ! ok {
282
- // channel closed, goroutine is done
283
- // dedupe and send what we currently have
284
- dedupeAndForward (deDupeBuffer , channel )
285
- return
286
- }
287
- deDupeBuffer = append (deDupeBuffer , tapEvent )
288
-
289
- // if we've collected enough, dedeupe and send what we have,
290
- // and reset buffer.
291
- if len (deDupeBuffer ) >= deDuplicationWindowSize {
292
- dedupeAndForward (deDupeBuffer , channel )
293
- deDupeBuffer = []sgbucket.FeedEvent {}
294
- } else {
295
- _ = timer .Reset (deDuplicationTimeoutMs )
296
- }
297
-
298
- case <- timer .C :
299
- if len (deDupeBuffer ) > 0 {
300
- // give up on waiting for the buffer to fill up,
301
- // de-dupe and send what we currently have
302
- dedupeAndForward (deDupeBuffer , channel )
303
- deDupeBuffer = []sgbucket.FeedEvent {}
304
- }
305
- }
306
- }
307
-
308
- }()
309
- return dupeTapFeed , nil
310
- }
311
-
312
- // An implementation of a sgbucket tap feed that wraps
313
- // tap events on the upstream tap feed to better emulate real world
314
- // TAP/DCP behavior.
315
- type wrappedTapFeedImpl struct {
316
- channel chan sgbucket.FeedEvent
317
- wrappedTapFeed sgbucket.MutationFeed
318
- }
319
-
320
- func (feed * wrappedTapFeedImpl ) Close () error {
321
- return feed .wrappedTapFeed .Close ()
322
- }
323
-
324
- func (feed * wrappedTapFeedImpl ) Events () <- chan sgbucket.FeedEvent {
325
- return feed .channel
326
- }
327
-
328
- func (feed * wrappedTapFeedImpl ) WriteEvents () chan <- sgbucket.FeedEvent {
329
- return feed .channel
330
- }
331
-
332
- func dedupeAndForward (tapEvents []sgbucket.FeedEvent , destChannel chan <- sgbucket.FeedEvent ) {
333
-
334
- deduped := dedupeTapEvents (tapEvents )
335
-
336
- for _ , tapEvent := range deduped {
337
- destChannel <- tapEvent
338
- }
339
-
340
- }
341
-
342
- func dedupeTapEvents (tapEvents []sgbucket.FeedEvent ) []sgbucket.FeedEvent {
343
-
344
- // For each document key, keep track of the latest seen tapEvent
345
- // doc1 -> tapEvent with Seq=1
346
- // doc2 -> tapEvent with Seq=5
347
- // (if tapEvent with Seq=7 comes in for doc1, it will clobber existing)
348
- latestTapEventPerKey := map [string ]sgbucket.FeedEvent {}
349
-
350
- for _ , tapEvent := range tapEvents {
351
- key := string (tapEvent .Key )
352
- latestTapEventPerKey [key ] = tapEvent
353
- }
354
-
355
- // Iterate over the original tapEvents, and only keep what
356
- // is in latestTapEventPerKey, and discard all previous mutations
357
- // of that doc. This will preserve the original
358
- // sequence order as read off the feed.
359
- deduped := []sgbucket.FeedEvent {}
360
- for _ , tapEvent := range tapEvents {
361
- latestTapEventForKey := latestTapEventPerKey [string (tapEvent .Key )]
362
- if tapEvent .Cas == latestTapEventForKey .Cas {
363
- deduped = append (deduped , tapEvent )
364
- }
365
- }
366
-
367
- return deduped
368
-
369
- }
0 commit comments