Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: event factory support #13017

Merged
merged 17 commits into from
Jun 28, 2021
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions logstash-core/lib/logstash/plugin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
require "logstash/config/mixin"
require "logstash/plugins/ecs_compatibility_support"
require "concurrent"
require "logstash/plugins/event_factory_support"
require "securerandom"

require_relative 'plugin_metadata'
Expand All @@ -31,6 +32,7 @@ class LogStash::Plugin

include LogStash::Config::Mixin
include LogStash::Plugins::ECSCompatibilitySupport
include LogStash::Plugins::EventFactorySupport

# Disable or enable metric logging for this specific plugin instance
# by default we record all the metrics we can, but you can disable metrics collection
Expand Down
56 changes: 56 additions & 0 deletions logstash-core/lib/logstash/plugins/event_factory_support.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
require 'logstash/util/thread_safe_attributes'

module LogStash
module Plugins
module EventFactorySupport

include LogStash::Util::ThreadSafeAttributes


lazy_init_attr :event_factory do
create_event_factory
end

lazy_init_attr :targeted_event_factory do
raise ArgumentError.new('config.target not present') unless respond_to?(:target)
target.nil? ? event_factory : TargetedEventFactory(event_factory, target)
end

private

# @private Internal API
def create_event_factory
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we drop this method for now? to align with the support mixin

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's private, so not necessarily something we need to align on. It is also a good hook point for future efforts in LS core that don't need backport support (things like alternate Event implementation experiments like copy-on-write or field reference parser changes), so I expect even if we remove it now we will add it back in short order.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay, sticking around just wanted to double check given what private means in Ruby land 😉

BasicEventFactory::INSTANCE
end

class BasicEventFactory
INSTANCE = new

# @param payload [Hash]
# @return [LogStash::Event]
def new_event(payload)
LogStash::Event.new(payload)
end

end
private_constant :BasicEventFactory

class TargetedEventFactory

def initialize(inner, target)
@delegate = inner
@target = target
kares marked this conversation as resolved.
Show resolved Hide resolved
kares marked this conversation as resolved.
Show resolved Hide resolved
end

# @param payload [Hash]
# @return [LogStash::Event]
def new_event(payload)
@delegate.new_event(@target => payload)
end

end
private_constant :TargetedEventFactory

end
end
end
51 changes: 51 additions & 0 deletions logstash-core/lib/logstash/util/thread_safe_attributes.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

module LogStash
module Util
module ThreadSafeAttributes
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 clean reusable implementation, really cleans up the details of using them.


def self.included(base)
base.extend ClassMethods
end

module ClassMethods

def lazy_init_attr(attribute, &block)
raise ArgumentError.new("invalid attribute name: #{attribute}") unless attribute.match? /^[_A-Za-z]\w*$/
raise ArgumentError.new('no block given') unless block_given?
var_name = "@#{attribute}".to_sym
send(:define_method, attribute.to_sym) do
if instance_variable_defined?(var_name)
instance_variable_get(var_name)
else
LogStash::Util.synchronize(self) do
if instance_variable_defined?(var_name)
instance_variable_get(var_name)
else
instance_variable_set(var_name, instance_eval(&block))
end
end
end
end
end

end

end
end
end
12 changes: 8 additions & 4 deletions logstash-core/spec/logstash/event_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -319,10 +319,8 @@
end

it "should consistently handle nil" do
blank_strings.each do |s|
expect{LogStash::Event.from_json(nil)}.to raise_error
expect{LogStash::Event.new(LogStash::Json.load(nil))}.to raise_error
end
expect{LogStash::Event.from_json(nil)}.to raise_error # TypeError
expect{LogStash::Event.new(LogStash::Json.load(nil))}.to raise_error # java.lang.ClassCastException
end

it "should consistently handle bare string" do
Expand All @@ -331,6 +329,12 @@
expect{LogStash::Event.new(LogStash::Json.load(s))}.to raise_error LogStash::Json::ParserError
end
end

