Merge pull request apache#36 from RevolutionAnalytics/vectorize-examples
Vectorize examples
shivaram committed Apr 15, 2014
2 parents 49f5f5a + 059ae41 commit bfe7e26
Showing 6 changed files with 66 additions and 55 deletions.
6 changes: 2 additions & 4 deletions .gitignore
@@ -1,16 +1,14 @@
*.class

# Package Files #
*.jar
*.war
*.ear

.RData
.Rhistory

target/
lib/
work/

# vim tmps
.*.swp
.Rproj.user
SparkR-pkg.Rproj
16 changes: 8 additions & 8 deletions examples/dfc/DFC.R
@@ -1,9 +1,9 @@
# Import required matrix and C interface packages
require(SparkR)
require(MASS)
require('Matrix')
require(Rcpp)
require(svd)
library(SparkR)
library(MASS)
library('Matrix')
library(Rcpp)
library(svd)

# Get the command line arguments
args <- commandArgs(trailing = TRUE)
@@ -133,9 +133,9 @@ apgBase <- function(mat,maxiter) {
tbase <- proc.time() # Timing code

# load required packages
require('Matrix')
require(Rcpp)
require(svd)
library('Matrix')
library(Rcpp)
library(svd)

# Load and compile the fast C++ code
sourceCpp('maskUV.cpp')
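Alongside the DFC changes, every example in this commit swaps require() for library(). As a minimal sketch of the practical difference, using the Matrix package already loaded above (illustrative only): library() stops with an error as soon as a package is missing, while require() merely warns and returns FALSE, so callers have to check its result themselves to fail fast.

  # library() fails immediately if the package is absent:
  library(Matrix)

  # require() returns FALSE instead of stopping; an explicit check is
  # needed to get the same fail-fast behavior:
  if (!require(Matrix)) {
    stop("the Matrix package is needed for this example")
  }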
74 changes: 39 additions & 35 deletions examples/kmeans.R
@@ -1,30 +1,29 @@
require(SparkR)
library(SparkR)

# Logistic regression in Spark.
# Note: unlike the example in Scala, a point here is represented as a vector of
# doubles.

parseVector <- function(line) {
nums = strsplit(line, " ")[[1]]
sapply(nums, as.double)
parseVectors <- function(lines) {
lines <- strsplit(as.character(lines) , " ", fixed = TRUE)
list(matrix(as.numeric(unlist(lines)), ncol = length(lines[[1]])))
}

closestPoint <- function(p, centers) {
  bestIndex <- 0
  closest <- .Machine$double.xmax
  for (i in 1:length(centers)) {
    tempDist <- sum((p - centers[[i]]) ** 2)
    if (tempDist < closest) {
      closest <- tempDist
      bestIndex <- i
    }
  }
  bestIndex
}
dist.fun <- function(P, C) {
  apply(
    C,
    1,
    function(x) {
      colSums((t(P) - x)^2)
    })
}

closestPoint <- function(P, C) {
  max.col(-dist.fun(P, C))
}
# Main program

args <- commandArgs(trailing = TRUE)
args <- commandArgs(trailing = TRUE)

if (length(args) != 4) {
print("Usage: kmeans <master> <file> <K> <convergeDist>")
@@ -36,34 +35,39 @@ K <- as.integer(args[[3]])
convergeDist <- as.double(args[[4]])

lines <- textFile(sc, args[[2]])
points <- cache(lapply(lines, parseVector))
points <- cache(lapplyPartition(lines, parseVectors))
# kPoints <- take(points, K)
kPoints <- takeSample(points, FALSE, K, 16189L)
kPoints <- do.call(rbind, takeSample(points, FALSE, K, 16189L))
tempDist <- 1.0

while (tempDist > convergeDist) {
closest <- lapply(points,
function(p) { list(closestPoint(p, kPoints), list(p, 1)) })

closest <- lapplyPartition(
lapply(points,
function(p) {
cp <- closestPoint(p, kPoints);
mapply(list, unique(cp), split.data.frame(cbind(1, p), cp), SIMPLIFY=FALSE)
}),
function(x) {do.call(c, x)
})

pointStats <- reduceByKey(closest,
function(p1, p2) {
list(p1[[1]] + p2[[1]], p1[[2]] + p2[[2]])
t(colSums(rbind(p1, p2)))
},
2L)

newPoints <- collect(lapply(pointStats,
function(tup) {
list(tup[[1]], tup[[2]][[1]] / tup[[2]][[2]])
}))

tempDist <- sum(sapply(newPoints,
function(tup) {
sum((kPoints[[tup[[1]]]] - tup[[2]]) ** 2)
}))

for (tup in newPoints)
kPoints[[tup[[1]]]] <- tup[[2]]


newPoints <- do.call(
rbind,
collect(lapply(pointStats,
function(tup) {
point.sum <- tup[[2]][, -1]
point.count <- tup[[2]][, 1]
point.sum/point.count
})))

D <- dist.fun(kPoints, newPoints)
tempDist <- sum(D[cbind(1:3, max.col(-D))])
kPoints <- newPoints
cat("Finished iteration (delta = ", tempDist, ")\n")
}

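The vectorized k-means keeps each partition's points as a single matrix (parseVectors), computes every point-to-center squared distance in one pass (dist.fun), and assigns each point to its nearest center with max.col (closestPoint); inside the loop, split.data.frame(cbind(1, p), cp) groups rows by assigned center with a leading column of ones, so reduceByKey can sum counts and coordinates together. A small standalone illustration of the two helpers, with toy values chosen here rather than taken from the example's input file:

  # Three 2-D points (one per row) and two candidate centers:
  P <- matrix(c(0, 0,
                1, 1,
                5, 5), ncol = 2, byrow = TRUE)
  C <- matrix(c(0, 0,
                5, 5), ncol = 2, byrow = TRUE)

  dist.fun(P, C)      # 3 x 2 matrix of squared distances to each center
  closestPoint(P, C)  # 1 1 2: the first two points are nearest center 1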
10 changes: 5 additions & 5 deletions examples/logistic_regression.R
@@ -1,4 +1,4 @@
require(SparkR)
library(SparkR)

args <- commandArgs(trailing = TRUE)

@@ -12,10 +12,9 @@ sc <- sparkR.init(args[[1]], "LogisticRegressionR")
iterations <- as.integer(args[[3]])
D <- 10

readPartition <- function(part) {
m <- t(sapply(part, function(line) {
as.numeric(strsplit(line, " ")[[1]])
}))
readPartition <- function(part){
part = strsplit(part, " ", fixed = T)
list(matrix(as.numeric(unlist(part)), ncol = length(part[[1]])))
}

# Read data points and convert each partition to a matrix
@@ -27,6 +26,7 @@ cat("Initial w: ", w, "\n")

# Compute logistic regression gradient for a matrix of data points
gradient <- function(partition) {
partition = partition[[1]]
Y <- partition[, 1] # point labels (first column of input file)
X <- partition[, -1] # point coordinates

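readPartition now returns each partition as a single matrix wrapped in a list, and gradient unwraps it before splitting labels (Y) from features (X). The rest of the gradient body is collapsed in this view; purely as an illustration of what a vectorized logistic-regression gradient over such a matrix looks like (not the file's hidden lines; logisticGradient is a hypothetical stand-in, and w is the weight vector of length D):

  # Sketch only: one gradient evaluation over a partition matrix,
  # with labels Y in {-1, 1} and features X as defined above.
  logisticGradient <- function(partition, w) {
    partition <- partition[[1]]
    Y <- partition[, 1]
    X <- partition[, -1]
    margins <- as.vector(X %*% w) * Y            # signed margin of every point at once
    t(X) %*% ((1 / (1 + exp(-margins)) - 1) * Y) # summed gradient of the logistic loss
  }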
13 changes: 11 additions & 2 deletions examples/pi.R
@@ -1,4 +1,4 @@
require(SparkR)
library(SparkR)

args <- commandArgs(trailing = TRUE)

@@ -19,7 +19,16 @@ piFunc <- function(elem) {
val
}


piFuncVec <- function(elems) {
message(length(elems))
rands1 <- runif(n = length(elems), min = -1, max = 1)
rands2 <- runif(n = length(elems), min = -1, max = 1)
val <- ifelse((rands1^2 + rands2^2) < 1, 1.0, 0.0)
sum(val)
}

rdd <- parallelize(sc, 1:n, slices)
count <- reduce(lapply(rdd, piFunc), sum)
count <- reduce(lapplyPartition(rdd, piFuncVec), sum)
cat("Pi is roughly", 4.0 * count / n, "\n")
cat("Num elements in RDD ", count(rdd), "\n")
2 changes: 1 addition & 1 deletion examples/wordcount.R
@@ -1,4 +1,4 @@
require(SparkR)
library(SparkR)

args <- commandArgs(trailing = TRUE)

