-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathcentralities.py
92 lines (70 loc) · 3.08 KB
/
centralities.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
from graphframes import *
from pyspark import SparkContext
from pyspark.sql import SparkSession
from graphframes.lib import AggregateMessages as AM
from pyspark.sql import functions as F
from pyspark.sql.types import *
from operator import itemgetter
# Start (or attach to) the Spark session named 'centralities'.
spark = (
    SparkSession.builder
    .appName('centralities')
    .getOrCreate()
)

# Build the GraphFrame from the two CSV files; header=True makes the first
# row of each file supply the column names.
v = spark.read.csv(path="/vagrant/social-nodes.csv", header=True)
e = spark.read.csv(path="/vagrant/social-relationships.csv", header=True)
g = GraphFrame(v, e)

# Print the vertex table first, then the edge table.
for frame in (g.vertices, g.edges):
    frame.show()
# Degree centrality: total, incoming and outgoing degree per vertex.
total_degree = g.degrees
in_degree = g.inDegrees
out_degree = g.outDegrees

# Left joins keep every row of the total-degree table; a vertex missing from
# the in- or out-degree table gets 0 for that count via fillna.
degree_table = total_degree.join(in_degree, "id", how="left")
degree_table = degree_table.join(out_degree, "id", how="left")
degree_table = degree_table.fillna(0)

# Highest in-degree first.
degree_table.sort("inDegree", ascending=False).show()
# Find and show the closeness centrality.
def collect_paths(paths):
    """Return the ``F.collect_set`` expression built over *paths*.

    NOTE(review): ``F.collect_set`` produces a column expression, not a plain
    Python list, yet this function is wrapped as a Python UDF just below --
    confirm that is intended. The UDF is not referenced anywhere else in the
    visible script.
    """
    unique_paths = F.collect_set(paths)
    return unique_paths


# UDF wrapper around collect_paths, declared to return an array of strings.
collect_paths_udf = F.udf(f=collect_paths, returnType=ArrayType(StringType()))
# Spark schema for a list of paths: an array whose elements are structs with
# a string "id" field and an integer "distance" field.
paths_type = ArrayType(
    elementType=StructType(
        fields=[
            StructField("id", StringType()),
            StructField("distance", IntegerType()),
        ]
    )
)
def flatten(ids):
    """Merge several path lists into one list with a single entry per id.

    Args:
        ids: Iterable of path lists, where each path is an ``(id, distance)``
            pair. ``None`` is treated as an empty collection.

    Returns:
        A list of ``(id, distance)`` tuples ordered by id. When the same id
        appears more than once, the smallest distance is kept.
    """
    shortest = {}
    for sublist in ids or ():
        for node_id, distance in sublist:
            # Keep the shortest distance instead of whichever duplicate
            # happens to come last: the order of the incoming lists is not
            # controlled here, so "last wins" would make the result depend
            # on arrival order.
            if node_id not in shortest or distance < shortest[node_id]:
                shortest[node_id] = distance
    return sorted(shortest.items(), key=itemgetter(0))
# Spark UDF form of flatten; its result is typed with the paths_type schema.
flatten_udf = F.udf(f=flatten, returnType=paths_type)
def new_paths(paths, id):
    """Build the path list that the vertex *id* offers to a neighbour.

    Every known path is extended by one hop, entries that point back at *id*
    itself are dropped, and a direct one-hop entry for *id* is appended last.

    Args:
        paths: Iterable of ``(id, distance)`` pairs. ``None`` (a null array
            column) is treated as empty, matching how ``merge_paths`` treats
            a missing ``new_ids``.
        id: Identifier of the vertex whose paths these are.

    Returns:
        A list of ``{"id": ..., "distance": ...}`` dicts.
    """
    extended = [
        {"id": node_id, "distance": distance + 1}
        for node_id, distance in paths or ()
        if node_id != id
    ]
    # The vertex itself is always exactly one hop away from its neighbour.
    extended.append({"id": id, "distance": 1})
    return extended
