
Commit db538b2
[SPARK-24552][CORE][SQL][BRANCH-2.3] Use unique id instead of attempt number for writes

This passes a unique attempt id, instead of the attempt number, to v2
data sources and Hadoop APIs, because attempt numbers are reused
when stages are retried. When attempt numbers are reused, sources
that track data by partition id and attempt number may incorrectly
clean up data, because the same attempt number can be both committed
and aborted.
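
As a quick illustration of the encoding: the stage attempt number goes
into the high 16 bits, so a retried stage can never reproduce an id
from an earlier attempt. A minimal standalone sketch (the object and
method names are hypothetical, not part of this patch):

object AttemptIdDemo {
  // Same packing as the patch: stage attempt in the high 16 bits, task
  // attempt in the low 16 bits; unique as long as both counters stay
  // below Short.MaxValue.
  def encode(stageAttempt: Int, taskAttempt: Int): Int =
    (stageAttempt << 16) | taskAttempt

  def main(args: Array[String]): Unit = {
    println(encode(0, 1))  // 1     (first stage attempt, task attempt 1)
    println(encode(1, 1))  // 65537 (stage retry, same task attempt number, distinct id)
  }
}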

Author: Marcelo Vanzin <[email protected]>

Closes #21615 from vanzin/SPARK-24552-2.3.
Marcelo Vanzin committed Jun 25, 2018
1 parent a1e9640 commit db538b2
Showing 2 changed files with 9 additions and 2 deletions.
@@ -76,13 +76,17 @@ object SparkHadoopWriter extends Logging {
     // Try to write all RDD partitions as a Hadoop OutputFormat.
     try {
       val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => {
+        // SPARK-24552: Generate a unique "attempt ID" based on the stage and task attempt numbers.
+        // Assumes that there won't be more than Short.MaxValue attempts, at least not concurrently.
+        val attemptId = (context.stageAttemptNumber << 16) | context.attemptNumber
+
         executeTask(
           context = context,
           config = config,
           jobTrackerId = jobTrackerId,
           commitJobId = commitJobId,
           sparkPartitionId = context.partitionId,
-          sparkAttemptNumber = context.attemptNumber,
+          sparkAttemptNumber = attemptId,
           committer = committer,
           iterator = iter)
       })
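Not shown in this hunk: executeTask feeds sparkAttemptNumber into the
Hadoop task attempt id, so the packed value is what Hadoop output
committers see. A rough sketch of that mapping, simplified from the
surrounding SparkHadoopWriter code (not the verbatim code path):

import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}

// Builds a Hadoop TaskAttemptID from Spark's ids. Because attemptId now
// encodes the stage attempt in its high bits, a retried stage gets a
// distinct Hadoop attempt id for the same partition.
def hadoopAttemptId(jobTrackerId: String, jobId: Int, partition: Int, attemptId: Int): TaskAttemptID = {
  val taskId = new TaskID(new JobID(jobTrackerId, jobId), TaskType.MAP, partition)
  new TaskAttemptID(taskId, attemptId)
}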
@@ -123,7 +123,10 @@ object DataWritingSparkTask extends Logging {
       writeTask: DataWriterFactory[InternalRow],
       context: TaskContext,
       iter: Iterator[InternalRow]): WriterCommitMessage = {
-    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
+    // SPARK-24552: Generate a unique "attempt ID" based on the stage and task attempt numbers.
+    // Assumes that there won't be more than Short.MaxValue attempts, at least not concurrently.
+    val attemptId = (context.stageAttemptNumber << 16) | context.attemptNumber
+    val dataWriter = writeTask.createDataWriter(context.partitionId(), attemptId)
 
     // write the data and commit this writer.
     Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
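
For a v2 source that keys in-flight data on (partitionId, attemptNumber),
the unique id keeps a retried stage's abort() from deleting data that an
earlier attempt already committed. A hypothetical staging-file sketch
(class names and path layout are illustrative, not part of this patch):

import java.io.{BufferedWriter, File, FileWriter}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage}

// Reported back to the driver on commit; carries the finished file's path.
case class StagedFile(path: String) extends WriterCommitMessage

class StagingWriterFactory(dir: String) extends DataWriterFactory[InternalRow] {
  override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = {
    // One staging file per (partition, attempt). With reused attempt numbers,
    // two attempts could point at the same path; the packed id avoids that.
    val path = s"$dir/part-$partitionId-attempt-$attemptNumber"
    new DataWriter[InternalRow] {
      private val out = new BufferedWriter(new FileWriter(path))
      override def write(record: InternalRow): Unit = out.write(record.toString + "\n")
      override def commit(): WriterCommitMessage = { out.close(); StagedFile(path) }
      override def abort(): Unit = { out.close(); new File(path).delete() }
    }
  }
}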
