diff --git a/README.md b/README.md
index b9eef90..adfc34b 100644
--- a/README.md
+++ b/README.md
@@ -44,6 +44,7 @@ Configuration options for fluent.conf are:
 * `retry_max_interval` - Maximum interval to wait between sending tries (default is `5m`)
 * `retry_timeout` - Time after which the data is going to be dropped (default is `72h`) (`0s` means that there is no timeout)
 * `retry_max_times` - Maximum number of retries (default is `0`) (`0` means that there is no max retry times, retries will happen forever)
+* `max_request_size` - Maximum request size (before applying compression). Default is `0k`, which means no limit
 
 __NOTE:__
 * [Placeholders](https://docs.fluentd.org/v1.0/articles/buffer-section#placeholders) are supported
diff --git a/lib/fluent/plugin/out_sumologic.rb b/lib/fluent/plugin/out_sumologic.rb
index 7b8f632..c8c21aa 100644
--- a/lib/fluent/plugin/out_sumologic.rb
+++ b/lib/fluent/plugin/out_sumologic.rb
@@ -164,6 +164,8 @@ class Fluent::Plugin::Sumologic < Fluent::Plugin::Output
   config_param :retry_min_interval, :time, :default => 1 # 1s
   config_param :retry_max_interval, :time, :default => 5*60 # 5m
 
+  config_param :max_request_size, :size, :default => 0
+
   # https://help.sumologic.com/Manage/Fields
   desc 'Fields string (eg "cluster=payment, service=credit_card") which is going to be added to every log record.'
   config_param :custom_fields, :string, :default => nil
@@ -252,6 +254,10 @@ def configure(conf)
       conf['compress_encoding'],
       log,
     )
+
+    if !conf['max_request_size'].nil? && conf['max_request_size'].to_i <= 0
+      conf['max_request_size'] = '0' # normalize negative values to 0, i.e. no limit
+    end
 
     super
   end
@@ -405,50 +411,75 @@ def write(chunk)
       fields = [fields,@custom_fields].compact.join(",")
     end
 
-    retries = 0
-    start_time = Time.now
-    sleep_time = @retry_min_interval
-
-    while true
-      common_log_part = "#{@data_type} records with source category '#{source_category}', source host '#{source_host}', source name '#{source_name}', chunk #{chunk_id}, try #{retries}"
-      begin
-        @log.debug { "Sending #{messages.count}; #{common_log_part}" }
-
-        @sumo_conn.publish(
-          messages.join("\n"),
-          source_host         =source_host,
-          source_category     =source_category,
-          source_name         =source_name,
-          data_type           =@data_type,
-          metric_data_format  =@metric_data_format,
-          collected_fields    =fields,
-          dimensions          =@custom_dimensions
-        )
-        break
-      rescue => e
-        if !@use_internal_retry
-          raise e
+    if @max_request_size <= 0
+      messages_to_send = [messages]
+    else
+      messages_to_send = []
+      current_message = []
+      current_length = 0
+      messages.each do |message|
+        current_message.push message
+        current_length += message.length
+
+        if current_length > @max_request_size
+          messages_to_send.push(current_message)
+          current_message = []
+          current_length = 0
         end
-        # increment retries
-        retries = retries + 1
-
-        log.warn "error while sending request to sumo: #{e}; #{common_log_part}"
-        log.warn_backtrace e.backtrace
-
-        # drop data if
-        # - we reached out the @retry_max_times retries
-        # - or we exceeded @retry_timeout
-        if (retries >= @retry_max_times && @retry_max_times > 0) || (Time.now > start_time + @retry_timeout && @retry_timeout > 0)
-          log.warn "dropping records; #{common_log_part}"
+        current_length += 1 # one byte for the newline separator added on join
+      end
+      if current_message.length > 0
+        messages_to_send.push(current_message)
+      end
+    end
+
+    messages_to_send.each_with_index do |message, i|
+      retries = 0
+      start_time = Time.now
+      sleep_time = @retry_min_interval
+
+      while true
+        common_log_part = "#{@data_type} records with source category '#{source_category}', source host '#{source_host}', source name '#{source_name}', chunk #{chunk_id}, try #{retries}, batch #{i}"
+
+        begin
+          @log.debug { "Sending #{message.count}; #{common_log_part}" }
+
+          @sumo_conn.publish(
+            message.join("\n"),
+            source_host         =source_host,
+            source_category     =source_category,
+            source_name         =source_name,
+            data_type           =@data_type,
+            metric_data_format  =@metric_data_format,
+            collected_fields    =fields,
+            dimensions          =@custom_dimensions
+          )
           break
-        end
-
-        log.info "going to retry to send data at #{Time.now + sleep_time}; #{common_log_part}"
-        sleep sleep_time
-
-        sleep_time = sleep_time * 2
-        if sleep_time > @retry_max_interval
-          sleep_time = @retry_max_interval
+        rescue => e
+          if !@use_internal_retry
+            raise e
+          end
+          # increment retries
+          retries += 1
+
+          log.warn "error while sending request to sumo: #{e}; #{common_log_part}"
+          log.warn_backtrace e.backtrace
+
+          # drop data if
+          # - we reached the @retry_max_times limit
+          # - or we exceeded @retry_timeout
+          if (retries >= @retry_max_times && @retry_max_times > 0) || (Time.now > start_time + @retry_timeout && @retry_timeout > 0)
+            log.warn "dropping records; #{common_log_part}"
+            break
+          end
+
+          log.info "going to retry to send data at #{Time.now + sleep_time}; #{common_log_part}"
+          sleep sleep_time
+
+          sleep_time *= 2
+          if sleep_time > @retry_max_interval
+            sleep_time = @retry_max_interval
+          end
         end
       end
     end
diff --git a/test/plugin/test_out_sumologic.rb b/test/plugin/test_out_sumologic.rb
index be625a8..f23a8dc 100644
--- a/test/plugin/test_out_sumologic.rb
+++ b/test/plugin/test_out_sumologic.rb
@@ -912,4 +912,70 @@ def test_skip_retry
     assert_equal("Failed to send data to HTTP Source. 500 - ", exception.message)
   end
 
+  def test_split_negative_or_zero
+    endpoint = "https://collectors.sumologic.com/v1/receivers/http/1234"
+
+    configs = [
+      %{
+        endpoint #{endpoint}
+        max_request_size -5
+      },
+      %{
+        endpoint #{endpoint}
+        max_request_size 0
+      }
+    ]
+
+    time = event_time
+
+    configs.each do |config|
+      WebMock.reset_executed_requests!
+      driver = create_driver(config)
+      stub_request(:post, endpoint).to_return(status: 200, headers: {content_type: 'application/json'})
+
+      driver.run do
+        driver.feed("test", time, {"message": "test"})
+        driver.feed("test", time, {"message": "test"})
+        driver.feed("test", time, {"message": "test"})
+      end
+
+      assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234",
+        body: /\A{"timestamp":\d+.,"message":"test"}\n{"timestamp":\d+.,"message":"test"}\n{"timestamp":\d+.,"message":"test"}\z/,
+        times: 1
+    end
+  end
+
+  def test_split
+    endpoint = "https://collectors.sumologic.com/v1/receivers/http/1234"
+
+    config = %{
+      endpoint #{endpoint}
+      max_request_size 80
+    }
+
+    time = event_time
+
+    WebMock.reset_executed_requests!
+    driver = create_driver(config)
+    stub_request(:post, endpoint).to_return(status: 200, headers: {content_type: 'application/json'})
+
+    driver.run do
+      driver.feed("test", time, {"message": "test1"})
+      driver.feed("test", time, {"message": "test2"})
+      driver.feed("test", time, {"message": "test3"})
+      driver.feed("test", time, {"message": "test4"})
+      driver.feed("test", time, {"message": "test5"})
+    end
+
+    assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234",
+      body: /\A{"timestamp":\d+.,"message":"test1"}\n{"timestamp":\d+.,"message":"test2"}\z/,
+      times: 1
+    assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234",
+      body: /\A{"timestamp":\d+.,"message":"test3"}\n{"timestamp":\d+.,"message":"test4"}\z/,
+      times: 1
+    assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234",
+      body: /\A{"timestamp":\d+.,"message":"test5"}\z/,
+      times: 1
+  end
 end
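
Usage note: a minimal fluent.conf sketch exercising the new option. The match
pattern, the placeholder endpoint token, and the 512k threshold are
illustrative values, not part of this change; Fluentd's :size parameter type
accepts the usual k/m/g suffixes, and the size is checked before compression.

    <match **>
      @type sumologic
      endpoint https://collectors.sumologic.com/v1/receivers/http/<your-source-token>
      max_request_size 512k
    </match>

Once the accumulated length of the serialized records (plus one byte per
newline separator) exceeds max_request_size, the current batch is flushed as
its own request. test_split above relies on this behavior to produce three
POSTs from five records under an 80-byte limit, while test_split_negative_or_zero
verifies that zero and negative values disable splitting entirely.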