use iceberg jar for delta icebergShaded #4205

Open · wants to merge 9 commits into base: master
92 changes: 77 additions & 15 deletions build.sbt
@@ -148,7 +148,13 @@ lazy val commonSettings = Seq(
"-Ddelta.log.cacheSize=3",
"-Dspark.databricks.delta.delta.log.cacheSize=3",
"-Dspark.sql.sources.parallelPartitionDiscovery.parallelism=5",
"-Xmx1024m"
"-Xmx1024m",
// Needed on Java 17: open JDK internal packages that Spark accesses reflectively during tests
"--add-opens=java.base/java.nio=ALL-UNNAMED",
"--add-opens=java.base/java.lang=ALL-UNNAMED",
"--add-opens=java.base/java.net=ALL-UNNAMED",
"--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",
"--add-opens=java.base/sun.util.calendar=ALL-UNNAMED"
),

testOptions += Tests.Argument("-oF"),
@@ -428,7 +434,8 @@ lazy val spark = (project in file("spark"))
sparkMimaSettings,
releaseSettings,
crossSparkSettings(),
libraryDependencies ++= Seq(
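// Presumably pins jackson-module-scala so the Jackson modules pulled in by Iceberg 1.8.0 stay
// compatible with the Jackson version Spark ships.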
dependencyOverrides += "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.18.2",
libraryDependencies ++= Seq(
// Adding test classifier seems to break transitive resolution of the core dependencies
"org.apache.spark" %% "spark-hive" % sparkVersion.value % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided",
@@ -848,26 +855,53 @@ lazy val iceberg = (project in file("iceberg"))
)
// scalastyle:on println

lazy val generateIcebergJarsTask = TaskKey[Unit]("generateIcebergJars", "Generate Iceberg JARs")

lazy val icebergShaded = (project in file("icebergShaded"))
.dependsOn(spark % "provided")
.disablePlugins(JavaFormatterPlugin, ScalafmtPlugin)
.settings (
name := "iceberg-shaded",
commonSettings,
skipReleaseSettings,

// Compile, patch, and generate the Iceberg JARs
generateIcebergJarsTask := {
import sys.process._
val scriptPath = baseDirectory.value / "generate_iceberg_jars.py"
// Download iceberg code in `iceberg_src` dir and generate the JARs in `lib` dir
Seq("python3", scriptPath.getPath)!
},
Compile / unmanagedJars := (Compile / unmanagedJars).dependsOn(generateIcebergJarsTask).value,
cleanFiles += baseDirectory.value / "iceberg_src",
cleanFiles += baseDirectory.value / "lib",
libraryDependencies ++= Seq(
// Works around Iceberg's java.lang.NoClassDefFoundError: scala/jdk/CollectionConverters$ on
// Scala 2.12, where scala-collection-compat supplies the missing class.
"org.scala-lang.modules" %% "scala-collection-compat" % "2.1.1" % "provided",
"org.apache.iceberg" % "iceberg-core" % "1.8.0" excludeAll (
ExclusionRule("com.github.ben-manes.caffeine"),
ExclusionRule("io.netty"),
),
"org.apache.iceberg" % "iceberg-hive-metastore" % "1.8.0" excludeAll (
ExclusionRule("com.github.ben-manes.caffeine"),
ExclusionRule("io.netty"),
),
"org.apache.hadoop" % "hadoop-client" % "2.7.3" excludeAll (
ExclusionRule("org.apache.avro"),
ExclusionRule("org.slf4j"),
ExclusionRule("commons-beanutils"),
ExclusionRule("org.datanucleus"),
ExclusionRule("io.netty"),
),
"org.apache.hive" % "hive-metastore" % "2.3.8" excludeAll (
ExclusionRule("org.apache.avro"),
ExclusionRule("org.slf4j"),
ExclusionRule("org.pentaho"),
ExclusionRule("org.apache.hbase"),
ExclusionRule("org.apache.logging.log4j"),
ExclusionRule("co.cask.tephra"),
ExclusionRule("com.google.code.findbugs"),
ExclusionRule("org.eclipse.jetty.aggregate"),
ExclusionRule("org.eclipse.jetty.orbit"),
ExclusionRule("org.apache.parquet"),
ExclusionRule("com.tdunning"),
ExclusionRule("javax.transaction"),
ExclusionRule("com.zaxxer"),
ExclusionRule("org.apache.ant"),
ExclusionRule("javax.servlet"),
ExclusionRule("javax.jdo"),
ExclusionRule("commons-beanutils"),
ExclusionRule("org.datanucleus"),
),
),

// Generate the shaded Iceberg JARs
Compile / packageBin := assembly.value,
@@ -877,8 +911,36 @@ lazy val icebergShaded = (project in file("icebergShaded"))
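// Rewrite every org.apache.iceberg package to shadedForDelta.org.apache.iceberg so the bundled
// Iceberg classes cannot collide with another Iceberg version on the application classpath.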
assembly / assemblyShadeRules := Seq(
ShadeRule.rename("org.apache.iceberg.**" -> "shadedForDelta.@0").inAll,
),
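// Keep jackson-annotations and RoaringBitmap out of the uber jar; presumably these are already
// provided on the Spark runtime classpath.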
assembly / assemblyExcludedJars := {
val cp = (fullClasspath in assembly).value
cp.filter { jar =>
val doExclude = jar.data.getName.contains("jackson-annotations") ||
jar.data.getName.contains("RoaringBitmap")
doExclude
}
},
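// Some shaded Iceberg classes appear more than once on the assembly classpath; keep the first
// copy rather than failing the merge with a deduplicate conflict.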
assembly / assemblyMergeStrategy := {
case PathList("shadedForDelta", "org", "apache", "iceberg", "PartitionSpec$Builder.class") =>
MergeStrategy.first
case PathList("shadedForDelta", "org", "apache", "iceberg", "PartitionSpec.class") =>
MergeStrategy.first
case PathList("shadedForDelta", "org", "apache", "iceberg", "hive", "HiveCatalog.class") =>
MergeStrategy.first
case PathList("shadedForDelta", "org", "apache", "iceberg", "hive", "HiveCatalog$1.class") =>
MergeStrategy.first
case PathList("shadedForDelta", "org", "apache", "iceberg", "hive", "HiveCatalog$ViewAwareTableBuilder.class") =>
MergeStrategy.first
case PathList("shadedForDelta", "org", "apache", "iceberg", "hive", "HiveCatalog$TableAwareViewBuilder.class") =>
MergeStrategy.first
case PathList("shadedForDelta", "org", "apache", "iceberg", "hive", "HiveTableOperations.class") =>
MergeStrategy.first
case PathList("shadedForDelta", "org", "apache", "iceberg", "hive", "HiveTableOperations$1.class") =>
MergeStrategy.first
case x => (assemblyMergeStrategy in assembly).value(x)
},
assemblyPackageScala / assembleArtifact := false,
// Make the 'compile' invoke the 'assembly' task to generate the uber jar.
Compile / packageBin := assembly.value
)

lazy val hudi = (project in file("hudi"))
10 changes: 8 additions & 2 deletions examples/scala/build.sbt
@@ -20,7 +20,7 @@ organizationName := "example"

val scala212 = "2.12.18"
val scala213 = "2.13.13"
val icebergVersion = "1.4.1"
val icebergVersion = "1.8.0"
val defaultDeltaVersion = {
val versionFileContent = IO.read(file("../../version.sbt"))
val versionRegex = """.*version\s*:=\s*"([^"]+)".*""".r
@@ -126,7 +126,12 @@ lazy val extraMavenRepo = sys.env.get("EXTRA_MAVEN_REPO").toSeq.map { repo =>
lazy val java17Settings = Seq(
fork := true,
javaOptions ++= Seq(
"--add-exports=java.base/sun.nio.ch=ALL-UNNAMED"
"--add-exports=java.base/sun.nio.ch=ALL-UNNAMED",
// Additional opens needed on Java 17 for JDK internal packages Spark accesses reflectively
"--add-opens=java.base/java.nio=ALL-UNNAMED",
"--add-opens=java.base/java.lang=ALL-UNNAMED",
"--add-opens=java.base/java.net=ALL-UNNAMED",
"--add-opens=java.base/sun.util.calendar=ALL-UNNAMED"
)
)

@@ -170,5 +175,6 @@ lazy val root = (project in file("."))
"-deprecation",
"-feature"
),
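// Presumably the same Jackson pin as in the main build, keeping jackson-module-scala compatible
// with Iceberg 1.8.0.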
dependencyOverrides += "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.18.2",
java17Settings
)
26 changes: 23 additions & 3 deletions examples/scala/src/main/scala/example/UniForm.scala
@@ -60,16 +60,36 @@ object UniForm {
.config("spark.sql.catalogImplementation", "hive")
.getOrCreate()

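// A nested schema (struct, map, array) plus the partition columns col0, col5 and col6, to
// exercise UniForm's Delta-to-Iceberg conversion of complex and partitioned columns.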
val schema =
"""
|col0 INT,
|col1 STRUCT<
| col2: MAP<INT, INT>,
| col3: ARRAY<INT>,
| col4: STRUCT<col5: STRING>
|>,
|col5 INT,
|col6 INT
|""".stripMargin

def getRowToInsertStr(id: Int): String = {
s"""
|$id,
|struct(map($id, $id), array($id), struct($id)),
|$id,
|$id
|""".stripMargin
}
deltaSpark.sql(s"DROP TABLE IF EXISTS ${testTableName}")
deltaSpark.sql(
s"""CREATE TABLE `${testTableName}` (col1 INT) using DELTA
s"""CREATE TABLE `${testTableName}` ($schema) using DELTA
|PARTITIONED BY (col0, col5, col6)
|TBLPROPERTIES (
| 'delta.columnMapping.mode' = 'name',
| 'delta.enableIcebergCompatV1' = 'true',
| 'delta.enableIcebergCompatV2' = 'true',
| 'delta.universalFormat.enabledFormats' = 'iceberg'
|)""".stripMargin)
deltaSpark.sql(s"INSERT INTO `$testTableName` VALUES (123)")
deltaSpark.sql(s"INSERT INTO $testTableName VALUES (${getRowToInsertStr(1)})")

// Wait for the conversion to be done
Thread.sleep(10000)
@@ -34,6 +34,8 @@ import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.conf.Configuration
import shadedForDelta.org.apache.iceberg.{AppendFiles, DeleteFiles, OverwriteFiles, PartitionSpec, PendingUpdate, RewriteFiles, Transaction => IcebergTransaction}
import shadedForDelta.org.apache.iceberg.ExpireSnapshots
import shadedForDelta.org.apache.iceberg.MetadataUpdate
import shadedForDelta.org.apache.iceberg.MetadataUpdate.{AddPartitionSpec, AddSchema}
import shadedForDelta.org.apache.iceberg.mapping.MappingUtil
import shadedForDelta.org.apache.iceberg.mapping.NameMappingParser

@@ -64,8 +66,10 @@ class IcebergConversionTransaction(
protected val postCommitSnapshot: Snapshot,
protected val tableOp: IcebergTableOp = WRITE_TABLE,
protected val lastConvertedIcebergSnapshotId: Option[Long] = None,
protected val lastConvertedDeltaVersion: Option[Long] = None
) extends DeltaLogging {
protected val lastConvertedDeltaVersion: Option[Long] = None,
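// Collects the Iceberg MetadataUpdate entries (e.g. AddSchema, AddPartitionSpec) produced during
// conversion; presumably applied together when the Iceberg transaction commits.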
protected val metadataUpdates: java.util.ArrayList[MetadataUpdate] =
new java.util.ArrayList[MetadataUpdate]()
) extends DeltaLogging {

///////////////////////////
// Nested Helper Classes //
@@ -307,7 +311,8 @@
log" Setting new Iceberg schema:\n ${MDC(DeltaLogKeys.SCHEMA, icebergSchema)}")
}

txn.setSchema(icebergSchema).commit()
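// Rather than committing the schema change immediately, record it as an AddSchema update; the
// second argument is the last assigned column id, taken from Delta's column mapping metadata.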
metadataUpdates.add(
new AddSchema(icebergSchema, postCommitSnapshot.metadata.columnMappingMaxId.toInt))

recordDeltaEvent(
postCommitSnapshot.deltaLog,
@@ -351,12 +356,7 @@
assert(fileUpdates.forall(_.hasCommitted), "Cannot commit. You have uncommitted changes.")

val nameMapping = NameMappingParser.toJson(MappingUtil.create(icebergSchema))

// Hard-code a dummy Delta version of -1 for CREATE_TABLE; it is later set to the correct
// version in setSchemaTxn. -1 is chosen because it is less than the smallest possible
// legitimate Delta version, which is 0.
val deltaVersion = if (tableOp == CREATE_TABLE) -1 else postCommitSnapshot.version

val deltaVersion = postCommitSnapshot.version
var updateTxn = txn.updateProperties()
updateTxn = updateTxn.set(IcebergConverter.DELTA_VERSION_PROPERTY, deltaVersion.toString)
.set(IcebergConverter.DELTA_TIMESTAMP_PROPERTY, postCommitSnapshot.timestamp.toString)
@@ -392,19 +392,21 @@
)
}
try {
txn.commitTransaction()
if (tableOp == CREATE_TABLE) {
// Iceberg CREATE_TABLE reassigns the field ids in the schema. Overwrite them with the
// Delta-generated field ids so that the field ids in the converted Iceberg schema stay
// consistent with the field ids in the parquet files written by Delta.
val setSchemaTxn = createIcebergTxn(Some(WRITE_TABLE))
setSchemaTxn.setSchema(icebergSchema).commit()
setSchemaTxn.updateProperties()
.set(IcebergConverter.DELTA_VERSION_PROPERTY, postCommitSnapshot.version.toString)
.commit()
setSchemaTxn.commitTransaction()
metadataUpdates.add(
new AddSchema(icebergSchema, postCommitSnapshot.metadata.columnMappingMaxId.toInt)
)
if (postCommitSnapshot.metadata.partitionColumns.nonEmpty) {
metadataUpdates.add(
new AddPartitionSpec(partitionSpec)
)
}
}
txn.commitTransaction()
recordIcebergCommit()
} catch {
case NonFatal(e) =>
@@ -327,11 +327,6 @@ class IcebergConverter(spark: SparkSession)
case (None, None) => CREATE_TABLE
}

UniversalFormat.enforceSupportInCatalog(cleanedCatalogTable, snapshotToConvert.metadata) match {
case Some(updatedTable) => spark.sessionState.catalog.alterTable(updatedTable)
case _ =>
}

val icebergTxn = new IcebergConversionTransaction(
spark, cleanedCatalogTable, log.newDeltaHadoopConf(), snapshotToConvert, tableOp,
lastConvertedIcebergSnapshotId, lastDeltaVersionConverted)