[Feat][Spark] Memory tuning for GraphAr spark with persist and storage level (#326)
acezen authored Jan 18, 2024
1 parent a37eee1 commit f2c8fc4
Showing 5 changed files with 60 additions and 7 deletions.
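
The changes below all apply one Spark caching pattern: an intermediate DataFrame or RDD that will be scanned more than once is persisted with an explicit storage level, and unpersisted as soon as the downstream write finishes. A minimal, self-contained sketch of that pattern (toy data and names, not code from this commit; it only assumes a local SparkSession):

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object PersistPatternSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("persist-sketch").master("local[*]").getOrCreate()
    import spark.implicits._

    // An intermediate DataFrame that downstream steps will scan more than once.
    val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "label")

    // MEMORY_AND_DISK_SER keeps serialized blocks in memory and spills the rest to disk,
    // lowering the memory footprint at some serialization cost.
    df.persist(StorageLevel.MEMORY_AND_DISK_SER)

    val n = df.count()              // the first action materializes the cache
    df.filter($"id" > 1L).collect() // later actions reuse the cached blocks
    println(s"cached $n rows")

    df.unpersist() // release the cache once the DataFrame is no longer needed
    spark.stop()
  }
}
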
3 changes: 3 additions & 0 deletions spark/src/main/java/com/alibaba/graphar/GeneralParams.java
@@ -16,6 +16,8 @@

package com.alibaba.graphar;

import org.apache.spark.storage.StorageLevel;

/** General constant parameters for graphar. */
public class GeneralParams {
// column name
@@ -33,4 +35,5 @@ public class GeneralParams {
public static final Long defaultEdgeChunkSize = 4194304L; // 2^22
public static final String defaultFileType = "parquet";
public static final String defaultVersion = "v1";
public static final StorageLevel defaultStorageLevel = StorageLevel.MEMORY_AND_DISK_SER();
}
12 changes: 11 additions & 1 deletion spark/src/main/scala/com/alibaba/graphar/graph/GraphTransformer.scala
@@ -16,7 +16,13 @@

package com.alibaba.graphar.graph

import com.alibaba.graphar.{AdjListType, GraphInfo, VertexInfo, EdgeInfo}
import com.alibaba.graphar.{
AdjListType,
GraphInfo,
VertexInfo,
EdgeInfo,
GeneralParams
}
import com.alibaba.graphar.reader.{VertexReader, EdgeReader}
import com.alibaba.graphar.writer.{VertexWriter, EdgeWriter}

@@ -67,9 +73,11 @@ object GraphTransformer {
// read vertex chunks from the source graph
val reader = new VertexReader(source_prefix, source_vertex_info, spark)
val df = reader.readAllVertexPropertyGroups()
df.persist(GeneralParams.defaultStorageLevel)
// write vertex chunks for the dest graph
val writer = new VertexWriter(dest_prefix, dest_vertex_info, df)
writer.writeVertexProperties()
df.unpersist()
}
}

@@ -138,6 +146,7 @@ object GraphTransformer {
)
df = reader.readEdges(false)
has_loaded = true
df.persist(GeneralParams.defaultStorageLevel)
}

// read vertices number
@@ -167,6 +176,7 @@ object GraphTransformer {
df
)
writer.writeEdges()
df.unpersist()
}
}
}
22 changes: 18 additions & 4 deletions spark/src/main/scala/com/alibaba/graphar/graph/GraphWriter.scala
@@ -52,12 +52,11 @@ class GraphWriter() {
)
}
vertices += label -> df
vertexNums += label -> df.count
primaryKeys += label -> primaryKey
}

/**
* Put the egde datafrme into writer.
* Put the edge dataframe into writer.
* @param relation
* 3-Tuple (source label, edge label, target label) to indicate edge type.
* @param df
@@ -87,15 +86,26 @@
scala.collection.mutable.Map[String, DataFrame]()
vertexInfos.foreach {
case (label, vertexInfo) => {
val vertex_num = vertexNums(label)
val primaryKey = primaryKeys(label)
vertices(label).persist(
GeneralParams.defaultStorageLevel
) // cache the vertex DataFrame
val df_and_mapping = IndexGenerator
.generateVertexIndexColumnAndIndexMapping(vertices(label), primaryKey)
df_and_mapping._1.persist(
GeneralParams.defaultStorageLevel
) // cache the vertex DataFrame with index
df_and_mapping._2.persist(
GeneralParams.defaultStorageLevel
) // cache the index mapping DataFrame
vertices(label).unpersist() // unpersist the vertex DataFrame
val df_with_index = df_and_mapping._1
indexMappings += label -> df_and_mapping._2
val writer =
new VertexWriter(prefix, vertexInfo, df_with_index, vertex_num)
new VertexWriter(prefix, vertexInfo, df_with_index)
vertexNums += label -> writer.getVertexNum()
writer.writeVertexProperties()
df_with_index.unpersist()
}
}

@@ -117,6 +127,9 @@
src_vertex_index_mapping,
dst_vertex_index_mapping
)
edge_df_with_index.persist(
GeneralParams.defaultStorageLevel
) // cache the edge DataFrame with index

val adj_lists = edgeInfo.getAdj_lists
val adj_list_it = adj_lists.iterator
@@ -140,6 +153,7 @@
)
writer.writeEdges()
}
edge_df_with_index.unpersist()
}
}
}
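
For context on the GraphWriter change above: the separate per-label df.count call is dropped, and the vertex number is read back from the writer through the new getVertexNum() accessor, since VertexWriter now counts the (cached) DataFrame itself. A hedged sketch of the calling pattern; the helper name writeOneLabel and the String type of prefix are assumptions, while the GraphAr classes and calls are taken from the diff:

import com.alibaba.graphar.VertexInfo
import com.alibaba.graphar.writer.VertexWriter
import org.apache.spark.sql.DataFrame

// Sketch helper (not part of GraphAr): writes one vertex label and returns its
// vertex count without triggering an extra count() job in the caller.
def writeOneLabel(prefix: String, info: VertexInfo, dfWithIndex: DataFrame): Long = {
  val writer = new VertexWriter(prefix, info, dfWithIndex) // counts the vertices internally
  val vertexNum = writer.getVertexNum()
  writer.writeVertexProperties()
  vertexNum
}
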
18 changes: 16 additions & 2 deletions spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala
@@ -64,6 +64,7 @@ object EdgeWriter

// sort by primary key and generate continue edge id for edge records
val sortedDfRDD = edgeDf.sort(colName).rdd
sortedDfRDD.persist(GeneralParams.defaultStorageLevel)
// generate continue edge id for every edge
val partitionCounts = sortedDfRDD
.mapPartitionsWithIndex(
@@ -82,6 +83,7 @@
val start = broadcastedPartitionCounts.value(i)
for { (row, j) <- ps.zipWithIndex } yield (start + j, row)
})
rddWithEid.persist(GeneralParams.defaultStorageLevel)

// Construct partitioner for edge chunk
// get edge num of every vertex chunk
@@ -101,6 +103,8 @@
edgeNumMutableMap(i) = 0
}
}
sortedDfRDD.unpersist() // unpersist the sortedDfRDD

var eidBeginOfVertexChunks =
new Array[Long](vertexChunkNum + 1) // eid begin of vertex chunks
var aggEdgeChunkNumOfVertexChunks =
@@ -130,7 +134,8 @@
val partitionRDD =
rddWithEid.repartitionAndSortWithinPartitions(partitioner).values
val partitionEdgeDf = spark.createDataFrame(partitionRDD, edgeSchema)
partitionEdgeDf.cache()
rddWithEid.unpersist() // unpersist the rddWithEid
partitionEdgeDf.persist(GeneralParams.defaultStorageLevel)

// generate offset DataFrames
if (
@@ -141,6 +146,7 @@
iterator.map(row => (row(colIndex).asInstanceOf[Long], 1))
})
.reduceByKey(_ + _)
edgeCountsByPrimaryKey.persist(GeneralParams.defaultStorageLevel)
val offsetDfSchema = StructType(
Seq(StructField(GeneralParams.offsetCol, IntegerType))
)
@@ -168,10 +174,11 @@
})
.map { case (k, v) => Row(v) }
val offsetChunk = spark.createDataFrame(offsetRDD, offsetDfSchema)
offsetChunk.cache()
offsetChunk.persist(GeneralParams.defaultStorageLevel)
offsetChunk
}
}
edgeCountsByPrimaryKey.unpersist() // unpersist the edgeCountsByPrimaryKey
return (
partitionEdgeDf,
offsetDfArray,
@@ -216,6 +223,8 @@ class EdgeWriter(
validate()
writeVertexNum()

edgeDf.persist(GeneralParams.defaultStorageLevel)

// validate data and info
private def validate(): Unit = {
// check if edge info contains the adj list type
@@ -290,6 +299,7 @@ class EdgeWriter(
Some(chunkIndex),
None
)
offsetChunk.unpersist()
chunkIndex = chunkIndex + 1
}
}
@@ -366,4 +376,8 @@ class EdgeWriter(
writeAdjList()
writeEdgeProperties()
}

override def finalize(): Unit = {
edgeDfAndOffsetDf._1.unpersist()
}
}
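
Around the repartitionAndSortWithinPartitions step in EdgeWriter above, the cache is handed over across the shuffle: the keyed RDD is persisted, the shuffled result is persisted in its place, and the input is unpersisted. A minimal sketch of that hand-off, using toy data and a plain HashPartitioner instead of the writer's custom chunk partitioner, and materializing the output before releasing the input cache:

import org.apache.spark.{HashPartitioner, SparkContext}
import org.apache.spark.storage.StorageLevel

def repartitionHandOff(sc: SparkContext): Long = {
  // Keyed records standing in for (edge id, edge row) pairs.
  val rddWithEid = sc.parallelize(Seq((3L, "c"), (1L, "a"), (2L, "b")))
  rddWithEid.persist(StorageLevel.MEMORY_AND_DISK_SER)

  // Shuffle into key-ordered partitions, then drop the key.
  val partitioned = rddWithEid
    .repartitionAndSortWithinPartitions(new HashPartitioner(2))
    .values
  partitioned.persist(StorageLevel.MEMORY_AND_DISK_SER)

  val n = partitioned.count() // materialize the shuffled, cached partitions
  rddWithEid.unpersist()      // the upstream cache is no longer needed
  n
}
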
12 changes: 12 additions & 0 deletions spark/src/main/scala/com/alibaba/graphar/writer/VertexWriter.scala
@@ -74,6 +74,9 @@ class VertexWriter(
numVertices: Long = -1
) {
private val spark = vertexDf.sparkSession
vertexDf.persist(
GeneralParams.defaultStorageLevel
) // cache the vertex DataFrame
validate()
private val vertexNum: Long =
if (numVertices < 0) vertexDf.count else numVertices
@@ -84,6 +87,8 @@
vertexInfo.getChunk_size(),
vertexNum
)
vertexDf.unpersist() // unpersist the vertex DataFrame
chunks.persist(GeneralParams.defaultStorageLevel)

private def validate(): Unit = {
// check if vertex DataFrame contains the index_filed
@@ -104,6 +109,8 @@
)
}

def getVertexNum(): Long = vertexNum

/**
* Generate chunks of the property group for vertex DataFrame.
*
@@ -146,4 +153,9 @@
writeVertexProperties(property_group)
}
}

override def finalize(): Unit = {
chunks.unpersist()
}

}
