-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
EQL: Introduce support for sequences #56300
Conversation
Initial support for EQL sequences The current algorithm is focused on correctness and does not contain any optimization which is left for the future. The current implementation uses a state machine approach which moves ascending and runs each query one after the other working on computing sequences as the data comes in. For each result, the key and its timestamp are being extracted which are then used for matching/building a sequence.
Pinging @elastic/es-ql (:Query Languages/EQL) |
@@ -73,8 +74,8 @@ public Expression visitSingleExpression(EqlBaseParser.SingleExpressionContext ct | |||
} | |||
|
|||
@Override | |||
public List<Expression> visitJoinKeys(JoinKeysContext ctx) { | |||
return ctx != null ? expressions(ctx.expression()) : emptyList(); | |||
public List<Attribute> visitJoinKeys(JoinKeysContext ctx) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this doesn't precisely mirror the grammar, what errors will it cause if you do this?
by substring(field)
I completely agree with this restriction for Elasticsearch, even though Endpoint isn't bound by it. Will this raise a good error message or does it get buried in an uncaught exception?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right now it will throw an exception since it expects a materialized expression which will fail for a function.
This needs improving but I'd rather handle that in a follow-up PR.
//TODO: re-visit this branch when it's decided if EQL needs a LocalRelation-like class | ||
//return new LocalRelation(filter.source(), new EmptyExecutable(filter.output())); | ||
throw new RuleExecutionException("Does not know how to handle a local relation"); | ||
return nonMatchingFilter(filter); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this fix! Looks like this resolves
#53237
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not entirely - there is still some work to be done there mainly on returning the results from the empty executable however I didn't want to add that into this PR.
Add Projection to restrict the field extraction only to higher-level constructs like join/sequence
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
big PR, started to look, left few comments, will come back
} | ||
|
||
public static List<Object[]> asArray(List<EqlSpec> specs) { | ||
AtomicInteger counter = new AtomicInteger(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why AtomicInteger where int will do just fine?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
int doesn't work since the stream pipeline requires a final/non-enclosed object. The AtomicLong
is essentially a hack to increment the counter while keeping the same reference.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a bit of misuse for AtomicInteger only for a Holder functionality but since it's a test it's not important.
|
||
public class ExecutionManager implements QueryClient { | ||
|
||
private static final Logger log = LogManager.getLogger(Querier.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
did you mean ExecutionManager.class
here?
|
||
// build a criterion for each query | ||
for (int i = 0; i < plans.size() - 1; i++) { | ||
List<Attribute> keys = listOfKeys.get(i); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need to assert the assumption on sizes of the plans and listOfKeys here? not obvious in the scope of this function
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is handled inside LogicalPlan.visitJoin/visitSequence
when the keys between the top-level and potentially individual queries are merged in a unified way.
import static org.elasticsearch.common.logging.LoggerMessageFormat.format; | ||
|
||
/** | ||
* Instance of a sequence. Defined by its key and stage. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add clarity to what "Instance of a sequence" means, and how it relates to other classes like SequenceRuntime?
- Result of a sequence
- A pending sequence that contains its current state as it progresses from pos 0 to pos N-1
- Node for the AST
- Sequence physical plan
- Query within a sequence
- etc
x-pack/plugin/eql/qa/common/src/main/resources/test_queries_supported.toml
Show resolved
Hide resolved
|
||
[[queries]] | ||
query = ''' | ||
sequence |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we also add another test where the by
is after sequence
?
sequence by unique_ppid
[process where serial_event_id=1]
[process where true]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clearly there needs to be more testing in the future and polishing but that's future work. the PR is already large enough - a current issue with the current dataset is that it relies on a custom tie-breaker something not yet implemented.
|
||
[[queries]] | ||
query = ''' | ||
sequence |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apparently these are getting dropped by CommonEqlActionTestCase.readTestSpecs.
Any idea why @aleksmaus?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, they still need to be removed from test_queries_unsupported.toml
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And since these are duplicates of test_queries.toml, you can remove them here too
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like the integration tests aren't hooked up and are failing.
Also CommonEqlActionTestCase.{assertResponse, assertSearchHits, extractIds} need to be updated to account for null
added to the body and sequences
populated instead
class KeyToSequences { | ||
|
||
private final int listSize; | ||
private final Map<SequenceKey, List<SequenceFrame>> keyToSequences; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inverting this might simplify the frame()
function: List<Map<SequenceKey, SequenceFrame>>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed - my concern iswhich layout is more memory efficient. Maps are heavier than lists and for a large number of keys that would be required in the first stages in a sequences, that would result in a number of maps with a large number of keys.
The access time would be faster as the number of keys gets smaller in each stage but (without benchmarking) I considered that to be less of an issue.
Further experimentation will tell which one would work better.
Having a list for every single key (even if transitory) might prove problematic as well.
Add documentation and hook up integration tests again
Good catch - I think between the merge to master and some local tests I had on the previous branch, this branch got left behind. I've updated the test suite and the specs. |
@Override | ||
public LogicalPlan visitEventQuery(EqlBaseParser.EventQueryContext ctx) { | ||
return new Project(source(ctx), visitEventFilter(ctx.eventFilter()), emptyList()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The projection was introduced since:
- the results only care about the hits, not individual fields.
- having each field extracted, due to the number of exact types, causes the doc values limit to be reached.
Before this PR the event filters did not return any attributes but now, as inputs to sequences they have to otherwise the key or timestamp field resolution for them fails.
Hence why the visitEventQuery
now wraps the visitEventFilter
- the latter creates the proper plan for internal use while the former wraps it up for external consumption. Note that sequence and join use the same idea - they don't output any attribute since their entire content is needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@costin thx for explaining!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good overall, good job!
Left some comments mostly minor.
import java.util.LinkedHashMap; | ||
import java.util.Map; | ||
|
||
class QueriesUtils { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor: can be final
} | ||
|
||
public List<String> asStringList() { | ||
String[] s = new String[keys.length]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not create an ArrayList and if necessary make it immutable before returning?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because this achieves the same thing but faster. By using asList
the backing array is returned with a fixed size (no extra allocation) and wrapped as a semi-immutable list, meaning it won't grow but you can replace it.
Which I think it's fine considering the internal usage.
} | ||
|
||
public static List<Object[]> asArray(List<EqlSpec> specs) { | ||
AtomicInteger counter = new AtomicInteger(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a bit of misuse for AtomicInteger only for a Holder functionality but since it's a test it's not important.
try { | ||
ShardSearchFailure[] failures = response.getShardFailures(); | ||
if (CollectionUtils.isEmpty(failures) == false) { | ||
listener.onFailure(new EqlIllegalArgumentException(failures[0].reason(), failures[0].getCause())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it ok to use only the 1st one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's what we currently use in SQL/QL as well. We could revisit this for a better error message, however it's a separate issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Re: AtomicInteger, Holder is inside ql package, a dependency qa does not currently use.
// no until declared means no results | ||
// create a dummy keyed filter | ||
String notUsed = "<not-used>"; | ||
Attribute tsField = new FieldAttribute(source, notUsed, new UnsupportedEsField(notUsed, notUsed)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe extract this to a static method so that the string is also more "static".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there's no reuse, no much point in making the string static since the compiler does that automatically through the constant pool.
return Objects.equals(timestamp, other.timestamp) | ||
&& Objects.equals(children(), other.children()) | ||
&& Objects.equals(keys, other.keys); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor: empty line.
|
||
public long timestamp(int stage) { | ||
if (stage > currentStage) { | ||
return Long.MAX_VALUE; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't get this, sorry.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This piece of code is not fully fleshed out - essentially for a frame of events, we can tell what's the minimum and maximum timestamp (when it started and where it stopped). The above case is for the corner case - when a stage that did not match is asked for, return an out of band value (MAX) so there's no match.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be nice to have a comment for that.
|
||
@Override | ||
public String toString() { | ||
int numberOfDigits = stages > 100 ? 3 : stages > 10 ? 2 : 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could use String.valueOf(stages).length()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True but it felt a waste to allocate a string only to discard it instead of doing 3 comparisons :).
import java.util.List; | ||
import java.util.Set; | ||
|
||
public class RuntimeUtils { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could be also final
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left few minor comments.
|
||
private RuntimeUtils() {} | ||
|
||
static void logSearchResponse(SearchResponse response, Logger logger) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method exists in the same form in SQL's Querier
. Can you extract it from both places in a common location in QL?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Most of the methods inside RuntimeUtils are similar between the two, including the request preparation.
I think parts of the EQL execution could be ported to SQL (through QL) once things finalized but since we're not yet there, I opted not to chase that in this PR.
The aim here is to get the basic sequence work in place and then iterate over that.
I've raised #56641 to make sure we're not losing track of that.
|
||
private final Configuration cfg; | ||
private final Client client; | ||
private final TimeValue keepAlive; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You don't seem to do anything with keepAlive
. I assume it will be added later?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes
frame.add(sequence); | ||
} | ||
|
||
/** Match the given hit (based on key and timestamp) with any potential sequence from the previous |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Newline?
|
||
// NB: since the size varies significantly, use a LinkedList | ||
// Considering the order it might make sense to use a B-Tree+ for faster lookups which should work well with | ||
// timestamp compression (which is known for the group |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This sounds like an incomplete sentence.
} | ||
else { | ||
fail("No events or sequences found"); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is an odd formatting for if else if .... seems like the same line is more common in the current code
docKeys[i] = keyExtractors.get(i).extract(hit); | ||
} | ||
key = new SequenceKey(docKeys); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: slightly shorter
SequenceKey key = SequenceKey.NONE;
if (!criterion.keyExtractors().isEmpty()) {
Object[] docKeys = new Object[keyExtractors.size()];
for (int i = 0; i < docKeys.length; i++) {
docKeys[i] = keyExtractors.get(i).extract(hit);
}
key = new SequenceKey(docKeys);
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Initial support for EQL sequences The current algorithm is focused on correctness and does not contain any optimization which is left for the future. The current implementation uses a state machine approach which moves ascending and runs each query one after the other working on computing sequences as the data comes in. For each result, the key and its timestamp are being extracted which are then used for matching/building a sequence. (cherry picked from commit 4f3e18c)
Initial support for EQL sequences
The current algorithm is focused on correctness and does not contain
any optimization which is left for the future.
The current implementation uses a state machine approach which moves
ascending and runs each query one after the other working on computing
sequences as the data comes in.
For each result, the key and its timestamp are being extracted which are
then used for matching/building a sequence.