
Preserve input event ordering with single pipeline worker #11020

Closed
wants to merge 2 commits

Conversation


@danhermann danhermann commented Aug 6, 2019

In cases where an LIR output node has multiple input parent nodes, such as a filter node with a conditional, the Java execution engine transfers the events from each of the parent nodes to the output node in succession. In most cases, this presents no problem, either because the function of the parent conditional node is to selectively send events to the output or because there are multiple pipeline workers, in which case there can be no expectation of event ordering.

This does break event ordering in the case where there is a single pipeline worker and an output node has multiple parent nodes that do not function to selectively send events to the output (see #10938 for an example). This PR adds code to preserve event ordering in that specific scenario by sorting all the events by the order in which they were emitted from their respective parent nodes before adding them to the output batch. While the sort algorithm that is used is optimized for the use case of sorting concatenated sorted arrays, there is obviously a runtime cost associated with preserving this event ordering. As such, this code is generated only for pipelines with a single worker since that is the only scenario in which event ordering can be guaranteed.
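A minimal sketch of the idea (illustrative Python, not the actual Java engine code): if each event carries the global sequence number assigned when it was emitted from its parent node, the output batch can be rebuilt by merging the per-parent runs, each of which is already sorted — the "concatenated sorted arrays" case described above.

```python
# Hypothetical sketch of the ordering approach in this PR; the "seq" field
# and event dicts are illustrative, not Logstash's actual event model.
import heapq

def merge_parent_outputs(runs):
    """Merge several per-parent event lists, each already sorted by the
    sequence number assigned at emission time."""
    # heapq.merge is an efficient k-way merge for already-sorted inputs,
    # matching the concatenated-sorted-arrays use case described above.
    return list(heapq.merge(*runs, key=lambda ev: ev["seq"]))

# Two parent nodes whose outputs interleave by emission order:
run_a = [{"seq": 0, "msg": "a0"}, {"seq": 2, "msg": "a1"}]
run_b = [{"seq": 1, "msg": "b0"}, {"seq": 3, "msg": "b1"}]
merged = merge_parent_outputs([run_a, run_b])
```

The merge is linear in the number of events plus a logarithmic factor in the number of parent runs, which is the runtime cost the description above refers to.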

Fixes #10938.

@danhermann danhermann force-pushed the 10938_event_ordering branch from 34bd774 to 05d232c Compare August 6, 2019 17:33
@yaauie yaauie (Member) left a comment

This looks like it should work.

I have left comments in-line about some issues I have with potential fragility in tests and a minor issue with the sequence generation, and opened up a conversation about whether this ordering guarantee should be made opt-in at some point (or opt-out).

@@ -238,7 +238,7 @@ def start_workers
Util.set_thread_name("[#{pipeline_id}]>worker#{t}")
org.logstash.execution.WorkerLoop.new(
lir_execution, filter_queue_client, @events_filtered, @events_consumed,
@flushRequested, @flushing, @shutdownRequested, @drain_queue).run
yaauie (Member)

nitpick, and potentially orthogonal to this fix:

I see value in ordering guarantees being made explicit opt-in at some point (or at least to make room for an explicit opt-out), since in many cases it doesn't matter and making ordering guarantees incurs overhead.

For this I propose a new pipeline setting pipeline.ordered; in 7.x, this should default to true IFF pipeline.workers==1 to preserve the observed behaviour of the Ruby engine, but in 8.x the setting would need to be explicitly specified to incur the overhead to get the guarantee.

                  v7.x                      v8.x
default           pipeline.workers == 1     false
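Under this proposal, the setting might look like the following in logstash.yml (hypothetical: `pipeline.ordered` does not exist at the time of this discussion):

```yaml
# hypothetical logstash.yml under the proposal above
pipeline.workers: 1
pipeline.ordered: true   # opt in to the ordering guarantee (and its overhead)
```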

danhermann (Contributor, Author)

That occurred to me, too. No need to incur the overhead if event ordering is unimportant, but definitely something that should be done in a separate PR.


@Test
public void ignoresInputEventInstantiationOrder() throws Exception {
testEventOrder(false, null);
yaauie (Member)

This test asserts that event instantiation order is ignored, but then goes on to verify that an out-of-order batch has its ordering preserved.

I don't feel that this matches well to the guarantees that we are attempting to make.

danhermann (Contributor, Author)

This tests that the overhead of event ordering is not present when not requested. Do you think that this doesn't need to be tested or that it should be tested differently?

yaauie (Member)

This test verifies the current stable observed behaviour of event re-ordering caused by this specific pipeline's structure, which means it is susceptible to failing when unrelated changes happen. Since we make no ordering guarantees in this case, validating a specific out-of-order ordering with a test named to indicate that it ignores ordering feels like a mismatch.

I would be fine with the test being named something like doesNotIncurEventOrderingOverhead, along with a comment indicating that the ordering we are checking is the observed (and stable) behaviour of events coming into the sink from the pipeline as-defined.

@danhermann danhermann force-pushed the 10938_event_ordering branch from 7498f1b to 1358ec2 Compare August 6, 2019 21:39

yaauie commented Aug 7, 2019

It occurs to me that some use-cases are still fundamentally broken under the Java Execution Engine, even if we can guarantee that each node receives elements from the batch in order, because we do not strictly pass one event through the pipeline before processing the next event in the batch.

Consider the primary use case in the documentation of the Aggregate Filter Plugin: we cannot guarantee that the separate aggregate filters processing the events receive the events in order:

filter {
  grok {
    match => [ "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:taskid} - %{NOTSPACE:logger} - %{WORD:label}( - %{INT:duration:int})?" ]
  }

  if [logger] == "TASK_START" {
    aggregate {
      task_id => "%{taskid}"
      code => "map['sql_duration'] = 0"
      map_action => "create"
    }
  }

  if [logger] == "SQL" {
    aggregate {
      task_id => "%{taskid}"
      code => "map['sql_duration'] += event.get('duration')"
      map_action => "update"
    }
  }

  if [logger] == "TASK_END" {
    aggregate {
      task_id => "%{taskid}"
      code => "event.set('sql_duration', map['sql_duration'])"
      map_action => "update"
      end_of_task => true
      timeout => 120
    }
  }
}

@danhermann (Contributor, Author)

> It occurs to me that some use-cases are still fundamentally broken under the Java Execution Engine, even if we can guarantee that each node receives elements from the batch in order because we do not strictly pass one event through the pipeline before processing the next event in the batch.

I'm not sure I follow. I tried the aggregate filter example you listed and it worked as expected.


yaauie commented Aug 7, 2019

I had to dig a fair bit to replicate my intuition here; the long and the short is that ordering of statements can supersede ordering of events in both JEE and REE, and that else if chaining mitigates that on REE but not on JEE, even with this patch set.

https://gist.github.com/yaauie/82e199687c490a2e04fe1f0c9d9a7fee

For the code as pasted from the docs, with separate and unchained if statements that are mutually exclusive, both JEE and REE process each statement for all events in the batch before proceeding to the next statement, again for all matching events in the batch; this makes it very important to specify those blocks in a specific order. With else if chaining, REE will pass all elements through the chain in order, which means that the order of the clauses will determine which branch is executed but will not change the order in which they are executed.
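The distinction above can be made concrete with a small sketch (hypothetical Python, not engine code) contrasting the two evaluation orders for a batch and a list of mutually exclusive conditionals:

```python
# Hypothetical sketch of the two evaluation orders described above. The tiny
# event model and predicate list are illustrative only.

def statement_major(batch, statements):
    """Process each statement for ALL matching events in the batch before
    moving to the next statement (what both engines do for separate,
    unchained `if` blocks)."""
    processed = []
    for label, matches in statements:
        for event in batch:
            if matches(event):
                processed.append((label, event))
    return processed

def event_major(batch, statements):
    """Pass each event through the whole chain before taking the next event
    (what the Ruby engine does for an `else if` chain)."""
    processed = []
    for event in batch:
        for label, matches in statements:
            if matches(event):
                processed.append((label, event))
                break  # else-if: at most one branch fires per event
    return processed

# A batch that arrives "out of statement order": the SQL event precedes the
# TASK_START event.
batch = [{"logger": "SQL"}, {"logger": "TASK_START"}]
statements = [
    ("TASK_START", lambda ev: ev["logger"] == "TASK_START"),
    ("SQL",        lambda ev: ev["logger"] == "SQL"),
]
```

With this batch, statement-major evaluation handles the TASK_START event first because its statement comes first, while event-major evaluation preserves the events' arrival order — which is why the order of the conditional blocks matters so much in the unchained case.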

@danhermann (Contributor, Author)

> I had to dig a fair bit to replicate my intuition here; the long and the short is that ordering of statements can supersede ordering of events in both JEE and REE, and that else if chaining mitigates that on REE but not on JEE, even with this patch set.
>
> https://gist.github.com/yaauie/82e199687c490a2e04fe1f0c9d9a7fee
>
> For the code as pasted from the docs, with separate and unchained if statements that are mutually exclusive, both JEE and REE process each statement for all events in the batch before proceeding to the next statement, again for all matching events in the batch; this makes it very important to specify those blocks in a specific order. With else if chaining, REE will pass all elements through the chain in order, which means that the order of the clauses will determine which branch is executed but will not change the order in which they are executed.

My initial attempt to preserve event ordering was based on the simplest possible scenario in which there was a single branch in the pipeline execution. Now that I see that it's a general problem with branching and there is, of course, no limit to either the number or nesting of branches in a pipeline, this problem is substantially more difficult and the initial approach I took won't work. The way that the Java execution engine handles both branching and batches would need to be changed pretty dramatically, likely with negative performance effects for the typical use case of multiple pipeline workers and no expectation of event ordering.

Fortunately, the Java execution engine does still preserve event ordering when both the number of pipeline workers and the batch size are 1. This does reduce throughput, but the JEE still performs almost twice as fast as the REE in that scenario.

Given the difficulty and the likely negative performance implications for the more common use case of multiple pipeline workers with no expectation of event ordering, I propose we document that a batch size of 1 is required when running with the JEE if event ordering is desired.


kartiksubbarao commented Aug 16, 2019

@danhermann The workaround I'm currently using is to set pipeline.java_execution: false, but I'm able to keep pipeline.batch.size: 1000 in logstash.yml. I don't think this is just luck, since I have a number of independent if statements in the config. With the Java engine the ordering gets noticeably messed up, but with the Ruby engine the ordering is consistently preserved (with pipeline.workers: 1 of course). It seems that Ruby with a batch size of 1000 would generally outperform Java with a batch size of 1. Am I missing something here?

@danhermann (Contributor, Author)

> @danhermann The workaround I'm currently using is to set pipeline.java_execution: false, but I'm able to keep pipeline.batch.size: 1000 in logstash.yml. I don't think this is just luck, since I have a number of independent if statements in the config. With the Java engine the ordering gets noticeably messed up, but with the Ruby engine the ordering is consistently preserved (with pipeline.workers: 1 of course). It seems that Ruby with a batch size of 1000 would generally outperform Java with a batch size of 1. Am I missing something here?

@kartiksubbarao, as you noted, the Ruby execution engine preserves event ordering when used with a single pipeline worker. There are some scenarios in which a higher batch size in the Ruby execution engine will perform better than a batch size of 1 in the Java execution engine. It sounds like your scenario is one of them.

@colinsurprenant (Contributor)

@danhermann
There are two problems with forcing a batch size of 1:

  • The performance hit of processing a small batch through the pipeline
  • The performance hit at the output from not being able to use the batch for bulking outputs such as the elasticsearch output.

I am under the impression that not being able to bulk send at the output might be more costly than forcing single element processing. We know for example that the elasticsearch output throughput is tightly related to the bulk size, thus the batch size.

Would it be possible not to force a batch size of 1 but instead, as suggested by @yaauie, add a new option such as pipeline.ordered: true which, in a first implementation, could functionally break down an inbound batch from the input(s) into "single element" batches but re-aggregate those for the final multi_receive at the output?
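The suggestion above can be sketched as follows (illustrative Python; the name `multi_receive` mirrors Logstash's output API, but the rest of the code is hypothetical): events pass through the filter stage one at a time to preserve ordering, then the results are re-aggregated so the output still receives a single bulk call.

```python
# Hypothetical sketch of splitting a batch into single-element batches for
# the filter stage while re-aggregating for one bulk call at the output.

class BulkOutput:
    """Stand-in for a bulking output such as elasticsearch."""
    def __init__(self):
        self.calls = []

    def multi_receive(self, events):
        # one call per (re-aggregated) batch, preserving event order
        self.calls.append(list(events))

def run_ordered(batch, filter_stage, output):
    processed = []
    for event in batch:
        # single-element "batches" through the filters keep ordering stable
        processed.extend(filter_stage([event]))
    # re-aggregate for the final bulk multi_receive at the output
    output.multi_receive(processed)

out = BulkOutput()
run_ordered([1, 2, 3], lambda b: [e * 10 for e in b], out)
```

The point of the design is that the ordering cost is paid only in the filter stage, while bulking outputs like elasticsearch keep their full batch size.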


lassebv commented Aug 18, 2019

We have done a number of tests where we compare a batch size of one in the JEE to not limiting the batch size with the REE. I can indeed confirm that we have pipelines which cannot keep up with the incoming flow of data when using the JEE, but that work absolutely fine when using the REE. The improved performance of the JEE in this case is therefore not enough to offset the performance hit of not batching properly.
We are using Logstash for parsing data into Elasticsearch, so I guess our issue is that the bulk API is then not used optimally. I suppose that this also increases the load on Elasticsearch in that case.

@colinsurprenant (Contributor)

@lassebv yes, setting a batch size of 1, in the case of a configuration using the elasticsearch output, will result in a bulk request size of 1, which will significantly reduce the indexing throughput into elasticsearch.

@danhermann (Contributor, Author)

Closing as this approach will not work.

Successfully merging this pull request may close these issues.

Java execution reorders events even with 1 pipeline worker
6 participants