Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): add removing partition pattern in spark lineage #6605

Merged
merged 11 commits into from
Jan 24, 2023
29 changes: 15 additions & 14 deletions metadata-integration/java/spark-lineage/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,20 +122,21 @@ The Spark agent can be configured using Databricks Cluster [Spark configuration]

## Configuration Options

| Field | Required | Default | Description |
|--------------------------------------------------|----------|---------|---------------------------------------------------------------------------------------------------------------------------------------|
| spark.jars.packages | ✅ | | Set with latest/required version io.acryl:datahub-spark-lineage:0.8.23 |
| spark.extraListeners | ✅ | | datahub.spark.DatahubSparkListener |
| spark.datahub.rest.server | ✅ | | Datahub server url eg:<http://localhost:8080> |
| spark.datahub.rest.token | | | Authentication token. |
| spark.datahub.rest.disable_ssl_verification | | false | Disable SSL certificate validation. Caution: Only use this if you know what you are doing! |
| spark.datahub.metadata.pipeline.platformInstance | | | Pipeline level platform instance |
| spark.datahub.metadata.dataset.platformInstance | | | dataset level platform instance |
| spark.datahub.metadata.dataset.env | | PROD | [Supported values](https://datahubproject.io/docs/graphql/enums#fabrictype). In all other cases, will fallback to PROD |
| spark.datahub.metadata.table.hive_platform_alias | | hive | By default, datahub assigns Hive-like tables to the Hive platform. If you are using Glue as your Hive metastore, set this config flag to `glue` |
| spark.datahub.metadata.include_scheme | | true | Include scheme from the path URI (e.g. hdfs://, s3://) in the dataset URN. We recommend setting this value to false, it is set to true for backwards compatibility with previous versions |
| spark.datahub.coalesce_jobs | | false | Only one datajob(task) will be emitted containing all input and output datasets for the spark application |
| spark.datahub.parent.datajob_urn | | | Specified dataset will be set as upstream dataset for datajob created. Effective only when spark.datahub.coalesce_jobs is set to true |
| Field | Required | Default | Description |
|--------------------------------------------------|----------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| spark.jars.packages | ✅ | | Set with latest/required version io.acryl:datahub-spark-lineage:0.8.23 |
| spark.extraListeners | ✅ | | datahub.spark.DatahubSparkListener |
| spark.datahub.rest.server | ✅ | | Datahub server url eg:<http://localhost:8080> |
| spark.datahub.rest.token | | | Authentication token. |
| spark.datahub.rest.disable_ssl_verification | | false | Disable SSL certificate validation. Caution: Only use this if you know what you are doing! |
| spark.datahub.metadata.pipeline.platformInstance | | | Pipeline level platform instance |
| spark.datahub.metadata.dataset.platformInstance | | | dataset level platform instance |
| spark.datahub.metadata.dataset.env | | PROD | [Supported values](https://datahubproject.io/docs/graphql/enums#fabrictype). In all other cases, will fallback to PROD |
| spark.datahub.metadata.table.hive_platform_alias | | hive | By default, datahub assigns Hive-like tables to the Hive platform. If you are using Glue as your Hive metastore, set this config flag to `glue` |
| spark.datahub.metadata.include_scheme | | true | Include scheme from the path URI (e.g. hdfs://, s3://) in the dataset URN. We recommend setting this value to false, it is set to true for backwards compatibility with previous versions |
| spark.datahub.metadata.remove_partition_pattern | | | Remove partition pattern. (e.g. /partition=\d+) It change database/table/partition=123 to database/table |
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add the default value! What is the default?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also what are sample values?

Copy link
Contributor Author

@ssilb4 ssilb4 Jan 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it need to have default value. If it is null, It doesn't change any value.
I thought description is sample value, should I do more? (value: /partition=\d+)

| spark.datahub.coalesce_jobs | | false | Only one datajob(task) will be emitted containing all input and output datasets for the spark application |
| spark.datahub.parent.datajob_urn | | | Specified dataset will be set as upstream dataset for datajob created. Effective only when spark.datahub.coalesce_jobs is set to true |

## What to Expect: The Metadata Model

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class DatasetExtractor {
private static final String DATASET_PLATFORM_INSTANCE_KEY = "metadata.dataset.platformInstance";
private static final String TABLE_HIVE_PLATFORM_ALIAS = "metadata.table.hive_platform_alias";
private static final String INCLUDE_SCHEME_KEY = "metadata.include_scheme";
private static final String REMOVE_PARTITION_PATTERN = "metadata.remove_partition_pattern";
// TODO InsertIntoHiveDirCommand, InsertIntoDataSourceDirCommand

private DatasetExtractor() {
Expand Down Expand Up @@ -127,7 +128,7 @@ Optional<? extends Collection<SparkDataset>> fromSparkPlanNode(SparkPlan plan, S
}
return Optional.of(Collections.singletonList(new HdfsPathDataset(cmd.outputPath(),
getCommonPlatformInstance(datahubConfig), getIncludeScheme(datahubConfig),
getCommonFabricType(datahubConfig))));
getCommonFabricType(datahubConfig), getRemovePartitionPattern(datahubConfig))));
});

PLAN_TO_DATASET.put(LogicalRelation.class, (p, ctx, datahubConfig) -> {
Expand All @@ -151,7 +152,7 @@ Optional<? extends Collection<SparkDataset>> fromSparkPlanNode(SparkPlan plan, S
} else if (options.containsKey("path")) {
return Optional.of(Collections.singletonList(new HdfsPathDataset(new Path(options.get("path")),
getCommonPlatformInstance(datahubConfig), getIncludeScheme(datahubConfig),
getCommonFabricType(datahubConfig))));
getCommonFabricType(datahubConfig), getRemovePartitionPattern(datahubConfig))));
} else {
return Optional.empty();
}
Expand Down Expand Up @@ -191,7 +192,7 @@ Optional<? extends Collection<SparkDataset>> fromSparkPlanNode(SparkPlan plan, S
// TODO mapping to URN TBD
return Optional.of(Collections.singletonList(new HdfsPathDataset(res.get(0),
getCommonPlatformInstance(datahubConfig), getIncludeScheme(datahubConfig),
getCommonFabricType(datahubConfig))));
getCommonFabricType(datahubConfig), getRemovePartitionPattern(datahubConfig))));
});
REL_TO_DATASET.put(JDBCRelation.class, (r, ctx, datahubConfig) -> {
JDBCRelation rel = (JDBCRelation) r;
Expand Down Expand Up @@ -281,4 +282,9 @@ private static boolean getIncludeScheme(Config datahubConfig) {
return datahubConfig.hasPath(INCLUDE_SCHEME_KEY) ? datahubConfig.getBoolean(INCLUDE_SCHEME_KEY)
: true;
}

private static String getRemovePartitionPattern(Config datahubConfig) {
return datahubConfig.hasPath(REMOVE_PARTITION_PATTERN) ? datahubConfig.getString(REMOVE_PARTITION_PATTERN)
: null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,13 @@
@ToString
public class HdfsPathDataset extends SparkDataset {

private static String getPath(Path path, boolean includeScheme) {
private static String getPath(Path path, boolean includeScheme, String removePartitionPattern) {
URI uri = path.toUri();

if (includeScheme) {
return uri.toString();
} else {
return uri.getHost() + uri.getPath();
String uriPath = includeScheme ? uri.toString() : uri.getHost() + uri.getPath();
if (removePartitionPattern != null) {
return uriPath.replaceAll(removePartitionPattern, "");
}
return uriPath;
}

private static String getPlatform(Path path) {
Expand All @@ -30,9 +29,14 @@ private static String getPlatform(Path path) {
}
}

public HdfsPathDataset(Path path, String platformInstance, boolean includeScheme, FabricType fabricType) {
public HdfsPathDataset(
Path path,
String platformInstance,
boolean includeScheme,
FabricType fabricType,
String removePartitionPattern) {
// TODO check static partitions?
this(getPath(path, includeScheme), platformInstance, getPlatform(path), fabricType);
this(getPath(path, includeScheme, removePartitionPattern), platformInstance, getPlatform(path), fabricType);
}

public HdfsPathDataset(String pathUri, String platformInstance, String platform, FabricType fabricType) {
Expand Down