From f976b73daaa63f692d224174dcd8a9f8ccf8c0f4 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sat, 24 May 2014 18:43:56 +0800 Subject: [PATCH] Addessed the readability issue commented by @rxin --- .../org/apache/spark/sql/SQLContext.scala | 11 ++---- .../spark/sql/execution/SparkStrategies.scala | 38 ++++++++++--------- .../spark/sql/hive/HiveStrategies.scala | 2 +- 3 files changed, 25 insertions(+), 26 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index cb4f109552597..043be58edc91b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -206,8 +206,8 @@ class SQLContext(@transient val sparkContext: SparkContext) * final desired output requires complex expressions to be evaluated or when columns can be * further eliminated out after filtering has been done. * - * The `prunePushedDownFilter` is used to remove those filters that can be removed by the filter - * pushdown optimization. + * The `prunePushedDownFilters` parameter is used to remove those filters that can be optimized + * away by the filter pushdown optimization. * * The required attributes for both filtering and expression evaluation are passed to the * provided `scanBuilder` function so that it can avoid unnecessary column materialization. @@ -215,15 +215,12 @@ class SQLContext(@transient val sparkContext: SparkContext) def pruneFilterProject( projectList: Seq[NamedExpression], filterPredicates: Seq[Expression], - prunePushedDownFilter: Option[Expression => Boolean], + prunePushedDownFilters: Seq[Expression] => Seq[Expression], scanBuilder: Seq[Attribute] => SparkPlan): SparkPlan = { val projectSet = projectList.flatMap(_.references).toSet val filterSet = filterPredicates.flatMap(_.references).toSet - val filterCondition = prunePushedDownFilter - .map(filterPredicates.filter) - .getOrElse(filterPredicates) - .reduceLeftOption(And) + val filterCondition = prunePushedDownFilters(filterPredicates).reduceLeftOption(And) // Right now we still use a projection even if the only evaluation is applying an alias // to a column. Since this is a no-op, it could be avoided. However, using this diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 4113a4c64c267..cfa8bdae58b11 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -141,31 +141,33 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) => InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) => { - val prunePushedDownFilter = + val prunePushedDownFilters = if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) { - // Note: filters cannot be pushed down to Parquet if they contain more complex - // expressions than simple "Attribute cmp Literal" comparisons. Here we remove - // all filters that have been pushed down. Note that a predicate such as - // "(A AND B) OR C" can result in "A OR C" being pushed down. - Some((filter: Expression) => { - val recordFilter = ParquetFilters.createFilter(filter) - if (!recordFilter.isDefined) { - // First case: the pushdown did not result in any record filter. - true - } else { - // Second case: a record filter was created; here we are conservative in - // the sense that even if "A" was pushed and we check for "A AND B" we - // still want to keep "A AND B" in the higher-level filter, not just "B". - !ParquetFilters.findExpression(recordFilter.get, filter).isDefined + (filters: Seq[Expression]) => { + filters.filter { filter => + // Note: filters cannot be pushed down to Parquet if they contain more complex + // expressions than simple "Attribute cmp Literal" comparisons. Here we remove + // all filters that have been pushed down. Note that a predicate such as + // "(A AND B) OR C" can result in "A OR C" being pushed down. + val recordFilter = ParquetFilters.createFilter(filter) + if (!recordFilter.isDefined) { + // First case: the pushdown did not result in any record filter. + true + } else { + // Second case: a record filter was created; here we are conservative in + // the sense that even if "A" was pushed and we check for "A AND B" we + // still want to keep "A AND B" in the higher-level filter, not just "B". + !ParquetFilters.findExpression(recordFilter.get, filter).isDefined + } } - }) + } } else { - None + identity[Seq[Expression]] _ } pruneFilterProject( projectList, filters, - prunePushedDownFilter, + prunePushedDownFilters, ParquetTableScan(_, relation, filters)(sparkContext)) :: Nil } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index ae8f8e728c54b..8b51957162e04 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -69,7 +69,7 @@ private[hive] trait HiveStrategies { pruneFilterProject( projectList, otherPredicates, - None, + identity[Seq[Expression]], HiveTableScan(_, relation, pruningPredicates.reduceLeftOption(And))(hiveContext)) :: Nil case _ => Nil