diff --git a/lib/fluent/plugin/out_exec_filter.rb b/lib/fluent/plugin/out_exec_filter.rb index 2bcda92615..2dec7997dd 100644 --- a/lib/fluent/plugin/out_exec_filter.rb +++ b/lib/fluent/plugin/out_exec_filter.rb @@ -95,6 +95,7 @@ class ExecFilterOutput < Output COMPAT_PARSE_PARAMS = { 'out_format' => '@type', 'out_keys' => 'keys', + 'out_stream_buffer_size' => 'stream_buffer_size', } COMPAT_EXTRACT_PARAMS = { 'out_tag_key' => 'tag_key', diff --git a/lib/fluent/plugin/parser_json.rb b/lib/fluent/plugin/parser_json.rb index 50a80a31c9..49f5584d08 100644 --- a/lib/fluent/plugin/parser_json.rb +++ b/lib/fluent/plugin/parser_json.rb @@ -30,6 +30,12 @@ class JSONParser < Parser desc 'Set JSON parser' config_param :json_parser, :enum, list: [:oj, :yajl, :json], default: :oj + # The Yajl library defines a default buffer size of 8092 when parsing + # from IO streams, so maintain this for backwards-compatibility. + # https://www.rubydoc.info/github/brianmario/yajl-ruby/Yajl%2FParser:parse + desc 'Set the buffer size that Yajl will use when parsing streaming input' + config_param :stream_buffer_size, :integer, default: 8092 + config_set_default :time_type, :float def configure(conf) @@ -81,7 +87,7 @@ def parse_io(io, &block) y.on_parse_complete = ->(record){ block.call(parse_time(record), record) } - y.parse(io) + y.parse(io, @stream_buffer_size) end end end diff --git a/test/plugin/test_out_exec_filter.rb b/test/plugin/test_out_exec_filter.rb index 8d9be9f1fa..d3ced55c2c 100644 --- a/test/plugin/test_out_exec_filter.rb +++ b/test/plugin/test_out_exec_filter.rb @@ -328,6 +328,7 @@ def create_driver(conf) @type json + stream_buffer_size 1 tag_key tag @@ -338,6 +339,7 @@ def create_driver(conf) command cat in_keys message out_format json + out_stream_buffer_size 1 time_key time tag_key tag ] @@ -372,6 +374,7 @@ def create_driver(conf) @type json + stream_buffer_size 1 tag_key tag @@ -382,6 +385,7 @@ def create_driver(conf) command cat in_keys message out_format json + out_stream_buffer_size 1 time_key time tag_key tag ] @@ -414,6 +418,7 @@ def create_driver(conf) @type json + stream_buffer_size 1 tag_key tag @@ -426,6 +431,7 @@ def create_driver(conf) command cat in_keys message out_format json + out_stream_buffer_size 1 time_key time time_format %d/%b/%Y %H:%M:%S.%N %z tag_key tag diff --git a/test/plugin/test_parser_json.rb b/test/plugin/test_parser_json.rb index 46c5f47d9f..19c45402d1 100644 --- a/test/plugin/test_parser_json.rb +++ b/test/plugin/test_parser_json.rb @@ -111,4 +111,28 @@ def test_parse_with_keep_time_key_without_time_format(data) assert_equal text, record['time'] end end + + def test_yajl_parse_io_with_buffer_smaller_than_input + parser = Fluent::Test::Driver::Parser.new(Fluent::Plugin::JSONParser) + parser.configure( + 'keep_time_key' => 'true', + 'json_parser' => 'yajl', + 'stream_buffer_size' => 1, + ) + text = "100" + + waiting(5) do + rd, wr = IO.pipe + wr.write "{\"time\":\"#{text}\"}" + + parser.instance.parse_io(rd) do |time, record| + assert_equal text.to_i, time.sec + assert_equal text, record['time'] + + # Once a record has been received the 'write' end of the pipe must be + # closed, otherwise the test will block waiting for more input. + wr.close + end + end + end end