Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filter_parser: Add emit_invalid_record_to_error parameter #1494

Merged
merged 2 commits into from
Mar 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions lib/fluent/plugin/filter_parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class ParserFilter < Filter
config_param :inject_key_prefix, :string, default: nil
config_param :replace_invalid_sequence, :bool, default: false
config_param :hash_value_field, :string, default: nil
config_param :emit_invalid_record_to_error, :bool, default: true

attr_reader :parser

Expand All @@ -49,7 +50,9 @@ def configure(conf)
def filter_with_time(tag, time, record)
raw_value = record[@key_name]
if raw_value.nil?
router.emit_error_event(tag, time, record, ArgumentError.new("#{@key_name} does not exist"))
if @emit_invalid_record_to_error
router.emit_error_event(tag, time, record, ArgumentError.new("#{@key_name} does not exist"))
end
if @reserve_data
return time, handle_parsed(tag, record, time, {})
else
Expand All @@ -67,7 +70,9 @@ def filter_with_time(tag, time, record)
r = handle_parsed(tag, record, t, values)
return t, r
else
router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("pattern not match with data '#{raw_value}'"))
if @emit_invalid_record_to_error
router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("pattern not match with data '#{raw_value}'"))
end
if @reserve_data
t = time
r = handle_parsed(tag, record, time, {})
Expand All @@ -78,17 +83,23 @@ def filter_with_time(tag, time, record)
end
end
rescue Fluent::Plugin::Parser::ParserError => e
router.emit_error_event(tag, time, record, e)
return FAILED_RESULT
if @emit_invalid_record_to_error
raise e
else
return FAILED_RESULT
end
rescue ArgumentError => e
raise unless @replace_invalid_sequence
raise unless e.message.index("invalid byte sequence in") == 0

raw_value = raw_value.scrub(REPLACE_CHAR)
retry
rescue => e
router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("parse failed #{e.message}"))
return FAILED_RESULT
if @emit_invalid_record_to_error
raise Fluent::Plugin::Parser::ParserError, "parse failed #{e.message}"
else
return FAILED_RESULT
end
end
end

Expand Down
35 changes: 35 additions & 0 deletions test/plugin/test_filter_parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -662,4 +662,39 @@ def test_not_call_emit_error_event_when_pattern_is_mached
end
end
end

class EmitInvalidRecordToErrorTest < self
def test_pattern_is_mismached_with_emit_invalid_record_to_error
d = create_driver(CONFIG_UNMATCHED_PATTERN_LOG + "emit_invalid_record_to_error false")
flexmock(d.instance.router).should_receive(:emit_error_event).never
assert_nothing_raised {
d.run do
d.feed(@tag, Fluent::EventTime.now.to_i, {'message' => INVALID_MESSAGE})
end
}
assert_equal 0, d.filtered.length
end

def test_parser_error_with_emit_invalid_record_to_error
d = create_driver(CONFIG_INVALID_TIME_VALUE + "emit_invalid_record_to_error false")
flexmock(d.instance.router).should_receive(:emit_error_event).never
assert_nothing_raised {
d.run do
d.feed(@tag, Fluent::EventTime.now.to_i, {'data' => '{"time":[], "f1":"v1"}'})
end
}
assert_equal 0, d.filtered.length
end

def test_key_not_exist_with_emit_invalid_record_to_error
d = create_driver(CONFIG_NOT_IGNORE + "emit_invalid_record_to_error false")
flexmock(d.instance.router).should_receive(:emit_error_event).never
assert_nothing_raised {
d.run do
d.feed(@tag, Fluent::EventTime.now.to_i, {'foo' => 'bar'})
end
}
assert_equal 0, d.filtered.length
end
end
end