Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KLIP-17: SQL Union #4125

Closed

Conversation

big-andy-coates
Copy link
Contributor

@big-andy-coates big-andy-coates commented Dec 12, 2019

Description

KLIP for replacing INSERT INTO with UNION ALL.

Some of the internal details still need working out, but this outlines the concepts and is a good starting point for discussion.

Easy to view version can be found here

@big-andy-coates big-andy-coates requested a review from a team as a code owner December 12, 2019 12:23
@PeterLindner
Copy link

@big-andy-coates I hope I don't annoy you mentioning this topic again, but how do you feel about the ordering of the query output?

If multiple merged streams generate messages with the same key and the ordering of the output is not guaranteed, processing historical data would yield a different result each time it is processed.

I guess reading from all the input streams could be synchronized similar to joins, but that requires all the streams to be co-partitioned, right?

What do you think? Is this even a problem?

Copy link
Contributor

@rodesai rodesai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Took a first pass through. I think, as you say, the main thing we'd lose by dropping insert into is the ability to add queries writing to a sink. But we can reframe that as evolving the old query. That's non-trivial, but I agree that what's proposed here gives us a simpler model and better performance. If we get feedback that adding queries is important we can look to implement that as a form of query evolution later.

design-proposals/klip-17-sql-union.md Show resolved Hide resolved
design-proposals/klip-17-sql-union.md Outdated Show resolved Hide resolved
design-proposals/klip-17-sql-union.md Outdated Show resolved Hide resolved
@rodesai
Copy link
Contributor

rodesai commented Dec 12, 2019

I guess reading from all the input streams could be synchronized similar to joins, but that requires all the streams to be co-partitioned, right?
What do you think? Is this even a problem?

I think this is out of scope for this work, but this does put us in better position to give this kind of guarantee. Before, we had n topologies for n queries. Now that everything would be in 1 topology, we could actually implement synchronization across the streams.

@big-andy-coates
Copy link
Contributor Author

big-andy-coates commented Dec 16, 2019

@PeterLindner

@big-andy-coates I hope I don't annoy you mentioning this topic again, but how do you feel about the ordering of the query output?

Not at all.

If multiple merged streams generate messages with the same key and the ordering of the output is not guaranteed, processing historical data would yield a different result each time it is processed.

I guess reading from all the input streams could be synchronized similar to joins, but that requires all the streams to be co-partitioned, right?

What do you think? Is this even a problem?

I would imagine, though I'd need to confirm, that KS will merge the streams with a best-efforts on aligning event-time across the streams. cc @mjsax. So moving to a single topology gives us more scope to order by event time.

However, order is less important IMHO when interleaving messages from multiple streams, as a stream does not have the per-key update semantics of a table. The order from each source will be maintained in the output, but the interleaving of multiple sources will not.

Ensuring exact ordering is tricky as a stream never ends, so there is always the possibility of out-of-order data coming, meaning you'd have to buffer forever to ensure correct ordering. Correct event time ordering is therefore a trade off between correctness and latency. This is not something we expose in KSQL.

@big-andy-coates
Copy link
Contributor Author

@rodesai

Took a first pass through. I think, as you say, the main thing we'd lose by dropping insert into is the ability to add queries writing to a sink. But we can reframe that as evolving the old query. That's non-trivial, but I agree that what's proposed here gives us a simpler model and better performance. If we get feedback that adding queries is important we can look to implement that as a form of query evolution later.

Yep, totally.

@hjafarpour
Copy link
Contributor

INSERT INTO is not equivalent of UNION ALL and each of them have their intended purpose. We cannot replace INSERT INTO with UNION ALL and have the same behavior. For instance, in many cases we may have an existing stream and would need to add another stream to it. You can do this with INSERT INTO while in UNION ALL you have to have all queries available at the time you issue the statement.
Why not have both, keep INSERT INTO and add UNION ALL?

@PeterLindner
Copy link

