From f4045a7671b9a9cab1aa1bb684ea05589afe0f11 Mon Sep 17 00:00:00 2001 From: acezen Date: Fri, 19 Jan 2024 11:36:36 +0800 Subject: [PATCH] change parallel write of offset chunks to sequential --- .../main/scala/com/alibaba/graphar/writer/EdgeWriter.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala b/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala index 76c230899..ccd74780b 100644 --- a/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala +++ b/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala @@ -293,7 +293,8 @@ class EdgeWriter( var chunkIndex: Int = 0 val fileType = edgeInfo.getAdjListFileType(adjListType) val outputPrefix = prefix + edgeInfo.getOffsetPathPrefix(adjListType) - val offsetChunks = edgeDfAndOffsetDf._2 + // writing offset chunks in parallel causes errors, so convert to sequential + val offsetChunks = edgeDfAndOffsetDf._2.seq offsetChunks.foreach { case (i, offsetChunk) => FileSystem.writeDataFrame( offsetChunk, @@ -347,11 +348,11 @@ class EdgeWriter( val property = pIter.next() propertyList += "`" + property.getName() + "`" } - val propetyGroupDf = edgeDfAndOffsetDf._1.select(propertyList.map(col): _*) + val propertyGroupDf = edgeDfAndOffsetDf._1.select(propertyList.map(col): _*) val outputPrefix = prefix + edgeInfo.getPropertyGroupPathPrefix(propertyGroup, adjListType) FileSystem.writeDataFrame( - propetyGroupDf, + propertyGroupDf, propertyGroup.getFile_type(), outputPrefix, None,