-
Notifications
You must be signed in to change notification settings - Fork 37
/
Copy pathREADME
187 lines (140 loc) · 8.58 KB
/
README
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
* UPDATE[29/11/2010] : Added support for 'Aggregate' function
PHOEBUS
=======
Contents
--------
* Introduction
* Computational Model
* Distributed Processing
* Getting it to work
* Pluggable Storage and True Multi-Node Distribution with HDFS and Thrift
* Next Steps
* References
Introduction
------------
Phoebus is a system written in Erlang for Distributed processing of very large graphs that span billions of vertices and edges. It is basically an implementation of Google's Pregel[1] paper.
It supports a Distributed model of computation similar to MapReduce[2], but more tuned to Graph processing.
Computational Model
-------------------
* A Graph is partitioned into a groups of Records.
* A Record consists of a Vertex and its outgoing Edges (An Edge is a Tuple consisting of the edge weight and the target vertex name).
* A User specifies a 'Compute' function that is applied to each Record.
* Computation on the graph happens in a sequence of incremental Super Steps.
* At each Super step, the Compute function is applied to all 'active' vertice of the graph.
* Vertices communicate with each other via Message Passing.
* The Compute function is provided with the Vertex record and all Messages sent to the Vertex in the previous SuperStep.
* An 'Aggregate' can be used to pass some global state to each for the vertices that are executing in the current step via the 'Compute' function.
* A User may additionally specify an Aggregate function and an initial value for that aggregate function.
* A Compute funtion can
- Mutate the value associated to a vertex
- Add/Remove outgoing edges.
- Mutate Edge weight
- Send a Message to any other vertex in the graph.
- Change state of the vertex from 'active' to 'hold'.
- Make some decision based on the value of the 'Aggregate' for that step.
- Return a modified 'Aggregate'.
* The Aggregate function folds over all the aggregates returned by the Compute function. This new value is then used as the aggregate for the next step.
* At the begining of each SuperStep, if there are no more active vertices -and- if there are no messages to be sent to any vertex, the algorithm terminates.
* A User may additionally specify a 'MaxSteps' to stop the algorithm after a some number of super steps.
* A User may additionally specify a 'Combine' funtion that is applied to the all the Messages targetted at a Vertex before the Compute function is applied to it.
Distributed Processing
----------------------
* The Computational model allows the algorithm to be parallelly performed by a cluster of phoebus nodes.
* A 'Job' submitted to a Phoebus cluster is managed by a 'Master' process running on the node that receives the Job.
* The Master partitions the input graph and spawns a 'Worker' for each partition on one of the nodes of the cluster.
* The Master then askes the Worker to perform a Super step on its partition and awaits notification from the Worker of Step completion.
* The Step number is incremented untill all Workers report that they have no more 'active' Vertices and no more outstanding messages to be deliverd.
Getting it to work (Tested on Mac OS X Snow Leopard)
------------------
Requirement:
* rebar (Download from https://github.com/downloads/basho/rebar/rebar and place in a direcotry that is in your PATH)
* git
* erlang (tested on R14B)
1) Clone github..
$ git clone git://github.com/xslogic/phoebus.git
$ cd phoebus
2) Compile and create release..
$ ./generate
==> rel (compile)
==> phoebus (compile)
....
==> rel (generate)
Usage: phoebus {start|stop|restart|reboot|ping|console|attach}
3) Create a sample output directory
$ mkdir /tmp/output
4) Start a two node Phoebus cluster...
Terminal 1:
$ ./run_phoebus 1
.....
Erlang R13B04 (erts-5.7.5) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:5] [hipe] [kernel-poll:true]
Eshell V5.7.5 (abort with ^G)
(phoebusr1@my-machine)1>
Terminal 2:
$ ./run_phoebus 2
.....
Erlang R13B04 (erts-5.7.5) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:5] [hipe] [kernel-poll:true]
Eshell V5.7.5 (abort with ^G)
(phoebusr2@my-machine)1>
5) Creating input data set: Currently, Phoebus requires each Input record be line delimited. It must be of the form
"<VertexName>\t<VertexValue>\t<EdgeWeight1>\t<TargetVertexName1>\t<EdgeWeight2>\t<TargetVertexName2>...\n"
The module "algos" that comes with Phoebus has a utility function that can generate Binary Tree as a sample input data set.
(phoebusr1@my-machine)1> algos:create_binary_tree("/tmp/input", 4, 1000).
ok
The create_binary_tree function has created an input data set in the directory "/tmp/input".
It has created a 1000 node binary tree with root as "1"
It has split the input into 4 files.
$ head -5 infile1
1 1 1 2 1 3
2 2 1 4 1 5
3 3 1 6 1 7
4 4 1 8 1 9
5 5 1 10 1 11
6) Running a sample algo: The module "algos" has a sample Compute function that calculates shortest path to a Node
(phoebusr1@my-machine)1> AlgoFun = fun algos:shortest_path/3.
#Fun<algos.shortest_path.2>
(phoebusr1@my-machine)2> AggFun = fun(X, Y) -> integer_to_list(erlang:max(list_to_integer(X), list_to_integer(Y))) end.
(phoebusr1@my-machine)3> phoebus_master:start_link([{name, "first_ever"}, {max_steps, 100}, {algo_fun, AlgoFun}, {aggregate_val, "0"}, {aggregate_fun, AggFun}, {input_dir, "file:///tmp/input/"}, {output_dir, "file:///tmp/output/"}]).
ok
Since the input has 4 files, phoebous spawns 4 workers.. 2 on each node...
7) Running the example will print some inermmediate results on screen.
For eg :
....
["79"]Recvd msgs : ["39:19:9:4:2:1"] : Aggregate : "127"
["79"]Sending msgs : [{"159","79:39:19:9:4:2:1"},{"158","79:39:19:9:4:2:1"}]
....
This is just the output of the Combine function. "79" is the name of the vertex currently running the compute function, the Aggregate "127" is the largest vertex seen by any of the vertex (across ALL the phoebus nodes) currently running the algorithm. "Recvd msgs" are all the messages received by the vertex in that step. At the end of the compute function, vertex "79" has sent two messages.. one to vertex "159" and another to vertex "158".
8) Wait for Algorithm to end.. Once it finishes.. output will be written to "/tmp/output". Listing all Vertices with names starting with "20"...
$ cat /tmp/output/* | grep '^20'
200 100:50:25:12:6:3:1 1 400 1 401
204 102:51:25:12:6:3:1 1 408 1 409
20 10:5:2:1 1 40 1 41
201 100:50:25:12:6:3:1 1 402 1 403
203 101:50:25:12:6:3:1 1 406 1 407
The Second column (The value of the Vertex) gives the shortest path to the vertex from the root of the binary tree
Pluggable Storage and True Multi-Node Distribution with HDFS[3] and Thrift[4]
-----------------------------------------------------------------------------
* A true Multi-Node setup requires a Distributed storage layer.
* Phoebus defines an 'external_store' behaviour, an implementation of which is 'external_store_file'.
* 'external_store_file' was used in the above section to read/store vertices from the local file system.
* Phobues can be mode to read/store to HDFS using the 'external_store_hdfs' module.
* The store layer is decided by the URI scheme of the "input_dir" and "output_dir" parameters passed to 'phoebus_master' when submitting a job.
for eg:
....
(phoebusr1@my-machine)1> phoebus_master:start_link([{name, "first_ever"}, {max_steps, 100}, {algo_fun, AFun}, {input_dir, "hdfs://localhost:9000/tmp/input/"}, {output_dir, "file:///tmp/output/"}]).
....
* HDFS exposes only Java APIs. Thus, 'external_store_hdfs' is implemented as a wrapper over an erlang thrift client that talks to an external thrift server.
* Phoebus has been tested with the HDFS thrift server that is bundled with Cloudera CDH3 Hadoop distribution[5].
Next Steps
----------
* Need to fix Fault tolerence and Error Handling.. If Worker dies, master must ask another worker on another node to take up work
* [DONE] The Pregel paper talks of an 'Aggregate' Function... need to implement..
* Support Jobs written in Python.
* Add support for Disco DFS[6].
References
----------
1. (http://portal.acm.org/citation.cfm?id=1582716.1582723) Grzegorz Malewicz, Matthew H. Austern, Aart J. C. Bik, James C. Dehnert, Ilan Horn, Naty Leiser, and Grzegorz Czajkowski, Pregel: A System for Large-Scale Graph Processing
2. MapReduce (http://en.wikipedia.org/wiki/MapReduce)
3. Hadoop Distributed File system (http://hadoop.apache.org/hdfs/)
4. Thrift (http://incubator.apache.org/thrift/)
5. Cloudera's CDH3 Hadoop tarball (http://archive.cloudera.com/cdh/3/hadoop-0.20.2+320.tar.gz).
6. Disco Distributed File System (http://discoproject.org/doc/start/ddfs.html)