
[#3187] feat(spark-connector): Support SparkSQL extended syntax in Iceberg #3266

Merged (31 commits) on Jun 14, 2024
Changes from 26 commits
Commits
4d334aa
[#2543] feat(spark-connector): support row-level operations to iceber…
caican00 May 1, 2024
90b7be8
[#3264] feat(spark-connector): Support Iceberg time travel in SQL que…
caican00 May 4, 2024
65ef2a4
update
caican00 May 4, 2024
0a8ff35
[#3187] feat(spark-connector): Support SparkSQL extended syntax in Ic…
caican00 May 4, 2024
302244b
Merge branch 'main' of github.com:datastrato/gravitino into iceberg-asof
caican00 May 13, 2024
90b8d14
update
caican00 May 13, 2024
86f51cd
Merge branch 'main' into iceberg-asof
caican00 May 15, 2024
d2ba387
Merge branch 'main' of github.com:datastrato/gravitino into iceberg-e…
caican00 May 15, 2024
b367b65
update
caican00 May 16, 2024
ad1a52e
Merge branch 'iceberg-asof' of github.com:caican00/gravitino into ice…
caican00 May 16, 2024
a8a4d6b
update
caican00 May 16, 2024
2de6eaf
Merge branch 'main' into iceberg-extended-sql
caican00 May 16, 2024
ecc463e
update
caican00 May 17, 2024
53a8c8d
Merge branch 'main' of github.com:datastrato/gravitino into iceberg-e…
caican00 May 17, 2024
30fba3c
update
caican00 May 17, 2024
b97c325
update
caican00 May 17, 2024
80b4c99
Merge branch 'main' of github.com:datastrato/gravitino into iceberg-e…
caican00 May 21, 2024
560d6ad
update
caican00 May 21, 2024
b87d9f0
update
caican00 May 21, 2024
1468ee8
Merge branch 'main' into iceberg-extended-sql
caican00 May 21, 2024
ecceee9
update
caican00 May 22, 2024
8c6da72
Merge branch 'main' of github.com:datastrato/gravitino into iceberg-e…
caican00 Jun 5, 2024
7d859ba
update
caican00 Jun 5, 2024
801b907
update
caican00 Jun 5, 2024
066ca15
update
caican00 Jun 5, 2024
b9d75f0
Merge branch 'main' into iceberg-extended-sql
caican00 Jun 6, 2024
71fb559
update
caican00 Jun 7, 2024
9d5ebe4
Merge branch 'main' into iceberg-extended-sql
caican00 Jun 13, 2024
52f46ea
update
caican00 Jun 13, 2024
55fbd90
Merge branch 'main' into iceberg-extended-sql
caican00 Jun 13, 2024
d13e22d
Merge branch 'main' into iceberg-extended-sql
caican00 Jun 14, 2024
GravitinoIcebergSparkSessionExtensions.java (new file)
@@ -0,0 +1,22 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.spark.connector.iceberg.extensions;

import org.apache.spark.sql.SparkSessionExtensions;
import scala.Function1;

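/**
 * Session extensions entry point: installs the Gravitino-specific planner
 * strategy for Iceberg's extended SQL syntax into a Spark session.
 */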
public class GravitinoIcebergSparkSessionExtensions
implements Function1<SparkSessionExtensions, Void> {

@Override
public Void apply(SparkSessionExtensions extensions) {

// planner extensions
extensions.injectPlannerStrategy(IcebergExtendedDataSourceV2Strategy::new);

// Function1 requires a return value, and the Void type can only be satisfied by null.
return null;
}
}
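For context, Spark activates an extensions class like this one through the standard spark.sql.extensions setting; in this PR the driver plugin (see the GravitinoDriverPlugin diff below) wires the class name in automatically. The sketch below is not part of the PR and registers it by hand in a local session:

import com.datastrato.gravitino.spark.connector.iceberg.extensions.GravitinoIcebergSparkSessionExtensions;
import org.apache.spark.sql.SparkSession;

public class ExtensionsRegistrationSketch {
  public static void main(String[] args) {
    // Spark instantiates the named class and invokes
    // apply(SparkSessionExtensions) while building the session.
    SparkSession spark =
        SparkSession.builder()
            .master("local[*]")
            .config(
                "spark.sql.extensions",
                GravitinoIcebergSparkSessionExtensions.class.getName())
            .getOrCreate();
    spark.sql("SELECT 1").show();
    spark.stop();
  }
}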
IcebergExtendedDataSourceV2Strategy.java (new file)
@@ -0,0 +1,246 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.spark.connector.iceberg.extensions;

import com.datastrato.gravitino.spark.connector.iceberg.GravitinoIcebergCatalog;
import java.util.Collections;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.plans.logical.AddPartitionField;
import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceBranch;
import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceTag;
import org.apache.spark.sql.catalyst.plans.logical.DropBranch;
import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields;
import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField;
import org.apache.spark.sql.catalyst.plans.logical.DropTag;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField;
import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields;
import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.execution.datasources.v2.AddPartitionFieldExec;
import org.apache.spark.sql.execution.datasources.v2.CreateOrReplaceBranchExec;
import org.apache.spark.sql.execution.datasources.v2.CreateOrReplaceTagExec;
import org.apache.spark.sql.execution.datasources.v2.DropBranchExec;
import org.apache.spark.sql.execution.datasources.v2.DropIdentifierFieldsExec;
import org.apache.spark.sql.execution.datasources.v2.DropPartitionFieldExec;
import org.apache.spark.sql.execution.datasources.v2.DropTagExec;
import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Strategy;
import org.apache.spark.sql.execution.datasources.v2.ReplacePartitionFieldExec;
import org.apache.spark.sql.execution.datasources.v2.SetIdentifierFieldsExec;
import org.apache.spark.sql.execution.datasources.v2.SetWriteDistributionAndOrderingExec;
import scala.Option;
import scala.Some;
import scala.collection.JavaConverters;
import scala.collection.Seq;

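/**
 * Translates Iceberg's extended-SQL logical plans (partition fields, branches,
 * tags, identifier fields, write distribution and ordering) into their physical
 * exec nodes when the target catalog is a GravitinoIcebergCatalog; any other
 * plan falls through to Iceberg's ExtendedDataSourceV2Strategy.
 */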
public class IcebergExtendedDataSourceV2Strategy extends ExtendedDataSourceV2Strategy {

private final SparkSession spark;

public IcebergExtendedDataSourceV2Strategy(SparkSession spark) {
super(spark);
this.spark = spark;
}

@Override
public Seq<SparkPlan> apply(LogicalPlan plan) {
if (plan instanceof AddPartitionField) {
AddPartitionField addPartitionField = (AddPartitionField) plan;
return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(spark, addPartitionField.table())
.map(
catalogAndIdentifier -> {
AddPartitionFieldExec addPartitionFieldExec =
new AddPartitionFieldExec(
catalogAndIdentifier.catalog,
catalogAndIdentifier.identifier,
addPartitionField.transform(),
addPartitionField.name());
return toSeq(addPartitionFieldExec);
})
.get();
} else if (plan instanceof CreateOrReplaceBranch) {
CreateOrReplaceBranch createOrReplaceBranch = (CreateOrReplaceBranch) plan;
return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
spark, createOrReplaceBranch.table())
.map(
catalogAndIdentifier -> {
CreateOrReplaceBranchExec createOrReplaceBranchExec =
new CreateOrReplaceBranchExec(
catalogAndIdentifier.catalog,
catalogAndIdentifier.identifier,
createOrReplaceBranch.branch(),
createOrReplaceBranch.branchOptions(),
createOrReplaceBranch.create(),
createOrReplaceBranch.replace(),
createOrReplaceBranch.ifNotExists());
return toSeq(createOrReplaceBranchExec);
})
.get();
} else if (plan instanceof CreateOrReplaceTag) {
CreateOrReplaceTag createOrReplaceTag = (CreateOrReplaceTag) plan;
return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
spark, createOrReplaceTag.table())
.map(
catalogAndIdentifier -> {
CreateOrReplaceTagExec createOrReplaceTagExec =
new CreateOrReplaceTagExec(
catalogAndIdentifier.catalog,
catalogAndIdentifier.identifier,
createOrReplaceTag.tag(),
createOrReplaceTag.tagOptions(),
createOrReplaceTag.create(),
createOrReplaceTag.replace(),
createOrReplaceTag.ifNotExists());
return toSeq(createOrReplaceTagExec);
})
.get();
} else if (plan instanceof DropBranch) {
DropBranch dropBranch = (DropBranch) plan;
return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(spark, dropBranch.table())
.map(
catalogAndIdentifier -> {
DropBranchExec dropBranchExec =
new DropBranchExec(
catalogAndIdentifier.catalog,
catalogAndIdentifier.identifier,
dropBranch.branch(),
dropBranch.ifExists());
return toSeq(dropBranchExec);
})
.get();
} else if (plan instanceof DropTag) {
DropTag dropTag = (DropTag) plan;
return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(spark, dropTag.table())
.map(
catalogAndIdentifier -> {
DropTagExec dropTagExec =
new DropTagExec(
catalogAndIdentifier.catalog,
catalogAndIdentifier.identifier,
dropTag.tag(),
dropTag.ifExists());
return toSeq(dropTagExec);
})
.get();
} else if (plan instanceof DropPartitionField) {
DropPartitionField dropPartitionField = (DropPartitionField) plan;
return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
spark, dropPartitionField.table())
.map(
catalogAndIdentifier -> {
DropPartitionFieldExec dropPartitionFieldExec =
new DropPartitionFieldExec(
catalogAndIdentifier.catalog,
catalogAndIdentifier.identifier,
dropPartitionField.transform());
return toSeq(dropPartitionFieldExec);
})
.get();
} else if (plan instanceof ReplacePartitionField) {
ReplacePartitionField replacePartitionField = (ReplacePartitionField) plan;
return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
spark, replacePartitionField.table())
.map(
catalogAndIdentifier -> {
ReplacePartitionFieldExec replacePartitionFieldExec =
new ReplacePartitionFieldExec(
catalogAndIdentifier.catalog,
catalogAndIdentifier.identifier,
replacePartitionField.transformFrom(),
replacePartitionField.transformTo(),
replacePartitionField.name());
return toSeq(replacePartitionFieldExec);
})
.get();
} else if (plan instanceof SetIdentifierFields) {
SetIdentifierFields setIdentifierFields = (SetIdentifierFields) plan;
return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
spark, setIdentifierFields.table())
.map(
catalogAndIdentifier -> {
SetIdentifierFieldsExec setIdentifierFieldsExec =
new SetIdentifierFieldsExec(
catalogAndIdentifier.catalog,
catalogAndIdentifier.identifier,
setIdentifierFields.fields());
return toSeq(setIdentifierFieldsExec);
})
.get();
} else if (plan instanceof DropIdentifierFields) {
DropIdentifierFields dropIdentifierFields = (DropIdentifierFields) plan;
return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
spark, dropIdentifierFields.table())
.map(
catalogAndIdentifier -> {
DropIdentifierFieldsExec dropIdentifierFieldsExec =
new DropIdentifierFieldsExec(
catalogAndIdentifier.catalog,
catalogAndIdentifier.identifier,
dropIdentifierFields.fields());
return toSeq(dropIdentifierFieldsExec);
})
.get();
} else if (plan instanceof SetWriteDistributionAndOrdering) {
SetWriteDistributionAndOrdering setWriteDistributionAndOrdering =
(SetWriteDistributionAndOrdering) plan;
return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
spark, setWriteDistributionAndOrdering.table())
.map(
catalogAndIdentifier -> {
SetWriteDistributionAndOrderingExec setWriteDistributionAndOrderingExec =
new SetWriteDistributionAndOrderingExec(
catalogAndIdentifier.catalog,
catalogAndIdentifier.identifier,
setWriteDistributionAndOrdering.distributionMode(),
setWriteDistributionAndOrdering.sortOrder());
return toSeq(setWriteDistributionAndOrderingExec);
})
.get();
} else {
return super.apply(plan);
}
}

private Seq<SparkPlan> toSeq(SparkPlan plan) {
return JavaConverters.asScalaIteratorConverter(Collections.singletonList(plan).listIterator())
.asScala()
.toSeq();
}

static class IcebergCatalogAndIdentifier {

private final TableCatalog catalog;
private final Identifier identifier;

private IcebergCatalogAndIdentifier(TableCatalog catalog, Identifier identifier) {
this.catalog = catalog;
this.identifier = identifier;
}

private static IcebergCatalogAndIdentifier of(TableCatalog catalog, Identifier identifier) {
return new IcebergCatalogAndIdentifier(catalog, identifier);
}

static Option<IcebergCatalogAndIdentifier> buildCatalogAndIdentifier(
SparkSession spark, Seq<String> identifiers) {
Spark3Util.CatalogAndIdentifier catalogAndIdentifier =
Spark3Util.catalogAndIdentifier(spark, JavaConverters.<String>seqAsJavaList(identifiers));
CatalogPlugin catalog = catalogAndIdentifier.catalog();
if (catalog instanceof GravitinoIcebergCatalog) {
return new Some<>(
IcebergCatalogAndIdentifier.of(
(TableCatalog) catalog, catalogAndIdentifier.identifier()));
} else {
// TODO: support SparkSessionCatalog
throw new UnsupportedOperationException(
"Unsupported catalog type: " + catalog.getClass().getName());
}
}
}
}
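For reference, the statements this strategy serves are Iceberg's documented ALTER TABLE extensions, each parsing to one of the logical plans matched above. A sketch with hypothetical catalog and table names (assumes a session configured as in the earlier sketch):

import org.apache.spark.sql.SparkSession;

public class ExtendedSyntaxSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();
    // "iceberg_catalog.db.sales" is an illustrative Gravitino-managed Iceberg table.
    spark.sql("ALTER TABLE iceberg_catalog.db.sales ADD PARTITION FIELD days(ts)");      // AddPartitionField
    spark.sql("ALTER TABLE iceberg_catalog.db.sales CREATE BRANCH audit");               // CreateOrReplaceBranch
    spark.sql("ALTER TABLE iceberg_catalog.db.sales CREATE TAG v1");                     // CreateOrReplaceTag
    spark.sql("ALTER TABLE iceberg_catalog.db.sales SET IDENTIFIER FIELDS id");          // SetIdentifierFields
    spark.sql("ALTER TABLE iceberg_catalog.db.sales WRITE ORDERED BY id");               // SetWriteDistributionAndOrdering
    spark.stop();
  }
}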
GravitinoDriverPlugin.java
@@ -11,10 +11,12 @@
import com.datastrato.gravitino.Catalog;
import com.datastrato.gravitino.spark.connector.GravitinoSparkConfig;
import com.datastrato.gravitino.spark.connector.catalog.GravitinoCatalogManager;
import com.datastrato.gravitino.spark.connector.iceberg.extensions.GravitinoIcebergSparkSessionExtensions;
import com.datastrato.gravitino.spark.connector.version.CatalogNameAdaptor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
@@ -37,7 +39,9 @@ public class GravitinoDriverPlugin implements DriverPlugin {
private static final Logger LOG = LoggerFactory.getLogger(GravitinoDriverPlugin.class);

private GravitinoCatalogManager catalogManager;
private List<String> gravitinoDriverExtensions = new ArrayList<>();
private final List<String> toRegisteredDriverExtensions =
Arrays.asList(GravitinoIcebergSparkSessionExtensions.class.getName());
private final List<String> gravitinoDriverExtensions = new ArrayList<>();
private boolean enableIcebergSupport = false;

@VisibleForTesting
@@ -58,6 +62,8 @@ public Map<String, String> init(SparkContext sc, PluginContext pluginContext) {
String.format(
"%s:%s, should not be empty", GravitinoSparkConfig.GRAVITINO_METALAKE, metalake));

gravitinoDriverExtensions.addAll(toRegisteredDriverExtensions);

this.enableIcebergSupport =
conf.getBoolean(GravitinoSparkConfig.GRAVITINO_ENABLE_ICEBERG_SUPPORT, false);
if (enableIcebergSupport) {
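For completeness, enabling this path from an application would look roughly like the sketch below. The plugin class name and every config key are assumptions inferred from the GravitinoSparkConfig constants referenced in this diff, not verified strings; check the project docs before use:

import org.apache.spark.SparkConf;

public class DriverPluginConfigSketch {
  public static void main(String[] args) {
    SparkConf conf =
        new SparkConf()
            // Assumed fully qualified plugin class name.
            .set("spark.plugins",
                "com.datastrato.gravitino.spark.connector.plugin.GravitinoSparkPlugin")
            // Assumed keys for the Gravitino server URI and metalake.
            .set("spark.sql.gravitino.uri", "http://localhost:8090")
            .set("spark.sql.gravitino.metalake", "my_metalake")
            // Gates the extension registration added in this change.
            .set("spark.sql.gravitino.enableIcebergSupport", "true");
    System.out.println(conf.toDebugString());
  }
}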