it "should allow to pass a block that acts as an event factory" do
events = LogStash::Event.from_json(source_json) { |data| LogStash::Event.new(data).tap { |e| e.set('answer', 42) } }
expect( events.size ).to eql 1
expect( events.first.get('answer') ).to eql 42
end
end

context "initialize" do
Expand Down
66 changes: 49 additions & 17 deletions logstash-core/src/main/java/org/logstash/Event.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.io.Serializable;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -253,30 +252,52 @@ public String toJson() throws JsonProcessingException {
private static final Event[] NULL_ARRAY = new Event[0];

@SuppressWarnings("unchecked")
public static Event[] fromJson(String json)
throws IOException
{
private static Object parseJson(final String json) throws IOException {
return JSON_MAPPER.readValue(json, Object.class);
}

/**
* Map a JSON string into events.
* @param json input string
* @return events
* @throws IOException when (JSON) parsing fails
*/
public static Event[] fromJson(final String json) throws IOException {
return fromJson(json, EventFactory.DEFAULT);
}

/**
* Map a JSON string into events.
* @param json input string
* @param factory event factory
* @return events
* @throws IOException when (JSON) parsing fails
*/
@SuppressWarnings("unchecked")
public static Event[] fromJson(final String json, final EventFactory factory) throws IOException {
// empty/blank json string does not generate an event
if (json == null || json.trim().isEmpty()) {
if (json == null || isBlank(json)) {
return NULL_ARRAY;
}

Event[] result;
Object o = JSON_MAPPER.readValue(json, Object.class);
Object o = parseJson(json);
// we currently only support Map or Array json objects
if (o instanceof Map) {
result = new Event[]{ new Event((Map<String, Object>)o) };
} else if (o instanceof List) {
final Collection<Map<String, Object>> list = (Collection<Map<String, Object>>) o;
result = new Event[list.size()];
int i = 0;
for (final Map<String, Object> e : list) {
result[i++] = new Event(e);
}
} else {
throw new IOException("incompatible json object type=" + o.getClass().getName() + " , only hash map or arrays are supported");
return new Event[] { factory.newEvent((Map<String, Object>) o) };
}
if (o instanceof List) { // Jackson returns an ArrayList
return fromList((List<Map<String, Object>>) o, factory);
}

throw new IOException("incompatible json object type=" + o.getClass().getName() + " , only hash map or arrays are supported");
}

private static Event[] fromList(final List<Map<String, Object>> list, final EventFactory factory) {
final int len = list.size();
Event[] result = new Event[len];
for (int i = 0; i < len; i++) {
result[i] = factory.newEvent(list.get(i));
}
return result;
}

Expand Down Expand Up @@ -359,6 +380,17 @@ public String toString() {
: hostMessageString;
}

private static boolean isBlank(final String str) {
final int len = str.length();
if (len == 0) return true;
for (int i = 0; i < len; i++) {
if (!Character.isWhitespace(str.charAt(i))) {
return false;
}
}
return true;
}

private static Timestamp initTimestamp(Object o) {
if (o == null || o instanceof RubyNil) {
// most frequent
Expand Down
42 changes: 42 additions & 0 deletions logstash-core/src/main/java/org/logstash/EventFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.logstash;

import java.util.Collections;
import java.util.Map;

/**
* A factory for events.
*/
@FunctionalInterface
public interface EventFactory {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This new interface very nearly is implemented by the co.elastic.logstash.api.EventFactory that is available to java-based plugins through co.elastic.logstash.api.Context which is provided to the constructor. Can we converge without breaking consumers of co.elastic.logstash.api.EventFactory?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yaauie 💯 thanks I completely missed the existing EventFactory - for sure, no reason to re-invent the wheel!
the only down-side is org.logstash.Event.fromJson assumes the concrete implementation being returned, thus we'll assume and cast the returned events.


Event newEvent(Map<String, Object> data);

default Event newEvent() {
return newEvent(Collections.emptyMap());
}

/**
* A default event factory implementation.
*/
EventFactory DEFAULT = (data) -> new Event(data);

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyBoolean;
import org.jruby.RubyClass;
import org.jruby.RubyHash;
Expand All @@ -35,10 +34,12 @@
import org.jruby.exceptions.RaiseException;
import org.jruby.java.proxies.MapJavaProxy;
import org.jruby.javasupport.JavaUtil;
import org.jruby.runtime.Block;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.ConvertedMap;
import org.logstash.Event;
import org.logstash.EventFactory;
import org.logstash.FieldReference;
import org.logstash.RubyUtil;
import org.logstash.Rubyfier;
Expand Down Expand Up @@ -196,7 +197,7 @@ public IRubyObject ruby_sprintf(ThreadContext context, IRubyObject format) {
try {
return RubyString.newString(context.runtime, event.sprintf(format.toString()));
} catch (IOException e) {
throw RaiseException.from(getRuntime(), RubyUtil.LOGSTASH_ERROR, "timestamp field is missing");
throw toRubyError(context, RubyUtil.LOGSTASH_ERROR, "timestamp field is missing", e);
}
}

Expand All @@ -223,8 +224,7 @@ public IRubyObject ruby_to_hash_with_metadata(ThreadContext context) {
}

@JRubyMethod(name = "to_java")
public IRubyObject ruby_to_java(ThreadContext context)
{
public IRubyObject ruby_to_java(ThreadContext context) {
return JavaUtil.convertJavaToUsableRubyObject(context.runtime, this.event);
}

Expand All @@ -234,21 +234,29 @@ public IRubyObject ruby_to_json(ThreadContext context, IRubyObject[] args)
try {
return RubyString.newString(context.runtime, event.toJson());
} catch (Exception e) {
throw RaiseException.from(context.runtime, RubyUtil.GENERATOR_ERROR, e.getMessage());
throw toRubyError(context, RubyUtil.GENERATOR_ERROR, e);
}
}

// @param value [String] the json string. A json object/map will newFromRubyArray to an array containing a single Event.
// and a json array will newFromRubyArray each element into individual Event
// @return Array<Event> array of events
@JRubyMethod(name = "from_json", required = 1, meta = true)
public static IRubyObject ruby_from_json(ThreadContext context, IRubyObject recv, RubyString value)
{
public static IRubyObject ruby_from_json(ThreadContext context, IRubyObject recv, RubyString value, final Block block) {
yaauie marked this conversation as resolved.
Show resolved Hide resolved
if (!block.isGiven()) return fromJson(context, value, EventFactory.DEFAULT);
return fromJson(context, value, (data) -> {
// LogStash::Event works fine with a Map arg (instead of a native Hash)
IRubyObject event = block.yield(context, RubyUtil.toRubyObject(data));
return ((RubyEvent) event).getEvent(); // we unwrap just to re-wrap later
});
}

private static IRubyObject fromJson(ThreadContext context, RubyString json, EventFactory eventFactory) {
Event[] events;
try {
events = Event.fromJson(value.asJavaString());
events = Event.fromJson(json.asJavaString(), eventFactory);
} catch (Exception e) {
throw RaiseException.from(context.runtime, RubyUtil.PARSER_ERROR, e.getMessage());
throw toRubyError(context, RubyUtil.PARSER_ERROR, e);
}

if (events.length == 1) {
Expand Down Expand Up @@ -371,5 +379,16 @@ private static int nextHash() {
final long sequence = SEQUENCE_GENERATOR.incrementAndGet();
return (int) (sequence ^ sequence >>> 32) + 31;
}

private static RaiseException toRubyError(ThreadContext context, RubyClass type, Exception e) {
return toRubyError(context, type, e.getMessage(), e);
}

private static RaiseException toRubyError(ThreadContext context, RubyClass type, String message, Exception e) {
RaiseException ex = RaiseException.from(context.runtime, type, message);
ex.initCause(e);
return ex;
}

}
}
Loading