From a30d70012b097f0b97ab1f2459637dcde18e2c43 Mon Sep 17 00:00:00 2001
From: Koert Kuipers
Date: Tue, 2 May 2017 11:43:03 -0400
Subject: [PATCH 1/6] make the avro file picked for schema stable by sorting on path and picking the first

---
 .../databricks/spark/avro/DefaultSource.scala | 28 +++++++++++--------
 1 file changed, 16 insertions(+), 12 deletions(-)

diff --git a/src/main/scala/com/databricks/spark/avro/DefaultSource.scala b/src/main/scala/com/databricks/spark/avro/DefaultSource.scala
index 4ebcec85..f0ab8451 100644
--- a/src/main/scala/com/databricks/spark/avro/DefaultSource.scala
+++ b/src/main/scala/com/databricks/spark/avro/DefaultSource.scala
@@ -61,24 +61,28 @@ private[avro] class DefaultSource extends FileFormat with DataSourceRegister {
       files: Seq[FileStatus]): Option[StructType] = {
     val conf = spark.sparkContext.hadoopConfiguration
 
-    // Schema evolution is not supported yet. Here we only pick a single random sample file to
+    // Schema evolution is not supported yet. Here we only pick the first file sorted by path to
     // figure out the schema of the whole dataset.
-    val sampleFile = if (conf.getBoolean(IgnoreFilesWithoutExtensionProperty, true)) {
-      files.find(_.getPath.getName.endsWith(".avro")).getOrElse {
-        throw new FileNotFoundException(
-          "No Avro files found. Hadoop option \"avro.mapred.ignore.inputs.without.extension\" is " +
-            "set to true. Do all input files have \".avro\" extension?"
-        )
-      }
+    def sampleFilePath = if (conf.getBoolean(IgnoreFilesWithoutExtensionProperty, true)) {
+      files.iterator.map(_.getPath).filter(_.getName.endsWith(".avro"))
+        .reduceOption{ (p1, p2) => if (p1.compareTo(p2) <= 0) p1 else p2 }
+        .getOrElse {
+          throw new FileNotFoundException(
+            "No Avro files found. Hadoop option \"avro.mapred.ignore.inputs.without.extension\" " +
+              "is set to true. Do all input files have \".avro\" extension?"
+          )
+        }
     } else {
-      files.headOption.getOrElse {
-        throw new FileNotFoundException("No Avro files found.")
-      }
+      files.iterator.map(_.getPath)
+        .reduceOption{ (p1, p2) => if (p1.compareTo(p2) <= 0) p1 else p2 }
+        .getOrElse{
+          throw new FileNotFoundException("No Avro files found.")
+        }
     }
 
     // User can specify an optional avro json schema.
     val avroSchema = options.get(AvroSchema).map(new Schema.Parser().parse).getOrElse {
-      val in = new FsInput(sampleFile.getPath, conf)
+      val in = new FsInput(sampleFilePath, conf)
       try {
         val reader = DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]())
         try {
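The heart of PATCH 1 is the reduceOption that keeps the lexicographically smallest path, so the sampled file no longer depends on filesystem listing order. A minimal standalone sketch of that selection rule (the object name and file paths are hypothetical; it relies only on org.apache.hadoop.fs.Path implementing Comparable):

    import org.apache.hadoop.fs.Path

    object PickFirstByPath {
      // Keep whichever path compares lowest; with no candidates, return None
      // so the caller can raise its own "no files found" error.
      def firstPath(paths: Seq[Path]): Option[Path] =
        paths.reduceOption { (p1, p2) => if (p1.compareTo(p2) <= 0) p1 else p2 }

      def main(args: Array[String]): Unit = {
        val candidates = Seq(
          new Path("/data/z=2/part-00000.avro"),
          new Path("/data/z=1/part-00000.avro"))
        println(firstPath(candidates)) // Some(/data/z=1/part-00000.avro)
      }
    }
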
From bdefef80086568bd2bd753167e3e6db6ac2e889e Mon Sep 17 00:00:00 2001
From: Koert Kuipers
Date: Sun, 11 Feb 2018 19:04:57 -0500
Subject: [PATCH 2/6] test reading files with different schemas and picking the schema consistently

---
 .../scala/com/databricks/spark/avro/AvroSuite.scala | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/src/test/scala/com/databricks/spark/avro/AvroSuite.scala b/src/test/scala/com/databricks/spark/avro/AvroSuite.scala
index 8e49caa0..c0a9e44a 100644
--- a/src/test/scala/com/databricks/spark/avro/AvroSuite.scala
+++ b/src/test/scala/com/databricks/spark/avro/AvroSuite.scala
@@ -803,4 +803,15 @@ class AvroSuite extends FunSuite with BeforeAndAfterAll {
       assert(readDf.collect().sameElements(writeDf.collect()))
     }
   }
+
+  test("writing avro partitions with different schemas and reading back out with a single predictable schema") {
+    TestUtils.withTempDir { tempDir =>
+      val df1 = spark.createDataFrame(Seq(("a", 1), ("b", 2)))
+      df1.write.avro(s"$tempDir/different_schemas/z=1")
+      val df2 = spark.createDataFrame(Seq(Tuple1("a"), Tuple1("b")))
+      df2.write.avro(s"$tempDir/different_schemas/z=2")
+      val df3 = spark.read.avro(s"$tempDir/different_schemas")
+      assert(df3.schema.fieldNames.toSet === Set("_1", "_2", "z"))
+    }
+  }
 }

From 4867bcfe8ab91a805283bcdd6cf067e31e5d97e0 Mon Sep 17 00:00:00 2001
From: Koert Kuipers
Date: Sun, 3 Jun 2018 12:11:22 -0400
Subject: [PATCH 3/6] use min for picking first path

---
 .../databricks/spark/avro/DefaultSource.scala | 39 ++++++++++++-------
 1 file changed, 25 insertions(+), 14 deletions(-)

diff --git a/src/main/scala/com/databricks/spark/avro/DefaultSource.scala b/src/main/scala/com/databricks/spark/avro/DefaultSource.scala
index f0ab8451..798cd453 100644
--- a/src/main/scala/com/databricks/spark/avro/DefaultSource.scala
+++ b/src/main/scala/com/databricks/spark/avro/DefaultSource.scala
@@ -21,6 +21,7 @@ import java.net.URI
 import java.util.zip.Deflater
 
 import scala.util.control.NonFatal
+import scala.math.Ordering
 
 import com.databricks.spark.avro.DefaultSource.{AvroSchema, IgnoreFilesWithoutExtensionProperty, SerializableConfiguration}
 import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
@@ -63,21 +64,31 @@ private[avro] class DefaultSource extends FileFormat with DataSourceRegister {
 
     // Schema evolution is not supported yet. Here we only pick the first file sorted by path to
     // figure out the schema of the whole dataset.
-    def sampleFilePath = if (conf.getBoolean(IgnoreFilesWithoutExtensionProperty, true)) {
-      files.iterator.map(_.getPath).filter(_.getName.endsWith(".avro"))
-        .reduceOption{ (p1, p2) => if (p1.compareTo(p2) <= 0) p1 else p2 }
-        .getOrElse {
-          throw new FileNotFoundException(
+    def sampleFilePath = {
+      implicit def pathOrdering: Ordering[Path] = Ordering.fromLessThan(
+        (p1: Path, p2: Path) => p1.compareTo(p2) <= 0
+      )
+
+      val ignoreWithoutExtension = conf.getBoolean(IgnoreFilesWithoutExtensionProperty, true)
+
+      val paths = if (ignoreWithoutExtension) {
+        files.map(_.getPath).filter(_.getName.endsWith(".avro"))
+      } else {
+        files.map(_.getPath)
+      }
+
+      if (paths.isEmpty) {
+        throw new FileNotFoundException(
+          if (ignoreWithoutExtension) {
             "No Avro files found. Hadoop option \"avro.mapred.ignore.inputs.without.extension\" " +
-              "is set to true. Do all input files have \".avro\" extension?"
-          )
-        }
-    } else {
-      files.iterator.map(_.getPath)
-        .reduceOption{ (p1, p2) => if (p1.compareTo(p2) <= 0) p1 else p2 }
-        .getOrElse{
-          throw new FileNotFoundException("No Avro files found.")
-        }
+              "is set to true. Do all input files have \".avro\" extension?"
+          } else {
+            "No Avro files found."
+          }
+        )
+      } else {
+        paths.min
+      }
     }
 
     // User can specify an optional avro json schema.
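PATCH 3 swaps the hand-rolled reduce for an implicit Ordering[Path] plus paths.min. One caveat worth noting: Ordering.fromLessThan documents its argument as a strict less-than, while the patch passes <=; min and max still return the right element (ties simply resolve in favor of the first operand), but the resulting Ordering would misbehave if reused where strictness matters, such as sorted sets. A self-contained sketch under the same hypothetical layout as above:

    import org.apache.hadoop.fs.Path
    import scala.math.Ordering

    object PathOrderingSketch {
      // Same predicate as the patch; note it is non-strict (<=), which
      // fromLessThan does not expect but which min still handles correctly.
      implicit val pathOrdering: Ordering[Path] =
        Ordering.fromLessThan((p1: Path, p2: Path) => p1.compareTo(p2) <= 0)

      def main(args: Array[String]): Unit = {
        val paths = Seq(
          new Path("/data/z=2/part-00000.avro"),
          new Path("/data/z=1/part-00000.avro"))
        println(paths.min) // /data/z=1/part-00000.avro
      }
    }
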
Hadoop option \"avro.mapred.ignore.inputs.without.extension\" " + - "is set to true. Do all input files have \".avro\" extension?" - ) - } - } else { - files.iterator.map(_.getPath) - .reduceOption{ (p1, p2) => if (p1.compareTo(p2) <= 0) p1 else p2 } - .getOrElse{ - throw new FileNotFoundException("No Avro files found.") - } + "is set to true. Do all input files have \".avro\" extension?" + } else { + "No Avro files found." + } + ) + } else { + paths.min + } } // User can specify an optional avro json schema. From dc6dcdd77f4b45ca451ecdc80473793d5970e5a7 Mon Sep 17 00:00:00 2001 From: Koert Kuipers Date: Sun, 3 Jun 2018 12:12:18 -0400 Subject: [PATCH 4/6] add one more test for predictably picking schema --- src/test/scala/com/databricks/spark/avro/AvroSuite.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/test/scala/com/databricks/spark/avro/AvroSuite.scala b/src/test/scala/com/databricks/spark/avro/AvroSuite.scala index c0a9e44a..593def1c 100644 --- a/src/test/scala/com/databricks/spark/avro/AvroSuite.scala +++ b/src/test/scala/com/databricks/spark/avro/AvroSuite.scala @@ -807,11 +807,17 @@ class AvroSuite extends FunSuite with BeforeAndAfterAll { test("writing avro partitions with different schemas and reading back out with a single predictable schema") { TestUtils.withTempDir { tempDir => val df1 = spark.createDataFrame(Seq(("a", 1), ("b", 2))) - df1.write.avro(s"$tempDir/different_schemas/z=1") val df2 = spark.createDataFrame(Seq(Tuple1("a"), Tuple1("b"))) + + df1.write.avro(s"$tempDir/different_schemas/z=1") df2.write.avro(s"$tempDir/different_schemas/z=2") val df3 = spark.read.avro(s"$tempDir/different_schemas") assert(df3.schema.fieldNames.toSet === Set("_1", "_2", "z")) + + df1.write.avro(s"$tempDir/different_schemas_yet_again/z=2") + df2.write.avro(s"$tempDir/different_schemas_yet_again/z=1") + val df4 = spark.read.avro(s"$tempDir/different_schemas_yet_again") + assert(df4.schema.fieldNames.toSet === Set("_1", "z")) } } } From 64c10b1d6bab678c071a766ced8c2b7034c004fe Mon Sep 17 00:00:00 2001 From: Koert Kuipers Date: Sun, 3 Jun 2018 12:40:34 -0400 Subject: [PATCH 5/6] can't stand scalastyle --- src/main/scala/com/databricks/spark/avro/DefaultSource.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/com/databricks/spark/avro/DefaultSource.scala b/src/main/scala/com/databricks/spark/avro/DefaultSource.scala index 798cd453..3b5995f7 100644 --- a/src/main/scala/com/databricks/spark/avro/DefaultSource.scala +++ b/src/main/scala/com/databricks/spark/avro/DefaultSource.scala @@ -20,8 +20,8 @@ import java.io._ import java.net.URI import java.util.zip.Deflater -import scala.util.control.NonFatal import scala.math.Ordering +import scala.util.control.NonFatal import com.databricks.spark.avro.DefaultSource.{AvroSchema, IgnoreFilesWithoutExtensionProperty, SerializableConfiguration} import com.esotericsoftware.kryo.{Kryo, KryoSerializable} From c82d354b95be93c99385e0159d8a76471b9907e7 Mon Sep 17 00:00:00 2001 From: Koert Kuipers Date: Thu, 7 Jun 2018 17:26:39 -0400 Subject: [PATCH 6/6] pick last file instead of first --- src/main/scala/com/databricks/spark/avro/DefaultSource.scala | 4 ++-- src/test/scala/com/databricks/spark/avro/AvroSuite.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/scala/com/databricks/spark/avro/DefaultSource.scala b/src/main/scala/com/databricks/spark/avro/DefaultSource.scala index 3b5995f7..9fede8d1 100644 --- 
From c82d354b95be93c99385e0159d8a76471b9907e7 Mon Sep 17 00:00:00 2001
From: Koert Kuipers
Date: Thu, 7 Jun 2018 17:26:39 -0400
Subject: [PATCH 6/6] pick last file instead of first

---
 src/main/scala/com/databricks/spark/avro/DefaultSource.scala | 4 ++--
 src/test/scala/com/databricks/spark/avro/AvroSuite.scala     | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/main/scala/com/databricks/spark/avro/DefaultSource.scala b/src/main/scala/com/databricks/spark/avro/DefaultSource.scala
index 3b5995f7..9fede8d1 100644
--- a/src/main/scala/com/databricks/spark/avro/DefaultSource.scala
+++ b/src/main/scala/com/databricks/spark/avro/DefaultSource.scala
@@ -62,7 +62,7 @@ private[avro] class DefaultSource extends FileFormat with DataSourceRegister {
       files: Seq[FileStatus]): Option[StructType] = {
     val conf = spark.sparkContext.hadoopConfiguration
 
-    // Schema evolution is not supported yet. Here we only pick the first file sorted by path to
+    // Schema evolution is not supported yet. Here we only pick the last file sorted by path to
     // figure out the schema of the whole dataset.
     def sampleFilePath = {
       implicit def pathOrdering: Ordering[Path] = Ordering.fromLessThan(
@@ -87,7 +87,7 @@ private[avro] class DefaultSource extends FileFormat with DataSourceRegister {
           }
         )
       } else {
-        paths.min
+        paths.max
       }
     }
 
diff --git a/src/test/scala/com/databricks/spark/avro/AvroSuite.scala b/src/test/scala/com/databricks/spark/avro/AvroSuite.scala
index 593def1c..6606fa24 100644
--- a/src/test/scala/com/databricks/spark/avro/AvroSuite.scala
+++ b/src/test/scala/com/databricks/spark/avro/AvroSuite.scala
@@ -806,8 +806,8 @@ class AvroSuite extends FunSuite with BeforeAndAfterAll {
 
   test("writing avro partitions with different schemas and reading back out with a single predictable schema") {
     TestUtils.withTempDir { tempDir =>
-      val df1 = spark.createDataFrame(Seq(("a", 1), ("b", 2)))
-      val df2 = spark.createDataFrame(Seq(Tuple1("a"), Tuple1("b")))
+      val df1 = spark.createDataFrame(Seq(Tuple1("a"), Tuple1("b")))
+      val df2 = spark.createDataFrame(Seq(("a", 1), ("b", 2)))
 
       df1.write.avro(s"$tempDir/different_schemas/z=1")
       df2.write.avro(s"$tempDir/different_schemas/z=2")
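With PATCH 6 the sampled schema comes from paths.max, i.e. the lexicographically last file; in layouts where newer partitions sort later, that tends to favor the most recently written schema. A closing sketch of the final behavior, reusing the hypothetical ordering and paths from the earlier examples:

    import org.apache.hadoop.fs.Path
    import scala.math.Ordering

    object PickLastByPath {
      implicit val pathOrdering: Ordering[Path] =
        Ordering.fromLessThan((p1: Path, p2: Path) => p1.compareTo(p2) <= 0)

      def main(args: Array[String]): Unit = {
        val paths = Seq(
          new Path("/data/z=1/part-00000.avro"),
          new Path("/data/z=2/part-00000.avro"))
        // max picks the last path in sort order, matching the new behavior.
        println(paths.max) // /data/z=2/part-00000.avro
      }
    }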