Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[breaking change] remove log_format #53

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 7 additions & 72 deletions lib/fluent/plugin/out_sumologic.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ class Fluent::Plugin::Sumologic < Fluent::Plugin::Output
config_param :data_type, :string, :default => DEFAULT_DATA_TYPE
config_param :metric_data_format, :default => DEFAULT_METRIC_FORMAT_TYPE
config_param :endpoint, :string, secret: true
config_param :log_format, :string, :default => 'json'
config_param :log_key, :string, :default => 'message'
config_param :source_category, :string, :default => nil
config_param :source_name, :string, :default => nil
Expand All @@ -82,8 +81,6 @@ class Fluent::Plugin::Sumologic < Fluent::Plugin::Output
config_param :verify_ssl, :bool, :default => true
config_param :delimiter, :string, :default => "."
config_param :open_timeout, :integer, :default => 60
config_param :add_timestamp, :bool, :default => true
config_param :timestamp_key, :string, :default => 'timestamp'
config_param :proxy_uri, :string, :default => nil
config_param :disable_cookies, :bool, :default => false

Expand Down Expand Up @@ -115,14 +112,6 @@ def configure(conf)
end
end

if conf['data_type'].nil? || conf['data_type'] == LOGS_DATA_TYPE
unless conf['log_format'].nil?
unless conf['log_format'] =~ /\A(?:json|text|json_merge|fields)\z/
raise Fluent::ConfigError, "Invalid log_format #{conf['log_format']} must be text, json, json_merge or fields"
end
end
end

if conf['data_type'] == METRICS_DATA_TYPE && ! conf['metrics_data_type'].nil?
unless conf['metrics_data_type'] =~ /\A(?:graphite|carbon2|pronetheus)\z/
raise Fluent::ConfigError, "Invalid metrics_data_type #{conf['metrics_data_type']} must be graphite or carbon2 or prometheus"
Expand All @@ -143,29 +132,14 @@ def shutdown
super
end

# Merge a JSON-formatted log payload into the top level of the record.
#
# When the value under @log_key looks like a JSON object ("{...}"),
# parse it and merge its keys into the record, then drop the raw string
# under @log_key. Malformed JSON, a missing key, or a non-String value
# leaves the record untouched (the original crashed with NoMethodError
# on non-String values because it called .strip unconditionally).
#
# @param record [Hash] the event record; may contain @log_key
# @return [Hash] the (possibly merged) record
def merge_json(record)
  log = record[@log_key]
  # Only attempt a merge for String payloads that look like a JSON object.
  if log.is_a?(String)
    log = log.strip
    if log.start_with?('{') && log.end_with?('}')
      begin
        record = record.merge(JSON.parse(log))
        record.delete(@log_key)
      rescue JSON::ParserError
        # Not valid JSON after all — keep the raw record as-is.
      end
    end
  end
  record
end

# Strip sumo_metadata and dump to json
def dump_log(log)
log.delete('_sumo_metadata')
def dump_log(record)
log = record['message']
return log.strip if log.is_a?(String)
begin
parser = Yajl::Parser.new
hash = parser.parse(log[@log_key])
log[@log_key] = hash
log[@log_key].strip!
log[@log_key] = parser.parse(log[@log_key]) if log[@log_key]
Yajl.dump(log)
rescue
Yajl.dump(log)
Expand Down Expand Up @@ -198,21 +172,6 @@ def sumo_key(sumo_metadata, chunk)
"#{source_name}:#{source_category}:#{source_host}"
end

# Convert timestamp to 13 digit epoch if necessary
#
# Sumo Logic expects millisecond-precision (13 digit) epoch timestamps;
# a value that already renders as 13 digits is passed through, anything
# else is treated as seconds and scaled up by 1000.
def sumo_timestamp(time)
  return time if time.to_s.length == 13

  time * 1000
end

# Parse the comma-separated "fields" entry of sumo_metadata into a Hash.
#
# "a=1,b=2" => {"a"=>"1", "b"=>"2"}. Each pair is split on the first '='
# only, so "k=v=w" maps to {"k"=>"v=w"}; a pair with no '=' maps to nil.
#
# @param sumo_metadata [Hash] metadata that may carry a 'fields' string
# @return [Hash] field name => field value (String or nil)
def sumo_fields(sumo_metadata)
  raw = sumo_metadata['fields'] || ""
  raw.split(',').each_with_object({}) do |pair, acc|
    key, value = pair.split('=', 2)
    acc[key] = value
  end
end

def dump_collected_fields(log_fields)
if log_fields.nil?
log_fields
Expand All @@ -233,36 +192,14 @@ def write(chunk)
next unless record.is_a? Hash
sumo_metadata = record.fetch('_sumo_metadata', {:source => record[@source_name_key] })
key = sumo_key(sumo_metadata, chunk)
log_format = sumo_metadata['log_format'] || @log_format

# Strip any unwanted newlines
record[@log_key].chomp! if record[@log_key] && record[@log_key].respond_to?(:chomp!)

case @data_type
when 'logs'
case log_format
when 'text'
log = record[@log_key]
unless log.nil?
log.strip!
end
when 'json_merge'
if @add_timestamp
record = { @timestamp_key => sumo_timestamp(time) }.merge(record)
end
log = dump_log(merge_json(record))
when 'fields'
log_fields = sumo_fields(sumo_metadata)
if @add_timestamp
record = { @timestamp_key => sumo_timestamp(time) }.merge(record)
end
log = dump_log(record)
else
if @add_timestamp
record = { @timestamp_key => sumo_timestamp(time) }.merge(record)
end
log = dump_log(record)
end
log_fields = sumo_metadata['fields']
log = dump_log(record)
when 'metrics'
log = record[@log_key]
unless log.nil?
Expand All @@ -277,7 +214,6 @@ def write(chunk)
messages_list[key] = [log]
end
end

end

# Push logs to sumo
Expand All @@ -293,6 +229,5 @@ def write(chunk)
collected_fields =dump_collected_fields(log_fields)
)
end

end
end
Loading