Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support dataset validation #10

Merged
merged 11 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion jvm-packages/scalastyle-config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ This file is divided into 3 sections:
<parameter name="groups">java,scala,3rdParty,dmlc</parameter>
<parameter name="group.java">javax?\..*</parameter>
<parameter name="group.scala">scala\..*</parameter>
<parameter name="group.3rdParty">(?!ml\.dmlc\.xgboost4j\.).*</parameter>
<parameter name="group.3rdParty">(?!ml\.dmlc\.xgboost4j).*</parameter>
<parameter name="group.dmlc">ml.dmlc.xgboost4j.*</parameter>
</parameters>
</check>
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
ml.dmlc.xgboost4j.scala.spark.GPUXGBoostPlugin
ml.dmlc.xgboost4j.scala.spark.GpuXGBoostPlugin
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import _root_.scala.collection.JavaConverters._
import ml.dmlc.xgboost4j.java.{Column, ColumnBatch, QuantileDMatrix => JQuantileDMatrix, XGBoostError}

class QuantileDMatrix private[scala](
private[scala] override val jDMatrix: JQuantileDMatrix) extends DMatrix(jDMatrix) {
private[scala] override val jDMatrix: JQuantileDMatrix) extends DMatrix(jDMatrix) {

/**
* Create QuantileDMatrix from iterator based on the cuda array interface
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
Copyright (c) 2024 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ml.dmlc.xgboost4j.scala.spark

import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters.seqAsJavaListConverter

import ai.rapids.cudf.Table
import com.nvidia.spark.rapids.ColumnarRdd
import org.apache.spark.ml.param.Param
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Column, Dataset}

import ml.dmlc.xgboost4j.java.{CudfColumnBatch, GpuColumnBatch}
import ml.dmlc.xgboost4j.scala.QuantileDMatrix
import ml.dmlc.xgboost4j.scala.spark.params.HasGroupCol

/**
 * GpuXGBoostPlugin is the XGBoost plugin which leverages spark-rapids
 * to accelerate XGBoost from ETL to train: selected input columns are kept
 * as cudf Tables on the GPU and fed directly into a QuantileDMatrix,
 * avoiding a host-side row conversion.
 */
class GpuXGBoostPlugin extends XGBoostPlugin {

/**
 * Whether the plugin is enabled or not, if not enabled, fallback
 * to the regular CPU pipeline.
 *
 * Enabled only when the spark-rapids SQL plugin is both registered via
 * spark.sql.extensions AND switched on via spark.rapids.sql.enabled.
 *
 * @param dataset the input dataset
 * @return Boolean
 */
override def isEnabled(dataset: Dataset[_]): Boolean = {
val conf = dataset.sparkSession.conf
// spark.sql.extensions is a comma-separated list; look for the rapids SQL plugin entry.
val hasRapidsPlugin = conf.get("spark.sql.extensions", "").split(",").contains(
"com.nvidia.spark.rapids.SQLExecPlugin")
// Missing key defaults to "false", so the plugin is opt-in.
val rapidsEnabled = conf.get("spark.rapids.sql.enabled", "false").toBoolean
hasRapidsPlugin && rapidsEnabled
}

// TODO, support numeric type
/**
 * Select and prepare the columns XGBoost needs (label, optional weight,
 * optional base margin, optional group, and all feature columns), casting
 * each to float where required, then repartition if the estimator asks for it.
 *
 * @param estimator the estimator whose column params drive the selection
 * @param dataset the input dataset
 * @return a dataset containing only the selected (possibly cast) columns
 */
private def preprocess[T <: XGBoostEstimator[T, M], M <: XGBoostModel[M]](
estimator: XGBoostEstimator[T, M], dataset: Dataset[_]): Dataset[_] = {

// Columns to be selected for XGBoost training
val selectedCols: ArrayBuffer[Column] = ArrayBuffer.empty
val schema = dataset.schema

// Appends the column only when the param is defined and non-empty;
// undefined optional columns (weight/margin/group) are simply skipped.
def selectCol(c: Param[String]) = {
// TODO support numeric types
if (estimator.isDefinedNonEmpty(c)) {
selectedCols.append(estimator.castToFloatIfNeeded(schema, estimator.getOrDefault(c)))
}
}

Seq(estimator.labelCol, estimator.weightCol, estimator.baseMarginCol).foreach(selectCol)
// Group column only exists for ranking estimators.
estimator match {
case p: HasGroupCol => selectCol(p.groupCol)
case _ =>
}

// TODO support array/vector feature
estimator.getFeaturesCols.foreach { name =>
val col = estimator.castToFloatIfNeeded(dataset.schema, name)
selectedCols.append(col)
}
val input = dataset.select(selectedCols: _*)
estimator.repartitionIfNeeded(input)
}

// Fails fast when the estimator is not configured for GPU training.
// NOTE(review): the condition also passes for any device other than "cpu"
// (e.g. "gpu"), while the message demands device=cuda — confirm whether the
// check should be getDevice == "cuda" (or startsWith("cuda")) instead.
private def validate[T <: XGBoostEstimator[T, M], M <: XGBoostModel[M]](
estimator: XGBoostEstimator[T, M],
dataset: Dataset[_]): Unit = {
require(estimator.getTreeMethod == "gpu_hist" || estimator.getDevice != "cpu",
"Using Spark-Rapids to accelerate XGBoost must set device=cuda")
}

/**
 * Convert Dataset to RDD[Watches] which will be fed into XGBoost
 *
 * When an eval dataset is configured, train and eval partitions are zipped
 * one-to-one and each partition yields a single Watches holding both
 * DMatrices; otherwise each train partition yields a train-only Watches.
 *
 * @param estimator which estimator to be handled.
 * @param dataset to be converted.
 * @return RDD[Watches]
 */
override def buildRddWatches[T <: XGBoostEstimator[T, M], M <: XGBoostModel[M]](
estimator: XGBoostEstimator[T, M],
dataset: Dataset[_]): RDD[Watches] = {

validate(estimator, dataset)

val train = preprocess(estimator, dataset)
val schema = train.schema

// Maps column names to positional indices used to slice the cudf Table below.
val indices = estimator.buildColumnIndices(schema)

// Captured locally so the closure below does not serialize the estimator.
val maxBin = estimator.getMaxBins
val nthread = estimator.getNthread
val missing = estimator.getMissing

/** build QuantileDMatrix on the executor side */
def buildQuantileDMatrix(iter: Iterator[Table]): QuantileDMatrix = {
// Lazily wraps each GPU Table into a CudfColumnBatch; a slice index of -1
// is passed for absent optional columns (weight/margin).
// NOTE(review): the GpuColumnBatch is closed as soon as the slices are taken,
// before QuantileDMatrix consumes the CudfColumnBatch — this assumes slice()
// retains its own device memory; confirm against the GpuColumnBatch contract.
val colBatchIter = iter.map { table =>
withResource(new GpuColumnBatch(table, null)) { batch =>
new CudfColumnBatch(
batch.slice(indices.featureIds.get.map(Integer.valueOf).asJava),
batch.slice(indices.labelId),
batch.slice(indices.weightId.getOrElse(-1)),
batch.slice(indices.marginId.getOrElse(-1)));
}
}
new QuantileDMatrix(colBatchIter, missing, maxBin, nthread)
}

estimator.getEvalDataset().map { evalDs =>
val evalProcessed = preprocess(estimator, evalDs)
// zipPartitions requires both RDDs to have the same number of partitions;
// preprocess/repartitionIfNeeded is assumed to guarantee that — TODO confirm.
ColumnarRdd(train.toDF()).zipPartitions(ColumnarRdd(evalProcessed.toDF())) {
(trainIter, evalIter) =>
val trainDM = buildQuantileDMatrix(trainIter)
val evalDM = buildQuantileDMatrix(evalIter)
Iterator.single(new Watches(Array(trainDM, evalDM),
Array(Utils.TRAIN_NAME, Utils.VALIDATION_NAME), None))
}
}.getOrElse(
ColumnarRdd(train.toDF()).mapPartitions { iter =>
val dm = buildQuantileDMatrix(iter)
Iterator.single(new Watches(Array(dm), Array(Utils.TRAIN_NAME), None))
}
)
}

/** Executes the provided code block and then closes the resource,
 * even when the block throws. */
def withResource[T <: AutoCloseable, V](r: T)(block: T => V): V = {
try {
block(r)
} finally {
r.close()
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ class XXXXXSuite extends AnyFunSuite with GpuTestSuite {

var Array(trainDf, validationDf) = df.randomSplit(Array(0.8, 0.2), seed = 1)

trainDf = trainDf.withColumn("validation", lit(false))
validationDf = validationDf.withColumn("validationDf", lit(true))
// trainDf = trainDf.withColumn("validation", lit(false))
// validationDf = validationDf.withColumn("validationDf", lit(true))

df = trainDf.union(validationDf)
// df = trainDf.union(validationDf)
//
// // Assemble the feature columns into a single vector column
// val assembler = new VectorAssembler()
Expand All @@ -63,7 +63,9 @@ class XXXXXSuite extends AnyFunSuite with GpuTestSuite {
// .setBaseMarginCol("base_margin")
.setFeaturesCol(features)
.setLabelCol(labelCol)
.setValidationIndicatorCol("validation")
.setDevice("cuda")
.setEvalDataset(validationDf)
// .setValidationIndicatorCol("validation")
// .setPredictionCol("")
.setRawPredictionCol("")
.setProbabilityCol("xxxx")
Expand All @@ -76,16 +78,16 @@ class XXXXXSuite extends AnyFunSuite with GpuTestSuite {
println(loadedEst.getNumRound)
println(loadedEst.getMaxDepth)

val model = loadedEst.fit(df)
val model = est.fit(trainDf)
println("-----------------------")
println(model.getNumRound)
println(model.getMaxDepth)

model.write.overwrite().save("/tmp/model/")
val loadedModel = XGBoostClassificationModel.load("/tmp/model")
println(loadedModel.getNumRound)
println(loadedModel.getMaxDepth)
model.transform(df).drop(features: _*).show(150, false)
// model.write.overwrite().save("/tmp/model/")
// val loadedModel = XGBoostClassificationModel.load("/tmp/model")
// println(loadedModel.getNumRound)
// println(loadedModel.getMaxDepth)
// model.transform(df).drop(features: _*).show(150, false)
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ import ml.dmlc.xgboost4j.java.{Communicator, RabitTracker, XGBoostError}
import ml.dmlc.xgboost4j.scala.{XGBoost => SXGBoost, _}

/**
 * Runtime parameters handed to the distributed XGBoost training job.
 *
 * Note: the diff residue in this span duplicated the old and new parameter
 * lists back-to-back, which is not valid Scala; this is the final
 * (post-change) definition.
 *
 * @param numWorkers          number of Spark workers (tasks) used for training
 * @param numRounds           number of boosting rounds
 * @param obj                 customized objective implementation
 * @param eval                customized evaluation metric implementation
 * @param trackerConf         Rabit tracker configuration
 * @param earlyStoppingRounds rounds without metric improvement before stopping
 * @param device              training device string, e.g. "cpu" or "cuda"
 * @param isLocal             whether Spark runs in local mode
 * @param runOnGpu            whether the GPU training pipeline is used
 */
private[spark] case class RuntimeParams(
    numWorkers: Int,
    numRounds: Int,
    obj: ObjectiveTrait,
    eval: EvalTrait,
    trackerConf: TrackerConf,
    earlyStoppingRounds: Int,
    device: String,
    isLocal: Boolean,
    runOnGpu: Boolean)

/**
* A trait to manage stage-level scheduling
Expand Down Expand Up @@ -195,7 +195,11 @@ private[spark] object XGBoost extends StageLevelScheduling {
rabitEnv.put("DMLC_TASK_ID", partitionId.toString)

try {
Communicator.init(rabitEnv)
try {
Communicator.init(rabitEnv)
} catch {
case e: Throwable => logger.error(e)
}
val numEarlyStoppingRounds = runtimeParams.earlyStoppingRounds
val metrics = Array.tabulate(watches.size)(_ =>
Array.ofDim[Float](runtimeParams.numRounds))
Expand Down Expand Up @@ -282,7 +286,11 @@ private[spark] object XGBoost extends StageLevelScheduling {
logger.error("XGBoost job was aborted due to ", t)
throw t
} finally {
tracker.stop()
try {
tracker.stop()
} catch {
case t: Throwable => logger.error(t)
}
}
}
}
Expand Down
Loading
Loading