I would imagine, though I'd need to confirm, that KS will merge the streams with a best-efforts on aligning event-time across the streams

I tried to look that up, the Javadoc for KStream.merge says:

There is no ordering guarantee between records from this KStream and records from the provided KStream in the merged stream. Relative order is preserved within each input stream though (ie, records within one input stream are processed in order).

Ensuring exact ordering is tricky

I'm just asking about deterministic ordering (ie merging the same streams multiple times should yield the same ordering each time)

However, order is less important IMHO when interleaving messages from multiple streams, as a stream does not have the per-key update semantics of a table

That's right, I guess problems only arise when multiple streams containing the same keys are merged and then fed into a state store. Imagine the following extreme situation:

  • stream1 already has a lot of messages with timestamps of the past 7 days
  • stream2 is an empty stream and I create a single message now
  • all of the messages of both steams have the same key
  • I merged stream1 and stream2 and created stream3 out of them 7 days ago, the order of stream3 would be as expected
  • If I merge stream1 and stream2 now and create stream4 out of them, the single message of stream2 would be one of the first processed messages and all messages from stream1 being processed afterwards would be considered out of date
  • If I then create the same aggregation for stream3 and stream4, I'd expect them to yield the same results, since I issued the exact same queries, but stream4 probably has a lot of records that are even out of the grace period.

Does that make sense or am I missing something?

@big-andy-coates
Copy link
Contributor Author

big-andy-coates commented Dec 30, 2019

SERT INTO is not equivalent of UNION ALL and each of them have their intended purpose. We cannot replace INSERT INTO with UNION ALL and have the same behavior. For instance, in many cases we may have an existing stream and would need to add another stream to it. You can do this with INSERT INTO while in UNION ALL you have to have all queries available at the time you issue the statement.
Why not have both, keep INSERT INTO and add UNION ALL?

The distinction you call out between INSERT INTO and UNION ALL is already called out in the KLIP itself in the Compatibility Implications section.

However, we will be able to extend the UNION ALL solution to cover this gap once we can support UPDATE TABLE style operations, which we're moving towards. In the meantime switching to UNION ALL does not, as stated, have exact parity with INSERT INTO.

The main driver for switching from INSERT INTO to UNION ALL is to bring about a 1-2-1 model between sink and query. This enables us to get rid of the command topic and replace it with metastore database, covered in KLIP-18. Once KLIP-18 is done we can add in UPDATE TABLE semantics, and the first thing we could support is the ability to add new sources to an existing union.

So yes, we lose some functionality in the short term, but then we get it back with bells on. If we feel strongly that users won't want to lose this short term, then we'd need to implement SQL Union and KLIP-18 and UPDATE TABLE on unions before dropping INSERT INTO. However, that's a PITA.

# Compatibility Implications

Removal of `INSERT ALL` is clearly a breaking change. However, we _must_ remove it if we are to unlock
the other planned work.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

INSERT ALL ? Should be INSERT INTO?

What is unclear in general about this KLIP is, why not just add UNION ALL for streams, and keep INSERT INTO as-is? Why remove INSERT INTO? What other work is planned to use it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

INSERT ALL ? Should be INSERT INTO?

updated!

What is unclear in general about this KLIP is, why not just add UNION ALL for streams, and keep INSERT INTO as-is? Why remove INSERT INTO? What other work is planned to use it?

I thought the document was being quite clear on this. It has a section on it:

The later drawbacks are stopping other improvements to KSQL. Including, though not limited to:

* [KLIP-18][18]: Distributed Metastore: which requires the (1 - 0..1)
relationship between data source and persistent query that this KLIP will restore.
See the KLIP-17 section in KLIP-18 for details of why this is needed.
* [KLIP-19][19]: Introduce Materialized Views: which looks to clarify the
semantic difference between a view and a table/stream and their associated semantics. 
This KLIP is related because it removes `INSERT INTO` which inserts into an existing stream.
* [KLIP-20][20]: Remove `TERMINATE` statement: which requires the (1 - 0..1)
relationship between data source and persistent query that this KLIP will restore.
* [KLIP-21][21]: Correct 'INSERT VALUES' semantics: which depends on [KLIP-19][19].

