Skip to content

Commit

Permalink
[SPARK-25076][SQL] SQLConf should not be retrieved from a stopped Spa…
Browse files Browse the repository at this point in the history
…rkSession

## What changes were proposed in this pull request?

When a `SparkSession` is stopped, `SQLConf.get` should use the fallback conf to avoid weird issues like
```
sbt.ForkMain$ForkError: java.lang.IllegalStateException: LiveListenerBus is stopped.
	at org.apache.spark.scheduler.LiveListenerBus.addToQueue(LiveListenerBus.scala:97)
	at org.apache.spark.scheduler.LiveListenerBus.addToStatusQueue(LiveListenerBus.scala:80)
	at org.apache.spark.sql.internal.SharedState.<init>(SharedState.scala:93)
	at org.apache.spark.sql.SparkSession$$anonfun$sharedState$1.apply(SparkSession.scala:120)
	at org.apache.spark.sql.SparkSession$$anonfun$sharedState$1.apply(SparkSession.scala:120)
	at scala.Option.getOrElse(Option.scala:121)
...
```

## How was this patch tested?

a new test suite

Closes #22056 from cloud-fan/session.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Xiao Li <[email protected]>
(cherry picked from commit fec67ed)
Signed-off-by: Xiao Li <[email protected]>
  • Loading branch information
cloud-fan authored and gatorsmile committed Aug 9, 2018
1 parent 7d465d8 commit 9bfc55b
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ class SparkSession private(

// If there is no active SparkSession, uses the default SQL conf. Otherwise, use the session's.
SQLConf.setSQLConfGetter(() => {
SparkSession.getActiveSession.map(_.sessionState.conf).getOrElse(SQLConf.getFallbackConf)
SparkSession.getActiveSession.filterNot(_.sparkContext.isStopped).map(_.sessionState.conf)
.getOrElse(SQLConf.getFallbackConf)
})

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,14 @@ trait LocalSparkSession extends BeforeAndAfterEach with BeforeAndAfterAll { self

override def afterEach() {
try {
resetSparkContext()
LocalSparkSession.stop(spark)
SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()
spark = null
} finally {
super.afterEach()
}
}

def resetSparkContext(): Unit = {
LocalSparkSession.stop(spark)
spark = null
}

}

object LocalSparkSession {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.internal

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.{LocalSparkSession, SparkSession}

class SQLConfGetterSuite extends SparkFunSuite with LocalSparkSession {

test("SPARK-25076: SQLConf should not be retrieved from a stopped SparkSession") {
spark = SparkSession.builder().master("local").getOrCreate()
assert(SQLConf.get eq spark.sessionState.conf,
"SQLConf.get should get the conf from the active spark session.")
spark.stop()
assert(SQLConf.get eq SQLConf.getFallbackConf,
"SQLConf.get should not get conf from a stopped spark session.")
}
}

0 comments on commit 9bfc55b

Please sign in to comment.