From 9c50758ed4707cc02e094e0e1212c073cef26d59 Mon Sep 17 00:00:00 2001
From: Bobby Wang
Date: Fri, 20 Dec 2024 16:44:50 +0800
Subject: [PATCH] [jvm-packages] Supports external memory

---
 .../dmlc/xgboost4j/java/CudfColumnBatch.java  |  12 +-
 .../dmlc/xgboost4j/java/QuantileDMatrix.java  |  20 +-
 .../xgboost4j/scala/QuantileDMatrix.scala     |  20 +-
 .../scala/spark/ExternalMemory.scala          | 221 ++++++++++++
 .../scala/spark/GpuXGBoostPlugin.scala        |  46 +--
 .../ml/dmlc/xgboost4j/java/BoosterTest.java   |   4 +-
 .../ml/dmlc/xgboost4j/java/DMatrixTest.java   |   8 +-
 .../scala/QuantileDMatrixSuite.scala          |  79 ++++-
 .../scala/spark/ExternalMemorySuite.scala     | 144 ++++++++
 .../scala/spark/GpuXGBoostPluginSuite.scala   | 325 +++++++++---------
 .../ml/dmlc/xgboost4j/scala/spark/Utils.scala |  39 ++-
 .../scala/spark/XGBoostEstimator.scala        |   4 +-
 .../scala/spark/params/XGBoostParams.scala    |  18 +-
 .../xgboost4j/src/native/xgboost4j-gpu.cu     | 138 +++++++-
 .../xgboost4j/src/native/xgboost4j.cpp        |  20 ++
 15 files changed, 876 insertions(+), 222 deletions(-)
 create mode 100644 jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ExternalMemory.scala
 create mode 100644 jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/spark/ExternalMemorySuite.scala

diff --git a/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/CudfColumnBatch.java b/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/CudfColumnBatch.java
index 2f1870c580be..c71295f86cc6 100644
--- a/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/CudfColumnBatch.java
+++ b/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/CudfColumnBatch.java
@@ -1,5 +1,5 @@
 /*
- Copyright (c) 2021-2024 by Contributors
+ Copyright (c) 2021-2025 by Contributors

 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
@@ -86,6 +86,16 @@ private List<CudfColumn> initializeCudfColumns(Table table) {
         .collect(Collectors.toList());
   }

+  // visible for testing
+  public Table getFeatureTable() {
+    return featureTable;
+  }
+
+  // visible for testing
+  public Table getLabelTable() {
+    return labelTable;
+  }
+
   public List<CudfColumn> getFeatures() {
     return features;
   }
diff --git a/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/QuantileDMatrix.java b/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/QuantileDMatrix.java
index ffb12a48927a..a5bf9c888d7f 100644
--- a/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/QuantileDMatrix.java
+++ b/jvm-packages/xgboost4j-spark-gpu/src/main/java/ml/dmlc/xgboost4j/java/QuantileDMatrix.java
@@ -1,5 +1,5 @@
 /*
- Copyright (c) 2021-2024 by Contributors
+ Copyright (c) 2021-2025 by Contributors

 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
@@ -28,14 +28,16 @@ public class QuantileDMatrix extends DMatrix {
    * @param missing the missing value
    * @param maxBin the max bin
    * @param nthread the parallelism
+   * @param useExternalMemory whether to use external memory or not
    * @throws XGBoostError
    */
   public QuantileDMatrix(
       Iterator<ColumnBatch> iter,
       float missing,
       int maxBin,
-      int nthread) throws XGBoostError {
-    this(iter, null, missing, maxBin, nthread);
+      int nthread,
+      boolean useExternalMemory) throws XGBoostError {
+    this(iter, null, missing, maxBin, nthread, useExternalMemory);
   }

   /**
@@ -50,6 +52,7 @@ public QuantileDMatrix(
    * @param missing the missing value
    * @param maxBin the max bin
    * @param nthread the parallelism
+   * @param useExternalMemory whether to use external memory or not
    * @throws XGBoostError
    */
   public QuantileDMatrix(
@@ -57,10 +60,11 @@ public QuantileDMatrix(
       QuantileDMatrix refDMatrix,
       float missing,
       int maxBin,
-      int nthread) throws XGBoostError {
+      int nthread,
+      boolean useExternalMemory) throws XGBoostError {
     super(0);
     long[] out = new long[1];
-    String conf = getConfig(missing, maxBin, nthread);
+    String conf = getConfig(missing, maxBin, nthread, useExternalMemory);
     long[] ref = null;
     if (refDMatrix != null) {
       ref = new long[1];
@@ -111,9 +115,9 @@ public void setGroup(int[] group) throws XGBoostError {
     throw new XGBoostError("QuantileDMatrix does not support setGroup.");
   }

-  private String getConfig(float missing, int maxBin, int nthread) {
-    return String.format("{\"missing\":%f,\"max_bin\":%d,\"nthread\":%d}",
-        missing, maxBin, nthread);
+  private String getConfig(float missing, int maxBin, int nthread, boolean useExternalMemory) {
+    return String.format("{\"missing\":%f,\"max_bin\":%d,\"nthread\":%d,"
+        + "\"use_ext_mem\":%b}", missing, maxBin, nthread, useExternalMemory);
   }
 }
diff --git a/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/QuantileDMatrix.scala b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/QuantileDMatrix.scala
index a9fac0245abf..c6981c1a4eac 100644
--- a/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/QuantileDMatrix.scala
+++ b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/QuantileDMatrix.scala
@@ -1,5 +1,5 @@
 /*
- Copyright (c) 2021-2024 by Contributors
+ Copyright (c) 2021-2025 by Contributors

 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
@@ -30,31 +30,39 @@ class QuantileDMatrix private[scala](
    * @param missing the missing value
    * @param maxBin the max bin
    * @param nthread the parallelism
+   * @param useExternalMemory whether to use external memory or not
    * @throws XGBoostError
    */
-  def this(iter: Iterator[ColumnBatch], missing: Float, maxBin: Int, nthread: Int) {
-    this(new JQuantileDMatrix(iter.asJava, missing, maxBin, nthread))
+  def this(iter: Iterator[ColumnBatch],
+           missing: Float,
+           maxBin: Int,
+           nthread: Int,
+           useExternalMemory: Boolean) {
+    this(new JQuantileDMatrix(iter.asJava, missing, maxBin, nthread, useExternalMemory))
   }

   /**
    * Create QuantileDMatrix from iterator based on the array interface
    *
    * @param iter the XGBoost ColumnBatch batch to provide the corresponding array interface
-   * @param refDMatrix The reference QuantileDMatrix that provides quantile information, needed
+   * @param ref The reference QuantileDMatrix that provides quantile information, needed
    *             when creating validation/test dataset with QuantileDMatrix.
   *             Supplying the training DMatrix as a reference means that the same
    *             quantisation applied to the training data is applied to the
    *             validation/test data
    * @param missing the missing value
    * @param maxBin the max bin
    * @param nthread the parallelism
+   * @param useExternalMemory whether to use external memory or not
    * @throws XGBoostError
    */
   def this(iter: Iterator[ColumnBatch],
            ref: QuantileDMatrix,
            missing: Float,
            maxBin: Int,
-           nthread: Int) {
-    this(new JQuantileDMatrix(iter.asJava, ref.jDMatrix, missing, maxBin, nthread))
+           nthread: Int,
+           useExternalMemory: Boolean) {
+    this(new JQuantileDMatrix(iter.asJava, ref.jDMatrix, missing, maxBin, nthread,
+      useExternalMemory))
   }

   /**
diff --git a/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ExternalMemory.scala b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ExternalMemory.scala
new file mode 100644
index 000000000000..873a8a63944b
--- /dev/null
+++ b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/ExternalMemory.scala
@@ -0,0 +1,221 @@
+/*
+ Copyright (c) 2025 by Contributors
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package ml.dmlc.xgboost4j.scala.spark
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable.ArrayBuffer
+
+import ai.rapids.cudf._
+
+import ml.dmlc.xgboost4j.java.{ColumnBatch, CudfColumnBatch}
+import ml.dmlc.xgboost4j.scala.spark.Utils.withResource
+
+private[spark] trait ExternalMemory[T] extends Iterator[Table] with AutoCloseable {
+
+  protected val buffers = ArrayBuffer.empty[T]
+  private lazy val buffersIterator = buffers.toIterator
+
+  /**
+   * Convert the table to T which will be cached
+   *
+   * @param table to be converted
+   * @return the content
+   */
+  def convertTable(table: Table): T
+
+  /**
+   * Load the content to the Table
+   *
+   * @param content to be loaded
+   * @return Table
+   */
+  def loadTable(content: T): Table
+
+  // Cache the table
+  def cacheTable(table: Table): Unit = {
+    val content = convertTable(table)
+    buffers.append(content)
+  }
+
+  override def hasNext: Boolean = buffersIterator.hasNext
+
+  override def next(): Table = loadTable(buffersIterator.next())
+
+  override def close(): Unit = {}
+}
+
+// The data will be cached to disk.
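+// A minimal usage sketch (values hypothetical): the first pass spills each
+// batch as an Arrow IPC file under `<path>/xgboost`, and the second pass
+// replays the cached batches, e.g.
+//   val extMem = new DiskExternalMemoryIterator("/tmp")
+//   extMem.cacheTable(table) // pass 1: serialize the table to disk
+//   while (extMem.hasNext) {
+//     withResource(extMem.next()) { replayed => /* feed into QuantileDMatrix */ }
+//   }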
+private[spark] class DiskExternalMemoryIterator(val path: String) extends ExternalMemory[String] {
+
+  private lazy val root = {
+    val tmp = path + "/xgboost"
+    createDirectory(tmp)
+    tmp
+  }
+
+  private var counter = 0
+
+  private def createDirectory(dirPath: String): Unit = {
+    val path = Paths.get(dirPath)
+    if (!Files.exists(path)) {
+      Files.createDirectories(path)
+    }
+  }
+
+  /**
+   * Convert the table to a file path which will be cached
+   *
+   * @param table to be converted
+   * @return the file path of the cached table
+   */
+  override def convertTable(table: Table): String = {
+    val names = (1 to table.getNumberOfColumns).map(_.toString)
+    val options = ArrowIPCWriterOptions.builder().withColumnNames(names: _*).build()
+    val path = root + "/table_" + counter + "_" + System.nanoTime()
+    counter += 1
+    withResource(Table.writeArrowIPCChunked(options, new File(path))) { writer =>
+      writer.write(table)
+    }
+    path
+  }
+
+  private def closeOnExcept[T <: AutoCloseable, V](r: ArrayBuffer[T])
+                                                  (block: ArrayBuffer[T] => V): V = {
+    try {
+      block(r)
+    } catch {
+      case t: Throwable =>
+        r.foreach(_.close())
+        throw t
+    }
+  }
+
+  /**
+   * Load the file from disk to a Table
+   *
+   * @param name the file path to be loaded
+   * @return Table
+   */
+  override def loadTable(name: String): Table = {
+    val file = new File(name)
+    if (!file.exists()) {
+      throw new RuntimeException(s"The cached file ${name} does not exist")
+    }
+    try {
+      withResource(Table.readArrowIPCChunked(file)) { reader =>
+        val tables = ArrayBuffer.empty[Table]
+        closeOnExcept(tables) { tables =>
+          var table = Option(reader.getNextIfAvailable())
+          while (table.isDefined) {
+            tables.append(table.get)
+            table = Option(reader.getNextIfAvailable())
+          }
+        }
+        if (tables.size > 1) {
+          closeOnExcept(tables) { tables =>
+            // Concatenation copies the chunks, so close them once the
+            // concatenated table has been built.
+            val concatenated = Table.concatenate(tables.toArray: _*)
+            tables.foreach(_.close())
+            concatenated
+          }
+        } else {
+          tables(0)
+        }
+      }
+    } catch {
+      case e: Throwable =>
+        close()
+        throw e
+    } finally {
+      if (file.exists()) {
+        file.delete()
+      }
+    }
+  }
+
+  override def close(): Unit = {
+    buffers.foreach { path =>
+      val file = new File(path)
+      if (file.exists()) {
+        file.delete()
+      }
+    }
+    buffers.clear()
+  }
+}
+
+private[spark] object ExternalMemory {
+  def apply(path: Option[String] = None): ExternalMemory[_] = {
+    path.map(new DiskExternalMemoryIterator(_))
+      .getOrElse(throw new RuntimeException("No disk path provided"))
+  }
+}
+
+/**
+ * ExternalMemoryIterator supports iterating the data twice: once the input
+ * iterator is exhausted, it transparently swaps to the cached batches.
+ *
+ * The first round of iteration gets the input batch that will be
+ * 1. cached in the external memory
+ * 2. fed into QuantileDMatrix
+ * The second round of iteration returns the cached batches read back from
+ * external memory.
+ *
+ * @param input   the spark input iterator
+ * @param indices the column indices of features/label/weight/margin/group
+ * @param path    the directory used to cache batches on disk
+ */
+private[scala] class ExternalMemoryIterator(val input: Iterator[Table],
+                                            val indices: ColumnIndices,
+                                            val path: Option[String] = None)
+  extends Iterator[ColumnBatch] {
+
+  private var iter = input
+
+  // Flag to indicate the input has been consumed.
+  private var inputIsConsumed = false
+  // Flag to indicate input.next has been called at least once
+  private var inputNextIsCalled = false
+
+  // visible for testing
+  private[spark] val externalMemory = ExternalMemory(path)
+
+  override def hasNext: Boolean = {
+    val value = iter.hasNext
+    if (!value && inputIsConsumed && inputNextIsCalled) {
+      externalMemory.close()
+    }
+    if (!inputIsConsumed && !value && inputNextIsCalled) {
+      // The input iterator is exhausted; swap to the external-memory iterator
+      // for the second round of iteration.
+      inputIsConsumed = true
+      iter = externalMemory
+    }
+    value
+  }
+
+  override def next(): ColumnBatch = {
+    inputNextIsCalled = true
+    withResource(new GpuColumnBatch(iter.next())) { batch =>
+      if (iter == input) {
+        externalMemory.cacheTable(batch.table)
+      }
+      new CudfColumnBatch(
+        batch.select(indices.featureIds.get),
+        batch.select(indices.labelId),
+        batch.select(indices.weightId.getOrElse(-1)),
+        batch.select(indices.marginId.getOrElse(-1)),
+        batch.select(indices.groupId.getOrElse(-1)))
+    }
+  }
+
+}
diff --git a/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/GpuXGBoostPlugin.scala b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/GpuXGBoostPlugin.scala
index 6ab9f679d706..800fc8901e11 100644
--- a/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/GpuXGBoostPlugin.scala
+++ b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/GpuXGBoostPlugin.scala
@@ -1,5 +1,5 @@
 /*
- Copyright (c) 2024 by Contributors
+ Copyright (c) 2024-2025 by Contributors

 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
@@ -127,22 +127,32 @@ class GpuXGBoostPlugin extends XGBoostPlugin {
     val nthread = estimator.getNthread
     val missing = estimator.getMissing

+    val useExtMem = estimator.getUseExternalMemory
+    val extMemPath = if (useExtMem) {
+      Some(dataset.sparkSession.conf.get("spark.local.dir", "/tmp"))
+    } else None
+
     /** build QuantileDMatrix on the executor side */
-    def buildQuantileDMatrix(iter: Iterator[Table],
+    def buildQuantileDMatrix(input: Iterator[Table],
                              ref: Option[QuantileDMatrix] = None): QuantileDMatrix = {
-      val colBatchIter = iter.map { table =>
-        withResource(new GpuColumnBatch(table)) { batch =>
-          new CudfColumnBatch(
-            batch.select(indices.featureIds.get),
-            batch.select(indices.labelId),
-            batch.select(indices.weightId.getOrElse(-1)),
-            batch.select(indices.marginId.getOrElse(-1)),
-            batch.select(indices.groupId.getOrElse(-1)));
-        }
+      val (iterator, useExtMem) = extMemPath match {
+        case Some(_) =>
+          (new ExternalMemoryIterator(input, indices, extMemPath), true)
+        case None =>
+          (input.map { table =>
+            withResource(new GpuColumnBatch(table)) { batch =>
+              new CudfColumnBatch(
+                batch.select(indices.featureIds.get),
+                batch.select(indices.labelId),
+                batch.select(indices.weightId.getOrElse(-1)),
+                batch.select(indices.marginId.getOrElse(-1)),
+                batch.select(indices.groupId.getOrElse(-1)))
+            }
+          }, false)
       }
-      ref.map(r => new QuantileDMatrix(colBatchIter, r, missing, maxBin, nthread)).getOrElse(
-        new QuantileDMatrix(colBatchIter, missing, maxBin, nthread)
-      )
+
+      ref.map(r => new QuantileDMatrix(iterator, r, missing, maxBin, nthread, useExtMem))
+        .getOrElse(new QuantileDMatrix(iterator, missing, maxBin, nthread, useExtMem))
     }

     estimator.getEvalDataset().map { evalDs =>
@@ -295,7 +305,7 @@ class GpuXGBoostPlugin extends XGBoostPlugin {
   }
 }

-private class GpuColumnBatch(table: Table) extends AutoCloseable {
+private[scala] class GpuColumnBatch(val table: Table) extends AutoCloseable {

   def select(index: Int): Table = {
     select(Seq(index))
   }
@@ -308,9 +318,5 @@ private[scala] class GpuColumnBatch(val table: Table) extends AutoCloseable {
     new Table(indices.map(table.getColumn): _*)
   }

-  override def close(): Unit = {
-    if (Option(table).isDefined) {
-      table.close()
-    }
-  }
+  override def close(): Unit = Option(table).foreach(_.close())
 }
diff --git a/jvm-packages/xgboost4j-spark-gpu/src/test/java/ml/dmlc/xgboost4j/java/BoosterTest.java b/jvm-packages/xgboost4j-spark-gpu/src/test/java/ml/dmlc/xgboost4j/java/BoosterTest.java
index 7f64f3bfdf10..edfff7172422 100644
--- a/jvm-packages/xgboost4j-spark-gpu/src/test/java/ml/dmlc/xgboost4j/java/BoosterTest.java
+++ b/jvm-packages/xgboost4j-spark-gpu/src/test/java/ml/dmlc/xgboost4j/java/BoosterTest.java
@@ -1,5 +1,5 @@
 /*
- Copyright (c) 2021-2024 by Contributors
+ Copyright (c) 2021-2025 by Contributors

 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
@@ -98,7 +98,7 @@ public void testBooster() throws XGBoostError {
       List<ColumnBatch> tables = new LinkedList<>();
       tables.add(batch);

-      DMatrix incrementalDMatrix = new QuantileDMatrix(tables.iterator(), Float.NaN, maxBin, 1);
+      DMatrix incrementalDMatrix = new QuantileDMatrix(tables.iterator(), Float.NaN, maxBin, 1, false);
       //set watchList
       HashMap<String, DMatrix> watches1 = new HashMap<>();
       watches1.put("train", incrementalDMatrix);
diff --git a/jvm-packages/xgboost4j-spark-gpu/src/test/java/ml/dmlc/xgboost4j/java/DMatrixTest.java b/jvm-packages/xgboost4j-spark-gpu/src/test/java/ml/dmlc/xgboost4j/java/DMatrixTest.java
index 5a4c67bcca38..3937d3470dd2 100644
--- a/jvm-packages/xgboost4j-spark-gpu/src/test/java/ml/dmlc/xgboost4j/java/DMatrixTest.java
+++ b/jvm-packages/xgboost4j-spark-gpu/src/test/java/ml/dmlc/xgboost4j/java/DMatrixTest.java
@@ -1,5 +1,5 @@
 /*
- Copyright (c) 2021-2024 by Contributors
+ Copyright (c) 2021-2025 by Contributors

 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
@@ -120,7 +120,7 @@ public void testCreateFromColumnDataIterator() throws XGBoostError {
       tables.add(new CudfColumnBatch(X_0, y_0, w_0, m_0, q_0));
       tables.add(new CudfColumnBatch(X_1, y_1, w_1, m_1, q_1));

-      QuantileDMatrix dmat = new QuantileDMatrix(tables.iterator(), 0.0f, 256, 1);
+      QuantileDMatrix dmat = new QuantileDMatrix(tables.iterator(), 0.0f, 256, 1, false);
       float[] anchorLabel = convertFloatTofloat(label1, label2);
       float[] anchorWeight = convertFloatTofloat(weight1, weight2);
       float[] anchorBaseMargin = convertFloatTofloat(baseMargin1, baseMargin2);
@@ -166,11 +166,11 @@ public void testGetQuantileCut() throws XGBoostError {
     ) {
       List<ColumnBatch> tables = new LinkedList<>();
       tables.add(new CudfColumnBatch(X_0, y_0, null, null, null));
-      QuantileDMatrix train = new QuantileDMatrix(tables.iterator(), 0.0f, 256, 1);
+      QuantileDMatrix train = new QuantileDMatrix(tables.iterator(), 0.0f, 256, 1, false);

       tables.clear();
       tables.add(new CudfColumnBatch(X_1, y_1, null, null, null));
-      QuantileDMatrix eval = new QuantileDMatrix(tables.iterator(), train, 0.0f, 256, 1);
+      QuantileDMatrix eval = new QuantileDMatrix(tables.iterator(), train, 0.0f, 256, 1, false);

       DMatrix.QuantileCut trainCut = train.getQuantileCut();
       DMatrix.QuantileCut evalCut = eval.getQuantileCut();
diff --git a/jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/QuantileDMatrixSuite.scala b/jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/QuantileDMatrixSuite.scala
index ceebcfd41f7a..821e13e4474a 100644
--- a/jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/QuantileDMatrixSuite.scala
+++ b/jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/QuantileDMatrixSuite.scala
@@ -1,5 +1,5 @@
 /*
- Copyright (c) 2021-2024 by Contributors
+ Copyright (c) 2021-2025 by Contributors

 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
@@ -16,18 +16,22 @@ package ml.dmlc.xgboost4j.scala +import java.io.File + import scala.collection.mutable.ArrayBuffer import ai.rapids.cudf.Table import org.scalatest.funsuite.AnyFunSuite -import ml.dmlc.xgboost4j.java.CudfColumnBatch +import ml.dmlc.xgboost4j.java.{ColumnBatch, CudfColumnBatch} +import ml.dmlc.xgboost4j.scala.rapids.spark.TmpFolderSuite +import ml.dmlc.xgboost4j.scala.spark.{ColumnIndices, ExternalMemoryIterator, GpuColumnBatch} import ml.dmlc.xgboost4j.scala.spark.Utils.withResource -class QuantileDMatrixSuite extends AnyFunSuite { - - test("QuantileDMatrix test") { +class QuantileDMatrixSuite extends AnyFunSuite with TmpFolderSuite { + private def runTest(buildIterator: (Iterator[Table], ColumnIndices) => + (Iterator[ColumnBatch], Boolean)) = { val label1 = Array[java.lang.Float](25f, 21f, 22f, 20f, 24f) val weight1 = Array[java.lang.Float](1.3f, 2.31f, 0.32f, 3.3f, 1.34f) val baseMargin1 = Array[java.lang.Float](1.2f, 0.2f, 1.3f, 2.4f, 3.5f) @@ -56,14 +60,39 @@ class QuantileDMatrixSuite extends AnyFunSuite { withResource(new Table.TestBuilder().column(weight2: _*).build) { w_1 => withResource(new Table.TestBuilder().column(baseMargin2: _*).build) { m_1 => withResource(new Table.TestBuilder().column(group2: _*).build) { q_2 => - val batches = new ArrayBuffer[CudfColumnBatch]() - batches += new CudfColumnBatch(X_0, y_0, w_0, m_0, q_0) - batches += new CudfColumnBatch(X_1, y_1, w_1, m_1, q_2) - val dmatrix = new QuantileDMatrix(batches.toIterator, 0.0f, 8, 1) - assert(dmatrix.getLabel.sameElements(label1 ++ label2)) - assert(dmatrix.getWeight.sameElements(weight1 ++ weight2)) - assert(dmatrix.getBaseMargin.sameElements(baseMargin1 ++ baseMargin2)) - assert(dmatrix.getGroup().sameElements(expectedGroup)) + val tables = new ArrayBuffer[Table]() + tables += new Table(X_0.getColumn(0), X_0.getColumn(1), y_0.getColumn(0), + w_0.getColumn(0), m_0.getColumn(0)) + tables += new Table(X_1.getColumn(0), X_1.getColumn(1), y_1.getColumn(0), + w_1.getColumn(0), m_1.getColumn(0)) + + val indices = ColumnIndices( + labelId = 2, + featureId = None, + featureIds = Option(Seq(0, 1)), + weightId = Option(3), + marginId = Option(4), + groupId = Option(5) + ) + val (iter, useExtMem) = buildIterator(tables.toIterator, indices) + val dmatrix = new QuantileDMatrix(iter, 0.0f, 8, 1, useExtMem) + + def check(dm: QuantileDMatrix) = { + assert(dm.getLabel.sameElements(label1 ++ label2)) + assert(dm.getWeight.sameElements(weight1 ++ weight2)) + assert(dm.getBaseMargin.sameElements(baseMargin1 ++ baseMargin2)) + } + check(dmatrix) + + if (!useExtMem) { + val batches = new ArrayBuffer[CudfColumnBatch]() + batches += new CudfColumnBatch(X_0, y_0, w_0, m_0, q_0) + batches += new CudfColumnBatch(X_1, y_1, w_1, m_1, q_2) + val dmatrix1 = new QuantileDMatrix(batches.toIterator, 0.0f, 8, 1, + useExtMem) + check(dmatrix1) + assert(dmatrix1.getGroup().sameElements(expectedGroup)) + } } } } @@ -75,4 +104,28 @@ class QuantileDMatrixSuite extends AnyFunSuite { } } } + + test("QuantileDMatrix test on host") { + val buildIter = (input: Iterator[Table], indices: ColumnIndices) => { + (input.map { table => + withResource(new GpuColumnBatch(table)) { batch => + new CudfColumnBatch( + batch.select(indices.featureIds.get), + batch.select(indices.labelId), + batch.select(indices.weightId.getOrElse(-1)), + batch.select(indices.marginId.getOrElse(-1)), + batch.select(indices.groupId.getOrElse(-1))); + } + }, false) + } + runTest(buildIter) + } + + test("QuantileDMatrix test on disk") { + val buildIter = (input: 
Iterator[Table], indices: ColumnIndices) =>
+      (new ExternalMemoryIterator(input, indices,
+        Option(new File(tempDir.toFile, "xgboost").getPath)), true)
+    runTest(buildIter)
+  }
+
 }
diff --git a/jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/spark/ExternalMemorySuite.scala b/jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/spark/ExternalMemorySuite.scala
new file mode 100644
index 000000000000..fb0812e553b8
--- /dev/null
+++ b/jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/spark/ExternalMemorySuite.scala
@@ -0,0 +1,144 @@
+/*
+ Copyright (c) 2025 by Contributors
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package ml.dmlc.xgboost4j.scala.spark
+
+import scala.collection.mutable.ArrayBuffer
+
+import ai.rapids.cudf.{ArrowIPCWriterOptions, HostBufferConsumer, HostBufferProvider, HostMemoryBuffer, Table}
+
+import ml.dmlc.xgboost4j.java.CudfColumnBatch
+import ml.dmlc.xgboost4j.scala.rapids.spark.GpuTestSuite
+import ml.dmlc.xgboost4j.scala.spark.Utils.withResource
+
+class ExternalMemorySuite extends GpuTestSuite {
+
+  private def assertColumnBatchEqual(lhs: Array[CudfColumnBatch],
+                                     rhs: Array[CudfColumnBatch]): Unit = {
+    def assertTwoTable(lhsTable: Table, rhsTable: Table): Unit = {
+      assert(lhsTable.getNumberOfColumns === rhsTable.getNumberOfColumns)
+      for (i <- 0 until lhsTable.getNumberOfColumns) {
+        val lColumn = lhsTable.getColumn(i)
+        val rColumn = rhsTable.getColumn(i)
+
+        val lHost = lColumn.copyToHost()
+        val rHost = rColumn.copyToHost()
+
+        assert(lHost.getRowCount === rHost.getRowCount)
+        for (j <- 0 until lHost.getRowCount.toInt) {
+          assert(lHost.getFloat(j) === rHost.getFloat(j))
+        }
+      }
+    }
+
+    assert(lhs.length === rhs.length)
+    for ((l, r) <- lhs.zip(rhs)) {
+      assertTwoTable(l.getFeatureTable, r.getFeatureTable)
+      assertTwoTable(l.getLabelTable, r.getLabelTable)
+    }
+  }
+
+  test("HostExternalMemory") {
+    withResource(new Table.TestBuilder()
+      .column(1.0f, 2.0f, 3.0f.asInstanceOf[java.lang.Float])
+      .column(4.0f, 5.0f, 6.0f.asInstanceOf[java.lang.Float])
+      .build) { table =>
+
+      val buffers = ArrayBuffer.empty[HostMemoryBuffer]
+      class MyHostBufferConsumer extends HostBufferConsumer {
+        override def handleBuffer(hostMemoryBuffer: HostMemoryBuffer, l: Long): Unit = {
+          buffers.append(hostMemoryBuffer)
+        }
+      }
+
+      val options = ArrowIPCWriterOptions.builder().withColumnNames(Seq("a", "b"): _*).build()
+      val writer = Table.writeArrowIPCChunked(options, new MyHostBufferConsumer())
+      writer.write(table)
+      writer.close()
+
+      class MyHostBufferProvider extends HostBufferProvider {
+        var offset: Long = 0L
+
+        override def readInto(hostMemoryBuffer: HostMemoryBuffer, l: Long): Long = {
+          val amountLeft = buffers(0).getLength - offset
+          val amountToCopy = Math.max(0, Math.min(l, amountLeft))
+          if (amountToCopy > 0) {
+            hostMemoryBuffer.copyFromHostBuffer(0, buffers(0), offset, amountToCopy)
+            offset += amountToCopy
+          }
+          amountToCopy
+        }
+      }
+      val reader = Table.readArrowIPCChunked(new MyHostBufferProvider())
+      withResource(reader.getNextIfAvailable()) { t =>
+        assert(t.getNumberOfColumns == 2)
+        assert(t.getRowCount == 3)
+      }
+      reader.close()
+      buffers.foreach(_.close())
+    }
+  }
+
+  def runExternalMemoryTest(buildExternalMemory: (Iterator[Table], ColumnIndices) =>
+    ExternalMemoryIterator): Unit = {
+
+    withResource(new Table.TestBuilder()
+      .column(1.0f, 2.0f, 3.0f.asInstanceOf[java.lang.Float])
+      .column(4.0f, 5.0f, 6.0f.asInstanceOf[java.lang.Float])
+      .column(7.0f, 8.0f, 9.0f.asInstanceOf[java.lang.Float])
+      .build) { table1 =>
+
+      withResource(new Table.TestBuilder()
+        .column(11.0f, 12.0f, 13.0f.asInstanceOf[java.lang.Float])
+        .column(14.0f, 15.0f, 16.0f.asInstanceOf[java.lang.Float])
+        .column(17.0f, 18.0f, 19.0f.asInstanceOf[java.lang.Float])
+        .build) { table2 =>
+
+        val tables = Seq(table1, table2)
+
+        val indices = ColumnIndices(labelId = 0, featureIds = Some(Seq(1, 2)), featureId = None,
+          weightId = None, marginId = None, groupId = None)
+        val extMemIter = buildExternalMemory(tables.toIterator, indices)
+        val expectTables = ArrayBuffer.empty[CudfColumnBatch]
+        while (extMemIter.hasNext) {
+          val table = extMemIter.next().asInstanceOf[CudfColumnBatch]
+          expectTables.append(table)
+        }
+
+        // hasNext has swapped to the external-memory iterator internally, so we
+        // can still get the values for the second round of iteration.
+        val targetTables = ArrayBuffer.empty[CudfColumnBatch]
+        while (extMemIter.hasNext) {
+          val table = extMemIter.next().asInstanceOf[CudfColumnBatch]
+          targetTables.append(table)
+        }
+
+        assertColumnBatchEqual(expectTables.toArray, targetTables.toArray)
+      }
+    }
+  }
+
+  test("DiskExternalMemory") {
+    val buildIterator = (input: Iterator[Table], indices: ColumnIndices) => {
+      val iter = new ExternalMemoryIterator(input, indices, Some("/tmp/"))
+      assert(iter.externalMemory.isInstanceOf[DiskExternalMemoryIterator])
+      iter
+    }
+    runExternalMemoryTest(buildIterator)
+  }
+}
diff --git a/jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/spark/GpuXGBoostPluginSuite.scala b/jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/spark/GpuXGBoostPluginSuite.scala
index a5ff2ba0f589..6d468523d065 100644
--- a/jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/spark/GpuXGBoostPluginSuite.scala
+++ b/jvm-packages/xgboost4j-spark-gpu/src/test/scala/ml/dmlc/xgboost4j/scala/spark/GpuXGBoostPluginSuite.scala
@@ -1,5 +1,5 @@
 /*
- Copyright (c) 2024 by Contributors
+ Copyright (c) 2024-2025 by Contributors

 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
@@ -358,25 +358,101 @@ class GpuXGBoostPluginSuite extends GpuTestSuite { } } - Seq("binary:logistic", "multi:softprob").foreach { case objective => - test(s"$objective: XGBoost-Spark should match xgboost4j") { - withGpuSparkSession() { spark => - import spark.implicits._ + Seq(false, true).foreach { useExtMem => + Seq("binary:logistic", "multi:softprob").foreach { objective => + test(s"$objective: XGBoost-Spark should match xgboost4j with useExtMem=$useExtMem") { + withGpuSparkSession() { spark => + import spark.implicits._ - val numRound = 100 - var xgboostParams: Map[String, Any] = Map( - "objective" -> objective, - "device" -> "cuda" - ) + val numRound = 100 + var xgboostParams: Map[String, Any] = Map( + "objective" -> objective, + "device" -> "cuda" + ) + + val (trainPath, testPath) = if (objective == "binary:logistic") { + (writeFile(Classification.train.toDF("label", "weight", "c1", "c2", "c3")), + writeFile(Classification.test.toDF("label", "weight", "c1", "c2", "c3"))) + } else { + xgboostParams = xgboostParams ++ Map("num_class" -> 6) + (writeFile(MultiClassification.train.toDF("label", "weight", "c1", "c2", "c3")), + writeFile(MultiClassification.test.toDF("label", "weight", "c1", "c2", "c3"))) + } + + val df = spark.read.parquet(trainPath) + val testdf = spark.read.parquet(testPath) + + val features = Array("c1", "c2", "c3") + val featuresIndices = features.map(df.schema.fieldIndex) + val label = "label" + + val classifier = new XGBoostClassifier(xgboostParams) + .setFeaturesCol(features) + .setLabelCol(label) + .setNumRound(numRound) + .setLeafPredictionCol("leaf") + .setContribPredictionCol("contrib") + .setDevice("cuda") + .setUseExternalMemory(useExtMem) + + val xgb4jModel = withResource(new GpuColumnBatch( + Table.readParquet(new File(trainPath)))) { batch => + val cb = new CudfColumnBatch(batch.select(featuresIndices), + batch.select(df.schema.fieldIndex(label)), null, null, null + ) + val qdm = new QuantileDMatrix(Seq(cb).iterator, classifier.getMissing, + classifier.getMaxBins, classifier.getNthread, false) + ScalaXGBoost.train(qdm, xgboostParams, numRound) + } + + val (xgb4jLeaf, xgb4jContrib, xgb4jProb, xgb4jRaw) = withResource(new GpuColumnBatch( + Table.readParquet(new File(testPath)))) { batch => + val cb = new CudfColumnBatch(batch.select(featuresIndices), null, null, null, null + ) + val qdm = new DMatrix(cb, classifier.getMissing, classifier.getNthread) + (xgb4jModel.predictLeaf(qdm), xgb4jModel.predictContrib(qdm), + xgb4jModel.predict(qdm), xgb4jModel.predict(qdm, outPutMargin = true)) + } + + val rows = classifier.fit(df).transform(testdf).collect() + + // Check Leaf + val xgbSparkLeaf = rows.map(row => row.getAs[DenseVector]("leaf").toArray.map(_.toFloat)) + checkEqual(xgb4jLeaf, xgbSparkLeaf) + + // Check contrib + val xgbSparkContrib = rows.map(row => + row.getAs[DenseVector]("contrib").toArray.map(_.toFloat)) + checkEqual(xgb4jContrib, xgbSparkContrib) + + // Check probability + var xgbSparkProb = rows.map(row => + row.getAs[DenseVector]("probability").toArray.map(_.toFloat)) + if (objective == "binary:logistic") { + xgbSparkProb = xgbSparkProb.map(v => Array(v(1))) + } + checkEqual(xgb4jProb, xgbSparkProb) + + // Check raw + var xgbSparkRaw = rows.map(row => + row.getAs[DenseVector]("rawPrediction").toArray.map(_.toFloat)) + if (objective == "binary:logistic") { + xgbSparkRaw = xgbSparkRaw.map(v => Array(v(1))) + } + checkEqual(xgb4jRaw, xgbSparkRaw) - val (trainPath, testPath) = if (objective == "binary:logistic") { - 
(writeFile(Classification.train.toDF("label", "weight", "c1", "c2", "c3")), - writeFile(Classification.test.toDF("label", "weight", "c1", "c2", "c3"))) - } else { - xgboostParams = xgboostParams ++ Map("num_class" -> 6) - (writeFile(MultiClassification.train.toDF("label", "weight", "c1", "c2", "c3")), - writeFile(MultiClassification.test.toDF("label", "weight", "c1", "c2", "c3"))) } + } + } + } + + Seq(false, true).foreach { useExtMem => + test(s"Regression: XGBoost-Spark should match xgboost4j with useExtMem=$useExtMem") { + withGpuSparkSession() { spark => + import spark.implicits._ + + val trainPath = writeFile(Regression.train.toDF("label", "weight", "c1", "c2", "c3")) + val testPath = writeFile(Regression.test.toDF("label", "weight", "c1", "c2", "c3")) val df = spark.read.parquet(trainPath) val testdf = spark.read.parquet(testPath) @@ -385,34 +461,40 @@ class GpuXGBoostPluginSuite extends GpuTestSuite { val featuresIndices = features.map(df.schema.fieldIndex) val label = "label" - val classifier = new XGBoostClassifier(xgboostParams) + val numRound = 100 + val xgboostParams: Map[String, Any] = Map( + "device" -> "cuda" + ) + + val regressor = new XGBoostRegressor(xgboostParams) .setFeaturesCol(features) .setLabelCol(label) .setNumRound(numRound) .setLeafPredictionCol("leaf") .setContribPredictionCol("contrib") .setDevice("cuda") + .setUseExternalMemory(useExtMem) val xgb4jModel = withResource(new GpuColumnBatch( Table.readParquet(new File(trainPath)))) { batch => val cb = new CudfColumnBatch(batch.select(featuresIndices), batch.select(df.schema.fieldIndex(label)), null, null, null ) - val qdm = new QuantileDMatrix(Seq(cb).iterator, classifier.getMissing, - classifier.getMaxBins, classifier.getNthread) + val qdm = new QuantileDMatrix(Seq(cb).iterator, regressor.getMissing, + regressor.getMaxBins, regressor.getNthread, false) ScalaXGBoost.train(qdm, xgboostParams, numRound) } - val (xgb4jLeaf, xgb4jContrib, xgb4jProb, xgb4jRaw) = withResource(new GpuColumnBatch( + val (xgb4jLeaf, xgb4jContrib, xgb4jPred) = withResource(new GpuColumnBatch( Table.readParquet(new File(testPath)))) { batch => val cb = new CudfColumnBatch(batch.select(featuresIndices), null, null, null, null ) - val qdm = new DMatrix(cb, classifier.getMissing, classifier.getNthread) + val qdm = new DMatrix(cb, regressor.getMissing, regressor.getNthread) (xgb4jModel.predictLeaf(qdm), xgb4jModel.predictContrib(qdm), - xgb4jModel.predict(qdm), xgb4jModel.predict(qdm, outPutMargin = true)) + xgb4jModel.predict(qdm)) } - val rows = classifier.fit(df).transform(testdf).collect() + val rows = regressor.fit(df).transform(testdf).collect() // Check Leaf val xgbSparkLeaf = rows.map(row => row.getAs[DenseVector]("leaf").toArray.map(_.toFloat)) @@ -423,87 +505,11 @@ class GpuXGBoostPluginSuite extends GpuTestSuite { row.getAs[DenseVector]("contrib").toArray.map(_.toFloat)) checkEqual(xgb4jContrib, xgbSparkContrib) - // Check probability - var xgbSparkProb = rows.map(row => - row.getAs[DenseVector]("probability").toArray.map(_.toFloat)) - if (objective == "binary:logistic") { - xgbSparkProb = xgbSparkProb.map(v => Array(v(1))) - } - checkEqual(xgb4jProb, xgbSparkProb) - - // Check raw - var xgbSparkRaw = rows.map(row => - row.getAs[DenseVector]("rawPrediction").toArray.map(_.toFloat)) - if (objective == "binary:logistic") { - xgbSparkRaw = xgbSparkRaw.map(v => Array(v(1))) - } - checkEqual(xgb4jRaw, xgbSparkRaw) - - } - } - } - - test(s"Regression: XGBoost-Spark should match xgboost4j") { - withGpuSparkSession() { spark => - import 
spark.implicits._ - - val trainPath = writeFile(Regression.train.toDF("label", "weight", "c1", "c2", "c3")) - val testPath = writeFile(Regression.test.toDF("label", "weight", "c1", "c2", "c3")) - - val df = spark.read.parquet(trainPath) - val testdf = spark.read.parquet(testPath) - - val features = Array("c1", "c2", "c3") - val featuresIndices = features.map(df.schema.fieldIndex) - val label = "label" - - val numRound = 100 - val xgboostParams: Map[String, Any] = Map( - "device" -> "cuda" - ) - - val regressor = new XGBoostRegressor(xgboostParams) - .setFeaturesCol(features) - .setLabelCol(label) - .setNumRound(numRound) - .setLeafPredictionCol("leaf") - .setContribPredictionCol("contrib") - .setDevice("cuda") - - val xgb4jModel = withResource(new GpuColumnBatch( - Table.readParquet(new File(trainPath)))) { batch => - val cb = new CudfColumnBatch(batch.select(featuresIndices), - batch.select(df.schema.fieldIndex(label)), null, null, null - ) - val qdm = new QuantileDMatrix(Seq(cb).iterator, regressor.getMissing, - regressor.getMaxBins, regressor.getNthread) - ScalaXGBoost.train(qdm, xgboostParams, numRound) - } - - val (xgb4jLeaf, xgb4jContrib, xgb4jPred) = withResource(new GpuColumnBatch( - Table.readParquet(new File(testPath)))) { batch => - val cb = new CudfColumnBatch(batch.select(featuresIndices), null, null, null, null - ) - val qdm = new DMatrix(cb, regressor.getMissing, regressor.getNthread) - (xgb4jModel.predictLeaf(qdm), xgb4jModel.predictContrib(qdm), - xgb4jModel.predict(qdm)) + // Check prediction + val xgbSparkPred = rows.map(row => + Array(row.getAs[Double]("prediction").toFloat)) + checkEqual(xgb4jPred, xgbSparkPred) } - - val rows = regressor.fit(df).transform(testdf).collect() - - // Check Leaf - val xgbSparkLeaf = rows.map(row => row.getAs[DenseVector]("leaf").toArray.map(_.toFloat)) - checkEqual(xgb4jLeaf, xgbSparkLeaf) - - // Check contrib - val xgbSparkContrib = rows.map(row => - row.getAs[DenseVector]("contrib").toArray.map(_.toFloat)) - checkEqual(xgb4jContrib, xgbSparkContrib) - - // Check prediction - val xgbSparkPred = rows.map(row => - Array(row.getAs[Double]("prediction").toFloat)) - checkEqual(xgb4jPred, xgbSparkPred) } } @@ -591,71 +597,74 @@ class GpuXGBoostPluginSuite extends GpuTestSuite { } } - test("Ranker: XGBoost-Spark should match xgboost4j") { - withGpuSparkSession() { spark => - import spark.implicits._ + Seq(false, true).foreach { useExtMem => + test(s"Ranker: XGBoost-Spark should match xgboost4j with useExtMem=$useExtMem") { + withGpuSparkSession() { spark => + import spark.implicits._ - val trainPath = writeFile(Ranking.train.toDF("label", "weight", "group", "c1", "c2", "c3")) - val testPath = writeFile(Ranking.test.toDF("label", "weight", "group", "c1", "c2", "c3")) + val trainPath = writeFile(Ranking.train.toDF("label", "weight", "group", "c1", "c2", "c3")) + val testPath = writeFile(Ranking.test.toDF("label", "weight", "group", "c1", "c2", "c3")) - val df = spark.read.parquet(trainPath) - val testdf = spark.read.parquet(testPath) + val df = spark.read.parquet(trainPath) + val testdf = spark.read.parquet(testPath) - val features = Array("c1", "c2", "c3") - val featuresIndices = features.map(df.schema.fieldIndex) - val label = "label" - val group = "group" + val features = Array("c1", "c2", "c3") + val featuresIndices = features.map(df.schema.fieldIndex) + val label = "label" + val group = "group" - val numRound = 100 - val xgboostParams: Map[String, Any] = Map( - "device" -> "cuda", - "objective" -> "rank:ndcg" - ) + val numRound = 100 + val 
xgboostParams: Map[String, Any] = Map( + "device" -> "cuda", + "objective" -> "rank:ndcg" + ) - val ranker = new XGBoostRanker(xgboostParams) - .setFeaturesCol(features) - .setLabelCol(label) - .setNumRound(numRound) - .setLeafPredictionCol("leaf") - .setContribPredictionCol("contrib") - .setGroupCol(group) - .setDevice("cuda") + val ranker = new XGBoostRanker(xgboostParams) + .setFeaturesCol(features) + .setLabelCol(label) + .setNumRound(numRound) + .setLeafPredictionCol("leaf") + .setContribPredictionCol("contrib") + .setGroupCol(group) + .setDevice("cuda") + .setUseExternalMemory(useExtMem) - val xgb4jModel = withResource(new GpuColumnBatch( - Table.readParquet(new File(trainPath) - ).orderBy(OrderByArg.asc(df.schema.fieldIndex(group))))) { batch => - val cb = new CudfColumnBatch(batch.select(featuresIndices), - batch.select(df.schema.fieldIndex(label)), null, null, - batch.select(df.schema.fieldIndex(group))) - val qdm = new QuantileDMatrix(Seq(cb).iterator, ranker.getMissing, - ranker.getMaxBins, ranker.getNthread) - ScalaXGBoost.train(qdm, xgboostParams, numRound) - } + val xgb4jModel = withResource(new GpuColumnBatch( + Table.readParquet(new File(trainPath) + ).orderBy(OrderByArg.asc(df.schema.fieldIndex(group))))) { batch => + val cb = new CudfColumnBatch(batch.select(featuresIndices), + batch.select(df.schema.fieldIndex(label)), null, null, + batch.select(df.schema.fieldIndex(group))) + val qdm = new QuantileDMatrix(Seq(cb).iterator, ranker.getMissing, + ranker.getMaxBins, ranker.getNthread, false) + ScalaXGBoost.train(qdm, xgboostParams, numRound) + } - val (xgb4jLeaf, xgb4jContrib, xgb4jPred) = withResource(new GpuColumnBatch( - Table.readParquet(new File(testPath)))) { batch => - val cb = new CudfColumnBatch(batch.select(featuresIndices), null, null, null, null - ) - val qdm = new DMatrix(cb, ranker.getMissing, ranker.getNthread) - (xgb4jModel.predictLeaf(qdm), xgb4jModel.predictContrib(qdm), - xgb4jModel.predict(qdm)) - } + val (xgb4jLeaf, xgb4jContrib, xgb4jPred) = withResource(new GpuColumnBatch( + Table.readParquet(new File(testPath)))) { batch => + val cb = new CudfColumnBatch(batch.select(featuresIndices), null, null, null, null + ) + val qdm = new DMatrix(cb, ranker.getMissing, ranker.getNthread) + (xgb4jModel.predictLeaf(qdm), xgb4jModel.predictContrib(qdm), + xgb4jModel.predict(qdm)) + } - val rows = ranker.fit(df).transform(testdf).collect() + val rows = ranker.fit(df).transform(testdf).collect() - // Check Leaf - val xgbSparkLeaf = rows.map(row => row.getAs[DenseVector]("leaf").toArray.map(_.toFloat)) - checkEqual(xgb4jLeaf, xgbSparkLeaf) + // Check Leaf + val xgbSparkLeaf = rows.map(row => row.getAs[DenseVector]("leaf").toArray.map(_.toFloat)) + checkEqual(xgb4jLeaf, xgbSparkLeaf) - // Check contrib - val xgbSparkContrib = rows.map(row => - row.getAs[DenseVector]("contrib").toArray.map(_.toFloat)) - checkEqual(xgb4jContrib, xgbSparkContrib) + // Check contrib + val xgbSparkContrib = rows.map(row => + row.getAs[DenseVector]("contrib").toArray.map(_.toFloat)) + checkEqual(xgb4jContrib, xgbSparkContrib) - // Check prediction - val xgbSparkPred = rows.map(row => - Array(row.getAs[Double]("prediction").toFloat)) - checkEqual(xgb4jPred, xgbSparkPred) + // Check prediction + val xgbSparkPred = rows.map(row => + Array(row.getAs[Double]("prediction").toFloat)) + checkEqual(xgb4jPred, xgbSparkPred) + } } } diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/Utils.scala 
b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/Utils.scala
index cae44ab9aef1..40798c52df08 100644
--- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/Utils.scala
+++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/Utils.scala
@@ -1,5 +1,5 @@
 /*
- Copyright (c) 2014-2024 by Contributors
+ Copyright (c) 2014-2025 by Contributors

 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
@@ -121,4 +121,41 @@ private[scala] object Utils {
       r.close()
     }
   }
+
+  /** Executes the provided code block and then closes the sequence of resources */
+  def withResource[T <: AutoCloseable, V](r: Seq[T])(block: Seq[T] => V): V = {
+    try {
+      block(r)
+    } finally {
+      r.safeClose()
+    }
+  }
+
+  implicit class AutoCloseableSeq[A <: AutoCloseable](val in: collection.SeqLike[A, _]) {
+    /**
+     * safeClose is an implicit method on a sequence of AutoCloseable objects that tries to
+     * close each element of the sequence, even if prior close calls fail. If any of the
+     * close calls fail, an exception is thrown that carries the remaining failures as
+     * suppressed exceptions (getSuppressed).
+     */
+    def safeClose(error: Throwable = null): Unit = if (in != null) {
+      var closeException: Throwable = null
+      in.foreach { element =>
+        if (element != null) {
+          try {
+            element.close()
+          } catch {
+            case e: Throwable if error != null => error.addSuppressed(e)
+            case e: Throwable if closeException == null => closeException = e
+            case e: Throwable => closeException.addSuppressed(e)
+          }
+        }
+      }
+      if (closeException != null) {
+        // an exception happened while we were trying to safely close
+        // resources; throw it to alert the caller
+        throw closeException
+      }
+    }
+  }
 }
diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostEstimator.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostEstimator.scala
index 0bfbf5ad2599..61e7cd8c6f15 100644
--- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostEstimator.scala
+++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostEstimator.scala
@@ -1,5 +1,5 @@
 /*
- Copyright (c) 2024 by Contributors
+ Copyright (c) 2024-2025 by Contributors

 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
@@ -44,7 +44,7 @@ import ml.dmlc.xgboost4j.scala.spark.params._
 /**
  * Hold the column index
  */
-private[spark] case class ColumnIndices(
+private[scala] case class ColumnIndices(
     labelId: Int,
     featureId: Option[Int], // the feature type is VectorUDT or Array
     featureIds: Option[Seq[Int]], // the feature type is columnar
diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/params/XGBoostParams.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/params/XGBoostParams.scala
index d7a83eb1b76a..416ee3eec3a6 100644
--- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/params/XGBoostParams.scala
+++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/params/XGBoostParams.scala
@@ -1,5 +1,5 @@
 /*
- Copyright (c) 2024 by Contributors
+ Copyright (c) 2024-2025 by Contributors

 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
@@ -179,14 +179,24 @@ private[spark] trait SparkParams[T <: Params] extends HasFeaturesCols with HasFe

   final def getFeatureTypes: Array[String] = $(featureTypes)

+  final val useExternalMemory = new BooleanParam(this, "useExternalMemory", "Whether to use " +
+    "external memory when building the QuantileDMatrix. Note that useExternalMemory takes " +
+    "effect only when `device` is set to `cuda` or `gpu`. When it is enabled, the directory " +
+    "specified by spark.local.dir (if set) is used to cache the temporary files; if " +
+    "spark.local.dir is not set, the /tmp directory is used.")
+
+  final def getUseExternalMemory: Boolean = $(useExternalMemory)
+
   setDefault(numRound -> 100, numWorkers -> 1, inferBatchSize -> (32 << 10),
     numEarlyStoppingRounds -> 0, forceRepartition -> false, missing -> Float.NaN,
     featuresCols -> Array.empty, customObj -> null, customEval -> null,
-    featureNames -> Array.empty, featureTypes -> Array.empty)
+    featureNames -> Array.empty, featureTypes -> Array.empty, useExternalMemory -> false)

   addNonXGBoostParam(numWorkers, numRound, numEarlyStoppingRounds, inferBatchSize, featuresCol,
     labelCol, baseMarginCol, weightCol, predictionCol, leafPredictionCol, contribPredictionCol,
-    forceRepartition, featuresCols, customEval, customObj, featureTypes, featureNames)
+    forceRepartition, featuresCols, customEval, customObj, featureTypes, featureNames,
+    useExternalMemory)

   final def getNumWorkers: Int = $(numWorkers)

@@ -224,6 +234,8 @@ private[spark] trait SparkParams[T <: Params] extends HasFeaturesCols with HasFe

   def setFeatureTypes(value: Array[String]): T = set(featureTypes, value).asInstanceOf[T]

+  def setUseExternalMemory(value: Boolean): T = set(useExternalMemory, value).asInstanceOf[T]
+
   protected[spark] def featureIsArrayType(schema: StructType): Boolean =
     schema(getFeaturesCol).dataType.isInstanceOf[ArrayType]

diff --git a/jvm-packages/xgboost4j/src/native/xgboost4j-gpu.cu b/jvm-packages/xgboost4j/src/native/xgboost4j-gpu.cu
index a9798465686f..5bdbbd79ba61 100644
--- a/jvm-packages/xgboost4j/src/native/xgboost4j-gpu.cu
+++ b/jvm-packages/xgboost4j/src/native/xgboost4j-gpu.cu
@@ -6,6 +6,7 @@

 #include "../../../../src/common/cuda_pinned_allocator.h"
 #include "../../../../src/common/device_vector.cuh"  // for device_vector
+#include "../../../../src/common/json_utils.h"
 #include "../../../../src/data/array_interface.h"

 #include "jvm_utils.h"
@@ -391,6 +392,116 @@ class DataIteratorProxy {
   };
 };

+// An iterator proxy for external memory.
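+// It adapts the JVM-side Iterator<ColumnBatch> to the native
+// XGQuantileDMatrixCreateFromCallback protocol (a sketch of the contract, not
+// an exhaustive spec): Next() pulls one batch from the JVM iterator, publishes
+// its cuDF data and meta info on the proxy DMatrix via array-interface JSON,
+// and keeps the batch alive until the next call; Reset() closes the current
+// batch so the data can be iterated again for the next pass.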
+class ExternalMemoryIteratorProxy {
+  jobject jiter_;
+  JNIEnv *jenv_;
+  DMatrixHandle proxy_;
+  int jni_status_;
+  jobject last_batch_ {nullptr};
+
+ public:
+  explicit ExternalMemoryIteratorProxy(jobject jiter) : jiter_(jiter) {
+    XGProxyDMatrixCreate(&proxy_);
+    jni_status_ =
+        GlobalJvm()->GetEnv(reinterpret_cast<void **>(&jenv_), JNI_VERSION_1_6);
+  }
+
+  ~ExternalMemoryIteratorProxy() {
+    XGDMatrixFree(proxy_);
+  }
+
+  DMatrixHandle GetDMatrixHandle() const { return proxy_; }
+
+  void CloseJvmBatch() {
+    if (last_batch_) {
+      jclass batch_class = CheckJvmCall(jenv_->GetObjectClass(last_batch_), jenv_);
+      jmethodID closeMethod = CheckJvmCall(jenv_->GetMethodID(batch_class, "close", "()V"), jenv_);
+      jenv_->CallVoidMethod(last_batch_, closeMethod);
+      last_batch_ = nullptr;
+    }
+  }
+
+  void SetArrayInterface(std::string interface_str) {
+    auto json_interface =
+        Json::Load({interface_str.c_str(), interface_str.size()});
+    CHECK(!IsA<Null>(json_interface));
+
+    std::string str;
+    Json features = json_interface["features"];
+    Json::Dump(features, &str);
+    XGProxyDMatrixSetDataCudaColumnar(proxy_, str.c_str());
+
+    // Set the meta info.
+    auto json_map = get<Object const>(json_interface);
+    if (json_map.find("label") == json_map.cend()) {
+      LOG(FATAL) << "Must have a label field.";
+    }
+    Json label = json_interface["label"];
+    CHECK(!IsA<Null>(label));
+    Json::Dump(label, &str);
+    XGDMatrixSetInfoFromInterface(proxy_, "label", str.c_str());
+
+    if (json_map.find("weight") != json_map.cend()) {
+      Json weight = json_interface["weight"];
+      CHECK(!IsA<Null>(weight));
+      Json::Dump(weight, &str);
+      XGDMatrixSetInfoFromInterface(proxy_, "weight", str.c_str());
+    }
+
+    if (json_map.find("baseMargin") != json_map.cend()) {
+      Json basemargin = json_interface["baseMargin"];
+      Json::Dump(basemargin, &str);
+      XGDMatrixSetInfoFromInterface(proxy_, "base_margin", str.c_str());
+    }
+
+    if (json_map.find("qid") != json_map.cend()) {
+      Json qid = json_interface["qid"];
+      Json::Dump(qid, &str);
+      XGDMatrixSetInfoFromInterface(proxy_, "qid", str.c_str());
+    }
+  }
+
+  int Next() {
+    try {
+      this->CloseJvmBatch();
+      jclass iterClass = CheckJvmCall(jenv_->FindClass("java/util/Iterator"), jenv_);
+      jmethodID has_next = CheckJvmCall(jenv_->GetMethodID(iterClass, "hasNext", "()Z"), jenv_);
+      jmethodID next = CheckJvmCall(
+          jenv_->GetMethodID(iterClass, "next", "()Ljava/lang/Object;"), jenv_);
+
+      if (jenv_->CallBooleanMethod(jiter_, has_next)) {
+        // The batch should be a ColumnBatch from the JVM.
+        jobject batch = CheckJvmCall(jenv_->CallObjectMethod(jiter_, next), jenv_);
+        jclass batch_class = CheckJvmCall(jenv_->GetObjectClass(batch), jenv_);
+        jmethodID toJson = CheckJvmCall(
+            jenv_->GetMethodID(batch_class, "toJson", "()Ljava/lang/String;"), jenv_);
+
+        auto jinterface = static_cast<jstring>(jenv_->CallObjectMethod(batch, toJson));
+        CheckJvmCall(jinterface, jenv_);
+        char const *c_interface_str = CheckJvmCall(
+            jenv_->GetStringUTFChars(jinterface, nullptr), jenv_);
+        this->SetArrayInterface(c_interface_str);
+        jenv_->ReleaseStringUTFChars(jinterface, c_interface_str);
+        last_batch_ = batch;
+        return 1;
+      } else {
+        return 0;
+      }
+    } catch (dmlc::Error const &e) {
+      if (jni_status_ == JNI_EDETACHED) {
+        GlobalJvm()->DetachCurrentThread();
+      }
+      LOG(FATAL) << e.what();
+    }
+    return 0;
+  }
+
+  void Reset() {
+    this->CloseJvmBatch();
+  }
+};
+
 namespace {
 void Reset(DataIterHandle self) {
   static_cast<xgboost::jni::DataIteratorProxy *>(self)->Reset();
@@ -400,6 +511,14 @@ int Next(DataIterHandle self) {
   return static_cast<xgboost::jni::DataIteratorProxy *>(self)->Next();
 }

+void ExternalMemoryReset(DataIterHandle self) {
+  static_cast<xgboost::jni::ExternalMemoryIteratorProxy *>(self)->Reset();
+}
+
+int ExternalMemoryNext(DataIterHandle self) {
+  return static_cast<xgboost::jni::ExternalMemoryIteratorProxy *>(self)->Next();
+}
+
 template <typename T>
 using Deleter = std::function<void(T *)>;
 }  // anonymous namespace
@@ -407,7 +526,6 @@ using Deleter = std::function<void(T *)>;
 XGB_DLL int XGQuantileDMatrixCreateFromCallbackImpl(JNIEnv *jenv, jclass, jobject jdata_iter,
                                                     jlongArray jref, char const *config,
                                                     jlongArray jout) {
-  xgboost::jni::DataIteratorProxy proxy(jdata_iter);
   DMatrixHandle result;
   DMatrixHandle ref{nullptr};

@@ -420,10 +538,22 @@ XGB_DLL int XGQuantileDMatrixCreateFromCallbackImpl(JNIEnv *jenv, jclass, jobjec
     ref = reinterpret_cast<DMatrixHandle>(refptr.get()[0]);
   }

-  auto ret = XGQuantileDMatrixCreateFromCallback(&proxy, proxy.GetDMatrixHandle(), ref, Reset, Next,
-                                                 config, &result);
+  auto jconfig = Json::Load(StringView{config});
+  auto use_ext_mem = OptionalArg<Boolean>(jconfig, "use_ext_mem", false);
+  int ret = 0;
+  if (use_ext_mem) {
+    xgboost::jni::ExternalMemoryIteratorProxy proxy(jdata_iter);
+    ret = XGQuantileDMatrixCreateFromCallback(&proxy, proxy.GetDMatrixHandle(), ref,
+                                              ExternalMemoryReset, ExternalMemoryNext,
+                                              config, &result);
+  } else {
+    xgboost::jni::DataIteratorProxy proxy(jdata_iter);
+    ret = XGQuantileDMatrixCreateFromCallback(&proxy, proxy.GetDMatrixHandle(), ref, Reset,
+                                              Next, config, &result);
+  }
   setHandle(jenv, jout, result);
   return ret;
 }
+
 }  // namespace jni
 }  // namespace xgboost
diff --git a/jvm-packages/xgboost4j/src/native/xgboost4j.cpp b/jvm-packages/xgboost4j/src/native/xgboost4j.cpp
index 01706beb6b45..f851a6324fec 100644
--- a/jvm-packages/xgboost4j/src/native/xgboost4j.cpp
+++ b/jvm-packages/xgboost4j/src/native/xgboost4j.cpp
@@ -1308,6 +1308,9 @@ namespace xgboost::jni {
 XGB_DLL int XGQuantileDMatrixCreateFromCallbackImpl(JNIEnv *jenv, jclass jcls, jobject jdata_iter,
                                                     jobject jref_iter, char const *config,
                                                     jlongArray jout);
+XGB_DLL int XGQuantileDMatrixCreateFromExternalMemoryCallbackImpl
+    (JNIEnv *jenv, jclass jcls, jobject jdata_iter, jobject jref_iter, char const *config,
+     jlongArray jout);
 }  // namespace xgboost::jni

 /*
@@ -1326,6 +1329,23 @@ JNIEXPORT jint JNICALL Java_ml_dmlc_xgboost4j_java_XGBoostJNI_XGQuantileDMatrixC
                                                            conf.get(), jout);
 }

+/*
+ * Class:     ml_dmlc_xgboost4j_java_XGBoostJNI
+ * Method:    XGQuantileDMatrixCreateFromExternalMemoryCallback
+ * Signature: (Ljava/util/Iterator;[JLjava/lang/String;[J)I
+ */
+JNIEXPORT jint JNICALL Java_ml_dmlc_xgboost4j_java_XGBoostJNI_XGQuantileDMatrixCreateFromExternalMemoryCallback(
+    JNIEnv *jenv, jclass jcls, jobject jdata_iter, jlongArray jref, jstring jconf,
+    jlongArray jout) {
+  std::unique_ptr<char const, Deleter<char const>> conf{jenv->GetStringUTFChars(jconf, nullptr),
+                                                        [&](char const *ptr) {
+                                                          jenv->ReleaseStringUTFChars(jconf, ptr);
+                                                        }};
+  return xgboost::jni::XGQuantileDMatrixCreateFromExternalMemoryCallbackImpl(jenv, jcls, jdata_iter,
+                                                                             jref, conf.get(), jout);
+}
+
 /*
  * Class:     ml_dmlc_xgboost4j_java_XGBoostJNI
  * Method:    XGDMatrixSetInfoFromInterface