Skip to content

Commit

Permalink
Remove up-front `isEmpty` check (avoids an extra Spark job; empty partitions are handled inside `mapPartitions`)
Browse files Browse the repository at this point in the history
  • Loading branch information
the-other-tim-brown committed Jan 6, 2024
1 parent 2684527 commit 9674bfb
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,19 +97,15 @@ object AvroConversionUtils {
* TODO convert directly from GenericRecord into InternalRow instead
*/
def createDataFrame(rdd: RDD[GenericRecord], schemaStr: String, ss: SparkSession): Dataset[Row] = {
if (rdd.isEmpty()) {
ss.emptyDataFrame
} else {
ss.createDataFrame(rdd.mapPartitions { records =>
if (records.isEmpty) Iterator.empty
else {
val schema = new Schema.Parser().parse(schemaStr)
val dataType = convertAvroSchemaToStructType(schema)
val converter = createConverterToRow(schema, dataType)
records.map { r => converter(r) }
}
}, convertAvroSchemaToStructType(new Schema.Parser().parse(schemaStr)))
}
ss.createDataFrame(rdd.mapPartitions { records =>
if (records.isEmpty) Iterator.empty
else {
val schema = new Schema.Parser().parse(schemaStr)
val dataType = convertAvroSchemaToStructType(schema)
val converter = createConverterToRow(schema, dataType)
records.map { r => converter(r) }
}
}, convertAvroSchemaToStructType(new Schema.Parser().parse(schemaStr)))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,19 +107,23 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi
// injecting [[SQLConf]], which by default isn't propagated by Spark to the executor(s).
// [[SQLConf]] is required by [[AvroSerializer]]
// Convert each InternalRow partition to Avro GenericRecords, rewriting into the
// reader's ("evolved") schema when it differs from the writer's schema.
injectSQLConf(df.queryExecution.toRdd.mapPartitions { rows =>
  if (rows.isEmpty) {
    Iterator.empty
  } else {
    // Per-partition setup is guarded by the emptiness check above so that
    // schema parsing and converter construction are skipped for empty
    // partitions — they are not free.
    val readerAvroSchema = new Schema.Parser().parse(readerAvroSchemaStr)
    val transform: GenericRecord => GenericRecord =
      if (sameSchema) identity
      else {
        HoodieAvroUtils.rewriteRecordDeep(_, readerAvroSchema)
      }

    // Since caller might request to get records in a different ("evolved") schema, we will be rewriting from
    // existing Writer's schema into Reader's (avro) schema
    val writerAvroSchema = new Schema.Parser().parse(writerAvroSchemaStr)
    val convert = AvroConversionUtils.createInternalRowToAvroConverter(writerSchema, writerAvroSchema, nullable = nullable)

    rows.map { ir => transform(convert(ir)) }
  }
}, SQLConf.get)
}

Expand Down

0 comments on commit 9674bfb

Please sign in to comment.