diff --git a/metadata-integration/java/spark-lineage/README.md b/metadata-integration/java/spark-lineage/README.md
index dbc5092d43e4f7..599fe6435ce115 100644
--- a/metadata-integration/java/spark-lineage/README.md
+++ b/metadata-integration/java/spark-lineage/README.md
@@ -122,20 +122,21 @@ The Spark agent can be configured using Databricks Cluster [Spark configuration]
 
 ## Configuration Options
 
-| Field | Required | Default | Description |
-|--------------------------------------------------|----------|---------|---------------------------------------------------------------------------------------------------------------------------------------|
-| spark.jars.packages | ✅ | | Set with latest/required version io.acryl:datahub-spark-lineage:0.8.23 |
-| spark.extraListeners | ✅ | | datahub.spark.DatahubSparkListener |
-| spark.datahub.rest.server | ✅ | | Datahub server url eg: |
-| spark.datahub.rest.token | | | Authentication token. |
-| spark.datahub.rest.disable_ssl_verification | | false | Disable SSL certificate validation. Caution: Only use this if you know what you are doing! |
-| spark.datahub.metadata.pipeline.platformInstance | | | Pipeline level platform instance |
-| spark.datahub.metadata.dataset.platformInstance | | | dataset level platform instance |
-| spark.datahub.metadata.dataset.env | | PROD | [Supported values](https://datahubproject.io/docs/graphql/enums#fabrictype). In all other cases, will fallback to PROD |
-| spark.datahub.metadata.table.hive_platform_alias | | hive | By default, datahub assigns Hive-like tables to the Hive platform. If you are using Glue as your Hive metastore, set this config flag to `glue` |
-| spark.datahub.metadata.include_scheme | | true | Include scheme from the path URI (e.g. hdfs://, s3://) in the dataset URN. We recommend setting this value to false, it is set to true for backwards compatibility with previous versions |
-| spark.datahub.coalesce_jobs | | false | Only one datajob(task) will be emitted containing all input and output datasets for the spark application |
-| spark.datahub.parent.datajob_urn | | | Specified dataset will be set as upstream dataset for datajob created. Effective only when spark.datahub.coalesce_jobs is set to true |
+| Field | Required | Default | Description |
+|--------------------------------------------------|----------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| spark.jars.packages | ✅ | | Set with latest/required version io.acryl:datahub-spark-lineage:0.8.23 |
+| spark.extraListeners | ✅ | | datahub.spark.DatahubSparkListener |
+| spark.datahub.rest.server | ✅ | | Datahub server url eg: |
+| spark.datahub.rest.token | | | Authentication token. |
+| spark.datahub.rest.disable_ssl_verification | | false | Disable SSL certificate validation. Caution: Only use this if you know what you are doing! |
+| spark.datahub.metadata.pipeline.platformInstance | | | Pipeline-level platform instance |
+| spark.datahub.metadata.dataset.platformInstance | | | Dataset-level platform instance |
+| spark.datahub.metadata.dataset.env | | PROD | [Supported values](https://datahubproject.io/docs/graphql/enums#fabrictype). In all other cases, will fall back to PROD |
+| spark.datahub.metadata.table.hive_platform_alias | | hive | By default, datahub assigns Hive-like tables to the Hive platform. If you are using Glue as your Hive metastore, set this config flag to `glue` |
+| spark.datahub.metadata.include_scheme | | true | Include the scheme from the path URI (e.g. hdfs://, s3://) in the dataset URN. We recommend setting this value to false; it defaults to true for backwards compatibility with previous versions |
+| spark.datahub.metadata.remove_partition_pattern | | | Remove partition pattern (e.g. /partition=\d+). It changes database/table/partition=123 to database/table |
+| spark.datahub.coalesce_jobs | | false | Only one datajob (task) will be emitted, containing all input and output datasets for the Spark application |
+| spark.datahub.parent.datajob_urn | | | The specified dataset will be set as the upstream dataset for the created datajob. Effective only when spark.datahub.coalesce_jobs is set to true |
 
 ## What to Expect: The Metadata Model
 
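As a usage sketch, the new `spark.datahub.metadata.remove_partition_pattern` option slots in alongside the existing listener configuration. The snippet below is illustrative only; the app name, server URL, and pattern are hypothetical placeholders, and in practice these properties are usually passed via `spark-submit --conf` rather than hard-coded:

```java
import org.apache.spark.sql.SparkSession;

public class LineageConfigExample {
  public static void main(String[] args) {
    // All values are examples; substitute your own DataHub endpoint and pattern.
    SparkSession spark = SparkSession.builder()
        .appName("datahub-lineage-example")
        .config("spark.jars.packages", "io.acryl:datahub-spark-lineage:0.8.23")
        .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
        .config("spark.datahub.rest.server", "http://localhost:8080")
        // Strip partition segments such as /partition=123 from emitted dataset paths.
        .config("spark.datahub.metadata.remove_partition_pattern", "/partition=\\d+")
        .getOrCreate();

    // ... job logic here; the listener captures lineage as the job runs.
    spark.stop();
  }
}
```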
diff --git a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatasetExtractor.java b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatasetExtractor.java
index f5f8ce4163188e..51f5d561b26aea 100644
--- a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatasetExtractor.java
+++ b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/DatasetExtractor.java
@@ -60,6 +60,7 @@ public class DatasetExtractor {
   private static final String DATASET_PLATFORM_INSTANCE_KEY = "metadata.dataset.platformInstance";
   private static final String TABLE_HIVE_PLATFORM_ALIAS = "metadata.table.hive_platform_alias";
   private static final String INCLUDE_SCHEME_KEY = "metadata.include_scheme";
+  private static final String REMOVE_PARTITION_PATTERN = "metadata.remove_partition_pattern";
   // TODO InsertIntoHiveDirCommand, InsertIntoDataSourceDirCommand
 
   private DatasetExtractor() {
@@ -127,7 +128,7 @@ Optional<? extends Collection<SparkDataset>> fromSparkPlanNode(SparkPlan plan, S
       }
       return Optional.of(Collections.singletonList(new HdfsPathDataset(cmd.outputPath(),
           getCommonPlatformInstance(datahubConfig), getIncludeScheme(datahubConfig),
-          getCommonFabricType(datahubConfig))));
+          getCommonFabricType(datahubConfig), getRemovePartitionPattern(datahubConfig))));
     });
 
     PLAN_TO_DATASET.put(LogicalRelation.class, (p, ctx, datahubConfig) -> {
@@ -151,7 +152,7 @@ Optional<? extends Collection<SparkDataset>> fromSparkPlanNode(SparkPlan plan, S
       } else if (options.containsKey("path")) {
         return Optional.of(Collections.singletonList(new HdfsPathDataset(new Path(options.get("path")),
             getCommonPlatformInstance(datahubConfig), getIncludeScheme(datahubConfig),
-            getCommonFabricType(datahubConfig))));
+            getCommonFabricType(datahubConfig), getRemovePartitionPattern(datahubConfig))));
       } else {
         return Optional.empty();
       }
@@ -191,7 +192,7 @@ Optional<? extends Collection<SparkDataset>> fromSparkPlanNode(SparkPlan plan, S
       // TODO mapping to URN TBD
       return Optional.of(Collections.singletonList(new HdfsPathDataset(res.get(0),
           getCommonPlatformInstance(datahubConfig), getIncludeScheme(datahubConfig),
-          getCommonFabricType(datahubConfig))));
+          getCommonFabricType(datahubConfig), getRemovePartitionPattern(datahubConfig))));
     });
     REL_TO_DATASET.put(JDBCRelation.class, (r, ctx, datahubConfig) -> {
       JDBCRelation rel = (JDBCRelation) r;
@@ -281,4 +282,9 @@ private static boolean getIncludeScheme(Config datahubConfig) {
     return datahubConfig.hasPath(INCLUDE_SCHEME_KEY) ? datahubConfig.getBoolean(INCLUDE_SCHEME_KEY)
         : true;
   }
+
+  private static String getRemovePartitionPattern(Config datahubConfig) {
+    return datahubConfig.hasPath(REMOVE_PARTITION_PATTERN) ? datahubConfig.getString(REMOVE_PARTITION_PATTERN)
+        : null;
+  }
 }
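To pin down the fallback semantics of `getRemovePartitionPattern`, here is a minimal standalone sketch. It assumes `Config` is Typesafe's `com.typesafe.config.Config` (consistent with the `hasPath`/`getString` calls above) and that the `spark.datahub.` prefix has already been stripped from the key, as with the other `metadata.*` constants:

```java
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.util.Collections;

public class RemovePartitionPatternDemo {

  // Mirrors DatasetExtractor.getRemovePartitionPattern: return the configured
  // pattern when present, otherwise null (null means "leave paths untouched").
  static String getRemovePartitionPattern(Config config) {
    return config.hasPath("metadata.remove_partition_pattern")
        ? config.getString("metadata.remove_partition_pattern") : null;
  }

  public static void main(String[] args) {
    Config withPattern = ConfigFactory.parseMap(
        Collections.singletonMap("metadata.remove_partition_pattern", "/partition=\\d+"));
    System.out.println(getRemovePartitionPattern(withPattern));           // /partition=\d+
    System.out.println(getRemovePartitionPattern(ConfigFactory.empty())); // null
  }
}
```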
diff --git a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/dataset/HdfsPathDataset.java b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/dataset/HdfsPathDataset.java
index 4264f1ddeb2d5b..700aef5d6b15a6 100644
--- a/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/dataset/HdfsPathDataset.java
+++ b/metadata-integration/java/spark-lineage/src/main/java/datahub/spark/model/dataset/HdfsPathDataset.java
@@ -11,14 +11,13 @@
 @ToString
 public class HdfsPathDataset extends SparkDataset {
 
-  private static String getPath(Path path, boolean includeScheme) {
+  private static String getPath(Path path, boolean includeScheme, String removePartitionPattern) {
     URI uri = path.toUri();
-
-    if (includeScheme) {
-      return uri.toString();
-    } else {
-      return uri.getHost() + uri.getPath();
+    String uriPath = includeScheme ? uri.toString() : uri.getHost() + uri.getPath();
+    if (removePartitionPattern != null) {
+      return uriPath.replaceAll(removePartitionPattern, "");
     }
+    return uriPath;
   }
 
   private static String getPlatform(Path path) {
@@ -30,9 +29,14 @@ private static String getPlatform(Path path) {
     }
   }
 
-  public HdfsPathDataset(Path path, String platformInstance, boolean includeScheme, FabricType fabricType) {
+  public HdfsPathDataset(
+      Path path,
+      String platformInstance,
+      boolean includeScheme,
+      FabricType fabricType,
+      String removePartitionPattern) {
     // TODO check static partitions?
-    this(getPath(path, includeScheme), platformInstance, getPlatform(path), fabricType);
+    this(getPath(path, includeScheme, removePartitionPattern), platformInstance, getPlatform(path), fabricType);
   }
 
   public HdfsPathDataset(String pathUri, String platformInstance, String platform, FabricType fabricType) {
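The end-to-end effect on a dataset path is easiest to see by replaying the new `getPath` logic outside the class. The following re-implementation is for illustration only (the `s3://` path is hypothetical), but it exercises the same `replaceAll` branch the patch adds:

```java
import java.net.URI;
import org.apache.hadoop.fs.Path;

public class HdfsPathRewriteDemo {

  // Stand-in for HdfsPathDataset.getPath, copied here for demonstration.
  static String getPath(Path path, boolean includeScheme, String removePartitionPattern) {
    URI uri = path.toUri();
    String uriPath = includeScheme ? uri.toString() : uri.getHost() + uri.getPath();
    if (removePartitionPattern != null) {
      return uriPath.replaceAll(removePartitionPattern, "");
    }
    return uriPath;
  }

  public static void main(String[] args) {
    Path p = new Path("s3://bucket/database/table/partition=123");
    System.out.println(getPath(p, true, "/partition=\\d+"));  // s3://bucket/database/table
    System.out.println(getPath(p, false, "/partition=\\d+")); // bucket/database/table
    System.out.println(getPath(p, true, null));               // s3://bucket/database/table/partition=123
  }
}
```

Because the pattern is fed straight to `String.replaceAll`, it is interpreted as a full Java regex; a pattern that matches more than the partition suffix will remove that text as well.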