diff --git a/elasticsearch-model/lib/elasticsearch/model/importing.rb b/elasticsearch-model/lib/elasticsearch/model/importing.rb
index 927dd16e..5e525537 100644
--- a/elasticsearch-model/lib/elasticsearch/model/importing.rb
+++ b/elasticsearch-model/lib/elasticsearch/model/importing.rb
@@ -145,6 +145,7 @@ def import(options={}, &block)
         transform = options.delete(:transform) || __transform
         pipeline = options.delete(:pipeline)
         return_value = options.delete(:return) || 'count'
+        max_size = options.delete(:max_size) || 10_000_000
 
         unless transform.respond_to?(:call)
           raise ArgumentError,
@@ -159,19 +160,46 @@ def import(options={}, &block)
         end
 
         __find_in_batches(options) do |batch|
-          params = {
-            index: target_index,
-            type: target_type,
-            body: __batch_to_bulk(batch, transform)
-          }
-
-          params[:pipeline] = pipeline if pipeline
-
-          response = client.bulk params
-
-          yield response if block_given?
-
-          errors += response['items'].select { |k, v| k.values.first['error'] }
+          batch = __batch_to_bulk(batch, transform)
+
+          until batch.empty?
+            todo = []
+            size = 0
+
+            # Accumulate until we hit max size
+            until size > max_size || batch.empty?
+              todo.push batch.shift
+              size += todo.last.to_s.size
+            end
+
+            # Put back last one if we went over; unshift keeps document order intact
+            if size > max_size
+              batch.unshift todo.pop
+              size -= batch.first.to_s.size
+            end
+
+            # If we got here with nothing to do, we put our only todo back
+            # because it was too big - error.
+            if todo.empty?
+              item = batch.first
+              raise RuntimeError,
+                    "#{target_index} #{item[:index][:_id]} size #{item.to_s.size} is larger than max_size #{max_size}"
+            end
+
+            params = {
+              index: target_index,
+              type: target_type,
+              body: todo
+            }
+
+            params[:pipeline] = pipeline if pipeline
+
+            response = client.bulk params
+
+            yield response if block_given?
+
+            errors += response['items'].select { |k, v| k.values.first['error'] }
+          end
         end
 
         self.refresh_index! index: target_index if refresh