Skip to content

Commit

Permalink
Added plugin.id to fish tag log lines related to plugins
Browse files Browse the repository at this point in the history
Fixes #11078
  • Loading branch information
andsel committed Jan 23, 2020
1 parent 8c3ae7e commit 7a22220
Show file tree
Hide file tree
Showing 12 changed files with 202 additions and 15 deletions.
6 changes: 3 additions & 3 deletions config/log4j2.properties
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ name = LogstashPropertiesConfig
appender.console.type = Console
appender.console.name = plain_console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = [%d{ISO8601}][%-5p][%-25c]%notEmpty{[%X{pipeline.id}]} %m%n
appender.console.layout.pattern = [%d{ISO8601}][%-5p][%-25c]%notEmpty{[%X{pipeline.id}]}%notEmpty{[%X{plugin.id}]} %m%n

appender.json_console.type = Console
appender.json_console.name = json_console
Expand All @@ -21,7 +21,7 @@ appender.rolling.policies.time.type = TimeBasedTriggeringPolicy
appender.rolling.policies.time.interval = 1
appender.rolling.policies.time.modulate = true
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c]%notEmpty{[%X{pipeline.id}]} %m%n
appender.rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c]%notEmpty{[%X{pipeline.id}]}%notEmpty{[%X{plugin.id}]} %m%n
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size = 100MB
appender.rolling.strategy.type = DefaultRolloverStrategy
Expand Down Expand Up @@ -144,7 +144,7 @@ appender.deprecation_rolling.policies.time.type = TimeBasedTriggeringPolicy
appender.deprecation_rolling.policies.time.interval = 1
appender.deprecation_rolling.policies.time.modulate = true
appender.deprecation_rolling.layout.type = PatternLayout
appender.deprecation_rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c]%notEmpty{[%X{pipeline.id}]} %m%n
appender.deprecation_rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c]%notEmpty{[%X{pipeline.id}]}%notEmpty{[%X{plugin.id}]} %m%n
appender.deprecation_rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.deprecation_rolling.policies.size.size = 100MB
appender.deprecation_rolling.strategy.type = DefaultRolloverStrategy
Expand Down
1 change: 1 addition & 0 deletions logstash-core/lib/logstash/java_pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ def start_input(plugin)
def inputworker(plugin)
Util::set_thread_name("[#{pipeline_id}]<#{plugin.class.config_name}")
ThreadContext.put("pipeline.id", pipeline_id)
ThreadContext.put("plugin.id", plugin.id)
begin
plugin.run(wrapped_write_client(plugin.id.to_sym))
rescue => e
Expand Down
1 change: 1 addition & 0 deletions logstash-core/lib/logstash/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ def start_input(plugin)
def inputworker(plugin)
Util::set_thread_name("[#{pipeline_id}]<#{plugin.class.config_name}")
ThreadContext.put("pipeline.id", pipeline_id)
ThreadContext.put("plugin.id", plugin.id)
begin
plugin.run(wrapped_write_client(plugin.id.to_sym))
rescue => e
Expand Down
16 changes: 15 additions & 1 deletion logstash-core/spec/logstash/pipeline_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def register
end

def run(queue)
@logger.debug("check log4j fish tagging input plugin")
end

def close
Expand Down Expand Up @@ -73,7 +74,9 @@ class DummyFilter < LogStash::Filters::Base

def register() end

def filter(event) end
def filter(event)
@logger.debug("check log4j fish tagging filter plugin")
end

def threadsafe?() false; end

Expand Down Expand Up @@ -243,6 +246,7 @@ def flush(options)
before do
expect(::LogStash::Pipeline).to receive(:logger).and_return(logger)
allow(logger).to receive(:debug?).and_return(true)
allow_any_instance_of(DummyFilter).to receive(:logger).and_return(logger)
end

it "should not receive a debug message with the compiled code" do
Expand All @@ -266,8 +270,18 @@ def flush(options)
pipeline.filter_func([LogStash::Event.new])
pipeline.close
end

it "should log fish tagging of plugins" do
pipeline_settings_obj.set("config.debug", true)
pipeline = mock_pipeline_from_string(test_config_with_filters, pipeline_settings_obj)
expect(logger).to receive(:debug).with(/filter received/, anything)
expect(logger).to receive(:debug).with(/[dummyfilter]/)
pipeline.filter_func([LogStash::Event.new])
pipeline.close
end
end