# Spark UDF form of new_paths; its result is typed with the paths_type schema.
new_paths_udf = F.udf(f=new_paths, returnType=paths_type)
def merge_paths(ids, new_ids, id):
    """Combine known and newly received paths, keeping the shortest per id.

    Args:
        ids: Iterable of ``(id, distance)`` pairs already known; ``None`` is
            treated as empty.
        new_ids: Iterable of ``(id, distance)`` pairs just received; ``None``
            or empty means nothing new arrived.
        id: Identifier of the vertex doing the merge; paths that lead back to
            it are discarded.

    Returns:
        A list of ``{"id": ..., "distance": ...}`` dicts with one entry per
        distinct id, carrying the smallest distance seen for that id.
    """
    known = list(ids) if ids else []
    incoming = list(new_ids) if new_ids else []
    candidates = [
        (node_id, distance)
        for node_id, distance in known + incoming
        if node_id != id
    ]
    # Longest first: building the dict then overwrites each id with
    # progressively shorter distances, so the shortest one is what remains.
    candidates.sort(key=itemgetter(1), reverse=True)
    best_ids = dict(candidates)
    return [
        {"id": node_id, "distance": distance}
        for node_id, distance in best_ids.items()
    ]
# Spark UDF form of merge_paths; its result is typed with the paths_type schema.
merge_paths_udf = F.udf(f=merge_paths, returnType=paths_type)
def calculate_closeness(ids):
    """Return the closeness score for a vertex's list of paths.

    The score is the number of paths divided by the sum of their distances.

    Args:
        ids: Iterable of ``(id, distance)`` pairs; ``None`` or empty yields
            a score of 0.0.

    Returns:
        A float. The result is always a float, including the zero case: the
        UDF built from this function declares a double result, and PySpark
        does not convert a Python ``int`` return value into a double (the
        cell would come back as null instead of 0).
    """
    if not ids:
        return 0.0
    total_distance = sum(distance for _, distance in ids)
    if total_distance == 0:
        return 0.0
    return float(len(ids)) / total_distance
# Spark UDF form of calculate_closeness, declared to return a double.
closeness_udf = F.udf(f=calculate_closeness, returnType=DoubleType())
# Give every vertex an "ids" column holding an empty array, cache the result
# and build a working graph g2 on top of the original edges.
vertices = g.vertices.withColumn("ids", F.array())
cached_vertices = AM.getCachedDataFrame(vertices)
g2 = GraphFrame(cached_vertices, g.edges)

# Count the vertices once. The value is printed and also used as the number
# of message-passing rounds, so the count action does not need to run twice.
vertex_count = g2.vertices.count()
print(vertex_count)

for _ in range(vertex_count):
    # Along every edge, each endpoint sends the other endpoint its own path
    # list as rewritten by new_paths_udf.
    msg_dst = new_paths_udf(AM.src["ids"], AM.src["id"])
    msg_src = new_paths_udf(AM.dst["ids"], AM.dst["id"])

    # Gather the distinct messages each vertex received into one "agg" array.
    agg = g2.aggregateMessages(
        F.collect_set(AM.msg).alias("agg"),
        sendToSrc=msg_src,
        sendToDst=msg_dst,
    )

    # Collapse the received lists into a single "newIds" list per vertex.
    res = agg.withColumn("newIds", flatten_udf("agg")).drop("agg")

    # Left outer join keeps vertices that received no message; merge the old
    # and new lists, then put the merged list back under the name "ids".
    new_vertices = (
        g2.vertices.join(res, on="id", how="left_outer")
        .withColumn("mergedIds", merge_paths_udf("ids", "newIds", "id"))
        .drop("ids", "newIds")
        .withColumnRenamed("mergedIds", "ids")
    )

    # Cache this round's vertices and rebuild g2 on them for the next round.
    cached_new_vertices = AM.getCachedDataFrame(new_vertices)
    g2 = GraphFrame(cached_new_vertices, g2.edges)
# Add a "closeness" column computed from each vertex's "ids" list, order the
# rows from highest to lowest closeness, and print them without truncating
# cell contents.
closeness_ranking = g2.vertices.withColumn("closeness", closeness_udf("ids"))
closeness_ranking = closeness_ranking.sort("closeness", ascending=False)
closeness_ranking.show(truncate=False)