This repository has been archived by the owner on Dec 20, 2018. It is now read-only.

Pick last file sorted by path for schema #269

Open
wants to merge 6 commits into master
src/main/scala/com/databricks/spark/avro/DefaultSource.scala (35 changes: 25 additions & 10 deletions)
@@ -20,6 +20,7 @@ import java.io._
 import java.net.URI
 import java.util.zip.Deflater
 
+import scala.math.Ordering
 import scala.util.control.NonFatal
 
 import com.databricks.spark.avro.DefaultSource.{AvroSchema, IgnoreFilesWithoutExtensionProperty, SerializableConfiguration}
@@ -61,24 +62,38 @@ private[avro] class DefaultSource extends FileFormat with DataSourceRegister {
       files: Seq[FileStatus]): Option[StructType] = {
     val conf = spark.sparkContext.hadoopConfiguration
 
-    // Schema evolution is not supported yet. Here we only pick a single random sample file to
+    // Schema evolution is not supported yet. Here we only pick the last file sorted by path to
     // figure out the schema of the whole dataset.
-    val sampleFile = if (conf.getBoolean(IgnoreFilesWithoutExtensionProperty, true)) {
-      files.find(_.getPath.getName.endsWith(".avro")).getOrElse {
+    def sampleFilePath = {
+      implicit def pathOrdering: Ordering[Path] = Ordering.fromLessThan(
+        (p1: Path, p2: Path) => p1.compareTo(p2) <= 0
+      )
+
+      val ignoreWithoutExtension = conf.getBoolean(IgnoreFilesWithoutExtensionProperty, true)
+
+      val paths = if (ignoreWithoutExtension) {
+        files.map(_.getPath).filter(_.getName.endsWith(".avro"))
+      } else {
+        files.map(_.getPath)
+      }
+
+      if (paths.isEmpty) {
         throw new FileNotFoundException(
-          "No Avro files found. Hadoop option \"avro.mapred.ignore.inputs.without.extension\" is " +
-          "set to true. Do all input files have \".avro\" extension?"
+          if (ignoreWithoutExtension) {
+            "No Avro files found. Hadoop option \"avro.mapred.ignore.inputs.without.extension\" " +
+              "is set to true. Do all input files have \".avro\" extension?"
+          } else {
+            "No Avro files found."
+          }
         )
-      }
-    } else {
-      files.headOption.getOrElse {
-        throw new FileNotFoundException("No Avro files found.")
+      } else {
+        paths.max
       }
     }
 
     // User can specify an optional avro json schema.
     val avroSchema = options.get(AvroSchema).map(new Schema.Parser().parse).getOrElse {
-      val in = new FsInput(sampleFile.getPath, conf)
+      val in = new FsInput(sampleFilePath, conf)
       try {
         val reader = DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]())
         try {
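The core of the change is the `sampleFilePath` helper above: filter to `.avro` files when the Hadoop extension option is enabled, then take the maximum path under an `Ordering` built from `Path.compareTo`, so the file that sorts last by path is the one whose schema is inferred. Below is a minimal, self-contained sketch of that rule, not part of the diff: plain strings stand in for `org.apache.hadoop.fs.Path` (both compare lexicographically), and the object name and example paths are made up for illustration.

object SampleFileSelectionSketch {

  // Mirrors the selection rule in the patch: optionally drop files without the
  // ".avro" extension, then pick the lexicographically last path.
  def lastAvroPath(paths: Seq[String], ignoreWithoutExtension: Boolean): Option[String] = {
    val candidates =
      if (ignoreWithoutExtension) paths.filter(_.endsWith(".avro")) else paths

    // Same shape as the patch's pathOrdering, built from compareTo.
    implicit val pathOrdering: Ordering[String] =
      Ordering.fromLessThan((p1, p2) => p1.compareTo(p2) <= 0)

    if (candidates.isEmpty) None else Some(candidates.max)
  }

  def main(args: Array[String]): Unit = {
    val files = Seq(
      "/data/different_schemas/z=1/part-00000.avro",
      "/data/different_schemas/z=2/part-00000.avro",
      "/data/different_schemas/_SUCCESS")
    // Prints Some(/data/different_schemas/z=2/part-00000.avro): the z=2 file sorts
    // last, so its schema would be the one inferred, matching the test below.
    println(lastAvroPath(files, ignoreWithoutExtension = true))
  }
}

Because the choice is now deterministic (last path wins) rather than whichever file the listing happened to return first, the test added below can assert exactly which partition's schema is picked.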
src/test/scala/com/databricks/spark/avro/AvroSuite.scala (17 changes: 17 additions & 0 deletions)
@@ -803,4 +803,21 @@ class AvroSuite extends FunSuite with BeforeAndAfterAll {
       assert(readDf.collect().sameElements(writeDf.collect()))
     }
   }
+
+  test("writing avro partitions with different schemas and reading back out with a single predictable schema") {
+    TestUtils.withTempDir { tempDir =>
+      val df1 = spark.createDataFrame(Seq(Tuple1("a"), Tuple1("b")))
+      val df2 = spark.createDataFrame(Seq(("a", 1), ("b", 2)))
+
+      df1.write.avro(s"$tempDir/different_schemas/z=1")
+      df2.write.avro(s"$tempDir/different_schemas/z=2")
+      val df3 = spark.read.avro(s"$tempDir/different_schemas")
+      assert(df3.schema.fieldNames.toSet === Set("_1", "_2", "z"))
+
+      df1.write.avro(s"$tempDir/different_schemas_yet_again/z=2")
+      df2.write.avro(s"$tempDir/different_schemas_yet_again/z=1")
+      val df4 = spark.read.avro(s"$tempDir/different_schemas_yet_again")
+      assert(df4.schema.fieldNames.toSet === Set("_1", "z"))
+    }
+  }
 }
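The reworked error message also covers both modes of the `avro.mapred.ignore.inputs.without.extension` Hadoop option named in it. As a hedged usage sketch, not from the PR, this is roughly how a caller could disable the extension filter before reading, assuming an active `SparkSession` named `spark` with spark-avro on the classpath and a hypothetical input path:

import com.databricks.spark.avro._

// Allow schema sampling and reads to consider files without the ".avro" extension.
spark.sparkContext.hadoopConfiguration
  .setBoolean("avro.mapred.ignore.inputs.without.extension", false)

// Hypothetical directory containing Avro files with and without the extension.
val df = spark.read.avro("/path/to/mixed_extension_data")
df.printSchema()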