
[SPARK-14257][SQL] Allow multiple continuous queries to be started from the same DataFrame #12049

Closed
wants to merge 7 commits

Conversation

@zsxwing (Member) commented Mar 29, 2016

What changes were proposed in this pull request?

Make StreamingRelation store the closure that creates the source, so that StreamExecution can instantiate its own source and we can start multiple continuous queries from the same DataFrame.
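In other words, a rough sketch of the proposed shape (using Spark-internal types such as `Source`, `Attribute`, and `LeafNode`; this illustrates the idea, not the exact merged code):

```scala
// Sketch: the relation stores a () => Source closure instead of a Source,
// so each StreamExecution can create its own Source instance.
case class StreamingRelation(
    sourceCreator: () => Source,
    output: Seq[Attribute]) extends LeafNode

// Inside StreamExecution, the closure is invoked once per query:
val source = sourceCreator()  // this query's private Source
```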

How was this patch tested?

test("DataFrame reuse")
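For context, the scenario that test presumably covers looks roughly like this (a hypothetical sketch only; the source/sink setup and the 2.0-era `stream()`/`startStream()` API names are assumptions, not the actual test body):

```scala
// Hypothetical sketch: two continuous queries started from one DataFrame.
val df = sqlContext.read.stream()            // one streaming DataFrame
val q1 = df.write.startStream("/tmp/out1")   // first continuous query
val q2 = df.write.startStream("/tmp/out2")   // second query from the same df
// With this patch each query materializes its own Source from the
// DataFrame's DataSource, so q2 no longer conflicts with q1.
```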

@SparkQA commented Mar 30, 2016

Test build #54472 has finished for PR 12049 at commit 50c39b8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class StreamingRelation(

val source = sourceCreator()
// We still need to use the previous `output` instead of `source.schema`, because
// "_logicalPlan" already uses the attributes of the previous `output`.
StreamingRelation(() => source, output)
Contributor:

I think it's confusing to have an opaque function that sometimes creates a source and sometimes returns a static source. It's not going to be clear from explain() which mode you are in.

Member Author (zsxwing):

> I think it's confusing to have an opaque function that sometimes creates a source and sometimes returns a static source. It's not going to be clear from explain() which mode you are in.

How about adding a new Relation for a static source (maybe call it StreamExecutionRelation)?

Contributor:

Or StreamingRelation could just hold a DataSource, and we could have a Map[DataSource, Source] here that's initialized at startup.

Member Author (zsxwing):

I tried Map[DataSource, Source] but failed because of RichSource.

  implicit class RichSource(s: Source) {
    def toDF(): DataFrame = Dataset.ofRows(sqlContext, StreamingRelation(s))

    def toDS[A: Encoder](): Dataset[A] = Dataset(sqlContext, StreamingRelation(s))
  }

If we only have StreamingRelation(DataSource), then RichSource needs to create a DataSource for the Source dynamically.

So the above code would be changed to:

  implicit class RichSource(s: Source) {
    def toDF(): DataFrame = Dataset.ofRows(sqlContext, StreamingRelation(DataSource(sqlContext, className = ...)))

    def toDS[A: Encoder](): Dataset[A] = Dataset(sqlContext, StreamingRelation(DataSource(sqlContext, className = ...)))
  }

Here I don't know what to use for className. Without code generation, we can't create a new class for each different Source instance. This seems too complicated.

Therefore, I ended up using the StreamExecutionRelation idea.
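That two-relation design (whose signatures the SparkQA summaries later in this thread report) can be sketched as follows. The placement of the conversion inside StreamExecution is an assumption for illustration, and the final merged StreamingRelation also carries a sourceName field:

```scala
// While the DataFrame is being built, the plan holds only the recipe:
case class StreamingRelation(dataSource: DataSource, output: Seq[Attribute]) extends LeafNode

// Once a query starts, the plan holds the materialized Source:
case class StreamingExecutionRelation(source: Source, output: Seq[Attribute]) extends LeafNode

// At query start, each StreamExecution materializes its own Source, so two
// queries over the same DataFrame no longer share state:
val logicalPlan = analyzedPlan transform {
  case StreamingRelation(dataSource, output) =>
    // Reuse the original `output` attributes: downstream operators in the
    // plan already reference them, so we must not switch to source.schema.
    StreamingExecutionRelation(dataSource.createSource(), output)
}
```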

Member Author (zsxwing):

> It's not going to be clear from explain() which mode you are in.

By the way, streaming DataFrames don't support explain yet. Should we fix that now?

@SparkQA commented Mar 31, 2016

Test build #54653 has finished for PR 12049 at commit 6196790.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class StreamingRelation(dataSource: DataSource, output: Seq[Attribute]) extends LeafNode
    • case class StreamingExecutionRelation(source: Source, output: Seq[Attribute]) extends LeafNode

@SparkQA commented Apr 1, 2016

Test build #2722 has finished for PR 12049 at commit 6196790.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class StreamingRelation(dataSource: DataSource, output: Seq[Attribute]) extends LeafNode
    • case class StreamingExecutionRelation(source: Source, output: Seq[Attribute]) extends LeafNode

@SparkQA commented Apr 1, 2016

Test build #2723 has finished for PR 12049 at commit 6196790.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class StreamingRelation(dataSource: DataSource, output: Seq[Attribute]) extends LeafNode
    • case class StreamingExecutionRelation(source: Source, output: Seq[Attribute]) extends LeafNode

@SparkQA commented Apr 4, 2016

Test build #54878 has finished for PR 12049 at commit aa55afe.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Apr 4, 2016

Test build #54879 has finished for PR 12049 at commit ac51850.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Apr 4, 2016

Test build #54881 has finished for PR 12049 at commit 9b5f007.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// Materialize source to avoid creating it in every batch
val source = dataSource.createSource()
// We still need to use the previous `output` instead of `source.schema`, because
// "_logicalPlan" already uses the attributes of the previous `output`.
Contributor:

nit: I don't see anything named _logicalPlan.


/**
* Used to link a streaming [[DataSource]] into a
* [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]].
Contributor:

Maybe include a description of how this gets turned into a StreamingExecutionRelation, and whose responsibility that is.

Member Author (zsxwing):

Done

@SparkQA commented Apr 5, 2016

Test build #54932 has finished for PR 12049 at commit 527f55f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Apr 5, 2016

Test build #54955 has finished for PR 12049 at commit 48d760e.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class StreamingRelation(dataSource: DataSource, sourceName: String, output: Seq[Attribute])

@zsxwing (Member Author) commented Apr 5, 2016

retest this please

@SparkQA commented Apr 5, 2016

Test build #54994 has finished for PR 12049 at commit 48d760e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class StreamingRelation(dataSource: DataSource, sourceName: String, output: Seq[Attribute])

@marmbrus (Contributor) commented Apr 5, 2016

Thanks, merging to master.

@asfgit asfgit closed this in 463bac0 Apr 5, 2016
@zsxwing zsxwing deleted the df-reuse branch April 5, 2016 18:15