From dc3d21a5bedbb8d42a3584159216e2b6d5bf9330 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 15 Dec 2014 23:34:46 -0800 Subject: [PATCH] More genericization in ConfigurableCombineFileRecordReader. --- .../spark/input/WholeTextFileInputFormat.scala | 3 ++- .../spark/input/WholeTextFileRecordReader.scala | 14 ++++++++------ 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala index c21509d57847d..aaef7c74eea33 100644 --- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala +++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala @@ -41,7 +41,8 @@ private[spark] class WholeTextFileInputFormat split: InputSplit, context: TaskAttemptContext): RecordReader[String, String] = { - val reader = new ConfigurableCombineFileRecordReader(split, context) + val reader = + new ConfigurableCombineFileRecordReader(split, context, classOf[WholeTextFileRecordReader]) reader.setConf(getConf) reader } diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala index 89e85abb60d13..1b1131b9b8831 100644 --- a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala +++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala @@ -94,21 +94,23 @@ private[spark] class WholeTextFileRecordReader( /** * A [[org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader CombineFileRecordReader]] - * that could pass Hadoop configuration to WholeTextFileRecordReader. + * that can pass Hadoop Configuration to [[org.apache.hadoop.conf.Configurable Configurable]] + * RecordReaders. */ -private[spark] class ConfigurableCombineFileRecordReader( +private[spark] class ConfigurableCombineFileRecordReader[K, V]( split: InputSplit, - context: TaskAttemptContext) - extends CombineFileRecordReader[String, String]( + context: TaskAttemptContext, + recordReaderClass: Class[_ <: RecordReader[K, V] with HConfigurable]) + extends CombineFileRecordReader[K, V]( split.asInstanceOf[CombineFileSplit], context, - classOf[WholeTextFileRecordReader] + recordReaderClass ) with Configurable { override def initNextRecordReader(): Boolean = { val r = super.initNextRecordReader() if (r) { - this.curReader.asInstanceOf[WholeTextFileRecordReader].setConf(getConf) + this.curReader.asInstanceOf[HConfigurable].setConf(getConf) } r }