
Commit 503054c

Add max size limit to requests for bulk import
This commit adds a new parameter, `max_size` (in bytes), which enforces an upper limit on the overall HTTP POST size of each bulk request. This is useful when trying to maximize bulk import speed by reducing roundtrips to retrieve and send data, and it is needed in scenarios where there is no control over Elasticsearch's maximum HTTP request payload size. For example, AWS's Elasticsearch offering has either a 10 MiB or 100 MiB HTTP request payload size limit.

`batch_size` is good for bounding local runtime memory usage, but when indexing large sets of big objects it is entirely possible to hit a service provider's underlying request size limit and fail the import mid-run. This is even worse when `force` is true: the index is then left in an incomplete state, with no obvious value to lower `batch_size` to in order to sneak under the limit.

`max_size` defaults to `10_000_000` bytes, which stays just under the worst-case 10 MiB (10,485,760-byte) limit on AWS.
1 parent 939bf5d commit 503054c
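As a usage sketch (not part of this commit), the new option is passed to import alongside the existing options; the Article model and the specific values below are assumptions for illustration:

    # Hypothetical usage: Article is an assumed model that includes
    # Elasticsearch::Model. batch_size bounds per-batch memory as before;
    # max_size (bytes) caps the size of each bulk request sent to Elasticsearch.
    Article.import force: true, batch_size: 1_000, max_size: 9_000_000 do |response|
      # Each bulk response can still be inspected per request
      failed = response['items'].count { |item| item.values.first['error'] }
      puts "bulk request sent, #{failed} item(s) failed"
    end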

File tree

1 file changed: +41 -13

elasticsearch-model/lib/elasticsearch/model/importing.rb

@@ -145,6 +145,7 @@ def import(options={}, &block)
           transform    = options.delete(:transform) || __transform
           pipeline     = options.delete(:pipeline)
           return_value = options.delete(:return)    || 'count'
+          max_size     = options.delete(:max_size)  || 10_000_000
 
           unless transform.respond_to?(:call)
             raise ArgumentError,
@@ -159,19 +160,46 @@ def import(options={}, &block)
           end
 
           __find_in_batches(options) do |batch|
-            params = {
-              index: target_index,
-              type:  target_type,
-              body:  __batch_to_bulk(batch, transform)
-            }
-
-            params[:pipeline] = pipeline if pipeline
-
-            response = client.bulk params
-
-            yield response if block_given?
-
-            errors += response['items'].select { |k, v| k.values.first['error'] }
+            batch = __batch_to_bulk(batch, transform)
+
+            until batch.empty?
+              todo = []
+              size = 0
+
+              # Accumulate until we hit max size
+              until size > max_size or batch.empty?
+                todo.push batch.shift
+                size += todo.last.to_s.size
+              end
+
+              # Put back last one if we went over
+              if size > max_size
+                batch.push todo.pop
+                size -= batch.last.to_s.size
+              end
+
+              # If we got here with nothing to do, we put our only todo back
+              # because it was too big - error.
+              if todo.empty?
+                item = batch.last
+                raise RuntimeError,
+                      "#{target} #{item[:index][:_id]} size #{item.to_s.size} is larger than max_size #{max_size}"
+              end
+
+              params = {
+                index: target_index,
+                type:  target_type,
+                body:  todo
+              }
+
+              params[:pipeline] = pipeline if pipeline
+
+              response = client.bulk params
+
+              yield response if block_given?
+
+              errors += response['items'].select { |k, v| k.values.first['error'] }
+            end
           end
 
           self.refresh_index! index: target_index if refresh

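For reference on the size accounting in the loop above: __batch_to_bulk yields one Hash per record (roughly the shape sketched below, with assumed field names and values), and the commit compares the length of each Hash's to_s form against max_size, so the check approximates the serialized payload size rather than counting exact JSON bytes:

    # Assumed shape of a single bulk action produced by __batch_to_bulk
    item = { index: { _id: 1, data: { title: "Quick brown fox" } } }

    # This is the quantity the new loop accumulates and compares to max_size;
    # the exact number depends on Ruby's Hash#to_s formatting.
    item.to_s.size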