diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index c600e35287..d2107330ba 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -1036,17 +1036,17 @@ def generate_format_proc # iteration of event stream, and it should be done just once even if total event stream size # is bigger than chunk_limit_size because of performance. def handle_stream_with_custom_format(tag, es, enqueue: false) - meta_and_data = {} + meta_and_data = Hash.new { |h, k| h[k] = [] } records = 0 es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record| meta = metadata(tag, time, record) - meta_and_data[meta] ||= [] res = format(tag, time, record) if res meta_and_data[meta] << res records += 1 end end + meta_and_data.default_proc = nil write_guard do @buffer.write(meta_and_data, enqueue: enqueue) end @@ -1057,14 +1057,14 @@ def handle_stream_with_custom_format(tag, es, enqueue: false) def handle_stream_with_standard_format(tag, es, enqueue: false) format_proc = generate_format_proc - meta_and_data = {} + meta_and_data = Hash.new { |h, k| h[k] = MultiEventStream.new } records = 0 es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record| meta = metadata(tag, time, record) - meta_and_data[meta] ||= MultiEventStream.new meta_and_data[meta].add(time, record) records += 1 end + meta_and_data.default_proc = nil write_guard do @buffer.write(meta_and_data, format: format_proc, enqueue: enqueue) end