Feat: event target => namespace support (for ECS) #37

Merged
merged 19 commits into from
Jun 29, 2021
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,7 @@
## 3.1.0
- Feat: event `target => namespace` support (for ECS) [#37](https://github.com/logstash-plugins/logstash-codec-json/pull/37)
- Refactor: dropped support for old Logstash versions (< 6.0)

## 3.0.5
- Update gemspec summary

34 changes: 33 additions & 1 deletion docs/index.asciidoc
@@ -39,6 +39,8 @@ failure, the payload will be stored in the `message` field.
|=======================================================================
|Setting |Input type|Required
| <<plugins-{type}s-{plugin}-charset>> |<<string,string>>, one of `["ASCII-8BIT", "UTF-8", "US-ASCII", "Big5", "Big5-HKSCS", "Big5-UAO", "CP949", "Emacs-Mule", "EUC-JP", "EUC-KR", "EUC-TW", "GB2312", "GB18030", "GBK", "ISO-8859-1", "ISO-8859-2", "ISO-8859-3", "ISO-8859-4", "ISO-8859-5", "ISO-8859-6", "ISO-8859-7", "ISO-8859-8", "ISO-8859-9", "ISO-8859-10", "ISO-8859-11", "ISO-8859-13", "ISO-8859-14", "ISO-8859-15", "ISO-8859-16", "KOI8-R", "KOI8-U", "Shift_JIS", "UTF-16BE", "UTF-16LE", "UTF-32BE", "UTF-32LE", "Windows-31J", "Windows-1250", "Windows-1251", "Windows-1252", "IBM437", "IBM737", "IBM775", "CP850", "IBM852", "CP852", "IBM855", "CP855", "IBM857", "IBM860", "IBM861", "IBM862", "IBM863", "IBM864", "IBM865", "IBM866", "IBM869", "Windows-1258", "GB1988", "macCentEuro", "macCroatian", "macCyrillic", "macGreek", "macIceland", "macRoman", "macRomania", "macThai", "macTurkish", "macUkraine", "CP950", "CP951", "IBM037", "stateless-ISO-2022-JP", "eucJP-ms", "CP51932", "EUC-JIS-2004", "GB12345", "ISO-2022-JP", "ISO-2022-JP-2", "CP50220", "CP50221", "Windows-1256", "Windows-1253", "Windows-1255", "Windows-1254", "TIS-620", "Windows-874", "Windows-1257", "MacJapanese", "UTF-7", "UTF8-MAC", "UTF-16", "UTF-32", "UTF8-DoCoMo", "SJIS-DoCoMo", "UTF8-KDDI", "SJIS-KDDI", "ISO-2022-JP-KDDI", "stateless-ISO-2022-JP-KDDI", "UTF8-SoftBank", "SJIS-SoftBank", "BINARY", "CP437", "CP737", "CP775", "IBM850", "CP857", "CP860", "CP861", "CP862", "CP863", "CP864", "CP865", "CP866", "CP869", "CP1258", "Big5-HKSCS:2008", "ebcdic-cp-us", "eucJP", "euc-jp-ms", "EUC-JISX0213", "eucKR", "eucTW", "EUC-CN", "eucCN", "CP936", "ISO2022-JP", "ISO2022-JP2", "ISO8859-1", "ISO8859-2", "ISO8859-3", "ISO8859-4", "ISO8859-5", "ISO8859-6", "CP1256", "ISO8859-7", "CP1253", "ISO8859-8", "CP1255", "ISO8859-9", "CP1254", "ISO8859-10", "ISO8859-11", "CP874", "ISO8859-13", "CP1257", "ISO8859-14", "ISO8859-15", "ISO8859-16", "CP878", "MacJapan", "ASCII", "ANSI_X3.4-1968", "646", "CP65000", "CP65001", 
"UTF-8-MAC", "UTF-8-HFS", "UCS-2BE", "UCS-4BE", "UCS-4LE", "CP932", "csWindows31J", "SJIS", "PCK", "CP1250", "CP1251", "CP1252", "external", "locale"]`|No
| <<plugins-{type}s-{plugin}-ecs_compatibility>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-target>> |<<string,string>>|No
|=======================================================================

&nbsp;
@@ -59,4 +61,34 @@ actual encoding of the text and Logstash will convert it for you.

For nxlog users, you may need to set this to "CP1252".


[id="plugins-{type}s-{plugin}-ecs_compatibility"]
===== `ecs_compatibility`

* Value type is <<string,string>>
* Supported values are:
** `disabled`: JSON document data added at root level
** `v1`,`v8`: Elastic Common Schema compliant behavior (warns when `target` isn't set)
* Default value depends on which version of Logstash is running:
** When Logstash provides a `pipeline.ecs_compatibility` setting, its value is used as the default
** Otherwise, the default value is `disabled`

Controls this plugin's compatibility with the {ecs-ref}[Elastic Common Schema (ECS)].
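
As a rough illustration of the mode handling documented above, the sketch below mimics in plain Ruby how a per-mode value could be looked up, with `v8` treated as an alias of `v1` as in the codec's `ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)` declaration. `MODE_ALIASES` and `select_for_mode` are hypothetical names made up for this example and are not the mixin's API.

```ruby
# Hypothetical stand-in for the mixin's per-mode selection (not the real API).
MODE_ALIASES = { v8: :v1 }.freeze

def select_for_mode(mode, choices)
  # Resolve aliases first, so :v8 falls through to the :v1 entry.
  effective = MODE_ALIASES.fetch(mode, mode)
  choices.fetch(effective)
end

# Same shape as the lookup in the codec: no original field when disabled.
choices = { disabled: nil, v1: '[event][original]' }

disabled_field = select_for_mode(:disabled, choices)
v1_field       = select_for_mode(:v1, choices)
v8_field       = select_for_mode(:v8, choices)
```

With these inputs `disabled_field` is `nil`, while `v1_field` and `v8_field` both hold `[event][original]`.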

[id="plugins-{type}s-{plugin}-target"]
===== `target`

* Value type is <<string,string>>
* There is no default value for this setting.

Define the target field for placing the parsed data. If this setting is not
set, the JSON data will be stored at the root (top level) of the event.

For example, if you want data to be put under the `document` field:
[source,ruby]
    input {
      http {
        codec => json {
          target => "[document]"
        }
      }
    }
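
To make the effect of `target` concrete, here is a minimal plain-Ruby sketch that uses a `Hash` in place of a Logstash event. The helper `place_decoded` is hypothetical and only handles a single-level reference such as `[document]`. It is meant to show where the decoded fields end up, not how the plugin implements it.

```ruby
require 'json'

# Hypothetical helper: a plain Hash stands in for a LogStash::Event.
def place_decoded(json, target = nil)
  decoded = JSON.parse(json)
  # Without a target, the decoded fields sit at the root of the event.
  return decoded if target.nil?

  # With a target such as "[document]", they are nested under that field.
  key = target.delete_prefix('[').delete_suffix(']')
  { key => decoded }
end

payload      = '{"foo": "bar", "count": 2}'
root_event   = place_decoded(payload)
nested_event = place_decoded(payload, '[document]')
```

Here `root_event` has `foo` and `count` as top-level keys, whereas `nested_event` has a single top-level `document` key holding both.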
81 changes: 44 additions & 37 deletions lib/logstash/codecs/json.rb
@@ -3,6 +3,11 @@
require "logstash/util/charset"
require "logstash/json"
require "logstash/event"
require 'logstash/plugin_mixins/ecs_compatibility_support'
require 'logstash/plugin_mixins/ecs_compatibility_support/target_check'
require 'logstash/plugin_mixins/validator_support/field_reference_validation_adapter'
require 'logstash/plugin_mixins/event_support/event_factory_adapter'
require 'logstash/plugin_mixins/event_support/from_json_helper'

# This codec may be used to decode (via inputs) and encode (via outputs)
# full JSON messages. If the data being sent is a JSON array at its root, multiple events will be created (one per element).
@@ -16,10 +21,19 @@
# it will fall back to plain text and add a tag `_jsonparsefailure`. Upon a JSON
# failure, the payload will be stored in the `message` field.
class LogStash::Codecs::JSON < LogStash::Codecs::Base

include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)
include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck

extend LogStash::PluginMixins::ValidatorSupport::FieldReferenceValidationAdapter

include LogStash::PluginMixins::EventSupport::EventFactoryAdapter
include LogStash::PluginMixins::EventSupport::FromJsonHelper

config_name "json"

# The character encoding used in this codec. Examples include "UTF-8" and
# "CP1252".
# The character encoding used in this codec.
# Examples include "UTF-8" and "CP1252".
#
# JSON requires valid UTF-8 strings, but in some cases, software that
# emits JSON does so in another encoding (nxlog, for example). In
@@ -29,13 +43,26 @@ class LogStash::Codecs::JSON < LogStash::Codecs::Base
# For nxlog users, you may need to set this to "CP1252".
config :charset, :validate => ::Encoding.name_list, :default => "UTF-8"

def register
# Defines a target field for placing decoded fields.
# If this setting is omitted, data gets stored at the root (top level) of the event.
# The target is only relevant while decoding data into a new event.
config :target, :validate => :field_reference

def initialize(*params)
super

@original_field = ecs_select[disabled: nil, v1: '[event][original]']

@converter = LogStash::Util::Charset.new(@charset)
@converter.logger = @logger
end

def register
# no-op
end

def decode(data, &block)
parse(@converter.convert(data), &block)
parse_json(@converter.convert(data), &block)
end

def encode(event)
@@ -44,42 +71,22 @@ def encode(event)

private

def from_json_parse(json, &block)
LogStash::Event.from_json(json).each { |event| yield event }
rescue LogStash::Json::ParserError => e
@logger.error("JSON parse error, original data now in message field", :error => e, :data => json)
yield LogStash::Event.new("message" => json, "tags" => ["_jsonparsefailure"])
end

def legacy_parse(json, &block)
decoded = LogStash::Json.load(json)

case decoded
when Array
decoded.each {|item| yield(LogStash::Event.new(item)) }
when Hash
yield LogStash::Event.new(decoded)
def parse_json(json)
events = events_from_json(json, targeted_event_factory)
if events.size == 1
event = events.first
event.set(@original_field, json) if @original_field
yield event
else
@logger.error("JSON codec is expecting array or object/map", :data => json)
yield LogStash::Event.new("message" => json, "tags" => ["_jsonparsefailure"])
events.each { |event| yield event }
end
rescue LogStash::Json::ParserError => e
@logger.info("JSON parse failure. Falling back to plain-text", :error => e, :data => json)
yield LogStash::Event.new("message" => json, "tags" => ["_jsonparsefailure"])
rescue StandardError => e
# This should NEVER happen. But hubris has been the cause of many pipeline breaking things
# If something bad should happen we just don't want to crash logstash here.
@logger.warn(
"An unexpected error occurred parsing JSON data",
:data => json,
:message => e.message,
:class => e.class.name,
:backtrace => e.backtrace
)
rescue => e
@logger.error("JSON parse error, original data now in message field", message: e.message, exception: e.class, data: json)
yield parse_json_error_event(json)
end

# keep compatibility with all v2.x distributions. only in 2.3 will the Event#from_json method be introduced
# and we need to keep compatibility for all v2 releases.
alias_method :parse, LogStash::Event.respond_to?(:from_json) ? :from_json_parse : :legacy_parse
def parse_json_error_event(json)
event_factory.new_event("message" => json, "tags" => ["_jsonparsefailure"])
end

end
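
For reviewers who want the new decode flow in one place, the following is a simplified plain-Ruby sketch of what `parse_json` appears to do after this change: one event per JSON object, the raw payload attached only when exactly one event results (and only in ECS mode), and a tagged fallback event on any error. It uses the standard `json` library and plain Hashes. `decode_sketch` and the flat `event.original` key are assumptions for illustration and do not reproduce the event factory or field-reference handling.

```ruby
require 'json'

# Hypothetical flat key standing in for the [event][original] field reference.
ORIGINAL_FIELD = 'event.original'

def decode_sketch(json, ecs: false)
  parsed = JSON.parse(json)
  # A root-level array becomes one event per element; an object becomes one event.
  events = parsed.is_a?(Array) ? parsed : [parsed]
  raise TypeError, 'expected JSON objects' unless events.all? { |e| e.is_a?(Hash) }

  # The raw payload is kept only when exactly one event was produced.
  events.first[ORIGINAL_FIELD] = json if ecs && events.size == 1
  events
rescue StandardError
  # Any failure falls back to a plain-text event carrying the failure tag.
  [{ 'message' => json, 'tags' => ['_jsonparsefailure'] }]
end

single = decode_sketch('{"foo":"bar"}', ecs: true)
many   = decode_sketch('[{"a":1},{"b":2}]', ecs: true)
failed = decode_sketch('random_message')
```

In this sketch `single` carries the original payload, `many` contains two events without it, and `failed` is a single event with `message` and the `_jsonparsefailure` tag.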
7 changes: 6 additions & 1 deletion logstash-codec-json.gemspec
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-codec-json'
s.version = '3.0.5'
s.version = '3.1.0'
s.licenses = ['Apache License (2.0)']
s.summary = "Reads JSON formatted content, creating one event per element in a JSON array"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
@@ -19,8 +19,13 @@ Gem::Specification.new do |s|
# Special flag to let us know this is actually a logstash plugin
s.metadata = { "logstash_plugin" => "true", "logstash_group" => "codec" }

s.required_ruby_version = '>= 2.3' # Event.from_json exists at least since LS 5.6

# Gem dependencies
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
s.add_runtime_dependency 'logstash-mixin-ecs_compatibility_support', '~> 1.3'
s.add_runtime_dependency 'logstash-mixin-event_support', '~> 1.0'
s.add_runtime_dependency 'logstash-mixin-validator_support', '~> 1.0'

s.add_development_dependency 'logstash-devutils'
s.add_development_dependency 'insist'
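
The new mixin dependencies use pessimistic (`~>`) constraints. As a quick way to check which versions such a constraint admits, this small sketch uses RubyGems' own `Gem::Requirement` and `Gem::Version` classes. The version numbers tested are arbitrary examples, not releases known to exist.

```ruby
require 'rubygems'

# '~> 1.3' admits any 1.x release at or above 1.3, but not 2.0.
pessimistic = Gem::Requirement.new('~> 1.3')
in_range    = pessimistic.satisfied_by?(Gem::Version.new('1.9.2'))
too_old     = pessimistic.satisfied_by?(Gem::Version.new('1.2.0'))
next_major  = pessimistic.satisfied_by?(Gem::Version.new('2.0.0'))

# The core plugin API dependency is an explicit closed range instead.
core_api    = Gem::Requirement.new('>= 1.60', '<= 2.99')
api_ok      = core_api.satisfied_by?(Gem::Version.new('2.1.0'))
api_too_old = core_api.satisfied_by?(Gem::Version.new('1.59'))
```

Only `in_range` and `api_ok` evaluate to `true`; the other four checks are `false`.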
110 changes: 93 additions & 17 deletions spec/codecs/json_spec.rb
@@ -3,10 +3,14 @@
require "logstash/event"
require "logstash/json"
require "insist"
require 'logstash/plugin_mixins/ecs_compatibility_support/spec_helper'

describe LogStash::Codecs::JSON, :ecs_compatibility_support do

let(:options) { Hash.new }

describe LogStash::Codecs::JSON do
subject do
LogStash::Codecs::JSON.new
LogStash::Codecs::JSON.new(options)
end

shared_examples :codec do
@@ -127,7 +131,7 @@

context "when json could not be parsed" do

let(:message) { "random_message" }
let(:message) { "random_message" }

it "add the failure tag" do
subject.decode(message) do |event|
@@ -147,6 +151,78 @@
end
end
end

ecs_compatibility_matrix(:disabled, :v1, :v8 => :v1) do |ecs_select|

before(:each) do
allow_any_instance_of(described_class).to receive(:ecs_compatibility).and_return(ecs_compatibility)
end

context "with target" do

let(:options) { super().merge('target' => 'root') }

let(:message) { ' { "foo": "bar", "baz": { "0": [1, 2, 3], "1": true } } ' }

context 'sample json' do

let(:json) { '{ "foo": "bar", "baz": { "0": [1, 2, 3], "1": true } } ' }

it "yields an event" do
count = 0
subject.decode(json) do |event|
count += 1
expect( event.include?("foo") ).to be false
expect( event.include?("baz") ).to be false
expect( event.get("[root][foo]") ).to eql 'bar'
expect( event.get("[root][baz]")['1'] ).to be true
end
expect( count ).to eql 1
end

it 'set event.original in ECS mode' do
subject.decode(json) do |event|
if ecs_select.active_mode == :disabled
expect( event.get("[event][original]") ).to be nil
else
expect( event.get("[event][original]") ).to eql json
end
end
end

end

context 'json array' do

let(:json) { '[ {"foo": "bar"}, {"baz": { "v": 1.0 } }, {}]' }

it "yields multiple events" do
count = 0
subject.decode(json) do |event|
expect( event.include?("foo") ).to be false
expect( event.include?("baz") ).to be false
count += 1
case count
when 1
expect( event.get("[root][foo]") ).to eql 'bar'
when 2
expect( event.get("[root][baz]") ).to eql 'v' => 1.0
end
end
expect( count ).to eql 3
end

it 'does not set event.original' do
subject.decode(json) do |event|
expect( event.include?("[event][original]") ).to be false
end
end

end

end

end
end

context "#encode" do
@@ -165,27 +241,27 @@
insist { got_event }
end
end
end

context "forcing legacy parsing" do
it_behaves_like :codec do
before(:each) do
# stub codec parse method to force use of the legacy parser.
# this is very implementation specific but I am not sure how
# this can be tested otherwise.
allow(subject).to receive(:parse) do |data, &block|
subject.send(:legacy_parse, data, &block)
context "target" do
it "should return json data" do
data = {"foo" => "bar", "baz" => {"bah" => ["a","b","c"]}}
event = LogStash::Event.new(data)
got_event = false
subject.on_event do |e, d|
insist { d.chomp } == event.to_json
insist { LogStash::Json.load(d)["foo"] } == data["foo"]
insist { LogStash::Json.load(d)["baz"] } == data["baz"]
insist { LogStash::Json.load(d)["bah"] } == data["bah"]
got_event = true
end
subject.encode(event)
insist { got_event }
end
end
end

context "default parser choice" do
# here we cannot force the use of the Event#from_json since if this test is run in the
# legacy context (no Java Event) it will fail but if in the new context, it will be picked up.
it_behaves_like :codec do
# do nothing
end
it_behaves_like :codec
end

end
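
The decode specs in this file read nested values through field references such as `[root][foo]` and `[root][baz]`. For readers unfamiliar with that syntax, here is a minimal plain-Ruby sketch of how such a reference can be resolved against a nested Hash. `fetch_by_reference` is a hypothetical helper for this example and is much simpler than Logstash's real field-reference parser.

```ruby
# Hypothetical helper: resolve "[a][b]" style references against a nested Hash.
def fetch_by_reference(data, reference)
  # Pull out each bracketed segment; a bare name is treated as a single key.
  path = reference.scan(/\[([^\[\]]+)\]/).flatten
  path = [reference] if path.empty?
  data.dig(*path)
end

data = { 'root' => { 'foo' => 'bar', 'baz' => { '0' => [1, 2, 3], '1' => true } } }

foo_value = fetch_by_reference(data, '[root][foo]')
baz_value = fetch_by_reference(data, '[root][baz]')
top_level = fetch_by_reference(data, 'foo')
```

This mirrors the spec's expectations: `foo_value` is `"bar"`, `baz_value['1']` is `true`, and `top_level` is `nil` because nothing was written at the root.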