Skip to content

Commit

Permalink
Add support for compression (gzip and deflate)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dominik Rosiek committed Apr 7, 2020
1 parent e29465d commit 1b7497d
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 3 deletions.
45 changes: 42 additions & 3 deletions lib/fluent/plugin/out_sumologic.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,25 @@
require 'net/https'
require 'yajl'
require 'httpclient'
require 'zlib'
require 'stringio'

class SumologicConnection

attr_reader :http

def initialize(endpoint, verify_ssl, connect_timeout, proxy_uri, disable_cookies)
COMPRESS_DEFLATE = 'deflate'
COMPRESS_GZIP = 'gzip'

def initialize(endpoint, verify_ssl, connect_timeout, proxy_uri, disable_cookies, compress_enabled, compress_encoding)
@endpoint = endpoint
create_http_client(verify_ssl, connect_timeout, proxy_uri, disable_cookies)
@compress = compress_enabled
@compress_encoding = (compress_encoding ||= COMPRESS_DEFLATE).downcase
end

def publish(raw_data, source_host=nil, source_category=nil, source_name=nil, data_type, metric_data_type, collected_fields)
response = http.post(@endpoint, raw_data, request_headers(source_host, source_category, source_name, data_type, metric_data_type, collected_fields))
response = http.post(@endpoint, compress(raw_data), request_headers(source_host, source_category, source_name, data_type, metric_data_type, collected_fields))
unless response.ok?
raise RuntimeError, "Failed to send data to HTTP Source. #{response.code} - #{response.body}"
end
Expand All @@ -26,6 +33,11 @@ def request_headers(source_host, source_category, source_name, data_type, metric
'X-Sumo-Host' => source_host,
'X-Sumo-Client' => 'fluentd-output'
}

if @compress
headers['Content-Encoding'] = @compress_encoding
end

if data_type == 'metrics'
case metric_data_format
when 'graphite'
Expand Down Expand Up @@ -56,6 +68,29 @@ def create_http_client(verify_ssl, connect_timeout, proxy_uri, disable_cookies)
@http.cookie_manager = nil
end
end

def compress(content)
if @compress
if @compress_encoding == COMPRESS_GZIP
result = gzip(content)
result.bytes.to_a.pack("c*")
else
Zlib::Deflate.deflate(content)
end
else
content
end
end # def compress

def gzip(content)
stream = StringIO.new("w")
stream.set_encoding("ASCII")
gz = Zlib::GzipWriter.new(stream)
gz.mtime=1 # Ensure that for same content there is same output
gz.write(content)
gz.close
stream.string.bytes.to_a.pack("c*")
end # def gzip
end

class Fluent::Plugin::Sumologic < Fluent::Plugin::Output
Expand Down Expand Up @@ -89,6 +124,10 @@ class Fluent::Plugin::Sumologic < Fluent::Plugin::Output
# https://help.sumologic.com/Manage/Fields
desc 'Fields string (eg "cluster=payment, service=credit_card") which is going to be added to every record.'
config_param :custom_fields, :string, :default => nil
desc 'Compress payload'
config_param :compress, :bool, :default => false
desc 'Encoding method of compresssion (either gzip or deflate)'
config_param :compress_encoding, :string, :default => SumologicConnection::COMPRESS_DEFLATE

config_section :buffer do
config_set_default :@type, DEFAULT_BUFFER_TYPE
Expand Down Expand Up @@ -136,7 +175,7 @@ def configure(conf)
conf['custom_fields'] = nil
end

@sumo_conn = SumologicConnection.new(conf['endpoint'], conf['verify_ssl'], conf['open_timeout'].to_i, conf['proxy_uri'], conf['disable_cookies'])
@sumo_conn = SumologicConnection.new(conf['endpoint'], conf['verify_ssl'], conf['open_timeout'].to_i, conf['proxy_uri'], conf['disable_cookies'], conf['compress'], conf['compress_encoding'])
super
end

Expand Down
45 changes: 45 additions & 0 deletions test/plugin/test_out_sumologic.rb
Original file line number Diff line number Diff line change
Expand Up @@ -547,4 +547,49 @@ def test_batching_different_fields
times:1
end

def test_emit_json_merge_timestamp_compress_deflate
config = %{
endpoint https://collectors.sumologic.com/v1/receivers/http/1234
log_format json_merge
source_category test
source_host test
source_name test
compress true
}
driver = create_driver(config)
time = event_time
stub_request(:post, 'https://collectors.sumologic.com/v1/receivers/http/1234')
driver.run do
driver.feed("output.test", time, {'message' => '{"timestamp":123}'})
end
assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234",
headers: {'X-Sumo-Category'=>'test', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'test', 'X-Sumo-Name'=>'test', 'Content-Encoding'=>'deflate'},
body: "\x78\x9c\xab\x56\x2a\xc9\xcc\x4d\x2d\x2e\x49\xcc\x2d\x50\xb2\x32\x34\x32\xae\x05\x00\x38\xb0\x05\xe1".force_encoding("ASCII-8BIT"),
times:1
end

def test_emit_json_merge_timestamp_compress_gzip
config = %{
endpoint https://collectors.sumologic.com/v1/receivers/http/1234
log_format json_merge
source_category test
source_host test
source_name test
compress true
compress_encoding gzip
}
driver = create_driver(config)
time = event_time
stub_request(:post, 'https://collectors.sumologic.com/v1/receivers/http/1234')
driver.run do
driver.feed("output.test", time, {'message' => '{"timestamp":1234}'})
end
assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234",
headers: {'X-Sumo-Category'=>'test', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'test', 'X-Sumo-Name'=>'test', 'Content-Encoding'=>'gzip'},
body: "\x1f\x8b\x08\x00\x01\x00\x00\x00\x00\x03\xab\x56\x2a\xc9\xcc\x4d\x2d\x2e\x49\xcc\x2d\x50\xb2\x32\x34\x32\x36\xa9\x05\x00\xfe\x53\xbe\x14\x12\x00\x00\x00".force_encoding("ASCII-8BIT"),
times:1
end

end

0 comments on commit 1b7497d

Please sign in to comment.