From 31f79c0b713e730fc5d8cca6d426755f58c58b63 Mon Sep 17 00:00:00 2001 From: Sam Song Date: Thu, 10 Oct 2019 21:18:29 -0700 Subject: [PATCH 1/4] remove log_format, change expected input format 'message' key --- lib/fluent/plugin/out_sumologic.rb | 75 ++------- test/plugin/test_out_sumologic.rb | 237 +++++++---------------------- 2 files changed, 65 insertions(+), 247 deletions(-) diff --git a/lib/fluent/plugin/out_sumologic.rb b/lib/fluent/plugin/out_sumologic.rb index 2512737..03bc722 100644 --- a/lib/fluent/plugin/out_sumologic.rb +++ b/lib/fluent/plugin/out_sumologic.rb @@ -73,7 +73,6 @@ class Fluent::Plugin::Sumologic < Fluent::Plugin::Output config_param :data_type, :string, :default => DEFAULT_DATA_TYPE config_param :metric_data_format, :default => DEFAULT_METRIC_FORMAT_TYPE config_param :endpoint, :string, secret: true - config_param :log_format, :string, :default => 'json' config_param :log_key, :string, :default => 'message' config_param :source_category, :string, :default => nil config_param :source_name, :string, :default => nil @@ -115,14 +114,6 @@ def configure(conf) end end - if conf['data_type'].nil? || conf['data_type'] == LOGS_DATA_TYPE - unless conf['log_format'].nil? - unless conf['log_format'] =~ /\A(?:json|text|json_merge|fields)\z/ - raise Fluent::ConfigError, "Invalid log_format #{conf['log_format']} must be text, json, json_merge or fields" - end - end - end - if conf['data_type'] == METRICS_DATA_TYPE && ! conf['metrics_data_type'].nil? unless conf['metrics_data_type'] =~ /\A(?:graphite|carbon2|pronetheus)\z/ raise Fluent::ConfigError, "Invalid metrics_data_type #{conf['metrics_data_type']} must be graphite or carbon2 or prometheus" @@ -143,29 +134,14 @@ def shutdown super end - # Used to merge log record into top level json - def merge_json(record) - if record.has_key?(@log_key) - log = record[@log_key].strip - if log[0].eql?('{') && log[-1].eql?('}') - begin - record = record.merge(JSON.parse(log)) - record.delete(@log_key) - rescue JSON::ParserError - # do nothing, ignore - end - end - end - record - end - # Strip sumo_metadata and dump to json - def dump_log(log) - log.delete('_sumo_metadata') + def dump_log(record) + log = record['message'] + return log.strip if log.is_a?(String) begin parser = Yajl::Parser.new - hash = parser.parse(log[@log_key]) - log[@log_key] = hash + log[@log_key].strip! + log[@log_key] = parser.parse(log[@log_key]) if log[@log_key] Yajl.dump(log) rescue Yajl.dump(log) @@ -198,18 +174,13 @@ def sumo_key(sumo_metadata, chunk) "#{source_name}:#{source_category}:#{source_host}" end - # Convert timestamp to 13 digit epoch if necessary - def sumo_timestamp(time) - time.to_s.length == 13 ? time : time * 1000 - end - def sumo_fields(sumo_metadata) fields = sumo_metadata['fields'] || "" Hash[ - fields.split(',').map do |pair| - k, v = pair.split('=', 2) - [k, v] - end + fields.split(',').map do |pair| + k, v = pair.split('=', 2) + [k, v] + end ] end @@ -233,36 +204,14 @@ def write(chunk) next unless record.is_a? Hash sumo_metadata = record.fetch('_sumo_metadata', {:source => record[@source_name_key] }) key = sumo_key(sumo_metadata, chunk) - log_format = sumo_metadata['log_format'] || @log_format # Strip any unwanted newlines record[@log_key].chomp! if record[@log_key] && record[@log_key].respond_to?(:chomp!) case @data_type when 'logs' - case log_format - when 'text' - log = record[@log_key] - unless log.nil? - log.strip! - end - when 'json_merge' - if @add_timestamp - record = { @timestamp_key => sumo_timestamp(time) }.merge(record) - end - log = dump_log(merge_json(record)) - when 'fields' - log_fields = sumo_fields(sumo_metadata) - if @add_timestamp - record = { @timestamp_key => sumo_timestamp(time) }.merge(record) - end - log = dump_log(record) - else - if @add_timestamp - record = { @timestamp_key => sumo_timestamp(time) }.merge(record) - end - log = dump_log(record) - end + log_fields = sumo_fields(sumo_metadata) + log = dump_log(record) when 'metrics' log = record[@log_key] unless log.nil? @@ -277,7 +226,6 @@ def write(chunk) messages_list[key] = [log] end end - end # Push logs to sumo @@ -293,6 +241,5 @@ def write(chunk) collected_fields =dump_collected_fields(log_fields) ) end - end end diff --git a/test/plugin/test_out_sumologic.rb b/test/plugin/test_out_sumologic.rb index 2502d52..c68bbed 100644 --- a/test/plugin/test_out_sumologic.rb +++ b/test/plugin/test_out_sumologic.rb @@ -31,15 +31,6 @@ def test_invalid_data_type_configure assert_equal("Invalid data_type foo must be logs or metrics", exception.message) end - def test_invalid_log_format_configure - config = %{ - endpoint https://SUMOLOGIC_URL - log_format foo - } - exception = assert_raise(Fluent::ConfigError) {create_driver(config)} - assert_equal("Invalid log_format foo must be text, json, json_merge or fields", exception.message) - end - def test_invalid_metrics_data_type config = %{ endpoint https://SUMOLOGIC_URL @@ -59,7 +50,6 @@ def test_default_configure assert_equal instance.data_type, 'logs' assert_equal instance.metric_data_format, 'graphite' assert_equal instance.endpoint, 'https://SUMOLOGIC_URL' - assert_equal instance.log_format, 'json' assert_equal instance.log_key, 'message' assert_equal instance.source_category, nil assert_equal instance.source_name, nil @@ -74,164 +64,57 @@ def test_default_configure assert_equal instance.disable_cookies, false end - def test_emit_text + def test_emit_input_text_format_no_sumo_metadata config = %{ endpoint https://collectors.sumologic.com/v1/receivers/http/1234 - log_format text source_category test source_host test source_name test - } driver = create_driver(config) time = event_time stub_request(:post, 'https://collectors.sumologic.com/v1/receivers/http/1234') - driver.run do - driver.feed("output.test", time, {'foo' => 'bar', 'message' => 'test'}) - end - assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234", - headers: {'X-Sumo-Category'=>'test', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'test', 'X-Sumo-Name'=>'test'}, - body: "test", - times:1 - end - - def test_emit_json - config = %{ - endpoint https://collectors.sumologic.com/v1/receivers/http/1234 - log_format json - source_category test - source_host test - source_name test - - } - driver = create_driver(config) - time = event_time - stub_request(:post, 'https://collectors.sumologic.com/v1/receivers/http/1234') - driver.run do - driver.feed("output.test", time, {'foo' => 'bar', 'message' => 'test'}) - end - assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234", - headers: {'X-Sumo-Category'=>'test', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'test', 'X-Sumo-Name'=>'test'}, - body: /\A{"timestamp":\d+.,"foo":"bar","message":"test"}\z/, - times:1 - end - - def test_emit_empty_fields - config = %{ - endpoint https://collectors.sumologic.com/v1/receivers/http/1234 - log_format fields - source_category test - source_host test - source_name test - - } - driver = create_driver(config) - time = event_time - stub_request(:post, 'https://collectors.sumologic.com/v1/receivers/http/1234') driver.run do driver.feed("output.test", time, {'message' => 'test'}) end assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234", headers: {'X-Sumo-Category'=>'test', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'test', 'X-Sumo-Name'=>'test'}, - body: /\A{"timestamp":\d+.,"message":"test"}\z/, - times:1 - end - - def test_emit_json_double_encoded - config = %{ - endpoint https://endpoint3.collection.us2.sumologic.com/receiver/v1/http/1234 - log_format json - source_category test - source_host test - source_name test - - } - driver = create_driver(config) - time = event_time - stub_request(:post, 'https://endpoint3.collection.us2.sumologic.com/receiver/v1/http/1234') - driver.run do - driver.feed("output.test", time, {'message' => '{"bar":"foo"}'}) - end - assert_requested :post, "https://endpoint3.collection.us2.sumologic.com/receiver/v1/http/1234", - headers: {'X-Sumo-Category'=>'test', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'test', 'X-Sumo-Name'=>'test'}, - body: /\A{"timestamp":\d+.,"message":{"bar":"foo"}}\z/, - times:1 - end - - def test_emit_text_format_as_json - config = %{ - endpoint https://endpoint3.collection.us2.sumologic.com/receiver/v1/http/1234 - log_format json - source_category test - source_host test - source_name test - - } - driver = create_driver(config) - time = event_time - stub_request(:post, 'https://endpoint3.collection.us2.sumologic.com/receiver/v1/http/1234') - driver.run do - driver.feed("output.test", time, {'message' => 'some message'}) - end - assert_requested :post, "https://endpoint3.collection.us2.sumologic.com/receiver/v1/http/1234", - headers: {'X-Sumo-Category'=>'test', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'test', 'X-Sumo-Name'=>'test'}, - body: /\A{"timestamp":\d+.,"message":"some message"}\z/, - times:1 - end - - def test_emit_json_merge - config = %{ - endpoint https://collectors.sumologic.com/v1/receivers/http/1234 - log_format json_merge - source_category test - source_host test - source_name test - - } - driver = create_driver(config) - time = event_time - stub_request(:post, 'https://collectors.sumologic.com/v1/receivers/http/1234') - driver.run do - driver.feed("output.test", time, {'foo' => 'bar', 'message' => '{"foo2":"bar2"}'}) - end - assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234", - headers: {'X-Sumo-Category'=>'test', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'test', 'X-Sumo-Name'=>'test'}, - body: /\A{"timestamp":\d+,"foo":"bar","foo2":"bar2"}\z/, + body: "test", times:1 end - def test_emit_json_merge_timestamp + def test_emit_input_json_format_no_sumo_metadata config = %{ endpoint https://collectors.sumologic.com/v1/receivers/http/1234 - log_format json_merge source_category test source_host test source_name test - } driver = create_driver(config) time = event_time stub_request(:post, 'https://collectors.sumologic.com/v1/receivers/http/1234') driver.run do - driver.feed("output.test", time, {'message' => '{"timestamp":123}'}) + driver.feed("output.test", time, {'message' => {"bar" => "foo"}}) end assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234", headers: {'X-Sumo-Category'=>'test', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'test', 'X-Sumo-Name'=>'test'}, - body: /\A{"timestamp":123}\z/, + body: /\A{"bar":"foo"}\z/, times:1 end - def test_emit_with_sumo_metadata_with_fields_json_format + def test_emit_input_text_format_with_sumo_metadata config = %{ - endpoint https://collectors.sumologic.com/v1/receivers/http/1234 - log_format json - } + endpoint https://collectors.sumologic.com/v1/receivers/http/1234 + source_category test + source_host test + source_name test + } driver = create_driver(config) time = event_time stub_request(:post, 'https://collectors.sumologic.com/v1/receivers/http/1234') ENV['HOST'] = "foo" driver.run do - driver.feed("output.test", time, {'foo' => 'bar', 'message' => 'test', '_sumo_metadata' => { + driver.feed("output.test", time, {'message' => 'some message', '_sumo_metadata' => { "host": "#{ENV['HOST']}", "source": "${tag}", "category": "test", @@ -239,22 +122,24 @@ def test_emit_with_sumo_metadata_with_fields_json_format }}) end assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234", - headers: {'X-Sumo-Category'=>'test', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'foo', 'X-Sumo-Name'=>'output.test'}, - body: /\A{"timestamp":\d+.,"foo":"bar","message":"test"}\z/, - times:1 + headers: {'X-Sumo-Category'=>'test', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'foo', 'X-Sumo-Name'=>'output.test', 'X-Sumo-Fields' => 'foo=bar, sumo = logic'}, + body: 'some message', + times:1 end - def test_emit_with_sumo_metadata_with_fields_fields_format + def test_emit_input_json_format_with_sumo_metadata config = %{ - endpoint https://collectors.sumologic.com/v1/receivers/http/1234 - log_format fields - } + endpoint https://collectors.sumologic.com/v1/receivers/http/1234 + source_category test + source_host test + source_name test + } driver = create_driver(config) time = event_time stub_request(:post, 'https://collectors.sumologic.com/v1/receivers/http/1234') ENV['HOST'] = "foo" driver.run do - driver.feed("output.test", time, {'foo' => 'shark', 'message' => 'test', '_sumo_metadata' => { + driver.feed("output.test", time, {'message' => {"bar" => "foo", "foo" => "shark"}, '_sumo_metadata' => { "host": "#{ENV['HOST']}", "source": "${tag}", "category": "test", @@ -262,73 +147,60 @@ def test_emit_with_sumo_metadata_with_fields_fields_format }}) end assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234", - headers: {'X-Sumo-Category'=>'test', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'foo', 'X-Sumo-Name'=>'output.test', 'X-Sumo-Fields' => 'foo=bar, sumo = logic'}, - body: /\A{"timestamp":\d+.,"foo":"shark","message":"test"}\z/, - times:1 + headers: {'X-Sumo-Category'=>'test', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'foo', 'X-Sumo-Name'=>'output.test', 'X-Sumo-Fields' => 'foo=bar, sumo = logic'}, + body: /\A{"bar":"foo","foo":"shark"}\z/, + times:1 end - def test_emit_with_sumo_metadata + def test_emit_input_text_format_strip_newlines config = %{ - endpoint https://collectors.sumologic.com/v1/receivers/http/1234 - log_format json - } + endpoint https://collectors.sumologic.com/v1/receivers/http/1234 + source_category test + source_host test + source_name test + } driver = create_driver(config) time = event_time stub_request(:post, 'https://collectors.sumologic.com/v1/receivers/http/1234') ENV['HOST'] = "foo" driver.run do - driver.feed("output.test", time, {'foo' => 'bar', 'message' => 'test', '_sumo_metadata' => { + driver.feed("output.test", time, {'message' => "\nsome message\n", '_sumo_metadata' => { "host": "#{ENV['HOST']}", "source": "${tag}", - "category": "${tag[1]}" + "category": "test", + "fields": "foo=bar, sumo = logic" }}) end assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234", - headers: {'X-Sumo-Category'=>'test', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'foo', 'X-Sumo-Name'=>'output.test'}, - body: /\A{"timestamp":\d+.,"foo":"bar","message":"test"}\z/, - times:1 + headers: {'X-Sumo-Category'=>'test', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'foo', 'X-Sumo-Name'=>'output.test', 'X-Sumo-Fields' => 'foo=bar, sumo = logic'}, + body: 'some message', + times:1 end - def test_emit_json_no_timestamp + def test_emit_input_json_format_strip_newlines config = %{ - endpoint https://collectors.sumologic.com/v1/receivers/http/1234 - log_format json - source_category test - source_host test - source_name test - add_timestamp false - } - driver = create_driver(config) - time = event_time - stub_request(:post, 'https://collectors.sumologic.com/v1/receivers/http/1234') - driver.run do - driver.feed("output.test", time, {'foo' => 'bar', 'message' => 'test'}) - end - assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234", - headers: {'X-Sumo-Category'=>'test', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'test', 'X-Sumo-Name'=>'test'}, - body: /\A{"foo":"bar","message":"test"}\z/, - times:1 - end - - def test_emit_json_timestamp_key - config = %{ - endpoint https://collectors.sumologic.com/v1/receivers/http/1234 - log_format json - source_category test - source_host test - source_name test - timestamp_key ts - } + endpoint https://collectors.sumologic.com/v1/receivers/http/1234 + source_category test + source_host test + source_name test + log_key log + } driver = create_driver(config) time = event_time stub_request(:post, 'https://collectors.sumologic.com/v1/receivers/http/1234') + ENV['HOST'] = "foo" driver.run do - driver.feed("output.test", time, {'message' => 'test'}) + driver.feed("output.test", time, {'message' => {"log" => "\nhello\n", "foo" => "shark"}, '_sumo_metadata' => { + "host": "#{ENV['HOST']}", + "source": "${tag}", + "category": "test", + "fields": "foo=bar, sumo = logic" + }}) end assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234", - headers: {'X-Sumo-Category'=>'test', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'test', 'X-Sumo-Name'=>'test'}, - body: /\A{"ts":\d+.,"message":"test"}\z/, - times:1 + headers: {'X-Sumo-Category'=>'test', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'foo', 'X-Sumo-Name'=>'output.test', 'X-Sumo-Fields' => 'foo=bar, sumo = logic'}, + body: /\A{"log":"hello","foo":"shark"}\z/, + times:1 end def test_emit_graphite @@ -393,5 +265,4 @@ def test_emit_prometheus body: 'cpu{cluster="prod", node="lb-1"} 87.2 1501753030', times:1 end - end \ No newline at end of file From 574e2d751a2d652903ffbe5199021fad9cd22848 Mon Sep 17 00:00:00 2001 From: Sam Song Date: Thu, 10 Oct 2019 21:55:26 -0700 Subject: [PATCH 2/4] remove no longer used config params --- lib/fluent/plugin/out_sumologic.rb | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/fluent/plugin/out_sumologic.rb b/lib/fluent/plugin/out_sumologic.rb index 03bc722..63330d8 100644 --- a/lib/fluent/plugin/out_sumologic.rb +++ b/lib/fluent/plugin/out_sumologic.rb @@ -81,8 +81,6 @@ class Fluent::Plugin::Sumologic < Fluent::Plugin::Output config_param :verify_ssl, :bool, :default => true config_param :delimiter, :string, :default => "." config_param :open_timeout, :integer, :default => 60 - config_param :add_timestamp, :bool, :default => true - config_param :timestamp_key, :string, :default => 'timestamp' config_param :proxy_uri, :string, :default => nil config_param :disable_cookies, :bool, :default => false From 47882da98ba4629a30ca9af06d45926c0a649abf Mon Sep 17 00:00:00 2001 From: Sam Song Date: Thu, 10 Oct 2019 22:41:52 -0700 Subject: [PATCH 3/4] remove unused configs from test --- test/plugin/test_out_sumologic.rb | 2 -- 1 file changed, 2 deletions(-) diff --git a/test/plugin/test_out_sumologic.rb b/test/plugin/test_out_sumologic.rb index c68bbed..101bf4d 100644 --- a/test/plugin/test_out_sumologic.rb +++ b/test/plugin/test_out_sumologic.rb @@ -58,8 +58,6 @@ def test_default_configure assert_equal instance.verify_ssl, true assert_equal instance.delimiter, '.' assert_equal instance.open_timeout, 60 - assert_equal instance.add_timestamp, true - assert_equal instance.timestamp_key, 'timestamp' assert_equal instance.proxy_uri, nil assert_equal instance.disable_cookies, false end From 7fdb38d740ce86922ebb6101e7cd8ecfe2d4159c Mon Sep 17 00:00:00 2001 From: Sam Song Date: Fri, 11 Oct 2019 12:52:01 -0700 Subject: [PATCH 4/4] accept input _sumo_metadata[:fields] as map not string --- lib/fluent/plugin/out_sumologic.rb | 12 +----------- test/plugin/test_out_sumologic.rb | 8 ++++---- 2 files changed, 5 insertions(+), 15 deletions(-) diff --git a/lib/fluent/plugin/out_sumologic.rb b/lib/fluent/plugin/out_sumologic.rb index 63330d8..3859bf0 100644 --- a/lib/fluent/plugin/out_sumologic.rb +++ b/lib/fluent/plugin/out_sumologic.rb @@ -172,16 +172,6 @@ def sumo_key(sumo_metadata, chunk) "#{source_name}:#{source_category}:#{source_host}" end - def sumo_fields(sumo_metadata) - fields = sumo_metadata['fields'] || "" - Hash[ - fields.split(',').map do |pair| - k, v = pair.split('=', 2) - [k, v] - end - ] - end - def dump_collected_fields(log_fields) if log_fields.nil? log_fields @@ -208,7 +198,7 @@ def write(chunk) case @data_type when 'logs' - log_fields = sumo_fields(sumo_metadata) + log_fields = sumo_metadata['fields'] log = dump_log(record) when 'metrics' log = record[@log_key] diff --git a/test/plugin/test_out_sumologic.rb b/test/plugin/test_out_sumologic.rb index 101bf4d..76e7b3e 100644 --- a/test/plugin/test_out_sumologic.rb +++ b/test/plugin/test_out_sumologic.rb @@ -116,7 +116,7 @@ def test_emit_input_text_format_with_sumo_metadata "host": "#{ENV['HOST']}", "source": "${tag}", "category": "test", - "fields": "foo=bar, sumo = logic" + "fields": {"foo"=>"bar", " sumo " => " logic"} }}) end assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234", @@ -141,7 +141,7 @@ def test_emit_input_json_format_with_sumo_metadata "host": "#{ENV['HOST']}", "source": "${tag}", "category": "test", - "fields": "foo=bar, sumo = logic" + "fields": {"foo"=>"bar", " sumo " => " logic"} }}) end assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234", @@ -166,7 +166,7 @@ def test_emit_input_text_format_strip_newlines "host": "#{ENV['HOST']}", "source": "${tag}", "category": "test", - "fields": "foo=bar, sumo = logic" + "fields": {"foo"=>"bar", " sumo " => " logic"} }}) end assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234", @@ -192,7 +192,7 @@ def test_emit_input_json_format_strip_newlines "host": "#{ENV['HOST']}", "source": "${tag}", "category": "test", - "fields": "foo=bar, sumo = logic" + "fields": {"foo"=>"bar", " sumo " => " logic"} }}) end assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234",