[SPARK-23228][PYSPARK] Add Python Created jsparkSession to JVM's defaultSession #20404
Conversation
@zjffdu @HyukjinKwon please help to review. Thanks!

Test build #86700 has finished for PR 20404 at commit
python/pyspark/sql/session.py
Outdated
@@ -225,6 +225,7 @@ def __init__(self, sparkContext, jsparkSession=None):
if SparkSession._instantiatedSession is None \
or SparkSession._instantiatedSession._sc._jsc is None:
SparkSession._instantiatedSession = self
self._jvm.org.apache.spark.sql.SparkSession.setDefaultSession(self._jsparkSession)
I know the test looks a bit odd, but should we better have a test like spark._jvm.org.apache.spark.sql.SparkSession.getDefaultSession().nonEmpty()?
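For reference, a minimal sketch of what that assertion could look like in a PySpark test (hedged: the local-master setup here is illustrative, not the PR's actual test code):

```python
# Minimal sketch, assuming the patch is applied: after creating a Python
# SparkSession, the JVM-side default session should be populated.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
assert spark._jvm.org.apache.spark.sql.SparkSession.getDefaultSession().nonEmpty()
spark.stop()
```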
setActiveSession or setDefaultSession? Which one is more proper to set here?
Btw, shall we clear it when stopping the PySpark SparkSession?
Looking at the Scala code, it seems Scala's getOrCreate will set defaultSession, so I was thinking it is more proper to set defaultSession here. Does PySpark support multiple sessions?
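(For what it's worth, PySpark does expose multiple sessions over a single SparkContext via newSession(); a minimal sketch:)

```python
# Minimal sketch: two PySpark sessions sharing one SparkContext.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
other = spark.newSession()   # separate session state ...
assert other is not spark
assert other.sparkContext is spark.sparkContext  # ... same underlying context
spark.stop()
```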
> Btw, shall we clear it when stopping the PySpark SparkSession?

The JVM SparkSession will clear it when the application is stopped (defaultSession.set(null)).
Actually, it seems not, because we don't call this code path. The stop-and-start logic in PySpark is convoluted, in my humble opinion. Setting the default one fixes an actual issue, and it seems we are okay with it, at least.
The simplest way I can think of is to just add it in def stop:

self._jvm.org.apache.spark.sql.SparkSession.clearDefaultSession()
Yeah, I think we can clear it in def stop.
I think this is somewhat conflicted... If I call SparkSession.getOrCreate in Python and the JVM already has a defaultSession, what should happen?
I suppose one approach would be to keep Python independent, but with this change wouldn't it overwrite a defaultSession that might still be valid?
I think it would overwrite. Can we try something like this?
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 6c84023c43f..0bdfc88153f 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -213,7 +213,10 @@ class SparkSession(object):
self._jsc = self._sc._jsc
self._jvm = self._sc._jvm
if jsparkSession is None:
- jsparkSession = self._jvm.SparkSession(self._jsc.sc())
+ if self._jvm.SparkSession.getDefaultSession().nonEmpty():
+ jsparkSession = self._jvm.SparkSession.getDefaultSession().get()
+ else:
+ jsparkSession = self._jvm.SparkSession(self._jsc.sc())
self._jsparkSession = jsparkSession
self._jwrapped = self._jsparkSession.sqlContext()
self._wrapped = SQLContext(self._sc, self, self._jwrapped)
@@ -225,6 +228,8 @@ class SparkSession(object):
if SparkSession._instantiatedSession is None \
or SparkSession._instantiatedSession._sc._jsc is None:
SparkSession._instantiatedSession = self
+ if self._jvm.SparkSession.getDefaultSession().isEmpty():
+ self._jvm.SparkSession.setDefaultSession(self._jsparkSession)
def _repr_html_(self):
return """
@@ -759,6 +764,7 @@ class SparkSession(object):
"""Stop the underlying :class:`SparkContext`.
"""
self._sc.stop()
+ self._jvm.org.apache.spark.sql.SparkSession.clearDefaultSession()
SparkSession._instantiatedSession = None
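If it helps, a hedged sketch of the intended effect of this diff as seen from the user's side (the fully qualified _jvm path mirrors the calls used elsewhere in this thread):

```python
# Sketch of the behavior under the diff above: the first Python session
# registers itself as the JVM default, and stop() clears it again.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
jvm_ss = spark._jvm.org.apache.spark.sql.SparkSession
assert jvm_ss.getDefaultSession().isDefined()

spark.stop()  # per the diff, this also calls clearDefaultSession()
```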
Thanks @HyukjinKwon, let me update the code.

LGTM
Test build #86752 has finished for PR 20404 at commit
python/pyspark/sql/session.py
Outdated
@@ -760,6 +764,7 @@ def stop(self):
"""Stop the underlying :class:`SparkContext`.
"""
self._sc.stop()
self._jvm.SparkSession.clearDefaultSession()
Hmm... if we didn't set it at L231, perhaps we shouldn't clear it?
Or if we are picking up the JVM one at L217, we shouldn't clear it either?
WDYT?
IIUC, since we already stop the JVM SparkContext with the line above, it is not necessary to keep it.
BTW, let me make a PR to your branch @jerryshao to deal with the failure soon. I was looking into this out of curiosity.
I'm also working on the failure; I've already figured out why.
Fix test failure and few minor clean up for tests - 20404
I think I made a duplicate effort... thanks for taking this in.

Thanks @HyukjinKwon for your help.
Test build #86756 has finished for PR 20404 at commit
Jenkins, retest this please.
Test build #86759 has finished for PR 20404 at commit
Test build #86760 has finished for PR 20404 at commit
jsparkSession = self._jvm.SparkSession(self._jsc.sc())
if self._jvm.SparkSession.getDefaultSession().isDefined() \
and not self._jvm.SparkSession.getDefaultSession().get() \
.sparkContext().isStopped():
I think so. I will do it.
python/pyspark/sql/session.py
Outdated
@@ -225,6 +230,9 @@ def __init__(self, sparkContext, jsparkSession=None):
if SparkSession._instantiatedSession is None \
or SparkSession._instantiatedSession._sc._jsc is None:
SparkSession._instantiatedSession = self
if self._jvm.SparkSession.getDefaultSession().isEmpty() \
or not jsparkSession.equals(self._jvm.SparkSession.getDefaultSession().get()):
self._jvm.SparkSession.setDefaultSession(self._jsparkSession)
Maybe we can simply overwrite the default session.
@felixcheung has a concern about simply overwriting the default session.
I might be missing something, but I guess @felixcheung's concern was addressed by checking that the default session is defined and not stopped, so we can set the valid session or the same session from the JVM without checking anymore.
But I'm okay with leaving it as it is as well.
Test build #86758 has finished for PR 20404 at commit
Test build #86762 has finished for PR 20404 at commit
Test build #86765 has finished for PR 20404 at commit
LGTM
Test build #86766 has finished for PR 20404 at commit
Hi all, can you please review again? Thanks!
LGTM. |
I thought about this and thought we should be good. I didn't quite know how this would be used, though.
For instance, in a cross-language, multi-threaded setting, it is possible that after getDefaultSession().get() is called here, someone explicitly calls, say in Scala, SparkSession.setDefaultSession, so that it is no longer the same session Python is holding a reference to, and so we should not change it here?
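To make the concern concrete, a generic check-then-set race sketched in plain Python threads (a toy stand-in, not Spark code; `holder` plays the role of the JVM's defaultSession):

```python
# Toy illustration of the race: get_or_create checks, then sets; another
# thread's explicit set can land between the two steps and be clobbered.
import threading

holder = {"default": None}

def get_or_create(name):
    if holder["default"] is None:      # 1) check
        # another thread may call set_default() right here ...
        holder["default"] = name       # 2) ... and this set overwrites it
    return holder["default"]

def set_default(name):
    holder["default"] = name

t1 = threading.Thread(target=get_or_create, args=("localSession",))
t2 = threading.Thread(target=set_default, args=("session1",))
t1.start(); t2.start(); t1.join(); t2.join()
# Depending on interleaving, this may print "localSession" even though
# "session1" was set explicitly.
print(holder["default"])
```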
@felixcheung I see; in that case, should we revert the last commit (cc4b851) to check whether the default session has been updated?
For perfectness, I think we should hold a lock with the JVM instance, but I wonder if that's easily possible. I roughly knew this, but I think I underestimated it because I thought it would be quite unlikely to happen. I think reverting cc4b851 doesn't fully resolve the issue, because the same thing can also happen between the check and the set of the default session.
How about something like this?

--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -213,10 +213,10 @@ class SparkSession(object):
self._jsc = self._sc._jsc
self._jvm = self._sc._jvm
if jsparkSession is None:
- if self._jvm.SparkSession.getDefaultSession().isDefined() \
- and not self._jvm.SparkSession.getDefaultSession().get() \
- .sparkContext().isStopped():
- jsparkSession = self._jvm.SparkSession.getDefaultSession().get()
+ default_session = self._jvm.SparkSession.getDefaultSession()
+ if default_session.isDefined() \
+ and not default_session.get().sparkContext().isStopped():
+ jsparkSession = default_session.get()
else:
jsparkSession = self._jvm.SparkSession(self._jsc.sc())
self._jsparkSession = jsparkSession
@@ -230,7 +230,7 @@ class SparkSession(object):
if SparkSession._instantiatedSession is None \
or SparkSession._instantiatedSession._sc._jsc is None:
SparkSession._instantiatedSession = self
- self._jvm.SparkSession.setDefaultSession(self._jsparkSession)
+ self._jvm.SparkSession.setDefaultSessionIfUnset(self._jsparkSession)
def _repr_html_(self):
return """
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 734573ba31f..99747ef88bc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -1031,6 +1031,18 @@ object SparkSession {
// Private methods from now on
////////////////////////////////////////////////////////////////////////////////////////
+ /**
+ * This is for Python blabla
+ */
+ private[sql] def setDefaultSessionIfUnset(session: SparkSession): Unit = {
+ // Global synchronization blabla
+ SparkSession.synchronized {
+ if (defaultSession.get() eq null) {
+ defaultSession.set(session)
+ }
+ }
+ }
+
/** The active SparkSession for the current thread. */
private val activeThreadSession = new InheritableThreadLocal[SparkSession]
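The essential idea of setDefaultSessionIfUnset, as a toy compare-and-set sketch in Python (a model of the Scala synchronized block above, not Spark code):

```python
# Toy model: the check and the set happen under one lock, so no other
# writer can sneak in between them.
import threading

_lock = threading.Lock()
_default = None

def set_default_if_unset(session):
    global _default
    with _lock:                 # check-and-set is atomic under the lock
        if _default is None:
            _default = session
```

This closes the window between the check and the set, but only if every code path that writes the default takes the same lock, which is exactly the gap discussed next.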
@HyukjinKwon I'm afraid that's not enough, because we need to synchronize all the places we touch the defaultSession. I'm not sure whether we want to replace the default session which is already stopped, but in that case, we may also have to send the original default session to compare.
Yup, sorry for rushing the suggestion. Wanted to show an idea. BTW, if we should replace it when it's already stopped, I assume we need the synchronization there too.
Thanks all for your comments. I think @felixcheung's case really makes things complex; I'm not sure if users will use it in such a way. I will try to address it. Appreciate your comments!
I think the same issue also exists in Scala. For example:

val globalSession = SparkSession.builder.getOrCreate()
val session1 = globalSession.newSession()
globalSession.sparkContext.stop()

// Thread1
val localSession = SparkSession.builder.getOrCreate()

// Thread2 sets the default session during Thread1's getOrCreate
SparkSession.setDefaultSession(session1)

// After Thread1's getOrCreate is finished, it will still
// overwrite the default session to "localSession",
// not the one (session1) the user explicitly set

The case may not be typical. I think this is similar to what was mentioned for PySpark.
@felixcheung what is your opinion on this? Do we really need to handle this case?
It's your call. There are some risks, but it seems like there is pre-existing code that has the same problem.
Thanks Felix. I would be inclined not to fix the case mentioned by Felix. What's your opinion, @HyukjinKwon @ueshin?
I am fine with not fixing the case here for now, if we go for it only on the master branch. Maybe we can try to fix the pre-existing issue separately, by synchronizing everything, if anyone feels strongly about it within the 2.4.0 timeline. To be honest, the case sounds rare, and the fix could be a little bit of overkill...
How about you, @ueshin?
I completely agree with @HyukjinKwon.
Merged to master.
Thanks all for your review, greatly appreciated.
…ultSession

In the current PySpark code, the Python-created `jsparkSession` is not added to the JVM's defaultSession, so this `SparkSession` object cannot be fetched from the Java side, and the Scala code below will fail when loaded in a PySpark application.

```scala
class TestSparkSession extends SparkListener with Logging {
  override def onOtherEvent(event: SparkListenerEvent): Unit = {
    event match {
      case CreateTableEvent(db, table) =>
        val session = SparkSession.getActiveSession.orElse(SparkSession.getDefaultSession)
        assert(session.isDefined)
        val tableInfo = session.get.sharedState.externalCatalog.getTable(db, table)
        logInfo(s"Table info ${tableInfo}")
      case e =>
        logInfo(s"event $e")
    }
  }
}
```

So here we propose to add the freshly created `jsparkSession` to `defaultSession`.

Manual verification.

Author: jerryshao <[email protected]>
Author: hyukjinkwon <[email protected]>
Author: Saisai Shao <[email protected]>

Closes apache#20404 from jerryshao/SPARK-23228.

(cherry picked from commit 3d0911b)
Signed-off-by: jerryshao <[email protected]>
Change-Id: Icaf904e718858ddf7ff7868b2cbf0b1ad2a14daa
What changes were proposed in this pull request?

In the current PySpark code, the Python-created `jsparkSession` is not added to the JVM's defaultSession, so this `SparkSession` object cannot be fetched from the Java side, and the Scala code shown above will fail when loaded in a PySpark application. So here we propose to add the freshly created `jsparkSession` to `defaultSession`.

How was this patch tested?

Manual verification.
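With the change merged, the end-to-end behavior can be checked from Python roughly like this (a hedged sketch; the fully qualified `_jvm` path follows the calls used in this thread):

```python
# Sketch: a Python-created SparkSession is now visible as the JVM's
# default session, so Java/Scala-side listeners can fetch it.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
jvm_ss = spark._jvm.org.apache.spark.sql.SparkSession
assert jvm_ss.getDefaultSession().isDefined()
# The JVM default is the very session Python created.
assert jvm_ss.getDefaultSession().get().equals(spark._jsparkSession)
spark.stop()
```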