[Improvement][Spark] Add helper objects and methods for loading info classes from files #112

Merged 2 commits on Feb 28, 2023
7 changes: 2 additions & 5 deletions docs/user-guide/spark-lib.rst
@@ -53,12 +53,9 @@ To construct information from a Yaml file, please refer to the following example
.. code:: scala

// read graph yaml and construct information
val spark = ... // the Spark session
val file_path = ... // the path to the yaml file
val yaml_path = new Path(file_path)
val fs = FileSystem.get(yaml_path.toUri(), spark.sparkContext.hadoopConfiguration)
val input = fs.open(yaml_path)
val graph_yaml = new Yaml(new Constructor(classOf[GraphInfo]))
val graph_info = graph_yaml.load(input).asInstanceOf[GraphInfo]
val graph_info = GraphInfo.loadGraphInfo(file_path, spark)

// use information classes
val vertices = graph_info.getVertices
14 changes: 14 additions & 0 deletions spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala
@@ -16,6 +16,8 @@
package com.alibaba.graphar

import java.io.{File, FileInputStream}
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.sql.{SparkSession}
import org.yaml.snakeyaml.Yaml
import org.yaml.snakeyaml.constructor.Constructor
import scala.beans.BeanProperty
@@ -425,3 +427,15 @@ class EdgeInfo() {
return str
}
}

/** Helper object to load edge info files */
object EdgeInfo {
  /** Load a yaml file from path and construct an EdgeInfo from it. */
  def loadEdgeInfo(edgeInfoPath: String, spark: SparkSession): EdgeInfo = {
    val path = new Path(edgeInfoPath)
    val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
    val input = fs.open(path)
    val yaml = new Yaml(new Constructor(classOf[EdgeInfo]))
    return yaml.load(input).asInstanceOf[EdgeInfo]
  }
}
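
For illustration, a minimal sketch of using the new loader together with an EdgeReader; the SparkSession setup and the yaml path below are assumptions for the example, not part of this change:

import org.apache.spark.sql.SparkSession
import com.alibaba.graphar.{EdgeInfo, AdjListType}
import com.alibaba.graphar.reader.EdgeReader

val spark = SparkSession.builder().appName("gar-example").master("local[*]").getOrCreate()
// Assumed path to an edge info yaml; replace with a real location.
val edgeInfoPath = "/data/ldbc_sample/csv/person_knows_person.edge.yml"
val edgeInfo = EdgeInfo.loadEdgeInfo(edgeInfoPath, spark)
// The prefix is concatenated directly with relative chunk paths, so keep the trailing "/".
val reader = new EdgeReader("/data/ldbc_sample/csv/", edgeInfo, AdjListType.ordered_by_source, spark)
val adjListDf = reader.readAllAdjList(addIndex = true)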
22 changes: 22 additions & 0 deletions spark/src/main/scala/com/alibaba/graphar/GraphInfo.scala
@@ -16,6 +16,8 @@
package com.alibaba.graphar

import java.io.{File, FileInputStream}
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.sql.{SparkSession}
import org.yaml.snakeyaml.Yaml
import org.yaml.snakeyaml.constructor.Constructor
import scala.beans.BeanProperty
@@ -202,3 +204,23 @@ class GraphInfo() {
@BeanProperty var edges = new java.util.ArrayList[String]()
@BeanProperty var version: String = ""
}

/** Helper object to load graph info files */
object GraphInfo {
  /** Load a yaml file from path and construct a GraphInfo from it. */
  def loadGraphInfo(graphInfoPath: String, spark: SparkSession): GraphInfo = {
    val path = new Path(graphInfoPath)
    val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
    val input = fs.open(path)
    val yaml = new Yaml(new Constructor(classOf[GraphInfo]))
    val graph_info = yaml.load(input).asInstanceOf[GraphInfo]
    if (graph_info.getPrefix == "") {
      val pos = graphInfoPath.lastIndexOf('/')
      if (pos != -1) {
        val prefix = graphInfoPath.substring(0, pos + 1) // +1 to include the slash
        graph_info.setPrefix(prefix)
      }
    }
    return graph_info
  }
}
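
A short sketch of the prefix defaulting added here, assuming the SparkSession from the previous example and a hypothetical path: when the yaml's prefix field is empty, loadGraphInfo falls back to the directory containing the yaml file, trailing slash included.

import com.alibaba.graphar.GraphInfo

// Assumed path, for illustration only.
val graphInfoPath = "/data/ldbc_sample/csv/ldbc_sample.graph.yml"
val graphInfo = GraphInfo.loadGraphInfo(graphInfoPath, spark)
// If the yaml did not set a prefix, this prints "/data/ldbc_sample/csv/".
println(graphInfo.getPrefix)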
14 changes: 14 additions & 0 deletions spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala
@@ -16,6 +16,8 @@
package com.alibaba.graphar

import java.io.{File, FileInputStream}
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.sql.{SparkSession}
import org.yaml.snakeyaml.Yaml
import org.yaml.snakeyaml.constructor.Constructor
import scala.beans.BeanProperty
@@ -217,3 +219,15 @@ class VertexInfo() {
return prefix + str;
}
}

/** Helper object to load vertex info files */
object VertexInfo {
  /** Load a yaml file from path and construct a VertexInfo from it. */
  def loadVertexInfo(vertexInfoPath: String, spark: SparkSession): VertexInfo = {
    val path = new Path(vertexInfoPath)
    val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
    val input = fs.open(path)
    val yaml = new Yaml(new Constructor(classOf[VertexInfo]))
    return yaml.load(input).asInstanceOf[VertexInfo]
  }
}
14 changes: 7 additions & 7 deletions spark/src/main/scala/com/alibaba/graphar/reader/EdgeReader.scala
@@ -49,7 +49,7 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V
throw new IllegalArgumentException
val file_type_in_gar = edgeInfo.getAdjListFileType(adjListType)
val file_type = FileType.FileTypeToString(file_type_in_gar)
val file_path = prefix + "/" + edgeInfo.getAdjListOffsetFilePath(chunk_index, adjListType)
val file_path = prefix + edgeInfo.getAdjListOffsetFilePath(chunk_index, adjListType)
val df = spark.read.option("fileFormat", file_type).option("header", "true").format("com.alibaba.graphar.datasources.GarDataSource").load(file_path)
return df
}
@@ -63,7 +63,7 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V
def readAdjListChunk(vertex_chunk_index: Long, chunk_index: Long): DataFrame = {
val file_type_in_gar = edgeInfo.getAdjListFileType(adjListType)
val file_type = FileType.FileTypeToString(file_type_in_gar)
val file_path = prefix + "/" + edgeInfo.getAdjListFilePath(vertex_chunk_index, chunk_index, adjListType)
val file_path = prefix + edgeInfo.getAdjListFilePath(vertex_chunk_index, chunk_index, adjListType)
val df = spark.read.option("fileFormat", file_type).option("header", "true").format("com.alibaba.graphar.datasources.GarDataSource").load(file_path)
return df
}
@@ -77,7 +77,7 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V
def readAdjListForVertexChunk(vertex_chunk_index: Long, addIndex: Boolean = true): DataFrame = {
val file_type_in_gar = edgeInfo.getAdjListFileType(adjListType)
val file_type = FileType.FileTypeToString(file_type_in_gar)
val file_path = prefix + "/" + edgeInfo.getAdjListPathPrefix(vertex_chunk_index, adjListType)
val file_path = prefix + edgeInfo.getAdjListPathPrefix(vertex_chunk_index, adjListType)
val df = spark.read.option("fileFormat", file_type).option("header", "true").format("com.alibaba.graphar.datasources.GarDataSource").load(file_path)
if (addIndex) {
return IndexGenerator.generateEdgeIndexColumn(df)
@@ -94,7 +94,7 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V
def readAllAdjList(addIndex: Boolean = true): DataFrame = {
val file_type_in_gar = edgeInfo.getAdjListFileType(adjListType)
val file_type = FileType.FileTypeToString(file_type_in_gar)
val file_path = prefix + "/" + edgeInfo.getAdjListPathPrefix(adjListType)
val file_path = prefix + edgeInfo.getAdjListPathPrefix(adjListType)
val df = spark.read.option("fileFormat", file_type).option("header", "true").option("recursiveFileLookup", "true").format("com.alibaba.graphar.datasources.GarDataSource").load(file_path)
if (addIndex) {
return IndexGenerator.generateEdgeIndexColumn(df)
@@ -115,7 +115,7 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V
if (edgeInfo.containPropertyGroup(propertyGroup, adjListType) == false)
throw new IllegalArgumentException
val file_type = propertyGroup.getFile_type()
val file_path = prefix + "/" + edgeInfo.getPropertyFilePath(propertyGroup, adjListType, vertex_chunk_index, chunk_index)
val file_path = prefix + edgeInfo.getPropertyFilePath(propertyGroup, adjListType, vertex_chunk_index, chunk_index)
val df = spark.read.option("fileFormat", file_type).option("header", "true").format("com.alibaba.graphar.datasources.GarDataSource").load(file_path)
return df
}
@@ -132,7 +132,7 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V
if (edgeInfo.containPropertyGroup(propertyGroup, adjListType) == false)
throw new IllegalArgumentException
val file_type = propertyGroup.getFile_type()
val file_path = prefix + "/" + edgeInfo.getPropertyGroupPathPrefix(propertyGroup, adjListType, vertex_chunk_index)
val file_path = prefix + edgeInfo.getPropertyGroupPathPrefix(propertyGroup, adjListType, vertex_chunk_index)
val df = spark.read.option("fileFormat", file_type).option("header", "true").format("com.alibaba.graphar.datasources.GarDataSource").load(file_path)
if (addIndex) {
return IndexGenerator.generateEdgeIndexColumn(df)
@@ -152,7 +152,7 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V
if (edgeInfo.containPropertyGroup(propertyGroup, adjListType) == false)
throw new IllegalArgumentException
val file_type = propertyGroup.getFile_type()
val file_path = prefix + "/" + edgeInfo.getPropertyGroupPathPrefix(propertyGroup, adjListType)
val file_path = prefix + edgeInfo.getPropertyGroupPathPrefix(propertyGroup, adjListType)
val df = spark.read.option("fileFormat", file_type).option("header", "true").option("recursiveFileLookup", "true").format("com.alibaba.graphar.datasources.GarDataSource").load(file_path)
if (addIndex) {
return IndexGenerator.generateEdgeIndexColumn(df)
spark/src/main/scala/com/alibaba/graphar/reader/VertexReader.scala
@@ -50,7 +50,7 @@ class VertexReader(prefix: String, vertexInfo: VertexInfo, spark: SparkSession)
if (vertexInfo.containPropertyGroup(propertyGroup) == false)
throw new IllegalArgumentException
val file_type = propertyGroup.getFile_type()
val file_path = prefix + "/" + vertexInfo.getFilePath(propertyGroup, chunk_index)
val file_path = prefix + vertexInfo.getFilePath(propertyGroup, chunk_index)
val df = spark.read.option("fileFormat", file_type).option("header", "true").format("com.alibaba.graphar.datasources.GarDataSource").load(file_path)
return df
}
@@ -66,7 +66,7 @@ class VertexReader(prefix: String, vertexInfo: VertexInfo, spark: SparkSession)
if (vertexInfo.containPropertyGroup(propertyGroup) == false)
throw new IllegalArgumentException
val file_type = propertyGroup.getFile_type()
val file_path = prefix + "/" + vertexInfo.getPathPrefix(propertyGroup)
val file_path = prefix + vertexInfo.getPathPrefix(propertyGroup)
val df = spark.read.option("fileFormat", file_type).option("header", "true").format("com.alibaba.graphar.datasources.GarDataSource").load(file_path)

if (addIndex) {
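
Since the readers above no longer insert a "/" between the prefix and the relative file paths, callers are expected to pass a prefix that already ends with a separator, which is what loadGraphInfo's default prefix provides. A minimal sketch of the convention for VertexReader, again with assumed paths and the SparkSession from the earlier sketch:

import com.alibaba.graphar.VertexInfo
import com.alibaba.graphar.reader.VertexReader

// Assumed data location; note the trailing "/".
val vertexPrefix = "/data/ldbc_sample/parquet/"
val vertexInfo = VertexInfo.loadVertexInfo(vertexPrefix + "person.vertex.yml", spark)
val vertexReader = new VertexReader(vertexPrefix, vertexInfo, spark)
val verticesNum = vertexReader.readVerticesNumber()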
12 changes: 5 additions & 7 deletions spark/src/test/scala/com/alibaba/graphar/ComputeExample.scala
@@ -34,11 +34,10 @@ class ComputeExampleSuite extends AnyFunSuite {

test("run cc using graphx") {
// read vertex dataframe
val file_path = "gar-test/ldbc_sample/parquet"
val file_path = "gar-test/ldbc_sample/parquet/"
val prefix = getClass.getClassLoader.getResource(file_path).getPath
val vertex_input = getClass.getClassLoader.getResourceAsStream(file_path + "/person.vertex.yml")
val vertex_yaml = new Yaml(new Constructor(classOf[VertexInfo]))
val vertex_info = vertex_yaml.load(vertex_input).asInstanceOf[VertexInfo]
val vertex_yaml = getClass.getClassLoader.getResource(file_path + "person.vertex.yml").getPath
val vertex_info = VertexInfo.loadVertexInfo(vertex_yaml, spark)

val vertex_reader = new VertexReader(prefix, vertex_info, spark)
val vertices_num = vertex_reader.readVerticesNumber()
@@ -48,9 +47,8 @@
assert(vertex_df.count() == vertices_num)

// read edge dataframe
val edge_input = getClass.getClassLoader.getResourceAsStream(file_path + "/person_knows_person.edge.yml")
val edge_yaml = new Yaml(new Constructor(classOf[EdgeInfo]))
val edge_info = edge_yaml.load(edge_input).asInstanceOf[EdgeInfo]
val edge_yaml = getClass.getClassLoader.getResource(file_path + "person_knows_person.edge.yml").getPath
val edge_info = EdgeInfo.loadEdgeInfo(edge_yaml, spark)
val adj_list_type = AdjListType.ordered_by_source

val edge_reader = new EdgeReader(prefix, edge_info, adj_list_type, spark)
24 changes: 8 additions & 16 deletions spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala
@@ -30,14 +30,12 @@ class GraphInfoSuite extends AnyFunSuite {

test("load graph info") {
// read graph yaml
val yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/ldbc_sample.graph.yml").getPath)
val fs = FileSystem.get(yaml_path.toUri(), spark.sparkContext.hadoopConfiguration)
val input = fs.open(yaml_path)
val graph_yaml = new Yaml(new Constructor(classOf[GraphInfo]))
val graph_info = graph_yaml.load(input).asInstanceOf[GraphInfo]
val yaml_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/ldbc_sample.graph.yml").getPath
val prefix = getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/").getPath
val graph_info = GraphInfo.loadGraphInfo(yaml_path, spark)

assert(graph_info.getName == "ldbc_sample")
assert(graph_info.getPrefix == "" )
assert(graph_info.getPrefix == prefix )
assert(graph_info.getVertices.size() == 1)
val vertices = new java.util.ArrayList[String]
vertices.add("person.vertex.yml")
@@ -50,11 +48,8 @@
}

test("load vertex info") {
val yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person.vertex.yml").getPath)
val fs = FileSystem.get(yaml_path.toUri(), spark.sparkContext.hadoopConfiguration)
val input = fs.open(yaml_path)
val yaml = new Yaml(new Constructor(classOf[VertexInfo]))
val vertex_info = yaml.load(input).asInstanceOf[VertexInfo]
val yaml_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person.vertex.yml").getPath
val vertex_info = VertexInfo.loadVertexInfo(yaml_path, spark)

assert(vertex_info.getLabel == "person")
assert(vertex_info.isValidated)
@@ -99,11 +94,8 @@
}

test("load edge info") {
val yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person_knows_person.edge.yml").getPath)
val fs = FileSystem.get(yaml_path.toUri(), spark.sparkContext.hadoopConfiguration)
val input = fs.open(yaml_path)
val yaml = new Yaml(new Constructor(classOf[EdgeInfo]))
val edge_info = yaml.load(input).asInstanceOf[EdgeInfo]
val yaml_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person_knows_person.edge.yml").getPath
val edge_info = EdgeInfo.loadEdgeInfo(yaml_path, spark)

assert(edge_info.getSrc_label == "person")
assert(edge_info.getDst_label == "person")
26 changes: 12 additions & 14 deletions spark/src/test/scala/com/alibaba/graphar/TestReader.scala
@@ -33,27 +33,27 @@ class ReaderSuite extends AnyFunSuite {

test("read chunk files directly") {
// read vertex chunk files in Parquet
val parquet_file_path = "gar-test/ldbc_sample/parquet"
val parquet_file_path = "gar-test/ldbc_sample/parquet/"
val parquet_prefix = getClass.getClassLoader.getResource(parquet_file_path).getPath
val parqeut_read_path = parquet_prefix + "/vertex/person/id"
val parqeut_read_path = parquet_prefix + "vertex/person/id"
val df1 = spark.read.option("fileFormat", "parquet").format("com.alibaba.graphar.datasources.GarDataSource").load(parqeut_read_path)
// validate reading results
assert(df1.rdd.getNumPartitions == 10)
assert(df1.count() == 903)
// println(df1.rdd.collect().mkString("\n"))

// read vertex chunk files in Orc
val orc_file_path = "gar-test/ldbc_sample/orc"
val orc_file_path = "gar-test/ldbc_sample/orc/"
val orc_prefix = getClass.getClassLoader.getResource(orc_file_path).getPath
val orc_read_path = orc_prefix + "/vertex/person/id"
val orc_read_path = orc_prefix + "vertex/person/id"
val df2 = spark.read.option("fileFormat", "orc").format("com.alibaba.graphar.datasources.GarDataSource").load(orc_read_path)
// validate reading results
assert(df2.rdd.collect().deep == df1.rdd.collect().deep)

// read adjList chunk files recursively in CSV
val csv_file_path = "gar-test/ldbc_sample/csv"
val csv_file_path = "gar-test/ldbc_sample/csv/"
val csv_prefix = getClass.getClassLoader.getResource(csv_file_path).getPath
val csv_read_path = csv_prefix + "/edge/person_knows_person/ordered_by_source/adj_list"
val csv_read_path = csv_prefix + "edge/person_knows_person/ordered_by_source/adj_list"
val df3 = spark.read.option("fileFormat", "csv").option("header", "true").option("recursiveFileLookup", "true").format("com.alibaba.graphar.datasources.GarDataSource").load(csv_read_path)
// validate reading results
assert(df3.rdd.getNumPartitions == 11)
@@ -65,11 +65,10 @@

test("read vertex chunks") {
// construct the vertex information
val file_path = "gar-test/ldbc_sample/csv"
val file_path = "gar-test/ldbc_sample/csv/"
val prefix = getClass.getClassLoader.getResource(file_path).getPath
val vertex_input = getClass.getClassLoader.getResourceAsStream(file_path + "/person.vertex.yml")
val vertex_yaml = new Yaml(new Constructor(classOf[VertexInfo]))
val vertex_info = vertex_yaml.load(vertex_input).asInstanceOf[VertexInfo]
val vertex_yaml = getClass.getClassLoader.getResource(file_path + "person.vertex.yml").getPath
val vertex_info = VertexInfo.loadVertexInfo(vertex_yaml, spark)

// construct the vertex reader
val reader = new VertexReader(prefix, vertex_info, spark)
@@ -115,11 +114,10 @@

test("read edge chunks") {
// construct the edge information
val file_path = "gar-test/ldbc_sample/csv"
val file_path = "gar-test/ldbc_sample/csv/"
val prefix = getClass.getClassLoader.getResource(file_path).getPath
val edge_input = getClass.getClassLoader.getResourceAsStream(file_path + "/person_knows_person.edge.yml")
val edge_yaml = new Yaml(new Constructor(classOf[EdgeInfo]))
val edge_info = edge_yaml.load(edge_input).asInstanceOf[EdgeInfo]
val edge_yaml = getClass.getClassLoader.getResource(file_path + "person_knows_person.edge.yml").getPath
val edge_info = EdgeInfo.loadEdgeInfo(edge_yaml, spark)

// construct the edge reader
val adj_list_type = AdjListType.ordered_by_source