Being able to know how advanced the stream processing is #1751

Open
sneko opened this issue Aug 17, 2018 · 12 comments

Comments

@sneko

sneko commented Aug 17, 2018

Hi,

I searched the docs but didn't find anything, so I'm posting here:

Assume I create a persistent STREAM named view_usersAndOrders that joins my users and orders topics. If those topics hold 6 GB of data each, it could take a while for the KSQL cluster to process all of it and write the results to the Kafka topic view_usersAndOrders.

So if I launch my microservice consuming the view_usersAndOrders topic, how can it know that it has received enough events to hold approximately the "final" state of the view?

In other words, I don't want this microservice to accept user requests until it's ready.

A first step could be to compare the offset of the microservice consumer with the offset of the last event in view_usersAndOrders. That way I know whether my consumer has the most recent event in this topic. (We could consider the microservice ready from the moment it has received an event with a timestamp less than 10 seconds old, because if the topic receives 100 events per second, the consumer may never reach the "latest" event in the topic.)
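A minimal sketch of this first check, assuming the consumer's position and the topic's end offset per partition have already been fetched with whatever Kafka client is in use. The dictionaries and function names below are hypothetical and not part of any Kafka API. Rather than requiring the consumer to reach the very last offset, the second function treats it as caught up once the newest event it has seen is less than 10 seconds old.

```python
import time
from typing import Dict, Optional


def total_lag(positions: Dict[int, int], end_offsets: Dict[int, int]) -> int:
    """Sum, over partitions, of the records the consumer still has to read."""
    return sum(max(end_offsets[p] - positions.get(p, 0), 0) for p in end_offsets)


def caught_up(last_event_ts_ms: int, max_age_s: float = 10.0,
              now_ms: Optional[int] = None) -> bool:
    """True when the newest consumed event is younger than max_age_s seconds."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return (now_ms - last_event_ts_ms) < max_age_s * 1000
```

Using the event age instead of a lag of exactly zero avoids the problem mentioned above, where a busy topic always has a few unread records.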

But that's not enough: having almost the latest event in the view_usersAndOrders topic doesn't mean that the KSQL cluster has finished processing almost all of the users and orders topics. Example:

  • Microservice consumer has consumed 100% of the view_usersAndOrders
  • KSQL cluster has processed 25% of the JOINs between my users and orders topics
  • So my microservice consumer has in reality just consumed 25% of the data it needs

Since my JOIN could involve important data, I can't afford to consider my microservice ready while it is unaware of the latest events not yet processed by the KSQL cluster.

I guess there is a way to inspect the offsets my KSQL cluster has reached while consuming from the users and orders topics. But I don't know where to access this kind of information... 😢
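To make the gap concrete, here is a rough sketch of that comparison. It assumes the offsets committed by the query's consumer group on users and orders, and the end offsets of those topics, can be obtained somehow. The thread does not confirm a supported way to get them from KSQL itself, and the function name and input shapes are hypothetical.

```python
from typing import Dict


def source_progress(committed: Dict[str, Dict[int, int]],
                    end_offsets: Dict[str, Dict[int, int]]) -> Dict[str, float]:
    """Fraction (0.0 to 1.0) of each source topic already read by the query.

    Assumes every partition starts at offset 0 (no records removed by retention).
    """
    progress = {}
    for topic, ends in end_offsets.items():
        total = sum(ends.values())
        done = sum(min(committed.get(topic, {}).get(p, 0), end)
                   for p, end in ends.items())
        progress[topic] = 1.0 if total == 0 else done / total
    return progress


# The scenario from the list above: the view topic is fully consumed,
# yet the query has only read a quarter of each input topic.
progress = source_progress(
    committed={"users": {0: 250}, "orders": {0: 250}},
    end_offsets={"users": {0: 1000}, "orders": {0: 1000}},
)
```

A microservice that only looks at its own lag on the view topic would report itself as up to date here, while `progress` shows the join is far from done.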

I hope what I just wrote is understandable 👍

Thank you,

@apurvam
Contributor

apurvam commented Aug 22, 2018

Unfortunately at this time, there is no programmatic way to figure out how much of the inputs to a join have been processed.

@sneko
Author

sneko commented Aug 22, 2018

Hi @apurvam! I will try not to repeat myself, but after reading some Confluent articles advising the use of an optimized view in each microservice, I tried to make it work with KSQL as described here:
confluentinc/confluent-kafka-go#181 (comment)

The progress of my KSQL processing is really important for marking my microservice as ready. I guess, according to your answer, there is currently no way to work around this...

But is it on your roadmap? And if it isn’t, could you ask the Confluent team what would be the best way to make my “approach” work correctly with some kind of “readiness” metrics?

@miguno said my approach seems good, but if KSQL doesn’t report how far the stream has been processed, what do you have in mind?

Thank you for answering me and for all your amazing work 😎

@sneko
Author

sneko commented Sep 18, 2018

Hi @apurvam 😃

Is there any plan to support this feature? It's totally impossible to mark my microservices as ready without it, since I can't guarantee that the view topic generated by KSQL is almost in sync with the latest source data. I really think it's an important feature for KSQL.

Thank you,

@apurvam
Contributor

apurvam commented Sep 21, 2018

I think from an engineering perspective, encoding the 'progress' of a join in a stream so that downstream consumers can make decisions (like whether they should serve traffic) based on this information is going to be a big investment and may even be impossible to do with 100% accuracy.

If we can collect more use cases, then we can gauge whether the ROI makes sense. cc @MichaelDrogalis @miguno may have more information here.

Having said that, KStreams and KSQL are open source, so if this is important enough, please feel free to explore ways to solve it for your use case.

@sneko
Author

sneko commented Sep 21, 2018

@apurvam thank you for your answer 😃

I know it could be hard to be accurate, but for the use cases Confluent describes, where microservices use a local view (generated by KSQL), it would be better to allow inbound traffic only when the view is almost up to date.

So I don't expect exact information at a specific point in time, but rather both of the following pieces of information for a specific KSQL request:

  • what percentage of each source topic "bound" to the KSQL request the KSQL cluster has processed
  • the datetime of the latest event KSQL processed for this KSQL request

That way I can define my own rules on my side and simply mark my microservice as ready if:

  1. the latest event processed by KSQL for this KSQL request is less than 30 seconds old
  2. OR, in case the KSQL cluster receives too many events per second to keep up, it has processed at least 80% of my source topics
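The two rules could be combined on the microservice side roughly as follows. This is only a sketch: according to the replies above, KSQL does not expose either value today, so `latest_event_ts_ms` and `progress_by_topic` are hypothetical inputs, and the thresholds are simply the ones proposed in the list.

```python
import time
from typing import Dict, Optional


def is_ready(latest_event_ts_ms: int, progress_by_topic: Dict[str, float],
             max_age_s: float = 30.0, min_progress: float = 0.80,
             now_ms: Optional[int] = None) -> bool:
    """Apply the two proposed readiness rules; either one is sufficient."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    # Rule 1: the latest event processed by the query is recent enough.
    fresh = (now_ms - latest_event_ts_ms) < max_age_s * 1000
    # Rule 2: every source topic has been processed up to the threshold.
    mostly_done = bool(progress_by_topic) and all(
        p >= min_progress for p in progress_by_topic.values())
    return fresh or mostly_done
```

Keeping the thresholds as parameters leaves the decision with the application, which matches the point that readiness is application logic.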

I don't know exactly where to share this data, but it could be in a topic, published for example once per minute so as not to flood the topic. Compaction could also be enabled on it.

Another idea would be to expose it through the KSQL Server API without using topics, but I guess each KSQL instance in a cluster is not aware of its siblings and cannot share information with them?

As you said, the ROI has to make sense to implement it. But when creating a view of my topics through KSQL and launching a microservice right after, there is a high risk of serving wrong (or old) information to users reaching this microservice... I think that's important enough, no?

Maybe I missed something? Maybe other people using a local view with KSQL processing already have a solution? I'm just trying to understand how the pattern Confluent shows can meet the need to serve an up-to-date view.

Thank you,

@MichaelDrogalis
Contributor

I understand the problem, but as @apurvam mentioned, this is a non-trivial addition. One thing that's striking is that the notion of "progress" is primarily defined as application logic.

In the interest of making progress (no pun intended), can we back up for a moment and talk about the schemas over the two streams being joined? Is there a property of the events themselves that can signal staleness as part of a materialized view?

@sneko
Author

sneko commented Sep 24, 2018

@MichaelDrogalis I understand it depends on the application logic. That's why I'm not asking for KSQL to mark a view as ready, but rather to expose some "metrics" that the microservice can use to make its own decision about readiness.

It stays generic if KSQL exposes its progress on the source topics of a STREAM/TABLE. It's not intended only for my specific use case (I think 😄 )

BTW, I'm not sure whether the last two questions are for me, but the ROWTIME value is inside events:
If we have 2 source topics (users and orders) and would like to create a stream "orders_view", we could SELECT users.ROWTIME userEventTime, orders.ROWTIME orderEventTime, and then in the microservice I could set some rules like:

  • for each event I receive, I pick the most recent ROWTIME between userEventTime and orderEventTime
  • if I receive an event with a ROWTIME less than 10 seconds old, I can consider my microservice ready
  • OR if I don't receive new events for 30 seconds, KSQL has probably finished populating the view topic and I can mark my microservice as ready

But I see some drawbacks with this solution:

  • It's based on data pushed into the view topic, so it may not reflect reality (if something goes wrong on the KSQL side)
  • I need to pollute the view topic with all the ROWTIMEs from my sources to make it work. It doesn't seem very user-friendly 😀 ... We could improve it a bit by concatenating all ROWTIMEs from the sources into a single field lastEventTime formatted as $AAA_ROWTIME:$BBB_ROWTIME:$CCC_ROWTIME:.... That way I can implement a generic check in my microservice regardless of how many sources the view is built from.
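A sketch of the microservice-side rules described above, using the concatenated lastEventTime field from the last bullet. Everything here is an assumption for illustration: the field is taken to hold epoch-millisecond source timestamps separated by `:`, and the class and method names are hypothetical.

```python
import time
from typing import Optional


class ReadinessTracker:
    """Tracks the newest source timestamp seen in view events (hypothetical helper)."""

    def __init__(self, fresh_s: float = 10.0, idle_s: float = 30.0) -> None:
        self.fresh_ms = fresh_s * 1000
        self.idle_ms = idle_s * 1000
        self.newest_source_ts_ms: Optional[int] = None
        self.last_received_ms: Optional[int] = None

    @staticmethod
    def _now_ms() -> int:
        return int(time.time() * 1000)

    def on_event(self, last_event_time: str, now_ms: Optional[int] = None) -> None:
        """Record one view event, e.g. lastEventTime = "1537800000000:1537800004000"."""
        now_ms = self._now_ms() if now_ms is None else now_ms
        # Pick the most recent timestamp among all sources; ignore a trailing ":".
        newest = max(int(part) for part in last_event_time.split(":") if part)
        if self.newest_source_ts_ms is None or newest > self.newest_source_ts_ms:
            self.newest_source_ts_ms = newest
        self.last_received_ms = now_ms

    def is_ready(self, now_ms: Optional[int] = None) -> bool:
        now_ms = self._now_ms() if now_ms is None else now_ms
        if self.last_received_ms is None or self.newest_source_ts_ms is None:
            return False  # assumption: wait for at least one event before deciding
        fresh = (now_ms - self.newest_source_ts_ms) < self.fresh_ms
        idle = (now_ms - self.last_received_ms) >= self.idle_ms
        return fresh or idle
```

The consumer loop would call `on_event` for every record and poll `is_ready` from its health check. As noted in the first drawback, this only reflects what reached the view topic, so a stalled query would eventually look "ready" through the idle rule.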

I hope we will find a "proper" solution to handle the local view use case 🔨

@spena
Member

spena commented Oct 25, 2019

I'll close this question as it hasn't been updated for almost a year. If you still have questions about this, feel free to re-open it or open a new issue.

@spena closed this as completed Oct 25, 2019

@sneko
Author

sneko commented Oct 25, 2019

Hey @spena, I had a call with @MichaelDrogalis about this a few months ago. The issue described here has been understood, even if Confluent is not working on it (correct me if I'm wrong).

I think it needs to be reopened (I cannot do it on my side); it would bring real value to KSQL for managing event sourcing on sensitive data.

Thank you,

@MichaelDrogalis
Contributor

Hey @spena - can confirm, I think this has advanced from question to feature. I understand what @sneko is asking about, and it's a legitimate need. It is hard to build an app against KSQL when the state stores are being rematerialized, since the stores appear to go backwards in time.

@spena
Member

spena commented Oct 28, 2019

Got it. Thanks @MichaelDrogalis for reopening it. I thought this was an old question.

@dttouchdata

By any chance, is there any update on this? It looks like we are in a similar situation. We just started building an analytics app on ksqlDB, where we created a table from a stream LEFT JOIN table. In our scenario, when an event occurs in the front-end app (upstream), it FIRST generates a message in the table-side topic and then another message in the left-side stream topic. These two events occur within a second or so of each other (think of order_detail (stream) joined with order_header (table)). When we first created the table (long after the topics were loaded), the join produced correct results, i.e., both stream and table columns were populated with correct values. But for events created after the table was created, only the stream-side columns are populated and all right-side table columns are null. I am wondering if there is a way to delay the stream-table join until the table is updated to its latest state. This is very important for our decision on whether to proceed with ksqlDB. Thanks

6 participants