-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathapsp.txt
25 lines (22 loc) · 1.04 KB
/
apsp.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import numpy.random as npr
from operator import add
# All-pairs shortest paths on a small random undirected graph, written for an
# interactive PySpark session: `sc` is assumed to be the SparkContext that the
# shell provides (it is not defined in this file).
nv = 4  # number of vertices

# Strict lower triangle: one random weight per unordered pair (i, j), j < i.
a = [[((i, j), npr.rand()) for j in range(i)] for i in range(nv)]
# Zero-weight self loops, so that every (i, i) key is present in the edge set.
b = [((i, i), 0.0) for i in range(nv)]

# Flatten the per-vertex lists into one RDD of ((src, dst), weight) records.
edge_rdd = sc.parallelize(a).flatMap(lambda x: x)
# Emit each edge together with its mirror so the weights are symmetric.
edge_rdd = edge_rdd.flatMap(lambda x: [x, ((x[0][1], x[0][0]), x[1])])
self_rdd = sc.parallelize(b)
edge_rdd = edge_rdd.union(self_rdd)
# Sum of all weights before any relaxation (echoed by the interactive shell).
edge_rdd.values().reduce(add)

# ---- Single relaxation step, tried out by hand for pivot vertex 1 ----------
# The result goes to edge_rdd2, so edge_rdd itself is left untouched here.
i = 1  # iteration number 1
# The pivot is bound as a default argument so the filter does not depend on
# whatever value the global `i` holds when Spark serializes the closure.
row_rdd = edge_rdd.filter(lambda x, pivot=i: x[0][0] == pivot)
# Re-key the pivot's row by the pivot itself: (pivot, (dst, weight)).
row_rdd2 = row_rdd.map(lambda x: (x[0][0], (x[0][1], x[1])))
# Self-join pairs every (j, w_ij) with every (k, w_ik) and yields
# ((j, k), w_ij + w_ik): the length of the route j -> pivot -> k
# (edge weights are symmetric, so w_ij == w_ji).
join_rdd = row_rdd2.join(row_rdd2).map(
    lambda x: ((x[1][0][0], x[1][1][0]), x[1][0][1] + x[1][1][1])
)
# update the edges: keep the shorter of (current weight, route via the pivot)
edge_rdd2 = edge_rdd.join(join_rdd).mapValues(min)

# ---- Full run: relax through every vertex in turn (Floyd-Warshall style) ---
# NOTE(review): edge_rdd is read twice per iteration (filter + join) and is
# never persisted; consider edge_rdd.cache() if nv grows beyond a toy size.
for i in range(nv):
    row_rdd = edge_rdd.filter(lambda x, pivot=i: x[0][0] == pivot)
    row_rdd2 = row_rdd.map(lambda x: (x[0][0], (x[0][1], x[1])))
    join_rdd = row_rdd2.join(row_rdd2).map(
        lambda x: ((x[1][0][0], x[1][1][0]), x[1][0][1] + x[1][1][1])
    )
    # The pivot's row holds an entry for every vertex, so join_rdd carries a
    # key for every (j, k) pair and the inner join below drops no edge.
    edge_rdd = edge_rdd.join(join_rdd).mapValues(min)

# Sum of all shortest-path lengths after relaxation (echoed by the shell).
edge_rdd.values().reduce(add)