Skip to content

Commit

Permalink
fix(patch): Add Finegrained Lineage patch support for DatajobInputOut…
Browse files Browse the repository at this point in the history
  • Loading branch information
treff7es authored and sleeperdeep committed Dec 17, 2024
1 parent 201b08a commit db1e402
Show file tree
Hide file tree
Showing 8 changed files with 698 additions and 275 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import com.linkedin.metadata.aspect.patch.PatchOperationType;
import com.linkedin.metadata.graph.LineageDirection;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutableTriple;

public class DataJobInputOutputPatchBuilder
Expand All @@ -24,6 +26,7 @@ public class DataJobInputOutputPatchBuilder
// Path prefixes used when building patch operation paths. Each prefix ends with "/" so that the
// remaining path segments can be appended directly.
private static final String OUTPUT_DATASET_EDGES_PATH_START = "/outputDatasetEdges/";
private static final String INPUT_DATASET_FIELDS_PATH_START = "/inputDatasetFields/";
private static final String OUTPUT_DATASET_FIELDS_PATH_START = "/outputDatasetFields/";
// Prefix of the paths built by addFineGrainedUpstreamField / removeFineGrainedUpstreamField.
private static final String FINE_GRAINED_PATH_START = "/fineGrainedLineages/";

// Simplified with just Urn
public DataJobInputOutputPatchBuilder addInputDatajobEdge(@Nonnull DataJobUrn dataJobUrn) {
Expand Down Expand Up @@ -136,6 +139,103 @@ public DataJobInputOutputPatchBuilder addEdge(
return this;
}

/**
 * Queues an ADD patch operation that records a schema field as a fine grained upstream of a
 * downstream schema field.
 *
 * <p>The operation path is built as {@code /fineGrainedLineages/<transformationOperation>/
 * <encoded downstream urn>/<query urn or NONE>/<encoded upstream urn>} and its value is an object
 * holding a single {@code confidenceScore} entry.
 *
 * @param upstreamSchemaField the schema field acting as upstream, format:
 *     urn:li:schemaField(DATASET_URN, COLUMN NAME)
 * @param confidenceScore optional confidence score for the lineage edge; {@code null} or a value
 *     outside (0, 1] is replaced by 1.0 (full confidence)
 * @param transformationOperation operation type describing the transformation happening in the
 *     lineage edge; used verbatim as a path segment
 * @param downstreamSchemaField the schema field acting as downstream, format:
 *     urn:li:schemaField(DATASET_URN, COLUMN NAME)
 * @param queryUrn optional urn of the query the relationship is derived from; when {@code null}
 *     or blank the literal segment {@code NONE} is used instead
 * @return this builder
 */
public DataJobInputOutputPatchBuilder addFineGrainedUpstreamField(
    @Nonnull Urn upstreamSchemaField,
    @Nullable Float confidenceScore,
    @Nonnull String transformationOperation,
    @Nonnull Urn downstreamSchemaField,
    @Nullable Urn queryUrn) {
  final Float effectiveScore = getConfidenceScoreOrDefault(confidenceScore);

  // StringUtils.isBlank treats null as blank, so a missing urn also falls back to "NONE".
  final String rawQueryUrn = queryUrn == null ? null : queryUrn.toString();
  final String querySegment = StringUtils.isBlank(rawQueryUrn) ? "NONE" : rawQueryUrn;

  // The patch value only carries the confidence score; everything else lives in the path.
  final ObjectNode patchValue = instance.objectNode();
  patchValue.put("confidenceScore", instance.numberNode(effectiveScore));

  final String operation = PatchOperationType.ADD.getValue();
  // NOTE(review): the query urn and transformation operation are appended without encoding,
  // unlike the two schema field urns - confirm this matches what the patch consumer expects.
  final String path =
      new StringBuilder(FINE_GRAINED_PATH_START)
          .append(transformationOperation)
          .append("/")
          .append(encodeValueUrn(downstreamSchemaField))
          .append("/")
          .append(querySegment)
          .append("/")
          .append(encodeValueUrn(upstreamSchemaField))
          .toString();

  pathValues.add(ImmutableTriple.of(operation, path, patchValue));
  return this;
}

/**
 * Normalizes an optional confidence score.
 *
 * @param confidenceScore the caller supplied score, may be {@code null}
 * @return the supplied score when it is greater than 0 and at most 1.0, otherwise 1.0
 */
private Float getConfidenceScoreOrDefault(@Nullable Float confidenceScore) {
  if (confidenceScore == null) {
    return 1.0f;
  }

  final float candidate = confidenceScore;
  // Anything outside the half-open range (0, 1] is treated as "not provided".
  final boolean withinRange = candidate > 0 && candidate <= 1.0f;
  return withinRange ? candidate : 1.0f;
}

/**
 * Queues a REMOVE patch operation for a fine grained upstream relationship between two schema
 * fields.
 *
 * <p>The operation path is built as {@code /fineGrainedLineages/<transformationOperation>/
 * <encoded downstream urn>/<query urn or NONE>/<encoded upstream urn>}; the operation carries no
 * value.
 *
 * @param upstreamSchemaField the upstream schema field of the relationship to remove, format:
 *     urn:li:schemaField(DATASET_URN, COLUMN NAME)
 * @param transformationOperation operation type describing the transformation happening in the
 *     lineage edge; used verbatim as a path segment
 * @param downstreamSchemaField the downstream schema field of the relationship to remove,
 *     format: urn:li:schemaField(DATASET_URN, COLUMN NAME)
 * @param queryUrn optional urn of the query the relationship is derived from; when {@code null}
 *     or blank the literal segment {@code NONE} is used instead
 * @return this builder
 */
public DataJobInputOutputPatchBuilder removeFineGrainedUpstreamField(
    @Nonnull Urn upstreamSchemaField,
    @Nonnull String transformationOperation,
    @Nonnull Urn downstreamSchemaField,
    @Nullable Urn queryUrn) {
  // StringUtils.isBlank treats null as blank, so a missing urn also falls back to "NONE".
  final String rawQueryUrn = queryUrn == null ? null : queryUrn.toString();
  final String querySegment = StringUtils.isBlank(rawQueryUrn) ? "NONE" : rawQueryUrn;

  final String operation = PatchOperationType.REMOVE.getValue();
  // NOTE(review): the query urn and transformation operation are appended without encoding,
  // unlike the two schema field urns - confirm this matches what the patch consumer expects.
  final String path =
      new StringBuilder(FINE_GRAINED_PATH_START)
          .append(transformationOperation)
          .append("/")
          .append(encodeValueUrn(downstreamSchemaField))
          .append("/")
          .append(querySegment)
          .append("/")
          .append(encodeValueUrn(upstreamSchemaField))
          .toString();

  // A removal has no payload, hence the null value.
  pathValues.add(ImmutableTriple.of(operation, path, null));
  return this;
}

public DataJobInputOutputPatchBuilder removeEdge(
@Nonnull Edge edge, @Nonnull LineageDirection direction) {
String path = getEdgePath(edge, direction);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public UpstreamLineagePatchBuilder removeFineGrainedUpstreamField(
FINE_GRAINED_PATH_START
+ transformationOperation
+ "/"
+ downstreamSchemaField
+ encodeValueUrn(downstreamSchemaField)
+ "/"
+ finalQueryUrn
+ "/"
Expand Down
Loading

0 comments on commit db1e402

Please sign in to comment.