Skip to content

Commit b9ba2f9

Browse files
committed
RUBY-716 Add parallel_collection_scan helper
1 parent 449a582 commit b9ba2f9

File tree

2 files changed

+35
-0
lines changed

2 files changed

+35
-0
lines changed

lib/mongo/collection.rb

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -824,6 +824,24 @@ def group(opts, condition={}, initial={}, reduce=nil, finalize=nil)
824824
end
825825
end
826826

827+
828+
def parallel_scan(num_cursors, opts={})
829+
cmd = BSON::OrderedHash.new
830+
cmd[:parallelCollectionScan] = self.name
831+
cmd[:numCursors] = num_cursors
832+
result = @db.command(cmd, command_options(opts))
833+
834+
result['cursors'].collect do |cursor_info|
835+
seed = {
836+
:cursor_id => cursor_info['cursor']['id'],
837+
:first_batch => cursor_info['cursor']['firstBatch'],
838+
:pool => @connection.pinned_pool
839+
}
840+
Cursor.new(self, seed.merge!(opts))
841+
end
842+
843+
end
844+
827845
private
828846

829847
def new_group(opts={})

test/functional/collection_test.rb

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1180,6 +1180,23 @@ def test_map_reduce_with_collection_output_to_other_db
11801180
end
11811181
end
11821182

1183+
if @@version >= '2.5.5'
1184+
def test_parallel_scan
1185+
8000.times { |i| @@test.insert({ :_id => i }) }
1186+
1187+
n_docs = {}
1188+
threads = []
1189+
cursors = @@test.parallel_scan(3)
1190+
cursors.each_with_index do |cursor, i|
1191+
threads << Thread.new do
1192+
n_docs[i] = cursor.to_a.size
1193+
end
1194+
end
1195+
threads.each(&:join)
1196+
assert_equal @@test.count, n_docs.values.inject(0) { |sum, n| sum + n }
1197+
end
1198+
end
1199+
11831200
if @@version > "1.3.0"
11841201
def test_find_and_modify
11851202
@@test << { :a => 1, :processed => false }

0 commit comments

Comments
 (0)