feat(ingest/spark): add removing partition pattern in spark lineage (#6605)

Co-authored-by: John Joyce <[email protected]>
ssilb4 and jjoyce0510 authored Jan 24, 2023
1 parent ceaeb42 commit a6a597c
Showing 3 changed files with 36 additions and 25 deletions.
29 changes: 15 additions & 14 deletions metadata-integration/java/spark-lineage/README.md
@@ -122,20 +122,21 @@ The Spark agent can be configured using Databricks Cluster [Spark configuration]

## Configuration Options

| Field                                             | Required | Default | Description |
|---------------------------------------------------|----------|---------|-------------|
| spark.jars.packages                               | ✅       |         | Set to the latest/required version, e.g. io.acryl:datahub-spark-lineage:0.8.23 |
| spark.extraListeners                              | ✅       |         | datahub.spark.DatahubSparkListener |
| spark.datahub.rest.server                         | ✅       |         | DataHub server URL, e.g. <http://localhost:8080> |
| spark.datahub.rest.token                          |          |         | Authentication token. |
| spark.datahub.rest.disable_ssl_verification       |          | false   | Disable SSL certificate validation. Caution: only use this if you know what you are doing! |
| spark.datahub.metadata.pipeline.platformInstance  |          |         | Pipeline-level platform instance |
| spark.datahub.metadata.dataset.platformInstance   |          |         | Dataset-level platform instance |
| spark.datahub.metadata.dataset.env                |          | PROD    | [Supported values](https://datahubproject.io/docs/graphql/enums#fabrictype). In all other cases, falls back to PROD |
| spark.datahub.metadata.table.hive_platform_alias  |          | hive    | By default, DataHub assigns Hive-like tables to the Hive platform. If you are using Glue as your Hive metastore, set this config flag to `glue` |
| spark.datahub.metadata.include_scheme             |          | true    | Include the scheme from the path URI (e.g. hdfs://, s3://) in the dataset URN. We recommend setting this to false; it defaults to true for backwards compatibility with previous versions |
| spark.datahub.metadata.remove_partition_pattern   |          |         | Regex pattern for partition path segments to remove (e.g. /partition=\d+), so that database/table/partition=123 becomes database/table; see the sketch below the table |
| spark.datahub.coalesce_jobs                       |          | false   | Emit only one datajob (task), containing all input and output datasets for the Spark application |
| spark.datahub.parent.datajob_urn                  |          |         | The specified dataset will be set as the upstream dataset for the created datajob. Effective only when spark.datahub.coalesce_jobs is set to true |
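
Below is a minimal sketch of wiring these options into a Java `SparkSession`. The jar version, server URL, and regex are illustrative values only, and in most deployments these properties are passed via `spark-submit --conf` or `spark-defaults.conf` rather than the session builder:

```java
import org.apache.spark.sql.SparkSession;

public class DatahubLineageConfigExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("lineage-config-example")
        // Illustrative coordinates and endpoint; use the version and DataHub URL for your environment.
        .config("spark.jars.packages", "io.acryl:datahub-spark-lineage:0.8.23")
        .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
        .config("spark.datahub.rest.server", "http://localhost:8080")
        // Strip "/partition=<digits>" segments before dataset URNs are built,
        // so database/table/partition=123 is reported as database/table.
        .config("spark.datahub.metadata.remove_partition_pattern", "/partition=\\d+")
        .getOrCreate();

    // ... run the Spark job as usual; the listener emits lineage as jobs complete ...

    spark.stop();
  }
}
```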

## What to Expect: The Metadata Model

DatasetExtractor.java

@@ -60,6 +60,7 @@ public class DatasetExtractor {
private static final String DATASET_PLATFORM_INSTANCE_KEY = "metadata.dataset.platformInstance";
private static final String TABLE_HIVE_PLATFORM_ALIAS = "metadata.table.hive_platform_alias";
private static final String INCLUDE_SCHEME_KEY = "metadata.include_scheme";
private static final String REMOVE_PARTITION_PATTERN = "metadata.remove_partition_pattern";
// TODO InsertIntoHiveDirCommand, InsertIntoDataSourceDirCommand

private DatasetExtractor() {
@@ -127,7 +128,7 @@ Optional<? extends Collection<SparkDataset>> fromSparkPlanNode(SparkPlan plan, S
}
return Optional.of(Collections.singletonList(new HdfsPathDataset(cmd.outputPath(),
getCommonPlatformInstance(datahubConfig), getIncludeScheme(datahubConfig),
-          getCommonFabricType(datahubConfig))));
+          getCommonFabricType(datahubConfig), getRemovePartitionPattern(datahubConfig))));
});

PLAN_TO_DATASET.put(LogicalRelation.class, (p, ctx, datahubConfig) -> {
@@ -151,7 +152,7 @@ Optional<? extends Collection<SparkDataset>> fromSparkPlanNode(SparkPlan plan, S
} else if (options.containsKey("path")) {
return Optional.of(Collections.singletonList(new HdfsPathDataset(new Path(options.get("path")),
getCommonPlatformInstance(datahubConfig), getIncludeScheme(datahubConfig),
-            getCommonFabricType(datahubConfig))));
+            getCommonFabricType(datahubConfig), getRemovePartitionPattern(datahubConfig))));
} else {
return Optional.empty();
}
@@ -191,7 +192,7 @@ Optional<? extends Collection<SparkDataset>> fromSparkPlanNode(SparkPlan plan, S
// TODO mapping to URN TBD
return Optional.of(Collections.singletonList(new HdfsPathDataset(res.get(0),
getCommonPlatformInstance(datahubConfig), getIncludeScheme(datahubConfig),
-          getCommonFabricType(datahubConfig))));
+          getCommonFabricType(datahubConfig), getRemovePartitionPattern(datahubConfig))));
});
REL_TO_DATASET.put(JDBCRelation.class, (r, ctx, datahubConfig) -> {
JDBCRelation rel = (JDBCRelation) r;
@@ -281,4 +282,9 @@ private static boolean getIncludeScheme(Config datahubConfig) {
return datahubConfig.hasPath(INCLUDE_SCHEME_KEY) ? datahubConfig.getBoolean(INCLUDE_SCHEME_KEY)
: true;
}

private static String getRemovePartitionPattern(Config datahubConfig) {
return datahubConfig.hasPath(REMOVE_PARTITION_PATTERN) ? datahubConfig.getString(REMOVE_PARTITION_PATTERN)
: null;
}
}
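
The new `getRemovePartitionPattern` helper follows the same `hasPath`-then-get pattern as `getIncludeScheme` above, defaulting to `null` when the key is absent. A standalone sketch (not part of this commit) of that fallback behavior with Typesafe Config, assuming the `spark.datahub.` prefix has already been stripped from the keys by the time this `Config` is built:

```java
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class RemovePartitionPatternLookupSketch {
  private static final String REMOVE_PARTITION_PATTERN = "metadata.remove_partition_pattern";

  // Mirrors DatasetExtractor.getRemovePartitionPattern: the pattern is optional, and a
  // null result makes HdfsPathDataset skip partition stripping entirely.
  static String getRemovePartitionPattern(Config datahubConfig) {
    return datahubConfig.hasPath(REMOVE_PARTITION_PATTERN)
        ? datahubConfig.getString(REMOVE_PARTITION_PATTERN)
        : null;
  }

  public static void main(String[] args) {
    Config withPattern =
        ConfigFactory.parseString("metadata.remove_partition_pattern = \"/partition=\\\\d+\"");
    Config withoutPattern = ConfigFactory.empty();

    System.out.println(getRemovePartitionPattern(withPattern));    // /partition=\d+
    System.out.println(getRemovePartitionPattern(withoutPattern)); // null
  }
}
```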
HdfsPathDataset.java

@@ -11,14 +11,13 @@
@ToString
public class HdfsPathDataset extends SparkDataset {

-  private static String getPath(Path path, boolean includeScheme) {
+  private static String getPath(Path path, boolean includeScheme, String removePartitionPattern) {
    URI uri = path.toUri();

-    if (includeScheme) {
-      return uri.toString();
-    } else {
-      return uri.getHost() + uri.getPath();
+    String uriPath = includeScheme ? uri.toString() : uri.getHost() + uri.getPath();
+    if (removePartitionPattern != null) {
+      return uriPath.replaceAll(removePartitionPattern, "");
    }
+    return uriPath;
}

private static String getPlatform(Path path) {
@@ -30,9 +29,14 @@ private static String getPlatform(Path path) {
}
}

-  public HdfsPathDataset(Path path, String platformInstance, boolean includeScheme, FabricType fabricType) {
+  public HdfsPathDataset(
+      Path path,
+      String platformInstance,
+      boolean includeScheme,
+      FabricType fabricType,
+      String removePartitionPattern) {
    // TODO check static partitions?
-    this(getPath(path, includeScheme), platformInstance, getPlatform(path), fabricType);
+    this(getPath(path, includeScheme, removePartitionPattern), platformInstance, getPlatform(path), fabricType);
}

public HdfsPathDataset(String pathUri, String platformInstance, String platform, FabricType fabricType) {
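
To make the effect of the new `removePartitionPattern` argument concrete, here is a standalone sketch (not part of the commit) that copies the new `getPath` logic and runs it on a made-up path:

```java
import java.net.URI;
import org.apache.hadoop.fs.Path;

public class RemovePartitionPatternDemo {
  // Same logic as the new HdfsPathDataset.getPath shown above.
  static String getPath(Path path, boolean includeScheme, String removePartitionPattern) {
    URI uri = path.toUri();
    String uriPath = includeScheme ? uri.toString() : uri.getHost() + uri.getPath();
    if (removePartitionPattern != null) {
      return uriPath.replaceAll(removePartitionPattern, "");
    }
    return uriPath;
  }

  public static void main(String[] args) {
    Path p = new Path("hdfs://namenode/database/table/partition=123");
    System.out.println(getPath(p, true, "/partition=\\d+"));  // hdfs://namenode/database/table
    System.out.println(getPath(p, false, "/partition=\\d+")); // namenode/database/table
    System.out.println(getPath(p, false, null));              // namenode/database/table/partition=123
  }
}
```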
