diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index d6bdd3d825565..c5839fc3b6488 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -1,19 +1,19 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.spark.sql
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
index bc8ef4ad7e236..9e913de48f72c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
@@ -19,13 +19,14 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.SequenceFile.CompressionType
-import org.apache.hadoop.io.compress.{BZip2Codec, GzipCodec, Lz4Codec, SnappyCodec}
+import org.apache.hadoop.io.compress.{BZip2Codec, DeflateCodec, GzipCodec, Lz4Codec, SnappyCodec}
 
 import org.apache.spark.util.Utils
 
 private[datasources] object CompressionCodecs {
   private val shortCompressionCodecNames = Map(
     "bzip2" -> classOf[BZip2Codec].getName,
+    "deflate" -> classOf[DeflateCodec].getName,
     "gzip" -> classOf[GzipCodec].getName,
     "lz4" -> classOf[Lz4Codec].getName,
     "snappy" -> classOf[SnappyCodec].getName)
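Note: with this change, `shortCompressionCodecNames` maps five short names (`bzip2`, `deflate`, `gzip`, `lz4`, `snappy`) to Hadoop codec class names. A minimal sketch of how such a map can back a case-insensitive lookup that fails fast on unknown names — the helper name `resolveCodec` is hypothetical, and the error text is mirrored from the TextSuite assertion below rather than copied from this file:

```scala
// Sketch only: resolves a user-supplied short name (any case) or a fully
// qualified codec class name, and rejects anything not on the classpath.
def resolveCodec(shortNames: Map[String, String], name: String): String = {
  val codecName = shortNames.getOrElse(name.toLowerCase, name)
  try {
    Class.forName(codecName) // validate that the codec class exists
    codecName
  } catch {
    case _: ClassNotFoundException =>
      throw new IllegalArgumentException(
        s"Codec [$codecName] is not available. " +
          s"Known codecs are ${shortNames.keys.mkString(", ")}.")
  }
}
```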
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 184cbb2f296b0..a1806221b7148 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -286,7 +286,7 @@ private[sql] class ParquetRelation(
       ParquetRelation
         .shortParquetCompressionCodecNames
         .getOrElse(
-          sqlContext.conf.parquetCompressionCodec.toUpperCase,
+          sqlContext.conf.parquetCompressionCodec.toLowerCase(),
           CompressionCodecName.UNCOMPRESSED).name())
 
     new BucketedOutputWriterFactory {
@@ -905,9 +905,9 @@ private[sql] object ParquetRelation extends Logging {
 
   // The parquet compression short names
   val shortParquetCompressionCodecNames = Map(
-    "NONE" -> CompressionCodecName.UNCOMPRESSED,
-    "UNCOMPRESSED" -> CompressionCodecName.UNCOMPRESSED,
-    "SNAPPY" -> CompressionCodecName.SNAPPY,
-    "GZIP" -> CompressionCodecName.GZIP,
-    "LZO" -> CompressionCodecName.LZO)
+    "none" -> CompressionCodecName.UNCOMPRESSED,
+    "uncompressed" -> CompressionCodecName.UNCOMPRESSED,
+    "snappy" -> CompressionCodecName.SNAPPY,
+    "gzip" -> CompressionCodecName.GZIP,
+    "lzo" -> CompressionCodecName.LZO)
 }
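Note: lowercasing both the map keys and the configured value makes `spark.sql.parquet.compression.codec` effectively case-insensitive, falling back to `UNCOMPRESSED` for unrecognized values. A hedged sketch of the resulting lookup (the `CompressionCodecName` import path is from parquet-mr and is assumed available; `resolve` is an illustrative helper, not code in this file):

```scala
import org.apache.parquet.hadoop.metadata.CompressionCodecName

// Keys mirror the map in the diff above.
val shortParquetCompressionCodecNames = Map(
  "none" -> CompressionCodecName.UNCOMPRESSED,
  "uncompressed" -> CompressionCodecName.UNCOMPRESSED,
  "snappy" -> CompressionCodecName.SNAPPY,
  "gzip" -> CompressionCodecName.GZIP,
  "lzo" -> CompressionCodecName.LZO)

// "GZIP", "gZip" and "gzip" all resolve to the same codec once lowercased.
def resolve(configured: String): CompressionCodecName =
  shortParquetCompressionCodecNames
    .getOrElse(configured.toLowerCase, CompressionCodecName.UNCOMPRESSED)
```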
"adwrasdf") + val testDf = sqlContext.read.text(testFile) - val tempFile = Utils.createTempDir() - tempFile.delete() - df.write - .option("compression", "gZiP") - .text(tempFile.getCanonicalPath) - val compressedFiles = tempFile.listFiles() - assert(compressedFiles.exists(_.getName.endsWith(".gz"))) - verifyFrame(sqlContext.read.text(tempFile.getCanonicalPath)) + Seq("bzip2", "deflate", "gzip").foreach { codecName => + val tempDir = Utils.createTempDir() + val tempDirPath = tempDir.getAbsolutePath() + testDf.write.option("compression", codecName).mode(SaveMode.Overwrite).text(tempDirPath) + verifyFrame(sqlContext.read.text(tempDirPath)) + } - Utils.deleteRecursively(tempFile) + val errMsg = intercept[IllegalArgumentException] { + val tempDirPath = Utils.createTempDir().getAbsolutePath() + testDf.write.option("compression", "illegal").mode(SaveMode.Overwrite).text(tempDirPath) + } + assert(errMsg.getMessage === "Codec [illegal] is not available. " + + "Known codecs are bzip2, deflate, lz4, gzip, snappy.") } private def testFile: String = {