Streaming support for DataFusion #1544
Comments
I personally would love to see this and agree that making it extensible / trait-based would be great. Then we could also use AWS Kinesis, for example.
I think it would be great to get first-class streaming support in DataFusion. In the long run, it might even make sense to rebuild our batch processing engine as a special case of the streaming engine. I feel like what we have right now is already pretty close to a micro-batching streaming system; we are just pushing down large batches from table sources into downstream execution plans :P +1 for iterating this outside of the main repo, at least for the initial PoC.
@houqp we will at least have to develop the API inside DataFusion for now, it would seem, unless we can do it all via contrib. I'm not totally aware of all the package scoping that might prevent it from existing in contrib.
I am excited to see this proposed. I am quite familiar with Flink and happy to participate in design and discussions. It would be nice if we can converge on an initial set of design principles, for example delivery guarantees, out-of-order event handling, etc. From my experience the streaming engine will eventually be different enough from the batch one that it is probably a good idea to start separately. But if we do a good job of keeping the DataFrame API and the operator API generic enough to work with both engines, I think it will be a very promising project.
@realno So I can update my initial comment to flesh out that design as we walk through it.
WOW, recently I have also been studying streaming, lol. I can join you and learn more about streaming with you.
@hntd187 -- I think the proposal sounds good. I think you were asking about mechanics here:
Here is what I suggest:
```toml
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "401271377cd84dc1546827f66bda1b242860a6a8", default-features = false, package = "datafusion" }
```

Protip: while developing you can change this to point at your own local checkout like this:

```toml
[patch.crates-io]
datafusion = { path = '/path/to/your/local/arrow-datafusion/checkout' }
```

Then as you work on the streaming implementation, add whatever new DataFusion API / changes you need to a branch (perhaps against your own DataFusion fork). Once you have settled on an API, open a PR to the main DataFusion repo to incorporate it. This will let you iterate on the new DataFusion API without having to worry about the test suite, reviews, delays in merging, etc.
TIL
I was using the local checkout method prior; it really helps with prototyping. :) @alamb it was somewhat two questions, both not worded well: I was asking about mechanics, but also about where we thought the API-level types should live for the time being.
I suggest, for the time being, having the API-level types in your DF fork or a WIP PR to the upstream repo :)
I've used Kafka Streams, Flink and Beam professionally. The point of streaming is to execute windowed functions, join aggregated data with in-memory tables, and distribute these computations (DAGs) according to partition keys, so that the right value gets sent to the right thread. In terms of priority, it looks to me like this would not be a reasonable thing to do before having methods like …
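(For readers less familiar with those concepts, here is a toy, engine-free sketch of keyed partitioning combined with tumbling windows. The Event type and all field names are made up for illustration; a real engine would shard this state by key across workers.)

```rust
use std::collections::HashMap;

// Hypothetical event type just for this example.
struct Event {
    key: String,        // partition key: decides which task/thread owns the state
    event_time_ms: u64, // event time, used to assign the event to a window
}

fn main() {
    let window_ms = 1_000; // 1-second tumbling windows
    let events = vec![
        Event { key: "user_a".into(), event_time_ms: 100 },
        Event { key: "user_b".into(), event_time_ms: 950 },
        Event { key: "user_a".into(), event_time_ms: 1_200 },
    ];

    // Count per (key, window start). A distributed engine keeps this state
    // partitioned by key so the right value is routed to the right thread.
    let mut counts: HashMap<(String, u64), u64> = HashMap::new();
    for e in events {
        let window_start = (e.event_time_ms / window_ms) * window_ms;
        *counts.entry((e.key, window_start)).or_insert(0) += 1;
    }
    println!("{counts:?}");
}
```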
We can add that to the design if we have to implement it. We may be able to be dumb about it in the meantime.

```rust
let ec = ExecutionContext::new();
let dfs = DFSchema::new(
vec![
DFField::new(Some("topic1"), "key", DataType::Binary, false),
DFField::new(Some("topic1"), "value", DataType::Binary, false),
]
).unwrap();
let lp = LogicalPlan::StreamingScan(StreamScan {
topic_name: "topic1".to_string(),
source: Arc::new(KafkaExecutionPlan {
time_window: Duration::from_millis(200),
topic: "topic1".to_string(),
batch_size: 5,
conf: consumer_config("0", None),
}),
schema: Arc::new(dfs),
batch_size: Some(5),
});
let df = DataFrameImpl::new(ec.state, &lp);
timeout(Duration::from_secs(10), async move {
let mut b = df.execute_stream().await.unwrap();
let batch = b.next().await;
dbg!(batch);
}).await.unwrap();
```

Based on my awful, awful base implementation this gets data back. Hooking this up has taken me into the depths of DF more than I hoped; I don't know whether this would be possible in a contrib module outside of a custom DataFrame impl. I guess the struggle I am facing is that it seems like the DataFrame right now has to get a "wait or end" of data reading notification, which for streaming obviously never comes. Is anyone more readily aware of where this occurs, or whether I'm on the right track?
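For context on the "end of data never comes" problem described above, here is a minimal hand-rolled sketch (not the actual DataFusion SendableRecordBatchStream API) of an unbounded stream of RecordBatches fed from a background task over a channel. The stream only terminates when the sender is dropped, which a Kafka-style source would never do, so anything downstream that waits for the stream to finish waits forever.

```rust
use std::sync::Arc;

use arrow::array::Int64Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use futures::StreamExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

#[tokio::main]
async fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("value", DataType::Int64, false)]));
    let (tx, rx) = mpsc::channel::<RecordBatch>(16);

    // Producer task: a real source would poll Kafka here and build batches forever.
    let producer_schema = schema.clone();
    tokio::spawn(async move {
        for i in 0..3_i64 {
            let batch = RecordBatch::try_new(
                producer_schema.clone(),
                vec![Arc::new(Int64Array::from(vec![i]))],
            )
            .unwrap();
            if tx.send(batch).await.is_err() {
                break;
            }
        }
        // Dropping `tx` is what ends the stream; a streaming source never reaches this point.
    });

    // Consumer side: roughly what an ExecutionPlan would hand back as its output stream.
    let mut stream = ReceiverStream::new(rx);
    while let Some(batch) = stream.next().await {
        println!("got batch with {} rows", batch.num_rows());
    }
}
```

Presumably a micro-batch design sidesteps this by bounding each window's stream, so downstream operators still see a finite input per window.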
There is prior art, the simplest of which is Kafka Streams, but you can also look at Materialize. As for the issue here, I don't understand the problem: if you want to communicate between async tasks, use channels. Streaming operations are typically of a different kind; all common operations require a specific implementation to be able to be distributed and partitioned according to a topology.
If this is not clear from the get-go, you can also complete the PoC within DataFusion and then we can look at the diff together to see which parts should be factored out into extensions.
I am also not following the question here. The …
I think I may have just confused myself thinking while I was writing the original post; execute_stream seems good for now. Sorry for the confusion.
I checked in my basic PoC work. I still have much to do, but it's there for anyone looking to assist or just review.
If ready, please ping me!
@hntd187 if you want you can transfer that to datafusion-contrib for better visibility.
Perfect, I wasn't sure what my permissions were; I'll do that right now. Edit: never mind, I don't have permission, could someone help? :-)
So, just updating on this: I'm starting to look into state management for streaming now. Tell me, do we have any concept right now of accessing or manipulating partitions or partition-level information in DataFusion?
@hntd187 most of the file partition pruning and partition column population logic lives in the ListingTable module as far as I know.
Just an update: I am still working on this, I've just been inundated with work so my progress has been slower than I'd like. I don't have any of the larger parts in a working state yet, but I am working on stream state management and sort merge joins right now.
Also, @matthewmturner, I know I agreed that we'd settle on a general streaming interface for DataFusion here, and I have a general idea of what that might look like. How do you want to formalize the discussion? Maybe I can split that component off so I can still make good on my Q1 commitment for that interface.
I think that @yjshen has led the way with some great examples of designing new features like this with his work on ObjectStore, MemoryManager, et al.
@matthewmturner here is a first pass: are people able to start editing this doc? https://docs.google.com/document/d/1ZR171NcI_lrn7e7yVjSZFtCkNM7w2yzia_pH3b4QL_c/edit?usp=sharing
@hntd187 sorry for the delay. Right now it's not public so I can't edit; can you update it? Then we can share with the broader group to start collecting feedback.
No problem, I updated it to be editable.
@hntd187 I just posted to slack - sorry for the delay, I lost track of this.
This seems quite interesting to me. I was thinking about creating something similar with datafusion + rocksdb (similar to kstreams, but with full SQL support in the datastore). If you go ahead, please post here. I was also looking at BookKeeper for filesystem sync.
I'm just trying to integrate DataFusion with Kafka; the final goal is to have end-to-end streaming. But I started from a "different side" => step 1 is to publish output to Kafka, so I copied code / created a Kafka publisher: https://github.com/yarenty/arrow-datafusion/tree/master/datafusion/core/src/physical_plan/kafka Test case is here: It all finishes with something like this:

```rust
#[tokio::main]
async fn main() -> Result<()> {
let ctx = SessionContext::new();
ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()).await?;
let df = ctx
.sql("SELECT a, MIN(b) as bmin FROM example GROUP BY a ORDER BY a LIMIT 100")
.await?;
// kafka context
let stream_ctx = KafkaContext::with_config(
KafkaConfig::new("test_topic")
.set("bootstrap.servers", "127.0.0.1:9092")
.set("compression.codec", "snappy"),
);
df.publish_to_kafka( stream_ctx).await?;
Ok(())
}
```

Still not sure if this is the correct way to do it and whether I put the code in the proper places ... still, you learn something new every day. Is there any other place where I can share code / check ideas?
There is https://github.com/datafusion-contrib/datafusion-streams, which I haven't pushed my work to in a long time, but it's probably a good place if you want to experiment. It would probably be good to have a sink for Kafka, but I need to get back into this.
Let's move the discussion to #4285, which is actively being worked on. Thanks to everyone who has reviewed and contributed so far.
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Consume a streaming source from Kafka (to start). DataFusion already has a batch-oriented processing framework that works pretty well. I want to extend this to also be able to consume from streaming sources such as a Kafka topic.
Describe the solution you'd like
So I think it makes sense to start with what Spark Streaming did for its streaming implementation, which is the idea of micro-batching.
Generally the idea is that DF will listen to a topic for some period of time (defined at startup) and then execute operations on that collected batch window of events. In the case of Kafka these events are normally either JSON or Avro, which DataFusion already has encoders for.
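To make the micro-batching idea concrete, here is a rough sketch of the loop, assuming a hypothetical poll_kafka_window helper that collects one window's worth of messages; the DataFusion names used (SessionContext, MemTable, register_table) come from recent releases and may differ by version.

```rust
use std::sync::Arc;
use std::time::Duration;

use datafusion::arrow::array::StringArray;
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::prelude::*;

// Hypothetical helper: consume the topic until the window closes, return raw values.
async fn poll_kafka_window(_window: Duration) -> Vec<String> {
    vec!["a".to_string(), "b".to_string()] // stand-in for consumed messages
}

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let schema: SchemaRef = Arc::new(Schema::new(vec![Field::new("value", DataType::Utf8, false)]));

    // A real job would loop until stopped; three iterations keep the example finite.
    for _ in 0..3 {
        // 1. Collect one window's worth of events from the topic.
        let values = poll_kafka_window(Duration::from_millis(200)).await;

        // 2. Turn the window into a RecordBatch and expose it as a table.
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(StringArray::from_iter_values(values))],
        )?;
        let table = MemTable::try_new(schema.clone(), vec![vec![batch]])?;

        let ctx = SessionContext::new();
        ctx.register_table("window", Arc::new(table))?;

        // 3. Run an ordinary (batch) query over this micro-batch.
        let df = ctx.sql("SELECT count(*) AS n FROM window").await?;
        df.show().await?;
    }
    Ok(())
}
```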
I spent some time looking at the types in the data source module and came to the conclusion that it would probably be possible to implement this on top of the current API, but frankly, it would suck. The source traits all have a naming convention centered around tables and files, and a Kafka topic is technically neither. Basically, what I am saying is that an implementation here would be highly confusing to anyone trying to understand why things are named the way they are. I propose we add a set of additional traits specifically for streaming sources. These would map to an execution plan like the other data sources, but should have the ability to manage stream information such as committing offsets, checkpointing, and watermarking. Those are probably secondary things to come a bit after a "get the thing to work" implementation, but I wanted to put it out there that these traits would initially look rather bare and not differ much from the other data sources. They would, though, quickly diverge from those contracts into ones that support managing these stream operations.
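To give the proposal a bit more shape, here is a very rough sketch of what such a streaming-source trait could look like. Every name and signature below (StreamSource, StreamPosition, the method set) is invented for illustration and is not an existing DataFusion API; an initial "get the thing to work" version would probably only need schema and scan.

```rust
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;

/// Hypothetical: a position in the stream, e.g. Kafka (partition, offset) pairs.
#[derive(Debug, Clone)]
pub struct StreamPosition(pub Vec<(i32, i64)>);

/// Hypothetical trait for streaming sources; it would sit alongside the existing
/// source traits rather than being forced into table/file-shaped naming.
#[async_trait]
pub trait StreamSource: Send + Sync {
    /// Schema of the records this source produces.
    fn schema(&self) -> SchemaRef;

    /// Build the physical plan that reads one micro-batch window.
    async fn scan(&self, window: Duration, batch_size: usize) -> Result<Arc<dyn ExecutionPlan>>;

    /// Commit offsets after a window has been fully processed.
    async fn commit(&self, position: StreamPosition) -> Result<()>;

    /// Persist enough state that the stream can resume after a failure.
    async fn checkpoint(&self) -> Result<()>;

    /// Current watermark: the event time (ms) up to which data is considered complete.
    fn watermark(&self) -> Option<i64>;
}
```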
What I am not sure about is that while these types should likely live in DataFusion, the actual implementation probably should not. It could at least start as a contrib module and maybe be promoted into the main repo eventually if that makes sense.
Does this make sense? I can start a draft PR soon to get the ball rolling on discussion into actual code.
TODO:
- df.show()
- map_partitions
- SortMergeJoin
Describe alternatives you've considered
I don't know, I could play more video games or something, but I'd rather do this. Joking aside, Flock has streaming capabilities, but it's mostly based around cloud offerings rather than a long-running process like a Spark Streaming job.
Additional context
@alamb any additional ideas? How do you want to get started?