Commit 85cf38e
apache#118 s3 file read retry (apache#128)
Co-authored-by: Yu Gan <[email protected]>
2 people authored and zheniantoushipashi committed Jul 11, 2020
1 parent 941ffcb commit 85cf38e
Showing 2 changed files with 106 additions and 93 deletions.
@@ -199,6 +199,8 @@ protected void initialize(String path, List<String> columns) throws IOException
config.set("spark.sql.parquet.int96AsTimestamp", "false");

this.file = new Path(path);
// Read retry: probe the file with open()/close() before getFileStatus so transient S3 read errors surface early
this.file.getFileSystem(config).open(this.file).close();
long length = this.file.getFileSystem(config).getFileStatus(this.file).getLen();
ParquetMetadata footer = readFooter(config, file, range(0, length));
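
The hunk above adds a probe read: the file is opened and immediately closed before getFileStatus, so a transient S3 read failure surfaces (and is retried by the Hadoop filesystem client) before the Parquet footer is read. Below is a minimal sketch of the same probe-then-stat idea as a standalone Scala helper; the object and method names and the explicit retry loop are illustrative assumptions, not part of this commit.

import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

object S3ReadProbe {
  // Hypothetical helper (not in this commit): probe the file with open()/close()
  // before getFileStatus(), retrying the probe a few times on IOException.
  // The commit itself issues a single probe and relies on the client's own retries.
  def probeThenStat(path: Path, conf: Configuration, maxAttempts: Int = 3): FileStatus = {
    val fs = path.getFileSystem(conf)
    var lastError: IOException = null
    for (_ <- 1 to maxAttempts) {
      try {
        fs.open(path).close()          // probe read; surfaces transient S3 errors early
        return fs.getFileStatus(path)  // stat only after the probe succeeds
      } catch {
        case e: IOException => lastError = e
      }
    }
    throw lastError
  }
}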

@@ -352,113 +352,124 @@ class ParquetFileFormat

(file: PartitionedFile) => {
assert(file.partitionValues.numFields == partitionSchema.size)

val fileSplit =
new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty)
val filePath = fileSplit.getPath

val split =
new org.apache.parquet.hadoop.ParquetInputSplit(
filePath,
fileSplit.getStart,
fileSplit.getStart + fileSplit.getLength,
fileSplit.getLength,
fileSplit.getLocations,
null)

val sharedConf = broadcastedHadoopConf.value.value

lazy val footerFileMetaData =
ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
// Try to push down filters when filter push-down is enabled.
val pushed = if (enableParquetFilterPushDown) {
val parquetSchema = footerFileMetaData.getSchema
val parquetFilters = new ParquetFilters(pushDownDate, pushDownTimestamp, pushDownDecimal,
pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
filters
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
.flatMap(parquetFilters.createFilter(parquetSchema, _))
.reduceOption(FilterApi.and)
} else {
None
}

// PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps'
// *only* if the file was created by something other than "parquet-mr", so check the actual
// writer here for this file. We have to do this per-file, as each file in the table may
// have different writers.
// Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads.
def isCreatedByParquetMr: Boolean =
footerFileMetaData.getCreatedBy().startsWith("parquet-mr")

val convertTz =
if (timestampConversion && !isCreatedByParquetMr) {
Some(DateTimeUtils.getTimeZone(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
try {
val split =
new org.apache.parquet.hadoop.ParquetInputSplit(
filePath,
fileSplit.getStart,
fileSplit.getStart + fileSplit.getLength,
fileSplit.getLength,
fileSplit.getLocations,
null)

val sharedConf = broadcastedHadoopConf.value.value

logInfo(s"READ_RETRY before footerFileMetaData $filePath")
filePath.getFileSystem(sharedConf).open(filePath).close()
lazy val footerFileMetaData =
ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
// Try to push down filters when filter push-down is enabled.
val pushed = if (enableParquetFilterPushDown) {
val parquetSchema = footerFileMetaData.getSchema
val parquetFilters = new ParquetFilters(pushDownDate, pushDownTimestamp, pushDownDecimal,
pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
filters
// Collects all converted Parquet filter predicates.
// Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
.flatMap(parquetFilters.createFilter(parquetSchema, _))
.reduceOption(FilterApi.and)
} else {
None
}

val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
val hadoopAttemptContext =
new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
// PARQUET_INT96_TIMESTAMP_CONVERSION
// says to apply timezone conversions to int96 timestamps'
// *only* if the file was created by something other than "parquet-mr", so check the actual
// writer here for this file. We have to do this per-file, as each file in the table may
// have different writers.
// Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads.
def isCreatedByParquetMr: Boolean =
footerFileMetaData.getCreatedBy().startsWith("parquet-mr")

val convertTz =
if (timestampConversion && !isCreatedByParquetMr) {
Some(DateTimeUtils.getTimeZone(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
} else {
None
}

// Try to push down filters when filter push-down is enabled.
// Notice: This push-down is RowGroups level, not individual records.
if (pushed.isDefined) {
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
}
val taskContext = Option(TaskContext.get())
val iter = if (enableVectorizedReader) {
val vectorizedReader = new VectorizedParquetRecordReader(
convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
val iter = new RecordReaderIterator(vectorizedReader)
// SPARK-23457 Register a task completion listener before `initialization`.
taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
vectorizedReader.initialize(split, hadoopAttemptContext)
logDebug(s"Appending $partitionSchema ${file.partitionValues}")
vectorizedReader.initBatch(partitionSchema, file.partitionValues)
if (returningBatch) {
vectorizedReader.enableReturningBatches()
}
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
val hadoopAttemptContext =
new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)

// UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
iter.asInstanceOf[Iterator[InternalRow]]
} else {
logDebug(s"Falling back to parquet-mr")
// ParquetRecordReader returns UnsafeRow
val reader = if (pushed.isDefined && enableRecordFilter) {
val parquetFilter = FilterCompat.get(pushed.get, null)
new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz), parquetFilter)
} else {
new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz))
// Try to push down filters when filter push-down is enabled.
// Notice: This push-down is RowGroups level, not individual records.
if (pushed.isDefined) {
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
}
val iter = new RecordReaderIterator(reader)
// SPARK-23457 Register a task completion listener before `initialization`.
taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
reader.initialize(split, hadoopAttemptContext)

val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
val joinedRow = new JoinedRow()
val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)

// This is a horrible erasure hack... if we type the iterator above, then it actually checks
// the type in next() and we get a class cast exception. If we make that function return
// Object, then we can defer the cast until later!
if (partitionSchema.length == 0) {
// There are no partition columns
val taskContext = Option(TaskContext.get())
val iter = if (enableVectorizedReader) {
logInfo(s"FILE_SYSTEM_CHECK VectorizedParquetRecordReader $filePath")
val vectorizedReader = new VectorizedParquetRecordReader(
convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
val iter = new RecordReaderIterator(vectorizedReader)
// SPARK-23457 Register a task completion listener before `initialization`.
taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
vectorizedReader.initialize(split, hadoopAttemptContext)
logDebug(s"Appending $partitionSchema ${file.partitionValues}")
vectorizedReader.initBatch(partitionSchema, file.partitionValues)
if (returningBatch) {
vectorizedReader.enableReturningBatches()
}

// UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
iter.asInstanceOf[Iterator[InternalRow]]
} else {
iter.asInstanceOf[Iterator[InternalRow]]
.map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
logInfo(s"FILE_SYSTEM_CHECK ParquetRecordReader $filePath")
logDebug(s"Falling back to parquet-mr")
// ParquetRecordReader returns UnsafeRow
val reader = if (pushed.isDefined && enableRecordFilter) {
val parquetFilter = FilterCompat.get(pushed.get, null)
new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz), parquetFilter)
} else {
new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz))
}
val iter = new RecordReaderIterator(reader)
// SPARK-23457 Register a task completion listener before `initialization`.
taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
reader.initialize(split, hadoopAttemptContext)

val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
val joinedRow = new JoinedRow()
val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)

// This is a horrible erasure hack...
// if we type the iterator above, then it actually checks
// the type in next() and we get a class cast exception. If we make that function return
// Object, then we can defer the cast until later!
if (partitionSchema.length == 0) {
// There are no partition columns
iter.asInstanceOf[Iterator[InternalRow]]
} else {
iter.asInstanceOf[Iterator[InternalRow]]
.map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
}
}
if (taskContext.isDefined) {
val metrics = taskContext.get.taskMetrics()
metrics.setAdditionalMetric("Parquet Metric:" + ParquetMetrics.get().toString)
}
iter
} catch {
case e: IOException =>
logError(s"RECORD_ERROR_STACK_INFO $filePath", e)
throw e
}
if (taskContext.isDefined) {
val metrics = taskContext.get.taskMetrics()
metrics.setAdditionalMetric("Parquet Metric:" + ParquetMetrics.get().toString)
}
iter
}
}
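
The Scala hunk above wraps the entire per-file reader setup (split construction, footer read, filter push-down, and reader initialization) in a try/catch, preceded by the same open()/close() probe, and logs any IOException with the failing path (RECORD_ERROR_STACK_INFO) before rethrowing it. A condensed sketch of that wrapping pattern, assuming an SLF4J logger, is below; withReadErrorLogging is an illustrative name, not an identifier from this change.

import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.slf4j.LoggerFactory

object ReadRetrySketch {
  private val log = LoggerFactory.getLogger(getClass)

  // Illustrative sketch of the pattern the diff introduces around per-file reader setup:
  // probe the file first, run the setup block, and on IOException log the exact path
  // (with the stack trace) before rethrowing, so the task failure names the S3 object.
  def withReadErrorLogging[T](filePath: Path, conf: Configuration)(setup: => T): T = {
    try {
      filePath.getFileSystem(conf).open(filePath).close() // probe read before the footer read
      setup
    } catch {
      case e: IOException =>
        log.error(s"RECORD_ERROR_STACK_INFO $filePath", e)
        throw e
    }
  }
}

A caller would wrap the existing reader-building block, e.g. withReadErrorLogging(filePath, sharedConf) { buildIterator() }, where buildIterator stands in for the body shown in the diff.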
