Skip to content

Commit

Permalink
Change parallel write of offset chunks to sequential
Browse files Browse the repository at this point in the history
  • Loading branch information
acezen committed Jan 19, 2024
1 parent 1b2b6d3 commit f4045a7
Showing 1 changed file with 4 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,8 @@ class EdgeWriter(
var chunkIndex: Int = 0
val fileType = edgeInfo.getAdjListFileType(adjListType)
val outputPrefix = prefix + edgeInfo.getOffsetPathPrefix(adjListType)
val offsetChunks = edgeDfAndOffsetDf._2
// writing offset chunks in parallel causes errors, so convert to a sequential collection
val offsetChunks = edgeDfAndOffsetDf._2.seq
offsetChunks.foreach { case (i, offsetChunk) =>
FileSystem.writeDataFrame(
offsetChunk,
Expand Down Expand Up @@ -347,11 +348,11 @@ class EdgeWriter(
val property = pIter.next()
propertyList += "`" + property.getName() + "`"
}
val propetyGroupDf = edgeDfAndOffsetDf._1.select(propertyList.map(col): _*)
val propertyGroupDf = edgeDfAndOffsetDf._1.select(propertyList.map(col): _*)
val outputPrefix =
prefix + edgeInfo.getPropertyGroupPathPrefix(propertyGroup, adjListType)
FileSystem.writeDataFrame(
propetyGroupDf,
propertyGroupDf,
propertyGroup.getFile_type(),
outputPrefix,
None,
Expand Down

0 comments on commit f4045a7

Please sign in to comment.