diff --git a/jvm-packages/pom.xml b/jvm-packages/pom.xml
index 9b6dddfc2773..85a2b3142b4e 100644
--- a/jvm-packages/pom.xml
+++ b/jvm-packages/pom.xml
@@ -335,25 +335,6 @@
-            <plugin>
-                <groupId>org.jacoco</groupId>
-                <artifactId>jacoco-maven-plugin</artifactId>
-                <version>0.7.9</version>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>prepare-agent</goal>
-                        </goals>
-                    </execution>
-                    <execution>
-                        <id>report</id>
-                        <phase>test</phase>
-                        <goals>
-                            <goal>report</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostClassifier.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostClassifier.scala
index b4b59950c89a..04fbfda975ff 100644
--- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostClassifier.scala
+++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostClassifier.scala
@@ -285,12 +285,12 @@ class XGBoostClassificationModel private[ml](
     val bBooster = dataset.sparkSession.sparkContext.broadcast(_booster)
     val appName = dataset.sparkSession.sparkContext.appName
 
-    val rdd = dataset.asInstanceOf[Dataset[Row]].rdd.mapPartitions { rowIterator =>
+    val inputRDD = dataset.asInstanceOf[Dataset[Row]].rdd
+    val predictionRDD = dataset.asInstanceOf[Dataset[Row]].rdd.mapPartitions { rowIterator =>
       if (rowIterator.hasNext) {
         val rabitEnv = Array("DMLC_TASK_ID" -> TaskContext.getPartitionId().toString).toMap
         Rabit.init(rabitEnv.asJava)
-        val (rowItr1, rowItr2) = rowIterator.duplicate
-        val featuresIterator = rowItr2.map(row => row.getAs[Vector](
+        val featuresIterator = rowIterator.map(row => row.getAs[Vector](
           $(featuresCol))).toList.iterator
         import DataUtils._
         val cacheInfo = {
@@ -307,19 +307,27 @@ class XGBoostClassificationModel private[ml](
         try {
           val Array(rawPredictionItr, probabilityItr, predLeafItr, predContribItr) =
             producePredictionItrs(bBooster, dm)
           Rabit.shutdown()
-          produceResultIterator(rowItr1, rawPredictionItr, probabilityItr, predLeafItr,
+          Iterator(rawPredictionItr, probabilityItr, predLeafItr,
             predContribItr)
         } finally {
           dm.delete()
         }
       } else {
-        Iterator[Row]()
+        Iterator()
       }
     }
+    val resultRDD = inputRDD.zipPartitions(predictionRDD, preservesPartitioning = true) {
+      case (inputIterator, predictionItr) =>
+        if (inputIterator.hasNext) {
+          produceResultIterator(inputIterator, predictionItr.next(), predictionItr.next(),
+            predictionItr.next(), predictionItr.next())
+        } else {
+          Iterator()
+        }
+    }
     bBooster.unpersist(blocking = false)
-
-    dataset.sparkSession.createDataFrame(rdd, generateResultSchema(schema))
+    dataset.sparkSession.createDataFrame(resultRDD, generateResultSchema(schema))
   }
 
   private def produceResultIterator(
diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostRegressor.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostRegressor.scala
index 6d2197cdc143..20dd249964e8 100644
--- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostRegressor.scala
+++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostRegressor.scala
@@ -257,13 +257,12 @@ class XGBoostRegressionModel private[ml] (
     val bBooster = dataset.sparkSession.sparkContext.broadcast(_booster)
     val appName = dataset.sparkSession.sparkContext.appName
 
-
-    val rdd = dataset.asInstanceOf[Dataset[Row]].rdd.mapPartitions { rowIterator =>
+    val inputRDD = dataset.asInstanceOf[Dataset[Row]].rdd
+    val predictionRDD = dataset.asInstanceOf[Dataset[Row]].rdd.mapPartitions { rowIterator =>
       if (rowIterator.hasNext) {
         val rabitEnv = Array("DMLC_TASK_ID" -> TaskContext.getPartitionId().toString).toMap
         Rabit.init(rabitEnv.asJava)
-        val (rowItr1, rowItr2) = rowIterator.duplicate
-        val featuresIterator = rowItr2.map(row => row.getAs[Vector](
+        val featuresIterator = rowIterator.map(row => row.getAs[Vector](
           $(featuresCol))).toList.iterator
         import DataUtils._
         val cacheInfo = {
@@ -273,7 +272,6 @@ class XGBoostRegressionModel private[ml] (
             null
           }
         }
-
         val dm = new DMatrix(
           XGBoost.removeMissingValues(featuresIterator.map(_.asXGB), $(missing)),
           cacheInfo)
@@ -281,16 +279,25 @@ class XGBoostRegressionModel private[ml] (
           val Array(originalPredictionItr, predLeafItr, predContribItr) =
             producePredictionItrs(bBooster, dm)
           Rabit.shutdown()
-          produceResultIterator(rowItr1, originalPredictionItr, predLeafItr, predContribItr)
+          Iterator(originalPredictionItr, predLeafItr, predContribItr)
         } finally {
           dm.delete()
         }
       } else {
-        Iterator[Row]()
+        Iterator()
      }
     }
+    val resultRDD = inputRDD.zipPartitions(predictionRDD, preservesPartitioning = true) {
+      case (inputIterator, predictionItr) =>
+        if (inputIterator.hasNext) {
+          produceResultIterator(inputIterator, predictionItr.next(), predictionItr.next(),
+            predictionItr.next())
+        } else {
+          Iterator()
+        }
+    }
     bBooster.unpersist(blocking = false)
-    dataset.sparkSession.createDataFrame(rdd, generateResultSchema(schema))
+    dataset.sparkSession.createDataFrame(resultRDD, generateResultSchema(schema))
   }
 
   private def produceResultIterator(
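
Explanatory note (not part of the patch): scala.collection.Iterator.duplicate returns two cursors over one underlying stream and queues every element the leading cursor has consumed until the lagging cursor catches up. In the old code, rowItr2 was drained completely to build the DMatrix while rowItr1 was held back for produceResultIterator, so the entire partition's rows stayed buffered in memory. The patch instead computes predictions in a single pass per partition (predictionRDD emits, for each non-empty partition, just the prediction iterators themselves) and then rejoins predictions with the original input rows via RDD.zipPartitions.

Below is a minimal, self-contained Scala sketch of that zipPartitions technique. It is an illustration, not XGBoost code: the "model" just doubles each value, and all names in it (ZipPartitionsSketch, inputRDD, predictionRDD, resultRDD) are hypothetical.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object ZipPartitionsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("ZipPartitionsSketch")
      .getOrCreate()
    val sc = spark.sparkContext

    // Stand-in for dataset.asInstanceOf[Dataset[Row]].rdd in the patch.
    val inputRDD: RDD[Double] = sc.parallelize((1 to 8).map(_.toDouble), numSlices = 2)

    // One pass per partition emits a single element: the whole batch of
    // "predictions" for that partition (the toy model doubles each value).
    // No Iterator.duplicate is needed, so nothing forces the partition's
    // rows to pile up in a buffer.
    val predictionRDD: RDD[Array[Double]] = inputRDD.mapPartitions { it =>
      if (it.hasNext) Iterator(it.map(_ * 2.0).toArray) else Iterator()
    }

    // zipPartitions pairs partition i of inputRDD with partition i of
    // predictionRDD, so rows are rejoined to their predictions lazily.
    val resultRDD: RDD[(Double, Double)] =
      inputRDD.zipPartitions(predictionRDD, preservesPartitioning = true) {
        case (rows, preds) =>
          if (rows.hasNext) rows.zip(preds.next().iterator) else Iterator()
      }

    resultRDD.collect().foreach { case (x, p) => println(s"input=$x prediction=$p") }
    spark.stop()
  }
}

zipPartitions requires both RDDs to have the same number of partitions, which holds here, as in the patch, because predictionRDD is derived from inputRDD by mapPartitions. The trade-off is that each input partition is traversed twice, once to build the predictions and once in the zip, so upstream work may be recomputed unless the input is cached.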