Skip to content

Commit

Permalink
Support customized objective (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
wbo4958 authored Jun 21, 2024
1 parent ef7f8b6 commit 693860a
Show file tree
Hide file tree
Showing 11 changed files with 545 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,33 @@

package ml.dmlc.xgboost4j.scala.spark

import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector}
import org.apache.spark.ml.feature.{LabeledPoint => MLLabeledPoint}
import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}
import org.json4s.{DefaultFormats, FullTypeHints, JField, JValue, NoTypeHints, TypeHints}

import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}

// Based on code copied from org.apache.spark.util.
object Utils {

private[spark] implicit class XGBLabeledPointFeatures(
    val labeledPoint: XGBLabeledPoint
) extends AnyVal {
  /** Builds the Spark ML [[MLLabeledPoint]] carrying the same label and features. */
  private[spark] def asML: MLLabeledPoint =
    MLLabeledPoint(labeledPoint.label, labeledPoint.features)

  /**
   * Exposes the features of the wrapped point as an
   * [[org.apache.spark.ml.linalg.Vector]].
   *
   * A point whose `indices` array is null yields a dense vector; otherwise a
   * sparse vector of length `size` is built from the indices and values.
   */
  def features: Vector = {
    // Both representations need the float values widened to doubles.
    val doubles = labeledPoint.values.map(_.toDouble)
    labeledPoint.indices match {
      case null => Vectors.dense(doubles)
      case sparseIndices => Vectors.sparse(labeledPoint.size, sparseIndices, doubles)
    }
  }
}

private[spark] implicit class MLVectorToXGBLabeledPoint(val v: Vector) extends AnyVal {
/**
* Converts a [[Vector]] to a data point with a dummy label.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ import ml.dmlc.xgboost4j.scala.{XGBoost => SXGBoost, _}
private[spark] case class RuntimeParams(
numWorkers: Int,
numRounds: Int,
obj: ObjectiveTrait,
eval: EvalTrait,
trackerConf: TrackerConf,
earlyStoppingRounds: Int,
device: String,
isLocal: Boolean,
runOnGpu: Boolean)
runOnGpu: Boolean,
obj: Option[ObjectiveTrait] = None,
eval: Option[EvalTrait] = None)

/**
* A trait to manage stage-level scheduling
Expand Down Expand Up @@ -216,8 +216,8 @@ private[spark] object XGBoost extends StageLevelScheduling {
logger.info("Leveraging gpu device " + gpuId + " to train")
params = params + ("device" -> s"cuda:$gpuId")
}
SXGBoost.train(watches.toMap("train"), params, runtimeParams.numRounds,
watches.toMap, metrics, runtimeParams.obj, runtimeParams.eval,
SXGBoost.train(watches.toMap("train"), params, runtimeParams.numRounds, watches.toMap,
metrics, runtimeParams.obj.getOrElse(null), runtimeParams.eval.getOrElse(null),
earlyStoppingRound = numEarlyStoppingRounds)
} catch {
case xgbException: XGBoostError =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,18 +298,18 @@ private[spark] abstract class XGBoostEstimator[

protected def createModel(booster: Booster, summary: XGBoostTrainingSummary): M

private def getRuntimeParameters(isLocal: Boolean): RuntimeParams = {
private[spark] def getRuntimeParameters(isLocal: Boolean): RuntimeParams = {
val runOnGpu = if (getDevice != "cpu" || getTreeMethod == "gpu_hist") true else false
RuntimeParams(
getNumWorkers,
getNumRound,
null, // TODO support ObjectiveTrait
null, // TODO support EvalTrait
TrackerConf(getRabitTrackerTimeout, getRabitTrackerHostIp, getRabitTrackerPort),
getNumEarlyStoppingRounds,
getDevice,
isLocal,
runOnGpu
runOnGpu,
Option(getCustomObj),
Option(getCustomEval)
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ private[spark] trait TreeBoosterParams extends Params {

private[spark] object BoosterParams {

val supportedTreeMethods = HashSet("auto", "exact", "approx", "hist")
val supportedTreeMethods = HashSet("auto", "exact", "approx", "hist", "gpu_hist")

val supportedUpdaters = HashSet("grow_colmaker", "grow_histmaker", "grow_quantile_histmaker",
"grow_gpu_hist", "grow_gpu_approx", "sync", "refresh", "prune")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.xgboost.SparkUtils
import org.apache.spark.sql.types.{DoubleType, StructType}

import ml.dmlc.xgboost4j.scala.{EvalTrait, ObjectiveTrait}


trait HasLeafPredictionCol extends Params {
/**
Expand Down Expand Up @@ -152,13 +154,23 @@ private[spark] trait SparkParams[T <: Params] extends HasFeaturesCols with HasFe

final def getMissing: Float = $(missing)

/**
 * Param named "customObj" that carries the customized objective function
 * provided by the user.
 */
final val customObj = new CustomObjParam(this, "customObj", "customized objective function " +
"provided by user")

/** Reads the [[ObjectiveTrait]] currently resolved for [[customObj]] via `$`. */
final def getCustomObj: ObjectiveTrait = $(customObj)

/**
 * Param named "customEval" that carries the customized evaluation function
 * provided by the user.
 */
final val customEval = new CustomEvalParam(this, "customEval",
"customized evaluation function provided by user")

/** Reads the [[EvalTrait]] currently resolved for [[customEval]] via `$`. */
final def getCustomEval: EvalTrait = $(customEval)

setDefault(numRound -> 100, numWorkers -> 1, inferBatchSize -> (32 << 10),
numEarlyStoppingRounds -> 0, forceRepartition -> false, missing -> Float.NaN,
featuresCols -> Array.empty)
featuresCols -> Array.empty, customObj -> null, customEval -> null)

addNonXGBoostParam(numWorkers, numRound, numEarlyStoppingRounds, inferBatchSize, featuresCol,
labelCol, baseMarginCol, weightCol, predictionCol, leafPredictionCol, contribPredictionCol,
forceRepartition, missing, featuresCols)
forceRepartition, missing, featuresCols, customEval, customObj)

