In this second lab we are going to use the GraphX extension of Spark to load and process large-scale graphs. Remember that Spark keeps all the loaded datasets and transformations in memory, which might exceed the capacity of the machines we are using. For this lab we have selected smaller samples of the full datasets we have collected, in order to avoid these problems.
We will use the same command-line shell as in the previous lab (spark-shell). In order to use the GraphX functionality we first need to import its libraries, which we can do by typing the following command:
import org.apache.spark.graphx._
In order to work with GraphX we need to create a Graph RDD that represents both the structure of the graph and the properties associated with its edges and vertices. We will create the graph by loading an edge list file from HDFS (one edge per line, in the format: source destination). Spark graphs are directed graphs (so relationships are not symmetrical). They are also multigraphs by default, which means there may be multiple different edges with the same source and destination.
// generate graph from edge dataset. The graph used in this example may take up to 10 minutes to load into Spark
val followerGraph = GraphLoader.edgeListFile(sc, "hdfs://moonshot-ha-nameservice/camp/follows.name.160113.2015.hash")
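If you want to see what the loader expects, an edge list file is just plain text with one source destination pair per line. A minimal sketch on a toy file (the file name here is hypothetical) would be:
// Hypothetical toy edge list whose lines look like:
//   1 2
//   2 3
//   1 3
val toyGraph = GraphLoader.edgeListFile(sc, "hdfs://moonshot-ha-nameservice/camp/toy.edges")
toyGraph.triplets.collect() // inspect each edge together with its endpoint vertices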
This graph only contains the raw node and edge structure, with vertices identified by long numbers. We will now extend the structure to attach the name of each user and channel to its vertex as an attribute. For that we can use a second file that contains the mapping of long ids to channel and user names.
val names = sc.textFile("hdfs://moonshot-ha-nameservice/camp/names.map").filter(x=>x.split(" ").length==2).map(line => (line.split(" ")(1).toLong, line.split(" ")(0) ))
// join the vertices with the names RDD on the vertex id, keeping (name, previous attribute) per vertex
val newatt = followerGraph.vertices.join(names).map(x => (x._1, (x._2._2, x._2._1)))
val namegraph = followerGraph.outerJoinVertices(newatt){ case (id, oldAttr, nameOpt) => (oldAttr, nameOpt) }.cache()
The outerJoinVertices transformation generates a new graph by joining the current vertices RDD with a new RDD, newatt in this case. Here newatt pairs each vertex id with the streamer's name (String) and its previous attribute (Int); use newatt.first to check.
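If the join semantics are unclear, here is a minimal sketch of outerJoinVertices on a toy graph (all names and values here are made up): vertices with a match in the joined RDD receive Some(value), the rest receive None.
// toy graph: three vertices with Int attributes and a single edge
val toyVerts = sc.parallelize(Seq((1L, 10), (2L, 20), (3L, 30)))
val toyEdges = sc.parallelize(Seq(Edge(1L, 2L, 0)))
val toy = Graph(toyVerts, toyEdges)
// extra attributes only exist for vertices 1 and 2
val extra = sc.parallelize(Seq((1L, "alice"), (2L, "bob")))
// vertex 3 gets None, the others Some(name)
val joined = toy.outerJoinVertices(extra){ (id, attr, nameOpt) => (attr, nameOpt) }
joined.vertices.collect() // e.g. Array((1,(10,Some(alice))), (2,(20,Some(bob))), (3,(30,None)))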
We invoke cache on namegraph because we will use it repeatedly. Remember that Spark is a lazy framework: no transformation will be executed until an action is requested. We can now start accessing our graph structure. The following actions compute the size of the graph, in both nodes and edges.
namegraph.vertices.count()
namegraph.edges.count()
Q: The type of the new graph generated here is org.apache.spark.graphx.Graph[(Int, Option[(String, Int)]), Int]. Can you recognise which type corresponds to which attribute?
Q: Also, the Option[(String, Int)] is a little cumbersome here. Is it possible to define a new class to encapsulate both attributes?
Hint:
case class User(name: String, id: Int)
// vertices without a matching name receive a placeholder instead of failing on None.get
val namegraph = followerGraph.outerJoinVertices(newatt){ case (id, oldAttr, nameOpt) => User(nameOpt.map(_._1).getOrElse("unknown"), oldAttr) }
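With the case class in place, vertex attributes become easier to read. For example, to look up a vertex by name (the channel name shown is hypothetical):
namegraph.vertices.filter{ case (id, user) => user.name == "somechannel" }.collect()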
We can use some of the methods implemented in GraphX to compute the degree of each vertex. The inDegrees and outDegrees transformations return pairs of elements, with the first element being the id of the vertex and the second element the degree count. We will use the stats() method of Spark to compute some statistical properties of the overall degree distribution.
followerGraph.inDegrees.map(x => x._2).stats()
followerGraph.outDegrees.map(x => x._2).stats()
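stats() returns a StatCounter summarising count, mean, standard deviation, max and min. The same values can also be obtained individually, for instance:
val inDeg = followerGraph.inDegrees.map(x => x._2)
inDeg.mean() // average in-degree across all vertices
inDeg.max()  // the highest in-degree in the graph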
- Q1: something on the degrees
- Q2: something else on the degrees
This exploration performs computations on the distribution of degree values. If we want to find the vertex representing the channel with the highest number of followers, we need to define a different transformation. This time we will use the reduce action to send a single result back to the driver, containing the vertex id with the maximum in-degree count:
// Define a reduce operation to compute the highest degree vertex
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
if (a._2 > b._2) a else b
}
// Compute the max degrees
val maxInDegree: (VertexId, Int) = followerGraph.inDegrees.reduce(max)
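To see which channel this vertex corresponds to, we can look up its id in the graph that carries the names (a sketch, assuming the namegraph built earlier):
// retrieve the attribute of the vertex with the highest in-degree
namegraph.vertices.filter(v => v._1 == maxInDegree._1).collect()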
We can combine multiple datasets to add further information to the graph we have generated. In this section we will obtain the maximum viewership figures of each channel, as collected in the dataset we used yesterday, and add that information to each vertex as a second attribute (the first one is the channel name).
// This assumes the maximum viewers per channel were computed in the first lab and saved as a file
// Load the maximum viewers of each channel, keyed by its vertex id
val maxViewers = sc.textFile("hdfs://moonshot-ha-nameservice/camp/<<<lab1maxviewers>>>").map(line => (line.split("\t")(1).toLong, line.split("\t")(2).toInt))
val viewerGraph = namegraph.outerJoinVertices(maxViewers){ (id, user, viewers) => (user, viewers) }.cache()
We need to use an outer join because only channels appear in yesterday's dataset (not users).
Among all the channels with 100 or more followers, compute the ratio of maximum viewers to number of followers. Obtain the distribution statistics and the maximum value, analogously to what we did before with the in-degrees.
This is left open for you to apply what you have learned so far; compare your attempt with the sketch below.
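If you get stuck, here is one possible solution sketch. It assumes the viewerGraph built above, whose vertex attributes are (User, Option[Int]), and that a channel's number of followers equals its in-degree:
// attach each vertex's in-degree (number of followers) next to its (User, Option[maxViewers]) attribute
val ratios = viewerGraph.outerJoinVertices(viewerGraph.inDegrees){ (id, attr, deg) => (attr, deg.getOrElse(0)) }
  .vertices
  .filter{ case (id, ((user, viewers), followers)) => followers >= 100 && viewers.isDefined }
  .map{ case (id, ((user, viewers), followers)) => viewers.get.toDouble / followers }
ratios.stats() // distribution statistics of the ratio
ratios.max()   // the maximum ratio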
val userFollowers = sc.textFile("hdfs://moonshot-ha-nameservice/camp/<<<lab1maxviewers>>>").map(line => (line.split("\t")(0).toLong, line.split("\t")(1).toLong))
// count how many times each (source, destination) pair appears and build weighted edges
val edges = userFollowers.map(e => (e, 1)).reduceByKey((a, b) => a + b).map(x => Edge(x._1._1, x._1._2, x._2))
// make the graph effectively undirected by adding every edge in both directions
val channelGraph: Graph[Int, Int] = Graph.fromEdges(edges.union(edges.map(e => Edge(e.dstId, e.srcId, e.attr))), 1).cache()
We have used a new method to create the graph. This time we created an RDD of Edge elements, with sourceId, destinationId, and a property (an Integer counting how many times the same source and destination pair appears). We have made the graph effectively undirected this time, by taking the union of the edge set with the same set reversed.
Spend some time familiarising yourself with this new graph. Using the transformations you learned previously in the lab, find out the number of nodes, the number of edges (the logical count is half of the reported value, since each edge is stored in both directions), and the distribution of degrees.
We can use this graph to compute further graph analysis methods, such as PageRank. Compute PageRank on this new graph: who is the most central user according to this metric?
val ranks = channelGraph.pageRank(0.0001).vertices
// find the most central vertex, analogously to the maximum in-degree earlier
ranks.reduce((a, b) => if (a._2 > b._2) a else b)
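To put a name to the most central vertices, the ranks can be joined with the names RDD from earlier (a sketch; it assumes the vertex ids of channelGraph use the same id space as the names mapping):
// attach names to the PageRank scores and print the top 5
ranks.join(names).map{ case (id, (rank, name)) => (rank, name) }.top(5).foreach(println)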
All the graph operations performed in this lab sheet have used methods predefined by GraphX. There are additional methods we haven't covered, which can be seen in the documentation. However, there are numerous computations that need to be implemented manually. For this purpose, GraphX exposes a Pregel interface where custom computations can be implemented. To do so we can use the pregel() transformation, which executes three functions iteratively:
a) Update the vertex state (vprog)
b) Send messages along edges (sendMsg)
c) Merge incoming messages (mergeMsg)
In many cases these algorithms also require initialising the state of the graph, as each computation will update the state of the vertices.
import org.apache.spark.graphx.lib._
// channelGraph.joinVertices(...)  to initialise the vertex values
// channelGraph.pregel(...)        supplying the three functions below
def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
    vprog: (VertexId, VD, A) => VD,
    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
    mergeMsg: (A, A) => A)
  : Graph[VD, ED]
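As a concrete illustration, here is a minimal sketch of single-source shortest paths written with pregel, adapted from the standard GraphX example. The source vertex id is hypothetical, and it assumes the Int edge attributes of channelGraph can serve as distances:
val sourceId: VertexId = 1L // hypothetical source vertex
// initialise every vertex with distance 0 (the source) or infinity
val initialGraph = channelGraph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist), // a) update vertex state with the best distance seen
  triplet => { // b) send a message only along edges that improve the destination's distance
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    else
      Iterator.empty
  },
  (a, b) => math.min(a, b) // c) merge incoming messages by keeping the minimum
)
sssp.vertices.take(5) // inspect a few resulting distances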