feat: Add support for IN clause to pull queries #6409
Conversation
ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java
final List<List<?>> tableRows = new ArrayList<>();
final List<LogicalSchema> schemas = new ArrayList<>();
final List<Future<PullQueryResult>> futures = new ArrayList<>();
final List<List<Struct>> keysByLocation = mat.locator().groupByLocation(
Why don't you return, besides the keys, also the <active, standby> per group? This way you wouldn't need to do locate twice, basically.
Actually, I think currently the routing is wrong and you have to return the <active,standby> per group
Yeah, I had thought that a given set of partitions was grouped together at each active and standby, but I think you're right that this isn't the case. I'll change it to groupByActiveStandyList or something similar. In practice, there aren't too many standbys, so this is likely to be a lot better than grouping by partition or just fetching by key.
I reworked this so that it now groups keys together only if they share the same list of nodes, including the active and all standbys. Most of the time, if there are 1 or 2 standbys and lots of keys fetched, this will hopefully reduce unnecessary calls.
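The grouping described here can be sketched as follows: keys whose partitions share the same ordered host list (active first, then standbys) go into one group, so each group needs only one request per candidate host. This is an illustrative sketch with hypothetical names, not the PR's actual code.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupByHostList {
  // Group keys by the exact ordered list of hosts (active, then standbys)
  // serving their partition, so one call can fetch all keys in a group.
  static Map<List<String>, List<String>> groupKeysByHostList(
      Map<String, List<String>> hostsByKey) {
    return hostsByKey.entrySet().stream()
        .collect(Collectors.groupingBy(
            Map.Entry::getValue,                                  // ordered host list
            Collectors.mapping(Map.Entry::getKey, Collectors.toList())));
  }

  public static void main(String[] args) {
    Map<String, List<String>> hostsByKey = new LinkedHashMap<>();
    hostsByKey.put("k1", List.of("host1", "host2"));  // e.g. partition 0
    hostsByKey.put("k2", List.of("host2", "host1"));  // e.g. partition 1
    hostsByKey.put("k3", List.of("host1", "host2"));  // e.g. partition 2
    Map<List<String>, List<String>> grouped = groupKeysByHostList(hostsByKey);
    // k1 and k3 share <host1, host2>, so two groups instead of three calls
    System.out.println(grouped.get(List.of("host1", "host2"))); // [k1, k3]
  }
}
```

With 1 or 2 standbys and many keys, the number of groups is bounded by the number of distinct replica lists rather than the number of keys.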
ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java
...-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocator.java
WINDOWSTART,
WINDOWEND
}

private static Map<ComparisonTarget, List<ComparisonExpression>> extractComparisons(
private static class KeyAndWindowBounds {
What's the benefit of having this class versus adding one more entry in ComparisonTarget? It seems like a lot of work for achieving the same functionality.
InPredicate isn't a ComparisonExpression. Here, I can have a different type for each target type. Alternatively, I could have changed the map to point to an Expression and cast back to the subtype, but that's not great practice.
pullQueryMetrics);
}
for (Future<PullQueryResult> future : futures) {
  final PullQueryResult result = future.get();
Shouldn't there be some error handling here? What happens if a thread dies due to an uncaught exception? We would like to fail the entire query and not return partial results, right? Maybe handle something like this:
try {
  future.get();
} catch (InterruptedException e) {
  Thread.currentThread().interrupt();
} catch (ExecutionException e) {
  // Extract the actual exception from its wrapper
  Throwable t = e.getCause();
  System.err.println("Uncaught exception is detected! " + t
      + " st: " + Arrays.toString(t.getStackTrace()));
  // ... Handle the exception
}
Yeah, I agree that we don't want partial results, so I wouldn't want to catch each individual future. Also, calling interrupt is what causes the InterruptedException, and it sends that exception to other threads. To have it thrown in the current thread, all we have to do is not catch it. I pretty much just let the existing handler catch Exception in the wider scope. I added a small special case to unwrap ExecutionException so we get the same error message.
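The unwrapping described here might look like the following sketch (illustrative names, not the PR's actual code): InterruptedException propagates out of the method, while the cause of an ExecutionException is rethrown so a wider error handler sees the original message.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class UnwrapExecution {
  // Let InterruptedException propagate; rethrow the wrapped cause of an
  // ExecutionException so the caller sees the original error, not the wrapper.
  static <T> T getResult(Future<T> future) throws InterruptedException {
    try {
      return future.get();
    } catch (ExecutionException e) {
      final Throwable cause = e.getCause();
      if (cause instanceof RuntimeException) {
        throw (RuntimeException) cause;  // original error surfaces unchanged
      }
      throw new RuntimeException(cause);
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    Future<String> failing = pool.submit((Callable<String>) () -> {
      throw new IllegalStateException("materialization failed");
    });
    try {
      getResult(failing);
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage()); // prints "materialization failed"
    } finally {
      pool.shutdown();
    }
  }
}
```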
"calling interrupt is what causes the InterruptedException"

I think you probably meant this, but just a small clarification here - Thread.currentThread().interrupt() does not directly cause an InterruptedException. It just sets an interrupt flag on the thread's state, and any blocking code is expected to check that flag and raise an InterruptedException when it notices it. There is no guarantee that calling interrupt will actually do anything (especially if there's misbehaving client code). It's a cooperative strategy.
Yeah, you're right that a thread may have to explicitly check for being interrupted if it's doing a lot of computation. If it's sleeping in some manner, there's a good chance the exception will be thrown for the thread, e.g. in Thread.sleep.
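A tiny demo of the cooperative model described above: interrupt() only sets a flag; a blocking call like Thread.sleep observes the flag and throws InterruptedException, while non-blocking code must poll Thread.interrupted() itself (which also clears the flag).

```java
public class InterruptDemo {
  public static void main(String[] args) throws Exception {
    Thread sleeper = new Thread(() -> {
      try {
        Thread.sleep(10_000);                 // blocking: will observe the flag
      } catch (InterruptedException e) {
        System.out.println("sleep interrupted");
      }
    });
    sleeper.start();
    sleeper.interrupt();                      // just sets the interrupt flag
    sleeper.join();                           // returns quickly, sleep was cut short

    Thread.currentThread().interrupt();       // set the flag on the current thread
    System.out.println(Thread.interrupted()); // true, and clears the flag
    System.out.println(Thread.interrupted()); // false now
  }
}
```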
We are missing tests for the case where keys belong to different partitions that are on the same active host but have different standbys. I think this must be done manually; I don't think there is currently any automatic test for multiple partitions per host, right?

@vpapavas Yeah, there's currently no test of this sort with multiple partitions where we also have failures (since those are required to resort to different standbys). Muckrake tests might actually do the job, but I'll also try to test this manually. I addressed all of your above comments and reworked the code to not only find the location once, but also do it based on the whole list of actives and standbys, so this should cover the different-partitions case.
Thanks @AlanConfluent! This feature is going to be a huge improvement over the existing functionality. I'm mostly aligned with the approach, but some implementation concerns inline.
case FLOAT64:
  return new DoubleLiteral((double) value);
default:
  throw new KsqlException("Unknown key type " + schema.type());
Very soon we're going to support all expression types as keys (including structs), and we already support DECIMAL keys. I'm not entirely sure that this approach will work then, since it's not always possible to take a Java object and convert it into an expression, as the inverse conversion is lossy. Is it possible to extract the original expression when we construct the List<Struct> keys instead of mapping it to Java objects?
As we mentioned elsewhere, I entirely got rid of this rewriter and instead use partitions to ensure I only read the correct data, rather than relying on the keys alone. I also added the partition to the read calls on streams so that it's enforced both for this change and when range queries are implemented soon.
routingOptions
);
}
for (Future<PullQueryResult> future : futures) {
is there any requirement that the pull query result is reproducible? it seems like we add rows to the result based on the order of the nodes that we request, but those nodes can host different partitions at different times. I think it's reasonable to not guarantee ordering, but food for thought (and probably should be documented)
Yes, you're right that there are no guarantees about ordering. I'll document that in the code and consider documenting elsewhere as well.
return new PullQueryResult(
    routeQuery(node, statement, executionContext, serviceContext, pullQueryContext),
    debugNode);
final List<Optional<KsqlNode>> debugNodes = rows.getRows().stream()
nit: probably makes sense to make this Optional<List<KsqlNode>> instead of List<Optional<KsqlNode>> to prevent creating a list of empty Optionals in the non-debug case. And then you can also use Collections#nCopies to make it a little more readable.
I had been thinking that maybe this was necessary because different machines can have different configs, but I now remember that this is a request config, so it should always work. Ok, made it Optional<List<KsqlNode>>
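The suggested shape might look like this sketch (hypothetical names): Optional.empty() in the non-debug case, and Collections.nCopies to build the per-row node list in the debug case.

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

public class DebugNodes {
  // Optional<List<...>> avoids allocating a list of empty Optionals when
  // debug info isn't requested; nCopies builds the debug list readably.
  static Optional<List<String>> debugNodes(boolean debug, String node, int rowCount) {
    return debug
        ? Optional.of(Collections.nCopies(rowCount, node))
        : Optional.empty();
  }

  public static void main(String[] args) {
    System.out.println(debugNodes(true, "host1:8088", 3));
    // Optional[[host1:8088, host1:8088, host1:8088]]
    System.out.println(debugNodes(false, "host1:8088", 3));
    // Optional.empty
  }
}
```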
// Rewrite the expression to only query for the particular keys we care about for this node.
// Otherwise, we'll risk reading standby data for other partitions.
I'm worried this approach doesn't scale well for things other than IN queries (not to mention that it feels hacky). Instead, it probably makes sense to have a way to specify that a pull query (internally routed only) should only read from certain partitions. Otherwise, how would we handle things like range queries? I can't "rewrite" the range query and avoid the risk of reading standby data.

Generally, I think it might make sense to think about communicating pull queries internally using something other than just the SQL statement.
Yeah, that's fair. We actually were discussing range queries as well, and it's probably easier to use a unified approach. I think it should be sufficient to specify the partition when reading from the table (to ensure you're only reading from the actives and standbys you intend), and also, on the second hop, to avoid reading keys that are not hosted on this machine or that don't agree with the specified partitions. That same approach will work for range queries as well.
Ok, implemented that other approach where partition is specified. No rewriting necessary.
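A minimal sketch of the partition-based approach agreed on above (hypothetical names; the real code would use Kafka's partitioner on the serialized key): the forwarded request carries the set of partitions the sender intended, and the receiving node serves only keys that fall into those partitions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class PartitionFilter {
  // Stand-in for the real Kafka partitioner (which uses murmur2 on the
  // serialized key); only the grouping behavior matters for this sketch.
  static int partitionFor(String key, int numPartitions) {
    return Math.floorMod(key.hashCode(), numPartitions);
  }

  // Serve only keys whose partition was explicitly requested, so a standby
  // never returns data for partitions the sender didn't intend.
  static List<String> keysToServe(List<String> keys, Set<Integer> allowedPartitions,
                                  int numPartitions) {
    List<String> result = new ArrayList<>();
    for (String key : keys) {
      if (allowedPartitions.contains(partitionFor(key, numPartitions))) {
        result.add(key);  // safe: this partition was part of the request
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> keys = List.of("a", "b", "c", "d");
    int p = partitionFor("a", 4);
    // Only keys hashing to partition p are served
    System.out.println(keysToServe(keys, Set.of(p), 4).contains("a")); // true
  }
}
```

Unlike expression rewriting, this generalizes to range queries: the partition filter applies regardless of how the key predicate is expressed.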
ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java
...-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocator.java
Haven't looked in depth at the code - just scanned the test cases. They LGTM!
"name": "non-windowed IN lookup on wrong type",
"statements": [
  "CREATE STREAM INPUT (ID INT KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');",
  "CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT GROUP BY ID;",
  "SELECT * FROM AGGREGATE WHERE ID IN ('10', 8);"
],
"expectedError": {
  "type": "io.confluent.ksql.rest.entity.KsqlStatementErrorMessage",
  "message": "'10' can not be converted to the type of the key column: ID INTEGER KEY",
  "status": 400
}
},
It's normal for the IN operator to support some level of coercion of types. I see you've handled INT -> BIGINT. But, strangely, it's also normal to support STRING -> . Of course, we don't have to support this, but those used to SQL may expect it...
Interesting. I find it a bit funny that a IN (v1) isn't just syntactic sugar for a = v1. I may add that as a follow-up, to try to keep from adding much more to this PR.
ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java
// If one of the partitions required is out of nodes, then we cannot continue.
if (round >= location.getNodes().size()) {
  throw new MaterializationException(String.format(
      "Unable to execute pull query: %s", statement.getStatementText()));
Consider adding a more descriptive error message so that we know at which point in the code the query failed and why.
Ok, changed it to "Unable to execute pull query: %s. Exhausted standby hosts to try."
      "Unable to execute pull query: %s", statement.getStatementText()));
}

private static Map<KsqlNode, List<KsqlLocation>> groupByHost(
What this method does is group all locations by the same host, which, if round=0, will be the active. So all locations (all keys) that have the same host as active will be grouped together. Then, in the second round, for any keys for which the active failed, we will get the standby that is second in ordering.
That's correct. I added this example to make it clearer:
// For example, locations might be:
// [ Partition 0 <Host 1, Host 2>,
// Partition 1 <Host 2, Host 1>,
// Partition 2 <Host 1, Host 2> ]
// In Round 0, fetch from Host 1: [Partition 0, Partition 2], from Host 2: [Partition 1]
// If everything succeeds, we're done. If Host 1 failed, then we'd have a Round 1:
// In Round 1, fetch from Host 2: [Partition 0, Partition 2].
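The round-based grouping in that example can be sketched as follows (illustrative names, not the PR's actual code): in round r, each partition is assigned to the r-th host in its ordered node list, with round 0 being the active and later rounds the successive standbys.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupByHostSketch {
  // For a given round, map each host to the partitions it should serve,
  // where each partition's node list is ordered active-first.
  static Map<String, List<Integer>> groupByHost(
      Map<Integer, List<String>> hostsByPartition, int round) {
    final Map<String, List<Integer>> grouped = new LinkedHashMap<>();
    for (Map.Entry<Integer, List<String>> e : hostsByPartition.entrySet()) {
      final List<String> hosts = e.getValue();
      if (round >= hosts.size()) {
        // Mirrors the check in the PR: no more standbys left to try
        throw new IllegalStateException("Exhausted standby hosts to try.");
      }
      grouped.computeIfAbsent(hosts.get(round), h -> new ArrayList<>())
          .add(e.getKey());
    }
    return grouped;
  }

  public static void main(String[] args) {
    Map<Integer, List<String>> locations = new LinkedHashMap<>();
    locations.put(0, List.of("Host1", "Host2"));
    locations.put(1, List.of("Host2", "Host1"));
    locations.put(2, List.of("Host1", "Host2"));
    // Round 0 matches the example above: Host1 gets [0, 2], Host2 gets [1]
    System.out.println(groupByHost(locations, 0)); // {Host1=[0, 2], Host2=[1]}
  }
}
```

If Host1 fails in round 0, calling the same method with round 1 and the failed partitions yields Host2 -> [0, 2], matching the Round 1 step in the example.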
...-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocator.java
...-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocator.java
Awesome job @AlanConfluent ! LGTM!
Description
Adds support for the IN clause to pull queries, such that multiple keys can be fetched at once. This can be used in conjunction with all of the existing window comparisons (e.g. windowstart, windowend). An example of such a query is:

And you might get a response like this:
Fixes #6115
Testing done
Ran unit tests and rest query validation tests.
Reviewer checklist