final def getNumWorkers: Int = $(numWorkers)

Expand Down Expand Up @@ -188,6 +200,10 @@ private[spark] trait SparkParams[T <: Params] extends HasFeaturesCols with HasFe

def setMissing(value: Float): T = set(missing, value).asInstanceOf[T]

/**
 * Stores `value` in the `customObj` param.
 *
 * @param value the objective implementation to store
 * @return the result of `set`, cast to `T`
 */
def setCustomObj(value: ObjectiveTrait): T = set(customObj, value).asInstanceOf[T]

/**
 * Stores `value` in the `customEval` param.
 *
 * @param value the evaluation implementation to store
 * @return the result of `set`, cast to `T`
 */
def setCustomEval(value: EvalTrait): T = set(customEval, value).asInstanceOf[T]

def setRabitTrackerTimeout(value: Int): T = set(rabitTrackerTimeout, value).asInstanceOf[T]

def setRabitTrackerHostIp(value: String): T = set(rabitTrackerHostIp, value).asInstanceOf[T]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
Copyright (c) 2021 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ml.dmlc.xgboost4j.scala.spark

import scala.collection.mutable.ListBuffer

import org.apache.commons.logging.LogFactory

import ml.dmlc.xgboost4j.java.XGBoostError
import ml.dmlc.xgboost4j.scala.{DMatrix, ObjectiveTrait}


/**
* loglikelihood loss obj function
*/
class CustomObj(val customParameter: Int = 0) extends ObjectiveTrait {

  val logger = LogFactory.getLog(classOf[CustomObj])

  /**
   * User-defined objective: produces the first and second order gradients of a
   * log-likelihood loss computed on the sigmoid of the raw margins.
   *
   * @param predicts untransformed margin predictions, one row per instance
   * @param dtrain   training data from which the labels are read
   * @return a two-element list: first order gradients, then second order gradients
   */
  override def getGradient(predicts: Array[Array[Float]], dtrain: DMatrix)
      : List[Array[Float]] = {
    val rowCount = predicts.length

    // Labels are fetched before any transformation; an XGBoostError is logged
    // and then propagated unchanged to the caller.
    val labels: Array[Float] =
      try {
        dtrain.getLabel
      } catch {
        case e: XGBoostError =>
          logger.error(e)
          throw e
      }

    // Only the first column of every transformed row takes part in the loss.
    val probabilities = transform(predicts).map(_(0))
    val grad = Array.tabulate(rowCount)(row => probabilities(row) - labels(row))
    val hess = probabilities.map(p => p * (1 - p))

    List(grad, hess)
  }

