DBSCAN implementation on Apache Spark.
Output Change the output now includes noisy data and will have a clusterID of "0".
I've update the core DBSCAN code (DBSCAN2
) to include noise data that is close to a cluster as part of the cluster. Thanks to Randall W. and Erik H. And please remember, in this implementation the concept of proximity is based on bounding box calculation and not distance squared.
The build process is based on Apache Maven - However I've added an SBT build for reference.
mvn clean package
Create a python virtual environment - For OS X users, check out Install Python 2.7, virtualenv and virtualenvwrapper on OS X Mavericks/Yosemite
source /usr/local/bin/virtualenvwrapper.sh
mkvirtualenv my-env
pip install --upgrade pip
pip install numpy scipy sklearn matplotlib
import numpy as np
from sklearn import datasets
np.random.seed(0)
n_samples = 100000
noisy_moons = datasets.make_moons(n_samples=n_samples, noise=.05)
X,_ = noisy_moons
with open("/tmp/moons.txt", "wb") as fw:
for i in range(n_samples):
fw.write( "{} {} {}\n".format(i,X[i][0]*100,X[i][1]*100))
spark-submit\
--master "local[*]"\
--driver-java-options="-server -Xms1g -Xmx16g"\
target/dbscan-spark-0.4.jar\
moons.properties
Content of moons.properties
:
input.path=/tmp/moons.txt
output.path=/tmp/output
dbscan.eps=5
dbscan.min.points=5
Aggregate the output content into a single file for later visualization:
cat /tmp/output/part* > /tmp/parts.csv
rm -rf /tmp/output
We will use Matplotlib. Check out Working with Matplotlib in Virtual environments.
Had to add the following to my .bashrc
:
function frameworkpython {
if [[ ! -z "$VIRTUAL_ENV" ]]; then
PYTHONHOME=$VIRTUAL_ENV /usr/local/bin/python "$@"
else
/usr/local/bin/python "$@"
fi
}
Start a Python session:
frameworkpython
import numpy as np
import matplotlib.colors as clr
import matplotlib.pyplot as plt
%matplotlib inline
%matplotlib notebook
moons = np.genfromtxt('/tmp/moons.txt', delimiter=' ', names=['id', 'x', 'y'])
plt.figure(1)
plt.scatter(moons['x'],moons['y'],color='r')
parts = np.genfromtxt('/tmp/parts.csv', delimiter=',', names=['id', 'x', 'y', 'c'])
colors = ['black','red','green','blue','purple']
plt.figure(2)
plt.scatter(parts['x'],parts['y'],c=parts['c'], cmap=clr.ListedColormap(colors), lw=0)
plt.show(block=False)
First, check out DBSCAN and how it works. Basically, it continuously group into the same cluster the points that are near each other (or within a certain distance from each other). The following is the process of how do to this on a distributed, share-nothing platform.
Given the above points, a virtual fishnet is cast on the point space.
The points in the same cells are processed together in a distributed matter. This togetherness is achieved by mapping the point coordinates (x,y) to the cell lower left corner (r,c). All the points that have the same (r,c) are grouped together and locally processed on different nodes. This is a very simple partitioning scheme to distribute spatial data, where the user can specify the fishnet cell size. By default, the fishnet cell size is in this implementation. Below are references to implementations where the partitioning is proportional to the density of the data. The data is scanned through and subdivided to generate a fishnet with different cell sizes.
DBSCAN clusters points based on their proximity to each other. When the point space is partitioned based on cells and executed on different nodes, the edge points do not "see" all their neighbors as they are not in the processing partition.
In the above figure, the points in the center cell are processed together on one node and each surrounding cell with its points is processed by a different node.
So the question is; How do I make a point A in cell 1 "see" a point B in cell 2 on a share-nothing platform ?
Simple, you make point A part of cell 2 and you make point B part of cell 1 ! Let me illustrate:
So, all points in a cell that are away from the edge are emitted to the neighboring cells, in such that now, we can perform per node a local DBSCAN. This double processing of the same edge point by two nodes is actually a blessing, as we can now relate and merge two neighboring local clusters into one global one. The handling of cluster merging by edge points is inspired from the Hadoop MapReduce implementation described in MR-DBSCAN: a scalable MapReduce-based DBSCAN algorithm for heavily skewed data. Let me further illustrate:
The node that is "reducing" Cell 1
performs a local DBSCAN on C,A,B
to produce cluster P
.
The node that is "reducing" Cell 2
performs a local DBSCAN on A,B,D
to produce cluster Q
.
Since A
is in both cluster P
and Q
that means P
and Q
should be merged into one global cluster G
.
We relabel all the points explicitly in the bounding box of the cell from their local cluster class to their associated global cluster class.
In Cell 1
, the points C
and A
are classed as G
and in Cell 2
, the points B
and D
are classed as G
too.
The last column in the output CSV files contains the cluster id as a string. If a formed cluster is wholly within a cell, then the points in that cluster will have a cluster id in the form "cell-row:cell-col:-1". If a formed cluster straddles multiple cells, then the points in that cluster will have a cluster id in the form "clusterId", where the content of clusterId consists of only numerical digits, for example "1", "2", etc...
The local DBSCAN implementation does not return 'noisy' points.- The neighborhood search in
SpatialIndex
is based on a bounding box search where the box is centered about the given point and the width and height of the box is . All the points that fall into the box are considered neighbors.
- Implement the Graph using GraphX.
Return 'noisy' points.- Make the
Point
implementation multidimensional with custom proximity calculation - (Note to self, is there a boundary issue ?) - I wonder if bandwidth as described here can be used as a "good"
- https://twitter.github.io/scala_school/
- http://oldfashionedsoftware.com/2009/07/10/scala-code-review-foldleft-and-foldright/
- http://codereview.stackexchange.com/questions/29699/bfs-and-dfs-in-scala
- http://stackoverflow.com/questions/5471234/how-to-implement-a-dfs-with-immutable-data-types
- http://people.cs.nctu.edu.tw/~rsliang/dbscan/testdatagen.html
- https://code.google.com/p/guava-libraries/source/browse/guava/src/com/google/common/collect/Hashing.java
- http://scikit-learn.org/stable/auto_examples/cluster/plot_cluster_comparison.html#example-cluster-plot-cluster-comparison-py
- http://www.sersc.org/journals/IJDTA/vol7_no3/5.pdf
- https://github.com/alitouka/spark_dbscan
- https://github.com/irvingc/dbscan-on-spark
- https://github.com/aizook/SparkAI
- https://www.oreilly.com/ideas/clustering-geolocated-data-using-spark-and-dbscan
docker run\
--rm\
--privileged=true\
--hostname=quickstart.cloudera\
--name quickstart\
--volume $(pwd):/dbscan-spark\
-t -i cloudera/quickstart\
/bin/bash
DAEMONS="\
zookeeper-server\
hadoop-hdfs-datanode\
hadoop-hdfs-namenode\
hadoop-httpfs\
hadoop-yarn-nodemanager\
hadoop-yarn-resourcemanager\
"
for daemon in ${DAEMONS}; do
sudo service ${daemon} start
done
cd /dbscan-spark
./ss-yarn.sh
jupyter notebook --ip=0.0.0.0 --port=8888
import matplotlib.colors as clr
import matplotlib.pyplot as plt
import numpy as np
% matplotlib inline
parts = np.genfromtxt('target/smiley2.csv', delimiter=',', names=['id', 'x', 'y', 'c'])
colors = ['black', 'red', 'green', 'blue', 'purple']
plt.figure(2)
plt.scatter(parts['x'], parts['y'], c=parts['c'], cmap=clr.ListedColormap(colors), lw=0)
plt.show(block=False)
docker pull python:2.7
docker run -i -t --rm -v $(pwd):/dbscan-spark python:2.7 bash
csrutil disable; reboot
pip uninstall numpy
sudo rm -rf /System/Library/Frameworks/Python.framework/Versions/2.7/Extras/lib/python/numpy