[SPARK-38246][CORE][SQL][SS][WEBUI] Refactor KVUtils and add UTs related to RocksDB

### What changes were proposed in this pull request?
The main changes of this PR are as follows:

1. Refactor `KVUtils` so that the `open` method can use the passed `conf` to construct the corresponding `KVStore`.
2. Use the new `KVUtils#open` to add UTs related to `RocksDB`; the new UTs cover the scenarios already tested for `LevelDB` (see the sketch after this list).
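
As an illustration of the new call shape, here is a minimal caller-side sketch (the object name, temp directory, and metadata string are placeholders; since `KVUtils` and `Utils` are `private[spark]`, the snippet assumes code living under the `org.apache.spark` namespace, as the test suites in this diff do):

```scala
package org.apache.spark.status

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.History.{HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend}
import org.apache.spark.util.Utils

object OpenWithConfSketch {
  def main(args: Array[String]): Unit = {
    // The backend is now taken from the conf passed to open(), so a suite can
    // pin RocksDB (or LevelDB) per instance instead of per JVM.
    val conf = new SparkConf()
      .set(HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend.ROCKSDB.toString)
    val store = KVUtils.open(Utils.createTempDir(), "test-metadata", conf)
    try {
      // ... read and write objects through the KVStore API ...
    } finally {
      store.close()
    }
  }
}
```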

### Why are the changes needed?
Add more test scenarios related to `RocksDB`.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GA and add new UTs

Closes #35563 from LuciferYang/kvutils-open.

Authored-by: yangjie01 <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
LuciferYang authored and dongjoon-hyun committed Feb 19, 2022
1 parent 4399755 commit 8a70aec
Showing 6 changed files with 75 additions and 34 deletions.
@@ -148,20 +148,20 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       AppStatusStore.CURRENT_VERSION, logDir.toString())

     try {
-      open(dbPath, metadata)
+      open(dbPath, metadata, conf)
     } catch {
       // If there's an error, remove the listing database and any existing UI database
       // from the store directory, since it's extremely likely that they'll all contain
       // incompatible information.
       case _: UnsupportedStoreVersionException | _: MetadataMismatchException =>
         logInfo("Detected incompatible DB versions, deleting...")
         path.listFiles().foreach(Utils.deleteRecursively)
-        open(dbPath, metadata)
+        open(dbPath, metadata, conf)
       case dbExc @ (_: NativeDB.DBException | _: RocksDBException) =>
         // Get rid of the corrupted data and re-create it.
         logWarning(s"Failed to load disk store $dbPath :", dbExc)
         Utils.deleteRecursively(dbPath)
-        open(dbPath, metadata)
+        open(dbPath, metadata, conf)
     }
   }.getOrElse(new InMemoryStore())

@@ -1218,7 +1218,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       // the existing data.
       dm.openStore(appId, attempt.info.attemptId).foreach { path =>
         try {
-          return KVUtils.open(path, metadata)
+          return KVUtils.open(path, metadata, conf)
         } catch {
           case e: Exception =>
             logInfo(s"Failed to open existing store for $appId/${attempt.info.attemptId}.", e)
@@ -1284,14 +1284,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     try {
       logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...")
       lease = dm.lease(reader.totalSize, reader.compressionCodec.isDefined)
-      val diskStore = KVUtils.open(lease.tmpPath, metadata)
+      val diskStore = KVUtils.open(lease.tmpPath, metadata, conf)
       hybridStore.setDiskStore(diskStore)
       hybridStore.switchToDiskStore(new HybridStore.SwitchToDiskStoreListener {
         override def onSwitchToDiskStoreSuccess: Unit = {
           logInfo(s"Completely switched to diskStore for app $appId / ${attempt.info.attemptId}.")
           diskStore.close()
           val newStorePath = lease.commit(appId, attempt.info.attemptId)
-          hybridStore.setDiskStore(KVUtils.open(newStorePath, metadata))
+          hybridStore.setDiskStore(KVUtils.open(newStorePath, metadata, conf))
           memoryManager.release(appId, attempt.info.attemptId)
         }
         override def onSwitchToDiskStoreFail(e: Exception): Unit = {
@@ -1327,7 +1327,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...")
     val lease = dm.lease(reader.totalSize, isCompressed)
     try {
-      Utils.tryWithResource(KVUtils.open(lease.tmpPath, metadata)) { store =>
+      Utils.tryWithResource(KVUtils.open(lease.tmpPath, metadata, conf)) { store =>
         rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime())
       }
       newStorePath = lease.commit(appId, attempt.info.attemptId)
@@ -1345,7 +1345,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       }
     }

-    KVUtils.open(newStorePath, metadata)
+    KVUtils.open(newStorePath, metadata, conf)
   }

   private def createInMemoryStore(attempt: AttemptInfoWrapper): KVStore = {
9 changes: 5 additions & 4 deletions core/src/main/scala/org/apache/spark/status/KVUtils.scala
@@ -38,8 +38,8 @@ private[spark] object KVUtils extends Logging {
   /** Use this to annotate constructor params to be used as KVStore indices. */
   type KVIndexParam = KVIndex @getter

-  private lazy val backend =
-    HybridStoreDiskBackend.withName(new SparkConf().get(HYBRID_STORE_DISK_BACKEND))
+  private def backend(conf: SparkConf) =
+    HybridStoreDiskBackend.withName(conf.get(HYBRID_STORE_DISK_BACKEND))

   /**
    * A KVStoreSerializer that provides Scala types serialization too, and uses the same options as
@@ -59,11 +59,12 @@ private[spark] object KVUtils extends Logging {
    * @param metadata Metadata value to compare to the data in the store. If the store does not
    *                 contain any metadata (e.g. it's a new store), this value is written as
    *                 the store's metadata.
+   * @param conf SparkConf used to get `HYBRID_STORE_DISK_BACKEND`
    */
-  def open[M: ClassTag](path: File, metadata: M): KVStore = {
+  def open[M: ClassTag](path: File, metadata: M, conf: SparkConf): KVStore = {
     require(metadata != null, "Metadata is required.")

-    val db = backend match {
+    val db = backend(conf) match {
       case LEVELDB => new LevelDB(path, new KVStoreScalaSerializer())
       case ROCKSDB => new RocksDB(path, new KVStoreScalaSerializer())
     }
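
One consequence of replacing the process-wide `lazy val backend` with `backend(conf)` is that stores with different backends can now coexist in a single JVM, which is what lets the new RocksDB UTs run alongside the LevelDB ones. A minimal sketch under the same `private[spark]` assumption as above (object name and paths hypothetical):

```scala
package org.apache.spark.status

import java.io.File

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.History.{HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend}

object MixedBackendSketch {
  def main(args: Array[String]): Unit = {
    def confFor(b: HybridStoreDiskBackend.Value): SparkConf =
      new SparkConf().set(HYBRID_STORE_DISK_BACKEND, b.toString)

    // Before this PR the first open() froze the backend for the whole process;
    // now each call honors its own conf.
    val ldb = KVUtils.open(new File("/tmp/demo-ldb"), "meta", confFor(HybridStoreDiskBackend.LEVELDB))
    val rdb = KVUtils.open(new File("/tmp/demo-rdb"), "meta", confFor(HybridStoreDiskBackend.ROCKSDB))
    try {
      // ... exercise both stores independently ...
    } finally {
      ldb.close()
      rdb.close()
    }
  }
}
```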
@@ -26,13 +26,20 @@ import org.scalatest.BeforeAndAfter

 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.config.History._
+import org.apache.spark.internal.config.History.HybridStoreDiskBackend
 import org.apache.spark.status.KVUtils
-import org.apache.spark.tags.ExtendedLevelDBTest
+import org.apache.spark.tags.{ExtendedLevelDBTest, ExtendedRocksDBTest}
 import org.apache.spark.util.{ManualClock, Utils}
 import org.apache.spark.util.kvstore.KVStore

-@ExtendedLevelDBTest
-class HistoryServerDiskManagerSuite extends SparkFunSuite with BeforeAndAfter {
+abstract class HistoryServerDiskManagerSuite extends SparkFunSuite with BeforeAndAfter {
+
+  protected def backend: HybridStoreDiskBackend.Value
+
+  protected def extension: String
+
+  protected def conf: SparkConf = new SparkConf()
+    .set(HYBRID_STORE_DISK_BACKEND, backend.toString)

   private def doReturn(value: Any) = org.mockito.Mockito.doReturn(value, Seq.empty: _*)

@@ -43,7 +50,7 @@ class HistoryServerDiskManagerSuite extends SparkFunSuite with BeforeAndAfter {

   before {
     testDir = Utils.createTempDir()
-    store = KVUtils.open(new File(testDir, "listing"), "test")
+    store = KVUtils.open(new File(testDir, "listing"), "test", conf)
   }

   after {
@@ -213,10 +220,20 @@
   }

   test("SPARK-38095: appStorePath should use backend extensions") {
-    HybridStoreDiskBackend.values.zip(Seq(".ldb", ".rdb")).foreach { case (backend, extension) =>
-      val conf = new SparkConf().set(HYBRID_STORE_DISK_BACKEND, backend.toString)
-      val manager = new HistoryServerDiskManager(conf, testDir, store, new ManualClock())
-      assert(manager.appStorePath("appId", None).getName.endsWith(extension))
-    }
+    val conf = new SparkConf().set(HYBRID_STORE_DISK_BACKEND, backend.toString)
+    val manager = new HistoryServerDiskManager(conf, testDir, store, new ManualClock())
+    assert(manager.appStorePath("appId", None).getName.endsWith(extension))
   }
 }
+
+@ExtendedLevelDBTest
+class HistoryServerDiskManagerUseLevelDBSuite extends HistoryServerDiskManagerSuite {
+  override protected def backend: HybridStoreDiskBackend.Value = HybridStoreDiskBackend.LEVELDB
+  override protected def extension: String = ".ldb"
+}
+
+@ExtendedRocksDBTest
+class HistoryServerDiskManagerUseRocksDBSuite extends HistoryServerDiskManagerSuite {
+  override protected def backend: HybridStoreDiskBackend.Value = HybridStoreDiskBackend.ROCKSDB
+  override protected def extension: String = ".rdb"
+}
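
With the old `foreach` loop unrolled into tagged subclasses, each backend can be included or excluded independently on CI. Restated for illustration only (reusing the suite's `testDir` and mocked `store`), the inherited SPARK-38095 test body specialized to the RocksDB subclass amounts to:

```scala
// Inside HistoryServerDiskManagerUseRocksDBSuite, `backend` resolves to
// ROCKSDB and `extension` to ".rdb", so the shared test checks:
val conf = new SparkConf()
  .set(HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend.ROCKSDB.toString)
val manager = new HistoryServerDiskManager(conf, testDir, store, new ManualClock())
assert(manager.appStorePath("appId", None).getName.endsWith(".rdb"))
```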
@@ -28,6 +28,7 @@ import org.scalatest.BeforeAndAfter

 import org.apache.spark._
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
+import org.apache.spark.internal.config.History.{HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend}
 import org.apache.spark.internal.config.Status._
 import org.apache.spark.metrics.ExecutorMetricType
 import org.apache.spark.resource.ResourceProfile
@@ -36,15 +37,11 @@ import org.apache.spark.scheduler.cluster._
 import org.apache.spark.status.ListenerEventsTestHelper._
 import org.apache.spark.status.api.v1
 import org.apache.spark.storage._
-import org.apache.spark.tags.ExtendedLevelDBTest
+import org.apache.spark.tags.{ExtendedLevelDBTest, ExtendedRocksDBTest}
 import org.apache.spark.util.Utils
 import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}

-@ExtendedLevelDBTest
-class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
-  private val conf = new SparkConf()
-    .set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
-    .set(ASYNC_TRACKING_ENABLED, false)
+abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {

   private val twoReplicaMemAndDiskLevel = StorageLevel(true, true, false, true, 2)

@@ -53,7 +50,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
   private var store: ElementTrackingStore = _
   private var taskIdTracker = -1L

-  protected def createKVStore: KVStore = KVUtils.open(testDir, getClass().getName())
+  protected def conf: SparkConf = new SparkConf()
+    .set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
+    .set(ASYNC_TRACKING_ENABLED, false)
+
+  protected def createKVStore: KVStore = KVUtils.open(testDir, getClass().getName(), conf)

   before {
     time = 0L
@@ -1891,3 +1892,15 @@
 class AppStatusListenerWithInMemoryStoreSuite extends AppStatusListenerSuite {
   override def createKVStore: KVStore = new InMemoryStore()
 }
+
+@ExtendedLevelDBTest
+class AppStatusListenerWithLevelDBSuite extends AppStatusListenerSuite {
+  override def conf: SparkConf = super.conf
+    .set(HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend.LEVELDB.toString)
+}
+
+@ExtendedRocksDBTest
+class AppStatusListenerWithRocksDBSuite extends AppStatusListenerSuite {
+  override def conf: SparkConf = super.conf
+    .set(HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend.ROCKSDB.toString)
+}
@@ -19,6 +19,7 @@ package org.apache.spark.status

 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.internal.config.History.{HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend}
 import org.apache.spark.internal.config.Status.LIVE_ENTITY_UPDATE_PERIOD
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.scheduler.{SparkListenerStageSubmitted, SparkListenerTaskStart, StageInfo, TaskInfo, TaskLocality}
@@ -81,7 +82,8 @@ class AppStatusStoreSuite extends SparkFunSuite {
     assert(store.count(classOf[CachedQuantile]) === 2)
   }

-  private def createAppStore(disk: Boolean, live: Boolean): AppStatusStore = {
+  private def createAppStore(disk: Boolean, diskStoreType: HybridStoreDiskBackend.Value = null,
+      live: Boolean): AppStatusStore = {
     val conf = new SparkConf()
     if (live) {
       return AppStatusStore.createLiveStore(conf)
@@ -92,8 +94,9 @@
     }

     val store: KVStore = if (disk) {
+      conf.set(HYBRID_STORE_DISK_BACKEND, diskStoreType.toString)
       val testDir = Utils.createTempDir()
-      val diskStore = KVUtils.open(testDir, getClass.getName)
+      val diskStore = KVUtils.open(testDir, getClass.getName, conf)
       new ElementTrackingStore(diskStore, conf)
     } else {
       new ElementTrackingStore(new InMemoryStore, conf)
@@ -102,7 +105,8 @@
   }

   Seq(
-    "disk" -> createAppStore(disk = true, live = false),
+    "disk leveldb" -> createAppStore(disk = true, HybridStoreDiskBackend.LEVELDB, live = false),
+    "disk rocksdb" -> createAppStore(disk = true, HybridStoreDiskBackend.ROCKSDB, live = false),
     "in memory" -> createAppStore(disk = false, live = false),
     "in memory live" -> createAppStore(disk = false, live = true)
   ).foreach { case (hint, appStore) =>
@@ -23,14 +23,16 @@ import java.util.{Date, UUID}
 import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
 import org.scalatest.time.SpanSugar._

+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.History.{HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone
 import org.apache.spark.sql.execution.ui.StreamingQueryStatusStore
 import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress, StreamTest}
 import org.apache.spark.sql.streaming
 import org.apache.spark.status.{ElementTrackingStore, KVUtils}
 import org.apache.spark.util.Utils
-import org.apache.spark.util.kvstore.{InMemoryStore, KVStore, RocksDB}
+import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}

 class StreamingQueryStatusListenerSuite extends StreamTest {

@@ -221,8 +223,10 @@

   test("SPARK-38056: test writing StreamingQueryData to a LevelDB store") {
     assume(!Utils.isMacOnAppleSilicon)
+    val conf = new SparkConf()
+      .set(HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend.LEVELDB.toString)
     val testDir = Utils.createTempDir()
-    val kvStore = KVUtils.open(testDir, getClass.getName)
+    val kvStore = KVUtils.open(testDir, getClass.getName, conf)
     try {
       testStreamingQueryData(kvStore)
     } finally {
@@ -233,8 +237,10 @@ class StreamingQueryStatusListenerSuite extends StreamTest {

   test("SPARK-38056: test writing StreamingQueryData to a RocksDB store") {
     assume(!Utils.isMacOnAppleSilicon)
+    val conf = new SparkConf()
+      .set(HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend.ROCKSDB.toString)
     val testDir = Utils.createTempDir()
-    val kvStore = new RocksDB(testDir)
+    val kvStore = KVUtils.open(testDir, getClass.getName, conf)
     try {
       testStreamingQueryData(kvStore)
     } finally {
