
[SPARK-23228][PYSPARK] Add Python Created jsparkSession to JVM's defaultSession #20404

Closed
wants to merge 10 commits

Conversation

jerryshao
Contributor

What changes were proposed in this pull request?

In the current PySpark code, the Python-created `jsparkSession` is not added to the JVM's `defaultSession`, so this `SparkSession` object cannot be fetched from the Java side, and Scala code like the listener below fails when loaded in a PySpark application.

```scala
class TestSparkSession extends SparkListener with Logging {
  override def onOtherEvent(event: SparkListenerEvent): Unit = {
    event match {
      case CreateTableEvent(db, table) =>
        val session = SparkSession.getActiveSession.orElse(SparkSession.getDefaultSession)
        assert(session.isDefined)
        val tableInfo = session.get.sharedState.externalCatalog.getTable(db, table)
        logInfo(s"Table info ${tableInfo}")

      case e =>
        logInfo(s"event $e")
    }
  }
}
```

So this PR proposes adding the freshly created `jsparkSession` to `defaultSession`.

How was this patch tested?

Manual verification.

@jerryshao
Contributor Author

@zjffdu @HyukjinKwon please help to review. Thanks!

@SparkQA

SparkQA commented Jan 26, 2018

Test build #86700 has finished for PR 20404 at commit d9189ad.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```diff
@@ -225,6 +225,7 @@ def __init__(self, sparkContext, jsparkSession=None):
         if SparkSession._instantiatedSession is None \
                 or SparkSession._instantiatedSession._sc._jsc is None:
             SparkSession._instantiatedSession = self
+            self._jvm.org.apache.spark.sql.SparkSession.setDefaultSession(self._jsparkSession)
```
Member

I know the test looks a bit odd, but wouldn't it be better to have a test like this?

```python
spark._jvm.org.apache.spark.sql.SparkSession.getDefaultSession().nonEmpty()
```

Member

setActiveSession or setDefaultSession: which one is more proper to set here?

Btw, shall we clear it when stopping the PySpark SparkSession?

Contributor Author

Looking at the Scala code, it seems getOrCreate sets defaultSession, so I thought it more proper to set defaultSession here. Does PySpark support multiple sessions?

Contributor Author

Btw, shall we clear it when stopping PySpark SparkSession?

The JVM SparkSession will clear it when the application is stopped.

Member

Actually, it seems not, because we don't hit this code path. The stop-and-start logic in PySpark is convoluted, in my humble opinion. Setting the default one fixes an actual issue, and we seem okay with it, at least.

Member

The simplest way I can think of is to just add this in `def stop`:

```python
self._jvm.org.apache.spark.sql.SparkSession.clearDefaultSession()
```

Member

Yeah, I think we can clear it in def stop.

Member

@felixcheung felixcheung Jan 27, 2018

I think this is somewhat conflicted ... If I call SparkSession.getOrCreate in Python and the JVM already has a defaultSession, what should happen?
I suppose one approach would be to keep Python independent, but with this change wouldn't it overwrite a defaultSession that might still be valid?

Member

@HyukjinKwon HyukjinKwon Jan 27, 2018

I think it would overwrite. Can we try something like this?

```diff
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 6c84023c43f..0bdfc88153f 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -213,7 +213,10 @@ class SparkSession(object):
         self._jsc = self._sc._jsc
         self._jvm = self._sc._jvm
         if jsparkSession is None:
-            jsparkSession = self._jvm.SparkSession(self._jsc.sc())
+            if self._jvm.SparkSession.getDefaultSession().nonEmpty():
+                jsparkSession = self._jvm.SparkSession.getDefaultSession().get()
+            else:
+                jsparkSession = self._jvm.SparkSession(self._jsc.sc())
         self._jsparkSession = jsparkSession
         self._jwrapped = self._jsparkSession.sqlContext()
         self._wrapped = SQLContext(self._sc, self, self._jwrapped)
@@ -225,6 +228,8 @@ class SparkSession(object):
         if SparkSession._instantiatedSession is None \
                 or SparkSession._instantiatedSession._sc._jsc is None:
             SparkSession._instantiatedSession = self
+            if self._jvm.SparkSession.getDefaultSession().isEmpty():
+                self._jvm.SparkSession.setDefaultSession(self._jsparkSession)

     def _repr_html_(self):
         return """
@@ -759,6 +764,7 @@ class SparkSession(object):
         """Stop the underlying :class:`SparkContext`.
         """
         self._sc.stop()
+        self._jvm.org.apache.spark.sql.SparkSession.clearDefaultSession()
         SparkSession._instantiatedSession = None
```

Contributor Author

Thanks @HyukjinKwon , let me update the code.

@HyukjinKwon
Member

LGTM

@SparkQA

SparkQA commented Jan 29, 2018

Test build #86752 has finished for PR 20404 at commit eec4386.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```diff
@@ -760,6 +764,7 @@ def stop(self):
         """Stop the underlying :class:`SparkContext`.
         """
         self._sc.stop()
+        self._jvm.SparkSession.clearDefaultSession()
```
Member

Hmm, if we didn't set it at L231, perhaps we shouldn't clear it?
And if we are picking up the JVM one at L217, we shouldn't clear it either?

WDYT?

Contributor Author

IIUC, since we already stopped the JVM SparkContext on the line above, it isn't necessary to keep it.

Member

BTW, let me make a PR against your branch, @jerryshao, to deal with the failure soon. I was looking into this out of curiosity.

Contributor Author

I'm also working on the failure; I've already figured out why it happens.

@HyukjinKwon
Member

I think I made a duplicated effort .. thanks for taking this in.

@jerryshao
Contributor Author

Thanks @HyukjinKwon for your help.

@SparkQA

SparkQA commented Jan 29, 2018

Test build #86756 has finished for PR 20404 at commit dd1c991.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jerryshao
Contributor Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Jan 29, 2018

Test build #86759 has finished for PR 20404 at commit ec94c05.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 29, 2018

Test build #86760 has finished for PR 20404 at commit d9f77ea.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```python
jsparkSession = self._jvm.SparkSession(self._jsc.sc())
if self._jvm.SparkSession.getDefaultSession().isDefined() \
        and not self._jvm.SparkSession.getDefaultSession().get() \
            .sparkContext().isStopped():
```
Member

I guess the change at 4ba3aa2 is enough to fix the previous test failure (ERROR: test_sparksession_with_stopped_sparkcontext (pyspark.sql.tests.SQLTests2)), and we can now revert moving self._jvm.SparkSession.clearDefaultSession() into SparkContext.stop() at 0319fa5.

Contributor Author

I think so. I will do it.

```diff
@@ -225,6 +230,9 @@ def __init__(self, sparkContext, jsparkSession=None):
         if SparkSession._instantiatedSession is None \
                 or SparkSession._instantiatedSession._sc._jsc is None:
             SparkSession._instantiatedSession = self
+            if self._jvm.SparkSession.getDefaultSession().isEmpty() \
+                    or not jsparkSession.equals(self._jvm.SparkSession.getDefaultSession().get()):
+                self._jvm.SparkSession.setDefaultSession(self._jsparkSession)
```
Member

Maybe we can simply overwrite the default session.

Contributor Author

@felixcheung has concern about simply overwriting the default session.

Member

I might be missing something, but I guess @felixcheung's concern was addressed by checking that the default session is defined and not stopped, so we can install a valid session, or the same session from the JVM, without further checks.
But I'm okay with leaving it as it is as well.

@SparkQA

SparkQA commented Jan 29, 2018

Test build #86758 has finished for PR 20404 at commit dd1c991.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 29, 2018

Test build #86762 has finished for PR 20404 at commit 1ed62ef.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 29, 2018

Test build #86765 has finished for PR 20404 at commit e5f4b58.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

LGTM

@SparkQA

SparkQA commented Jan 29, 2018

Test build #86766 has finished for PR 20404 at commit cc4b851.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jerryshao
Contributor Author

Hi all, can you please review again, thanks!

@ueshin
Member

ueshin commented Jan 30, 2018

LGTM.
I'd like to leave this to @felixcheung to confirm setting the default session is okay or not (#20404 (comment)).

Member

@felixcheung felixcheung left a comment

I thought about this and think we should be good.

I didn't quite know how this would be used, though.
For instance, in a cross-language multi-threaded setting, it is possible that after getDefaultSession().get() is called here, someone explicitly calls, say in Scala, SparkSession.setDefaultSession, so that it is no longer the same session Python holds a reference to; in that case, shouldn't we avoid changing it here?
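The scenario raised here is a classic check-then-act race. A deterministic, single-threaded sketch of the problematic interleaving (plain Python, illustrative names only; no real Spark or threads involved):

```python
# Replay the interleaving step by step instead of using real threads.
default_session = None


def set_default(session):
    global default_session
    default_session = session


# 1. Python checks the default, finds it unset, and plans to install its own.
python_saw_unset = default_session is None

# 2. Meanwhile, the Scala side explicitly installs session1.
set_default("session1")

# 3. Python acts on its stale check and clobbers session1.
if python_saw_unset:
    set_default("python-session")

assert default_session == "python-session"  # session1 was silently overwritten
```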

@ueshin
Member

ueshin commented Jan 30, 2018

@felixcheung I see; in that case, should we revert the last commit (cc4b851) so we check whether the default session was updated?

@HyukjinKwon
Member

HyukjinKwon commented Jan 30, 2018

To be fully correct, I think we should hold a lock with the JVM instance, but I wonder if that's easily possible. I roughly knew about this, but I underestimated it because I thought it would be quite unlikely to happen. I think reverting cc4b851 doesn't fully resolve the issue, because the same thing can also happen between the if and the next line.

@HyukjinKwon
Member

How about something like this?

```diff
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -213,10 +213,10 @@ class SparkSession(object):
         self._jsc = self._sc._jsc
         self._jvm = self._sc._jvm
         if jsparkSession is None:
-            if self._jvm.SparkSession.getDefaultSession().isDefined() \
-                    and not self._jvm.SparkSession.getDefaultSession().get() \
-                        .sparkContext().isStopped():
-                jsparkSession = self._jvm.SparkSession.getDefaultSession().get()
+            default_session = self._jvm.SparkSession.getDefaultSession()
+            if default_session.isDefined() \
+                    and not default_session.get().sparkContext().isStopped():
+                jsparkSession = default_session.get()
             else:
                 jsparkSession = self._jvm.SparkSession(self._jsc.sc())
         self._jsparkSession = jsparkSession
@@ -230,7 +230,7 @@ class SparkSession(object):
         if SparkSession._instantiatedSession is None \
                 or SparkSession._instantiatedSession._sc._jsc is None:
             SparkSession._instantiatedSession = self
-            self._jvm.SparkSession.setDefaultSession(self._jsparkSession)
+            self._jvm.SparkSession.setDefaultSessionIfUnset(self._jsparkSession)

     def _repr_html_(self):
         return """
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 734573ba31f..99747ef88bc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -1031,6 +1031,18 @@ object SparkSession {
   // Private methods from now on
   ////////////////////////////////////////////////////////////////////////////////////////

+  /**
+   * This is for Python blabla
+   */
+  private[sql] def setDefaultSessionIfUnset(session: SparkSession): Unit = {
+    // Global synchronization blabla
+    SparkSession.synchronized {
+      if (defaultSession.get() eq null) {
+        defaultSession.set(session)
+      }
+    }
+  }
+
   /** The active SparkSession for the current thread. */
   private val activeThreadSession = new InheritableThreadLocal[SparkSession]
```
@ueshin
Member

ueshin commented Jan 30, 2018

@HyukjinKwon I'm afraid that's not enough, because we would need to synchronize every place we touch defaultSession; otherwise another thread can set the value between defaultSession.get() eq null and defaultSession.set(session).

Fortunately, defaultSession is an AtomicReference, so maybe we can use defaultSession.compareAndSet(null, session) without synchronization in setDefaultSessionIfUnset(), if we only need to set it while the default session is null.

I'm not sure whether we want to replace a default session that has already been stopped, but in that case we may also have to pass in the original default session to compare against.

@HyukjinKwon
Member

Yup, sorry for rushing the suggestion; I wanted to show the idea. BTW, if we should replace it when it is already stopped, I assume we need synchronization on defaultSession anyway, if I didn't misunderstand. Did I maybe miss something?

@jerryshao
Contributor Author

Thanks all for your comments. I think @felixcheung's case really makes things complex, and I'm not sure users would use it in such a way. I will try to address it. Appreciate your comments!

@jerryshao
Contributor Author

jerryshao commented Jan 30, 2018

I think the same issue also exists in the Scala SparkSession code, because setDefaultSession doesn't take the lock that getOrCreate (on object SparkSession) holds.

For example:

```scala
val globalSession = SparkSession.builder.getOrCreate()
val session1 = globalSession.newSession()

globalSession.sparkContext.stop()

// Thread 1
val localSession = SparkSession.builder.getOrCreate()

// Thread 2 sets the default session during Thread 1's getOrCreate
SparkSession.setDefaultSession(session1)

// After Thread 1's getOrCreate finishes, it will still
// overwrite the default session with "localSession",
// not the one (session1) the user explicitly set
```

The case may not be typical, but I think it is similar to what was mentioned for PySpark.

@jerryshao
Contributor Author

@felixcheung what is your opinion on this, do we really need to handle this case?

@felixcheung
Member

felixcheung commented Jan 31, 2018 via email

@jerryshao
Contributor Author

Thanks Felix. I'm inclined not to fix the case Felix mentioned. What's your opinion, @HyukjinKwon @ueshin?

@HyukjinKwon
Member

I am fine with not fixing the case here for now, if we go for this only on the master branch. Maybe we can try to fix the pre-existing issue separately, by synchronizing everything, if anyone feels strongly about it within the 2.4.0 timeline. To be honest, the case sounds rare and the fix could be a bit of an overkill.

@HyukjinKwon
Member

How about you @ueshin?

@ueshin
Member

ueshin commented Jan 31, 2018

I completely agree with @HyukjinKwon.

@HyukjinKwon
Member

Merged to master.

@asfgit asfgit closed this in 3d0911b Jan 31, 2018
@jerryshao
Contributor Author

Thanks all for your review, greatly appreciated.

peter-toth pushed a commit to peter-toth/spark that referenced this pull request Oct 6, 2018
…ultSession

In the current PySpark code, the Python-created `jsparkSession` is not added to the JVM's `defaultSession`, so this `SparkSession` object cannot be fetched from the Java side, and the Scala code below fails when loaded in a PySpark application.

```scala
class TestSparkSession extends SparkListener with Logging {
  override def onOtherEvent(event: SparkListenerEvent): Unit = {
    event match {
      case CreateTableEvent(db, table) =>
        val session = SparkSession.getActiveSession.orElse(SparkSession.getDefaultSession)
        assert(session.isDefined)
        val tableInfo = session.get.sharedState.externalCatalog.getTable(db, table)
        logInfo(s"Table info ${tableInfo}")

      case e =>
        logInfo(s"event $e")

    }
  }
}
```

So this PR proposes adding the freshly created `jsparkSession` to `defaultSession`.

Manual verification.

Author: jerryshao <[email protected]>
Author: hyukjinkwon <[email protected]>
Author: Saisai Shao <[email protected]>

Closes apache#20404 from jerryshao/SPARK-23228.

(cherry picked from commit 3d0911b)
Signed-off-by: jerryshao <[email protected]>

Change-Id: Icaf904e718858ddf7ff7868b2cbf0b1ad2a14daa