[SPARK-12004] Preserve the RDD partitioner through RDD checkpointing #9983
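Before the diff, a quick sketch of the behavior being fixed: the files written by checkpoint() now carry the RDD's partitioner, so anything that rebuilds the RDD purely from the checkpoint directory gets the partitioner back instead of None, and therefore avoids a needless re-shuffle on later key-based operations. The sketch below is illustrative, not part of the change; it touches the private[spark] ReliableCheckpointRDD directly, so it would have to live under the org.apache.spark.rdd package, and the paths, names and partition count are made up.

package org.apache.spark.rdd

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object PartitionerThroughCheckpointSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))
    sc.setCheckpointDir("/tmp/spark-checkpoints")

    val pairs = sc.parallelize(1 to 100).map(i => (i % 10, i)).partitionBy(new HashPartitioner(4))
    pairs.checkpoint()
    pairs.count()   // first action: writes the part-0000N files and, with this patch, _partitioner

    // Rebuild the RDD from nothing but the checkpoint directory, as a recovered driver would.
    val recovered = new ReliableCheckpointRDD[(Int, Int)](sc, pairs.getCheckpointFile.get)
    println(recovered.partitioner)   // Some(HashPartitioner) with this patch; None before it
    sc.stop()
  }
}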
@@ -20,21 +20,22 @@ package org.apache.spark.rdd

 import java.io.IOException

 import scala.reflect.ClassTag
 import scala.util.control.NonFatal

 import org.apache.hadoop.fs.Path

 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.{SerializableConfiguration, Utils}

 /**
  * An RDD that reads from checkpoint files previously written to reliable storage.
  */
 private[spark] class ReliableCheckpointRDD[T: ClassTag](
     sc: SparkContext,
-    val checkpointPath: String)
-  extends CheckpointRDD[T](sc) {
+    val checkpointPath: String,
+    _partitioner: Option[Partitioner] = None
+  ) extends CheckpointRDD[T](sc) {

   @transient private val hadoopConf = sc.hadoopConfiguration
   @transient private val cpath = new Path(checkpointPath)
@@ -47,7 +48,13 @@ private[spark] class ReliableCheckpointRDD[T: ClassTag](
   /**
    * Return the path of the checkpoint directory this RDD reads data from.
    */
-  override def getCheckpointFile: Option[String] = Some(checkpointPath)
+  override val getCheckpointFile: Option[String] = Some(checkpointPath)
+
+  override val partitioner: Option[Partitioner] = {
+    _partitioner.orElse {
+      ReliableCheckpointRDD.readCheckpointedPartitionerFile(context, checkpointPath)
+    }
+  }

   /**
    * Return partitions described by the files in the checkpoint directory.
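The override gives the checkpoint RDD a two-step fallback for its partitioner: the one passed in by the writer (the same-session case), otherwise whatever readCheckpointedPartitionerFile can recover from the _partitioner file, otherwise None. Because orElse takes its argument by name, the file is only read when no partitioner was handed in. A tiny stand-alone sketch of that chain, e.g. for pasting into spark-shell; the two parameters are illustrative stand-ins, not Spark API:

import org.apache.spark.{HashPartitioner, Partitioner}

// constructorPartitioner plays the role of _partitioner; readPartitionerFile stands in for
// ReliableCheckpointRDD.readCheckpointedPartitionerFile.
def resolvePartitioner(
    constructorPartitioner: Option[Partitioner],
    readPartitionerFile: () => Option[Partitioner]): Option[Partitioner] =
  constructorPartitioner.orElse(readPartitionerFile())

// Same-session checkpoint: the writer already knows the partitioner, the file is never read.
resolvePartitioner(Some(new HashPartitioner(4)), () => None)          // Some(HashPartitioner)
// Recovery from an existing checkpoint directory: fall back to the persisted _partitioner file.
resolvePartitioner(None, () => Some(new HashPartitioner(4)))          // Some(HashPartitioner)
// No file written (original RDD had no partitioner): behaves like any other RDD.
resolvePartitioner(None, () => None)                                  // None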
@@ -100,10 +107,52 @@ private[spark] object ReliableCheckpointRDD extends Logging {
     "part-%05d".format(partitionIndex)
   }

+  private def checkpointPartitionerFileName(): String = {
+    "_partitioner"
+  }
+
+  /**
+   * Write RDD to checkpoint files and return a ReliableCheckpointRDD representing the RDD.
+   */
+  def writeRDDToCheckpointDirectory[T: ClassTag](
+      originalRDD: RDD[T],
+      checkpointDir: String,
+      blockSize: Int = -1): ReliableCheckpointRDD[T] = {
+
+    val sc = originalRDD.sparkContext
+
+    // Create the output path for the checkpoint
+    val checkpointDirPath = new Path(checkpointDir)
+    val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
+    if (!fs.mkdirs(checkpointDirPath)) {
+      throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
+    }
+
+    // Save to file, and reload it as an RDD
+    val broadcastedConf = sc.broadcast(
+      new SerializableConfiguration(sc.hadoopConfiguration))
+    // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
+    sc.runJob(originalRDD,
+      writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)
+
+    if (originalRDD.partitioner.nonEmpty) {
+      writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
+    }
+
+    val newRDD = new ReliableCheckpointRDD[T](
+      sc, checkpointDirPath.toString, originalRDD.partitioner)
+    if (newRDD.partitions.length != originalRDD.partitions.length) {
+      throw new SparkException(
+        s"Checkpoint RDD $newRDD(${newRDD.partitions.length}) has different " +
+          s"number of partitions from original RDD $originalRDD(${originalRDD.partitions.length})")
+    }
+    newRDD
+  }
+
   /**
-   * Write this partition's values to a checkpoint file.
+   * Write a RDD partition's data to a checkpoint file.
    */
-  def writeCheckpointFile[T: ClassTag](
+  def writePartitionToCheckpointFile[T: ClassTag](

Review comment: private

       path: String,
       broadcastedConf: Broadcast[SerializableConfiguration],
       blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
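The new helper also fixes where the partitioner lives on disk: checkpointPartitionerFileName puts it in a _partitioner file next to the part-%05d files. Continuing the sketch from the top of the page (so `pairs` and a local file: checkpoint directory are assumed), listing the RDD's checkpoint directory would show the extra file:

import java.io.File

// Assumes `pairs` from the earlier sketch and a local file: checkpoint directory.
val rddCheckpointDir = new File(new java.net.URI(pairs.getCheckpointFile.get))
rddCheckpointDir.listFiles().map(_.getName).sorted.foreach(println)
// Expected with this patch (four partitions from HashPartitioner(4)):
//   _partitioner
//   part-00000
//   part-00001
//   part-00002
//   part-00003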
@@ -151,6 +200,67 @@ private[spark] object ReliableCheckpointRDD extends Logging {
     }
   }

+  /**
+   * Write a partitioner to the given RDD checkpoint directory. This is done on a best-effort
+   * basis; any exception while writing the partitioner is caught, logged and ignored.
+   */
+  private def writePartitionerToCheckpointDir(
+      sc: SparkContext, partitioner: Partitioner, checkpointDirPath: Path): Unit = {

Review comment: private, also style: …
Review comment: can you add java doc? (why do we need this)
Review comment: also, can you just call this …

+    try {
+      val partitionerFilePath = new Path(checkpointDirPath, checkpointPartitionerFileName)
+      val bufferSize = sc.conf.getInt("spark.buffer.size", 65536)
+      val fs = partitionerFilePath.getFileSystem(sc.hadoopConfiguration)
+      val fileOutputStream = fs.create(partitionerFilePath, false, bufferSize)
+      val serializer = SparkEnv.get.serializer.newInstance()
+      val serializeStream = serializer.serializeStream(fileOutputStream)
+      Utils.tryWithSafeFinally {
+        serializeStream.writeObject(partitioner)
+      } {
+        serializeStream.close()
+      }
+      logDebug(s"Written partitioner to $partitionerFilePath")
+    } catch {
+      case NonFatal(e) =>
+        logWarning(s"Error writing partitioner $partitioner to $checkpointDirPath")
+    }
+  }
+
+  /**
+   * Read a partitioner from the given RDD checkpoint directory, if it exists.
+   * This is done on a best-effort basis; any exception while reading the partitioner is
+   * caught, logged and ignored.
+   */
+  private def readCheckpointedPartitionerFile(
+      sc: SparkContext,
+      checkpointDirPath: String): Option[Partitioner] = {
+    try {
+      val bufferSize = sc.conf.getInt("spark.buffer.size", 65536)
+      val partitionerFilePath = new Path(checkpointDirPath, checkpointPartitionerFileName)
+      val fs = partitionerFilePath.getFileSystem(sc.hadoopConfiguration)
+      if (fs.exists(partitionerFilePath)) {
+        val fileInputStream = fs.open(partitionerFilePath, bufferSize)
+        val serializer = SparkEnv.get.serializer.newInstance()
+        val deserializeStream = serializer.deserializeStream(fileInputStream)
+        val partitioner = Utils.tryWithSafeFinally[Partitioner] {
+          deserializeStream.readObject[Partitioner]
+        } {
+          deserializeStream.close()
+        }
+        logDebug(s"Read partitioner from $partitionerFilePath")
+        Some(partitioner)
+      } else {
+        logDebug("No partitioner file")
+        None
+      }
+    } catch {
+      case NonFatal(e) =>
+        logWarning(s"Error reading partitioner from $checkpointDirPath, " +
+          s"partitioner will not be recovered which may lead to performance loss", e)
+        None
+    }
+  }

   /**
    * Read the content of the specified checkpoint file.
    */
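The pair of helpers above serialize the partitioner object with whatever serializer SparkEnv provides and park it in the small _partitioner file. A standalone sketch of that round-trip, using a local file and Spark's JavaSerializer directly instead of SparkEnv and HDFS; the object name and the /tmp path are illustrative:

import java.io.{File, FileInputStream, FileOutputStream}
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf}
import org.apache.spark.serializer.JavaSerializer

object PartitionerFileRoundTrip {
  def main(args: Array[String]): Unit = {
    val ser = new JavaSerializer(new SparkConf()).newInstance()
    val file = new File("/tmp/_partitioner")   // stands in for <checkpoint dir>/_partitioner

    // Write side, mirroring writePartitionerToCheckpointDir.
    val out = ser.serializeStream(new FileOutputStream(file))
    try out.writeObject(new HashPartitioner(4): Partitioner) finally out.close()

    // Read side, mirroring readCheckpointedPartitionerFile.
    val in = ser.deserializeStream(new FileInputStream(file))
    val restored = try in.readObject[Partitioner]() finally in.close()
    println(restored == new HashPartitioner(4))   // true: HashPartitioner defines equals
  }
}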
@@ -55,25 +55,7 @@ private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private v…
    * This is called immediately after the first action invoked on this RDD has completed.
    */
   protected override def doCheckpoint(): CheckpointRDD[T] = {

-    // Create the output path for the checkpoint
-    val path = new Path(cpDir)
-    val fs = path.getFileSystem(rdd.context.hadoopConfiguration)
-    if (!fs.mkdirs(path)) {
-      throw new SparkException(s"Failed to create checkpoint path $cpDir")
-    }
-
-    // Save to file, and reload it as an RDD
-    val broadcastedConf = rdd.context.broadcast(
-      new SerializableConfiguration(rdd.context.hadoopConfiguration))
-    // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
-    rdd.context.runJob(rdd, ReliableCheckpointRDD.writeCheckpointFile[T](cpDir, broadcastedConf) _)
-    val newRDD = new ReliableCheckpointRDD[T](rdd.context, cpDir)
-    if (newRDD.partitions.length != rdd.partitions.length) {
-      throw new SparkException(
-        s"Checkpoint RDD $newRDD(${newRDD.partitions.length}) has different " +
-        s"number of partitions from original RDD $rdd(${rdd.partitions.length})")
-    }
+    val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)

Review comment: All this code has been moved in the …

     // Optionally clean our checkpoint files if the reference is out of scope
     if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
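The context lines closing the hunk above keep the pre-existing optional cleanup: when spark.cleaner.referenceTracking.cleanCheckpoints is enabled, the checkpoint files are removed once the RDD reference goes out of scope on the driver. A hedged sketch of switching that flag on when building the application's SparkConf; the app name and master are illustrative:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("checkpoint-cleanup-sketch")   // illustrative
  .setMaster("local[2]")                     // illustrative
  // Default is false: checkpoint files are kept until cleaned up manually.
  .set("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
val sc = new SparkContext(conf)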
@@ -83,7 +65,6 @@ private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private v…
     }

     logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")

     newRDD
   }
Review comment: just make this a val