From 38749034c72fa5ae55ef1e8e5aae05cba7eab608 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=89=E7=90=86?= Date: Mon, 27 Feb 2023 17:58:49 +0800 Subject: [PATCH 1/2] Add helper objects for info classes --- .../scala/com/alibaba/graphar/EdgeInfo.scala | 14 ++++++++++ .../scala/com/alibaba/graphar/GraphInfo.scala | 22 ++++++++++++++++ .../com/alibaba/graphar/VertexInfo.scala | 14 ++++++++++ .../alibaba/graphar/reader/EdgeReader.scala | 14 +++++----- .../alibaba/graphar/reader/VertexReader.scala | 4 +-- .../com/alibaba/graphar/ComputeExample.scala | 12 ++++----- .../com/alibaba/graphar/TestGraphInfo.scala | 24 ++++++----------- .../com/alibaba/graphar/TestReader.scala | 26 +++++++++---------- .../com/alibaba/graphar/TestWriter.scala | 24 ++++++----------- .../alibaba/graphar/TransformExample.scala | 26 ++++++++----------- 10 files changed, 103 insertions(+), 77 deletions(-) diff --git a/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala b/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala index 20c485302..5ba1e82d9 100644 --- a/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala +++ b/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala @@ -16,6 +16,8 @@ package com.alibaba.graphar import java.io.{File, FileInputStream} +import org.apache.hadoop.fs.{Path, FileSystem} +import org.apache.spark.sql.{SparkSession} import org.yaml.snakeyaml.Yaml import org.yaml.snakeyaml.constructor.Constructor import scala.beans.BeanProperty @@ -425,3 +427,15 @@ class EdgeInfo() { return str } } + +/** Helper object to load edge info files */ +object EdgeInfo { + /** Load a yaml file from path and construct a EdgeInfo from it. */ + def loadEdgeInfo(edgeInfoPath: String, spark: SparkSession): EdgeInfo = { + val path = new Path(edgeInfoPath) + val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration) + val input = fs.open(path) + val yaml = new Yaml(new Constructor(classOf[EdgeInfo])) + return yaml.load(input).asInstanceOf[EdgeInfo] + } +} diff --git a/spark/src/main/scala/com/alibaba/graphar/GraphInfo.scala b/spark/src/main/scala/com/alibaba/graphar/GraphInfo.scala index f251ebabe..86fccd0a1 100644 --- a/spark/src/main/scala/com/alibaba/graphar/GraphInfo.scala +++ b/spark/src/main/scala/com/alibaba/graphar/GraphInfo.scala @@ -16,6 +16,8 @@ package com.alibaba.graphar import java.io.{File, FileInputStream} +import org.apache.hadoop.fs.{Path, FileSystem} +import org.apache.spark.sql.{SparkSession} import org.yaml.snakeyaml.Yaml import org.yaml.snakeyaml.constructor.Constructor import scala.beans.BeanProperty @@ -202,3 +204,23 @@ class GraphInfo() { @BeanProperty var edges = new java.util.ArrayList[String]() @BeanProperty var version: String = "" } + +/** Helper object to load graph info files */ +object GraphInfo { + /** Load a yaml file from path and construct a GraphInfo from it. 
*/ + def loadGraphInfo(graphInfoPath: String, spark: SparkSession): GraphInfo = { + val path = new Path(graphInfoPath) + val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration) + val input = fs.open(path) + val yaml = new Yaml(new Constructor(classOf[GraphInfo])) + val graph_info = yaml.load(input).asInstanceOf[GraphInfo] + if (graph_info.getPrefix == "") { + val pos = graphInfoPath.lastIndexOf('/') + if (pos != -1) { + val prefix = graphInfoPath.substring(0, pos + 1) // +1 to include the slash + graph_info.setPrefix(prefix) + } + } + return graph_info + } +} diff --git a/spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala b/spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala index e86208fc4..2da6488c4 100644 --- a/spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala +++ b/spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala @@ -16,6 +16,8 @@ package com.alibaba.graphar import java.io.{File, FileInputStream} +import org.apache.hadoop.fs.{Path, FileSystem} +import org.apache.spark.sql.{SparkSession} import org.yaml.snakeyaml.Yaml import org.yaml.snakeyaml.constructor.Constructor import scala.beans.BeanProperty @@ -217,3 +219,15 @@ class VertexInfo() { return prefix + str; } } + +/** Helper object to load vertex info files */ +object VertexInfo { + /** Load a yaml file from path and construct a VertexInfo from it. */ + def loadVertexInfo(vertexInfoPath: String, spark: SparkSession): VertexInfo = { + val path = new Path(vertexInfoPath) + val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration) + val input = fs.open(path) + val yaml = new Yaml(new Constructor(classOf[VertexInfo])) + return yaml.load(input).asInstanceOf[VertexInfo] + } +} diff --git a/spark/src/main/scala/com/alibaba/graphar/reader/EdgeReader.scala b/spark/src/main/scala/com/alibaba/graphar/reader/EdgeReader.scala index e9ed1fcdd..d8413a484 100644 --- a/spark/src/main/scala/com/alibaba/graphar/reader/EdgeReader.scala +++ b/spark/src/main/scala/com/alibaba/graphar/reader/EdgeReader.scala @@ -49,7 +49,7 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V throw new IllegalArgumentException val file_type_in_gar = edgeInfo.getAdjListFileType(adjListType) val file_type = FileType.FileTypeToString(file_type_in_gar) - val file_path = prefix + "/" + edgeInfo.getAdjListOffsetFilePath(chunk_index, adjListType) + val file_path = prefix + edgeInfo.getAdjListOffsetFilePath(chunk_index, adjListType) val df = spark.read.option("fileFormat", file_type).option("header", "true").format("com.alibaba.graphar.datasources.GarDataSource").load(file_path) return df } @@ -63,7 +63,7 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V def readAdjListChunk(vertex_chunk_index: Long, chunk_index: Long): DataFrame = { val file_type_in_gar = edgeInfo.getAdjListFileType(adjListType) val file_type = FileType.FileTypeToString(file_type_in_gar) - val file_path = prefix + "/" + edgeInfo.getAdjListFilePath(vertex_chunk_index, chunk_index, adjListType) + val file_path = prefix + edgeInfo.getAdjListFilePath(vertex_chunk_index, chunk_index, adjListType) val df = spark.read.option("fileFormat", file_type).option("header", "true").format("com.alibaba.graphar.datasources.GarDataSource").load(file_path) return df } @@ -77,7 +77,7 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V def readAdjListForVertexChunk(vertex_chunk_index: Long, addIndex: Boolean = true): DataFrame = { val file_type_in_gar = 
edgeInfo.getAdjListFileType(adjListType) val file_type = FileType.FileTypeToString(file_type_in_gar) - val file_path = prefix + "/" + edgeInfo.getAdjListPathPrefix(vertex_chunk_index, adjListType) + val file_path = prefix + edgeInfo.getAdjListPathPrefix(vertex_chunk_index, adjListType) val df = spark.read.option("fileFormat", file_type).option("header", "true").format("com.alibaba.graphar.datasources.GarDataSource").load(file_path) if (addIndex) { return IndexGenerator.generateEdgeIndexColumn(df) @@ -94,7 +94,7 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V def readAllAdjList(addIndex: Boolean = true): DataFrame = { val file_type_in_gar = edgeInfo.getAdjListFileType(adjListType) val file_type = FileType.FileTypeToString(file_type_in_gar) - val file_path = prefix + "/" + edgeInfo.getAdjListPathPrefix(adjListType) + val file_path = prefix + edgeInfo.getAdjListPathPrefix(adjListType) val df = spark.read.option("fileFormat", file_type).option("header", "true").option("recursiveFileLookup", "true").format("com.alibaba.graphar.datasources.GarDataSource").load(file_path) if (addIndex) { return IndexGenerator.generateEdgeIndexColumn(df) @@ -115,7 +115,7 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V if (edgeInfo.containPropertyGroup(propertyGroup, adjListType) == false) throw new IllegalArgumentException val file_type = propertyGroup.getFile_type() - val file_path = prefix + "/" + edgeInfo.getPropertyFilePath(propertyGroup, adjListType, vertex_chunk_index, chunk_index) + val file_path = prefix + edgeInfo.getPropertyFilePath(propertyGroup, adjListType, vertex_chunk_index, chunk_index) val df = spark.read.option("fileFormat", file_type).option("header", "true").format("com.alibaba.graphar.datasources.GarDataSource").load(file_path) return df } @@ -132,7 +132,7 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V if (edgeInfo.containPropertyGroup(propertyGroup, adjListType) == false) throw new IllegalArgumentException val file_type = propertyGroup.getFile_type() - val file_path = prefix + "/" + edgeInfo.getPropertyGroupPathPrefix(propertyGroup, adjListType, vertex_chunk_index) + val file_path = prefix + edgeInfo.getPropertyGroupPathPrefix(propertyGroup, adjListType, vertex_chunk_index) val df = spark.read.option("fileFormat", file_type).option("header", "true").format("com.alibaba.graphar.datasources.GarDataSource").load(file_path) if (addIndex) { return IndexGenerator.generateEdgeIndexColumn(df) @@ -152,7 +152,7 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V if (edgeInfo.containPropertyGroup(propertyGroup, adjListType) == false) throw new IllegalArgumentException val file_type = propertyGroup.getFile_type() - val file_path = prefix + "/" + edgeInfo.getPropertyGroupPathPrefix(propertyGroup, adjListType) + val file_path = prefix + edgeInfo.getPropertyGroupPathPrefix(propertyGroup, adjListType) val df = spark.read.option("fileFormat", file_type).option("header", "true").option("recursiveFileLookup", "true").format("com.alibaba.graphar.datasources.GarDataSource").load(file_path) if (addIndex) { return IndexGenerator.generateEdgeIndexColumn(df) diff --git a/spark/src/main/scala/com/alibaba/graphar/reader/VertexReader.scala b/spark/src/main/scala/com/alibaba/graphar/reader/VertexReader.scala index 32b2325e0..0ac4d4858 100644 --- a/spark/src/main/scala/com/alibaba/graphar/reader/VertexReader.scala +++ b/spark/src/main/scala/com/alibaba/graphar/reader/VertexReader.scala 
@@ -50,7 +50,7 @@ class VertexReader(prefix: String, vertexInfo: VertexInfo, spark: SparkSession) if (vertexInfo.containPropertyGroup(propertyGroup) == false) throw new IllegalArgumentException val file_type = propertyGroup.getFile_type() - val file_path = prefix + "/" + vertexInfo.getFilePath(propertyGroup, chunk_index) + val file_path = prefix + vertexInfo.getFilePath(propertyGroup, chunk_index) val df = spark.read.option("fileFormat", file_type).option("header", "true").format("com.alibaba.graphar.datasources.GarDataSource").load(file_path) return df } @@ -66,7 +66,7 @@ class VertexReader(prefix: String, vertexInfo: VertexInfo, spark: SparkSession) if (vertexInfo.containPropertyGroup(propertyGroup) == false) throw new IllegalArgumentException val file_type = propertyGroup.getFile_type() - val file_path = prefix + "/" + vertexInfo.getPathPrefix(propertyGroup) + val file_path = prefix + vertexInfo.getPathPrefix(propertyGroup) val df = spark.read.option("fileFormat", file_type).option("header", "true").format("com.alibaba.graphar.datasources.GarDataSource").load(file_path) if (addIndex) { diff --git a/spark/src/test/scala/com/alibaba/graphar/ComputeExample.scala b/spark/src/test/scala/com/alibaba/graphar/ComputeExample.scala index 1a761e90a..dc1a233b2 100644 --- a/spark/src/test/scala/com/alibaba/graphar/ComputeExample.scala +++ b/spark/src/test/scala/com/alibaba/graphar/ComputeExample.scala @@ -34,11 +34,10 @@ class ComputeExampleSuite extends AnyFunSuite { test("run cc using graphx") { // read vertex dataframe - val file_path = "gar-test/ldbc_sample/parquet" + val file_path = "gar-test/ldbc_sample/parquet/" val prefix = getClass.getClassLoader.getResource(file_path).getPath - val vertex_input = getClass.getClassLoader.getResourceAsStream(file_path + "/person.vertex.yml") - val vertex_yaml = new Yaml(new Constructor(classOf[VertexInfo])) - val vertex_info = vertex_yaml.load(vertex_input).asInstanceOf[VertexInfo] + val vertex_yaml = getClass.getClassLoader.getResource(file_path + "person.vertex.yml").getPath + val vertex_info = VertexInfo.loadVertexInfo(vertex_yaml, spark) val vertex_reader = new VertexReader(prefix, vertex_info, spark) val vertices_num = vertex_reader.readVerticesNumber() @@ -48,9 +47,8 @@ class ComputeExampleSuite extends AnyFunSuite { assert(vertex_df.count() == vertices_num) // read edge dataframe - val edge_input = getClass.getClassLoader.getResourceAsStream(file_path + "/person_knows_person.edge.yml") - val edge_yaml = new Yaml(new Constructor(classOf[EdgeInfo])) - val edge_info = edge_yaml.load(edge_input).asInstanceOf[EdgeInfo] + val edge_yaml = getClass.getClassLoader.getResource(file_path + "person_knows_person.edge.yml").getPath + val edge_info = EdgeInfo.loadEdgeInfo(edge_yaml, spark) val adj_list_type = AdjListType.ordered_by_source val edge_reader = new EdgeReader(prefix, edge_info, adj_list_type, spark) diff --git a/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala b/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala index 029a81669..0c33eb319 100644 --- a/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala +++ b/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala @@ -30,14 +30,12 @@ class GraphInfoSuite extends AnyFunSuite { test("load graph info") { // read graph yaml - val yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/ldbc_sample.graph.yml").getPath) - val fs = FileSystem.get(yaml_path.toUri(), spark.sparkContext.hadoopConfiguration) - val input = fs.open(yaml_path) - val 
graph_yaml = new Yaml(new Constructor(classOf[GraphInfo])) - val graph_info = graph_yaml.load(input).asInstanceOf[GraphInfo] + val yaml_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/ldbc_sample.graph.yml").getPath + val prefix = getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/").getPath + val graph_info = GraphInfo.loadGraphInfo(yaml_path, spark) assert(graph_info.getName == "ldbc_sample") - assert(graph_info.getPrefix == "" ) + assert(graph_info.getPrefix == prefix ) assert(graph_info.getVertices.size() == 1) val vertices = new java.util.ArrayList[String] vertices.add("person.vertex.yml") @@ -50,11 +48,8 @@ class GraphInfoSuite extends AnyFunSuite { } test("load vertex info") { - val yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person.vertex.yml").getPath) - val fs = FileSystem.get(yaml_path.toUri(), spark.sparkContext.hadoopConfiguration) - val input = fs.open(yaml_path) - val yaml = new Yaml(new Constructor(classOf[VertexInfo])) - val vertex_info = yaml.load(input).asInstanceOf[VertexInfo] + val yaml_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person.vertex.yml").getPath + val vertex_info = VertexInfo.loadVertexInfo(yaml_path, spark) assert(vertex_info.getLabel == "person") assert(vertex_info.isValidated) @@ -99,11 +94,8 @@ class GraphInfoSuite extends AnyFunSuite { } test("load edge info") { - val yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person_knows_person.edge.yml").getPath) - val fs = FileSystem.get(yaml_path.toUri(), spark.sparkContext.hadoopConfiguration) - val input = fs.open(yaml_path) - val yaml = new Yaml(new Constructor(classOf[EdgeInfo])) - val edge_info = yaml.load(input).asInstanceOf[EdgeInfo] + val yaml_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person_knows_person.edge.yml").getPath + val edge_info = EdgeInfo.loadEdgeInfo(yaml_path, spark) assert(edge_info.getSrc_label == "person") assert(edge_info.getDst_label == "person") diff --git a/spark/src/test/scala/com/alibaba/graphar/TestReader.scala b/spark/src/test/scala/com/alibaba/graphar/TestReader.scala index 87ca0e38e..e058a26aa 100644 --- a/spark/src/test/scala/com/alibaba/graphar/TestReader.scala +++ b/spark/src/test/scala/com/alibaba/graphar/TestReader.scala @@ -33,9 +33,9 @@ class ReaderSuite extends AnyFunSuite { test("read chunk files directly") { // read vertex chunk files in Parquet - val parquet_file_path = "gar-test/ldbc_sample/parquet" + val parquet_file_path = "gar-test/ldbc_sample/parquet/" val parquet_prefix = getClass.getClassLoader.getResource(parquet_file_path).getPath - val parqeut_read_path = parquet_prefix + "/vertex/person/id" + val parqeut_read_path = parquet_prefix + "vertex/person/id" val df1 = spark.read.option("fileFormat", "parquet").format("com.alibaba.graphar.datasources.GarDataSource").load(parqeut_read_path) // validate reading results assert(df1.rdd.getNumPartitions == 10) @@ -43,17 +43,17 @@ class ReaderSuite extends AnyFunSuite { // println(df1.rdd.collect().mkString("\n")) // read vertex chunk files in Orc - val orc_file_path = "gar-test/ldbc_sample/orc" + val orc_file_path = "gar-test/ldbc_sample/orc/" val orc_prefix = getClass.getClassLoader.getResource(orc_file_path).getPath - val orc_read_path = orc_prefix + "/vertex/person/id" + val orc_read_path = orc_prefix + "vertex/person/id" val df2 = spark.read.option("fileFormat", "orc").format("com.alibaba.graphar.datasources.GarDataSource").load(orc_read_path) // validate 
reading results assert(df2.rdd.collect().deep == df1.rdd.collect().deep) // read adjList chunk files recursively in CSV - val csv_file_path = "gar-test/ldbc_sample/csv" + val csv_file_path = "gar-test/ldbc_sample/csv/" val csv_prefix = getClass.getClassLoader.getResource(csv_file_path).getPath - val csv_read_path = csv_prefix + "/edge/person_knows_person/ordered_by_source/adj_list" + val csv_read_path = csv_prefix + "edge/person_knows_person/ordered_by_source/adj_list" val df3 = spark.read.option("fileFormat", "csv").option("header", "true").option("recursiveFileLookup", "true").format("com.alibaba.graphar.datasources.GarDataSource").load(csv_read_path) // validate reading results assert(df3.rdd.getNumPartitions == 11) @@ -65,11 +65,10 @@ class ReaderSuite extends AnyFunSuite { test("read vertex chunks") { // construct the vertex information - val file_path = "gar-test/ldbc_sample/csv" + val file_path = "gar-test/ldbc_sample/csv/" val prefix = getClass.getClassLoader.getResource(file_path).getPath - val vertex_input = getClass.getClassLoader.getResourceAsStream(file_path + "/person.vertex.yml") - val vertex_yaml = new Yaml(new Constructor(classOf[VertexInfo])) - val vertex_info = vertex_yaml.load(vertex_input).asInstanceOf[VertexInfo] + val vertex_yaml = getClass.getClassLoader.getResource(file_path + "person.vertex.yml").getPath + val vertex_info = VertexInfo.loadVertexInfo(vertex_yaml, spark) // construct the vertex reader val reader = new VertexReader(prefix, vertex_info, spark) @@ -115,11 +114,10 @@ class ReaderSuite extends AnyFunSuite { test("read edge chunks") { // construct the edge information - val file_path = "gar-test/ldbc_sample/csv" + val file_path = "gar-test/ldbc_sample/csv/" val prefix = getClass.getClassLoader.getResource(file_path).getPath - val edge_input = getClass.getClassLoader.getResourceAsStream(file_path + "/person_knows_person.edge.yml") - val edge_yaml = new Yaml(new Constructor(classOf[EdgeInfo])) - val edge_info = edge_yaml.load(edge_input).asInstanceOf[EdgeInfo] + val edge_yaml = getClass.getClassLoader.getResource(file_path + "person_knows_person.edge.yml").getPath + val edge_info = EdgeInfo.loadEdgeInfo(edge_yaml, spark) // construct the edge reader val adj_list_type = AdjListType.ordered_by_source diff --git a/spark/src/test/scala/com/alibaba/graphar/TestWriter.scala b/spark/src/test/scala/com/alibaba/graphar/TestWriter.scala index da0b75380..982dcbe1c 100644 --- a/spark/src/test/scala/com/alibaba/graphar/TestWriter.scala +++ b/spark/src/test/scala/com/alibaba/graphar/TestWriter.scala @@ -39,10 +39,8 @@ class WriterSuite extends AnyFunSuite { val fs = FileSystem.get(new Path(file_path).toUri(), spark.sparkContext.hadoopConfiguration) // read vertex yaml - val vertex_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/parquet/person.vertex.yml").getPath) - val vertex_input = fs.open(vertex_yaml_path) - val vertex_yaml = new Yaml(new Constructor(classOf[VertexInfo])) - val vertex_info = vertex_yaml.load(vertex_input).asInstanceOf[VertexInfo] + val vertex_yaml_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/parquet/person.vertex.yml").getPath + val vertex_info = VertexInfo.loadVertexInfo(vertex_yaml_path, spark) // generate vertex index column for vertex dataframe val vertex_df_with_index = utils.IndexGenerator.generateVertexIndexColumn(vertex_df) @@ -82,10 +80,8 @@ class WriterSuite extends AnyFunSuite { val fs = FileSystem.get(new Path(file_path).toUri(), spark.sparkContext.hadoopConfiguration) // read edge yaml - 
val edge_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person_knows_person.edge.yml").getPath) - val edge_input = fs.open(edge_yaml_path) - val edge_yaml = new Yaml(new Constructor(classOf[EdgeInfo])) - val edge_info = edge_yaml.load(edge_input).asInstanceOf[EdgeInfo] + val edge_yaml_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person_knows_person.edge.yml").getPath + val edge_info = EdgeInfo.loadEdgeInfo(edge_yaml_path, spark) val adj_list_type = AdjListType.ordered_by_source // generate vertex index for edge dataframe @@ -147,16 +143,12 @@ class WriterSuite extends AnyFunSuite { val adj_list_type = AdjListType.ordered_by_source // read vertex yaml - val vertex_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person.vertex.yml").getPath) - val vertex_input = fs.open(vertex_yaml_path) - val vertex_yaml = new Yaml(new Constructor(classOf[VertexInfo])) - val vertex_info = vertex_yaml.load(vertex_input).asInstanceOf[VertexInfo] + val vertex_yaml_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person.vertex.yml").getPath + val vertex_info = VertexInfo.loadVertexInfo(vertex_yaml_path, spark) // read edge yaml - val edge_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person_knows_person.edge.yml").getPath) - val edge_input = fs.open(edge_yaml_path) - val edge_yaml = new Yaml(new Constructor(classOf[EdgeInfo])) - val edge_info = edge_yaml.load(edge_input).asInstanceOf[EdgeInfo] + val edge_yaml_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person_knows_person.edge.yml").getPath + val edge_info = EdgeInfo.loadEdgeInfo(edge_yaml_path, spark) // construct person vertex mapping with dataframe val vertex_mapping = utils.IndexGenerator.constructVertexIndexMapping(vertex_df, vertex_info.getPrimaryKey()) diff --git a/spark/src/test/scala/com/alibaba/graphar/TransformExample.scala b/spark/src/test/scala/com/alibaba/graphar/TransformExample.scala index 8b0b66089..5edf334b4 100644 --- a/spark/src/test/scala/com/alibaba/graphar/TransformExample.scala +++ b/spark/src/test/scala/com/alibaba/graphar/TransformExample.scala @@ -35,11 +35,10 @@ class TransformExampleSuite extends AnyFunSuite { test("tranform file type") { // read from orc files - val file_path = "gar-test/ldbc_sample/orc" + val file_path = "gar-test/ldbc_sample/orc/" val prefix = getClass.getClassLoader.getResource(file_path).getPath - val vertex_file = getClass.getClassLoader.getResourceAsStream(file_path + "/person.vertex.yml") - val vertex_yaml = new Yaml(new Constructor(classOf[VertexInfo])) - val vertex_info = vertex_yaml.load(vertex_file).asInstanceOf[VertexInfo] + val vertex_yaml = getClass.getClassLoader.getResource(file_path + "person.vertex.yml").getPath + val vertex_info = VertexInfo.loadVertexInfo(vertex_yaml, spark) val reader = new VertexReader(prefix, vertex_info, spark) val vertices_num = reader.readVerticesNumber() @@ -47,11 +46,10 @@ class TransformExampleSuite extends AnyFunSuite { assert(vertex_df_with_index.count() == vertices_num) // write to parquet files - val output_file_path = "gar-test/ldbc_sample/parquet" + val output_file_path = "gar-test/ldbc_sample/parquet/" val output_prefix : String = "/tmp/example/" - val output_vertex_file = getClass.getClassLoader.getResourceAsStream(file_path + "/person.vertex.yml") - val output_vertex_yaml = new Yaml(new Constructor(classOf[VertexInfo])) - val output_vertex_info = 
output_vertex_yaml.load(output_vertex_file).asInstanceOf[VertexInfo] + val output_vertex_yaml = getClass.getClassLoader.getResource(output_file_path + "person.vertex.yml").getPath + val output_vertex_info = VertexInfo.loadVertexInfo(output_vertex_yaml, spark) val writer = new VertexWriter(output_prefix, output_vertex_info, vertex_df_with_index) writer.writeVertexProperties() @@ -66,19 +64,17 @@ class TransformExampleSuite extends AnyFunSuite { } test("tranform adjList type") { - val file_path = "gar-test/ldbc_sample/parquet" + val file_path = "gar-test/ldbc_sample/parquet/" val prefix = getClass.getClassLoader.getResource(file_path).getPath // get vertex num - val vertex_file = getClass.getClassLoader.getResourceAsStream(file_path + "/person.vertex.yml") - val vertex_yaml = new Yaml(new Constructor(classOf[VertexInfo])) - val vertex_info = vertex_yaml.load(vertex_file).asInstanceOf[VertexInfo] + val vertex_yaml = getClass.getClassLoader.getResource(file_path + "person.vertex.yml").getPath + val vertex_info = VertexInfo.loadVertexInfo(vertex_yaml, spark) // construct the vertex reader val vreader = new VertexReader(prefix, vertex_info, spark) val vertexNum = vreader.readVerticesNumber() // read edges of unordered_by_source type - val edge_input = getClass.getClassLoader.getResourceAsStream(file_path + "/person_knows_person.edge.yml") - val edge_yaml = new Yaml(new Constructor(classOf[EdgeInfo])) - val edge_info = edge_yaml.load(edge_input).asInstanceOf[EdgeInfo] + val edge_yaml = getClass.getClassLoader.getResource(file_path + "person_knows_person.edge.yml").getPath + val edge_info = EdgeInfo.loadEdgeInfo(edge_yaml, spark) val adj_list_type = AdjListType.unordered_by_source val reader = new EdgeReader(prefix, edge_info, adj_list_type, spark) From e0273455d49a5fab55fc74049ad94d05ddd9752b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=89=E7=90=86?= Date: Mon, 27 Feb 2023 18:03:50 +0800 Subject: [PATCH 2/2] Update documentation --- docs/user-guide/spark-lib.rst | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/docs/user-guide/spark-lib.rst b/docs/user-guide/spark-lib.rst index b96bb6644..c5dd15441 100644 --- a/docs/user-guide/spark-lib.rst +++ b/docs/user-guide/spark-lib.rst @@ -53,12 +53,9 @@ To construct information from a Yaml file, please refer to the following example .. code:: scala // read graph yaml and construct information + val spark = ... // the Spark session val file_path = ... // the path to the yaml file - val yaml_path = new Path(file_path) - val fs = FileSystem.get(yaml_path.toUri(), spark.sparkContext.hadoopConfiguration) - val input = fs.open(yaml_path) - val graph_yaml = new Yaml(new Constructor(classOf[GraphInfo])) - val graph_info = graph_yaml.load(input).asInstanceOf[GraphInfo] + val graph_info = GraphInfo.loadGraphInfo(file_path, spark) // use information classes val vertices = graph_info.getVertices
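
The vertex- and edge-level helpers added in PATCH 1/2 follow the same pattern as `GraphInfo.loadGraphInfo` above. A minimal sketch of how they would be used together with the readers, assuming a `SparkSession` named `spark` and a data `prefix` are already available; the yaml paths below are placeholders, not values taken from this patch:

.. code:: scala

    import com.alibaba.graphar.{VertexInfo, EdgeInfo, AdjListType}
    import com.alibaba.graphar.reader.{VertexReader, EdgeReader}

    // load a single vertex/edge info from its yaml file (paths are placeholders)
    val vertex_info = VertexInfo.loadVertexInfo("/path/to/person.vertex.yml", spark)
    val edge_info = EdgeInfo.loadEdgeInfo("/path/to/person_knows_person.edge.yml", spark)

    // the loaded info objects can then be passed to the readers, as in the updated test suites
    val vertex_reader = new VertexReader(prefix, vertex_info, spark)
    val edge_reader = new EdgeReader(prefix, edge_info, AdjListType.ordered_by_source, spark)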