Preserve input event ordering with single pipeline worker #11020
(force-pushed from 34bd774 to 05d232c)
This looks like it should work.
I have left inline comments about potential fragility in the tests and a minor issue with the sequence generation, and opened a conversation about whether this ordering guarantee should eventually be made opt-in (or opt-out).
```diff
@@ -238,7 +238,7 @@ def start_workers
      Util.set_thread_name("[#{pipeline_id}]>worker#{t}")
      org.logstash.execution.WorkerLoop.new(
        lir_execution, filter_queue_client, @events_filtered, @events_consumed,
        @flushRequested, @flushing, @shutdownRequested, @drain_queue).run
```
Nitpick, and potentially orthogonal to this fix:
I see value in ordering guarantees being made explicitly opt-in at some point (or at least in making room for an explicit opt-out), since in many cases ordering doesn't matter and guaranteeing it incurs overhead.
For this I propose a new pipeline setting, pipeline.ordered; in 7.x it should default to true iff pipeline.workers == 1 to preserve the observed behaviour of the Ruby engine, but in 8.x the setting would need to be explicitly specified to incur the overhead and get the guarantee.
|         | v7.x                    | v8.x    |
|---------|-------------------------|---------|
| default | `pipeline.workers == 1` | `false` |
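If the proposal were adopted, usage in logstash.yml might look like the sketch below (hypothetical: `pipeline.ordered` did not exist at the time of this discussion; `pipeline.workers` is a real setting):

```yaml
# Hypothetical sketch of the proposed setting.
pipeline.workers: 1
# 7.x proposal: would default to true iff pipeline.workers == 1;
# 8.x proposal: would default to false and must be set explicitly.
pipeline.ordered: true
```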
That occurred to me, too. No need to incur the overhead if event ordering is unimportant, but definitely something that should be done in a separate PR.
Resolved (outdated) review threads on:
logstash-core/src/main/java/org/logstash/ext/JrubyEventExtLibrary.java
logstash-core/src/test/java/org/logstash/config/ir/CompiledPipelineTest.java
```java
@Test
public void ignoresInputEventInstantiationOrder() throws Exception {
    testEventOrder(false, null);
}
```
This test asserts that event instantiation order is ignored, but then goes on to verify that an out-of-order batch has its ordering preserved.
I don't feel that this matches the guarantees we are attempting to make.
This tests that the overhead of event ordering is not present when not requested. Do you think that this doesn't need to be tested or that it should be tested differently?
This test verifies the current stable observed behaviour of event re-ordering caused by this specific pipeline's structure, which means it is susceptible to failing when unrelated changes happen. Since we make no ordering guarantees in this case, validating a specific out-of-order sequence in a test named to indicate that it ignores ordering feels like a mismatch.
I would be fine with the test being named something like doesNotIncurEventOrderingOverhead, along with a comment indicating that the ordering we are checking is the observed (and stable) behaviour of events arriving at the sink from the pipeline as-defined.
Resolved review threads on:
logstash-core/src/test/java/org/logstash/config/ir/CompiledPipelineTest.java (outdated)
logstash-core/src/test/java/org/logstash/config/ir/compiler/DatasetCompilerTest.java
(force-pushed from 7498f1b to 1358ec2)
It occurs to me that some use-cases are still fundamentally broken under the Java execution engine, even if we can guarantee that each node receives elements from the batch in order, because we do not strictly pass one event through the pipeline before processing the next event in the batch. Consider the primary use-case in the documentation of the Aggregate Filter Plugin: we cannot guarantee that the separate aggregate filters processing the events receive the events in order.
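For reference, a minimal sketch of the shape of that use-case, condensed from the Aggregate Filter Plugin documentation (field names and values here are illustrative): the "start" event must reach the first aggregate block before the "end" event reaches the second, or the shared map is read before it exists.

```
filter {
  if [logger] == "TASK_START" {
    # must run first: creates the shared map for this task_id
    aggregate {
      task_id    => "%{taskid}"
      code       => "map['sql_duration'] = 0"
      map_action => "create"
    }
  }
  if [logger] == "TASK_END" {
    # must run last: reads the map and closes out the task
    aggregate {
      task_id     => "%{taskid}"
      code        => "event.set('sql_duration', map['sql_duration'])"
      map_action  => "update"
      end_of_task => true
    }
  }
}
```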
I'm not sure I follow. I tried the aggregate filter example you listed and it worked as expected.
I had to dig a fair bit to replicate my intuition here; the long and the short of it is that ordering of statements can supersede ordering of events in both the JEE and the REE; see https://gist.github.com/yaauie/82e199687c490a2e04fe1f0c9d9a7fee. For the code as pasted from the docs, with separate and unchained […]
My initial attempt to preserve event ordering was based on the simplest possible scenario, in which there was a single branch in the pipeline execution. Now that I see it is a general problem with branching, and that there is, of course, no limit to either the number or the nesting of branches in a pipeline, the problem is substantially more difficult and my initial approach won't work. The way the Java execution engine handles both branching and batches would need to change dramatically, likely with negative performance effects for the typical use case of multiple pipeline workers and no expectation of event ordering.

Fortunately, the Java execution engine does still preserve event ordering when both the number of pipeline workers and the batch size are 1. This reduces throughput, but the JEE still performs almost twice as fast as the REE in that scenario. Given the difficulty and the likely negative performance implications for the more common use case of multiple pipeline workers with no expectation of event ordering, I propose we document that limitation.
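Concretely, the combination described above, under which the Java execution engine still preserves event ordering (both are standard logstash.yml settings), is:

```yaml
# Event ordering is preserved by the JEE only with both of these at 1,
# at a cost in throughput (especially for bulk-oriented outputs).
pipeline.workers: 1
pipeline.batch.size: 1
```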
@danhermann The workaround I'm currently using is to set […]
@kartiksubbarao, as you noted, the Ruby execution engine preserves event ordering when used with a single pipeline worker. There are some scenarios in which a higher batch size in the Ruby execution engine will perform better than a batch size of 1 in the Java execution engine. It sounds like your scenario is one of them.
@danhermann I am under the impression that not being able to bulk-send at the output might be more costly than forcing single-element processing. We know, for example, that elasticsearch output throughput is tightly related to the bulk size, and thus to the batch size. Would it be possible not to force a batch size of 1 but instead, as suggested by @yaauie, add a new option such as pipeline.ordered?
We have done a number of tests comparing a batch size of 1 in the JEE to an unlimited batch size in the REE. I can indeed confirm that we have pipelines which cannot keep up with the incoming flow of data when using the JEE but work absolutely fine with the REE. The improved performance of the JEE in this case is therefore not enough to offset the performance hit of not batching properly.
@lassebv Yes, in a configuration using the elasticsearch output, a batch size of 1 will result in bulk requests of size 1 to Elasticsearch, which will significantly reduce indexing throughput.
Closing as this approach will not work. |
In cases where an LIR output node has multiple parent nodes, such as a filter node with a conditional, the Java execution engine transfers the events from each of the parent nodes to the output node in succession. In most cases this presents no problem, either because the function of the parent conditional node is to selectively send events to the output, or because there are multiple pipeline workers, in which case there can be no expectation of event ordering.
This does break event ordering in the case where there is a single pipeline worker and an output node has multiple parent nodes that do not function to selectively send events to the output (see #10938 for an example). This PR adds code to preserve event ordering in that specific scenario by sorting all the events by the order in which they were emitted from their respective parent nodes before adding them to the output batch. While the sort algorithm that is used is optimized for the use case of sorting concatenated sorted arrays, there is obviously a runtime cost associated with preserving this event ordering. As such, this code is generated only for pipelines with a single worker since that is the only scenario in which event ordering can be guaranteed.
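The sorting approach described above can be sketched roughly as follows. This is a hypothetical illustration, not the PR's actual code: `SequencedEvent` and its `seq` field are invented names standing in for whatever emission-order tag the real implementation uses. Each event is tagged with the order in which it was emitted; the output batch concatenates the per-parent lists, each already internally ordered, and a run-aware sort (TimSort, which `List.sort` uses) restores the global order cheaply because it detects and merges the pre-sorted runs:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: events tagged with an emission sequence number.
final class SequencedEvent {
    final long seq;      // order in which the event left its parent node
    final String data;
    SequencedEvent(long seq, String data) { this.seq = seq; this.data = data; }
}

public class OrderedOutputBatch {
    // Concatenate the (already internally ordered) per-parent lists, then
    // restore global order. List.sort uses TimSort, which detects the
    // pre-sorted runs and merges them in close to O(n) time.
    static List<SequencedEvent> mergeParents(List<List<SequencedEvent>> parents) {
        List<SequencedEvent> batch = new ArrayList<>();
        for (List<SequencedEvent> p : parents) batch.addAll(p);
        batch.sort(Comparator.comparingLong((SequencedEvent e) -> e.seq));
        return batch;
    }

    public static void main(String[] args) {
        // Two parent nodes (e.g. both arms of a conditional) each emit an
        // ordered subset of the original input sequence 0..5.
        List<SequencedEvent> ifBranch = List.of(
            new SequencedEvent(0, "a"), new SequencedEvent(2, "c"), new SequencedEvent(4, "e"));
        List<SequencedEvent> elseBranch = List.of(
            new SequencedEvent(1, "b"), new SequencedEvent(3, "d"), new SequencedEvent(5, "f"));

        StringBuilder sb = new StringBuilder();
        for (SequencedEvent e : mergeParents(List.of(ifBranch, elseBranch))) sb.append(e.data);
        System.out.println(sb); // prints "abcdef"
    }
}
```

With multiple workers no such tag ordering can be meaningful across batches, which is why the PR generates this code only for single-worker pipelines.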
Fixes #10938.