diff --git a/metadata-integration/java/spark-lineage/README.md b/metadata-integration/java/spark-lineage/README.md index 84b9f8c4791814..dbc5092d43e4f7 100644 --- a/metadata-integration/java/spark-lineage/README.md +++ b/metadata-integration/java/spark-lineage/README.md @@ -186,10 +186,11 @@ Below is a list of Spark commands that are parsed currently: - InsertIntoHadoopFsRelationCommand - SaveIntoDataSourceCommand (jdbc) +- SaveIntoDataSourceCommand (Delta Lake) - CreateHiveTableAsSelectCommand - InsertIntoHiveTable -Effectively, these support data sources/sinks corresponding to Hive, HDFS and JDBC. +Effectively, these support data sources/sinks corresponding to Hive, HDFS, JDBC, and Delta Lake. DataFrame.persist command is supported for below LeafExecNodes: diff --git a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatasetExtractor.java b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatasetExtractor.java index b95e07cda270b0..f5f8ce4163188e 100644 --- a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatasetExtractor.java +++ b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatasetExtractor.java @@ -144,13 +144,17 @@ Optional> fromSparkPlanNode(SparkPlan plan, S Map options = JavaConversions.mapAsJavaMap(cmd.options()); String url = options.getOrDefault("url", ""); // e.g. jdbc:postgresql://localhost:5432/sparktestdb - if (!url.contains("jdbc")) { + if (url.contains("jdbc")) { + String tbl = options.get("dbtable"); + return Optional.of(Collections.singletonList( + new JdbcDataset(url, tbl, getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig)))); + } else if (options.containsKey("path")) { + return Optional.of(Collections.singletonList(new HdfsPathDataset(new Path(options.get("path")), + getCommonPlatformInstance(datahubConfig), getIncludeScheme(datahubConfig), + getCommonFabricType(datahubConfig)))); + } else { return Optional.empty(); } - - String tbl = options.get("dbtable"); - return Optional.of(Collections.singletonList( - new JdbcDataset(url, tbl, getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig)))); }); PLAN_TO_DATASET.put(CreateDataSourceTableAsSelectCommand.class, (p, ctx, datahubConfig) -> {