diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala
new file mode 100644
index 0000000000000..a255ac4adc305
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.clustering
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.ml.Transformer
+import org.apache.spark.ml.linalg.Vector
+import org.apache.spark.ml.param._
+import org.apache.spark.ml.param.shared._
+import org.apache.spark.ml.util._
+import org.apache.spark.mllib.clustering.{PowerIterationClustering => MLlibPowerIterationClustering}
+import org.apache.spark.mllib.clustering.PowerIterationClustering.Assignment
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}
+
+/**
+ * Common params for PowerIterationClustering
+ */
+private[clustering] trait PowerIterationClusteringParams extends Params with HasMaxIter
+ with HasFeaturesCol with HasPredictionCol with HasWeightCol {
+
+ /**
+ * The number of clusters to create (k). Must be > 1. Default: 2.
+ * @group param
+ */
+ @Since("2.3.0")
+ final val k = new IntParam(this, "k", "The number of clusters to create. " +
+ "Must be > 1.", ParamValidators.gt(1))
+
+ /** @group getParam */
+ @Since("2.3.0")
+ def getK: Int = $(k)
+
+ /**
+   * Param for the initialization algorithm. This can be either "random" to use a random vector
+   * as vertex properties, or "degree" to use a normalized sum of similarities with other vertices.
+   * Default: random.
+   * @group expertParam
+ */
+ @Since("2.3.0")
+ final val initMode = {
+ val allowedParams = ParamValidators.inArray(Array("random", "degree"))
+ new Param[String](this, "initMode", "The initialization algorithm. " +
+ "Supported options: 'random' and 'degree'.", allowedParams)
+ }
+
+ /** @group expertGetParam */
+ @Since("2.3.0")
+ def getInitMode: String = $(initMode)
+
+ /**
+   * Param for the column name for vertex ids required by PowerIterationClustering.transform(),
+   * and carried through to its output.
+   * Default: "id"
+   * @group param
+   */
+  @Since("2.3.0")
+  val idCol = new Param[String](this, "idCol", "column name for ids.")
+
+ /** @group getParam */
+ @Since("2.3.0")
+ def getIdCol: String = $(idCol)
+
+ /**
+   * Param for the column name for neighbors required by PowerIterationClustering.transform().
+   * Each row holds a vector of neighbor ids whose length must match the weight vector.
+   * Default: "neighbor"
+   * @group param
+   */
+  @Since("2.3.0")
+  val neighborCol = new Param[String](this, "neighborCol", "column name for neighbors.")
+
+ /** @group getParam */
+ @Since("2.3.0")
+ def getNeighborCol: String = $(neighborCol)
+
+ /**
+ * Validates the input schema
+ * @param schema input schema
+ */
+ protected def validateSchema(schema: StructType): Unit = {
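+    // Note: transform() applies this to the output schema, which holds the id and prediction
+    // columns.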
+ SchemaUtils.checkColumnType(schema, $(idCol), LongType)
+ SchemaUtils.checkColumnType(schema, $(predictionCol), IntegerType)
+ }
+}
+
+/**
+ * :: Experimental ::
+ * Power Iteration Clustering (PIC), a scalable graph clustering algorithm developed by
+ * <a href=http://www.icml2010.org/papers/387.pdf>Lin and Cohen</a>. From the abstract:
+ * PIC finds a very low-dimensional embedding of a dataset using truncated power
+ * iteration on a normalized pair-wise similarity matrix of the data.
+ *
+ * Note that we implement [[PowerIterationClustering]] as a transformer. The [[transform]] is an
+ * expensive operation, because it runs the PIC algorithm on the whole input dataset.
+ *
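+ * A minimal usage sketch, assuming `dataset` is an input DataFrame with the default "id",
+ * "neighbor" and "weight" columns:
+ * {{{
+ *   val pic = new PowerIterationClustering()
+ *     .setK(2)
+ *     .setMaxIter(20)
+ *   val clustered = pic.transform(dataset)
+ * }}}
+ *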
+ * @see <a href=http://en.wikipedia.org/wiki/Spectral_clustering>
+ * Spectral clustering (Wikipedia)</a>
+ */
+@Since("2.3.0")
+@Experimental
+class PowerIterationClustering private[clustering] (
+ @Since("2.3.0") override val uid: String)
+ extends Transformer with PowerIterationClusteringParams with DefaultParamsWritable {
+
+ setDefault(
+ k -> 2,
+ maxIter -> 20,
+ initMode -> "random",
+ idCol -> "id",
+ weightCol -> "weight",
+ neighborCol -> "neighbor")
+
+ @Since("2.3.0")
+ override def copy(extra: ParamMap): PowerIterationClustering = defaultCopy(extra)
+
+ @Since("2.3.0")
+ def this() = this(Identifiable.randomUID("PowerIterationClustering"))
+
+ /** @group setParam */
+ @Since("2.3.0")
+ def setFeaturesCol(value: String): this.type = set(featuresCol, value)
+
+ /** @group setParam */
+ @Since("2.3.0")
+ def setPredictionCol(value: String): this.type = set(predictionCol, value)
+
+ /** @group setParam */
+ @Since("2.3.0")
+ def setK(value: Int): this.type = set(k, value)
+
+ /** @group expertSetParam */
+ @Since("2.3.0")
+ def setInitMode(value: String): this.type = set(initMode, value)
+
+ /** @group setParam */
+ @Since("2.3.0")
+ def setMaxIter(value: Int): this.type = set(maxIter, value)
+
+ /** @group setParam */
+ @Since("2.3.0")
+ def setIdCol(value: String): this.type = set(idCol, value)
+
+ /**
+ * Sets the value of param [[weightCol]].
+ * Default is "weight"
+ *
+ * @group setParam
+ */
+ @Since("2.3.0")
+ def setWeightCol(value: String): this.type = set(weightCol, value)
+
+ /**
+ * Sets the value of param [[neighborCol]].
+ * Default is "neighbor"
+ *
+ * @group setParam
+ */
+ @Since("2.3.0")
+ def setNeighborCol(value: String): this.type = set(neighborCol, value)
+
+ @Since("2.3.0")
+ override def transform(dataset: Dataset[_]): DataFrame = {
+ val sparkSession = dataset.sparkSession
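+    // Flatten each (id, neighbors, weights) row into (src, dst, similarity) triplets for the
+    // underlying MLlib PIC implementation.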
+ val rdd: RDD[(Long, Long, Double)] =
+ dataset.select(col($(idCol)), col($(neighborCol)), col($(weightCol))).rdd.flatMap {
+ case Row(id: Long, nbr: Vector, weight: Vector) =>
+ require(nbr.size == weight.size,
+ "The length of neighbor list must be equal to the the length of the weight list.")
+ nbr.toArray.toIterator.zip(weight.toArray.toIterator)
+ .map(x => (id, x._1.toLong, x._2))}
+ val algorithm = new MLlibPowerIterationClustering()
+ .setK($(k))
+ .setInitializationMode($(initMode))
+ .setMaxIterations($(maxIter))
+ val model = algorithm.run(rdd)
+
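+    // Each MLlib assignment carries a vertex id and its cluster index.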
+ val rows: RDD[Row] = model.assignments.map {
+ case assignment: Assignment => Row(assignment.id, assignment.cluster)
+ }
+
+ val schema = transformSchema(new StructType(Array(StructField($(idCol), LongType),
+ StructField($(predictionCol), IntegerType))))
+ val result = sparkSession.createDataFrame(rows, schema)
+
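+    // Attach the cluster predictions back onto the original rows via the id column.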
+    dataset.join(result, $(idCol))
+ }
+
+ @Since("2.3.0")
+ override def transformSchema(schema: StructType): StructType = {
+ validateSchema(schema)
+ schema
+ }
+
+}
+
+@Since("2.3.0")
+object PowerIterationClustering extends DefaultParamsReadable[PowerIterationClustering] {
+
+ @Since("2.3.0")
+ override def load(path: String): PowerIterationClustering = super.load(path)
+}
+
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala
new file mode 100644
index 0000000000000..1c7f01b0dfdff
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.clustering
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.ml.linalg.{Vector, Vectors}
+import org.apache.spark.ml.util.DefaultReadWriteTest
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
+
+class PowerIterationClusteringSuite extends SparkFunSuite
+ with MLlibTestSparkContext with DefaultReadWriteTest {
+
+ @transient var data: Dataset[_] = _
+ @transient var malData: Dataset[_] = _
+ final val r1 = 1.0
+ final val n1 = 10
+ final val r2 = 4.0
+ final val n2 = 40
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+
+ data = PowerIterationClusteringSuite.generatePICData(spark, r1, r2, n1, n2)
+ }
+
+ test("default parameters") {
+ val pic = new PowerIterationClustering()
+
+ assert(pic.getK === 2)
+ assert(pic.getMaxIter === 20)
+ assert(pic.getInitMode === "random")
+ assert(pic.getFeaturesCol === "features")
+ assert(pic.getPredictionCol === "prediction")
+ assert(pic.getIdCol === "id")
+ assert(pic.getWeightCol === "weight")
+ assert(pic.getNeighborCol === "neighbor")
+ }
+
+ test("set parameters") {
+ val pic = new PowerIterationClustering()
+ .setK(9)
+ .setMaxIter(33)
+ .setInitMode("degree")
+ .setFeaturesCol("test_feature")
+ .setPredictionCol("test_prediction")
+ .setIdCol("test_id")
+ .setWeightCol("test_weight")
+ .setNeighborCol("test_neighbor")
+
+ assert(pic.getK === 9)
+ assert(pic.getMaxIter === 33)
+ assert(pic.getInitMode === "degree")
+ assert(pic.getFeaturesCol === "test_feature")
+ assert(pic.getPredictionCol === "test_prediction")
+ assert(pic.getIdCol === "test_id")
+ assert(pic.getWeightCol === "test_weight")
+ assert(pic.getNeighborCol === "test_neighbor")
+ }
+
+ test("parameters validation") {
+ intercept[IllegalArgumentException] {
+ new PowerIterationClustering().setK(1)
+ }
+ intercept[IllegalArgumentException] {
+ new PowerIterationClustering().setInitMode("no_such_a_mode")
+ }
+ }
+
+ test("power iteration clustering") {
+ val n = n1 + n2
+
+    val pic = new PowerIterationClustering()
+      .setK(2)
+      .setMaxIter(40)
+    val result = pic.transform(data)
+
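+    // Node 0 only appears as a neighbor in the generated data, so the output holds ids 1 to n - 1.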
+ val predictions = Array.fill(2)(mutable.Set.empty[Long])
+ result.select("id", "prediction").collect().foreach {
+ case Row(id: Long, cluster: Integer) => predictions(cluster) += id
+ }
+ assert(predictions.toSet == Set((1 until n1).toSet, (n1 until n).toSet))
+
+ val result2 = new PowerIterationClustering()
+ .setK(2)
+ .setMaxIter(10)
+ .setInitMode("degree")
+ .transform(data)
+ val predictions2 = Array.fill(2)(mutable.Set.empty[Long])
+ result2.select("id", "prediction").collect().foreach {
+ case Row(id: Long, cluster: Integer) => predictions2(cluster) += id
+ }
+ assert(predictions2.toSet == Set((1 until n1).toSet, (n1 until n).toSet))
+
+ val expectedColumns = Array("id", "prediction")
+ expectedColumns.foreach { column =>
+ assert(result2.columns.contains(column))
+ }
+ }
+
+ test("read/write") {
+ val t = new PowerIterationClustering()
+ .setK(4)
+ .setMaxIter(100)
+ .setInitMode("degree")
+ .setFeaturesCol("test_feature")
+ .setPredictionCol("test_prediction")
+ .setIdCol("test_id")
+ testDefaultReadWrite(t)
+ }
+}
+
+object PowerIterationClusteringSuite {
+
+  case class TestRow2(id: Long, neighbor: Vector, weight: Vector)
+
+  /** Generates a circle of points. */
+ private def genCircle(r: Double, n: Int): Array[(Double, Double)] = {
+ Array.tabulate(n) { i =>
+ val theta = 2.0 * math.Pi * i / n
+ (r * math.cos(theta), r * math.sin(theta))
+ }
+ }
+
+ /** Computes Gaussian similarity. */
+ private def sim(x: (Double, Double), y: (Double, Double)): Double = {
+ val dist2 = (x._1 - y._1) * (x._1 - y._1) + (x._2 - y._2) * (x._2 - y._2)
+ math.exp(-dist2 / 2.0)
+ }
+
+  def generatePICData(
+      spark: SparkSession,
+      r1: Double,
+      r2: Double,
+      n1: Int,
+      n2: Int): DataFrame = {
+ // Generate two circles following the example in the PIC paper.
+ val n = n1 + n2
+ val points = genCircle(r1, n1) ++ genCircle(r2, n2)
+
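+    // Build the lower triangle of the affinity matrix: each node i lists all nodes j < i as
+    // neighbors, weighted by Gaussian similarity.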
+ val similarities = for (i <- 1 until n) yield {
+ val neighbor = for (j <- 0 until i) yield {
+ j.toLong
+ }
+ val weight = for (j <- 0 until i) yield {
+ sim(points(i), points(j))
+ }
+ (i.toLong, neighbor.toArray, weight.toArray)
+ }
+
+ val sc = spark.sparkContext
+
+    val rdd = sc.parallelize(similarities).map {
+      case (id: Long, nbr: Array[Long], weight: Array[Double]) =>
+        TestRow2(id, Vectors.dense(nbr.map(_.toDouble)), Vectors.dense(weight))
+    }
+ spark.createDataFrame(rdd)
+ }
+
+}