-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KLIP-17: SQL Union #4125
KLIP-17: SQL Union #4125
Conversation
@big-andy-coates I hope I don't annoy you mentioning this topic again, but how do you feel about the ordering of the query output? If multiple merged streams generate messages with the same key and the ordering of the output is not guaranteed, processing historical data would yield a different result each time it is processed. I guess reading from all the input streams could be synchronized similar to joins, but that requires all the streams to be co-partitioned, right? What do you think? Is this even a problem? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Took a first pass through. I think, as you say, the main thing we'd lose by dropping insert into is the ability to add queries writing to a sink. But we can reframe that as evolving the old query. That's non-trivial, but I agree that what's proposed here gives us a simpler model and better performance. If we get feedback that adding queries is important we can look to implement that as a form of query evolution later.
I think this is out of scope for this work, but this does put us in better position to give this kind of guarantee. Before, we had n topologies for n queries. Now that everything would be in 1 topology, we could actually implement synchronization across the streams. |
Not at all.
I would imagine, though I'd need to confirm, that KS will merge the streams with a best-efforts on aligning event-time across the streams. cc @mjsax. So moving to a single topology gives us more scope to order by event time. However, order is less important IMHO when interleaving messages from multiple streams, as a stream does not have the per-key update semantics of a table. The order from each source will be maintained in the output, but the interleaving of multiple sources will not. Ensuring exact ordering is tricky as a stream never ends, so there is always the possibility of out-of-order data coming, meaning you'd have to buffer forever to ensure correct ordering. Correct event time ordering is therefore a trade off between correctness and latency. This is not something we expose in KSQL. |
Yep, totally. |
|
I tried to look that up, the Javadoc for KStream.merge says:
I'm just asking about deterministic ordering (ie merging the same streams multiple times should yield the same ordering each time)
That's right, I guess problems only arise when multiple streams containing the same keys are merged and then fed into a state store. Imagine the following extreme situation:
Does that make sense or am I missing something? |
The distinction you call out between However, we will be able to extend the The main driver for switching from So yes, we lose some functionality in the short term, but then we get it back with bells on. If we feel strongly that users won't want to lose this short term, then we'd need to implement SQL Union and KLIP-18 and |
# Compatibility Implications | ||
|
||
Removal of `INSERT ALL` is clearly a breaking change. However, we _must_ remove it if we are to unlock | ||
the other planned work. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
INSERT ALL
? Should be INSERT INTO
?
What is unclear in general about this KLIP is, why not just add UNION ALL
for streams, and keep INSERT INTO
as-is? Why remove INSERT INTO
? What other work is planned to use it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
INSERT ALL ? Should be INSERT INTO?
updated!
What is unclear in general about this KLIP is, why not just add UNION ALL for streams, and keep INSERT INTO as-is? Why remove INSERT INTO? What other work is planned to use it?
I thought the document was being quite clear on this. It has a section on it:
The later drawbacks are stopping other improvements to KSQL. Including, though not limited to:
* [KLIP-18][18]: Distributed Metastore: which requires the (1 - 0..1)
relationship between data source and persistent query that this KLIP will restore.
See the KLIP-17 section in KLIP-18 for details of why this is needed.
* [KLIP-19][19]: Introduce Materialized Views: which looks to clarify the
semantic difference between a view and a table/stream and their associated semantics.
This KLIP is related because it removes `INSERT INTO` which inserts into an existing stream.
* [KLIP-20][20]: Remove `TERMINATE` statement: which requires the (1 - 0..1)
relationship between data source and persistent query that this KLIP will restore.
* [KLIP-21][21]: Correct 'INSERT VALUES' semantics: which depends on [KLIP-19][19].
@big-andy-coates @PeterLindner About ordering in merge. All you said is correct. Kafka Streams does not provides ordering guarantees because it's not really possible to provide them (more below). Kafka Streams also requires that both streams are co-partitioned, otherwise, it might compute the wrong result downstream (what is a known issues: https://issues.apache.org/jira/browse/KAFKA-7293). ksqlDB could (and should!) check for this conditions if The only "workaround" to merge streams based on timestamps would be via topics: ie, both streams must be written into a topic and a new stream is created from both as once: -- no ordering guarantee:
stream3 = stream1.merge(stream2);
-- workaround
stream1.to("topic-1");
stream2.to("topic-2");
Set<String> topics = new HashSet();
topics.add("topic-1");
topics.add("topic-2");
stream3 = builder.stream(topics); Note that this "workaround" is strictly required and even if Hence, I think it is ok as a first step to implement For a time synchronized |
With Tiered Storage, the time difference may be very high which could be inappropriate for grace periods (think years) so I'd rather consider that as a workaround. But I don't know if this is a real problem, just wanted to bring this to your minds. I'm perfectly fine with just supporting non-syncronized |
I've added details of ordering guarantees to the KLIP and partition count requirements. @mjsax can you review please? For my own benefit, I'm assuming the co-partitioning issue arises because the sink topic is set to the max(partition count) of the sources, and each stream job is reading source partitions and explicitly setting the sink partition when producing to the sink? For example, If source A has one partition, and source B has two, then the sink will have two partitions and there will be two stream tasks. The first stream task will read from A:1 and B:1 and produce to SINK:1. The second task will read from B:2 and produce to SINK:2. Or is it something else? |
Hi, is there a plan to assign this to a version? Is this direction still the relevant one? I would very much like to see this implemented, as together with "CREATE OR REPLACE" in 0.12 it will greatly simplify our ability to modify queries in live clusters in programmatic fashion. |
It's not on any roadmap yet. I've been too busy elsewhere to progress this. When I get some time I'll kick start the process. It's still relevant , etc, just needs resourcing. |
Description
KLIP for replacing
INSERT INTO
withUNION ALL
.Some of the internal details still need working out, but this outlines the concepts and is a good starting point for discussion.
Easy to view version can be found here