Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Roll forward "[SPARK-23096][SS] Migrate rate source to V2" #20922

Closed
wants to merge 6 commits into from

Conversation

jose-torres
Copy link
Contributor

What changes were proposed in this pull request?

Roll forward c68ec4e (#20688).

There are two minor test changes required:

  • An error which used to be TreeNodeException[ArithmeticException] is no longer wrapped and is now just ArithmeticException.
  • The test framework simply does not set the active Spark session. (Or rather, it doesn't do so early enough - I think it only happens when a query is analyzed.) I've added the required logic to SQLTestUtils.

How was this patch tested?

existing tests

jerryshao and others added 2 commits March 28, 2018 10:30
## What changes were proposed in this pull request?

This PR migrate micro batch rate source to V2 API and rewrite UTs to suite V2 test.

## How was this patch tested?

UTs.

Author: jerryshao <[email protected]>

Closes apache#20688 from jerryshao/SPARK-23096.
@jose-torres
Copy link
Contributor Author

@jerryshao - sorry to hijack the roll-forward - I'm excited about this PR and really want it in :)

@tdas @gatorsmile

@jose-torres jose-torres changed the title Ratefix Roll forward "[SPARK-23096][SS] Migrate rate source to V2" (#20921) Mar 28, 2018
@jose-torres jose-torres changed the title Roll forward "[SPARK-23096][SS] Migrate rate source to V2" (#20921) Roll forward "[SPARK-23096][SS] Migrate rate source to V2" Mar 28, 2018
@@ -64,6 +64,7 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with
if (loadTestDataBeforeTests) {
loadTestData()
}
SparkSession.setActiveSession(spark)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The active session should be set before we execute the plan, right? For example, in QueryExecution for each query. What is the reason we need to do it here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The active session is required for instantiating the DataSourceReader, which is done at planning time (spark.readStream.{...}.load()) in order to determine the schema.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline. What we should do instead is set the default session when the test spark session is initialized, since that initialization doesn't invoke SparkSession.getOrCreate() which normally sets it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(This issue was spun off into #20926.)

@SparkQA
Copy link

SparkQA commented Mar 28, 2018

Test build #88669 has finished for PR 20922 at commit 8e19125.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Mar 29, 2018

Test build #88674 has finished for PR 20922 at commit f494fb9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Mar 29, 2018

Test build #88676 has finished for PR 20922 at commit 2fec90b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jerryshao
Copy link
Contributor

Thanks for the help @jose-torres .

override def createMicroBatchReader(
schema: Optional[StructType],
checkpointLocation: String,
options: DataSourceOptions): MicroBatchReader = {
Copy link
Contributor

@jerryshao jerryshao Mar 29, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here if MicrobatchReadSupport could pass in SparkSession parameter like StreamSourceProvider#createSource (sqlContext), then it is not required to get session from thread local variable or default variable in the specific source, also the UT doesn't required to setDefaultSession.

That's what I thought when I did this refactoring work.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think @jose-torres @tdas @gatorsmile ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that there's a mismatch here.

The reason it doesn't currently have this parameter is that one of the DataSourceV2 design goals (https://docs.google.com/document/d/1n_vUVbF4KD3gxTmkNEon5qdQ-Z8qU5Frf6WMQZ6jJVM/edit#heading=h.mi1fbff5f8f9) was to avoid API dependencies on upper level APIs like SparkSession. (IIRC Wenchen and I discussed SparkSession specifically in the design stage.) In this story, SparkSession.get{Active/Default}Session is just a way to keep our existing sources working rather than an encouraged development practice.

I agree that there's a mismatch which could be worth some discussion, but I think it's out of scope for this PR.

Copy link
Contributor

@jerryshao jerryshao Mar 29, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation @jose-torres . This seems like a quite common usage scenario, I also see that socket/kafka source and console sink require SparkSession, also in my customized hive streaming sink (https://github.com/jerryshao/spark-hive-streaming-sink/blob/7b3afcee280d2e70ffb12dde24184726b618829d/core/src/main/scala/com/hortonworks/spark/hive/HiveSourceProvider.scala#L47). If we add that parameter back, things might be much easier.

What's your opinion @cloud-fan ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you give some concrete examples about why we need SparkSession in data source implementations? If it's only for config, we should use DataSourceOptions. If you wanna access some internal states of Spark, we should improve the interface to exposes these states explicitly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For example if one specific source requires HDFSMetadataLog for recovery, then it requires SparkSession. Also for my case, I need to get table description from catalog, it also requires SparkSession. Though this may not be a typical use case, but I think it might be easy for user if we expose SparkSession.

Copy link
Contributor

@cloud-fan cloud-fan Mar 29, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does HDFSMetadataLog need SparkSession?

For table description, I'd like to explicitly pass it via the interface, not SparkSession.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One of constructor parameter of HDFSMetadataLog is SparkSession.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can change it. It only needs a hadoop conf.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not pass SparkSession just for getting hadoopConf, right? cc @zsxwing

@SparkQA
Copy link

SparkQA commented Mar 30, 2018

Test build #88734 has finished for PR 20922 at commit 9129f72.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

retest this please

@SparkQA
Copy link

SparkQA commented Mar 30, 2018

Test build #88743 has finished for PR 20922 at commit 9129f72.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

retest this please

@SparkQA
Copy link

SparkQA commented Mar 30, 2018

Test build #88746 has finished for PR 20922 at commit 9129f72.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

thanks, merging to master!

@asfgit asfgit closed this in 5b5a36e Mar 30, 2018
mshtelma pushed a commit to mshtelma/spark that referenced this pull request Apr 5, 2018
## What changes were proposed in this pull request?

Roll forward c68ec4e (apache#20688).

There are two minor test changes required:

* An error which used to be TreeNodeException[ArithmeticException] is no longer wrapped and is now just ArithmeticException.
* The test framework simply does not set the active Spark session. (Or rather, it doesn't do so early enough - I think it only happens when a query is analyzed.) I've added the required logic to SQLTestUtils.

## How was this patch tested?

existing tests

Author: Jose Torres <[email protected]>
Author: jerryshao <[email protected]>

Closes apache#20922 from jose-torres/ratefix.
otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023
Roll forward c68ec4e (apache#20688).

There are two minor test changes required:

* An error which used to be TreeNodeException[ArithmeticException] is no longer wrapped and is now just ArithmeticException.
* The test framework simply does not set the active Spark session. (Or rather, it doesn't do so early enough - I think it only happens when a query is analyzed.) I've added the required logic to SQLTestUtils.

existing tests

Author: Jose Torres <[email protected]>
Author: jerryshao <[email protected]>

Closes apache#20922 from jose-torres/ratefix.

Ref: LIHADOOP-48531
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants