
Streaming support for DataFusion #1544

Closed
hntd187 opened this issue Jan 11, 2022 · 32 comments
Labels
enhancement New feature or request

Comments

@hntd187
Contributor

hntd187 commented Jan 11, 2022

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Consume a streaming source from Kafka (to start). DataFusion already has a batch-oriented processing framework that works pretty well. I want to extend it to also be able to consume from streaming sources such as a Kafka topic.

Describe the solution you'd like
So I think it makes sense to start with what Spark Streaming did for its streaming implementation, which is the idea of micro-batching.

(diagram: events from a Kafka topic collected into fixed time windows, each window executed as a batch)
Generally, the idea pictured above is that DataFusion will listen to a topic for some period of time (defined at startup), then execute operations on that collected batch window of events. In the case of Kafka these events are normally either JSON or Avro, both of which already have encoders in DataFusion.
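
As a rough illustration of the window-collection step described above (not a proposed API: Event, collect_window, and the channel-fed source are all placeholder names), one micro-batch could be gathered like this before being converted to RecordBatches and run through a plan:

use std::time::Duration;
use tokio::sync::mpsc::Receiver;
use tokio::time::timeout;

// Placeholder for whatever a single Kafka record deserializes into.
pub struct Event {
    pub key: Vec<u8>,
    pub value: Vec<u8>,
}

// Drain the source for one window, then hand back whatever arrived so the
// collected events can be batched and executed like any other input.
pub async fn collect_window(rx: &mut Receiver<Event>, window: Duration) -> Vec<Event> {
    let mut events = Vec::new();
    let deadline = tokio::time::Instant::now() + window;
    loop {
        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
        match timeout(remaining, rx.recv()).await {
            Ok(Some(event)) => events.push(event),
            // Source closed or the window elapsed: emit what we have so far.
            Ok(None) | Err(_) => break,
        }
    }
    events
}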

I spent some time looking at the types in the data source layer and came to the conclusion that it would probably be possible to implement this on top of the current API, but frankly, it would suck. The source traits all have a naming convention centered around tables and files, and a Kafka topic is technically neither; an implementation there would be highly confusing to anyone trying to understand why things are named what they are. I propose we add a set of additional traits specifically for streaming sources. These would map to an execution plan like the other data sources, but should also have the ability to manage stream information such as committing offsets, checkpointing, and watermarking. Those are probably secondary things to come a bit after a "get the thing to work" implementation, but I want to put it out there that these traits would initially look rather bare and not differ much from the other data sources; they would, though, quickly diverge into contracts that support managing these stream operations.
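
To make the "additional traits" idea a bit more concrete, here is a purely illustrative sketch of what such a contract might grow into. None of these names or signatures (StreamProvider, scan, commit_offsets, checkpoint, watermark) are agreed API; they just show where offsets, checkpoints, and watermarks could hang off a streaming analogue of a table provider:

use std::sync::Arc;
use std::time::Duration;
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;

// Hypothetical streaming-source contract; it maps to an ExecutionPlan like
// the existing sources but also owns the stream-management hooks.
#[async_trait]
pub trait StreamProvider: Send + Sync {
    /// Schema of the records produced by this stream.
    fn schema(&self) -> SchemaRef;

    /// Build the physical operator that reads one micro-batch window.
    async fn scan(&self, window: Duration) -> Result<Arc<dyn ExecutionPlan>>;

    /// Durably record how far the stream has been consumed.
    async fn commit_offsets(&self) -> Result<()>;

    /// Persist enough state to recover after a failure.
    async fn checkpoint(&self) -> Result<()>;

    /// Event-time point before which no more data is expected, if tracked.
    fn watermark(&self) -> Option<i64>;
}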

What I am not sure about: these types should likely live in DataFusion, but the actual implementation probably should not. It could at least start as a contrib module and maybe be promoted into the main repo eventually if that makes sense.

Does this make sense? I can start a draft PR soon to get the ball rolling on turning discussion into actual code.

TODO:

  • Basic hookup (I can run a df.show())
  • Start to develop API contract
  • Delivery Guarantees
  • Watermarking
  • Research on map_partitions and SortMergeJoin

Describe alternatives you've considered
I don't know, I could play more video games or something, but I'd rather do this. Joking aside, Flock has streaming capabilities, but it's mostly built around cloud offerings rather than a long-running process like a Spark Streaming job.

Additional context
@alamb any additional ideas? How do you want to get started?

@hntd187 hntd187 added the enhancement New feature or request label Jan 11, 2022
@matthewmturner
Contributor

I personally would love to see this and agree that making it extensible / trait-based would be great. Then we could also use AWS Kinesis, for example.

@houqp
Member

houqp commented Jan 11, 2022

I think it would be great to get first-class streaming support in DataFusion. In the long run, it might even make sense to rebuild our batch processing engine as a special case of the streaming engine. I feel like what we have right now is already pretty close to a micro-batching streaming system; we are just pushing down large batches from table sources into downstream execution plans :P

+1 for iterating on this outside of the main repo, at least for the initial PoC.

@hntd187
Contributor Author

hntd187 commented Jan 11, 2022

@houqp it would seem we will at least have to develop the API inside DataFusion for now, unless we can do it all via contrib. I'm not totally aware of all the package scoping that might prevent it from existing in contrib.

@realno
Contributor

realno commented Jan 11, 2022

I am excited to see this proposed. I am quite familiar with Flink and happy to participate in design and discussions. It would be nice if we could converge on an initial set of design principles, for example delivery guarantees, out-of-order event handling, etc. From my experience the streaming engine will eventually be different enough from the batch one that it is probably a good idea to start separately. But if we do a good job of keeping the DataFrame API and the operator API generic enough to work with both engines, I think it will be a very promising project.

@hntd187
Contributor Author

hntd187 commented Jan 11, 2022

@realno So, I can update my initial comment to flesh out that design as we walk through it.

  1. For delivery guarantees, it seems to make sense to do what Spark did, which is exactly-once delivery: idempotent sinks, write-ahead logs, and checkpointing to ensure the offsets of the stream are always managed. This might be overkill for a first pass, and maybe we don't have to recover from all failure scenarios initially, but it seems like a good point to get to eventually.

  2. For out-of-order delivery, a first pass that supports watermarking seems necessary at the very least. A lazy solution beyond that would be to tell the user to "deal with the possibility", which I think is fine for an MVP-level implementation, but we want to do better than that. @realno, any suggestions? Without doing some research, I'm not aware of how other streaming engines handle the out-of-order possibility.
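
For reference, the usual watermarking approach (what Flink and Spark Structured Streaming do) is to track the maximum event time seen so far and subtract an allowed lateness; anything older than that watermark is treated as late and either dropped or handled separately. A minimal sketch with made-up names, just to pin down the mechanics:

// Illustrative watermark tracker, not a proposed DataFusion API.
pub struct Watermark {
    max_event_time_ms: i64,
    allowed_lateness_ms: i64,
}

impl Watermark {
    pub fn new(allowed_lateness_ms: i64) -> Self {
        Self { max_event_time_ms: i64::MIN, allowed_lateness_ms }
    }

    /// Advance the high-water mark as events arrive, possibly out of order.
    pub fn observe(&mut self, event_time_ms: i64) {
        self.max_event_time_ms = self.max_event_time_ms.max(event_time_ms);
    }

    /// Everything at or before this timestamp is considered complete.
    pub fn current(&self) -> i64 {
        self.max_event_time_ms.saturating_sub(self.allowed_lateness_ms)
    }

    /// An event behind the watermark is late and needs special handling.
    pub fn is_late(&self, event_time_ms: i64) -> bool {
        event_time_ms < self.current()
    }
}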

@xudong963
Member

Wow, I've recently been studying streaming too, lol. I can join you and learn more about streaming with you.

@alamb
Contributor

alamb commented Jan 11, 2022

@hntd187 -- I think the proposal sounds good.

I think you were asking about mechanics here:

we will at least have to develop the API inside datafusion for now it would seem unless we can do it all via contrib. I'm not totally aware of all the package scoping that might prevent it from existing in contrib

Here is what I suggest:

  1. Create your contrib repo
  2. In Cargo.toml, link to a DataFusion git repo rather than the version published on crates.io. For example:
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="401271377cd84dc1546827f66bda1b242860a6a8", default-features = false, package = "datafusion" }

Protip: while developing you can change this to point at your own local checkout like this

[patch.crates-io]
datafusion = { path='/path/to/you/local/arrow-datafusion/checkout' }

Then as you work on the streaming implementation, add whatever new DataFusion API / changes you need to a branch (perhaps against your own datafusion fork)

Once you have settled on an API, then open a PR to the main DataFusion repo to incorporate your new API

This will let you iterate on the new DataFusion API without having to worry about the test suite, reviews, delays in merging, etc.

@matthewmturner
Contributor

Protip: while developing you can change this to point at your own local checkout like this

[patch.crates-io]
datafusion = { path='/path/to/you/local/arrow-datafusion/checkout' }

TIL

@hntd187
Contributor Author

hntd187 commented Jan 11, 2022

I was using the local checkout method before; it really helps with prototyping. :)

@alamb it was really two questions, both not worded well: I was asking about mechanics, but also where we think the API-level types should live for the time being.

@houqp
Member

houqp commented Jan 12, 2022

I was asking mechanics but also where we thought the api level types should live for the time being.

I suggest, for the time being, keeping the API-level types in your DF fork or in a WIP PR to the upstream repo :)

@Igosuki
Contributor

Igosuki commented Jan 14, 2022

I've used Kafka Streams, Flink, and Beam professionally. The point of streaming is to execute windowed functions, join aggregated data with in-memory tables, and distribute these computations (DAGs) according to partition keys, so that the right value gets sent to the right thread.

In terms of priority, it looks to me like this would not be a reasonable thing to do before having methods like DataFrame::map_partition and traits like SortMergeJoin.
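
To illustrate the partition-key routing mentioned above (a toy sketch, nothing DataFusion-specific): the standard scheme hashes the key so that every value for a given key always lands on the same partition and therefore the same thread, which is what keeps keyed state local:

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Toy key-based router: identical keys always map to the same partition,
// so per-key state (windows, join buffers) never has to move between threads.
pub fn partition_for_key<K: Hash>(key: &K, num_partitions: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() as usize) % num_partitions
}

// e.g. partition_for_key(&"user-42", 8) always returns the same index for "user-42".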

@hntd187
Contributor Author

hntd187 commented Jan 14, 2022

I've used Kafka Streams, Flink, and Beam professionally. The point of streaming is to execute windowed functions, join aggregated data with in-memory tables, and distribute these computations (DAGs) according to partition keys, so that the right value gets sent to the right thread.

In terms of priority, it looks to me like this would not be a reasonable thing to do before having methods like DataFrame::map_partition and traits like SortMergeJoin.

We can add that to the design if we have to implement it. We may be able to be dumb about it in the meantime.

    // Build a context and a two-column (key, value) binary schema for the topic.
    let ec = ExecutionContext::new();
    let dfs = DFSchema::new(vec![
        DFField::new(Some("topic1"), "key", DataType::Binary, false),
        DFField::new(Some("topic1"), "value", DataType::Binary, false),
    ])
    .unwrap();

    // Hand-construct a logical plan around the PoC streaming scan node, which
    // wraps a Kafka-backed execution plan reading 200ms windows with batch size 5.
    let lp = LogicalPlan::StreamingScan(StreamScan {
        topic_name: "topic1".to_string(),
        source: Arc::new(KafkaExecutionPlan {
            time_window: Duration::from_millis(200),
            topic: "topic1".to_string(),
            batch_size: 5,
            conf: consumer_config("0", None),
        }),
        schema: Arc::new(dfs),
        batch_size: Some(5),
    });

    let df = DataFrameImpl::new(ec.state, &lp);

    // Pull a single batch off the (never-ending) stream, bounded by a 10s timeout.
    timeout(Duration::from_secs(10), async move {
        let mut b = df.execute_stream().await.unwrap();
        let batch = b.next().await;
        dbg!(batch);
    })
    .await
    .unwrap();

Based on my awful, awful base implementation, this gets data back. Hooking this up has taken me into the depths of DF more than I hoped; I don't know if this would be possible in a contrib module outside of a custom DataFrame impl. I guess the struggle I am facing is that the DataFrame currently seems to need a "wait or end" of data reading notification, which for streaming obviously never comes. Is anyone more aware of where this occurs, or whether I'm on the right track?

@Igosuki
Contributor

Igosuki commented Jan 14, 2022

There is prior art, the simplest of which is Kafka Streams, but you can also look at Materialize.

As for the issue here, I don't understand the problem. If you want to communicate between async tasks, use channels.

Streaming operations are typically of a different kind: all common operations require a specific implementation to be distributable and partitioned according to a topology. Aggregation operations accumulate data until a trigger sends the new value downstream. Join operations wait for a left or right value and send the combination downstream when a new one arrives, for instance.
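
To illustrate the channel suggestion with the aggregation case (a toy sketch, not DataFusion code): an async task can accumulate values arriving on an upstream channel and only push the aggregate downstream when a trigger fires, which is the accumulate-until-trigger behaviour described above:

use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::interval;

// Toy windowed sum: accumulate upstream values, emit the running total
// downstream every time the trigger interval elapses, then reset.
// Note: tokio's interval fires its first tick immediately.
pub async fn windowed_sum(
    mut upstream: mpsc::Receiver<i64>,
    downstream: mpsc::Sender<i64>,
    trigger_every: Duration,
) {
    let mut acc = 0i64;
    let mut trigger = interval(trigger_every);
    loop {
        tokio::select! {
            maybe_v = upstream.recv() => match maybe_v {
                Some(v) => acc += v,
                None => break, // upstream closed, stop the operator
            },
            _ = trigger.tick() => {
                if downstream.send(acc).await.is_err() {
                    break; // downstream dropped
                }
                acc = 0;
            }
        }
    }
}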

@houqp
Member

houqp commented Jan 15, 2022

I don't know if outside of a custom DataFrame impl this would be possible in a contrib module.

If this is not clear from the get go, you can also complete the PoC within datafusion and then we can look at the diff together to see which part should be factored out into extensions.

I guess the struggle I am facing here is it seems like the dataframe right now has to get a "wait or end" of data reading notification

I am also not following the question here. The SendableRecordBatchStream returned from execute_stream should be able to represent a never ending stream. Do you have a more concrete example of what you are trying to achieve?
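
For what it's worth, an unbounded source does fit this pull model naturally, since a futures Stream can simply never yield None. A sketch using only arrow and futures (the fabricated one-column batch stands in for a real Kafka poll; wrapping something like this into DataFusion's RecordBatchStream is then mostly a matter of exposing the schema):

use std::sync::Arc;
use arrow::array::Int64Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use futures::{stream, Stream};

// A never-ending stream of RecordBatches; a real source would await the
// next Kafka poll instead of fabricating a batch from the running offset.
pub fn unbounded_batches() -> impl Stream<Item = arrow::error::Result<RecordBatch>> {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "offset",
        DataType::Int64,
        false,
    )]));
    stream::unfold(0i64, move |offset| {
        let schema = schema.clone();
        async move {
            let col = Int64Array::from(vec![offset]);
            let batch = RecordBatch::try_new(schema, vec![Arc::new(col)]);
            Some((batch, offset + 1))
        }
    })
}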

@hntd187
Contributor Author

hntd187 commented Jan 15, 2022

I don't know if outside of a custom DataFrame impl this would be possible in a contrib module.

If this is not clear from the get go, you can also complete the PoC within datafusion and then we can look at the diff together to see which part should be factored out into extensions.

I guess the struggle I am facing here is it seems like the dataframe right now has to get a "wait or end" of data reading notification

I am also not following the question here. The SendableRecordBatchStream returned from execute_stream should be able to represent a never ending stream. Do you have a more concrete example of what you are trying to achieve?

I think I may have just confused myself while writing the original post; execute_stream seems good for now. Sorry for the confusion.

@hntd187
Contributor Author

hntd187 commented Jan 17, 2022

I'm checking in my basic PoC work. I still have much to do, but it's there for anyone looking to assist or just review.

https://github.com/datafusion-contrib/datafusion-streams

@xudong963
Member

anyone looking to assist or just review.

If ready, please ping me!

@jimexist
Member

@hntd187 if you want you can transfer that to datafusion-contrib for better visibility.

@hntd187
Contributor Author

hntd187 commented Jan 17, 2022

@hntd187 if you want you can transfer that to datafusion-contrib for better visibility.

Perfect, I wasn't sure what my permissions were; I'll do that right now. Edit: never mind, I don't have permission, could someone help? :-)

@hntd187
Contributor Author

hntd187 commented Jan 31, 2022

Just updating on this: I'm starting to look into state management for streaming now. Tell me, do we have any concept right now of accessing or manipulating partitions or partition-level information in DataFusion?

@houqp
Member

houqp commented Jan 31, 2022

@hntd187 most of the file partition pruning and partition column population logic lives in the ListingTable module as far as I know.

@hntd187
Contributor Author

hntd187 commented Feb 22, 2022

Just an update: I am still working on this, I've just been inundated with work, so my progress has been slower than I'd like. I don't have any of the larger parts in a working state yet, but I am working on stream state management and sort-merge joins right now.

@hntd187
Contributor Author

hntd187 commented Feb 23, 2022

Also, @matthewmturner, I know I agreed that we'd settle on a general streaming interface for DataFusion here, and I have a general idea of what that might look like. How do you want to formalize the discussion? Maybe I can split that component off so I can still make good on my Q1 agreement for that interface.

@matthewmturner
Contributor

I think @yjshen has led the way with some great examples of designing new features like this with his work on ObjectStore and MemoryManager. Below are the two docs he drafted for that, which I think worked really well for collecting feedback from the community and memorializing the design. Given we are still at a fairly early stage, and if it isn't too much work on your side, do you think you could make something similar? Personally, I'm less concerned about hitting the timeline and more about getting the design / interface / implementation details ironed out.

ObjectStore
https://docs.google.com/document/d/1ZEZqvdohrot0ewtTNeaBtqczOIJ1Q0OnX9PqMMxpOF8/edit#heading=h.358nvuimx7yr

MemoryManager et al
https://docs.google.com/document/d/1BT5HH-2sKq-Jxo51PNE6l9NNd_F-FyyYcyC3SKTnkIA/edit#heading=h.ims7dd49jei1

@hntd187
Contributor Author

hntd187 commented Mar 5, 2022

@matthewmturner here is a first pass; are people able to start editing this doc?

https://docs.google.com/document/d/1ZR171NcI_lrn7e7yVjSZFtCkNM7w2yzia_pH3b4QL_c/edit?usp=sharing

@matthewmturner
Contributor

@hntd187 sorry for the delay. Right now it's not public, so I can't edit. Can you update it? Then we can share it with the broader group to start collecting feedback.

@hntd187
Contributor Author

hntd187 commented Mar 10, 2022

No problem, I've updated it to be editable.

@matthewmturner
Contributor

@hntd187 I just posted to Slack. Sorry for the delay, I lost track of this.

@gadLinux

This seems quite interesting to me. I was thinking about creating something similar with DataFusion + RocksDB (similar to kstreams, but with full SQL support in the datastore). If you go on, please post here. I was also looking at BookKeeper for filesystem sync.

@yarenty

yarenty commented Jun 24, 2022

I'm trying to integrate DataFusion with Kafka; the final goal is to have end-to-end streaming. But I started from a "different side" => step 1 is to publish output to Kafka, so I copied code and created a Kafka publisher: https://github.com/yarenty/arrow-datafusion/tree/master/datafusion/core/src/physical_plan/kafka

Test case is here:
https://github.com/yarenty/arrow-datafusion/blob/master/datafusion/core/tests/ordered_sql_to_kafka.rs

It all ends up with something like this:

use datafusion::error::Result;
use datafusion::prelude::{CsvReadOptions, SessionContext};
// KafkaContext / KafkaConfig / publish_to_kafka come from the fork's
// physical_plan::kafka module linked above.

#[tokio::main]
async fn main() -> Result<()> {
    // Run an ordinary batch query over a CSV source.
    let ctx = SessionContext::new();
    ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()).await?;

    let df = ctx
        .sql("SELECT a, MIN(b) as bmin FROM example GROUP BY a ORDER BY a LIMIT 100")
        .await?;

    // Kafka context: where and how the result batches get published.
    let stream_ctx = KafkaContext::with_config(
        KafkaConfig::new("test_topic")
            .set("bootstrap.servers", "127.0.0.1:9092")
            .set("compression.codec", "snappy"),
    );

    df.publish_to_kafka(stream_ctx).await?;

    Ok(())
}

I'm still not sure if this is the correct way to do it or whether I put the code in the proper places ... still, you learn something new every day.

Is there any other place where we can share code / check ideas?

@hntd187
Contributor Author

hntd187 commented Jun 24, 2022

There is https://github.com/datafusion-contrib/datafusion-streams, which I haven't pushed my work to in a long time, but it's probably a good place if you want to experiment. It would probably be good to have a sink for Kafka, but I need to get back into this.

@alamb
Contributor

alamb commented Mar 5, 2023

Let's move the discussion to #4285, which is actively being worked on. Thanks to everyone who has reviewed and contributed so far.

@alamb alamb closed this as completed Mar 5, 2023