context "when there is no command line -w N set" do
it "starts one filter thread" do
msg = "Defaulting pipeline worker threads to 1 because there are some filters that might not work with multiple worker threads"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,9 @@ public static ComputeStepSyntaxElement<Dataset> outputDataset(final Collection<D
final Closure computeSyntax;
if (parents.isEmpty()) {
clearSyntax = Closure.EMPTY;
computeSyntax = Closure.wrap(invokeOutput(fields.add(output), BATCH_ARG));
computeSyntax = Closure.wrap(setPluginIdForLog4j(output),
invokeOutput(fields.add(output), BATCH_ARG),
unsetPluginIdForLog4j());
} else {
final Collection<ValueSyntaxElement> parentFields =
parents.stream().map(fields::add).collect(Collectors.toList());
Expand All @@ -167,8 +169,10 @@ public static ComputeStepSyntaxElement<Dataset> outputDataset(final Collection<D
clearSyntax = clearSyntax(parentFields);
}
final ValueSyntaxElement inputBuffer = fields.add(buffer);
computeSyntax = withInputBuffering(
Closure.wrap(invokeOutput(fields.add(output), inputBuffer), inlineClear),
computeSyntax = withInputBuffering(Closure.wrap(
setPluginIdForLog4j(output),
invokeOutput(fields.add(output), inputBuffer), inlineClear,
unsetPluginIdForLog4j()),
parentFields, inputBuffer
);
}
Expand All @@ -184,12 +188,13 @@ private static Closure filterBody(final ValueSyntaxElement outputBuffer,
final ValueSyntaxElement inputBuffer, final ClassFields fields,
final AbstractFilterDelegatorExt plugin) {
final ValueSyntaxElement filterField = fields.add(plugin);
final Closure body = Closure.wrap(
final Closure body = Closure.wrap(setPluginIdForLog4j(plugin),
buffer(outputBuffer, filterField.call("multiFilter", inputBuffer))
);
if (plugin.hasFlush()) {
body.add(callFilterFlush(fields, outputBuffer, filterField, !plugin.periodicFlush()));
}
body.add(unsetPluginIdForLog4j());
return body;
}

Expand Down Expand Up @@ -286,6 +291,24 @@ private static MethodLevelSyntaxElement callFilterFlush(final ClassFields fields
);
}

private static MethodLevelSyntaxElement unsetPluginIdForLog4j() {
return () -> "org.apache.logging.log4j.ThreadContext.remove(\"plugin.id\")";
}

private static MethodLevelSyntaxElement setPluginIdForLog4j(final AbstractFilterDelegatorExt filterPlugin) {
final IRubyObject pluginId = filterPlugin.getId();
return generateLog4jContextAssignment(pluginId);
}

private static MethodLevelSyntaxElement setPluginIdForLog4j(final AbstractOutputDelegatorExt outputPlugin) {
final IRubyObject pluginId = outputPlugin.getId();
return generateLog4jContextAssignment(pluginId);
}

private static MethodLevelSyntaxElement generateLog4jContextAssignment(IRubyObject pluginId) {
return () -> "org.apache.logging.log4j.ThreadContext.put(\"plugin.id\", \"" + pluginId + "\")";
}

private static MethodLevelSyntaxElement clear(final ValueSyntaxElement field) {
return field.call("clear");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,21 @@
import org.jruby.RubyArray;
import org.jruby.RubyClass;
import org.jruby.RubyHash;
import org.jruby.RubyObject;
import org.jruby.RubyString;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.internal.runtime.methods.DynamicMethod;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.execution.WorkerLoop;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.counter.LongCounter;

import java.util.UUID;

import static org.logstash.RubyUtil.RUBY;

@JRubyClass(name = "FilterDelegator")
public final class FilterDelegatorExt extends AbstractFilterDelegatorExt {

Expand Down Expand Up @@ -44,13 +48,15 @@ public IRubyObject initialize(final ThreadContext context, final IRubyObject fil
}

@VisibleForTesting
public FilterDelegatorExt initForTesting(final IRubyObject filter) {
public FilterDelegatorExt initForTesting(final IRubyObject filter, RubyObject configNameDouble) {
eventMetricOut = LongCounter.DUMMY_COUNTER;
eventMetricIn = LongCounter.DUMMY_COUNTER;
eventMetricTime = LongCounter.DUMMY_COUNTER;
this.filter = filter;
filterMethod = filter.getMetaClass().searchMethod(FILTER_METHOD_NAME);
flushes = filter.respondsTo("flush");
filterClass = configNameDouble.getType();
id = RUBY.newString(UUID.randomUUID().toString());
return this;
}

Expand Down Expand Up @@ -96,8 +102,14 @@ protected IRubyObject getConfigName(final ThreadContext context) {
@Override
@SuppressWarnings({"rawtypes"})
protected RubyArray doMultiFilter(final RubyArray batch) {
return (RubyArray) filterMethod.call(
WorkerLoop.THREAD_CONTEXT.get(), filter, filterClass, FILTER_METHOD_NAME, batch);
final IRubyObject pluginId = this.getId();
org.apache.logging.log4j.ThreadContext.put("plugin.id", pluginId.toString());
try {
return (RubyArray) filterMethod.call(
WorkerLoop.THREAD_CONTEXT.get(), filter, filterClass, FILTER_METHOD_NAME, batch);
} finally {
org.apache.logging.log4j.ThreadContext.remove("plugin.id");
}
}

@Override
Expand All @@ -112,6 +124,6 @@ protected boolean getHasFlush() {

@Override
protected boolean getPeriodicFlush() {
return filter.callMethod(RubyUtil.RUBY.getCurrentContext(), "periodic_flush").isTrue();
return filter.callMethod(RUBY.getCurrentContext(), "periodic_flush").isTrue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public IRubyObject start(final ThreadContext context) {
}
Thread t = new Thread(() -> {
org.apache.logging.log4j.ThreadContext.put("pipeline.id", pipeline.pipelineId().toString());
org.apache.logging.log4j.ThreadContext.put("plugin.id", this.getId(context).toString());
input.start(queueWriter::push);
});
t.setName(pipeline.pipelineId().asJavaString() + "_" + input.getName() + "_" + input.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
import org.logstash.instrument.metrics.AbstractMetricExt;

@JRubyClass(name = "OutputDelegator")
public final class OutputDelegatorExt extends AbstractOutputDelegatorExt {
public final class
OutputDelegatorExt extends AbstractOutputDelegatorExt {

private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -75,9 +76,13 @@ protected IRubyObject getConcurrency(final ThreadContext context) {
@Override
protected void doOutput(final Collection<JrubyEventExtLibrary.RubyEvent> batch) {
try {
final IRubyObject pluginId = this.getId();
org.apache.logging.log4j.ThreadContext.put("plugin.id", pluginId.toString());
strategy.multiReceive(WorkerLoop.THREAD_CONTEXT.get(), (IRubyObject) batch);
} catch (final InterruptedException ex) {
throw new IllegalStateException(ex);
} finally {
org.apache.logging.log4j.ThreadContext.remove("plugin.id");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.function.Supplier;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.jruby.RubyObject;
import org.jruby.RubyString;
import org.jruby.runtime.builtin.IRubyObject;
import org.junit.After;
Expand Down Expand Up @@ -490,9 +491,10 @@ public AbstractOutputDelegatorExt buildOutput(final RubyString name, SourceWithM
@Override
public AbstractFilterDelegatorExt buildFilter(final RubyString name, SourceWithMetadata source,
final IRubyObject args, Map<String, Object> pluginArgs) {
final RubyObject configNameDouble = org.logstash.config.ir.PluginConfigNameMethodDouble.create(name);
return new FilterDelegatorExt(
RubyUtil.RUBY, RubyUtil.FILTER_DELEGATOR_CLASS)
.initForTesting(setupPlugin(name, filters));
.initForTesting(setupPlugin(name, filters), configNameDouble);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package org.logstash.config.ir;

import org.jruby.Ruby;
import org.jruby.RubyClass;
import org.jruby.RubyObject;
import org.jruby.RubyString;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;

import static org.logstash.RubyUtil.RUBY;

/**
* Fake class to double the Ruby's method config_name()
* */
@JRubyClass(name = "PluginConfigNameMethodDouble")
public class PluginConfigNameMethodDouble extends RubyObject {

private static final long serialVersionUID = 1L;

static final RubyClass RUBY_META_CLASS;
private RubyString filterName;

static {
RUBY_META_CLASS = RUBY.defineClass("PluginConfigNameMethodDouble", RUBY.getObject(),
PluginConfigNameMethodDouble::new);
RUBY_META_CLASS.defineAnnotatedMethods(org.logstash.config.ir.compiler.FakeOutClass.class);
}

PluginConfigNameMethodDouble(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}

static PluginConfigNameMethodDouble create(RubyString filterName) {
final PluginConfigNameMethodDouble instance = new PluginConfigNameMethodDouble(RUBY, RUBY_META_CLASS);
instance.filterName = filterName;
return instance;
}

@JRubyMethod
public IRubyObject name(final ThreadContext context) {
return RUBY.newString("example");
}

@JRubyMethod(name = "config_name")
public IRubyObject configName(final ThreadContext context, final IRubyObject recv) {
return filterName;
}

@JRubyMethod
public IRubyObject initialize(final ThreadContext context, final IRubyObject args) {
return this;
}
}
18 changes: 18 additions & 0 deletions qa/integration/fixtures/plugin_name_log_spec.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
services:
- logstash
config: |-
input {
generator {
count => 4
}
}
filter {
sleep {
id => "sleep_filter_123"
time => 1
}
}
output {
null {}
}
Loading

0 comments on commit 7a22220

Please sign in to comment.