Skip to content

Commit

Permalink
fix(spark):Add option to disable symlink resolution (datahub-project#…
Browse files Browse the repository at this point in the history
  • Loading branch information
treff7es authored and sleeperdeep committed Jun 25, 2024
1 parent 5099075 commit c76d29b
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@
import java.util.stream.Collectors;
import lombok.Builder;
import lombok.Getter;
import lombok.ToString;


@Builder
@Getter
@ToString
public class DatahubOpenlineageConfig {
@Builder.Default private final boolean isStreaming = false;
@Builder.Default private final String pipelineName = null;
Expand All @@ -30,6 +33,7 @@ public class DatahubOpenlineageConfig {
@Builder.Default private final boolean usePatch = true;
@Builder.Default private String hivePlatformAlias = "hive";
@Builder.Default private Map<String, String> urnAliases = new HashMap<>();
@Builder.Default private final boolean disableSymlinkResolution = false;

public List<PathSpec> getPathSpecsForPlatform(String platform) {
if ((pathSpecs == null) || (pathSpecs.isEmpty())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public static Optional<DatasetUrn> convertOpenlineageDatasetToDatasetUrn(
String namespace = dataset.getNamespace();
String datasetName = dataset.getName();
Optional<DatasetUrn> datahubUrn;
if (dataset.getFacets() != null && dataset.getFacets().getSymlinks() != null) {
if (dataset.getFacets() != null && dataset.getFacets().getSymlinks() != null && !mappingConfig.isDisableSymlinkResolution()) {
Optional<DatasetUrn> originalUrn =
getDatasetUrnFromOlDataset(namespace, datasetName, mappingConfig);
for (OpenLineage.SymlinksDatasetFacetIdentifiers symlink :
Expand Down Expand Up @@ -581,6 +581,7 @@ private static void convertJobToDataJob(
OpenLineage.Job job = event.getJob();
DataJobInfo dji = new DataJobInfo();

log.debug("Datahub Config: {}", datahubConf);
if (job.getName().contains(".")) {

String jobName = job.getName().substring(job.getName().indexOf(".") + 1);
Expand Down
10 changes: 5 additions & 5 deletions metadata-integration/java/spark-lineage-beta/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ When running jobs using spark-submit, the agent needs to be configured in the co

```text
#Configuring DataHub spark agent jar
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.10
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.11
spark.extraListeners datahub.spark.DatahubSparkListener
spark.datahub.rest.server http://localhost:8080
```

## spark-submit command line

```sh
spark-submit --packages io.acryl:acryl-spark-lineage:0.2.10 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py
spark-submit --packages io.acryl:acryl-spark-lineage:0.2.11 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py
```

### Configuration Instructions: Amazon EMR
Expand All @@ -41,7 +41,7 @@ Set the following spark-defaults configuration properties as it
stated [here](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html)

```text
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.10
spark.jars.packages io.acryl:acryl-spark-lineage:0.2.11
spark.extraListeners datahub.spark.DatahubSparkListener
spark.datahub.rest.server https://your_datahub_host/gms
#If you have authentication set up then you also need to specify the Datahub access token
Expand All @@ -56,7 +56,7 @@ When running interactive jobs from a notebook, the listener can be configured wh
spark = SparkSession.builder
.master("spark://spark-master:7077")
.appName("test-application")
.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.10")
.config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.11")
.config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
.config("spark.datahub.rest.server", "http://localhost:8080")
.enableHiveSupport()
Expand All @@ -79,7 +79,7 @@ appName("test-application")
config("spark.master","spark://spark-master:7077")
.

config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.2.10")
config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.2.11")
.

config("spark.extraListeners","datahub.spark.DatahubSparkListener")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class SparkConfigParser {

public static final String COALESCE_KEY = "coalesce_jobs";
public static final String PATCH_ENABLED = "patch.enabled";
public static final String DISABLE_SYMLINK_RESOLUTION = "disableSymlinkResolution";

public static final String STAGE_METADATA_COALESCING = "stage_metadata_coalescing";
public static final String STREAMING_JOB = "streaming_job";
Expand Down Expand Up @@ -150,6 +151,7 @@ public static DatahubOpenlineageConfig sparkConfigToDatahubOpenlineageConf(
builder.commonDatasetPlatformInstance(SparkConfigParser.getCommonPlatformInstance(sparkConfig));
builder.hivePlatformAlias(SparkConfigParser.getHivePlatformAlias(sparkConfig));
builder.usePatch(SparkConfigParser.isPatchEnabled(sparkConfig));
builder.disableSymlinkResolution(SparkConfigParser.isDisableSymlinkResolution(sparkConfig));
try {
String parentJob = SparkConfigParser.getParentJobKey(sparkConfig);
if (parentJob != null) {
Expand Down Expand Up @@ -320,6 +322,13 @@ public static boolean isPatchEnabled(Config datahubConfig) {
return datahubConfig.hasPath(PATCH_ENABLED) && datahubConfig.getBoolean(PATCH_ENABLED);
}

public static boolean isDisableSymlinkResolution(Config datahubConfig) {
if (!datahubConfig.hasPath(DISABLE_SYMLINK_RESOLUTION)) {
return false;
}
return datahubConfig.hasPath(DISABLE_SYMLINK_RESOLUTION) && datahubConfig.getBoolean(DISABLE_SYMLINK_RESOLUTION);
}

public static boolean isEmitCoalescePeriodically(Config datahubConfig) {
if (!datahubConfig.hasPath(STAGE_METADATA_COALESCING)) {
// if databricks tags are present and stage_metadata_coalescing is not present, then default
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,35 @@ public void testProcessGlueOlEvent() throws URISyntaxException, IOException {
}
}


public void testProcessGlueOlEventSymlinkDisabled() throws URISyntaxException, IOException {
DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder =
DatahubOpenlineageConfig.builder();
builder.fabricType(FabricType.DEV);
builder.disableSymlinkResolution(true);

String olEvent =
IOUtils.toString(
this.getClass().getResourceAsStream("/ol_events/sample_glue.json"),
StandardCharsets.UTF_8);

OpenLineage.RunEvent runEvent = OpenLineageClientUtils.runEventFromJson(olEvent);
DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, builder.build());

assertNotNull(datahubJob);

for (DatahubDataset dataset : datahubJob.getInSet()) {
assertEquals(
"urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket-test/sample_data/input_data.parquet,DEV)",
dataset.getUrn().toString());
}
for (DatahubDataset dataset : datahubJob.getOutSet()) {
assertEquals(
"urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket-test/sample_data/output_data.parquet,DEV)",
dataset.getUrn().toString());
}
}

public void testProcessGlueOlEventWithHiveAlias() throws URISyntaxException, IOException {
DatahubOpenlineageConfig.DatahubOpenlineageConfigBuilder builder =
DatahubOpenlineageConfig.builder();
Expand Down

0 comments on commit c76d29b

Please sign in to comment.