Commit 5abc232
Merge branch 'ESPARK-135' into 'spark_2.1'
[ESPARK-135] Fix the issue where the temporary directory was still switched even when no merge was needed, leaving the final result empty

resolve apache#135

See merge request !82
cenyuhai committed Oct 28, 2017
2 parents f051b54 + ac81043 commit 5abc232
Showing 1 changed file with 4 additions and 6 deletions.
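
In essence, the change stops redirecting the job output path to the temporary merge directory unconditionally: FileOutputFormat.setOutputPath(conf.value, tmpMergeLocation) is moved from after the merge logic into the branches that actually merge files, so a run that needs no merge no longer points the job at an empty temp directory. A minimal sketch of that control flow, assuming a simplified standalone method; the object name MergeOutputPathSketch, the method redirectOutputIfMerged, and the mergeNeeded flag are hypothetical, while FileOutputFormat.setOutputPath and tmpMergeLocation come from the diff:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}

// Sketch only: everything except FileOutputFormat.setOutputPath and
// tmpMergeLocation stands in for surrounding code this diff does not show.
object MergeOutputPathSketch {
  def redirectOutputIfMerged(
      jobConf: JobConf,
      mergeNeeded: Boolean,
      tmpMergeLocation: Path): Unit = {
    if (mergeNeeded) {
      // ... merge the small result files into tmpMergeLocation here ...
      // Redirect the job output path only when a merge actually ran.
      FileOutputFormat.setOutputPath(jobConf, tmpMergeLocation)
    }
    // When no merge is needed the output path is left untouched, so the
    // final result is no longer read from an empty temporary directory.
  }
}
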
@@ -273,6 +273,7 @@ case class InsertIntoHiveTable(
 specFiles.foreach(f => fs.delete(f.getPath))
 }
 }
+FileOutputFormat.setOutputPath(conf.value, tmpMergeLocation)
 } else {
 val numFiles = MergeUtils.getTargetFileNum(path, conf.value,
 avgConditionSize, targetFileSize)
@@ -290,12 +291,9 @@
 .filter(!_.getPath.getName.startsWith("part"))
 specFiles.foreach(f => fs.delete(f.getPath))
 }
-if (conf.value.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)) {
-fs.createNewFile(new Path(tmpMergeLocationDir + "/_SUCCESS"))
-}
+FileOutputFormat.setOutputPath(conf.value, tmpMergeLocation)
 }
 }
-FileOutputFormat.setOutputPath(conf.value, tmpMergeLocation)
 }

 private def saveAsHiveFile(
@@ -440,11 +438,11 @@ case class InsertIntoHiveTable(
 case ex: Exception =>
 logInfo("Merge file of " + tmpLocation + " failed!", ex)
 fileSinkConf.dir = tmpLocation.toString
+FileOutputFormat.setOutputPath(jobConf, tmpLocation)
 if (!rollbackPathList.isEmpty) {
 rollbackPathList.asScala.foreach { path =>
 val srcPath = path.replace("-ext-10000", MergeUtils.TEMP_DIR)
-logInfo("rename [" + srcPath + " to "
-+ path + "]")
+logInfo("rename [" + srcPath + " to " + path + "]")
 fs.rename(new Path(srcPath), new Path(path))
 }
 }
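
The last hunk hardens the failure path: when the merge throws, the job output path is now reset to the original tmpLocation (in addition to restoring fileSinkConf.dir), and any directories that were already switched are renamed back. A rough standalone sketch of that rollback, assuming a hypothetical helper rollbackMerge and that rollbackPathList is a java.util.List[String] (suggested by the asScala conversion); the individual calls mirror the diff:

import scala.collection.JavaConverters._

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}

// Sketch of the rollback done in the catch block; the object and method
// names are hypothetical, tempDirName stands in for MergeUtils.TEMP_DIR.
object MergeRollbackSketch {
  def rollbackMerge(
      fs: FileSystem,
      jobConf: JobConf,
      tmpLocation: Path,
      rollbackPathList: java.util.List[String],
      tempDirName: String): Unit = {
    // Point the job back at the original, un-merged output location.
    FileOutputFormat.setOutputPath(jobConf, tmpLocation)
    if (!rollbackPathList.isEmpty) {
      rollbackPathList.asScala.foreach { path =>
        // The data still sits under the temporary merge directory name,
        // so rename it back to the path the table expects.
        val srcPath = path.replace("-ext-10000", tempDirName)
        fs.rename(new Path(srcPath), new Path(path))
      }
    }
  }
}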
