From dcd8b399f4bce9b6409dd347460a0f2bf48fbce1 Mon Sep 17 00:00:00 2001
From: Pengfei Xuan
Date: Wed, 30 Mar 2016 10:56:42 -0400
Subject: [PATCH] Apply initial centroids on Spark Kmeans workload, which
 makes HiBench hold the same convergence condition on both MapReduce and
 Spark benchmark.

---
 .../spark/examples/mllib/JavaKMeans.java      | 55 +++++++++++--------
 workloads/kmeans/spark/java/bin/run.sh        |  4 +-
 2 files changed, 34 insertions(+), 25 deletions(-)

diff --git a/src/sparkbench/src/main/java/org/apache/spark/examples/mllib/JavaKMeans.java b/src/sparkbench/src/main/java/org/apache/spark/examples/mllib/JavaKMeans.java
index 637114297..f997a0821 100644
--- a/src/sparkbench/src/main/java/org/apache/spark/examples/mllib/JavaKMeans.java
+++ b/src/sparkbench/src/main/java/org/apache/spark/examples/mllib/JavaKMeans.java
@@ -20,9 +20,9 @@
  */
 package org.apache.spark.examples.mllib;
 
-import java.util.regex.Pattern;
-
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.mahout.clustering.kmeans.Kluster;
 import org.apache.mahout.math.VectorWritable;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -41,37 +41,25 @@
  */
 public final class JavaKMeans {
 
-  private static class ParsePoint implements Function<String, Vector> {
-    private static final Pattern SPACE = Pattern.compile(" ");
-
-    @Override
-    public Vector call(String line) {
-      String[] tok = SPACE.split(line);
-      double[] point = new double[tok.length];
-      for (int i = 0; i < tok.length; ++i) {
-        point[i] = Double.parseDouble(tok[i]);
-      }
-      return Vectors.dense(point);
-    }
-  }
-
   public static void main(String[] args) {
-    if (args.length < 3) {
+    if (args.length < 4) {
       System.err.println(
-        "Usage: JavaKMeans <input_file> <k> <max_iterations> [<runs>]");
+        "Usage: JavaKMeans <input_file> <input_cluster> <k> <max_iterations> [<runs>]");
       System.exit(1);
     }
     String inputFile = args[0];
-    int k = Integer.parseInt(args[1]);
-    int iterations = Integer.parseInt(args[2]);
+    String inputCluster = args[1];
+    int k = Integer.parseInt(args[2]);
+    int iterations = Integer.parseInt(args[3]);
     int runs = 1;
 
-    if (args.length >= 4) {
-      runs = Integer.parseInt(args[3]);
+    if (args.length >= 5) {
+      runs = Integer.parseInt(args[4]);
     }
 
     SparkConf sparkConf = new SparkConf().setAppName("JavaKMeans");
     JavaSparkContext sc = new JavaSparkContext(sparkConf);
 
+    // Load input points
     JavaPairRDD<LongWritable, VectorWritable> data = sc.sequenceFile(inputFile,
         LongWritable.class, VectorWritable.class);
@@ -87,7 +75,28 @@ public Vector call(Tuple2<LongWritable, VectorWritable> e) {
       }
     });
 
-    KMeansModel model = KMeans.train(points.rdd(), k, iterations, runs, KMeans.K_MEANS_PARALLEL());
+    // Load initial centroids
+    JavaPairRDD<Text, Kluster> clusters = sc.sequenceFile(inputCluster, Text.class, Kluster.class);
+    JavaRDD<Vector> centroids = clusters.map(new Function<Tuple2<Text, Kluster>, Vector>() {
+      @Override
+      public Vector call(Tuple2<Text, Kluster> e) {
+        org.apache.mahout.math.Vector centroid = e._2().getCenter();
+        double[] v = new double[centroid.size()];
+        for (int i = 0; i < centroid.size(); ++i) {
+          v[i] = centroid.get(i);
+        }
+        return Vectors.dense(v);
+      }
+    });
+
+    // Train model
+    KMeansModel initModel = new KMeansModel(centroids.collect());
+    KMeansModel model = new KMeans()
+      .setK(k)
+      .setMaxIterations(iterations)
+      .setRuns(runs)
+      .setInitialModel(initModel)
+      .run(points.rdd());
 
     System.out.println("Cluster centers:");
     for (Vector center : model.clusterCenters()) {
diff --git a/workloads/kmeans/spark/java/bin/run.sh b/workloads/kmeans/spark/java/bin/run.sh
index 1696c6b64..d2194de27 100755
--- a/workloads/kmeans/spark/java/bin/run.sh
+++ b/workloads/kmeans/spark/java/bin/run.sh
@@ -27,7 +27,7 @@ rmr-hdfs $OUTPUT_HDFS || true
 
 SIZE=`dir_size $INPUT_HDFS`
 START_TIME=`timestamp`
-run-spark-job org.apache.spark.examples.mllib.JavaKMeans $INPUT_HDFS/samples $K $MAX_ITERATION
+run-spark-job org.apache.spark.examples.mllib.JavaKMeans $INPUT_HDFS/samples $INPUT_HDFS/cluster $K $MAX_ITERATION
 END_TIME=`timestamp`
 
 gen_report ${START_TIME} ${END_TIME} ${SIZE}
@@ -38,5 +38,5 @@ leave_bench
 
 # run bench
 #run-spark-job org.apache.spark.examples.mllib.JavaKMeans $INPUT_HDFS $K $MAX_ITERATION || exit 1
-#$SPARK_HOME/bin/spark-submit --class org.apache.spark.examples.mllib.JavaKMeans --master ${SPARK_MASTER} ${SPARK_EXAMPLES_JAR} $INPUT_HDFS $K $MAX_ITERATION
+#$SPARK_HOME/bin/spark-submit --class org.apache.spark.examples.mllib.JavaKMeans --master ${SPARK_MASTER} ${SPARK_EXAMPLES_JAR} $INPUT_HDFS/samples $INPUT_HDFS/cluster $K $MAX_ITERATION
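
Note (not part of the patch above): the Spark job now reads its starting
centroids from the $INPUT_HDFS/cluster sequence file that the HiBench data
generator writes for the MapReduce workload, and seeds MLlib k-means with
them through setInitialModel() instead of the default K_MEANS_PARALLEL
initialization; that shared starting state is what puts both engines under
the same convergence condition. MLlib expects the initial model's cluster
count to match the k being trained, so a minimal fail-fast sketch could look
like the following; the CentroidCheck class is hypothetical, not HiBench code.

import java.util.List;

import org.apache.spark.mllib.linalg.Vector;

/**
 * Hypothetical helper (not in HiBench): validates the centroids collected
 * from the cluster sequence file before they are wrapped in a KMeansModel.
 */
final class CentroidCheck {
  private CentroidCheck() {}

  static void validate(List<Vector> centers, int k) {
    // The initial model must hold exactly k centroids, matching setK(k).
    if (centers.size() != k) {
      throw new IllegalStateException(
          "Expected " + k + " initial centroids, found " + centers.size());
    }
    // Every centroid must share one dimensionality with the input points.
    int dim = centers.get(0).size();
    for (Vector c : centers) {
      if (c.size() != dim) {
        throw new IllegalStateException(
            "Centroid dimensionality mismatch: " + c.size() + " vs " + dim);
      }
    }
  }
}

Invoked as CentroidCheck.validate(centroids.collect(), k) just before
initModel is built, this would surface a truncated or stale cluster file as
a clear error instead of a silently different benchmark run.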