Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for compression (gzip and deflate) #58

Merged
merged 2 commits into from
Apr 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 48 additions & 3 deletions lib/fluent/plugin/out_sumologic.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,30 @@
require 'net/https'
require 'yajl'
require 'httpclient'
require 'zlib'
require 'stringio'

class SumologicConnection

attr_reader :http

def initialize(endpoint, verify_ssl, connect_timeout, proxy_uri, disable_cookies, sumo_client)
COMPRESS_DEFLATE = 'deflate'
COMPRESS_GZIP = 'gzip'

def initialize(endpoint, verify_ssl, connect_timeout, proxy_uri, disable_cookies, sumo_client, compress_enabled, compress_encoding)
@endpoint = endpoint
@sumo_client = sumo_client
create_http_client(verify_ssl, connect_timeout, proxy_uri, disable_cookies)
@compress = compress_enabled
@compress_encoding = (compress_encoding ||= COMPRESS_GZIP).downcase

unless [COMPRESS_DEFLATE, COMPRESS_GZIP].include? @compress_encoding
raise "Invalid compression encoding #{@compress_encoding} must be gzip or deflate"
end
end

def publish(raw_data, source_host=nil, source_category=nil, source_name=nil, data_type, metric_data_type, collected_fields)
response = http.post(@endpoint, raw_data, request_headers(source_host, source_category, source_name, data_type, metric_data_type, collected_fields))
response = http.post(@endpoint, compress(raw_data), request_headers(source_host, source_category, source_name, data_type, metric_data_type, collected_fields))
unless response.ok?
raise RuntimeError, "Failed to send data to HTTP Source. #{response.code} - #{response.body}"
end
Expand All @@ -27,6 +38,11 @@ def request_headers(source_host, source_category, source_name, data_type, metric
'X-Sumo-Host' => source_host,
'X-Sumo-Client' => @sumo_client,
}

if @compress
headers['Content-Encoding'] = @compress_encoding
end

if data_type == 'metrics'
case metric_data_format
when 'graphite'
Expand Down Expand Up @@ -57,6 +73,29 @@ def create_http_client(verify_ssl, connect_timeout, proxy_uri, disable_cookies)
@http.cookie_manager = nil
end
end

def compress(content)
if @compress
if @compress_encoding == COMPRESS_GZIP
result = gzip(content)
result.bytes.to_a.pack("c*")
else
Zlib::Deflate.deflate(content)
end
else
content
end
end # def compress

def gzip(content)
stream = StringIO.new("w")
stream.set_encoding("ASCII")
gz = Zlib::GzipWriter.new(stream)
gz.mtime=1 # Ensure that for same content there is same output
gz.write(content)
gz.close
stream.string.bytes.to_a.pack("c*")
end # def gzip
end

class Fluent::Plugin::Sumologic < Fluent::Plugin::Output
Expand Down Expand Up @@ -92,6 +131,10 @@ class Fluent::Plugin::Sumologic < Fluent::Plugin::Output
config_param :custom_fields, :string, :default => nil
desc 'Name of sumo client which is send as X-Sumo-Client header'
config_param :sumo_client, :string, :default => 'fluentd-output'
desc 'Compress payload'
config_param :compress, :bool, :default => false
desc 'Encoding method of compresssion (either gzip or deflate)'
config_param :compress_encoding, :string, :default => SumologicConnection::COMPRESS_GZIP

config_section :buffer do
config_set_default :@type, DEFAULT_BUFFER_TYPE
Expand Down Expand Up @@ -150,7 +193,9 @@ def configure(conf)
conf['open_timeout'].to_i,
conf['proxy_uri'],
conf['disable_cookies'],
conf['sumo_client']
conf['sumo_client'],
conf['compress'],
conf['compress_encoding']
)
super
end
Expand Down
46 changes: 46 additions & 0 deletions test/plugin/test_out_sumologic.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def test_default_configure
assert_equal instance.proxy_uri, nil
assert_equal instance.disable_cookies, false
assert_equal instance.sumo_client, 'fluentd-output'
assert_equal instance.compress_encoding, 'gzip'
end

def test_emit_text
Expand Down Expand Up @@ -570,4 +571,49 @@ def test_batching_different_fields
times:1
end

def test_emit_json_merge_timestamp_compress_deflate
config = %{
endpoint https://collectors.sumologic.com/v1/receivers/http/1234
log_format json_merge
source_category test
source_host test
source_name test
compress true
compress_encoding deflate

}
driver = create_driver(config)
time = event_time
stub_request(:post, 'https://collectors.sumologic.com/v1/receivers/http/1234')
driver.run do
driver.feed("output.test", time, {'message' => '{"timestamp":123}'})
end
assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234",
headers: {'X-Sumo-Category'=>'test', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'test', 'X-Sumo-Name'=>'test', 'Content-Encoding'=>'deflate'},
body: "\x78\x9c\xab\x56\x2a\xc9\xcc\x4d\x2d\x2e\x49\xcc\x2d\x50\xb2\x32\x34\x32\xae\x05\x00\x38\xb0\x05\xe1".force_encoding("ASCII-8BIT"),
times:1
end

def test_emit_json_merge_timestamp_compress_gzip
config = %{
endpoint https://collectors.sumologic.com/v1/receivers/http/1234
log_format json_merge
source_category test
source_host test
source_name test
compress true

}
driver = create_driver(config)
time = event_time
stub_request(:post, 'https://collectors.sumologic.com/v1/receivers/http/1234')
driver.run do
driver.feed("output.test", time, {'message' => '{"timestamp":1234}'})
end
assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234",
headers: {'X-Sumo-Category'=>'test', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'test', 'X-Sumo-Name'=>'test', 'Content-Encoding'=>'gzip'},
body: "\x1f\x8b\x08\x00\x01\x00\x00\x00\x00\x03\xab\x56\x2a\xc9\xcc\x4d\x2d\x2e\x49\xcc\x2d\x50\xb2\x32\x34\x32\x36\xa9\x05\x00\xfe\x53\xbe\x14\x12\x00\x00\x00".force_encoding("ASCII-8BIT"),
times:1
end

end