[SPARK-38246][CORE][SQL][SS][WEBUI] Refactor KVUtils and add UTs related to RocksDB #35563

@@ -148,20 +148,20 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
AppStatusStore.CURRENT_VERSION, logDir.toString())

try {
-open(dbPath, metadata)
+open(dbPath, metadata, conf)
} catch {
// If there's an error, remove the listing database and any existing UI database
// from the store directory, since it's extremely likely that they'll all contain
// incompatible information.
case _: UnsupportedStoreVersionException | _: MetadataMismatchException =>
logInfo("Detected incompatible DB versions, deleting...")
path.listFiles().foreach(Utils.deleteRecursively)
-open(dbPath, metadata)
+open(dbPath, metadata, conf)
case dbExc @ (_: NativeDB.DBException | _: RocksDBException) =>
// Get rid of the corrupted data and re-create it.
logWarning(s"Failed to load disk store $dbPath :", dbExc)
Utils.deleteRecursively(dbPath)
-open(dbPath, metadata)
+open(dbPath, metadata, conf)
}
}.getOrElse(new InMemoryStore())

@@ -1218,7 +1218,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// the existing data.
dm.openStore(appId, attempt.info.attemptId).foreach { path =>
try {
-return KVUtils.open(path, metadata)
+return KVUtils.open(path, metadata, conf)
} catch {
case e: Exception =>
logInfo(s"Failed to open existing store for $appId/${attempt.info.attemptId}.", e)
@@ -1284,14 +1284,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
try {
logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...")
lease = dm.lease(reader.totalSize, reader.compressionCodec.isDefined)
-val diskStore = KVUtils.open(lease.tmpPath, metadata)
+val diskStore = KVUtils.open(lease.tmpPath, metadata, conf)
hybridStore.setDiskStore(diskStore)
hybridStore.switchToDiskStore(new HybridStore.SwitchToDiskStoreListener {
override def onSwitchToDiskStoreSuccess: Unit = {
logInfo(s"Completely switched to diskStore for app $appId / ${attempt.info.attemptId}.")
diskStore.close()
val newStorePath = lease.commit(appId, attempt.info.attemptId)
-hybridStore.setDiskStore(KVUtils.open(newStorePath, metadata))
+hybridStore.setDiskStore(KVUtils.open(newStorePath, metadata, conf))
memoryManager.release(appId, attempt.info.attemptId)
}
override def onSwitchToDiskStoreFail(e: Exception): Unit = {
@@ -1327,7 +1327,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...")
val lease = dm.lease(reader.totalSize, isCompressed)
try {
-Utils.tryWithResource(KVUtils.open(lease.tmpPath, metadata)) { store =>
+Utils.tryWithResource(KVUtils.open(lease.tmpPath, metadata, conf)) { store =>
rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime())
}
newStorePath = lease.commit(appId, attempt.info.attemptId)
@@ -1345,7 +1345,7 @@ }
}
}

-KVUtils.open(newStorePath, metadata)
+KVUtils.open(newStorePath, metadata, conf)
}

private def createInMemoryStore(attempt: AttemptInfoWrapper): KVStore = {
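Note for readers: the hunks above only thread `conf` through to `KVUtils.open`; the recovery protocol around it is unchanged. On a version or metadata mismatch the whole store directory is wiped (the sibling UI databases would be incompatible too), while on corruption only the affected database is deleted before reopening. A minimal self-contained sketch of that shape — `openStore` and the two exception classes below are illustrative stand-ins, not Spark APIs:

import java.io.File

// Stand-ins for UnsupportedStoreVersionException / MetadataMismatchException
// and NativeDB.DBException / RocksDBException in the real code.
class IncompatibleStoreException extends RuntimeException
class CorruptStoreException extends RuntimeException

object OpenWithRecoverySketch {
  private def deleteRecursively(f: File): Unit = {
    Option(f.listFiles()).toSeq.flatten.foreach(deleteRecursively)
    f.delete()
  }

  // Pretend this opens a disk-backed KV store and may throw one of the above.
  private def openStore(dbPath: File): String = dbPath.getAbsolutePath

  def openWithRecovery(dbPath: File): String =
    try openStore(dbPath) catch {
      case _: IncompatibleStoreException =>
        // Incompatible versions: wipe the whole store directory, then retry.
        Option(dbPath.getParentFile.listFiles()).toSeq.flatten.foreach(deleteRecursively)
        openStore(dbPath)
      case _: CorruptStoreException =>
        // Corrupt data: delete just this database and re-create it.
        deleteRecursively(dbPath)
        openStore(dbPath)
    }
}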
9 changes: 5 additions & 4 deletions core/src/main/scala/org/apache/spark/status/KVUtils.scala
@@ -38,8 +38,8 @@ private[spark] object KVUtils extends Logging {
/** Use this to annotate constructor params to be used as KVStore indices. */
type KVIndexParam = KVIndex @getter

-private lazy val backend =
-HybridStoreDiskBackend.withName(new SparkConf().get(HYBRID_STORE_DISK_BACKEND))
+private def backend(conf: SparkConf) =
+HybridStoreDiskBackend.withName(conf.get(HYBRID_STORE_DISK_BACKEND))

/**
* A KVStoreSerializer that provides Scala types serialization too, and uses the same options as
@@ -59,11 +59,12 @@ private[spark] object KVUtils extends Logging {
* @param metadata Metadata value to compare to the data in the store. If the store does not
* contain any metadata (e.g. it's a new store), this value is written as
* the store's metadata.
+* @param conf SparkConf used to get `HYBRID_STORE_DISK_BACKEND`
*/
-def open[M: ClassTag](path: File, metadata: M): KVStore = {
+def open[M: ClassTag](path: File, metadata: M, conf: SparkConf): KVStore = {
require(metadata != null, "Metadata is required.")

-val db = backend match {
+val db = backend(conf) match {
case LEVELDB => new LevelDB(path, new KVStoreScalaSerializer())
case ROCKSDB => new RocksDB(path, new KVStoreScalaSerializer())
}
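With this refactor the backend choice is made per call from the caller's SparkConf, instead of from a `new SparkConf()` built once inside a lazy val, so two stores in one JVM can use different backends and tests no longer depend on JVM-global state. A hedged usage sketch (KVUtils is `private[spark]`, so this only compiles under the org.apache.spark package; the path and metadata values are illustrative):

package org.apache.spark.status

import java.io.File

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.History.HYBRID_STORE_DISK_BACKEND

object KVUtilsOpenSketch {
  def main(args: Array[String]): Unit = {
    // The backend now comes from this conf, not from a fresh SparkConf.
    val conf = new SparkConf().set(HYBRID_STORE_DISK_BACKEND, "ROCKSDB")
    val store = KVUtils.open(new File("/tmp/listing-sketch"), "sketch-metadata", conf)
    try {
      // ... read and write typed objects through the KVStore API ...
    } finally {
      store.close()
    }
  }
}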
@@ -26,13 +26,20 @@ import org.scalatest.BeforeAndAfter

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.config.History._
+import org.apache.spark.internal.config.History.HybridStoreDiskBackend
import org.apache.spark.status.KVUtils
-import org.apache.spark.tags.ExtendedLevelDBTest
+import org.apache.spark.tags.{ExtendedLevelDBTest, ExtendedRocksDBTest}
import org.apache.spark.util.{ManualClock, Utils}
import org.apache.spark.util.kvstore.KVStore

-@ExtendedLevelDBTest
-class HistoryServerDiskManagerSuite extends SparkFunSuite with BeforeAndAfter {
+abstract class HistoryServerDiskManagerSuite extends SparkFunSuite with BeforeAndAfter {

+protected def backend: HybridStoreDiskBackend.Value
+
+protected def extension: String
+
+protected def conf: SparkConf = new SparkConf()
+.set(HYBRID_STORE_DISK_BACKEND, backend.toString)

private def doReturn(value: Any) = org.mockito.Mockito.doReturn(value, Seq.empty: _*)

@@ -43,7 +50,7 @@ class HistoryServerDiskManagerSuite extends SparkFunSuite with BeforeAndAfter {

before {
testDir = Utils.createTempDir()
-store = KVUtils.open(new File(testDir, "listing"), "test")
+store = KVUtils.open(new File(testDir, "listing"), "test", conf)
}

after {
@@ -213,10 +220,20 @@ }
}

test("SPARK-38095: appStorePath should use backend extensions") {
-HybridStoreDiskBackend.values.zip(Seq(".ldb", ".rdb")).foreach { case (backend, extension) =>
-val conf = new SparkConf().set(HYBRID_STORE_DISK_BACKEND, backend.toString)
-val manager = new HistoryServerDiskManager(conf, testDir, store, new ManualClock())
-assert(manager.appStorePath("appId", None).getName.endsWith(extension))
-}
+val conf = new SparkConf().set(HYBRID_STORE_DISK_BACKEND, backend.toString)
+val manager = new HistoryServerDiskManager(conf, testDir, store, new ManualClock())
+assert(manager.appStorePath("appId", None).getName.endsWith(extension))
}
}

+@ExtendedLevelDBTest
+class HistoryServerDiskManagerUseLevelDBSuite extends HistoryServerDiskManagerSuite {
+override protected def backend: HybridStoreDiskBackend.Value = HybridStoreDiskBackend.LEVELDB
+override protected def extension: String = ".ldb"
+}
+
+@ExtendedRocksDBTest
+class HistoryServerDiskManagerUseRocksDBSuite extends HistoryServerDiskManagerSuite {
+override protected def backend: HybridStoreDiskBackend.Value = HybridStoreDiskBackend.ROCKSDB
+override protected def extension: String = ".rdb"
+}
@@ -28,6 +28,7 @@ import org.scalatest.BeforeAndAfter

import org.apache.spark._
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
+import org.apache.spark.internal.config.History.{HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend}
import org.apache.spark.internal.config.Status._
import org.apache.spark.metrics.ExecutorMetricType
import org.apache.spark.resource.ResourceProfile
@@ -36,15 +37,11 @@ import org.apache.spark.scheduler.cluster._
import org.apache.spark.status.ListenerEventsTestHelper._
import org.apache.spark.status.api.v1
import org.apache.spark.storage._
-import org.apache.spark.tags.ExtendedLevelDBTest
+import org.apache.spark.tags.{ExtendedLevelDBTest, ExtendedRocksDBTest}
import org.apache.spark.util.Utils
import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}

-@ExtendedLevelDBTest
-class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
-private val conf = new SparkConf()
-.set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
-.set(ASYNC_TRACKING_ENABLED, false)
+abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {

private val twoReplicaMemAndDiskLevel = StorageLevel(true, true, false, true, 2)

@@ -53,7 +50,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
private var store: ElementTrackingStore = _
private var taskIdTracker = -1L

-protected def createKVStore: KVStore = KVUtils.open(testDir, getClass().getName())
+protected def conf: SparkConf = new SparkConf()
+.set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
+.set(ASYNC_TRACKING_ENABLED, false)
+
+protected def createKVStore: KVStore = KVUtils.open(testDir, getClass().getName(), conf)

before {
time = 0L
@@ -1891,3 +1892,15 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
class AppStatusListenerWithInMemoryStoreSuite extends AppStatusListenerSuite {
override def createKVStore: KVStore = new InMemoryStore()
}

+@ExtendedLevelDBTest
+class AppStatusListenerWithLevelDBSuite extends AppStatusListenerSuite {
+override def conf: SparkConf = super.conf
+.set(HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend.LEVELDB.toString)
+}
+
+@ExtendedRocksDBTest
+class AppStatusListenerWithRocksDBSuite extends AppStatusListenerSuite {
+override def conf: SparkConf = super.conf
+.set(HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend.ROCKSDB.toString)
+}
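Worth noting in these subclasses: `conf` is a method overridden via `super.conf.set(...)`, so each subclass layers its backend setting on top of the base suite's settings, while the in-memory variant overrides `createKVStore` directly and never touches the backend config. A minimal illustration of that layering idiom, detached from the suite (the property names here are made up):

import org.apache.spark.SparkConf

abstract class BaseSketch {
  // A def rather than a val, so subclasses can stack settings on top.
  protected def conf: SparkConf = new SparkConf().set("spark.sketch.base", "true")
}

class DerivedSketch extends BaseSketch {
  // The resulting conf carries both spark.sketch.base and spark.sketch.extra.
  override protected def conf: SparkConf = super.conf.set("spark.sketch.extra", "true")
}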
@@ -19,6 +19,7 @@ package org.apache.spark.status

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.internal.config.History.{HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend}
import org.apache.spark.internal.config.Status.LIVE_ENTITY_UPDATE_PERIOD
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.scheduler.{SparkListenerStageSubmitted, SparkListenerTaskStart, StageInfo, TaskInfo, TaskLocality}
@@ -81,7 +82,8 @@ class AppStatusStoreSuite extends SparkFunSuite {
assert(store.count(classOf[CachedQuantile]) === 2)
}

-private def createAppStore(disk: Boolean, live: Boolean): AppStatusStore = {
+private def createAppStore(disk: Boolean, diskStoreType: HybridStoreDiskBackend.Value = null,
+live: Boolean): AppStatusStore = {
val conf = new SparkConf()
if (live) {
return AppStatusStore.createLiveStore(conf)
@@ -92,8 +94,9 @@ }
}

val store: KVStore = if (disk) {
+conf.set(HYBRID_STORE_DISK_BACKEND, diskStoreType.toString)
val testDir = Utils.createTempDir()
-val diskStore = KVUtils.open(testDir, getClass.getName)
+val diskStore = KVUtils.open(testDir, getClass.getName, conf)
new ElementTrackingStore(diskStore, conf)
} else {
new ElementTrackingStore(new InMemoryStore, conf)
@@ -102,7 +105,8 @@ }
}

Seq(
"disk" -> createAppStore(disk = true, live = false),
"disk leveldb" -> createAppStore(disk = true, HybridStoreDiskBackend.LEVELDB, live = false),
"disk rocksdb" -> createAppStore(disk = true, HybridStoreDiskBackend.ROCKSDB, live = false),
"in memory" -> createAppStore(disk = false, live = false),
"in memory live" -> createAppStore(disk = false, live = true)
).foreach { case (hint, appStore) =>
@@ -23,14 +23,16 @@ import java.util.{Date, UUID}
import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
import org.scalatest.time.SpanSugar._

+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.History.{HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone
import org.apache.spark.sql.execution.ui.StreamingQueryStatusStore
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress, StreamTest}
import org.apache.spark.sql.streaming
import org.apache.spark.status.{ElementTrackingStore, KVUtils}
import org.apache.spark.util.Utils
-import org.apache.spark.util.kvstore.{InMemoryStore, KVStore, RocksDB}
+import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}

class StreamingQueryStatusListenerSuite extends StreamTest {

@@ -221,8 +223,10 @@

test("SPARK-38056: test writing StreamingQueryData to a LevelDB store") {
assume(!Utils.isMacOnAppleSilicon)
+val conf = new SparkConf()
+.set(HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend.LEVELDB.toString)
val testDir = Utils.createTempDir()
-val kvStore = KVUtils.open(testDir, getClass.getName)
+val kvStore = KVUtils.open(testDir, getClass.getName, conf)
try {
testStreamingQueryData(kvStore)
} finally {
@@ -233,8 +237,10 @@

test("SPARK-38056: test writing StreamingQueryData to a RocksDB store") {
assume(!Utils.isMacOnAppleSilicon)
+val conf = new SparkConf()
+.set(HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend.ROCKSDB.toString)
val testDir = Utils.createTempDir()
-val kvStore = new RocksDB(testDir)
+val kvStore = KVUtils.open(testDir, getClass.getName, conf)
try {
testStreamingQueryData(kvStore)
} finally {