@mjsax
Copy link
Member

mjsax commented Jan 2, 2020

@big-andy-coates @PeterLindner About ordering in merge. All you said is correct. Kafka Streams does not provides ordering guarantees because it's not really possible to provide them (more below). Kafka Streams also requires that both streams are co-partitioned, otherwise, it might compute the wrong result downstream (what is a known issues: https://issues.apache.org/jira/browse/KAFKA-7293). ksqlDB could (and should!) check for this conditions if UNION ALL is going to use merge() (what I assume).

The only "workaround" to merge streams based on timestamps would be via topics: ie, both streams must be written into a topic and a new stream is created from both as once:

-- no ordering guarantee:
stream3 = stream1.merge(stream2);

-- workaround
stream1.to("topic-1");
stream2.to("topic-2");
Set<String> topics = new HashSet();
topics.add("topic-1");
topics.add("topic-2");
stream3 = builder.stream(topics);

Note that this "workaround" is strictly required and even if merge() would allow timestamp synchronized merging, it would need to implement it that way, as some "buffer" is required to temporarily block one stream. Using max.task.idle.ms one can configure how long a stream should be blocked... Note that infinite blocking is not an option in general.

Hence, I think it is ok as a first step to implement UNION ALL without any synchronization guarantees. @PeterLindner raises a fair point with his example, however, to address issue like this, corresponding grace-periods must be set downstream (at least in Kafka Streams -- ksqlDB does not offer much control atm with this regard).

For a time synchronized UNION ALL a new keyword could be introduced though in a follow up KLIP, if there is demand for such a feature. Rerouting add data throw topic to time-synchronize them is an expensive operation and users should have both options IMHO, and would need to explicitly pick the one or the other operator.

@apurvam apurvam added the design-proposal Tag KLIP Prs with this label label Jan 2, 2020
@confluentinc confluentinc deleted a comment from mjsax Jan 6, 2020
@PeterLindner
Copy link

With Tiered Storage, the time difference may be very high which could be inappropriate for grace periods (think years) so I'd rather consider that as a workaround.

But I don't know if this is a real problem, just wanted to bring this to your minds. I'm perfectly fine with just supporting non-syncronized UNION ALL for now. But I guess it should at least be mentioned in the docs.

@big-andy-coates
Copy link
Contributor Author

big-andy-coates commented Feb 28, 2020

I've added details of ordering guarantees to the KLIP and partition count requirements. @mjsax can you review please?

For my own benefit, I'm assuming the co-partitioning issue arises because the sink topic is set to the max(partition count) of the sources, and each stream job is reading source partitions and explicitly setting the sink partition when producing to the sink?

For example, If source A has one partition, and source B has two, then the sink will have two partitions and there will be two stream tasks. The first stream task will read from A:1 and B:1 and produce to SINK:1. The second task will read from B:2 and produce to SINK:2.

Or is it something else?

@big-andy-coates big-andy-coates deleted the KLIP-17-Sql-Union branch June 30, 2020 16:14
@big-andy-coates big-andy-coates restored the KLIP-17-Sql-Union branch June 30, 2020 16:36
@aabrams
Copy link

aabrams commented Aug 5, 2020

Hi, is there a plan to assign this to a version? Is this direction still the relevant one?

I would very much like to see this implemented, as together with "CREATE OR REPLACE" in 0.12 it will greatly simplify our ability to modify queries in live clusters in programmatic fashion.

@big-andy-coates
Copy link
Contributor Author

It's not on any roadmap yet. I've been too busy elsewhere to progress this. When I get some time I'll kick start the process. It's still relevant , etc, just needs resourcing.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
design-proposal Tag KLIP Prs with this label
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants