diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala index c21509d57847d..aaef7c74eea33 100644 --- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala +++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala @@ -41,7 +41,8 @@ private[spark] class WholeTextFileInputFormat split: InputSplit, context: TaskAttemptContext): RecordReader[String, String] = { - val reader = new ConfigurableCombineFileRecordReader(split, context) + val reader = + new ConfigurableCombineFileRecordReader(split, context, classOf[WholeTextFileRecordReader]) reader.setConf(getConf) reader } diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala index 89e85abb60d13..1b1131b9b8831 100644 --- a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala +++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala @@ -94,21 +94,23 @@ private[spark] class WholeTextFileRecordReader( /** * A [[org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader CombineFileRecordReader]] - * that could pass Hadoop configuration to WholeTextFileRecordReader. + * that can pass Hadoop Configuration to [[org.apache.hadoop.conf.Configurable Configurable]] + * RecordReaders. */ -private[spark] class ConfigurableCombineFileRecordReader( +private[spark] class ConfigurableCombineFileRecordReader[K, V]( split: InputSplit, - context: TaskAttemptContext) - extends CombineFileRecordReader[String, String]( + context: TaskAttemptContext, + recordReaderClass: Class[_ <: RecordReader[K, V] with HConfigurable]) + extends CombineFileRecordReader[K, V]( split.asInstanceOf[CombineFileSplit], context, - classOf[WholeTextFileRecordReader] + recordReaderClass ) with Configurable { override def initNextRecordReader(): Boolean = { val r = super.initNextRecordReader() if (r) { - this.curReader.asInstanceOf[WholeTextFileRecordReader].setConf(getConf) + this.curReader.asInstanceOf[HConfigurable].setConf(getConf) } r }