[SPARK-49991][SQL] Make HadoopMapReduceCommitProtocol respect 'mapreduce.output.basename' to generate file names
yaooqinn committed Oct 16, 2024
1 parent 861b5e9 commit 2c9909e
Showing 2 changed files with 13 additions and 1 deletion.
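For context, a minimal usage sketch (not part of this commit; the path and basename value below are illustrative): after this change, setting the Hadoop property on the write side replaces the default "part" prefix in generated file names.

// Hedged sketch: assumes a running SparkSession `spark` and write access to the demo path.
spark.conf.set("mapreduce.output.basename", "apachespark")
spark.range(10).write.parquet("/tmp/spark-basename-demo")
// Output files are then named like apachespark-00000-<jobId>... instead of part-00000-<jobId>...
spark.conf.unset("mapreduce.output.basename")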
@@ -168,7 +168,8 @@ class HadoopMapReduceCommitProtocol(
     // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
     // the file name is fine and won't overflow.
     val split = taskContext.getTaskAttemptID.getTaskID.getId
-    f"${spec.prefix}part-$split%05d-$jobId${spec.suffix}"
+    val basename = taskContext.getConfiguration.get("mapreduce.output.basename", "part")
+    f"${spec.prefix}$basename-$split%05d-$jobId${spec.suffix}"
   }

   override def setupJob(jobContext: JobContext): Unit = {
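To make the resulting names concrete, here is a standalone sketch of the format string above (illustrative values only; `basename`, `split`, and `jobId` are stand-ins, and `spec.prefix`/`spec.suffix` are omitted):

// Hypothetical values, demonstrating the f-interpolator pattern used in getFilename above.
val basename = "apachespark"   // value of mapreduce.output.basename (defaults to "part")
val split = 7                  // task ID of the task attempt
val jobId = "4f1c2d3e"         // stand-in for the real job ID
val name = f"$basename-$split%05d-$jobId"
// name == "apachespark-00007-4f1c2d3e"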
@@ -1585,6 +1585,17 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
       }
     }
   }
+
+  test("basename") {
+    withTempPath { dir =>
+      withSQLConf("mapreduce.output.basename" -> "apachespark") {
+        spark.range(1).coalesce(1).write.parquet(dir.getCanonicalPath)
+        val df = spark.read.parquet(dir.getCanonicalPath)
+        assert(df.inputFiles.head.contains("apachespark"))
+        checkAnswer(spark.read.parquet(dir.getCanonicalPath), Row(0))
+      }
+    }
+  }
 }

 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
