[SPARK-14257][SQL]Allow multiple continuous queries to be started from the same DataFrame #12049
Conversation
Test build #54472 has finished for PR 12049 at commit
val source = sourceCreator()
// We still need to use the previous `output` instead of `source.schema` as attributes in
// "_logicalPlan" has already used attributes of the previous `output`.
StreamingRelation(() => source, output)
I think it's confusing to have an opaque function that sometimes creates a source and sometimes returns a static source. It's not going to be clear from explain() which mode you are in.
How about adding a new relation for a static source (maybe call it StreamExecutionRelation)?
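For concreteness, a rough sketch of what such a relation might look like (the name and fields just follow the suggestion above, and the toString override is only one way to make explain() show the concrete source; this is not the committed code):

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.execution.streaming.Source

// Wraps an already-materialized Source, unlike StreamingRelation, which only
// describes how to create one.
case class StreamExecutionRelation(source: Source, output: Seq[Attribute]) extends LeafNode {
  override def toString: String = source.toString
}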
Or StreamingRelation could just hold a DataSource, and we could have a Map[DataSource, Source] here that's initialized at startup.
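A minimal sketch of that alternative (it assumes StreamingRelation holds the DataSource and that StreamExecution walks the plan at startup; the names and pattern are illustrative, not the committed code):

import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.streaming.Source

// Collect every streaming relation in the plan and materialize each
// DataSource exactly once when the query starts.
val sources: Map[DataSource, Source] =
  logicalPlan.collect {
    case StreamingRelation(dataSource, _) => dataSource -> dataSource.createSource()
  }.toMap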
I tried Map[DataSource, Source] but it failed because of RichSource.
implicit class RichSource(s: Source) {
  def toDF(): DataFrame = Dataset.ofRows(sqlContext, StreamingRelation(s))
  def toDS[A: Encoder](): Dataset[A] = Dataset(sqlContext, StreamingRelation(s))
}
If we only have StreamingRelation(DataSource), then RichSource needs to create a DataSource for each Source dynamically. So the above code would become:
implicit class RichSource(s: Source) {
  def toDF(): DataFrame = Dataset.ofRows(sqlContext, StreamingRelation(DataSource(sqlContext, className = ...)))
  def toDS[A: Encoder](): Dataset[A] = Dataset(sqlContext, StreamingRelation(DataSource(sqlContext, className = ...)))
}
Here I don't know what to fill in for className. Without code generation, we won't be able to create a new class for different Source instances. This seems too complicated. Therefore, I went with the StreamExecutionRelation idea in the end.
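For illustration, with such a relation the helper stays as simple as before (a sketch only, assuming the StreamExecutionRelation shape sketched earlier and that sqlContext, DataFrame, Dataset, and Encoder are in scope as in the original snippet):

implicit class RichSource(s: Source) {
  // Wrap the concrete Source directly; no DataSource or className is needed.
  def toDF(): DataFrame =
    Dataset.ofRows(sqlContext, StreamExecutionRelation(s, s.schema.toAttributes))
  def toDS[A: Encoder](): Dataset[A] =
    Dataset(sqlContext, StreamExecutionRelation(s, s.schema.toAttributes))
}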
It's not going to be clear from explain() which mode you are in.

By the way, streaming DataFrames don't support explain() yet. Should we fix that now?
Test build #54653 has finished for PR 12049 at commit
Test build #2722 has finished for PR 12049 at commit
Test build #2723 has finished for PR 12049 at commit
Test build #54878 has finished for PR 12049 at commit
Test build #54879 has finished for PR 12049 at commit
Test build #54881 has finished for PR 12049 at commit
// Materialize source to avoid creating it in every batch
val source = dataSource.createSource()
// We still need to use the previous `output` instead of `source.schema` as attributes in
// "_logicalPlan" has already used attributes of the previous `output`.
nit: I don't see anything named _logicalPlan
/**
 * Used to link a streaming [[DataSource]] into a
 * [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]].
Maybe include a description of how this gets turned into a StreamingExecutionRelation and whose responsibility that is.
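For instance, the class comment could be extended along these lines (a sketch of possible wording, not necessarily the exact comment that was committed):

/**
 * Used to link a streaming [[DataSource]] into a
 * [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]]. This is only used for
 * creating a streaming [[org.apache.spark.sql.DataFrame]]; when a query is started,
 * [[StreamExecution]] replaces it with a relation wrapping the materialized [[Source]].
 */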
Done
Test build #54932 has finished for PR 12049 at commit
Test build #54955 has finished for PR 12049 at commit
retest this please
Test build #54994 has finished for PR 12049 at commit
Thanks, merging to master.
What changes were proposed in this pull request?
Make StreamingRelation store how to create the source, and create the actual Source inside StreamExecution, so that multiple continuous queries can be started from the same DataFrame.
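A minimal sketch of the resulting shape, based on the diff snippets quoted above (where the transform lives and the exact relation name are assumptions):

// Inside StreamExecution: each query materializes its own Source instances,
// so two queries built from the same DataFrame no longer share a Source.
val logicalPlan = analyzedPlan.transform {
  case StreamingRelation(dataSource, output) =>
    // Keep the original output attributes; the rest of the plan already
    // references them, so we must not switch to source.schema here.
    StreamExecutionRelation(dataSource.createSource(), output)
}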
How was this patch tested?
test("DataFrame reuse")