From a6470fc267e2d34de6538fb7bb52c4b42cd3d5ea Mon Sep 17 00:00:00 2001
From: danielli-ziprecruiter <91145628+danielli-ziprecruiter@users.noreply.github.com>
Date: Thu, 22 Dec 2022 04:13:32 -0700
Subject: [PATCH] feat(ingestion): spark - support lineage for delta lake writes (#6834)

---
 metadata-integration/java/spark-lineage/README.md |  3 ++-
 .../main/java/datahub/spark/DatasetExtractor.java | 14 +++++++++-----
 2 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/metadata-integration/java/spark-lineage/README.md b/metadata-integration/java/spark-lineage/README.md
index 84b9f8c4791814..dbc5092d43e4f7 100644
--- a/metadata-integration/java/spark-lineage/README.md
+++ b/metadata-integration/java/spark-lineage/README.md
@@ -186,10 +186,11 @@ Below is a list of Spark commands that are parsed currently:
 
 - InsertIntoHadoopFsRelationCommand
 - SaveIntoDataSourceCommand (jdbc)
+- SaveIntoDataSourceCommand (Delta Lake)
 - CreateHiveTableAsSelectCommand
 - InsertIntoHiveTable
 
-Effectively, these support data sources/sinks corresponding to Hive, HDFS and JDBC.
+Effectively, these support data sources/sinks corresponding to Hive, HDFS, JDBC, and Delta Lake.
 
 DataFrame.persist command is supported for below LeafExecNodes:
 
diff --git a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatasetExtractor.java b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatasetExtractor.java
index b95e07cda270b0..f5f8ce4163188e 100644
--- a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatasetExtractor.java
+++ b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatasetExtractor.java
@@ -144,13 +144,17 @@ Optional<? extends Collection<SparkDataset>> fromSparkPlanNode(SparkPlan plan, S
       Map<String, String> options = JavaConversions.mapAsJavaMap(cmd.options());
       String url = options.getOrDefault("url", ""); // e.g. jdbc:postgresql://localhost:5432/sparktestdb
-      if (!url.contains("jdbc")) {
+      if (url.contains("jdbc")) {
+        String tbl = options.get("dbtable");
+        return Optional.of(Collections.singletonList(
+            new JdbcDataset(url, tbl, getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig))));
+      } else if (options.containsKey("path")) {
+        return Optional.of(Collections.singletonList(new HdfsPathDataset(new Path(options.get("path")),
+            getCommonPlatformInstance(datahubConfig), getIncludeScheme(datahubConfig),
+            getCommonFabricType(datahubConfig))));
+      } else {
         return Optional.empty();
       }
-
-      String tbl = options.get("dbtable");
-      return Optional.of(Collections.singletonList(
-          new JdbcDataset(url, tbl, getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig))));
     });
     PLAN_TO_DATASET.put(CreateDataSourceTableAsSelectCommand.class, (p, ctx, datahubConfig) -> {
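
For context, here is a minimal sketch (not part of the patch) of a Spark job whose Delta Lake write should exercise the new branch. A `df.write().format("delta").save(path)` call compiles to a SaveIntoDataSourceCommand whose options contain a `path` entry rather than a JDBC `url`, so DatasetExtractor now maps it to an HdfsPathDataset instead of returning `Optional.empty()`. The paths, app name, and input data below are hypothetical; the listener and REST server settings follow the spark-lineage README, and the Delta session settings are the standard ones from the Delta Lake docs.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DeltaLineageExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("delta-lineage-example") // hypothetical app name
        // DataHub lineage listener, as documented in the spark-lineage README
        .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
        .config("spark.datahub.rest.server", "http://localhost:8080") // assumed local GMS
        // Standard Delta Lake session configuration
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .getOrCreate();

    // Hypothetical input; read paths are covered by the existing HDFS handling.
    Dataset<Row> df = spark.read().parquet("/data/in/events");

    // Spark plans this write as SaveIntoDataSourceCommand with
    // options = {"path" -> "/data/out/events_delta"}; the new else-if branch
    // in DatasetExtractor turns it into an HdfsPathDataset lineage edge.
    df.write().format("delta").mode("overwrite").save("/data/out/events_delta");

    spark.stop();
  }
}
```

Note that the sink is modeled as an HdfsPathDataset keyed on the `path` option, so the lineage edge points at the storage location of the Delta table rather than a table-level entity.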