Skip to content

Commit

Permalink
feat(records_per_request): add records_per_request to limit number of…
Browse files Browse the repository at this point in the history
… records per request

Signed-off-by: Dominik Rosiek <[email protected]>
  • Loading branch information
Dominik Rosiek committed Apr 22, 2022
1 parent 417b123 commit e318625
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 42 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ Configuration options for fluent.conf are:
* `retry_max_interval` - Maximum interval to wait between sending tries (default is `5m`)
* `retry_timeout` - Time after which the data is going to be dropped (default is `72h`) (`0s` means that there is no timeout)
* `retry_max_times` - Maximum number of retries (default is `0`) (`0` means that there is no max retry times, retries will happen forever)
* `max_request_size` - Maximum request size (measured before compression is applied). Default is `0k`, which means there is no size limit

__NOTE:__ <sup>*</sup> [Placeholders](https://docs.fluentd.org/v1.0/articles/buffer-section#placeholders) are supported

Expand Down
115 changes: 73 additions & 42 deletions lib/fluent/plugin/out_sumologic.rb
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ class Fluent::Plugin::Sumologic < Fluent::Plugin::Output
config_param :retry_min_interval, :time, :default => 1 # 1s
config_param :retry_max_interval, :time, :default => 5*60 # 5m

config_param :max_request_size, :size, :default => 0

# https://help.sumologic.com/Manage/Fields
desc 'Fields string (eg "cluster=payment, service=credit_card") which is going to be added to every log record.'
config_param :custom_fields, :string, :default => nil
Expand Down Expand Up @@ -252,6 +254,10 @@ def configure(conf)
conf['compress_encoding'],
log,
)

if !conf['max_request_size'].nil? && conf['max_request_size'].to_i <= 0
conf['max_request_size'] = '0'
end
super
end

Expand Down Expand Up @@ -405,50 +411,75 @@ def write(chunk)
fields = [fields,@custom_fields].compact.join(",")
end

retries = 0
start_time = Time.now
sleep_time = @retry_min_interval

while true
common_log_part = "#{@data_type} records with source category '#{source_category}', source host '#{source_host}', source name '#{source_name}', chunk #{chunk_id}, try #{retries}"
begin
@log.debug { "Sending #{messages.count}; #{common_log_part}" }

@sumo_conn.publish(
messages.join("\n"),
source_host =source_host,
source_category =source_category,
source_name =source_name,
data_type =@data_type,
metric_data_format =@metric_data_format,
collected_fields =fields,
dimensions =@custom_dimensions
)
break
rescue => e
if !@use_internal_retry
raise e
if @max_request_size <= 0
messages_to_send = [messages]
else
messages_to_send = []
current_message = []
current_length = 0
messages.each do |message|
current_message.push message
current_length += message.length

if current_length > @max_request_size
messages_to_send.push(current_message)
current_message = []
current_length = 0
end
# increment retries
retries = retries + 1

log.warn "error while sending request to sumo: #{e}; #{common_log_part}"
log.warn_backtrace e.backtrace

# drop data if
# - we reached out the @retry_max_times retries
# - or we exceeded @retry_timeout
if (retries >= @retry_max_times && @retry_max_times > 0) || (Time.now > start_time + @retry_timeout && @retry_timeout > 0)
log.warn "dropping records; #{common_log_part}"
current_length += 1 # this is for newline
end
if current_message.length > 0
messages_to_send.push(current_message)
end
end

messages_to_send.each_with_index do |message, i|
retries = 0
start_time = Time.now
sleep_time = @retry_min_interval

while true
common_log_part = "#{@data_type} records with source category '#{source_category}', source host '#{source_host}', source name '#{source_name}', chunk #{chunk_id}, try #{retries}, batch #{i}"

begin
@log.debug { "Sending #{message.count}; #{common_log_part}" }

@sumo_conn.publish(
message.join("\n"),
source_host =source_host,
source_category =source_category,
source_name =source_name,
data_type =@data_type,
metric_data_format =@metric_data_format,
collected_fields =fields,
dimensions =@custom_dimensions
)
break
end

log.info "going to retry to send data at #{Time.now + sleep_time}; #{common_log_part}"
sleep sleep_time

sleep_time = sleep_time * 2
if sleep_time > @retry_max_interval
sleep_time = @retry_max_interval
rescue => e
if !@use_internal_retry
raise e
end
# increment retries
retries += 1

log.warn "error while sending request to sumo: #{e}; #{common_log_part}"
log.warn_backtrace e.backtrace

# drop data if
# - we reached out the @retry_max_times retries
# - or we exceeded @retry_timeout
if (retries >= @retry_max_times && @retry_max_times > 0) || (Time.now > start_time + @retry_timeout && @retry_timeout > 0)
log.warn "dropping records; #{common_log_part}"
break
end

log.info "going to retry to send data at #{Time.now + sleep_time}; #{common_log_part}"
sleep sleep_time

sleep_time *= 2
if sleep_time > @retry_max_interval
sleep_time = @retry_max_interval
end
end
end
end
Expand Down
66 changes: 66 additions & 0 deletions test/plugin/test_out_sumologic.rb
Original file line number Diff line number Diff line change
Expand Up @@ -912,4 +912,70 @@ def test_skip_retry
assert_equal("Failed to send data to HTTP Source. 500 - ", exception.message)
end

def test_split_negative_or_zero
  endpoint = "https://collectors.sumologic.com/v1/receivers/http/1234"

  # Both a negative value and an explicit zero should disable request
  # splitting, so every record is expected to arrive in one request.
  negative_config = %{
    endpoint #{endpoint}
    max_request_size -5
  }
  zero_config = %{
    endpoint #{endpoint}
    max_request_size 0
  }

  time = event_time

  [negative_config, zero_config].each do |config|
    WebMock.reset_executed_requests!
    driver = create_driver(config)
    stub_request(:post, endpoint).to_return(status: 200, headers: {content_type: 'application/json'})

    driver.run do
      3.times { driver.feed("test", time, {"message": "test"}) }
    end

    # A single POST carrying all three records proves no splitting occurred.
    assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234",
      body: /\A{"timestamp":\d+.,"message":"test"}\n{"timestamp":\d+.,"message":"test"}\n{"timestamp":\d+.,"message":"test"}\z/,
      times: 1
  end
end

def test_split
  endpoint = "https://collectors.sumologic.com/v1/receivers/http/1234"

  # With an 80-byte cap, the five records fed below should be delivered
  # as three separate requests: two batches of two records each, plus a
  # final batch holding the remainder.
  config = %{
    endpoint #{endpoint}
    max_request_size 80
  }

  time = event_time

  WebMock.reset_executed_requests!
  driver = create_driver(config)
  stub_request(:post, endpoint).to_return(status: 200, headers: {content_type: 'application/json'})

  driver.run do
    (1..5).each do |n|
      driver.feed("test", time, {"message": "test#{n}"})
    end
  end

  # One expected body pattern per batch, in delivery order.
  expected_batches = [
    /\A{"timestamp":\d+.,"message":"test1"}\n{"timestamp":\d+.,"message":"test2"}\z/,
    /\A{"timestamp":\d+.,"message":"test3"}\n{"timestamp":\d+.,"message":"test4"}\z/,
    /\A{"timestamp":\d+.,"message":"test5"}\z/
  ]
  expected_batches.each do |pattern|
    assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234",
      body: pattern,
      times: 1
  end
end

end

0 comments on commit e318625

Please sign in to comment.