  /**
   * Plain logistic sigmoid.
   *
   * @param input raw margin
   * @return the sigmoid of `input`. Note: numerical stability is not a concern
   *         here, the function only serves as an example.
   */
  def sigmoid(input: Float): Float = {
    val denominator = 1 + Math.exp(-input)
    (1 / denominator).toFloat
  }

  /**
   * Applies [[sigmoid]] to the first column of every prediction row.
   *
   * @param predicts raw margins, one row per instance
   * @return a new array with one single-element row per input row
   */
  def transform(predicts: Array[Array[Float]]): Array[Array[Float]] =
    predicts.map(row => Array(sigmoid(row(0))))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
Copyright (c) 2014 by Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ml.dmlc.xgboost4j.scala.spark

import org.apache.commons.logging.LogFactory

import ml.dmlc.xgboost4j.java.XGBoostError
import ml.dmlc.xgboost4j.scala.{DMatrix, EvalTrait}

class EvalError extends EvalTrait {

  val logger = LogFactory.getLog(classOf[EvalError])

  private[xgboost4j] var evalMetric: String = "custom_error"

  /**
   * Name of the evaluation metric.
   *
   * @return the current value of `evalMetric`
   */
  override def getMetric: String = evalMetric

  /**
   * Computes the fraction of rows whose first prediction disagrees with a 0/1 label:
   * a label of 0 with a positive prediction, or a label of 1 with a non-positive one.
   *
   * @param predicts predictions, one row per instance
   * @param dmat     data matrix providing the labels
   * @return the error rate, or -1 when reading the labels raises an XGBoostError
   */
  override def eval(predicts: Array[Array[Float]], dmat: DMatrix): Float = {
    // A failure to read the labels is logged and reported through the -1 result.
    val fetchedLabels: Option[Array[Float]] =
      try {
        Some(dmat.getLabel)
      } catch {
        case ex: XGBoostError =>
          logger.error(ex)
          None
      }

    fetchedLabels match {
      case None => -1f
      case Some(labels) =>
        require(predicts.length == labels.length, s"predicts length ${predicts.length} has to be" +
          s" equal with label length ${labels.length}")
        // The running count is kept as a Float, one increment per misclassified row.
        val wrong = predicts.indices.foldLeft(0f) { (count, row) =>
          val label = labels(row)
          val missed = (label == 0.0 && predicts(row)(0) > 0) ||
            (label == 1.0 && predicts(row)(0) <= 0)
          if (missed) count + 1 else count
        }
        wrong / labels.length
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import org.apache.spark.sql._
import org.scalatest.BeforeAndAfterEach
import org.scalatest.funsuite.AnyFunSuite

import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}

trait PerTest extends BeforeAndAfterEach {
self: AnyFunSuite =>

Expand Down Expand Up @@ -69,6 +71,19 @@ trait PerTest extends BeforeAndAfterEach {
}
}

/**
 * Turns labeled points into a DataFrame with the columns "id", "label" and "features".
 *
 * The id of a row is the position of its point in `labeledPoints`.
 *
 * @param labeledPoints points to convert
 * @param numPartitions number of partitions requested when parallelizing the rows
 * @return the resulting DataFrame
 */
protected def buildDataFrame(
    labeledPoints: Seq[XGBLabeledPoint],
    numPartitions: Int = numWorkers): DataFrame = {
  import Utils.XGBLabeledPointFeatures
  val rows = labeledPoints.zipWithIndex.map { case (point, rowId) =>
    (rowId, point.label, point.features)
  }.toList

  val rdd = sc.parallelize(rows, numPartitions)
  ss.createDataFrame(rdd).toDF("id", "label", "features")
}

protected def compareTwoFiles(lhs: String, rhs: String): Boolean = {
withResource(new FileInputStream(lhs)) { lfis =>
withResource(new FileInputStream(rhs)) { rfis =>
Expand Down
Loading

0 comments on commit 693860a

Please sign in to comment.