Roll forward "[SPARK-23096][SS] Migrate rate source to V2" #20922
Conversation
## What changes were proposed in this pull request?
This PR migrates the micro-batch rate source to the V2 API and rewrites the UTs to suit V2 testing.
## How was this patch tested?
UTs.
Author: jerryshao <[email protected]>
Closes apache#20688 from jerryshao/SPARK-23096.
@jerryshao - sorry to hijack the roll-forward - I'm excited about this PR and really want it in :)
@@ -64,6 +64,7 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with
    if (loadTestDataBeforeTests) {
      loadTestData()
    }
    SparkSession.setActiveSession(spark)
The active session should be set before we execute the plan, right? For example, in QueryExecution for each query. What is the reason we need to do it here?
The active session is required for instantiating the DataSourceReader, which is done at planning time (spark.readStream.{...}.load()) in order to determine the schema.
Discussed offline. What we should do instead is set the default session when the test spark session is initialized, since that initialization doesn't invoke SparkSession.getOrCreate(), which normally sets it.
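The active-vs-default session distinction behind this discussion can be illustrated with a self-contained sketch. FakeSession below is a stand-in invented for illustration, not Spark's actual SparkSession; it only mimics the thread-local "active" slot, the global "default" slot, and the fact that getOrCreate() is what normally registers the default:

```scala
// Stand-in for SparkSession's active/default bookkeeping (illustration only).
class FakeSession(val name: String)

object FakeSession {
  private val active = new ThreadLocal[Option[FakeSession]] {
    override def initialValue(): Option[FakeSession] = None
  }
  private var defaultSession: Option[FakeSession] = None

  def setActiveSession(s: FakeSession): Unit = active.set(Some(s))
  def setDefaultSession(s: FakeSession): Unit = defaultSession = Some(s)
  def getActiveSession: Option[FakeSession] = active.get()
  def getDefaultSession: Option[FakeSession] = defaultSession

  // getOrCreate normally registers the default session; a test harness that
  // constructs the session directly skips this step entirely.
  def getOrCreate(name: String): FakeSession = defaultSession.getOrElse {
    val s = new FakeSession(name)
    setDefaultSession(s)
    s
  }
}

// A test harness that builds the session directly leaves the default unset:
val testSession = new FakeSession("test")
assert(FakeSession.getDefaultSession.isEmpty)

// The fix discussed here: register the default at initialization time.
FakeSession.setDefaultSession(testSession)
```

Any source that later falls back to the default session then finds it, without each test having to set the active session by hand.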
(This issue was spun off into #20926.)
Test build #88669 has finished for PR 20922 at commit
Test build #88674 has finished for PR 20922 at commit
Test build #88676 has finished for PR 20922 at commit
Thanks for the help @jose-torres.
override def createMicroBatchReader(
    schema: Optional[StructType],
    checkpointLocation: String,
    options: DataSourceOptions): MicroBatchReader = {
Here, if MicroBatchReadSupport could take a SparkSession parameter like StreamSourceProvider#createSource takes a sqlContext, then the specific source would not need to get the session from a thread-local or default variable, and the UT wouldn't need setDefaultSession. That's what I thought when I did this refactoring work.
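The suggested change could be sketched as below. The types here are local stand-ins invented for illustration (the real Optional, StructType, DataSourceOptions, and MicroBatchReader live in Spark's packages), and the "proposed" trait is hypothetical, not an actual Spark interface:

```scala
// Local stand-ins for the real Spark types (illustration only).
class SparkSessionStub
class StructTypeStub
class DataSourceOptionsStub(val entries: Map[String, String])
trait MicroBatchReaderStub

// Today's V2 shape (as in the diff above): no session parameter, so a source
// needing one must reach into SparkSession.get{Active/Default}Session.
trait MicroBatchReadSupportToday {
  def createMicroBatchReader(
      schema: Option[StructTypeStub],
      checkpointLocation: String,
      options: DataSourceOptionsStub): MicroBatchReaderStub
}

// The suggestion in this comment: mirror StreamSourceProvider#createSource
// and pass the session explicitly. Hypothetical signature.
trait MicroBatchReadSupportProposed {
  def createMicroBatchReader(
      session: SparkSessionStub,
      schema: Option[StructTypeStub],
      checkpointLocation: String,
      options: DataSourceOptionsStub): MicroBatchReaderStub
}

val impl = new MicroBatchReadSupportProposed {
  def createMicroBatchReader(
      session: SparkSessionStub,
      schema: Option[StructTypeStub],
      checkpointLocation: String,
      options: DataSourceOptionsStub): MicroBatchReaderStub =
    new MicroBatchReaderStub {}
}
val reader = impl.createMicroBatchReader(
  new SparkSessionStub, None, "/tmp/cp", new DataSourceOptionsStub(Map.empty))
```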
What do you think @jose-torres @tdas @gatorsmile ?
I agree that there's a mismatch here.
The reason it doesn't currently have this parameter is that one of the DataSourceV2 design goals (https://docs.google.com/document/d/1n_vUVbF4KD3gxTmkNEon5qdQ-Z8qU5Frf6WMQZ6jJVM/edit#heading=h.mi1fbff5f8f9) was to avoid API dependencies on upper level APIs like SparkSession. (IIRC Wenchen and I discussed SparkSession specifically in the design stage.) In this story, SparkSession.get{Active/Default}Session is just a way to keep our existing sources working rather than an encouraged development practice.
I agree that there's a mismatch which could be worth some discussion, but I think it's out of scope for this PR.
Thanks for the explanation @jose-torres. This seems like a quite common usage scenario: I also see that the socket/Kafka sources and the console sink require SparkSession, as does my customized Hive streaming sink (https://github.com/jerryshao/spark-hive-streaming-sink/blob/7b3afcee280d2e70ffb12dde24184726b618829d/core/src/main/scala/com/hortonworks/spark/hive/HiveSourceProvider.scala#L47). If we added that parameter back, things might be much easier.
What's your opinion @cloud-fan?
Can you give some concrete examples of why we need SparkSession in data source implementations? If it's only for config, we should use DataSourceOptions. If you want to access some internal state of Spark, we should improve the interface to expose that state explicitly.
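The config-over-session point can be sketched with a simplified, self-contained version of the pattern DataSourceOptions provides: a case-insensitive key-value bag handed to the source. SimpleOptions below mimics that pattern for illustration and is not the real Spark class:

```scala
// Simplified stand-in for DataSourceOptions: case-insensitive lookup over
// the options the user supplied, e.g. .option("rowsPerSecond", "10").
class SimpleOptions(raw: Map[String, String]) {
  private val m = raw.map { case (k, v) => k.toLowerCase -> v }

  def get(key: String): Option[String] = m.get(key.toLowerCase)

  def getInt(key: String, default: Int): Int =
    get(key).map(_.toInt).getOrElse(default)
}

// A source reads its config from the options it was handed, rather than
// pulling a SparkSession out of thread-local state to read SQL confs:
val opts = new SimpleOptions(Map("rowsPerSecond" -> "10"))
val rowsPerSecond = opts.getInt("ROWSPERSECOND", 1) // case-insensitive hit
```

This keeps the source decoupled from upper-level APIs: everything it needs arrives through its own entry point.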
For example, if a specific source requires HDFSMetadataLog for recovery, then it requires SparkSession. Also, in my case I need to get the table description from the catalog, which also requires SparkSession. This may not be a typical use case, but I think it would be easier for users if we exposed SparkSession.
Why does HDFSMetadataLog need SparkSession?
For the table description, I'd like to explicitly pass it via the interface, not SparkSession.
One of the constructor parameters of HDFSMetadataLog is SparkSession.
We can change it. It only needs a Hadoop conf.
We should not pass SparkSession just for getting hadoopConf, right? cc @zsxwing
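The dependency-narrowing being proposed (accept only the Hadoop configuration the log actually uses, rather than a whole SparkSession) can be sketched with stub types. HadoopConf, SessionStub, and the two MetadataLog classes below are illustrative stand-ins, not the real Spark or Hadoop classes:

```scala
// Stand-ins for Hadoop's Configuration and Spark's SparkSession.
class HadoopConf(val entries: Map[String, String])
class SessionStub(val hadoopConf: HadoopConf)

// Before: the log takes the whole session but only ever touches hadoopConf,
// dragging an upper-level API into every source that uses the log.
class MetadataLogBefore(session: SessionStub, path: String) {
  def fs: String = session.hadoopConf.entries.getOrElse("fs.defaultFS", "file:///")
}

// After: the constructor asks only for what it actually needs.
class MetadataLogAfter(conf: HadoopConf, path: String) {
  def fs: String = conf.entries.getOrElse("fs.defaultFS", "file:///")
}

val conf = new HadoopConf(Map("fs.defaultFS" -> "hdfs://nn:8020"))
val log = new MetadataLogAfter(conf, "/checkpoints/meta")
```

With the narrower constructor, a V2 source can build its metadata log from options it already has, without reaching for a session at all.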
Test build #88734 has finished for PR 20922 at commit
retest this please
Test build #88743 has finished for PR 20922 at commit
retest this please
Test build #88746 has finished for PR 20922 at commit
thanks, merging to master!
## What changes were proposed in this pull request?
Roll forward c68ec4e (apache#20688). There are two minor test changes required:
* An error which used to be TreeNodeException[ArithmeticException] is no longer wrapped and is now just ArithmeticException.
* The test framework simply does not set the active Spark session. (Or rather, it doesn't do so early enough - I think it only happens when a query is analyzed.) I've added the required logic to SQLTestUtils.
## How was this patch tested?
existing tests
Author: Jose Torres <[email protected]>
Author: jerryshao <[email protected]>
Closes apache#20922 from jose-torres/ratefix.