CIS Spring Camp - Lab 2 - Graph Processing

In this second lab we are going to use the GraphX extension of Spark to load and process large-scale graphs. Remember that Spark keeps all the loaded datasets and transformations in memory, which might exceed the capacity of the machines we are using. For this lab we have selected smaller samples of the full datasets we have collected, in order to avoid these problems.

We will use the same command-line shell from the previous lab (spark-shell). In order to use GraphX functionality we first need to import its libraries; we can do that by typing the following command:

import org.apache.spark.graphx._

Creating Graphs with GraphX

In order to work with GraphX we need to create a Graph RDD that represents both the structure of the graph and the properties associated with its edges and vertices. We will create the graph by loading from HDFS an edge list file (containing one edge per line, in the format source destination). Spark graphs are directed graphs (so relationships are not symmetrical). They are also multigraphs by default, which means there might be multiple different edges with the same source and destination.
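For illustration, a hypothetical excerpt of such a file (the ids are made up):

1 2
1 3
2 3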

//generate graph from the edge dataset. The graph used in this example may take up to 10 minutes to load into Spark

val followerGraph = GraphLoader.edgeListFile(sc, "hdfs://moonshot-ha-nameservice/camp/follows.name.160113.2015.hash")

This graph only contains the raw node and edge structure, with vertices identified by long numbers. We will now extend the structure by attaching the name of each user and channel to its respective vertex as an attribute. For that we can use a second file that contains the mapping of long ids to channel and user names.

// build a map of vertex id -> name, from lines of the form "name id"
val names = sc.textFile("hdfs://moonshot-ha-nameservice/camp/names.map").filter(x => x.split(" ").length == 2).map(line => (line.split(" ")(1).toLong, line.split(" ")(0)))

// join each vertex with its name: (vertex id, (name, original attribute))
val newatt = followerGraph.vertices.join(names).map(x => (x._1, (x._2._2, x._2._1)))

val namegraph = followerGraph.outerJoinVertices(newatt){ case (id, attr, nameOpt) => (attr, nameOpt) }.cache

The outerJoinVertices call generates a new graph by joining the current vertex RDD with a new RDD, newatt in this case. Here newatt pairs each vertex id with the name of the streamer (a String, kept together with the original Int attribute); use newatt.first to check.
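A quick way to inspect it (the exact values depend on the dataset):

// should print a pair of the form (vertex id, (name, original attribute))
newatt.first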

We invoke cache on this graph as we will use it repeatedly. Remember that Spark is a lazy framework: no transformation will be executed until an action is requested. We can now start accessing our graph structure. The following actions compute the size of the graph, in both nodes and edges.

namegraph.vertices.count()
namegraph.edges.count()

Q: Here the type of the new graph generated is "org.apache.spark.graphx.Graph[(Int, Option[(String, Int)]),Int]"; can you recognize which type corresponds to which attribute?

Q: Also, the Option[(String, Int)] is a little cumbersome here; is it possible to define a new class to encapsulate both attributes?

hint:

case class User(name: String, id: Int)
// getOrElse covers vertices with no matching entry in newatt, since this is an outer join
val namegraph = followerGraph.outerJoinVertices(newatt){ case (id, attr, nameOpt) => User(nameOpt.map(_._1).getOrElse("unknown"), attr) }

Degree exploration

We can use some of the methods implemented in GraphX to compute the degree of each vertex. The inDegrees and outDegrees transformations return pairs of elements, with the first element being the id of the vertex, and the second element the degree count. We will use Spark's stats() method to compute some statistical properties of the overall degree distribution.

followerGraph.inDegrees.map(x => x._2).stats()
followerGraph.outDegrees.map(x => x._2).stats()
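Beyond the summary statistics, we can inspect the distribution itself, for example as a histogram of in-degree values (a quick sketch):

// count how many vertices have each in-degree value, and show the 20 smallest degrees
followerGraph.inDegrees.map(x => (x._2, 1)).reduceByKey(_ + _).sortByKey().take(20)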

  • Q1, something on the degrees,
  • Q2, something else on the degrees

This exploration computes statistics over the distribution of degree values. If we want to find the vertex representing the channel with the highest number of followers, we need to define a different transformation. This time we will use the reduce action to send back to the driver a single result, containing the vertex id with the maximum in-degree count:

// Define a reduce operation to compute the highest-degree vertex
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
  if (a._2 > b._2) a else b
}
// Compute the max in-degree
val maxInDegree: (VertexId, Int) = followerGraph.inDegrees.reduce(max)
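To see which channel this actually is, we can look the id up in the name-annotated graph (a sketch, reusing namegraph from above):

// retrieve the vertex, and hence the name attribute, of the most-followed channel
namegraph.vertices.filter(v => v._1 == maxInDegree._1).collect()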

Graph enrichment

We can combine multiple datasets in order to add additional information to the graph we have generated. In this section we will obtain the maximum viewership figures of each channel, as collected in the dataset we used yesterday, and add that information to each vertex as a second attribute (the first one being the channel name).

// Load the per-channel maximum viewer figures computed in the first lab
// (assuming a tab-separated file with the channel id in column 1 and the viewer count in column 2)
val maxViewers = sc.textFile("hdfs://moonshot-ha-nameservice/camp/<<<lab1maxviewers>>>").map(line => (line.split("\t")(1).toLong, line.split("\t")(2).toInt))
val viewerGraph = namegraph.outerJoinVertices(maxViewers){ (id, attr, viewers) => (attr, viewers) }.cache()

We need to use an outer join because only channels appear in yesterday's dataset (not users).
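We can check this by counting the vertices left without a viewer figure (a quick sketch):

// vertices whose second attribute is None: users, plus channels absent from yesterday's dataset
viewerGraph.vertices.filter(_._2._2.isEmpty).count()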

Among all the channels with 100 or more followers, compute the ratio of maximum viewers to number of followers. Get the distribution statistics and the maximum value, analogously to what we did before with the in-degrees.

This is left as an exercise, so that you can apply what you learned in the previous sections.
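One possible starting point (a sketch, reusing viewerGraph from above):

// join follower counts (in-degrees) with the vertex attributes, keeping channels with 100+ followers
val ratios = viewerGraph.inDegrees.join(viewerGraph.vertices)
  .filter{ case (id, (followers, attr)) => followers >= 100 && attr._2.isDefined }
  .map{ case (id, (followers, attr)) => attr._2.get.toDouble / followers }
ratios.stats()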

Graph creation

Graphs are no more than an abstraction to represent relationships. In this part of the lab we are going to evaluate the similarity of channels depending on whether users are mutual followers of some of these channels. We will process the data the following way: for each user u who follows channels (c1, c2, ..., cn), we will generate graph edges among all the channels in the followed set. That is, if user u follows channels c1, c2, and c3, the following edges will be generated: c1-c2, c1-c3, c2-c3. When processing all the users we will obtain repeated edges this way, so we should consolidate them. We will do that by storing the number of repetitions (i.e., the number of common followers) as an attribute of the consolidated edge.

// Load (user id, channel id) follow pairs; the exact input file is still to be confirmed
val userFollowers = sc.textFile("hdfs://moonshot-ha-nameservice/camp/<<<lab1maxviewers>>>").map(line => (line.split("\t")(0).toLong, line.split("\t")(1).toLong))
// For each user, generate one pair per combination of channels they follow
val pairs = userFollowers.groupByKey().flatMap{ case (u, cs) => for (c1 <- cs; c2 <- cs if c1 < c2) yield (c1, c2) }
// Consolidate repeated pairs, storing the number of common followers as the edge attribute
val edges = pairs.map(e => (e, 1)).reduceByKey((a, b) => a + b).map(x => Edge(x._1._1, x._1._2, x._2))
// Make the relationship symmetric by adding the reversed edges
val channelGraph: Graph[Int, Int] = Graph.fromEdges(edges.union(edges.map(e => Edge(e.dstId, e.srcId, e.attr))), 1).cache()

We have used a new method to create the graph. This time we created an RDD of Edge elements, with sourceId, destinationId, and a property (an integer counting the users who follow both channels). Since GraphX graphs are directed, we made the relationship symmetric this time, by transforming the set of edges into its union with the same set reversed.
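We can inspect one of the created edges (a quick check):

// Edge(srcId, dstId, attr): attr is the number of users following both channels
channelGraph.edges.first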

Spend some time familiarising yourself with this new graph. Using the transformations you learned previously in the lab, find out the number of nodes, the number of edges (the number of relationships will be half of the reported value, as each one is stored in both directions), and the distribution of degrees.

We can use this graph to compute further graph analysis methods, such as the PageRank coefficient. Compute PageRank on this new graph: which is the most central channel according to this metric?

val ranks = channelGraph.pageRank(0.0001).vertices
// find the most central channel: the vertex with the highest PageRank score
val mostCentral = ranks.reduce((a, b) => if (a._2 > b._2) a else b)

Bonus 1: Implement custom graph algorithms using Pregel-style computations

All the graph operations performed in this lab sheet have used methods predefined by GraphX. There are additional methods we haven't covered, which can be found in the documentation. However, there are numerous computations that need to be implemented manually. For this purpose, GraphX exposes a Pregel interface where custom computations can be implemented. To do so we can use the .pregel() transformation, which executes three functions iteratively:

a) Update vertex state (vprog)
b) Send messages (sendMsg)
c) Merge messages (mergeMsg)
In many cases these algorithms also require initialising the state of the graph, as each computation will update the state of the vertices.

import org.apache.spark.graphx.lib._

channelGraph.joinVertices(...)   // initialise the vertex values
channelGraph.pregel(...)         // supply the three functions

def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
    vprog: (VertexId, VD, A) => VD,
    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
    mergeMsg: (A, A) => A)
  : Graph[VD, ED]
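As a worked illustration, here is a minimal sketch of single-source shortest paths implemented with pregel, following the structure above (the sourceId value is hypothetical, and every edge is treated as distance 1):

// a sketch: single-source shortest paths over channelGraph with Pregel
val sourceId: VertexId = 42L  // hypothetical starting channel id; replace with a real one
// initialise vertex state: distance 0 at the source, infinity everywhere else
val initialGraph = channelGraph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist),  // vprog: update vertex state
  triplet =>  // sendMsg: propagate a shorter distance to the destination, if found
    if (triplet.srcAttr + 1.0 < triplet.dstAttr) Iterator((triplet.dstId, triplet.srcAttr + 1.0))
    else Iterator.empty,
  (a, b) => math.min(a, b)  // mergeMsg: keep the shortest of the received distances
)
sssp.vertices.take(10)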