diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/DataJobInputOutputPatchBuilder.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/DataJobInputOutputPatchBuilder.java index 6fffb17521ddb7..14fc92a1bf3c86 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/DataJobInputOutputPatchBuilder.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/DataJobInputOutputPatchBuilder.java @@ -15,6 +15,8 @@ import com.linkedin.metadata.aspect.patch.PatchOperationType; import com.linkedin.metadata.graph.LineageDirection; import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutableTriple; public class DataJobInputOutputPatchBuilder @@ -24,6 +26,7 @@ public class DataJobInputOutputPatchBuilder private static final String OUTPUT_DATASET_EDGES_PATH_START = "/outputDatasetEdges/"; private static final String INPUT_DATASET_FIELDS_PATH_START = "/inputDatasetFields/"; private static final String OUTPUT_DATASET_FIELDS_PATH_START = "/outputDatasetFields/"; + private static final String FINE_GRAINED_PATH_START = "/fineGrainedLineages/"; // Simplified with just Urn public DataJobInputOutputPatchBuilder addInputDatajobEdge(@Nonnull DataJobUrn dataJobUrn) { @@ -136,6 +139,103 @@ public DataJobInputOutputPatchBuilder addEdge( return this; } + /** + * Adds a field as a fine grained upstream + * + * @param upstreamSchemaField a schema field to be marked as upstream, format: + * urn:li:schemaField(DATASET_URN, COLUMN NAME) + * @param confidenceScore optional, confidence score for the lineage edge. Defaults to 1.0 for + * full confidence + * @param transformationOperation string operation type that describes the transformation + * operation happening in the lineage edge + * @param downstreamSchemaField the downstream schema field this upstream is derived from, format: + * urn:li:schemaField(DATASET_URN, COLUMN NAME) + * @param queryUrn query urn the relationship is derived from + * @return this builder + */ + public DataJobInputOutputPatchBuilder addFineGrainedUpstreamField( + @Nonnull Urn upstreamSchemaField, + @Nullable Float confidenceScore, + @Nonnull String transformationOperation, + @Nonnull Urn downstreamSchemaField, + @Nullable Urn queryUrn) { + Float finalConfidenceScore = getConfidenceScoreOrDefault(confidenceScore); + String finalQueryUrn; + if (queryUrn == null || StringUtils.isBlank(queryUrn.toString())) { + finalQueryUrn = "NONE"; + } else { + finalQueryUrn = queryUrn.toString(); + } + + ObjectNode fineGrainedLineageNode = instance.objectNode(); + fineGrainedLineageNode.put("confidenceScore", instance.numberNode(finalConfidenceScore)); + pathValues.add( + ImmutableTriple.of( + PatchOperationType.ADD.getValue(), + FINE_GRAINED_PATH_START + + transformationOperation + + "/" + + encodeValueUrn(downstreamSchemaField) + + "/" + + finalQueryUrn + + "/" + + encodeValueUrn(upstreamSchemaField), + fineGrainedLineageNode)); + + return this; + } + + private Float getConfidenceScoreOrDefault(@Nullable Float confidenceScore) { + float finalConfidenceScore; + if (confidenceScore != null && confidenceScore > 0 && confidenceScore <= 1.0f) { + finalConfidenceScore = confidenceScore; + } else { + finalConfidenceScore = 1.0f; + } + + return finalConfidenceScore; + } + + /** + * Removes a field as a fine grained upstream + * + * @param upstreamSchemaField a schema field to be marked as upstream, format: + * urn:li:schemaField(DATASET_URN, COLUMN NAME) + * @param transformationOperation string operation type that describes the transformation + * operation happening in the lineage edge + * @param downstreamSchemaField the downstream schema field this upstream is derived from, format: + * urn:li:schemaField(DATASET_URN, COLUMN NAME) + * @param queryUrn query urn the relationship is derived from + * @return this builder + */ + public DataJobInputOutputPatchBuilder removeFineGrainedUpstreamField( + @Nonnull Urn upstreamSchemaField, + @Nonnull String transformationOperation, + @Nonnull Urn downstreamSchemaField, + @Nullable Urn queryUrn) { + + String finalQueryUrn; + if (queryUrn == null || StringUtils.isBlank(queryUrn.toString())) { + finalQueryUrn = "NONE"; + } else { + finalQueryUrn = queryUrn.toString(); + } + pathValues.add( + ImmutableTriple.of( + PatchOperationType.REMOVE.getValue(), + FINE_GRAINED_PATH_START + + transformationOperation + + "/" + + encodeValueUrn(downstreamSchemaField) + + "/" + + finalQueryUrn + + "/" + + encodeValueUrn(upstreamSchemaField), + null)); + + return this; + } + public DataJobInputOutputPatchBuilder removeEdge( @Nonnull Edge edge, @Nonnull LineageDirection direction) { String path = getEdgePath(edge, direction); diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/UpstreamLineagePatchBuilder.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/UpstreamLineagePatchBuilder.java index 08182761aeb03f..d0a46a35d51820 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/UpstreamLineagePatchBuilder.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/builder/UpstreamLineagePatchBuilder.java @@ -142,7 +142,7 @@ public UpstreamLineagePatchBuilder removeFineGrainedUpstreamField( FINE_GRAINED_PATH_START + transformationOperation + "/" - + downstreamSchemaField + + encodeValueUrn(downstreamSchemaField) + "/" + finalQueryUrn + "/" diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/template/FineGrainedLineageTemplateHelper.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/template/FineGrainedLineageTemplateHelper.java new file mode 100644 index 00000000000000..1f6a58c52ba248 --- /dev/null +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/template/FineGrainedLineageTemplateHelper.java @@ -0,0 +1,282 @@ +package com.linkedin.metadata.aspect.patch.template; + +import static com.fasterxml.jackson.databind.node.JsonNodeFactory.*; +import static com.linkedin.metadata.Constants.*; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Streams; +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.codehaus.plexus.util.StringUtils; + +public class FineGrainedLineageTemplateHelper { + + private static final String FINE_GRAINED_UPSTREAM_TYPE = "upstreamType"; + private static final String FINE_GRAINED_UPSTREAMS = "upstreams"; + private static final String FINE_GRAINED_DOWNSTREAM_TYPE = "downstreamType"; + private static final String FINE_GRAINED_DOWNSTREAMS = "downstreams"; + private static final String FINE_GRAINED_TRANSFORMATION_OPERATION = "transformOperation"; + private static final String FINE_GRAINED_CONFIDENCE_SCORE = "confidenceScore"; + private static final String FINE_GRAINED_QUERY_ID = "query"; + + // Template support + private static final String NONE_TRANSFORMATION_TYPE = "NONE"; + private static final Float DEFAULT_CONFIDENCE_SCORE = 1.0f; + private static final String DEFAULT_QUERY_ID = "NONE"; + + /** + * Combines fine grained lineage array into a map using upstream and downstream types as keys, + * defaulting when not present. Due to this construction, patches will look like: path: + * /fineGrainedLineages/TRANSFORMATION_OPERATION/DOWNSTREAM_FIELD_URN/QUERY_ID/UPSTREAM_FIELD_URN, + * op: ADD/REMOVE, value: float (confidenceScore) Due to the way FineGrainedLineage was designed + * it doesn't necessarily have a consistent key we can reference, so this specialized method + * mimics the arrayFieldToMap of the super class with the specialization that it does not put the + * full value of the aspect at the end of the key, just the particular array. This prevents + * unintended overwrites through improper MCP construction that is technically allowed by the + * schema when combining under fields that form the natural key. + * + * @param fineGrainedLineages the fine grained lineage array node + * @return the modified {@link JsonNode} with array fields transformed to maps + */ + public static JsonNode combineAndTransformFineGrainedLineages( + @Nullable JsonNode fineGrainedLineages) { + ObjectNode mapNode = instance.objectNode(); + if (!(fineGrainedLineages instanceof ArrayNode) || fineGrainedLineages.isEmpty()) { + return mapNode; + } + JsonNode lineageCopy = fineGrainedLineages.deepCopy(); + + lineageCopy + .elements() + .forEachRemaining( + node -> { + JsonNode nodeClone = node.deepCopy(); + String transformationOperation = + nodeClone.has(FINE_GRAINED_TRANSFORMATION_OPERATION) + ? nodeClone.get(FINE_GRAINED_TRANSFORMATION_OPERATION).asText() + : NONE_TRANSFORMATION_TYPE; + + if (!mapNode.has(transformationOperation)) { + mapNode.set(transformationOperation, instance.objectNode()); + } + ObjectNode transformationOperationNode = + (ObjectNode) mapNode.get(transformationOperation); + + ArrayNode downstreams = + nodeClone.has(FINE_GRAINED_DOWNSTREAMS) + ? (ArrayNode) nodeClone.get(FINE_GRAINED_DOWNSTREAMS) + : null; + + if (downstreams == null || downstreams.size() != 1) { + throw new UnsupportedOperationException( + "Patching not supported on fine grained lineages with not" + + " exactly one downstream. Current fine grained lineage implementation is downstream derived and " + + "patches are keyed on the root of this derivation."); + } + + Float confidenceScore = + nodeClone.has(FINE_GRAINED_CONFIDENCE_SCORE) + ? nodeClone.get(FINE_GRAINED_CONFIDENCE_SCORE).floatValue() + : DEFAULT_CONFIDENCE_SCORE; + + String upstreamType = + nodeClone.has(FINE_GRAINED_UPSTREAM_TYPE) + ? nodeClone.get(FINE_GRAINED_UPSTREAM_TYPE).asText() + : null; + String downstreamType = + nodeClone.has(FINE_GRAINED_DOWNSTREAM_TYPE) + ? nodeClone.get(FINE_GRAINED_DOWNSTREAM_TYPE).asText() + : null; + ArrayNode upstreams = + nodeClone.has(FINE_GRAINED_UPSTREAMS) + ? (ArrayNode) nodeClone.get(FINE_GRAINED_UPSTREAMS) + : null; + + String queryId = + nodeClone.has(FINE_GRAINED_QUERY_ID) + ? nodeClone.get(FINE_GRAINED_QUERY_ID).asText() + : DEFAULT_QUERY_ID; + + if (upstreamType == null) { + // Determine default type + Urn upstreamUrn = + upstreams != null ? UrnUtils.getUrn(upstreams.get(0).asText()) : null; + if (upstreamUrn != null + && DATASET_ENTITY_NAME.equals(upstreamUrn.getEntityType())) { + upstreamType = FINE_GRAINED_LINEAGE_DATASET_TYPE; + } else { + upstreamType = FINE_GRAINED_LINEAGE_FIELD_SET_TYPE; + } + } + + if (downstreamType == null) { + // Always use FIELD type, only support patches for single field downstream + downstreamType = FINE_GRAINED_LINEAGE_FIELD_TYPE; + } + + String downstreamRoot = downstreams.get(0).asText(); + if (!transformationOperationNode.has(downstreamRoot)) { + transformationOperationNode.set(downstreamRoot, instance.objectNode()); + } + ObjectNode downstreamRootNode = + (ObjectNode) transformationOperationNode.get(downstreamRoot); + if (!downstreamRootNode.has(queryId)) { + downstreamRootNode.set(queryId, instance.objectNode()); + } + ObjectNode queryNode = (ObjectNode) downstreamRootNode.get(queryId); + if (upstreams != null) { + addUrnsToParent( + queryNode, upstreams, confidenceScore, upstreamType, downstreamType); + } + }); + return mapNode; + } + + private static void addUrnsToParent( + JsonNode parentNode, + ArrayNode urnsList, + Float confidenceScore, + String upstreamType, + String downstreamType) { + // Will overwrite repeat urns with different confidence scores with the most recently seen + ((ObjectNode) parentNode) + .setAll( + Streams.stream(urnsList.elements()) + .map(JsonNode::asText) + .distinct() + .collect( + Collectors.toMap( + urn -> urn, + urn -> + mapToLineageValueNode(confidenceScore, upstreamType, downstreamType)))); + } + + private static JsonNode mapToLineageValueNode( + Float confidenceScore, String upstreamType, String downstreamType) { + ObjectNode objectNode = instance.objectNode(); + objectNode.set(FINE_GRAINED_CONFIDENCE_SCORE, instance.numberNode(confidenceScore)); + objectNode.set(FINE_GRAINED_UPSTREAM_TYPE, instance.textNode(upstreamType)); + objectNode.set(FINE_GRAINED_DOWNSTREAM_TYPE, instance.textNode(downstreamType)); + return objectNode; + } + + /** + * Takes the transformed fine grained lineages map from pre-processing and reconstructs an array + * of FineGrainedLineages Avoids producing side effects by copying nodes, use resulting node and + * not the original + * + * @param transformedFineGrainedLineages the transformed fine grained lineage map + * @return the modified {@link JsonNode} formatted consistent with the original schema + */ + public static ArrayNode reconstructFineGrainedLineages(JsonNode transformedFineGrainedLineages) { + if (transformedFineGrainedLineages instanceof ArrayNode) { + // We already have an ArrayNode, no need to transform. This happens during `replace` + // operations + return (ArrayNode) transformedFineGrainedLineages; + } + ObjectNode mapNode = (ObjectNode) transformedFineGrainedLineages; + ArrayNode fineGrainedLineages = instance.arrayNode(); + + mapNode + .fieldNames() + .forEachRemaining( + transformationOperation -> { + final ObjectNode transformationOperationNode = + (ObjectNode) mapNode.get(transformationOperation); + transformationOperationNode + .fieldNames() + .forEachRemaining( + downstreamName -> { + final ObjectNode downstreamNode = + (ObjectNode) transformationOperationNode.get(downstreamName); + downstreamNode + .fieldNames() + .forEachRemaining( + queryId -> + buildFineGrainedLineage( + downstreamName, + downstreamNode, + queryId, + transformationOperation, + fineGrainedLineages)); + }); + }); + + return fineGrainedLineages; + } + + private static void buildFineGrainedLineage( + final String downstreamName, + final ObjectNode downstreamNode, + final String queryId, + final String transformationOperation, + final ArrayNode fineGrainedLineages) { + final ObjectNode fineGrainedLineage = instance.objectNode(); + final ObjectNode queryNode = (ObjectNode) downstreamNode.get(queryId); + if (queryNode.isEmpty()) { + // Short circuit if no upstreams left + return; + } + ArrayNode downstream = instance.arrayNode(); + downstream.add(instance.textNode(downstreamName)); + // Set defaults, if found in sub nodes override, for confidenceScore take lowest + AtomicReference minimumConfidenceScore = new AtomicReference<>(DEFAULT_CONFIDENCE_SCORE); + AtomicReference upstreamType = + new AtomicReference<>(FINE_GRAINED_LINEAGE_FIELD_SET_TYPE); + AtomicReference downstreamType = new AtomicReference<>(FINE_GRAINED_LINEAGE_FIELD_TYPE); + ArrayNode upstreams = instance.arrayNode(); + queryNode + .fieldNames() + .forEachRemaining( + upstream -> + processUpstream( + queryNode, + upstream, + minimumConfidenceScore, + upstreamType, + downstreamType, + upstreams)); + fineGrainedLineage.set(FINE_GRAINED_DOWNSTREAMS, downstream); + fineGrainedLineage.set(FINE_GRAINED_UPSTREAMS, upstreams); + if (StringUtils.isNotBlank(queryId) && !DEFAULT_QUERY_ID.equals(queryId)) { + fineGrainedLineage.set(FINE_GRAINED_QUERY_ID, instance.textNode(queryId)); + } + fineGrainedLineage.set(FINE_GRAINED_UPSTREAM_TYPE, instance.textNode(upstreamType.get())); + fineGrainedLineage.set(FINE_GRAINED_DOWNSTREAM_TYPE, instance.textNode(downstreamType.get())); + fineGrainedLineage.set( + FINE_GRAINED_CONFIDENCE_SCORE, instance.numberNode(minimumConfidenceScore.get())); + fineGrainedLineage.set( + FINE_GRAINED_TRANSFORMATION_OPERATION, instance.textNode(transformationOperation)); + fineGrainedLineages.add(fineGrainedLineage); + } + + private static void processUpstream( + final ObjectNode queryNode, + final String upstream, + final AtomicReference minimumConfidenceScore, + final AtomicReference upstreamType, + final AtomicReference downstreamType, + final ArrayNode upstreams) { + final ObjectNode upstreamNode = (ObjectNode) queryNode.get(upstream); + if (upstreamNode.has(FINE_GRAINED_CONFIDENCE_SCORE)) { + Float scoreValue = upstreamNode.get(FINE_GRAINED_CONFIDENCE_SCORE).floatValue(); + if (scoreValue <= minimumConfidenceScore.get()) { + minimumConfidenceScore.set(scoreValue); + } + } + // Set types to last encountered, should never change, but this at least tries to support + // other types being specified. + if (upstreamNode.has(FINE_GRAINED_UPSTREAM_TYPE)) { + upstreamType.set(upstreamNode.get(FINE_GRAINED_UPSTREAM_TYPE).asText()); + } + if (upstreamNode.has(FINE_GRAINED_DOWNSTREAM_TYPE)) { + downstreamType.set(upstreamNode.get(FINE_GRAINED_DOWNSTREAM_TYPE).asText()); + } + upstreams.add(instance.textNode(upstream)); + } +} diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/template/TemplateUtil.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/template/TemplateUtil.java index 2423e37e6d5419..23879ad1c2e353 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/template/TemplateUtil.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/template/TemplateUtil.java @@ -84,7 +84,7 @@ public static JsonNode populateTopLevelKeys(JsonNode transformedNode, JsonPatch // Skip first as it will always be blank due to path starting with / for (int i = 1; i < endIdx; i++) { String decodedKey = decodeValue(keys[i]); - if (parent.get(keys[i]) == null) { + if (parent.get(decodedKey) == null) { ((ObjectNode) parent).set(decodedKey, instance.objectNode()); } parent = parent.get(decodedKey); diff --git a/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/template/datajob/DataJobInputOutputTemplate.java b/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/template/datajob/DataJobInputOutputTemplate.java index 3d398d97b50c38..ef26eed2f814f8 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/template/datajob/DataJobInputOutputTemplate.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/aspect/patch/template/datajob/DataJobInputOutputTemplate.java @@ -1,6 +1,10 @@ package com.linkedin.metadata.aspect.patch.template.datajob; +import static com.fasterxml.jackson.databind.node.JsonNodeFactory.*; +import static com.linkedin.metadata.Constants.*; + import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.linkedin.common.DataJobUrnArray; import com.linkedin.common.DatasetUrnArray; import com.linkedin.common.EdgeArray; @@ -9,6 +13,7 @@ import com.linkedin.datajob.DataJobInputOutput; import com.linkedin.dataset.FineGrainedLineageArray; import com.linkedin.metadata.aspect.patch.template.ArrayMergingTemplate; +import com.linkedin.metadata.aspect.patch.template.FineGrainedLineageTemplateHelper; import java.util.Collections; import javax.annotation.Nonnull; @@ -23,6 +28,8 @@ public class DataJobInputOutputTemplate implements ArrayMergingTemplate { @@ -27,18 +19,6 @@ public class UpstreamLineageTemplate extends CompoundKeyTemplate { - JsonNode nodeClone = node.deepCopy(); - String transformationOperation = - nodeClone.has(FINE_GRAINED_TRANSFORMATION_OPERATION) - ? nodeClone.get(FINE_GRAINED_TRANSFORMATION_OPERATION).asText() - : NONE_TRANSFORMATION_TYPE; - - if (!mapNode.has(transformationOperation)) { - mapNode.set(transformationOperation, instance.objectNode()); - } - ObjectNode transformationOperationNode = - (ObjectNode) mapNode.get(transformationOperation); - - ArrayNode downstreams = - nodeClone.has(FINE_GRAINED_DOWNSTREAMS) - ? (ArrayNode) nodeClone.get(FINE_GRAINED_DOWNSTREAMS) - : null; - - if (downstreams == null || downstreams.size() != 1) { - throw new UnsupportedOperationException( - "Patching not supported on fine grained lineages with not" - + " exactly one downstream. Current fine grained lineage implementation is downstream derived and " - + "patches are keyed on the root of this derivation."); - } - - Float confidenceScore = - nodeClone.has(FINE_GRAINED_CONFIDENCE_SCORE) - ? nodeClone.get(FINE_GRAINED_CONFIDENCE_SCORE).floatValue() - : DEFAULT_CONFIDENCE_SCORE; - - String upstreamType = - nodeClone.has(FINE_GRAINED_UPSTREAM_TYPE) - ? nodeClone.get(FINE_GRAINED_UPSTREAM_TYPE).asText() - : null; - String downstreamType = - nodeClone.has(FINE_GRAINED_DOWNSTREAM_TYPE) - ? nodeClone.get(FINE_GRAINED_DOWNSTREAM_TYPE).asText() - : null; - ArrayNode upstreams = - nodeClone.has(FINE_GRAINED_UPSTREAMS) - ? (ArrayNode) nodeClone.get(FINE_GRAINED_UPSTREAMS) - : null; - - String queryId = - nodeClone.has(FINE_GRAINED_QUERY_ID) - ? nodeClone.get(FINE_GRAINED_QUERY_ID).asText() - : DEFAULT_QUERY_ID; - - if (upstreamType == null) { - // Determine default type - Urn upstreamUrn = - upstreams != null ? UrnUtils.getUrn(upstreams.get(0).asText()) : null; - if (upstreamUrn != null - && DATASET_ENTITY_NAME.equals(upstreamUrn.getEntityType())) { - upstreamType = FINE_GRAINED_LINEAGE_DATASET_TYPE; - } else { - upstreamType = FINE_GRAINED_LINEAGE_FIELD_SET_TYPE; - } - } - - if (downstreamType == null) { - // Always use FIELD type, only support patches for single field downstream - downstreamType = FINE_GRAINED_LINEAGE_FIELD_TYPE; - } - - String downstreamRoot = downstreams.get(0).asText(); - if (!transformationOperationNode.has(downstreamRoot)) { - transformationOperationNode.set(downstreamRoot, instance.objectNode()); - } - ObjectNode downstreamRootNode = - (ObjectNode) transformationOperationNode.get(downstreamRoot); - if (!downstreamRootNode.has(queryId)) { - downstreamRootNode.set(queryId, instance.objectNode()); - } - ObjectNode queryNode = (ObjectNode) downstreamRootNode.get(queryId); - if (upstreams != null) { - addUrnsToParent( - queryNode, upstreams, confidenceScore, upstreamType, downstreamType); - } - }); - return mapNode; - } - - private void addUrnsToParent( - JsonNode parentNode, - ArrayNode urnsList, - Float confidenceScore, - String upstreamType, - String downstreamType) { - // Will overwrite repeat urns with different confidence scores with the most recently seen - ((ObjectNode) parentNode) - .setAll( - Streams.stream(urnsList.elements()) - .map(JsonNode::asText) - .distinct() - .collect( - Collectors.toMap( - urn -> urn, - urn -> - mapToLineageValueNode(confidenceScore, upstreamType, downstreamType)))); - } - - private JsonNode mapToLineageValueNode( - Float confidenceScore, String upstreamType, String downstreamType) { - ObjectNode objectNode = instance.objectNode(); - objectNode.set(FINE_GRAINED_CONFIDENCE_SCORE, instance.numberNode(confidenceScore)); - objectNode.set(FINE_GRAINED_UPSTREAM_TYPE, instance.textNode(upstreamType)); - objectNode.set(FINE_GRAINED_DOWNSTREAM_TYPE, instance.textNode(downstreamType)); - return objectNode; - } - - /** - * Takes the transformed fine grained lineages map from pre-processing and reconstructs an array - * of FineGrainedLineages Avoids producing side effects by copying nodes, use resulting node and - * not the original - * - * @param transformedFineGrainedLineages the transformed fine grained lineage map - * @return the modified {@link JsonNode} formatted consistent with the original schema - */ - private ArrayNode reconstructFineGrainedLineages(JsonNode transformedFineGrainedLineages) { - if (transformedFineGrainedLineages instanceof ArrayNode) { - // We already have an ArrayNode, no need to transform. This happens during `replace` - // operations - return (ArrayNode) transformedFineGrainedLineages; - } - ObjectNode mapNode = (ObjectNode) transformedFineGrainedLineages; - ArrayNode fineGrainedLineages = instance.arrayNode(); - - mapNode - .fieldNames() - .forEachRemaining( - transformationOperation -> { - final ObjectNode transformationOperationNode = - (ObjectNode) mapNode.get(transformationOperation); - transformationOperationNode - .fieldNames() - .forEachRemaining( - downstreamName -> { - final ObjectNode downstreamNode = - (ObjectNode) transformationOperationNode.get(downstreamName); - downstreamNode - .fieldNames() - .forEachRemaining( - queryId -> - buildFineGrainedLineage( - downstreamName, - downstreamNode, - queryId, - transformationOperation, - fineGrainedLineages)); - }); - }); - - return fineGrainedLineages; - } - - private void buildFineGrainedLineage( - final String downstreamName, - final ObjectNode downstreamNode, - final String queryId, - final String transformationOperation, - final ArrayNode fineGrainedLineages) { - final ObjectNode fineGrainedLineage = instance.objectNode(); - final ObjectNode queryNode = (ObjectNode) downstreamNode.get(queryId); - if (queryNode.isEmpty()) { - // Short circuit if no upstreams left - return; - } - ArrayNode downstream = instance.arrayNode(); - downstream.add(instance.textNode(downstreamName)); - // Set defaults, if found in sub nodes override, for confidenceScore take lowest - AtomicReference minimumConfidenceScore = new AtomicReference<>(DEFAULT_CONFIDENCE_SCORE); - AtomicReference upstreamType = - new AtomicReference<>(FINE_GRAINED_LINEAGE_FIELD_SET_TYPE); - AtomicReference downstreamType = new AtomicReference<>(FINE_GRAINED_LINEAGE_FIELD_TYPE); - ArrayNode upstreams = instance.arrayNode(); - queryNode - .fieldNames() - .forEachRemaining( - upstream -> - processUpstream( - queryNode, - upstream, - minimumConfidenceScore, - upstreamType, - downstreamType, - upstreams)); - fineGrainedLineage.set(FINE_GRAINED_DOWNSTREAMS, downstream); - fineGrainedLineage.set(FINE_GRAINED_UPSTREAMS, upstreams); - if (StringUtils.isNotBlank(queryId) && !DEFAULT_QUERY_ID.equals(queryId)) { - fineGrainedLineage.set(FINE_GRAINED_QUERY_ID, instance.textNode(queryId)); - } - fineGrainedLineage.set(FINE_GRAINED_UPSTREAM_TYPE, instance.textNode(upstreamType.get())); - fineGrainedLineage.set(FINE_GRAINED_DOWNSTREAM_TYPE, instance.textNode(downstreamType.get())); - fineGrainedLineage.set( - FINE_GRAINED_CONFIDENCE_SCORE, instance.numberNode(minimumConfidenceScore.get())); - fineGrainedLineage.set( - FINE_GRAINED_TRANSFORMATION_OPERATION, instance.textNode(transformationOperation)); - fineGrainedLineages.add(fineGrainedLineage); - } - - private void processUpstream( - final ObjectNode queryNode, - final String upstream, - final AtomicReference minimumConfidenceScore, - final AtomicReference upstreamType, - final AtomicReference downstreamType, - final ArrayNode upstreams) { - final ObjectNode upstreamNode = (ObjectNode) queryNode.get(upstream); - if (upstreamNode.has(FINE_GRAINED_CONFIDENCE_SCORE)) { - Float scoreValue = upstreamNode.get(FINE_GRAINED_CONFIDENCE_SCORE).floatValue(); - if (scoreValue <= minimumConfidenceScore.get()) { - minimumConfidenceScore.set(scoreValue); - } - } - // Set types to last encountered, should never change, but this at least tries to support - // other types being specified. - if (upstreamNode.has(FINE_GRAINED_UPSTREAM_TYPE)) { - upstreamType.set(upstreamNode.get(FINE_GRAINED_UPSTREAM_TYPE).asText()); - } - if (upstreamNode.has(FINE_GRAINED_DOWNSTREAM_TYPE)) { - downstreamType.set(upstreamNode.get(FINE_GRAINED_DOWNSTREAM_TYPE).asText()); - } - upstreams.add(instance.textNode(upstream)); - } } diff --git a/entity-registry/src/test/java/com/linkedin/metadata/aspect/patch/template/DataJobInputOutputTemplateTest.java b/entity-registry/src/test/java/com/linkedin/metadata/aspect/patch/template/DataJobInputOutputTemplateTest.java new file mode 100644 index 00000000000000..d2a26221a3bb9f --- /dev/null +++ b/entity-registry/src/test/java/com/linkedin/metadata/aspect/patch/template/DataJobInputOutputTemplateTest.java @@ -0,0 +1,255 @@ +package com.linkedin.metadata.aspect.patch.template; + +import static com.linkedin.metadata.utils.GenericRecordUtils.*; +import static org.testng.Assert.*; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.linkedin.common.UrnArray; +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.data.DataMap; +import com.linkedin.datajob.DataJobInputOutput; +import com.linkedin.dataset.FineGrainedLineage; +import com.linkedin.dataset.FineGrainedLineageDownstreamType; +import com.linkedin.dataset.FineGrainedLineageUpstreamType; +import com.linkedin.metadata.aspect.patch.template.datajob.DataJobInputOutputTemplate; +import jakarta.json.Json; +import jakarta.json.JsonObjectBuilder; +import jakarta.json.JsonPatch; +import jakarta.json.JsonPatchBuilder; +import jakarta.json.JsonValue; +import org.testng.annotations.Test; + +public class DataJobInputOutputTemplateTest { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @Test + public void testPatchUpstream() throws Exception { + DataJobInputOutputTemplate dataJobInputOutputTemplate = new DataJobInputOutputTemplate(); + DataJobInputOutput dataJobInputOutput = dataJobInputOutputTemplate.getDefault(); + JsonPatchBuilder jsonPatchBuilder = Json.createPatchBuilder(); + + JsonObjectBuilder fineGrainedLineageNode = Json.createObjectBuilder(); + JsonValue upstreamConfidenceScore = Json.createValue(1.0f); + fineGrainedLineageNode.add("confidenceScore", upstreamConfidenceScore); + jsonPatchBuilder.add( + "/fineGrainedLineages/CREATE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)//urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c1)", + fineGrainedLineageNode.build()); + + // Initial population test + DataJobInputOutput result = + dataJobInputOutputTemplate.applyPatch(dataJobInputOutput, jsonPatchBuilder.build()); + // Hack because Jackson parses values to doubles instead of floats + DataMap dataMap = new DataMap(); + dataMap.put("confidenceScore", 1.0); + FineGrainedLineage fineGrainedLineage = new FineGrainedLineage(dataMap); + UrnArray urns = new UrnArray(); + Urn urn1 = + UrnUtils.getUrn( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"); + urns.add(urn1); + UrnArray upstreams = new UrnArray(); + Urn upstreamUrn = + UrnUtils.getUrn( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c1)"); + upstreams.add(upstreamUrn); + fineGrainedLineage.setDownstreams(urns); + fineGrainedLineage.setUpstreams(upstreams); + fineGrainedLineage.setTransformOperation("CREATE"); + fineGrainedLineage.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET); + fineGrainedLineage.setDownstreamType(FineGrainedLineageDownstreamType.FIELD); + assertEquals(result.getFineGrainedLineages().get(0), fineGrainedLineage); + + // Test non-overwrite upstreams and correct confidence score and types w/ overwrite + JsonObjectBuilder finegrainedLineageNode2 = Json.createObjectBuilder(); + finegrainedLineageNode2.add( + "upstreamType", Json.createValue(FineGrainedLineageUpstreamType.FIELD_SET.name())); + finegrainedLineageNode2.add("confidenceScore", upstreamConfidenceScore); + finegrainedLineageNode2.add( + "downstreamType", Json.createValue(FineGrainedLineageDownstreamType.FIELD.name())); + + JsonPatchBuilder patchOperations2 = Json.createPatchBuilder(); + patchOperations2.add( + "/fineGrainedLineages/CREATE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c2)/urn:li:query:someQuery/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)", + finegrainedLineageNode2.build()); + + JsonValue upstreamConfidenceScore2 = Json.createValue(0.1f); + JsonObjectBuilder finegrainedLineageNode3 = Json.createObjectBuilder(); + finegrainedLineageNode3.add( + "upstreamType", Json.createValue(FineGrainedLineageUpstreamType.DATASET.name())); + finegrainedLineageNode3.add("confidenceScore", upstreamConfidenceScore2); + finegrainedLineageNode3.add( + "downstreamType", Json.createValue(FineGrainedLineageDownstreamType.FIELD_SET.name())); + + patchOperations2.add( + "/fineGrainedLineages/CREATE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c2)/urn:li:query:someQuery/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)", + finegrainedLineageNode3.build()); + + JsonPatch jsonPatch2 = patchOperations2.build(); + + DataJobInputOutput result2 = dataJobInputOutputTemplate.applyPatch(result, jsonPatch2); + // Hack because Jackson parses values to doubles instead of floats + DataMap dataMap2 = new DataMap(); + dataMap2.put("confidenceScore", 0.1); + FineGrainedLineage fineGrainedLineage2 = new FineGrainedLineage(dataMap2); + UrnArray urns2 = new UrnArray(); + Urn urn2 = + UrnUtils.getUrn( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"); + urns2.add(urn2); + Urn downstreamUrn2 = + UrnUtils.getUrn( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c2)"); + UrnArray downstreams2 = new UrnArray(); + downstreams2.add(downstreamUrn2); + fineGrainedLineage2.setUpstreams(urns2); + fineGrainedLineage2.setDownstreams(downstreams2); + fineGrainedLineage2.setTransformOperation("CREATE"); + fineGrainedLineage2.setUpstreamType(FineGrainedLineageUpstreamType.DATASET); + fineGrainedLineage2.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET); + fineGrainedLineage2.setQuery(UrnUtils.getUrn("urn:li:query:someQuery")); + assertEquals(result2.getFineGrainedLineages().get(1), fineGrainedLineage2); + + // Check different queries + JsonObjectBuilder finegrainedLineageNode4 = Json.createObjectBuilder(); + finegrainedLineageNode4.add( + "upstreamType", Json.createValue(FineGrainedLineageUpstreamType.FIELD_SET.name())); + finegrainedLineageNode4.add("confidenceScore", upstreamConfidenceScore); + finegrainedLineageNode4.add( + "downstreamType", Json.createValue(FineGrainedLineageDownstreamType.FIELD.name())); + + JsonPatchBuilder patchOperations3 = Json.createPatchBuilder(); + patchOperations3.add( + "/fineGrainedLineages/CREATE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c2)/urn:li:query:anotherQuery/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)", + finegrainedLineageNode4.build()); + + JsonPatch jsonPatch3 = patchOperations3.build(); + DataJobInputOutput result3 = dataJobInputOutputTemplate.applyPatch(result2, jsonPatch3); + // Hack because Jackson parses values to doubles instead of floats + DataMap dataMap3 = new DataMap(); + dataMap3.put("confidenceScore", 1.0); + FineGrainedLineage fineGrainedLineage3 = new FineGrainedLineage(dataMap3); + UrnArray urns3 = new UrnArray(); + Urn urn3 = + UrnUtils.getUrn( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c2)"); + urns3.add(urn3); + + Urn upstreamUrn3 = + UrnUtils.getUrn( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"); + UrnArray upstreamUrns3 = new UrnArray(); + upstreamUrns3.add(upstreamUrn3); + fineGrainedLineage3.setDownstreams(urns3); + fineGrainedLineage3.setUpstreams(upstreamUrns3); + fineGrainedLineage3.setTransformOperation("CREATE"); + fineGrainedLineage3.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET); + fineGrainedLineage3.setDownstreamType(FineGrainedLineageDownstreamType.FIELD); + fineGrainedLineage3.setQuery(UrnUtils.getUrn("urn:li:query:anotherQuery")); + // Splits into two for different types + assertEquals(result3.getFineGrainedLineages().get(2), fineGrainedLineage3); + + // Check different transform types + JsonObjectBuilder finegrainedLineageNode5 = Json.createObjectBuilder(); + finegrainedLineageNode5.add( + "upstreamType", Json.createValue(FineGrainedLineageUpstreamType.FIELD_SET.name())); + finegrainedLineageNode5.add("confidenceScore", upstreamConfidenceScore); + finegrainedLineageNode5.add( + "downstreamType", Json.createValue(FineGrainedLineageDownstreamType.FIELD.name())); + + JsonPatchBuilder patchOperations4 = Json.createPatchBuilder(); + patchOperations4.add( + "/fineGrainedLineages/TRANSFORM/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c2)/urn:li:query:anotherQuery/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)", + finegrainedLineageNode5.build()); + JsonPatch jsonPatch4 = patchOperations4.build(); + + DataJobInputOutput result4 = dataJobInputOutputTemplate.applyPatch(result3, jsonPatch4); + // Hack because Jackson parses values to doubles instead of floats + DataMap dataMap4 = new DataMap(); + dataMap4.put("confidenceScore", 1.0); + FineGrainedLineage fineGrainedLineage4 = new FineGrainedLineage(dataMap4); + fineGrainedLineage4.setUpstreams(upstreamUrns3); + fineGrainedLineage4.setDownstreams(urns3); + fineGrainedLineage4.setTransformOperation("TRANSFORM"); + fineGrainedLineage4.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET); + fineGrainedLineage4.setDownstreamType(FineGrainedLineageDownstreamType.FIELD); + fineGrainedLineage4.setQuery(UrnUtils.getUrn("urn:li:query:anotherQuery")); + // New entry in array because of new transformation type + assertEquals(result4.getFineGrainedLineages().get(3), fineGrainedLineage4); + + // Remove + JsonPatchBuilder removeOperations = Json.createPatchBuilder(); + removeOperations.remove( + "/fineGrainedLineages/CREATE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)/NONE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c1)"); + removeOperations.remove( + "/fineGrainedLineages/CREATE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c2)/urn:li:query:someQuery/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"); + removeOperations.remove( + "/fineGrainedLineages/CREATE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c2)/urn:li:query:anotherQuery/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"); + removeOperations.remove( + "/fineGrainedLineages/TRANSFORM/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c2)/urn:li:query:anotherQuery/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"); + + JsonPatch removePatch = removeOperations.build(); + DataJobInputOutput finalResult = dataJobInputOutputTemplate.applyPatch(result4, removePatch); + assertEquals(finalResult, dataJobInputOutputTemplate.getDefault()); + } + + @Test + public void testPatchWithFieldWithForwardSlash() throws JsonProcessingException { + + String downstreamUrn = + "/fineGrainedLineages/CREATE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"; + String unescapedUpstreamUrn = + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),slash/column)"; + String escapedUpstreamUrn = + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),slash~1column)"; + String lineagePath = downstreamUrn + "//" + escapedUpstreamUrn; + + DataJobInputOutputTemplate dataJobInputOutputTemplate = new DataJobInputOutputTemplate(); + DataJobInputOutput dataJobInputOutput = dataJobInputOutputTemplate.getDefault(); + JsonPatchBuilder jsonPatchBuilder = Json.createPatchBuilder(); + + JsonObjectBuilder fineGrainedLineageNode = Json.createObjectBuilder(); + JsonValue upstreamConfidenceScore = Json.createValue(1.0f); + fineGrainedLineageNode.add("confidenceScore", upstreamConfidenceScore); + + jsonPatchBuilder.add(lineagePath, fineGrainedLineageNode.build()); + + // Initial population test + DataJobInputOutput result = + dataJobInputOutputTemplate.applyPatch(dataJobInputOutput, jsonPatchBuilder.build()); + + assertEquals( + result.getFineGrainedLineages().get(0).getUpstreams().get(0).toString(), + unescapedUpstreamUrn); + } + + @Test + public void testPatchWithFieldWithTilde() throws JsonProcessingException { + + String downstreamUrn = + "/fineGrainedLineages/CREATE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"; + String unescapedUpstreamUrn = + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),tilde~column)"; + String escapedUpstreamUrn = + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),tilde~0column)"; + String lineagePath = downstreamUrn + "//" + escapedUpstreamUrn; + + DataJobInputOutputTemplate dataJobInputOutputTemplate = new DataJobInputOutputTemplate(); + DataJobInputOutput dataJobInputOutput = dataJobInputOutputTemplate.getDefault(); + JsonPatchBuilder jsonPatchBuilder = Json.createPatchBuilder(); + + JsonObjectBuilder fineGrainedLineageNode = Json.createObjectBuilder(); + JsonValue upstreamConfidenceScore = Json.createValue(1.0f); + fineGrainedLineageNode.add("confidenceScore", upstreamConfidenceScore); + + jsonPatchBuilder.add(lineagePath, fineGrainedLineageNode.build()); + + // Initial population test + DataJobInputOutput result = + dataJobInputOutputTemplate.applyPatch(dataJobInputOutput, jsonPatchBuilder.build()); + assertEquals( + result.getFineGrainedLineages().get(0).getUpstreams().get(0).toString(), + unescapedUpstreamUrn); + } +} diff --git a/entity-registry/src/test/java/com/linkedin/metadata/aspect/patch/template/UpstreamLineageTemplateTest.java b/entity-registry/src/test/java/com/linkedin/metadata/aspect/patch/template/UpstreamLineageTemplateTest.java index f934dd8961ca37..ab0e7f960251c4 100644 --- a/entity-registry/src/test/java/com/linkedin/metadata/aspect/patch/template/UpstreamLineageTemplateTest.java +++ b/entity-registry/src/test/java/com/linkedin/metadata/aspect/patch/template/UpstreamLineageTemplateTest.java @@ -221,6 +221,7 @@ public void testPatchUpstream() throws Exception { JsonPatch removePatch = removeOperations.build(); UpstreamLineage finalResult = upstreamLineageTemplate.applyPatch(result4, removePatch); + assertEquals(finalResult, upstreamLineageTemplate.getDefault()); } @@ -337,4 +338,39 @@ public void testPatchWithFieldWithTilde() throws JsonProcessingException { result.getFineGrainedLineages().get(0).getUpstreams().get(0).toString(), unescapedUpstreamUrn); } + + @Test + public void testPatchRemoveWithFields() throws JsonProcessingException { + + String downstreamUrn = + "/fineGrainedLineages/CREATE/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:s3,~1tmp~1test.parquet,PROD),c1)"; + String upstreamUrn = + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c1)"; + String upstreamUrn2 = + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD),c2)"; + + String lineagePath1 = downstreamUrn + "/NONE/" + upstreamUrn; + String lineagePath2 = downstreamUrn + "/NONE/" + upstreamUrn2; + + UpstreamLineageTemplate upstreamLineageTemplate = new UpstreamLineageTemplate(); + UpstreamLineage upstreamLineage = upstreamLineageTemplate.getDefault(); + JsonPatchBuilder jsonPatchBuilder = Json.createPatchBuilder(); + + JsonObjectBuilder fineGrainedLineageNode = Json.createObjectBuilder(); + JsonValue upstreamConfidenceScore = Json.createValue(1.0f); + fineGrainedLineageNode.add("confidenceScore", upstreamConfidenceScore); + + jsonPatchBuilder.add(lineagePath1, fineGrainedLineageNode.build()); + jsonPatchBuilder.add(lineagePath2, fineGrainedLineageNode.build()); + + // Initial population test + UpstreamLineage result = + upstreamLineageTemplate.applyPatch(upstreamLineage, jsonPatchBuilder.build()); + assertEquals( + result.getFineGrainedLineages().get(0).getUpstreams().get(0).toString(), upstreamUrn); + assertEquals( + result.getFineGrainedLineages().get(0).getUpstreams().get(1).toString(), upstreamUrn2); + + assertEquals(result.getFineGrainedLineages().get(0).getUpstreams().size(), 2); + } }