diff --git a/docker/elasticsearch-setup/create-indices.sh b/docker/elasticsearch-setup/create-indices.sh index 6f61f811275ebc..c1b3f87228e736 100755 --- a/docker/elasticsearch-setup/create-indices.sh +++ b/docker/elasticsearch-setup/create-indices.sh @@ -14,6 +14,8 @@ function create_index { create_index chartdocument chart/settings.json chart/mappings.json create_index corpuserinfodocument corp-user/settings.json corp-user/mappings.json create_index dashboarddocument dashboard/settings.json dashboard/mappings.json +create_index datajobdocument datajob/settings.json datajob/mappings.json +create_index dataflowdocument dataflow/settings.json dataflow/mappings.json create_index dataprocessdocument data-process/settings.json data-process/mappings.json create_index datasetdocument dataset/settings.json dataset/mappings.json create_index mlmodeldocument ml-model/settings.json ml-model/mappings.json diff --git a/gms/api/src/main/idl/com.linkedin.dataflow.dataFlows.restspec.json b/gms/api/src/main/idl/com.linkedin.dataflow.dataFlows.restspec.json new file mode 100644 index 00000000000000..c856512c497a1d --- /dev/null +++ b/gms/api/src/main/idl/com.linkedin.dataflow.dataFlows.restspec.json @@ -0,0 +1,141 @@ +{ + "name" : "dataFlows", + "namespace" : "com.linkedin.dataflow", + "path" : "/dataFlows", + "schema" : "com.linkedin.datajob.DataFlow", + "doc" : "generated from: com.linkedin.metadata.resources.datajob.DataFlows", + "collection" : { + "identifier" : { + "name" : "key", + "type" : "com.linkedin.datajob.DataFlowKey", + "params" : "com.linkedin.restli.common.EmptyRecord" + }, + "supports" : [ "batch_get", "get", "get_all" ], + "methods" : [ { + "method" : "get", + "parameters" : [ { + "name" : "aspects", + "type" : "{ \"type\" : \"array\", \"items\" : \"string\" }", + "optional" : true + } ] + }, { + "method" : "batch_get", + "parameters" : [ { + "name" : "aspects", + "type" : "{ \"type\" : \"array\", \"items\" : \"string\" }", + "optional" : true + } ] + }, { + 
"method" : "get_all", + "parameters" : [ { + "name" : "aspects", + "type" : "{ \"type\" : \"array\", \"items\" : \"string\" }", + "optional" : true + }, { + "name" : "filter", + "type" : "com.linkedin.metadata.query.Filter", + "optional" : true + }, { + "name" : "sort", + "type" : "com.linkedin.metadata.query.SortCriterion", + "optional" : true + } ], + "pagingSupported" : true + } ], + "finders" : [ { + "name" : "search", + "parameters" : [ { + "name" : "input", + "type" : "string" + }, { + "name" : "aspects", + "type" : "{ \"type\" : \"array\", \"items\" : \"string\" }", + "optional" : true + }, { + "name" : "filter", + "type" : "com.linkedin.metadata.query.Filter", + "optional" : true + }, { + "name" : "sort", + "type" : "com.linkedin.metadata.query.SortCriterion", + "optional" : true + } ], + "metadata" : { + "type" : "com.linkedin.metadata.query.SearchResultMetadata" + }, + "pagingSupported" : true + } ], + "actions" : [ { + "name" : "autocomplete", + "parameters" : [ { + "name" : "query", + "type" : "string" + }, { + "name" : "field", + "type" : "string" + }, { + "name" : "filter", + "type" : "com.linkedin.metadata.query.Filter" + }, { + "name" : "limit", + "type" : "int" + } ], + "returns" : "com.linkedin.metadata.query.AutoCompleteResult" + }, { + "name" : "backfillWithUrns", + "parameters" : [ { + "name" : "urns", + "type" : "{ \"type\" : \"array\", \"items\" : \"string\" }" + }, { + "name" : "aspects", + "type" : "{ \"type\" : \"array\", \"items\" : \"string\" }", + "optional" : true + } ], + "returns" : "com.linkedin.metadata.restli.BackfillResult" + }, { + "name" : "browse", + "parameters" : [ { + "name" : "path", + "type" : "string" + }, { + "name" : "filter", + "type" : "com.linkedin.metadata.query.Filter", + "optional" : true + }, { + "name" : "start", + "type" : "int" + }, { + "name" : "limit", + "type" : "int" + } ], + "returns" : "com.linkedin.metadata.query.BrowseResult" + }, { + "name" : "getBrowsePaths", + "parameters" : [ { + "name" : "urn", + 
"type" : "com.linkedin.common.Urn" + } ], + "returns" : "{ \"type\" : \"array\", \"items\" : \"string\" }" + }, { + "name" : "getSnapshot", + "parameters" : [ { + "name" : "urn", + "type" : "string" + }, { + "name" : "aspects", + "type" : "{ \"type\" : \"array\", \"items\" : \"string\" }", + "optional" : true + } ], + "returns" : "com.linkedin.metadata.snapshot.DataFlowSnapshot" + }, { + "name" : "ingest", + "parameters" : [ { + "name" : "snapshot", + "type" : "com.linkedin.metadata.snapshot.DataFlowSnapshot" + } ] + } ], + "entity" : { + "path" : "/dataFlows/{key}" + } + } +} \ No newline at end of file diff --git a/gms/api/src/main/idl/com.linkedin.datajob.dataJobs.restspec.json b/gms/api/src/main/idl/com.linkedin.datajob.dataJobs.restspec.json new file mode 100644 index 00000000000000..ca64cee1cecbf0 --- /dev/null +++ b/gms/api/src/main/idl/com.linkedin.datajob.dataJobs.restspec.json @@ -0,0 +1,141 @@ +{ + "name" : "dataJobs", + "namespace" : "com.linkedin.datajob", + "path" : "/dataJobs", + "schema" : "com.linkedin.datajob.DataJob", + "doc" : "generated from: com.linkedin.metadata.resources.datajob.DataJobs", + "collection" : { + "identifier" : { + "name" : "key", + "type" : "com.linkedin.datajob.DataJobKey", + "params" : "com.linkedin.restli.common.EmptyRecord" + }, + "supports" : [ "batch_get", "get", "get_all" ], + "methods" : [ { + "method" : "get", + "parameters" : [ { + "name" : "aspects", + "type" : "{ \"type\" : \"array\", \"items\" : \"string\" }", + "optional" : true + } ] + }, { + "method" : "batch_get", + "parameters" : [ { + "name" : "aspects", + "type" : "{ \"type\" : \"array\", \"items\" : \"string\" }", + "optional" : true + } ] + }, { + "method" : "get_all", + "parameters" : [ { + "name" : "aspects", + "type" : "{ \"type\" : \"array\", \"items\" : \"string\" }", + "optional" : true + }, { + "name" : "filter", + "type" : "com.linkedin.metadata.query.Filter", + "optional" : true + }, { + "name" : "sort", + "type" : 
"com.linkedin.metadata.query.SortCriterion", + "optional" : true + } ], + "pagingSupported" : true + } ], + "finders" : [ { + "name" : "search", + "parameters" : [ { + "name" : "input", + "type" : "string" + }, { + "name" : "aspects", + "type" : "{ \"type\" : \"array\", \"items\" : \"string\" }", + "optional" : true + }, { + "name" : "filter", + "type" : "com.linkedin.metadata.query.Filter", + "optional" : true + }, { + "name" : "sort", + "type" : "com.linkedin.metadata.query.SortCriterion", + "optional" : true + } ], + "metadata" : { + "type" : "com.linkedin.metadata.query.SearchResultMetadata" + }, + "pagingSupported" : true + } ], + "actions" : [ { + "name" : "autocomplete", + "parameters" : [ { + "name" : "query", + "type" : "string" + }, { + "name" : "field", + "type" : "string" + }, { + "name" : "filter", + "type" : "com.linkedin.metadata.query.Filter" + }, { + "name" : "limit", + "type" : "int" + } ], + "returns" : "com.linkedin.metadata.query.AutoCompleteResult" + }, { + "name" : "backfillWithUrns", + "parameters" : [ { + "name" : "urns", + "type" : "{ \"type\" : \"array\", \"items\" : \"string\" }" + }, { + "name" : "aspects", + "type" : "{ \"type\" : \"array\", \"items\" : \"string\" }", + "optional" : true + } ], + "returns" : "com.linkedin.metadata.restli.BackfillResult" + }, { + "name" : "browse", + "parameters" : [ { + "name" : "path", + "type" : "string" + }, { + "name" : "filter", + "type" : "com.linkedin.metadata.query.Filter", + "optional" : true + }, { + "name" : "start", + "type" : "int" + }, { + "name" : "limit", + "type" : "int" + } ], + "returns" : "com.linkedin.metadata.query.BrowseResult" + }, { + "name" : "getBrowsePaths", + "parameters" : [ { + "name" : "urn", + "type" : "com.linkedin.common.Urn" + } ], + "returns" : "{ \"type\" : \"array\", \"items\" : \"string\" }" + }, { + "name" : "getSnapshot", + "parameters" : [ { + "name" : "urn", + "type" : "string" + }, { + "name" : "aspects", + "type" : "{ \"type\" : \"array\", \"items\" : 
\"string\" }", + "optional" : true + } ], + "returns" : "com.linkedin.metadata.snapshot.DataJobSnapshot" + }, { + "name" : "ingest", + "parameters" : [ { + "name" : "snapshot", + "type" : "com.linkedin.metadata.snapshot.DataJobSnapshot" + } ] + } ], + "entity" : { + "path" : "/dataJobs/{key}" + } + } +} \ No newline at end of file diff --git a/gms/api/src/main/pegasus/com/linkedin/datajob/DataFlow.pdl b/gms/api/src/main/pegasus/com/linkedin/datajob/DataFlow.pdl new file mode 100644 index 00000000000000..9ffcc2767594e6 --- /dev/null +++ b/gms/api/src/main/pegasus/com/linkedin/datajob/DataFlow.pdl @@ -0,0 +1,26 @@ +namespace com.linkedin.datajob + +import com.linkedin.common.ChangeAuditStamps +import com.linkedin.common.DataFlowUrn +import com.linkedin.common.Ownership + +/** + * Metadata for DataFlow + */ +record DataFlow includes DataFlowKey, ChangeAuditStamps { + /** + * DataFlow urn + */ + urn: DataFlowUrn + + /** + * Ownership Info + */ + ownership: optional Ownership + + /** + * Optional additional metadata about the data flow + */ + info: optional DataFlowInfo + +} diff --git a/gms/api/src/main/pegasus/com/linkedin/datajob/DataFlowKey.pdl b/gms/api/src/main/pegasus/com/linkedin/datajob/DataFlowKey.pdl new file mode 100644 index 00000000000000..576ae1d0439e24 --- /dev/null +++ b/gms/api/src/main/pegasus/com/linkedin/datajob/DataFlowKey.pdl @@ -0,0 +1,36 @@ +namespace com.linkedin.datajob + + +/** + * Key for data flow resource + */ +record DataFlowKey { + + /** + * The workflow orchestrator, ex: Azkaban, Airflow + */ + @validate.strlen = { + "max" : 50, + "min" : 1 + } + orchestrator: string + + /** + * Id of the flow + */ + @validate.strlen = { + "max" : 200, + "min" : 1 + } + flowId: string + + /** + * Cluster of the flow + */ + @validate.strlen = { + "max" : 100, + "min" : 1 + } + cluster: string + +} \ No newline at end of file diff --git a/gms/api/src/main/pegasus/com/linkedin/datajob/DataJob.pdl b/gms/api/src/main/pegasus/com/linkedin/datajob/DataJob.pdl 
new file mode 100644 index 00000000000000..67a390991f9596 --- /dev/null +++ b/gms/api/src/main/pegasus/com/linkedin/datajob/DataJob.pdl @@ -0,0 +1,33 @@ +namespace com.linkedin.datajob + +import com.linkedin.common.ChangeAuditStamps +import com.linkedin.common.DataJobUrn +import com.linkedin.common.Ownership + + + +/** + * Metadata about DataJob + */ +record DataJob includes DataJobKey, ChangeAuditStamps { + /** + * DataJob urn + */ + urn: DataJobUrn + + /** + * Ownership Info + */ + ownership: optional Ownership + + /** + * Input and output datasets of job + */ + inputOutput: optional DataJobInputOutput + + /** + * Optional additional metadata about the job + */ + info: optional DataJobInfo + +} diff --git a/gms/api/src/main/pegasus/com/linkedin/datajob/DataJobKey.pdl b/gms/api/src/main/pegasus/com/linkedin/datajob/DataJobKey.pdl new file mode 100644 index 00000000000000..263208ac1d0323 --- /dev/null +++ b/gms/api/src/main/pegasus/com/linkedin/datajob/DataJobKey.pdl @@ -0,0 +1,26 @@ +namespace com.linkedin.datajob + +import com.linkedin.common.DataFlowUrn + + +/** + * Key for data job resource + */ +record DataJobKey { + + /** + * Standardized dataflow urn where flow is defined. The data flow Urn (urn:li:dataflow:{orchestrator,flow_id, cluster}) + */ + @validate.`com.linkedin.datajob.rest.validator.DataFlowValidator` = { } + dataFlow: DataFlowUrn + + /** + * The Id of the data job + */ + @validate.strlen = { + "max" : 200, + "min" : 1 + } + jobId: string + +} \ No newline at end of file diff --git a/gms/api/src/main/snapshot/com.linkedin.chart.charts.snapshot.json b/gms/api/src/main/snapshot/com.linkedin.chart.charts.snapshot.json index 2ca5c84b08d740..a855e7a5409e15 100644 --- a/gms/api/src/main/snapshot/com.linkedin.chart.charts.snapshot.json +++ b/gms/api/src/main/snapshot/com.linkedin.chart.charts.snapshot.json @@ -230,7 +230,7 @@ "fields" : [ { "name" : "owner", "type" : "Urn", - "doc" : "Owner URN, e.g. 
urn:li:corpuser:ldap, urn:li:corpGroup:group_name, and urn:li:multiProduct:mp_name" + "doc" : "Owner URN, e.g. urn:li:corpuser:ldap, urn:li:corpGroup:group_name, and urn:li:multiProduct:mp_name\n(Caveat: only corpuser is currently supported in the frontend.)" }, { "name" : "type", "type" : { diff --git a/gms/api/src/main/snapshot/com.linkedin.dashboard.dashboards.snapshot.json b/gms/api/src/main/snapshot/com.linkedin.dashboard.dashboards.snapshot.json index cfbaaf34714a53..52cfdfd6550502 100644 --- a/gms/api/src/main/snapshot/com.linkedin.dashboard.dashboards.snapshot.json +++ b/gms/api/src/main/snapshot/com.linkedin.dashboard.dashboards.snapshot.json @@ -133,7 +133,7 @@ "fields" : [ { "name" : "owner", "type" : "Urn", - "doc" : "Owner URN, e.g. urn:li:corpuser:ldap, urn:li:corpGroup:group_name, and urn:li:multiProduct:mp_name" + "doc" : "Owner URN, e.g. urn:li:corpuser:ldap, urn:li:corpGroup:group_name, and urn:li:multiProduct:mp_name\n(Caveat: only corpuser is currently supported in the frontend.)" }, { "name" : "type", "type" : { diff --git a/gms/api/src/main/snapshot/com.linkedin.dataflow.dataFlows.snapshot.json b/gms/api/src/main/snapshot/com.linkedin.dataflow.dataFlows.snapshot.json new file mode 100644 index 00000000000000..59a4eabcf2070a --- /dev/null +++ b/gms/api/src/main/snapshot/com.linkedin.dataflow.dataFlows.snapshot.json @@ -0,0 +1,644 @@ +{ + "models" : [ { + "type" : "record", + "name" : "AuditStamp", + "namespace" : "com.linkedin.common", + "doc" : "Data captured on a resource/association/sub-resource level giving insight into when that resource/association/sub-resource moved into a particular lifecycle stage, and who acted to move it into that specific lifecycle stage.", + "fields" : [ { + "name" : "time", + "type" : { + "type" : "typeref", + "name" : "Time", + "doc" : "Number of milliseconds since midnight, January 1, 1970 UTC. 
It must be a positive number", + "ref" : "long" + }, + "doc" : "When did the resource/association/sub-resource move into the specific lifecycle stage represented by this AuditEvent." + }, { + "name" : "actor", + "type" : { + "type" : "typeref", + "name" : "Urn", + "ref" : "string", + "java" : { + "class" : "com.linkedin.common.urn.Urn" + } + }, + "doc" : "The entity (e.g. a member URN) which will be credited for moving the resource/association/sub-resource into the specific lifecycle stage. It is also the one used to authorize the change." + }, { + "name" : "impersonator", + "type" : "Urn", + "doc" : "The entity (e.g. a service URN) which performs the change on behalf of the Actor and must be authorized to act as the Actor.", + "optional" : true + } ] + }, { + "type" : "record", + "name" : "ChangeAuditStamps", + "namespace" : "com.linkedin.common", + "doc" : "Data captured on a resource/association/sub-resource level giving insight into when that resource/association/sub-resource moved into various lifecycle stages, and who acted to move it into those lifecycle stages. The recommended best practice is to include this record in your record schema, and annotate its fields as @readOnly in your resource. See https://github.com/linkedin/rest.li/wiki/Validation-in-Rest.li#restli-validation-annotations", + "fields" : [ { + "name" : "created", + "type" : "AuditStamp", + "doc" : "An AuditStamp corresponding to the creation of this resource/association/sub-resource" + }, { + "name" : "lastModified", + "type" : "AuditStamp", + "doc" : "An AuditStamp corresponding to the last modification of this resource/association/sub-resource. If no modification has happened since creation, lastModified should be the same as created" + }, { + "name" : "deleted", + "type" : "AuditStamp", + "doc" : "An AuditStamp corresponding to the deletion of this resource/association/sub-resource. Logically, deleted MUST have a later timestamp than creation. 
It may or may not have the same time as lastModified depending upon the resource/association/sub-resource semantics.", + "optional" : true + } ] + }, { + "type" : "typeref", + "name" : "DataFlowUrn", + "namespace" : "com.linkedin.common", + "doc" : "Standardized data processing flow identifier.", + "ref" : "string", + "java" : { + "class" : "com.linkedin.common.urn.DataFlowUrn" + }, + "validate" : { + "com.linkedin.common.validator.TypedUrnValidator" : { + "accessible" : true, + "constructable" : true, + "doc" : "Standardized data processing flow identifier.", + "entityType" : "dataFlow", + "fields" : [ { + "doc" : "Workflow manager like azkaban, airflow which orchestrates the flow", + "maxLength" : 50, + "name" : "orchestrator", + "type" : "string" + }, { + "doc" : "Unique Identifier of the data flow", + "maxLength" : 200, + "name" : "flowId", + "type" : "string" + }, { + "doc" : "Cluster where the flow is executed", + "maxLength" : 100, + "name" : "cluster", + "type" : "string" + } ], + "maxLength" : 373, + "name" : "DataFlow", + "namespace" : "li", + "owners" : [ "urn:li:corpuser:fbar", "urn:li:corpuser:bfoo" ], + "owningTeam" : "urn:li:internalTeam:datahub" + } + } + }, { + "type" : "record", + "name" : "Owner", + "namespace" : "com.linkedin.common", + "doc" : "Ownership information", + "fields" : [ { + "name" : "owner", + "type" : "Urn", + "doc" : "Owner URN, e.g. urn:li:corpuser:ldap, urn:li:corpGroup:group_name, and urn:li:multiProduct:mp_name\n(Caveat: only corpuser is currently supported in the frontend.)" + }, { + "name" : "type", + "type" : { + "type" : "enum", + "name" : "OwnershipType", + "doc" : "Owner category or owner role", + "symbols" : [ "DEVELOPER", "DATAOWNER", "DELEGATE", "PRODUCER", "CONSUMER", "STAKEHOLDER" ], + "symbolDocs" : { + "CONSUMER" : "A person, group, or service that consumes the data", + "DATAOWNER" : "A person or group that is owning the data", + "DELEGATE" : "A person or a group that overseas the operation, e.g. 
a DBA or SRE.", + "DEVELOPER" : "A person or group that is in charge of developing the code", + "PRODUCER" : "A person, group, or service that produces/generates the data", + "STAKEHOLDER" : "A person or a group that has direct business interest" + } + }, + "doc" : "The type of the ownership" + }, { + "name" : "source", + "type" : { + "type" : "record", + "name" : "OwnershipSource", + "doc" : "Source/provider of the ownership information", + "fields" : [ { + "name" : "type", + "type" : { + "type" : "enum", + "name" : "OwnershipSourceType", + "symbols" : [ "AUDIT", "DATABASE", "FILE_SYSTEM", "ISSUE_TRACKING_SYSTEM", "MANUAL", "SERVICE", "SOURCE_CONTROL", "OTHER" ], + "symbolDocs" : { + "AUDIT" : "Auditing system or audit logs", + "DATABASE" : "Database, e.g. GRANTS table", + "FILE_SYSTEM" : "File system, e.g. file/directory owner", + "ISSUE_TRACKING_SYSTEM" : "Issue tracking system, e.g. Jira", + "MANUAL" : "Manually provided by a user", + "OTHER" : "Other sources", + "SERVICE" : "Other ownership-like service, e.g. Nuage, ACL service etc", + "SOURCE_CONTROL" : "SCM system, e.g. GIT, SVN" + } + }, + "doc" : "The type of the source" + }, { + "name" : "url", + "type" : "string", + "doc" : "A reference URL for the source", + "optional" : true + } ] + }, + "doc" : "Source information for the ownership", + "optional" : true + } ] + }, { + "type" : "record", + "name" : "Ownership", + "namespace" : "com.linkedin.common", + "doc" : "Ownership information of an entity.", + "fields" : [ { + "name" : "owners", + "type" : { + "type" : "array", + "items" : "Owner" + }, + "doc" : "List of owners of the entity." + }, { + "name" : "lastModified", + "type" : "AuditStamp", + "doc" : "Audit stamp containing who last modified the record and when." 
+ } ] + }, "com.linkedin.common.OwnershipSource", "com.linkedin.common.OwnershipSourceType", "com.linkedin.common.OwnershipType", "com.linkedin.common.Time", "com.linkedin.common.Urn", { + "type" : "record", + "name" : "DataFlow", + "namespace" : "com.linkedin.datajob", + "doc" : "Metadata for DataFlow", + "include" : [ { + "type" : "record", + "name" : "DataFlowKey", + "doc" : "Key for data flow resource", + "fields" : [ { + "name" : "orchestrator", + "type" : "string", + "doc" : "The workflow orchestrator, ex: Azkaban, Airflow", + "validate" : { + "strlen" : { + "max" : 50, + "min" : 1 + } + } + }, { + "name" : "flowId", + "type" : "string", + "doc" : "Id of the flow", + "validate" : { + "strlen" : { + "max" : 200, + "min" : 1 + } + } + }, { + "name" : "cluster", + "type" : "string", + "doc" : "Cluster of the flow", + "validate" : { + "strlen" : { + "max" : 100, + "min" : 1 + } + } + } ] + }, "com.linkedin.common.ChangeAuditStamps" ], + "fields" : [ { + "name" : "urn", + "type" : "com.linkedin.common.DataFlowUrn", + "doc" : "DataFlow urn" + }, { + "name" : "ownership", + "type" : "com.linkedin.common.Ownership", + "doc" : "Ownership Info", + "optional" : true + }, { + "name" : "info", + "type" : { + "type" : "record", + "name" : "DataFlowInfo", + "doc" : "Information about a Data processing flow", + "fields" : [ { + "name" : "name", + "type" : "string", + "doc" : "Flow name" + }, { + "name" : "description", + "type" : "string", + "doc" : "Flow description", + "optional" : true + }, { + "name" : "project", + "type" : "string", + "doc" : "Optional project/namespace associated with the flow", + "optional" : true + } ] + }, + "doc" : "Optional additional metadata about the data flow", + "optional" : true + } ] + }, "com.linkedin.datajob.DataFlowInfo", "com.linkedin.datajob.DataFlowKey", { + "type" : "typeref", + "name" : "DataFlowAspect", + "namespace" : "com.linkedin.metadata.aspect", + "doc" : "A union of all supported metadata aspects for a Data flow", + "ref" : [ 
"com.linkedin.datajob.DataFlowInfo", "com.linkedin.common.Ownership" ] + }, { + "type" : "record", + "name" : "AggregationMetadata", + "namespace" : "com.linkedin.metadata.query", + "fields" : [ { + "name" : "name", + "type" : "string", + "doc" : "The name of the aggregation, e.g, platform, origin" + }, { + "name" : "aggregations", + "type" : { + "type" : "map", + "values" : "long" + }, + "doc" : "List of aggregations showing the number of documents falling into each bucket. e.g, for platform aggregation, the bucket can be hive, kafka, etc" + } ] + }, { + "type" : "record", + "name" : "AutoCompleteResult", + "namespace" : "com.linkedin.metadata.query", + "doc" : "The model for the auto complete result", + "fields" : [ { + "name" : "query", + "type" : "string", + "doc" : "The original chars typed by user" + }, { + "name" : "suggestions", + "type" : { + "type" : "array", + "items" : "string" + }, + "doc" : "A list of typeahead suggestions" + } ] + }, { + "type" : "record", + "name" : "BrowseResult", + "namespace" : "com.linkedin.metadata.query", + "doc" : "The model for the result of a browse query", + "fields" : [ { + "name" : "entities", + "type" : { + "type" : "array", + "items" : { + "type" : "record", + "name" : "BrowseResultEntity", + "doc" : "Data model for an entity returned as part of a browse query", + "fields" : [ { + "name" : "name", + "type" : "string", + "doc" : "Name of the entity", + "optional" : true + }, { + "name" : "urn", + "type" : "com.linkedin.common.Urn", + "doc" : "URN of the entity" + } ] + } + }, + "doc" : "A list of entities under the queried path" + }, { + "name" : "metadata", + "type" : { + "type" : "record", + "name" : "BrowseResultMetadata", + "doc" : "The model for browse result metadata", + "fields" : [ { + "name" : "path", + "type" : "string", + "doc" : "Path that is being browsed" + }, { + "name" : "groups", + "type" : { + "type" : "array", + "items" : { + "type" : "record", + "name" : "BrowseResultGroup", + "fields" : [ { + "name" 
: "name", + "type" : "string", + "doc" : "Name of the group" + }, { + "name" : "count", + "type" : "long", + "doc" : "Number of entities that can be reached from this path" + } ] + } + }, + "doc" : "A list of groups and total number of entities inside those groups under the queried path", + "default" : [ ] + }, { + "name" : "totalNumEntities", + "type" : "long", + "doc" : "Total number of entities we can reach from path" + } ] + }, + "doc" : "Metadata specific to the browse result of the queried path" + }, { + "name" : "from", + "type" : "int", + "doc" : "Offset of the first entity in the result" + }, { + "name" : "pageSize", + "type" : "int", + "doc" : "Size of each page in the result" + }, { + "name" : "numEntities", + "type" : "int", + "doc" : "The total number of entities directly under queried path" + } ] + }, "com.linkedin.metadata.query.BrowseResultEntity", "com.linkedin.metadata.query.BrowseResultGroup", "com.linkedin.metadata.query.BrowseResultMetadata", { + "type" : "enum", + "name" : "Condition", + "namespace" : "com.linkedin.metadata.query", + "doc" : "The matching condition in a filter criterion", + "symbols" : [ "CONTAIN", "END_WITH", "EQUAL", "GREATER_THAN", "GREATER_THAN_OR_EQUAL_TO", "LESS_THAN", "LESS_THAN_OR_EQUAL_TO", "START_WITH" ], + "symbolDocs" : { + "CONTAIN" : "Represent the relation: String field contains value, e.g. name contains Profile", + "END_WITH" : "Represent the relation: String field ends with value, e.g. name ends with Event", + "EQUAL" : "Represent the relation: field = value, e.g. platform = hdfs", + "GREATER_THAN" : "Represent the relation greater than, e.g. ownerCount > 5", + "GREATER_THAN_OR_EQUAL_TO" : "Represent the relation greater than or equal to, e.g. ownerCount >= 5", + "LESS_THAN" : "Represent the relation less than, e.g. ownerCount < 3", + "LESS_THAN_OR_EQUAL_TO" : "Represent the relation less than or equal to, e.g. ownerCount <= 3", + "START_WITH" : "Represent the relation: String field starts with value, e.g. 
name starts with PageView" + } + }, { + "type" : "record", + "name" : "Criterion", + "namespace" : "com.linkedin.metadata.query", + "doc" : "A criterion for matching a field with given value", + "fields" : [ { + "name" : "field", + "type" : "string", + "doc" : "The name of the field that the criterion refers to" + }, { + "name" : "value", + "type" : "string", + "doc" : "The value of the intended field" + }, { + "name" : "condition", + "type" : "Condition", + "doc" : "The condition for the criterion, e.g. EQUAL, START_WITH", + "default" : "EQUAL" + } ] + }, { + "type" : "record", + "name" : "Filter", + "namespace" : "com.linkedin.metadata.query", + "doc" : "The filter for finding an record or a collection of records", + "fields" : [ { + "name" : "criteria", + "type" : { + "type" : "array", + "items" : "Criterion" + }, + "doc" : "A list of criteria the filter applies to the query" + } ] + }, { + "type" : "record", + "name" : "SearchResultMetadata", + "namespace" : "com.linkedin.metadata.query", + "doc" : "The model for the search result", + "fields" : [ { + "name" : "searchResultMetadatas", + "type" : { + "type" : "array", + "items" : "AggregationMetadata" + }, + "doc" : "A list of search result metadata such as aggregations" + }, { + "name" : "urns", + "type" : { + "type" : "array", + "items" : "com.linkedin.common.Urn" + }, + "doc" : "A list of urns corresponding to search documents (in order) as returned by the search index" + } ] + }, { + "type" : "record", + "name" : "SortCriterion", + "namespace" : "com.linkedin.metadata.query", + "doc" : "Sort order along with the field to sort it on, to be applied to the results.", + "fields" : [ { + "name" : "field", + "type" : "string", + "doc" : "The name of the field that sorting has to be applied to" + }, { + "name" : "order", + "type" : { + "type" : "enum", + "name" : "SortOrder", + "doc" : "The order used to sort the results", + "symbols" : [ "ASCENDING", "DESCENDING" ], + "symbolDocs" : { + "ASCENDING" : "If results 
need to be sorted in ascending order", + "DESCENDING" : "If results need to be sorted in descending order" + } + }, + "doc" : "The order to sort the results i.e. ASCENDING or DESCENDING" + } ] + }, "com.linkedin.metadata.query.SortOrder", { + "type" : "record", + "name" : "BackfillResult", + "namespace" : "com.linkedin.metadata.restli", + "doc" : "The model for the result of a backfill", + "fields" : [ { + "name" : "entities", + "type" : { + "type" : "array", + "items" : { + "type" : "record", + "name" : "BackfillResultEntity", + "fields" : [ { + "name" : "urn", + "type" : "com.linkedin.common.Urn", + "doc" : "Urn of the backfilled entity" + }, { + "name" : "aspects", + "type" : { + "type" : "array", + "items" : "string" + }, + "doc" : "List of the aspects backfilled for the entity" + } ] + } + }, + "doc" : "List of backfilled entities" + } ] + }, "com.linkedin.metadata.restli.BackfillResultEntity", { + "type" : "record", + "name" : "DataFlowSnapshot", + "namespace" : "com.linkedin.metadata.snapshot", + "doc" : "A metadata snapshot for a specific DataFlow entity.", + "fields" : [ { + "name" : "urn", + "type" : "com.linkedin.common.DataFlowUrn", + "doc" : "URN for the entity the metadata snapshot is associated with." + }, { + "name" : "aspects", + "type" : { + "type" : "array", + "items" : "com.linkedin.metadata.aspect.DataFlowAspect" + }, + "doc" : "The list of metadata aspects associated with the data flow. Depending on the use case, this can either be all, or a selection, of supported aspects." + } ] + }, { + "type" : "record", + "name" : "EmptyRecord", + "namespace" : "com.linkedin.restli.common", + "doc" : "An literally empty record. Intended as a marker to indicate the absence of content where a record type is required. If used the underlying DataMap *must* be empty, EmptyRecordValidator is provided to help enforce this. For example, CreateRequest extends Request to indicate it has no response body. 
Also, a ComplexKeyResource implementation that has no ParamKey should have a signature like XyzResource implements ComplexKeyResource.", + "fields" : [ ], + "validate" : { + "com.linkedin.restli.common.EmptyRecordValidator" : { } + } + } ], + "schema" : { + "name" : "dataFlows", + "namespace" : "com.linkedin.dataflow", + "path" : "/dataFlows", + "schema" : "com.linkedin.datajob.DataFlow", + "doc" : "generated from: com.linkedin.metadata.resources.datajob.DataFlows", + "collection" : { + "identifier" : { + "name" : "key", + "type" : "com.linkedin.datajob.DataFlowKey", + "params" : "com.linkedin.restli.common.EmptyRecord" + }, + "supports" : [ "batch_get", "get", "get_all" ], + "methods" : [ { + "method" : "get", + "parameters" : [ { + "name" : "aspects", + "type" : "{ \"type\" : \"array\", \"items\" : \"string\" }", + "optional" : true + } ] + }, { + "method" : "batch_get", + "parameters" : [ { + "name" : "aspects", + "type" : "{ \"type\" : \"array\", \"items\" : \"string\" }", + "optional" : true + } ] + }, { + "method" : "get_all", + "parameters" : [ { + "name" : "aspects", + "type" : "{ \"type\" : \"array\", \"items\" : \"string\" }", + "optional" : true + }, { + "name" : "filter", + "type" : "com.linkedin.metadata.query.Filter", + "optional" : true + }, { + "name" : "sort", + "type" : "com.linkedin.metadata.query.SortCriterion", + "optional" : true + } ], + "pagingSupported" : true + } ], + "finders" : [ { + "name" : "search", + "parameters" : [ { + "name" : "input", + "type" : "string" + }, { + "name" : "aspects", + "type" : "{ \"type\" : \"array\", \"items\" : \"string\" }", + "optional" : true + }, { + "name" : "filter", + "type" : "com.linkedin.metadata.query.Filter", + "optional" : true + }, { + "name" : "sort", + "type" : "com.linkedin.metadata.query.SortCriterion", + "optional" : true + } ], + "metadata" : { + "type" : "com.linkedin.metadata.query.SearchResultMetadata" + }, + "pagingSupported" : true + } ], + "actions" : [ { + "name" : "autocomplete", + 
"parameters" : [ { + "name" : "query", + "type" : "string" + }, { + "name" : "field", + "type" : "string" + }, { + "name" : "filter", + "type" : "com.linkedin.metadata.query.Filter" + }, { + "name" : "limit", + "type" : "int" + } ], + "returns" : "com.linkedin.metadata.query.AutoCompleteResult" + }, { + "name" : "backfillWithUrns", + "parameters" : [ { + "name" : "urns", + "type" : "{ \"type\" : \"array\", \"items\" : \"string\" }" + }, { + "name" : "aspects", + "type" : "{ \"type\" : \"array\", \"items\" : \"string\" }", + "optional" : true + } ], + "returns" : "com.linkedin.metadata.restli.BackfillResult" + }, { + "name" : "browse", + "parameters" : [ { + "name" : "path", + "type" : "string" + }, { + "name" : "filter", + "type" : "com.linkedin.metadata.query.Filter", + "optional" : true + }, { + "name" : "start", + "type" : "int" + }, { + "name" : "limit", + "type" : "int" + } ], + "returns" : "com.linkedin.metadata.query.BrowseResult" + }, { + "name" : "getBrowsePaths", + "parameters" : [ { + "name" : "urn", + "type" : "com.linkedin.common.Urn" + } ], + "returns" : "{ \"type\" : \"array\", \"items\" : \"string\" }" + }, { + "name" : "getSnapshot", + "parameters" : [ { + "name" : "urn", + "type" : "string" + }, { + "name" : "aspects", + "type" : "{ \"type\" : \"array\", \"items\" : \"string\" }", + "optional" : true + } ], + "returns" : "com.linkedin.metadata.snapshot.DataFlowSnapshot" + }, { + "name" : "ingest", + "parameters" : [ { + "name" : "snapshot", + "type" : "com.linkedin.metadata.snapshot.DataFlowSnapshot" + } ] + } ], + "entity" : { + "path" : "/dataFlows/{key}" + } + } + } +} \ No newline at end of file diff --git a/gms/api/src/main/snapshot/com.linkedin.datajob.dataJobs.snapshot.json b/gms/api/src/main/snapshot/com.linkedin.datajob.dataJobs.snapshot.json new file mode 100644 index 00000000000000..592f4270276011 --- /dev/null +++ b/gms/api/src/main/snapshot/com.linkedin.datajob.dataJobs.snapshot.json @@ -0,0 +1,736 @@ +{ + "models" : [ { + "type" : 
"record", + "name" : "AuditStamp", + "namespace" : "com.linkedin.common", + "doc" : "Data captured on a resource/association/sub-resource level giving insight into when that resource/association/sub-resource moved into a particular lifecycle stage, and who acted to move it into that specific lifecycle stage.", + "fields" : [ { + "name" : "time", + "type" : { + "type" : "typeref", + "name" : "Time", + "doc" : "Number of milliseconds since midnight, January 1, 1970 UTC. It must be a positive number", + "ref" : "long" + }, + "doc" : "When did the resource/association/sub-resource move into the specific lifecycle stage represented by this AuditEvent." + }, { + "name" : "actor", + "type" : { + "type" : "typeref", + "name" : "Urn", + "ref" : "string", + "java" : { + "class" : "com.linkedin.common.urn.Urn" + } + }, + "doc" : "The entity (e.g. a member URN) which will be credited for moving the resource/association/sub-resource into the specific lifecycle stage. It is also the one used to authorize the change." + }, { + "name" : "impersonator", + "type" : "Urn", + "doc" : "The entity (e.g. a service URN) which performs the change on behalf of the Actor and must be authorized to act as the Actor.", + "optional" : true + } ] + }, { + "type" : "record", + "name" : "ChangeAuditStamps", + "namespace" : "com.linkedin.common", + "doc" : "Data captured on a resource/association/sub-resource level giving insight into when that resource/association/sub-resource moved into various lifecycle stages, and who acted to move it into those lifecycle stages. The recommended best practice is to include this record in your record schema, and annotate its fields as @readOnly in your resource. 
See https://github.com/linkedin/rest.li/wiki/Validation-in-Rest.li#restli-validation-annotations", + "fields" : [ { + "name" : "created", + "type" : "AuditStamp", + "doc" : "An AuditStamp corresponding to the creation of this resource/association/sub-resource" + }, { + "name" : "lastModified", + "type" : "AuditStamp", + "doc" : "An AuditStamp corresponding to the last modification of this resource/association/sub-resource. If no modification has happened since creation, lastModified should be the same as created" + }, { + "name" : "deleted", + "type" : "AuditStamp", + "doc" : "An AuditStamp corresponding to the deletion of this resource/association/sub-resource. Logically, deleted MUST have a later timestamp than creation. It may or may not have the same time as lastModified depending upon the resource/association/sub-resource semantics.", + "optional" : true + } ] + }, { + "type" : "typeref", + "name" : "DataFlowUrn", + "namespace" : "com.linkedin.common", + "doc" : "Standardized data processing flow identifier.", + "ref" : "string", + "java" : { + "class" : "com.linkedin.common.urn.DataFlowUrn" + }, + "validate" : { + "com.linkedin.common.validator.TypedUrnValidator" : { + "accessible" : true, + "constructable" : true, + "doc" : "Standardized data processing flow identifier.", + "entityType" : "dataFlow", + "fields" : [ { + "doc" : "Workflow manager like azkaban, airflow which orchestrates the flow", + "maxLength" : 50, + "name" : "orchestrator", + "type" : "string" + }, { + "doc" : "Unique Identifier of the data flow", + "maxLength" : 200, + "name" : "flowId", + "type" : "string" + }, { + "doc" : "Cluster where the flow is executed", + "maxLength" : 100, + "name" : "cluster", + "type" : "string" + } ], + "maxLength" : 373, + "name" : "DataFlow", + "namespace" : "li", + "owners" : [ "urn:li:corpuser:fbar", "urn:li:corpuser:bfoo" ], + "owningTeam" : "urn:li:internalTeam:datahub" + } + } + }, { + "type" : "typeref", + "name" : "DataJobUrn", + "namespace" : 
"com.linkedin.common", + "doc" : "Standardized data processing job identifier.", + "ref" : "string", + "java" : { + "class" : "com.linkedin.common.urn.DataJobUrn" + }, + "validate" : { + "com.linkedin.common.validator.TypedUrnValidator" : { + "accessible" : true, + "constructable" : true, + "doc" : "Standardized data processing job identifier.", + "entityType" : "dataJob", + "fields" : [ { + "doc" : "Standardized data processing flow urn representing the flow for the job", + "name" : "flow", + "type" : "com.linkedin.common.urn.DataFlowUrn" + }, { + "doc" : "Unique identifier of the data job", + "maxLength" : 200, + "name" : "jobID", + "type" : "string" + } ], + "maxLength" : 594, + "name" : "DataJob", + "namespace" : "li", + "owners" : [ "urn:li:corpuser:fbar", "urn:li:corpuser:bfoo" ], + "owningTeam" : "urn:li:internalTeam:datahub" + } + } + }, { + "type" : "typeref", + "name" : "DatasetUrn", + "namespace" : "com.linkedin.common", + "doc" : "Standardized dataset identifier.", + "ref" : "string", + "java" : { + "class" : "com.linkedin.common.urn.DatasetUrn" + }, + "validate" : { + "com.linkedin.common.validator.TypedUrnValidator" : { + "accessible" : true, + "constructable" : true, + "doc" : "Standardized dataset identifier.", + "entityType" : "dataset", + "fields" : [ { + "doc" : "Standardized platform urn where dataset is defined.", + "name" : "platform", + "type" : "com.linkedin.common.urn.DataPlatformUrn" + }, { + "doc" : "Dataset native name e.g. 
., /dir/subdir/, or ", + "maxLength" : 210, + "name" : "datasetName", + "type" : "string" + }, { + "doc" : "Fabric type where dataset belongs to or where it was generated.", + "name" : "origin", + "type" : "com.linkedin.common.FabricType" + } ], + "maxLength" : 284, + "name" : "Dataset", + "namespace" : "li", + "owners" : [ "urn:li:corpuser:fbar", "urn:li:corpuser:bfoo" ], + "owningTeam" : "urn:li:internalTeam:datahub" + } + } + }, { + "type" : "record", + "name" : "Owner", + "namespace" : "com.linkedin.common", + "doc" : "Ownership information", + "fields" : [ { + "name" : "owner", + "type" : "Urn", + "doc" : "Owner URN, e.g. urn:li:corpuser:ldap, urn:li:corpGroup:group_name, and urn:li:multiProduct:mp_name\n(Caveat: only corpuser is currently supported in the frontend.)" + }, { + "name" : "type", + "type" : { + "type" : "enum", + "name" : "OwnershipType", + "doc" : "Owner category or owner role", + "symbols" : [ "DEVELOPER", "DATAOWNER", "DELEGATE", "PRODUCER", "CONSUMER", "STAKEHOLDER" ], + "symbolDocs" : { + "CONSUMER" : "A person, group, or service that consumes the data", + "DATAOWNER" : "A person or group that is owning the data", + "DELEGATE" : "A person or a group that overseas the operation, e.g. a DBA or SRE.", + "DEVELOPER" : "A person or group that is in charge of developing the code", + "PRODUCER" : "A person, group, or service that produces/generates the data", + "STAKEHOLDER" : "A person or a group that has direct business interest" + } + }, + "doc" : "The type of the ownership" + }, { + "name" : "source", + "type" : { + "type" : "record", + "name" : "OwnershipSource", + "doc" : "Source/provider of the ownership information", + "fields" : [ { + "name" : "type", + "type" : { + "type" : "enum", + "name" : "OwnershipSourceType", + "symbols" : [ "AUDIT", "DATABASE", "FILE_SYSTEM", "ISSUE_TRACKING_SYSTEM", "MANUAL", "SERVICE", "SOURCE_CONTROL", "OTHER" ], + "symbolDocs" : { + "AUDIT" : "Auditing system or audit logs", + "DATABASE" : "Database, e.g. 
GRANTS table", + "FILE_SYSTEM" : "File system, e.g. file/directory owner", + "ISSUE_TRACKING_SYSTEM" : "Issue tracking system, e.g. Jira", + "MANUAL" : "Manually provided by a user", + "OTHER" : "Other sources", + "SERVICE" : "Other ownership-like service, e.g. Nuage, ACL service etc", + "SOURCE_CONTROL" : "SCM system, e.g. GIT, SVN" + } + }, + "doc" : "The type of the source" + }, { + "name" : "url", + "type" : "string", + "doc" : "A reference URL for the source", + "optional" : true + } ] + }, + "doc" : "Source information for the ownership", + "optional" : true + } ] + }, { + "type" : "record", + "name" : "Ownership", + "namespace" : "com.linkedin.common", + "doc" : "Ownership information of an entity.", + "fields" : [ { + "name" : "owners", + "type" : { + "type" : "array", + "items" : "Owner" + }, + "doc" : "List of owners of the entity." + }, { + "name" : "lastModified", + "type" : "AuditStamp", + "doc" : "Audit stamp containing who last modified the record and when." + } ] + }, "com.linkedin.common.OwnershipSource", "com.linkedin.common.OwnershipSourceType", "com.linkedin.common.OwnershipType", "com.linkedin.common.Time", "com.linkedin.common.Urn", { + "type" : "record", + "name" : "DataJob", + "namespace" : "com.linkedin.datajob", + "doc" : "Metadata bout DataJob", + "include" : [ { + "type" : "record", + "name" : "DataJobKey", + "doc" : "Key for data job resource", + "fields" : [ { + "name" : "dataFlow", + "type" : "com.linkedin.common.DataFlowUrn", + "doc" : "Standardized dataflow urn where flow is defined. 
The data flow Urn (urn:li:dataflow:{orchestrator,flow_id, cluster})", + "validate" : { + "com.linkedin.datajob.rest.validator.DataFlowValidator" : { } + } + }, { + "name" : "jobId", + "type" : "string", + "doc" : "The Id of the data job", + "validate" : { + "strlen" : { + "max" : 200, + "min" : 1 + } + } + } ] + }, "com.linkedin.common.ChangeAuditStamps" ], + "fields" : [ { + "name" : "urn", + "type" : "com.linkedin.common.DataJobUrn", + "doc" : "DataJob urn" + }, { + "name" : "ownership", + "type" : "com.linkedin.common.Ownership", + "doc" : "Ownership Info", + "optional" : true + }, { + "name" : "inputOutput", + "type" : { + "type" : "record", + "name" : "DataJobInputOutput", + "doc" : "Information about the inputs and outputs of a Data processing job", + "fields" : [ { + "name" : "inputDatasets", + "type" : { + "type" : "array", + "items" : "com.linkedin.common.DatasetUrn" + }, + "doc" : "Input datasets consumed by the data job during processing" + }, { + "name" : "outputDatasets", + "type" : { + "type" : "array", + "items" : "com.linkedin.common.DatasetUrn" + }, + "doc" : "Output datasets produced by the data job during processing" + } ] + }, + "doc" : "Input and output datasets of job", + "optional" : true + }, { + "name" : "info", + "type" : { + "type" : "record", + "name" : "DataJobInfo", + "doc" : "Information about a Data processing job", + "fields" : [ { + "name" : "name", + "type" : "string", + "doc" : "Job name" + }, { + "name" : "description", + "type" : "string", + "doc" : "Job description", + "optional" : true + }, { + "name" : "type", + "type" : [ { + "type" : "enum", + "name" : "AzkabanJobType", + "namespace" : "com.linkedin.datajob.azkaban", + "doc" : "The various types of support azkaban jobs", + "symbols" : [ "COMMAND", "HADOOP_JAVA", "HADOOP_SHELL", "HIVE", "PIG", "SQL" ], + "symbolDocs" : { + "COMMAND" : "The command job type is one of the basic built-in types. 
It runs multiple UNIX commands using java processbuilder.\nUpon execution, Azkaban spawns off a process to run the command.", + "HADOOP_JAVA" : "Runs a java program with ability to access Hadoop cluster.\nhttps://azkaban.readthedocs.io/en/latest/jobTypes.html#java-job-type", + "HADOOP_SHELL" : "In large part, this is the same Command type. The difference is its ability to talk to a Hadoop cluster\nsecurely, via Hadoop tokens.", + "HIVE" : "Hive type is for running Hive jobs.", + "PIG" : "Pig type is for running Pig jobs.", + "SQL" : "SQL is for running Presto, mysql queries etc" + } + } ], + "doc" : "Datajob type" + } ] + }, + "doc" : "Optional additional metadata about the job", + "optional" : true + } ] + }, "com.linkedin.datajob.DataJobInfo", "com.linkedin.datajob.DataJobInputOutput", "com.linkedin.datajob.DataJobKey", "com.linkedin.datajob.azkaban.AzkabanJobType", { + "type" : "typeref", + "name" : "DataJobAspect", + "namespace" : "com.linkedin.metadata.aspect", + "doc" : "A union of all supported metadata aspects for a Data job", + "ref" : [ "com.linkedin.datajob.DataJobInfo", "com.linkedin.datajob.DataJobInputOutput", "com.linkedin.common.Ownership" ] + }, { + "type" : "record", + "name" : "AggregationMetadata", + "namespace" : "com.linkedin.metadata.query", + "fields" : [ { + "name" : "name", + "type" : "string", + "doc" : "The name of the aggregation, e.g, platform, origin" + }, { + "name" : "aggregations", + "type" : { + "type" : "map", + "values" : "long" + }, + "doc" : "List of aggregations showing the number of documents falling into each bucket. 
e.g, for platform aggregation, the bucket can be hive, kafka, etc" + } ] + }, { + "type" : "record", + "name" : "AutoCompleteResult", + "namespace" : "com.linkedin.metadata.query", + "doc" : "The model for the auto complete result", + "fields" : [ { + "name" : "query", + "type" : "string", + "doc" : "The original chars typed by user" + }, { + "name" : "suggestions", + "type" : { + "type" : "array", + "items" : "string" + }, + "doc" : "A list of typeahead suggestions" + } ] + }, { + "type" : "record", + "name" : "BrowseResult", + "namespace" : "com.linkedin.metadata.query", + "doc" : "The model for the result of a browse query", + "fields" : [ { + "name" : "entities", + "type" : { + "type" : "array", + "items" : { + "type" : "record", + "name" : "BrowseResultEntity", + "doc" : "Data model for an entity returned as part of a browse query", + "fields" : [ { + "name" : "name", + "type" : "string", + "doc" : "Name of the entity", + "optional" : true + }, { + "name" : "urn", + "type" : "com.linkedin.common.Urn", + "doc" : "URN of the entity" + } ] + } + }, + "doc" : "A list of entities under the queried path" + }, { + "name" : "metadata", + "type" : { + "type" : "record", + "name" : "BrowseResultMetadata", + "doc" : "The model for browse result metadata", + "fields" : [ { + "name" : "path", + "type" : "string", + "doc" : "Path that is being browsed" + }, { + "name" : "groups", + "type" : { + "type" : "array", + "items" : { + "type" : "record", + "name" : "BrowseResultGroup", + "fields" : [ { + "name" : "name", + "type" : "string", + "doc" : "Name of the group" + }, { + "name" : "count", + "type" : "long", + "doc" : "Number of entities that can be reached from this path" + } ] + } + }, + "doc" : "A list of groups and total number of entities inside those groups under the queried path", + "default" : [ ] + }, { + "name" : "totalNumEntities", + "type" : "long", + "doc" : "Total number of entities we can reach from path" + } ] + }, + "doc" : "Metadata specific to the browse 
result of the queried path" + }, { + "name" : "from", + "type" : "int", + "doc" : "Offset of the first entity in the result" + }, { + "name" : "pageSize", + "type" : "int", + "doc" : "Size of each page in the result" + }, { + "name" : "numEntities", + "type" : "int", + "doc" : "The total number of entities directly under queried path" + } ] + }, "com.linkedin.metadata.query.BrowseResultEntity", "com.linkedin.metadata.query.BrowseResultGroup", "com.linkedin.metadata.query.BrowseResultMetadata", { + "type" : "enum", + "name" : "Condition", + "namespace" : "com.linkedin.metadata.query", + "doc" : "The matching condition in a filter criterion", + "symbols" : [ "CONTAIN", "END_WITH", "EQUAL", "GREATER_THAN", "GREATER_THAN_OR_EQUAL_TO", "LESS_THAN", "LESS_THAN_OR_EQUAL_TO", "START_WITH" ], + "symbolDocs" : { + "CONTAIN" : "Represent the relation: String field contains value, e.g. name contains Profile", + "END_WITH" : "Represent the relation: String field ends with value, e.g. name ends with Event", + "EQUAL" : "Represent the relation: field = value, e.g. platform = hdfs", + "GREATER_THAN" : "Represent the relation greater than, e.g. ownerCount > 5", + "GREATER_THAN_OR_EQUAL_TO" : "Represent the relation greater than or equal to, e.g. ownerCount >= 5", + "LESS_THAN" : "Represent the relation less than, e.g. ownerCount < 3", + "LESS_THAN_OR_EQUAL_TO" : "Represent the relation less than or equal to, e.g. ownerCount <= 3", + "START_WITH" : "Represent the relation: String field starts with value, e.g. 
name starts with PageView" + } + }, { + "type" : "record", + "name" : "Criterion", + "namespace" : "com.linkedin.metadata.query", + "doc" : "A criterion for matching a field with given value", + "fields" : [ { + "name" : "field", + "type" : "string", + "doc" : "The name of the field that the criterion refers to" + }, { + "name" : "value", + "type" : "string", + "doc" : "The value of the intended field" + }, { + "name" : "condition", + "type" : "Condition", + "doc" : "The condition for the criterion, e.g. EQUAL, START_WITH", + "default" : "EQUAL" + } ] + }, { + "type" : "record", + "name" : "Filter", + "namespace" : "com.linkedin.metadata.query", + "doc" : "The filter for finding an record or a collection of records", + "fields" : [ { + "name" : "criteria", + "type" : { + "type" : "array", + "items" : "Criterion" + }, + "doc" : "A list of criteria the filter applies to the query" + } ] + }, { + "type" : "record", + "name" : "SearchResultMetadata", + "namespace" : "com.linkedin.metadata.query", + "doc" : "The model for the search result", + "fields" : [ { + "name" : "searchResultMetadatas", + "type" : { + "type" : "array", + "items" : "AggregationMetadata" + }, + "doc" : "A list of search result metadata such as aggregations" + }, { + "name" : "urns", + "type" : { + "type" : "array", + "items" : "com.linkedin.common.Urn" + }, + "doc" : "A list of urns corresponding to search documents (in order) as returned by the search index" + } ] + }, { + "type" : "record", + "name" : "SortCriterion", + "namespace" : "com.linkedin.metadata.query", + "doc" : "Sort order along with the field to sort it on, to be applied to the results.", + "fields" : [ { + "name" : "field", + "type" : "string", + "doc" : "The name of the field that sorting has to be applied to" + }, { + "name" : "order", + "type" : { + "type" : "enum", + "name" : "SortOrder", + "doc" : "The order used to sort the results", + "symbols" : [ "ASCENDING", "DESCENDING" ], + "symbolDocs" : { + "ASCENDING" : "If results 
need to be sorted in ascending order", + "DESCENDING" : "If results need to be sorted in descending order" + } + }, + "doc" : "The order to sort the results i.e. ASCENDING or DESCENDING" + } ] + }, "com.linkedin.metadata.query.SortOrder", { + "type" : "record", + "name" : "BackfillResult", + "namespace" : "com.linkedin.metadata.restli", + "doc" : "The model for the result of a backfill", + "fields" : [ { + "name" : "entities", + "type" : { + "type" : "array", + "items" : { + "type" : "record", + "name" : "BackfillResultEntity", + "fields" : [ { + "name" : "urn", + "type" : "com.linkedin.common.Urn", + "doc" : "Urn of the backfilled entity" + }, { + "name" : "aspects", + "type" : { + "type" : "array", + "items" : "string" + }, + "doc" : "List of the aspects backfilled for the entity" + } ] + } + }, + "doc" : "List of backfilled entities" + } ] + }, "com.linkedin.metadata.restli.BackfillResultEntity", { + "type" : "record", + "name" : "DataJobSnapshot", + "namespace" : "com.linkedin.metadata.snapshot", + "doc" : "A metadata snapshot for a specific DataJob entity.", + "fields" : [ { + "name" : "urn", + "type" : "com.linkedin.common.DataJobUrn", + "doc" : "URN for the entity the metadata snapshot is associated with." + }, { + "name" : "aspects", + "type" : { + "type" : "array", + "items" : "com.linkedin.metadata.aspect.DataJobAspect" + }, + "doc" : "The list of metadata aspects associated with the data job. Depending on the use case, this can either be all, or a selection, of supported aspects." + } ] + }, { + "type" : "record", + "name" : "EmptyRecord", + "namespace" : "com.linkedin.restli.common", + "doc" : "An literally empty record. Intended as a marker to indicate the absence of content where a record type is required. If used the underlying DataMap *must* be empty, EmptyRecordValidator is provided to help enforce this. For example, CreateRequest extends Request to indicate it has no response body. 
Also, a ComplexKeyResource implementation that has no ParamKey should have a signature like XyzResource implements ComplexKeyResource.", + "fields" : [ ], + "validate" : { + "com.linkedin.restli.common.EmptyRecordValidator" : { } + } + } ], + "schema" : { + "name" : "dataJobs", + "namespace" : "com.linkedin.datajob", + "path" : "/dataJobs", + "schema" : "com.linkedin.datajob.DataJob", + "doc" : "generated from: com.linkedin.metadata.resources.datajob.DataJobs", + "collection" : { + "identifier" : { + "name" : "key", + "type" : "com.linkedin.datajob.DataJobKey", + "params" : "com.linkedin.restli.common.EmptyRecord" + }, + "supports" : [ "batch_get", "get", "get_all" ], + "methods" : [ { + "method" : "get", + "parameters" : [ { + "name" : "aspects", + "type" : "{ \"type\" : \"array\", \"items\" : \"string\" }", + "optional" : true + } ] + }, { + "method" : "batch_get", + "parameters" : [ { + "name" : "aspects", + "type" : "{ \"type\" : \"array\", \"items\" : \"string\" }", + "optional" : true + } ] + }, { + "method" : "get_all", + "parameters" : [ { + "name" : "aspects", + "type" : "{ \"type\" : \"array\", \"items\" : \"string\" }", + "optional" : true + }, { + "name" : "filter", + "type" : "com.linkedin.metadata.query.Filter", + "optional" : true + }, { + "name" : "sort", + "type" : "com.linkedin.metadata.query.SortCriterion", + "optional" : true + } ], + "pagingSupported" : true + } ], + "finders" : [ { + "name" : "search", + "parameters" : [ { + "name" : "input", + "type" : "string" + }, { + "name" : "aspects", + "type" : "{ \"type\" : \"array\", \"items\" : \"string\" }", + "optional" : true + }, { + "name" : "filter", + "type" : "com.linkedin.metadata.query.Filter", + "optional" : true + }, { + "name" : "sort", + "type" : "com.linkedin.metadata.query.SortCriterion", + "optional" : true + } ], + "metadata" : { + "type" : "com.linkedin.metadata.query.SearchResultMetadata" + }, + "pagingSupported" : true + } ], + "actions" : [ { + "name" : "autocomplete", + 
"parameters" : [ { + "name" : "query", + "type" : "string" + }, { + "name" : "field", + "type" : "string" + }, { + "name" : "filter", + "type" : "com.linkedin.metadata.query.Filter" + }, { + "name" : "limit", + "type" : "int" + } ], + "returns" : "com.linkedin.metadata.query.AutoCompleteResult" + }, { + "name" : "backfillWithUrns", + "parameters" : [ { + "name" : "urns", + "type" : "{ \"type\" : \"array\", \"items\" : \"string\" }" + }, { + "name" : "aspects", + "type" : "{ \"type\" : \"array\", \"items\" : \"string\" }", + "optional" : true + } ], + "returns" : "com.linkedin.metadata.restli.BackfillResult" + }, { + "name" : "browse", + "parameters" : [ { + "name" : "path", + "type" : "string" + }, { + "name" : "filter", + "type" : "com.linkedin.metadata.query.Filter", + "optional" : true + }, { + "name" : "start", + "type" : "int" + }, { + "name" : "limit", + "type" : "int" + } ], + "returns" : "com.linkedin.metadata.query.BrowseResult" + }, { + "name" : "getBrowsePaths", + "parameters" : [ { + "name" : "urn", + "type" : "com.linkedin.common.Urn" + } ], + "returns" : "{ \"type\" : \"array\", \"items\" : \"string\" }" + }, { + "name" : "getSnapshot", + "parameters" : [ { + "name" : "urn", + "type" : "string" + }, { + "name" : "aspects", + "type" : "{ \"type\" : \"array\", \"items\" : \"string\" }", + "optional" : true + } ], + "returns" : "com.linkedin.metadata.snapshot.DataJobSnapshot" + }, { + "name" : "ingest", + "parameters" : [ { + "name" : "snapshot", + "type" : "com.linkedin.metadata.snapshot.DataJobSnapshot" + } ] + } ], + "entity" : { + "path" : "/dataJobs/{key}" + } + } + } +} \ No newline at end of file diff --git a/gms/api/src/main/snapshot/com.linkedin.dataprocess.dataProcesses.snapshot.json b/gms/api/src/main/snapshot/com.linkedin.dataprocess.dataProcesses.snapshot.json index e264a5604d1743..5ff3ae7fc10e37 100644 --- a/gms/api/src/main/snapshot/com.linkedin.dataprocess.dataProcesses.snapshot.json +++ 
b/gms/api/src/main/snapshot/com.linkedin.dataprocess.dataProcesses.snapshot.json @@ -123,7 +123,7 @@ "fields" : [ { "name" : "owner", "type" : "Urn", - "doc" : "Owner URN, e.g. urn:li:corpuser:ldap, urn:li:corpGroup:group_name, and urn:li:multiProduct:mp_name" + "doc" : "Owner URN, e.g. urn:li:corpuser:ldap, urn:li:corpGroup:group_name, and urn:li:multiProduct:mp_name\n(Caveat: only corpuser is currently supported in the frontend.)" }, { "name" : "type", "type" : { diff --git a/gms/api/src/main/snapshot/com.linkedin.dataset.datasets.snapshot.json b/gms/api/src/main/snapshot/com.linkedin.dataset.datasets.snapshot.json index cc30d33de9df71..7adcde5dc2cc32 100644 --- a/gms/api/src/main/snapshot/com.linkedin.dataset.datasets.snapshot.json +++ b/gms/api/src/main/snapshot/com.linkedin.dataset.datasets.snapshot.json @@ -237,7 +237,7 @@ "fields" : [ { "name" : "owner", "type" : "Urn", - "doc" : "Owner URN, e.g. urn:li:corpuser:ldap, urn:li:corpGroup:group_name, and urn:li:multiProduct:mp_name" + "doc" : "Owner URN, e.g. urn:li:corpuser:ldap, urn:li:corpGroup:group_name, and urn:li:multiProduct:mp_name\n(Caveat: only corpuser is currently supported in the frontend.)" }, { "name" : "type", "type" : { diff --git a/gms/api/src/main/snapshot/com.linkedin.ml.mlModels.snapshot.json b/gms/api/src/main/snapshot/com.linkedin.ml.mlModels.snapshot.json index 35965421779487..ffcfcc6d49d820 100644 --- a/gms/api/src/main/snapshot/com.linkedin.ml.mlModels.snapshot.json +++ b/gms/api/src/main/snapshot/com.linkedin.ml.mlModels.snapshot.json @@ -320,7 +320,7 @@ "fields" : [ { "name" : "owner", "type" : "Urn", - "doc" : "Owner URN, e.g. urn:li:corpuser:ldap, urn:li:corpGroup:group_name, and urn:li:multiProduct:mp_name" + "doc" : "Owner URN, e.g. 
urn:li:corpuser:ldap, urn:li:corpGroup:group_name, and urn:li:multiProduct:mp_name\n(Caveat: only corpuser is currently supported in the frontend.)" }, { "name" : "type", "type" : { diff --git a/gms/client/src/main/java/com/linkedin/datajob/client/DataFlows.java b/gms/client/src/main/java/com/linkedin/datajob/client/DataFlows.java new file mode 100644 index 00000000000000..114314138e4e34 --- /dev/null +++ b/gms/client/src/main/java/com/linkedin/datajob/client/DataFlows.java @@ -0,0 +1,161 @@ +package com.linkedin.datajob.client; + +import com.linkedin.common.urn.DataFlowUrn; +import com.linkedin.data.template.StringArray; +import com.linkedin.metadata.query.AutoCompleteResult; +import com.linkedin.metadata.query.SortCriterion; +import com.linkedin.metadata.restli.BaseSearchableClient; +import com.linkedin.datajob.DataFlow; +import com.linkedin.datajob.DataFlowKey; +import com.linkedin.dataflow.DataFlowsDoAutocompleteRequestBuilder; +import com.linkedin.dataflow.DataFlowsFindBySearchRequestBuilder; +import com.linkedin.dataflow.DataFlowsRequestBuilders; +import com.linkedin.r2.RemoteInvocationException; +import com.linkedin.restli.client.BatchGetEntityRequest; +import com.linkedin.restli.client.Client; +import com.linkedin.restli.client.GetRequest; +import com.linkedin.restli.common.CollectionResponse; +import com.linkedin.restli.common.ComplexResourceKey; +import com.linkedin.restli.common.EmptyRecord; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import static com.linkedin.metadata.dao.utils.QueryUtils.*; + +public class DataFlows extends BaseSearchableClient { + private static final DataFlowsRequestBuilders DATA_FLOWS_REQUEST_BUILDERS = new DataFlowsRequestBuilders(); + + public DataFlows(@Nonnull Client restliClient) { + super(restliClient); + } + + @Nonnull + @Override + public CollectionResponse search(@Nonnull String input, @Nullable StringArray 
aspectNames, + @Nullable Map requestFilters, @Nullable SortCriterion sortCriterion, int start, int count) + throws RemoteInvocationException { + final DataFlowsFindBySearchRequestBuilder requestBuilder = DATA_FLOWS_REQUEST_BUILDERS.findBySearch() + .aspectsParam(aspectNames) + .inputParam(input) + .sortParam(sortCriterion) + .paginate(start, count); + if (requestFilters != null) { + requestBuilder.filterParam(newFilter(requestFilters)); + } + return _client.sendRequest(requestBuilder.build()).getResponse().getEntity(); + } + + @Nonnull + public CollectionResponse search(@Nonnull String input, int start, int count) + throws RemoteInvocationException { + return search(input, null, null, start, count); + } + + /** + * Gets {@link DataFlow} model for the given urn + * + * @param urn data flow urn + * @return {@link DataFlow} Data flow + * @throws RemoteInvocationException + */ + @Nonnull + public DataFlow get(@Nonnull DataFlowUrn urn) + throws RemoteInvocationException { + GetRequest getRequest = DATA_FLOWS_REQUEST_BUILDERS.get() + .id(new ComplexResourceKey<>(toDataFlowKey(urn), new EmptyRecord())) + .build(); + + return _client.sendRequest(getRequest).getResponse().getEntity(); + } + + /** + * Searches for data flows matching to a given query and filters + * + * @param input search query + * @param requestFilters search filters + * @param start start offset for search results + * @param count max number of search results requested + * @return CollectionResponse of {@link DataFlow} + * @throws RemoteInvocationException + */ + @Nonnull + public CollectionResponse search(@Nonnull String input, @Nonnull Map requestFilters, + int start, int count) throws RemoteInvocationException { + + DataFlowsFindBySearchRequestBuilder requestBuilder = DATA_FLOWS_REQUEST_BUILDERS + .findBySearch() + .inputParam(input) + .filterParam(newFilter(requestFilters)).paginate(start, count); + return _client.sendRequest(requestBuilder.build()).getResponse().getEntity(); + } + + /** + * 
Autocomplete search for data flows in search bar + * + * @param query search query + * @param field field of the Data Flow + * @param requestFilters autocomplete filters + * @param limit max number of autocomplete results + * @throws RemoteInvocationException + */ + @Nonnull + public AutoCompleteResult autoComplete(@Nonnull String query, @Nonnull String field, + @Nonnull Map requestFilters, + @Nonnull int limit) throws RemoteInvocationException { + DataFlowsDoAutocompleteRequestBuilder requestBuilder = DATA_FLOWS_REQUEST_BUILDERS + .actionAutocomplete() + .queryParam(query) + .fieldParam(field) + .filterParam(newFilter(requestFilters)) + .limitParam(limit); + return _client.sendRequest(requestBuilder.build()).getResponse().getEntity(); + } + + /** + * Batch gets list of {@link DataFlow} + * + * @param urns list of flow urn + * @return map of {@link DataFlow} + * @throws RemoteInvocationException + */ + @Nonnull + public Map batchGet(@Nonnull Set urns) + throws RemoteInvocationException { + BatchGetEntityRequest, DataFlow> batchGetRequest + = DATA_FLOWS_REQUEST_BUILDERS.batchGet() + .ids(urns.stream().map(this::getKeyFromUrn).collect(Collectors.toSet())) + .build(); + + return _client.sendRequest(batchGetRequest).getResponseEntity().getResults() + .entrySet().stream().collect(Collectors.toMap( + entry -> getUrnFromKey(entry.getKey()), + entry -> entry.getValue().getEntity()) + ); + } + + @Nonnull + private ComplexResourceKey getKeyFromUrn(@Nonnull DataFlowUrn urn) { + return new ComplexResourceKey<>(toDataFlowKey(urn), new EmptyRecord()); + } + + @Nonnull + private DataFlowUrn getUrnFromKey(@Nonnull ComplexResourceKey key) { + return toFlowUrn(key.getKey()); + } + + @Nonnull + protected DataFlowKey toDataFlowKey(@Nonnull DataFlowUrn urn) { + return new DataFlowKey() + .setOrchestrator(urn.getOrchestratorEntity()) + .setFlowId(urn.getFlowIdEntity()) + .setCluster(urn.getClusterEntity()); + } + + @Nonnull + protected DataFlowUrn toFlowUrn(@Nonnull DataFlowKey key) { + 
return new DataFlowUrn(key.getOrchestrator(), key.getFlowId(), key.getCluster()); + } +} diff --git a/gms/client/src/main/java/com/linkedin/datajob/client/DataJobs.java b/gms/client/src/main/java/com/linkedin/datajob/client/DataJobs.java new file mode 100644 index 00000000000000..78cbd0b8abaf42 --- /dev/null +++ b/gms/client/src/main/java/com/linkedin/datajob/client/DataJobs.java @@ -0,0 +1,160 @@ +package com.linkedin.datajob.client; + +import com.linkedin.common.urn.DataJobUrn; +import com.linkedin.data.template.StringArray; +import com.linkedin.metadata.query.AutoCompleteResult; +import com.linkedin.metadata.query.SortCriterion; +import com.linkedin.metadata.restli.BaseSearchableClient; +import com.linkedin.datajob.DataJob; +import com.linkedin.datajob.DataJobKey; +import com.linkedin.datajob.DataJobsDoAutocompleteRequestBuilder; +import com.linkedin.datajob.DataJobsFindBySearchRequestBuilder; +import com.linkedin.datajob.DataJobsRequestBuilders; +import com.linkedin.r2.RemoteInvocationException; +import com.linkedin.restli.client.BatchGetEntityRequest; +import com.linkedin.restli.client.Client; +import com.linkedin.restli.client.GetRequest; +import com.linkedin.restli.common.CollectionResponse; +import com.linkedin.restli.common.ComplexResourceKey; +import com.linkedin.restli.common.EmptyRecord; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import static com.linkedin.metadata.dao.utils.QueryUtils.*; + +public class DataJobs extends BaseSearchableClient { + private static final DataJobsRequestBuilders DATA_JOBS_REQUEST_BUILDERS = new DataJobsRequestBuilders(); + + public DataJobs(@Nonnull Client restliClient) { + super(restliClient); + } + + @Nonnull + @Override + public CollectionResponse search(@Nonnull String input, @Nullable StringArray aspectNames, + @Nullable Map requestFilters, @Nullable SortCriterion sortCriterion, int start, int count) + throws 
RemoteInvocationException { + final DataJobsFindBySearchRequestBuilder requestBuilder = DATA_JOBS_REQUEST_BUILDERS.findBySearch() + .aspectsParam(aspectNames) + .inputParam(input) + .sortParam(sortCriterion) + .paginate(start, count); + if (requestFilters != null) { + requestBuilder.filterParam(newFilter(requestFilters)); + } + return _client.sendRequest(requestBuilder.build()).getResponse().getEntity(); + } + + @Nonnull + public CollectionResponse search(@Nonnull String input, int start, int count) + throws RemoteInvocationException { + return search(input, null, null, start, count); + } + + /** + * Gets {@link DataJob} model for the given urn + * + * @param urn data job urn + * @return {@link DataJob} data job + * @throws RemoteInvocationException + */ + @Nonnull + public DataJob get(@Nonnull DataJobUrn urn) + throws RemoteInvocationException { + GetRequest getRequest = DATA_JOBS_REQUEST_BUILDERS.get() + .id(new ComplexResourceKey<>(toDataJobKey(urn), new EmptyRecord())) + .build(); + + return _client.sendRequest(getRequest).getResponse().getEntity(); + } + + /** + * Searches for data jobs matching to a given query and filters + * + * @param input search query + * @param requestFilters search filters + * @param start start offset for search results + * @param count max number of search results requested + * @return CollectionResponse of {@link DataJob} + * @throws RemoteInvocationException + */ + @Nonnull + public CollectionResponse search(@Nonnull String input, @Nonnull Map requestFilters, + int start, int count) throws RemoteInvocationException { + + DataJobsFindBySearchRequestBuilder requestBuilder = DATA_JOBS_REQUEST_BUILDERS + .findBySearch() + .inputParam(input) + .filterParam(newFilter(requestFilters)).paginate(start, count); + return _client.sendRequest(requestBuilder.build()).getResponse().getEntity(); + } + + /** + * Autocomplete search for data jobs in search bar + * + * @param query search query + * @param field field of the Data Job + * @param 
requestFilters autocomplete filters + * @param limit max number of autocomplete results + * @throws RemoteInvocationException + */ + @Nonnull + public AutoCompleteResult autoComplete(@Nonnull String query, @Nonnull String field, + @Nonnull Map requestFilters, + @Nonnull int limit) throws RemoteInvocationException { + DataJobsDoAutocompleteRequestBuilder requestBuilder = DATA_JOBS_REQUEST_BUILDERS + .actionAutocomplete() + .queryParam(query) + .fieldParam(field) + .filterParam(newFilter(requestFilters)) + .limitParam(limit); + return _client.sendRequest(requestBuilder.build()).getResponse().getEntity(); + } + + /** + * Batch gets list of {@link DataJob} + * + * @param urns list of flow urn + * @return map of {@link DataJob} + * @throws RemoteInvocationException + */ + @Nonnull + public Map batchGet(@Nonnull Set urns) + throws RemoteInvocationException { + BatchGetEntityRequest, DataJob> batchGetRequest + = DATA_JOBS_REQUEST_BUILDERS.batchGet() + .ids(urns.stream().map(this::getKeyFromUrn).collect(Collectors.toSet())) + .build(); + + return _client.sendRequest(batchGetRequest).getResponseEntity().getResults() + .entrySet().stream().collect(Collectors.toMap( + entry -> getUrnFromKey(entry.getKey()), + entry -> entry.getValue().getEntity()) + ); + } + + @Nonnull + private ComplexResourceKey getKeyFromUrn(@Nonnull DataJobUrn urn) { + return new ComplexResourceKey<>(toDataJobKey(urn), new EmptyRecord()); + } + + @Nonnull + private DataJobUrn getUrnFromKey(@Nonnull ComplexResourceKey key) { + return toFlowUrn(key.getKey()); + } + + @Nonnull + protected DataJobKey toDataJobKey(@Nonnull DataJobUrn urn) { + return new DataJobKey() + .setDataFlow(urn.getFlowEntity()) + .setJobId(urn.getJobIdEntity()); + } + + @Nonnull + protected DataJobUrn toFlowUrn(@Nonnull DataJobKey key) { + return new DataJobUrn(key.getDataFlow(), key.getJobId()); + } +} diff --git a/gms/factories/src/main/java/com/linkedin/gms/factory/datajob/DataFlowDAOFactory.java 
b/gms/factories/src/main/java/com/linkedin/gms/factory/datajob/DataFlowDAOFactory.java new file mode 100644 index 00000000000000..eaaeca054f52a4 --- /dev/null +++ b/gms/factories/src/main/java/com/linkedin/gms/factory/datajob/DataFlowDAOFactory.java @@ -0,0 +1,34 @@ +package com.linkedin.gms.factory.datajob; + +import javax.annotation.Nonnull; + +import org.apache.kafka.clients.producer.Producer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.DependsOn; + +import com.linkedin.common.urn.DataFlowUrn; +import com.linkedin.metadata.aspect.DataFlowAspect; +import com.linkedin.metadata.dao.EbeanLocalDAO; +import com.linkedin.metadata.dao.producer.KafkaMetadataEventProducer; +import com.linkedin.metadata.snapshot.DataFlowSnapshot; + +import io.ebean.config.ServerConfig; + +@Configuration +public class DataFlowDAOFactory { + @Autowired + private ApplicationContext applicationContext; + + @Bean(name = "dataFlowDAO") + @DependsOn({"gmsEbeanServiceConfig", "kafkaEventProducer"}) + @Nonnull + protected EbeanLocalDAO createInstance() { + KafkaMetadataEventProducer producer = + new KafkaMetadataEventProducer<>(DataFlowSnapshot.class, DataFlowAspect.class, + applicationContext.getBean(Producer.class)); + return new EbeanLocalDAO<>(DataFlowAspect.class, producer, applicationContext.getBean(ServerConfig.class), DataFlowUrn.class); + } +} diff --git a/gms/factories/src/main/java/com/linkedin/gms/factory/datajob/DataFlowSearchDAOFactory.java b/gms/factories/src/main/java/com/linkedin/gms/factory/datajob/DataFlowSearchDAOFactory.java new file mode 100644 index 00000000000000..2ed41918a3f0c5 --- /dev/null +++ b/gms/factories/src/main/java/com/linkedin/gms/factory/datajob/DataFlowSearchDAOFactory.java @@ -0,0 +1,29 @@ +package 
com.linkedin.gms.factory.datajob; + +import com.linkedin.metadata.configs.DataFlowSearchConfig; +import com.linkedin.metadata.dao.search.ESSearchDAO; +import com.linkedin.metadata.search.DataFlowDocument; + +import org.elasticsearch.client.RestHighLevelClient; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.DependsOn; + +import javax.annotation.Nonnull; + +@Configuration +public class DataFlowSearchDAOFactory { + + @Autowired + ApplicationContext applicationContext; + + @Bean(name = "dataFlowSearchDAO") + @DependsOn({"elasticSearchRestHighLevelClient"}) + @Nonnull + protected ESSearchDAO createInstance() { + return new ESSearchDAO(applicationContext.getBean(RestHighLevelClient.class), DataFlowDocument.class, + new DataFlowSearchConfig()); + } +} diff --git a/gms/factories/src/main/java/com/linkedin/gms/factory/datajob/DataJobDAOFactory.java b/gms/factories/src/main/java/com/linkedin/gms/factory/datajob/DataJobDAOFactory.java new file mode 100644 index 00000000000000..0e9cebbd21fff2 --- /dev/null +++ b/gms/factories/src/main/java/com/linkedin/gms/factory/datajob/DataJobDAOFactory.java @@ -0,0 +1,34 @@ +package com.linkedin.gms.factory.datajob; + +import javax.annotation.Nonnull; + +import org.apache.kafka.clients.producer.Producer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.DependsOn; + +import com.linkedin.common.urn.DataJobUrn; +import com.linkedin.metadata.aspect.DataJobAspect; +import com.linkedin.metadata.dao.EbeanLocalDAO; +import com.linkedin.metadata.dao.producer.KafkaMetadataEventProducer; 
+import com.linkedin.metadata.snapshot.DataJobSnapshot; + +import io.ebean.config.ServerConfig; + +@Configuration +public class DataJobDAOFactory { + @Autowired + private ApplicationContext applicationContext; + + @Bean(name = "dataJobDAO") + @DependsOn({"gmsEbeanServiceConfig", "kafkaEventProducer"}) + @Nonnull + protected EbeanLocalDAO createInstance() { + KafkaMetadataEventProducer producer = + new KafkaMetadataEventProducer<>(DataJobSnapshot.class, DataJobAspect.class, + applicationContext.getBean(Producer.class)); + return new EbeanLocalDAO<>(DataJobAspect.class, producer, applicationContext.getBean(ServerConfig.class), DataJobUrn.class); + } +} diff --git a/gms/factories/src/main/java/com/linkedin/gms/factory/datajob/DataJobSearchDAOFactory.java b/gms/factories/src/main/java/com/linkedin/gms/factory/datajob/DataJobSearchDAOFactory.java new file mode 100644 index 00000000000000..1b53b2b51fe104 --- /dev/null +++ b/gms/factories/src/main/java/com/linkedin/gms/factory/datajob/DataJobSearchDAOFactory.java @@ -0,0 +1,29 @@ +package com.linkedin.gms.factory.datajob; + +import com.linkedin.metadata.configs.DataJobSearchConfig; +import com.linkedin.metadata.dao.search.ESSearchDAO; +import com.linkedin.metadata.search.DataJobDocument; + +import org.elasticsearch.client.RestHighLevelClient; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.DependsOn; + +import javax.annotation.Nonnull; + +@Configuration +public class DataJobSearchDAOFactory { + + @Autowired + ApplicationContext applicationContext; + + @Bean(name = "dataJobSearchDAO") + @DependsOn({"elasticSearchRestHighLevelClient"}) + @Nonnull + protected ESSearchDAO createInstance() { + return new ESSearchDAO(applicationContext.getBean(RestHighLevelClient.class), DataJobDocument.class, + 
new DataJobSearchConfig()); + } +} diff --git a/gms/impl/src/integTest/java/com/linkedin/metadata/DataFlowSearchSanityTest.java b/gms/impl/src/integTest/java/com/linkedin/metadata/DataFlowSearchSanityTest.java new file mode 100644 index 00000000000000..34eee9cc30e3f9 --- /dev/null +++ b/gms/impl/src/integTest/java/com/linkedin/metadata/DataFlowSearchSanityTest.java @@ -0,0 +1,42 @@ +package com.linkedin.metadata; + +import java.util.Collections; + +import com.linkedin.common.urn.DataFlowUrn; +import com.linkedin.data.template.StringArray; +import com.linkedin.metadata.configs.DataFlowSearchConfig; +import com.linkedin.metadata.search.DataFlowDocument; +import com.linkedin.metadata.testing.BaseSearchSanityTests; +import com.linkedin.metadata.testing.SearchIndex; +import com.linkedin.metadata.testing.annotations.SearchIndexMappings; +import com.linkedin.metadata.testing.annotations.SearchIndexSettings; +import com.linkedin.metadata.testing.annotations.SearchIndexType; +import javax.annotation.Nonnull; + + +public class DataFlowSearchSanityTest extends BaseSearchSanityTests { + @SearchIndexType(DataFlowDocument.class) + @SearchIndexSettings("/index/dataflow/settings.json") + @SearchIndexMappings("/index/dataflow/mappings.json") + public SearchIndex _index; + + private static final DataFlowUrn URN = new DataFlowUrn("airflow", "my_pipeline", "prod_cluster"); + private static final DataFlowDocument DOCUMENT = new DataFlowDocument().setUrn(URN) + .setFlowId(URN.getFlowIdEntity()) + .setName("go_with_the_flow") + .setDescription("My pipeline!") + .setOrchestrator(URN.getOrchestratorEntity()) + .setOwners(new StringArray("fbaggins")) + .setCluster(URN.getClusterEntity()) + .setProject("toy_project"); + + protected DataFlowSearchSanityTest() { + super(URN, DOCUMENT, new DataFlowSearchConfig()); + } + + @Nonnull + @Override + public SearchIndex getIndex() { + return _index; + } +} diff --git a/gms/impl/src/integTest/java/com/linkedin/metadata/DataJobSearchSanityTest.java 
b/gms/impl/src/integTest/java/com/linkedin/metadata/DataJobSearchSanityTest.java new file mode 100644 index 00000000000000..f4106229e8f95a --- /dev/null +++ b/gms/impl/src/integTest/java/com/linkedin/metadata/DataJobSearchSanityTest.java @@ -0,0 +1,43 @@ +package com.linkedin.metadata; + +import java.util.Collections; + +import com.linkedin.common.urn.DataJobUrn; +import com.linkedin.common.urn.DataFlowUrn; +import com.linkedin.data.template.StringArray; +import com.linkedin.metadata.configs.DataJobSearchConfig; +import com.linkedin.metadata.search.DataJobDocument; +import com.linkedin.metadata.testing.BaseSearchSanityTests; +import com.linkedin.metadata.testing.SearchIndex; +import com.linkedin.metadata.testing.annotations.SearchIndexMappings; +import com.linkedin.metadata.testing.annotations.SearchIndexSettings; +import com.linkedin.metadata.testing.annotations.SearchIndexType; +import javax.annotation.Nonnull; + + +public class DataJobSearchSanityTest extends BaseSearchSanityTests { + @SearchIndexType(DataJobDocument.class) + @SearchIndexSettings("/index/datajob/settings.json") + @SearchIndexMappings("/index/datajob/mappings.json") + public SearchIndex _index; + + private static final DataFlowUrn DATAFLOW_URN = new DataFlowUrn("airflow", "my_pipeline", "prod_cluster"); + private static final DataJobUrn URN = new DataJobUrn(DATAFLOW_URN, "my_job"); + + private static final DataJobDocument DOCUMENT = new DataJobDocument().setUrn(URN) + .setName("quest") + .setDescription("My pipeline!") + .setDataFlow(URN.getFlowEntity().getFlowIdEntity()) + .setOwners(new StringArray("fbaggins")) + .setJobId(URN.getJobIdEntity()); + + protected DataJobSearchSanityTest() { + super(URN, DOCUMENT, new DataJobSearchConfig()); + } + + @Nonnull + @Override + public SearchIndex getIndex() { + return _index; + } +} diff --git a/gms/impl/src/main/java/com/linkedin/metadata/configs/DataFlowSearchConfig.java b/gms/impl/src/main/java/com/linkedin/metadata/configs/DataFlowSearchConfig.java 
new file mode 100644 index 00000000000000..82f87d278f61e3 --- /dev/null +++ b/gms/impl/src/main/java/com/linkedin/metadata/configs/DataFlowSearchConfig.java @@ -0,0 +1,41 @@ +package com.linkedin.metadata.configs; + +import com.linkedin.metadata.dao.search.BaseSearchConfig; +import com.linkedin.metadata.dao.utils.SearchUtils; +import com.linkedin.metadata.search.DataFlowDocument; + +import javax.annotation.Nonnull; +import java.util.Collections; +import java.util.Set; + +public class DataFlowSearchConfig extends BaseSearchConfig { + @Override + @Nonnull + public Set getFacetFields() { + return Collections.emptySet(); + } + + @Override + @Nonnull + public Class getSearchDocument() { + return DataFlowDocument.class; + } + + @Override + @Nonnull + public String getDefaultAutocompleteField() { + return "flowId"; + } + + @Override + @Nonnull + public String getSearchQueryTemplate() { + return SearchUtils.readResourceFile(getClass(), "dataFlowESSearchQueryTemplate.json"); + } + + @Override + @Nonnull + public String getAutocompleteQueryTemplate() { + return SearchUtils.readResourceFile(getClass(), "dataFlowESAutocompleteQueryTemplate.json"); + } +} diff --git a/gms/impl/src/main/java/com/linkedin/metadata/configs/DataJobSearchConfig.java b/gms/impl/src/main/java/com/linkedin/metadata/configs/DataJobSearchConfig.java new file mode 100644 index 00000000000000..311179a8711fc8 --- /dev/null +++ b/gms/impl/src/main/java/com/linkedin/metadata/configs/DataJobSearchConfig.java @@ -0,0 +1,41 @@ +package com.linkedin.metadata.configs; + +import com.linkedin.metadata.dao.search.BaseSearchConfig; +import com.linkedin.metadata.dao.utils.SearchUtils; +import com.linkedin.metadata.search.DataJobDocument; + +import javax.annotation.Nonnull; +import java.util.Collections; +import java.util.Set; + +public class DataJobSearchConfig extends BaseSearchConfig { + @Override + @Nonnull + public Set getFacetFields() { + return Collections.emptySet(); + } + + @Override + @Nonnull + public Class 
getSearchDocument() { + return DataJobDocument.class; + } + + @Override + @Nonnull + public String getDefaultAutocompleteField() { + return "jobId"; + } + + @Override + @Nonnull + public String getSearchQueryTemplate() { + return SearchUtils.readResourceFile(getClass(), "dataJobESSearchQueryTemplate.json"); + } + + @Override + @Nonnull + public String getAutocompleteQueryTemplate() { + return SearchUtils.readResourceFile(getClass(), "dataJobESAutocompleteQueryTemplate.json"); + } +} diff --git a/gms/impl/src/main/java/com/linkedin/metadata/resources/datajob/DataFlows.java b/gms/impl/src/main/java/com/linkedin/metadata/resources/datajob/DataFlows.java new file mode 100644 index 00000000000000..71323024fbc25b --- /dev/null +++ b/gms/impl/src/main/java/com/linkedin/metadata/resources/datajob/DataFlows.java @@ -0,0 +1,236 @@ +package com.linkedin.metadata.resources.datajob; + +import com.linkedin.datajob.DataFlowInfo; +import com.linkedin.common.Ownership; +import com.linkedin.common.urn.DataFlowUrn; +import com.linkedin.common.urn.Urn; +import com.linkedin.datajob.DataFlow; +import com.linkedin.datajob.DataFlowKey; +import com.linkedin.data.template.StringArray; +import com.linkedin.metadata.aspect.DataFlowAspect; +import com.linkedin.metadata.dao.BaseBrowseDAO; +import com.linkedin.metadata.dao.BaseLocalDAO; +import com.linkedin.metadata.dao.BaseSearchDAO; +import com.linkedin.metadata.dao.utils.ModelUtils; +import com.linkedin.metadata.query.AutoCompleteResult; +import com.linkedin.metadata.query.BrowseResult; +import com.linkedin.metadata.query.Filter; +import com.linkedin.metadata.query.SearchResultMetadata; +import com.linkedin.metadata.query.SortCriterion; +import com.linkedin.metadata.restli.BackfillResult; +import com.linkedin.metadata.restli.BaseBrowsableEntityResource; +import com.linkedin.metadata.search.DataFlowDocument; +import com.linkedin.metadata.snapshot.DataFlowSnapshot; +import com.linkedin.parseq.Task; +import 
com.linkedin.restli.common.ComplexResourceKey; +import com.linkedin.restli.common.EmptyRecord; +import com.linkedin.restli.server.CollectionResult; +import com.linkedin.restli.server.PagingContext; +import com.linkedin.restli.server.annotations.Action; +import com.linkedin.restli.server.annotations.ActionParam; +import com.linkedin.restli.server.annotations.Finder; +import com.linkedin.restli.server.annotations.Optional; +import com.linkedin.restli.server.annotations.PagingContextParam; +import com.linkedin.restli.server.annotations.QueryParam; +import com.linkedin.restli.server.annotations.RestLiCollection; +import com.linkedin.restli.server.annotations.RestMethod; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.inject.Inject; +import javax.inject.Named; + +import static com.linkedin.metadata.restli.RestliConstants.*; + + +@RestLiCollection(name = "dataFlows", namespace = "com.linkedin.dataflow", keyName = "key") +public class DataFlows extends BaseBrowsableEntityResource< + // @formatter:off + ComplexResourceKey, + DataFlow, + DataFlowUrn, + DataFlowSnapshot, + DataFlowAspect, + DataFlowDocument> { + // @formatter:on + + public DataFlows() { + super(DataFlowSnapshot.class, DataFlowAspect.class, DataFlowUrn.class); + } + + @Inject + @Named("dataFlowDAO") + private BaseLocalDAO _localDAO; + + @Inject + @Named("dataFlowSearchDAO") + private BaseSearchDAO _esSearchDAO; + + @Inject + @Named("dataFlowBrowseDao") + private BaseBrowseDAO _browseDAO; + + @Nonnull + @Override + protected BaseSearchDAO getSearchDAO() { + return _esSearchDAO; + } + + @Nonnull + @Override + protected BaseLocalDAO getLocalDAO() { + return _localDAO; + } + + @Nonnull + @Override + protected BaseBrowseDAO getBrowseDAO() { + return _browseDAO; + } + + @Nonnull + @Override + protected DataFlowUrn createUrnFromString(@Nonnull String urnString) throws Exception { + 
return DataFlowUrn.createFromString(urnString); + } + + @Nonnull + @Override + protected DataFlowUrn toUrn(@Nonnull ComplexResourceKey key) { + return new DataFlowUrn(key.getKey().getOrchestrator(), key.getKey().getFlowId(), key.getKey().getCluster()); + } + + @Nonnull + @Override + protected ComplexResourceKey toKey(@Nonnull DataFlowUrn urn) { + return new ComplexResourceKey<>( + new DataFlowKey() + .setOrchestrator(urn.getOrchestratorEntity()) + .setFlowId(urn.getFlowIdEntity()) + .setCluster(urn.getClusterEntity()), + new EmptyRecord()); + } + + @Nonnull + @Override + protected DataFlow toValue(@Nonnull DataFlowSnapshot snapshot) { + final DataFlow value = new DataFlow() + .setUrn(snapshot.getUrn()) + .setOrchestrator(snapshot.getUrn().getOrchestratorEntity()) + .setFlowId(snapshot.getUrn().getFlowIdEntity()) + .setCluster(snapshot.getUrn().getClusterEntity()); + ModelUtils.getAspectsFromSnapshot(snapshot).forEach(aspect -> { + if (aspect instanceof DataFlowInfo) { + DataFlowInfo info = DataFlowInfo.class.cast(aspect); + value.setInfo(info); + } else if (aspect instanceof Ownership) { + Ownership ownership = Ownership.class.cast(aspect); + value.setOwnership(ownership); + } + }); + + return value; + } + + @Nonnull + @Override + protected DataFlowSnapshot toSnapshot(@Nonnull DataFlow dataFlow, @Nonnull DataFlowUrn urn) { + final List aspects = new ArrayList<>(); + if (dataFlow.hasInfo()) { + aspects.add(ModelUtils.newAspectUnion(DataFlowAspect.class, dataFlow.getInfo())); + } + if (dataFlow.hasOwnership()) { + aspects.add(ModelUtils.newAspectUnion(DataFlowAspect.class, dataFlow.getOwnership())); + } + return ModelUtils.newSnapshot(DataFlowSnapshot.class, urn, aspects); + } + + @RestMethod.Get + @Override + @Nonnull + public Task get(@Nonnull ComplexResourceKey key, + @QueryParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames) { + return super.get(key, aspectNames); + } + + @RestMethod.BatchGet + @Override + @Nonnull + public Task, DataFlow>> batchGet( + 
@Nonnull Set> keys, + @QueryParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames) { + return super.batchGet(keys, aspectNames); + } + + @RestMethod.GetAll + @Nonnull + public Task> getAll(@PagingContextParam @Nonnull PagingContext pagingContext, + @QueryParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames, + @QueryParam(PARAM_FILTER) @Optional @Nullable Filter filter, + @QueryParam(PARAM_SORT) @Optional @Nullable SortCriterion sortCriterion) { + return super.getAll(pagingContext, aspectNames, filter, sortCriterion); + } + + @Finder(FINDER_SEARCH) + @Override + @Nonnull + public Task> search(@QueryParam(PARAM_INPUT) @Nonnull String input, + @QueryParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames, + @QueryParam(PARAM_FILTER) @Optional @Nullable Filter filter, + @QueryParam(PARAM_SORT) @Optional @Nullable SortCriterion sortCriterion, + @PagingContextParam @Nonnull PagingContext pagingContext) { + return super.search(input, aspectNames, filter, sortCriterion, pagingContext); + } + + @Action(name = ACTION_AUTOCOMPLETE) + @Override + @Nonnull + public Task autocomplete(@ActionParam(PARAM_QUERY) @Nonnull String query, + @ActionParam(PARAM_FIELD) @Nullable String field, @ActionParam(PARAM_FILTER) @Nullable Filter filter, + @ActionParam(PARAM_LIMIT) int limit) { + return super.autocomplete(query, field, filter, limit); + } + + @Action(name = ACTION_BROWSE) + @Override + @Nonnull + public Task browse(@ActionParam(PARAM_PATH) @Nonnull String path, + @ActionParam(PARAM_FILTER) @Optional @Nullable Filter filter, @ActionParam(PARAM_START) int start, + @ActionParam(PARAM_LIMIT) int limit) { + return super.browse(path, filter, start, limit); + } + + @Action(name = ACTION_GET_BROWSE_PATHS) + @Override + @Nonnull + public Task getBrowsePaths( + @ActionParam(value = "urn", typeref = com.linkedin.common.Urn.class) @Nonnull Urn urn) { + return super.getBrowsePaths(urn); + } + + @Action(name = ACTION_INGEST) + @Override + @Nonnull + public Task 
ingest(@ActionParam(PARAM_SNAPSHOT) @Nonnull DataFlowSnapshot snapshot) { + return super.ingest(snapshot); + } + + @Action(name = ACTION_GET_SNAPSHOT) + @Override + @Nonnull + public Task getSnapshot(@ActionParam(PARAM_URN) @Nonnull String urnString, + @ActionParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames) { + return super.getSnapshot(urnString, aspectNames); + } + + @Action(name = ACTION_BACKFILL_WITH_URNS) + @Override + @Nonnull + public Task backfill(@ActionParam(PARAM_URNS) @Nonnull String[] urns, + @ActionParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames) { + return super.backfill(urns, aspectNames); + } +} diff --git a/gms/impl/src/main/java/com/linkedin/metadata/resources/datajob/DataJobs.java b/gms/impl/src/main/java/com/linkedin/metadata/resources/datajob/DataJobs.java new file mode 100644 index 00000000000000..1f045fe9b84225 --- /dev/null +++ b/gms/impl/src/main/java/com/linkedin/metadata/resources/datajob/DataJobs.java @@ -0,0 +1,241 @@ +package com.linkedin.metadata.resources.datajob; + +import com.linkedin.datajob.DataJobInfo; +import com.linkedin.datajob.DataJobInputOutput; +import com.linkedin.common.Ownership; +import com.linkedin.common.urn.DataJobUrn; +import com.linkedin.common.urn.Urn; +import com.linkedin.datajob.DataJob; +import com.linkedin.datajob.DataJobKey; +import com.linkedin.data.template.StringArray; +import com.linkedin.metadata.aspect.DataJobAspect; +import com.linkedin.metadata.dao.BaseBrowseDAO; +import com.linkedin.metadata.dao.BaseLocalDAO; +import com.linkedin.metadata.dao.BaseSearchDAO; +import com.linkedin.metadata.dao.utils.ModelUtils; +import com.linkedin.metadata.query.AutoCompleteResult; +import com.linkedin.metadata.query.BrowseResult; +import com.linkedin.metadata.query.Filter; +import com.linkedin.metadata.query.SearchResultMetadata; +import com.linkedin.metadata.query.SortCriterion; +import com.linkedin.metadata.restli.BackfillResult; +import 
com.linkedin.metadata.restli.BaseBrowsableEntityResource; +import com.linkedin.metadata.search.DataJobDocument; +import com.linkedin.metadata.snapshot.DataJobSnapshot; +import com.linkedin.parseq.Task; +import com.linkedin.restli.common.ComplexResourceKey; +import com.linkedin.restli.common.EmptyRecord; +import com.linkedin.restli.server.CollectionResult; +import com.linkedin.restli.server.PagingContext; +import com.linkedin.restli.server.annotations.Action; +import com.linkedin.restli.server.annotations.ActionParam; +import com.linkedin.restli.server.annotations.Finder; +import com.linkedin.restli.server.annotations.Optional; +import com.linkedin.restli.server.annotations.PagingContextParam; +import com.linkedin.restli.server.annotations.QueryParam; +import com.linkedin.restli.server.annotations.RestLiCollection; +import com.linkedin.restli.server.annotations.RestMethod; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.inject.Inject; +import javax.inject.Named; + +import static com.linkedin.metadata.restli.RestliConstants.*; + + +@RestLiCollection(name = "dataJobs", namespace = "com.linkedin.datajob", keyName = "key") +public class DataJobs extends BaseBrowsableEntityResource< + // @formatter:off + ComplexResourceKey, + DataJob, + DataJobUrn, + DataJobSnapshot, + DataJobAspect, + DataJobDocument> { + // @formatter:on + + public DataJobs() { + super(DataJobSnapshot.class, DataJobAspect.class, DataJobUrn.class); + } + + @Inject + @Named("dataJobDAO") + private BaseLocalDAO _localDAO; + + @Inject + @Named("dataJobSearchDAO") + private BaseSearchDAO _esSearchDAO; + + @Inject + @Named("dataJobBrowseDao") + private BaseBrowseDAO _browseDAO; + + @Nonnull + @Override + protected BaseSearchDAO getSearchDAO() { + return _esSearchDAO; + } + + @Nonnull + @Override + protected BaseLocalDAO getLocalDAO() { + return _localDAO; + } + + @Nonnull + 
@Override + protected BaseBrowseDAO getBrowseDAO() { + return _browseDAO; + } + + @Nonnull + @Override + protected DataJobUrn createUrnFromString(@Nonnull String urnString) throws Exception { + return DataJobUrn.createFromString(urnString); + } + + @Nonnull + @Override + protected DataJobUrn toUrn(@Nonnull ComplexResourceKey key) { + return new DataJobUrn(key.getKey().getDataFlow(), key.getKey().getJobId()); + } + + @Nonnull + @Override + protected ComplexResourceKey toKey(@Nonnull DataJobUrn urn) { + return new ComplexResourceKey<>( + new DataJobKey() + .setDataFlow(urn.getFlowEntity()) + .setJobId(urn.getJobIdEntity()), + new EmptyRecord()); + } + + @Nonnull + @Override + protected DataJob toValue(@Nonnull DataJobSnapshot snapshot) { + final DataJob value = new DataJob() + .setUrn(snapshot.getUrn()) + .setDataFlow(snapshot.getUrn().getFlowEntity()) + .setJobId(snapshot.getUrn().getJobIdEntity()); + ModelUtils.getAspectsFromSnapshot(snapshot).forEach(aspect -> { + if (aspect instanceof DataJobInfo) { + DataJobInfo info = DataJobInfo.class.cast(aspect); + value.setInfo(info); + } else if (aspect instanceof DataJobInputOutput) { + DataJobInputOutput inputOutput = DataJobInputOutput.class.cast(aspect); + value.setInputOutput(inputOutput); + } else if (aspect instanceof Ownership) { + Ownership ownership = Ownership.class.cast(aspect); + value.setOwnership(ownership); + } + }); + + return value; + } + + @Nonnull + @Override + protected DataJobSnapshot toSnapshot(@Nonnull DataJob dataJob, @Nonnull DataJobUrn urn) { + final List aspects = new ArrayList<>(); + if (dataJob.hasInfo()) { + aspects.add(ModelUtils.newAspectUnion(DataJobAspect.class, dataJob.getInfo())); + } + if (dataJob.hasInputOutput()) { + aspects.add(ModelUtils.newAspectUnion(DataJobAspect.class, dataJob.getInputOutput())); + } + if (dataJob.hasOwnership()) { + aspects.add(ModelUtils.newAspectUnion(DataJobAspect.class, dataJob.getOwnership())); + } + return ModelUtils.newSnapshot(DataJobSnapshot.class, 
urn, aspects); + } + + @RestMethod.Get + @Override + @Nonnull + public Task get(@Nonnull ComplexResourceKey key, + @QueryParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames) { + return super.get(key, aspectNames); + } + + @RestMethod.BatchGet + @Override + @Nonnull + public Task, DataJob>> batchGet( + @Nonnull Set> keys, + @QueryParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames) { + return super.batchGet(keys, aspectNames); + } + + @RestMethod.GetAll + @Nonnull + public Task> getAll(@PagingContextParam @Nonnull PagingContext pagingContext, + @QueryParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames, + @QueryParam(PARAM_FILTER) @Optional @Nullable Filter filter, + @QueryParam(PARAM_SORT) @Optional @Nullable SortCriterion sortCriterion) { + return super.getAll(pagingContext, aspectNames, filter, sortCriterion); + } + + @Finder(FINDER_SEARCH) + @Override + @Nonnull + public Task> search(@QueryParam(PARAM_INPUT) @Nonnull String input, + @QueryParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames, + @QueryParam(PARAM_FILTER) @Optional @Nullable Filter filter, + @QueryParam(PARAM_SORT) @Optional @Nullable SortCriterion sortCriterion, + @PagingContextParam @Nonnull PagingContext pagingContext) { + return super.search(input, aspectNames, filter, sortCriterion, pagingContext); + } + + @Action(name = ACTION_AUTOCOMPLETE) + @Override + @Nonnull + public Task autocomplete(@ActionParam(PARAM_QUERY) @Nonnull String query, + @ActionParam(PARAM_FIELD) @Nullable String field, @ActionParam(PARAM_FILTER) @Nullable Filter filter, + @ActionParam(PARAM_LIMIT) int limit) { + return super.autocomplete(query, field, filter, limit); + } + + @Action(name = ACTION_BROWSE) + @Override + @Nonnull + public Task browse(@ActionParam(PARAM_PATH) @Nonnull String path, + @ActionParam(PARAM_FILTER) @Optional @Nullable Filter filter, @ActionParam(PARAM_START) int start, + @ActionParam(PARAM_LIMIT) int limit) { + return super.browse(path, filter, start, limit); + 
} + + @Action(name = ACTION_GET_BROWSE_PATHS) + @Override + @Nonnull + public Task getBrowsePaths( + @ActionParam(value = "urn", typeref = com.linkedin.common.Urn.class) @Nonnull Urn urn) { + return super.getBrowsePaths(urn); + } + + @Action(name = ACTION_INGEST) + @Override + @Nonnull + public Task ingest(@ActionParam(PARAM_SNAPSHOT) @Nonnull DataJobSnapshot snapshot) { + return super.ingest(snapshot); + } + + @Action(name = ACTION_GET_SNAPSHOT) + @Override + @Nonnull + public Task getSnapshot(@ActionParam(PARAM_URN) @Nonnull String urnString, + @ActionParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames) { + return super.getSnapshot(urnString, aspectNames); + } + + @Action(name = ACTION_BACKFILL_WITH_URNS) + @Override + @Nonnull + public Task backfill(@ActionParam(PARAM_URNS) @Nonnull String[] urns, + @ActionParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames) { + return super.backfill(urns, aspectNames); + } +} diff --git a/gms/impl/src/main/resources/dataFlowESAutocompleteQueryTemplate.json b/gms/impl/src/main/resources/dataFlowESAutocompleteQueryTemplate.json new file mode 100644 index 00000000000000..7241a1a5803ca0 --- /dev/null +++ b/gms/impl/src/main/resources/dataFlowESAutocompleteQueryTemplate.json @@ -0,0 +1,11 @@ +{ + "query_string": { + "query": "$INPUT", + "fields": [ + "$FIELD^4", + "$FIELD.ngram" + ], + "default_operator": "and", + "analyzer": "lowercase_keyword" + } +} diff --git a/gms/impl/src/main/resources/dataFlowESSearchQueryTemplate.json b/gms/impl/src/main/resources/dataFlowESSearchQueryTemplate.json new file mode 100644 index 00000000000000..a342c823130f74 --- /dev/null +++ b/gms/impl/src/main/resources/dataFlowESSearchQueryTemplate.json @@ -0,0 +1,27 @@ +{ + "function_score": { + "query": { + "query_string": { + "query": "$INPUT", + "fields": [ + "flowId^4", + "orchestrator", + "cluster" + ], + "default_operator": "and", + "analyzer": "standard" + } + }, + "functions": [ + { + "filter": { + "term": { + "removed": false + 
} + }, + "weight": 2 + } + ], + "score_mode": "multiply" + } +} diff --git a/gms/impl/src/main/resources/dataJobESAutocompleteQueryTemplate.json b/gms/impl/src/main/resources/dataJobESAutocompleteQueryTemplate.json new file mode 100644 index 00000000000000..7241a1a5803ca0 --- /dev/null +++ b/gms/impl/src/main/resources/dataJobESAutocompleteQueryTemplate.json @@ -0,0 +1,11 @@ +{ + "query_string": { + "query": "$INPUT", + "fields": [ + "$FIELD^4", + "$FIELD.ngram" + ], + "default_operator": "and", + "analyzer": "lowercase_keyword" + } +} diff --git a/gms/impl/src/main/resources/dataJobESSearchQueryTemplate.json b/gms/impl/src/main/resources/dataJobESSearchQueryTemplate.json new file mode 100644 index 00000000000000..204951131cb25a --- /dev/null +++ b/gms/impl/src/main/resources/dataJobESSearchQueryTemplate.json @@ -0,0 +1,26 @@ +{ + "function_score": { + "query": { + "query_string": { + "query": "$INPUT", + "fields": [ + "name^4", + "dataFlow" + ], + "default_operator": "and", + "analyzer": "standard" + } + }, + "functions": [ + { + "filter": { + "term": { + "removed": false + } + }, + "weight": 2 + } + ], + "score_mode": "multiply" + } +} diff --git a/gms/impl/src/main/resources/index/dataflow/mappings.json b/gms/impl/src/main/resources/index/dataflow/mappings.json new file mode 100644 index 00000000000000..2ecd9e71840fc4 --- /dev/null +++ b/gms/impl/src/main/resources/index/dataflow/mappings.json @@ -0,0 +1,75 @@ +{ + "properties": { + "browsePaths": { + "type": "text", + "fields": { + "length": { + "type": "token_count", + "analyzer": "slash_pattern" + } + }, + "analyzer": "custom_browse_slash", + "fielddata": true + }, + "access": { + "type": "keyword", + "fields": { + "ngram": { + "type": "text", + "analyzer": "delimit_edgengram" + } + } + }, + "description": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword" + } + } + }, + "owners": { + "type": "text", + "fields": { + "ngram": { + "type": "text", + "analyzer": "comma_pattern_ngram" + } + }, + 
"analyzer": "comma_pattern" + }, + "queryType": { + "type": "keyword" + }, + "title": { + "type": "keyword", + "fields": { + "delimited": { + "type": "text", + "analyzer": "delimit" + }, + "ngram": { + "type": "text", + "analyzer": "delimit_edgengram" + } + }, + "normalizer": "custom_normalizer" + }, + "tool": { + "type": "keyword", + "fields": { + "ngram": { + "type": "text", + "analyzer": "delimit_edgengram" + } + } + }, + "type": { + "type": "keyword" + }, + "urn": { + "type": "keyword", + "normalizer": "custom_normalizer" + } + } +} diff --git a/gms/impl/src/main/resources/index/dataflow/settings.json b/gms/impl/src/main/resources/index/dataflow/settings.json new file mode 100644 index 00000000000000..7be7ef7b7606da --- /dev/null +++ b/gms/impl/src/main/resources/index/dataflow/settings.json @@ -0,0 +1,97 @@ +{ + "index": { + "analysis": { + "filter": { + "autocomplete_filter": { + "type": "edge_ngram", + "min_gram": "3", + "max_gram": "20" + }, + "custom_delimiter": { + "split_on_numerics": "false", + "split_on_case_change": "false", + "type": "word_delimiter", + "preserve_original": "true", + "catenate_words": "false" + } + }, + "normalizer": { + "custom_normalizer": { + "filter": [ + "lowercase", + "asciifolding" + ], + "type": "custom" + } + }, + "analyzer": { + "slash_pattern": { + "filter": [ + "lowercase" + ], + "type": "custom", + "tokenizer": "slash_tokenizer" + }, + "comma_pattern": { + "filter": [ + "lowercase" + ], + "type": "custom", + "tokenizer": "comma_tokenizer" + }, + "comma_pattern_ngram": { + "filter": [ + "lowercase", + "autocomplete_filter" + ], + "type": "custom", + "tokenizer": "comma_tokenizer" + }, + "delimit_edgengram": { + "filter": [ + "lowercase", + "custom_delimiter", + "autocomplete_filter" + ], + "tokenizer": "whitespace" + }, + "delimit": { + "filter": [ + "lowercase", + "custom_delimiter" + ], + "tokenizer": "whitespace" + }, + "lowercase_keyword": { + "filter": [ + "lowercase" + ], + "type": "custom", + "tokenizer": "keyword" 
+ }, + "custom_browse_slash": { + "filter": [ + "lowercase" + ], + "type": "custom", + "tokenizer": "path_hierarchy" + } + }, + "tokenizer": { + "path_hierarchy_tokenizer": { + "type": "path_hierarchy", + "replacement": "/", + "delimiter": "." + }, + "slash_tokenizer": { + "pattern": "[/]", + "type": "pattern" + }, + "comma_tokenizer": { + "pattern": ",", + "type": "pattern" + } + } + } + } +} diff --git a/gms/impl/src/main/resources/index/datajob/mappings.json b/gms/impl/src/main/resources/index/datajob/mappings.json new file mode 100644 index 00000000000000..2ecd9e71840fc4 --- /dev/null +++ b/gms/impl/src/main/resources/index/datajob/mappings.json @@ -0,0 +1,75 @@ +{ + "properties": { + "browsePaths": { + "type": "text", + "fields": { + "length": { + "type": "token_count", + "analyzer": "slash_pattern" + } + }, + "analyzer": "custom_browse_slash", + "fielddata": true + }, + "access": { + "type": "keyword", + "fields": { + "ngram": { + "type": "text", + "analyzer": "delimit_edgengram" + } + } + }, + "description": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword" + } + } + }, + "owners": { + "type": "text", + "fields": { + "ngram": { + "type": "text", + "analyzer": "comma_pattern_ngram" + } + }, + "analyzer": "comma_pattern" + }, + "queryType": { + "type": "keyword" + }, + "title": { + "type": "keyword", + "fields": { + "delimited": { + "type": "text", + "analyzer": "delimit" + }, + "ngram": { + "type": "text", + "analyzer": "delimit_edgengram" + } + }, + "normalizer": "custom_normalizer" + }, + "tool": { + "type": "keyword", + "fields": { + "ngram": { + "type": "text", + "analyzer": "delimit_edgengram" + } + } + }, + "type": { + "type": "keyword" + }, + "urn": { + "type": "keyword", + "normalizer": "custom_normalizer" + } + } +} diff --git a/gms/impl/src/main/resources/index/datajob/settings.json b/gms/impl/src/main/resources/index/datajob/settings.json new file mode 100644 index 00000000000000..7be7ef7b7606da --- /dev/null +++ 
b/gms/impl/src/main/resources/index/datajob/settings.json @@ -0,0 +1,97 @@ +{ + "index": { + "analysis": { + "filter": { + "autocomplete_filter": { + "type": "edge_ngram", + "min_gram": "3", + "max_gram": "20" + }, + "custom_delimiter": { + "split_on_numerics": "false", + "split_on_case_change": "false", + "type": "word_delimiter", + "preserve_original": "true", + "catenate_words": "false" + } + }, + "normalizer": { + "custom_normalizer": { + "filter": [ + "lowercase", + "asciifolding" + ], + "type": "custom" + } + }, + "analyzer": { + "slash_pattern": { + "filter": [ + "lowercase" + ], + "type": "custom", + "tokenizer": "slash_tokenizer" + }, + "comma_pattern": { + "filter": [ + "lowercase" + ], + "type": "custom", + "tokenizer": "comma_tokenizer" + }, + "comma_pattern_ngram": { + "filter": [ + "lowercase", + "autocomplete_filter" + ], + "type": "custom", + "tokenizer": "comma_tokenizer" + }, + "delimit_edgengram": { + "filter": [ + "lowercase", + "custom_delimiter", + "autocomplete_filter" + ], + "tokenizer": "whitespace" + }, + "delimit": { + "filter": [ + "lowercase", + "custom_delimiter" + ], + "tokenizer": "whitespace" + }, + "lowercase_keyword": { + "filter": [ + "lowercase" + ], + "type": "custom", + "tokenizer": "keyword" + }, + "custom_browse_slash": { + "filter": [ + "lowercase" + ], + "type": "custom", + "tokenizer": "path_hierarchy" + } + }, + "tokenizer": { + "path_hierarchy_tokenizer": { + "type": "path_hierarchy", + "replacement": "/", + "delimiter": "." 
+ }, + "slash_tokenizer": { + "pattern": "[/]", + "type": "pattern" + }, + "comma_tokenizer": { + "pattern": ",", + "type": "pattern" + } + } + } + } +} diff --git a/metadata-builders/src/main/java/com/linkedin/metadata/builders/graph/DataFlowGraphBuilder.java b/metadata-builders/src/main/java/com/linkedin/metadata/builders/graph/DataFlowGraphBuilder.java new file mode 100644 index 00000000000000..48e1169f57187c --- /dev/null +++ b/metadata-builders/src/main/java/com/linkedin/metadata/builders/graph/DataFlowGraphBuilder.java @@ -0,0 +1,44 @@ +package com.linkedin.metadata.builders.graph; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import javax.annotation.Nonnull; + +import com.linkedin.common.urn.DataFlowUrn; +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.metadata.builders.graph.relationship.BaseRelationshipBuilder; +import com.linkedin.metadata.builders.graph.relationship.OwnedByBuilderFromOwnership; + +import com.linkedin.metadata.entity.DataFlowEntity; +import com.linkedin.metadata.snapshot.DataFlowSnapshot; + + +public class DataFlowGraphBuilder extends BaseGraphBuilder { + private static final Set RELATIONSHIP_BUILDERS = + Collections.unmodifiableSet(new HashSet() { + { + add(new OwnedByBuilderFromOwnership()); + } + }); + + public DataFlowGraphBuilder() { + super(DataFlowSnapshot.class, RELATIONSHIP_BUILDERS); + } + + @Nonnull + @Override + protected List buildEntities(@Nonnull DataFlowSnapshot snapshot) { + final DataFlowUrn urn = snapshot.getUrn(); + final DataFlowEntity entity = new DataFlowEntity().setUrn(urn) + .setOrchestrator(urn.getOrchestratorEntity()) + .setFlowId(urn.getFlowIdEntity()) + .setCluster(urn.getClusterEntity()); + + setRemovedProperty(snapshot, entity); + + return Collections.singletonList(entity); + } +} diff --git a/metadata-builders/src/main/java/com/linkedin/metadata/builders/graph/DataJobGraphBuilder.java 
b/metadata-builders/src/main/java/com/linkedin/metadata/builders/graph/DataJobGraphBuilder.java new file mode 100644 index 00000000000000..422ad0c967a99d --- /dev/null +++ b/metadata-builders/src/main/java/com/linkedin/metadata/builders/graph/DataJobGraphBuilder.java @@ -0,0 +1,46 @@ +package com.linkedin.metadata.builders.graph; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import javax.annotation.Nonnull; + +import com.linkedin.common.urn.DataJobUrn; +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.metadata.builders.graph.relationship.BaseRelationshipBuilder; +import com.linkedin.metadata.builders.graph.relationship.OwnedByBuilderFromOwnership; +import com.linkedin.metadata.builders.graph.relationship.RelationshipBuilderFromDataJobInputOutput; + + +import com.linkedin.metadata.entity.DataJobEntity; +import com.linkedin.metadata.snapshot.DataJobSnapshot; + + +public class DataJobGraphBuilder extends BaseGraphBuilder { + private static final Set RELATIONSHIP_BUILDERS = + Collections.unmodifiableSet(new HashSet() { + { + add(new OwnedByBuilderFromOwnership()); + add(new RelationshipBuilderFromDataJobInputOutput()); + } + }); + + public DataJobGraphBuilder() { + super(DataJobSnapshot.class, RELATIONSHIP_BUILDERS); + } + + @Nonnull + @Override + protected List buildEntities(@Nonnull DataJobSnapshot snapshot) { + final DataJobUrn urn = snapshot.getUrn(); + final DataJobEntity entity = new DataJobEntity().setUrn(urn) + .setFlow(urn.getFlowEntity()) + .setJobId(urn.getJobIdEntity()); + + setRemovedProperty(snapshot, entity); + + return Collections.singletonList(entity); + } +} diff --git a/metadata-builders/src/main/java/com/linkedin/metadata/builders/graph/RegisteredGraphBuilders.java b/metadata-builders/src/main/java/com/linkedin/metadata/builders/graph/RegisteredGraphBuilders.java index e2b14c4530bec9..8845afc832305a 100644 --- 
a/metadata-builders/src/main/java/com/linkedin/metadata/builders/graph/RegisteredGraphBuilders.java +++ b/metadata-builders/src/main/java/com/linkedin/metadata/builders/graph/RegisteredGraphBuilders.java @@ -23,6 +23,8 @@ public class RegisteredGraphBuilders { add(new ChartGraphBuilder()); add(new CorpUserGraphBuilder()); add(new DashboardGraphBuilder()); + add(new DataFlowGraphBuilder()); + add(new DataJobGraphBuilder()); add(new DataProcessGraphBuilder()); add(new DatasetGraphBuilder()); add(new MLModelGraphBuilder()); diff --git a/metadata-builders/src/main/java/com/linkedin/metadata/builders/graph/relationship/RelationshipBuilderFromDataJobInputOutput.java b/metadata-builders/src/main/java/com/linkedin/metadata/builders/graph/relationship/RelationshipBuilderFromDataJobInputOutput.java new file mode 100644 index 00000000000000..819c0b063f245d --- /dev/null +++ b/metadata-builders/src/main/java/com/linkedin/metadata/builders/graph/relationship/RelationshipBuilderFromDataJobInputOutput.java @@ -0,0 +1,47 @@ +package com.linkedin.metadata.builders.graph.relationship; + +import com.linkedin.datajob.DataJobInputOutput; +import com.linkedin.common.urn.Urn; +import com.linkedin.metadata.builders.graph.GraphBuilder; +import com.linkedin.metadata.relationship.DownstreamOf; +import com.linkedin.metadata.relationship.Consumes; +import com.linkedin.metadata.relationship.Produces; + +import java.util.List; +import java.util.Arrays; +import java.util.stream.Collectors; +import javax.annotation.Nonnull; + +import static com.linkedin.metadata.dao.internal.BaseGraphWriterDAO.RemovalOption.*; + + +public class RelationshipBuilderFromDataJobInputOutput extends BaseRelationshipBuilder { + public RelationshipBuilderFromDataJobInputOutput() { + super(DataJobInputOutput.class); + } + + @Nonnull + @Override + public List buildRelationships(@Nonnull Urn urn, @Nonnull DataJobInputOutput inputOutput) { + final List downstreamEdges = inputOutput.getInputDatasets() + .stream() + 
.flatMap(upstreamDataset -> inputOutput.getOutputDatasets().stream() + .map(downstreamDataset -> new DownstreamOf().setSource(downstreamDataset).setDestination(upstreamDataset))) + .collect(Collectors.toList()); + + final List inputsList = inputOutput.getInputDatasets() + .stream() + .map(inputDataset -> new Consumes().setSource(urn).setDestination(inputDataset)) + .collect(Collectors.toList()); + + final List outputsList = inputOutput.getOutputDatasets() + .stream() + .map(outputDataset -> new Produces().setSource(urn).setDestination(outputDataset)) + .collect(Collectors.toList()); + + return Arrays.asList( + new GraphBuilder.RelationshipUpdates(downstreamEdges, REMOVE_ALL_EDGES_FROM_SOURCE), + new GraphBuilder.RelationshipUpdates(inputsList, REMOVE_ALL_EDGES_FROM_SOURCE), + new GraphBuilder.RelationshipUpdates(outputsList, REMOVE_ALL_EDGES_FROM_SOURCE)); + } +} diff --git a/metadata-builders/src/main/java/com/linkedin/metadata/builders/search/DataFlowIndexBuilder.java b/metadata-builders/src/main/java/com/linkedin/metadata/builders/search/DataFlowIndexBuilder.java new file mode 100644 index 00000000000000..89af77b9d89c9d --- /dev/null +++ b/metadata-builders/src/main/java/com/linkedin/metadata/builders/search/DataFlowIndexBuilder.java @@ -0,0 +1,88 @@ +package com.linkedin.metadata.builders.search; + +import com.linkedin.common.Ownership; +import com.linkedin.common.urn.DataFlowUrn; +import com.linkedin.datajob.DataFlowInfo; +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.data.template.StringArray; +import com.linkedin.metadata.search.DataFlowDocument; +import com.linkedin.metadata.snapshot.DataFlowSnapshot; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import javax.annotation.Nonnull; +import lombok.extern.slf4j.Slf4j; + + +@Slf4j +public class DataFlowIndexBuilder extends BaseIndexBuilder { + public DataFlowIndexBuilder() { + 
super(Collections.singletonList(DataFlowSnapshot.class), DataFlowDocument.class); + } + + @Nonnull + private static String buildBrowsePath(@Nonnull DataFlowUrn urn) { + return ("/" + urn.getOrchestratorEntity() + "/" + urn.getClusterEntity() + "/" + urn.getFlowIdEntity()).toLowerCase(); + } + + + @Nonnull + private static DataFlowDocument setUrnDerivedFields(@Nonnull DataFlowUrn urn) { + return new DataFlowDocument() + .setUrn(urn) + .setOrchestrator(urn.getOrchestratorEntity()) + .setFlowId(urn.getFlowIdEntity()) + .setCluster(urn.getClusterEntity()) + .setBrowsePaths(new StringArray(Collections.singletonList(buildBrowsePath(urn)))); + } + + @Nonnull + private DataFlowDocument getDocumentToUpdateFromAspect(@Nonnull DataFlowUrn urn, + @Nonnull DataFlowInfo info) { + final DataFlowDocument document = setUrnDerivedFields(urn); + document.setName(info.getName()); + if (info.getDescription() != null) { + document.setDescription(info.getDescription()); + } + if (info.getProject() != null) { + document.setProject(info.getProject()); + } + return document; + } + + @Nonnull + private DataFlowDocument getDocumentToUpdateFromAspect(@Nonnull DataFlowUrn urn, + @Nonnull Ownership ownership) { + return setUrnDerivedFields(urn) + .setOwners(BuilderUtils.getCorpUserOwners(ownership)); + } + + @Nonnull + private List getDocumentsToUpdateFromSnapshotType(@Nonnull DataFlowSnapshot snapshot) { + DataFlowUrn urn = snapshot.getUrn(); + return snapshot.getAspects().stream().map(aspect -> { + if (aspect.isDataFlowInfo()) { + return getDocumentToUpdateFromAspect(urn, aspect.getDataFlowInfo()); + } else if (aspect.isOwnership()) { + return getDocumentToUpdateFromAspect(urn, aspect.getOwnership()); + } + return null; + }).filter(Objects::nonNull).collect(Collectors.toList()); + } + + @Nonnull + @Override + public List getDocumentsToUpdate(@Nonnull RecordTemplate snapshot) { + if (snapshot instanceof DataFlowSnapshot) { + return getDocumentsToUpdateFromSnapshotType((DataFlowSnapshot) 
snapshot); + } + return Collections.emptyList(); + } + + @Nonnull + @Override + public Class getDocumentType() { + return DataFlowDocument.class; + } +} diff --git a/metadata-builders/src/main/java/com/linkedin/metadata/builders/search/DataJobIndexBuilder.java b/metadata-builders/src/main/java/com/linkedin/metadata/builders/search/DataJobIndexBuilder.java new file mode 100644 index 00000000000000..0822006dc6b389 --- /dev/null +++ b/metadata-builders/src/main/java/com/linkedin/metadata/builders/search/DataJobIndexBuilder.java @@ -0,0 +1,84 @@ +package com.linkedin.metadata.builders.search; + +import com.linkedin.common.Ownership; +import com.linkedin.common.urn.DataJobUrn; +import com.linkedin.datajob.DataJobInfo; +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.data.template.StringArray; +import com.linkedin.metadata.search.DataJobDocument; +import com.linkedin.metadata.snapshot.DataJobSnapshot; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import javax.annotation.Nonnull; +import lombok.extern.slf4j.Slf4j; + + +@Slf4j +public class DataJobIndexBuilder extends BaseIndexBuilder { + public DataJobIndexBuilder() { + super(Collections.singletonList(DataJobSnapshot.class), DataJobDocument.class); + } + + @Nonnull + private static String buildBrowsePath(@Nonnull DataJobUrn urn) { + return ("/" + urn.getFlowEntity().getFlowIdEntity() + "/" + urn.getJobIdEntity()).toLowerCase(); + } + + + @Nonnull + private static DataJobDocument setUrnDerivedFields(@Nonnull DataJobUrn urn) { + return new DataJobDocument() + .setUrn(urn) + .setDataFlow(urn.getFlowEntity().getFlowIdEntity()) + .setJobId(urn.getJobIdEntity()) + .setBrowsePaths(new StringArray(Collections.singletonList(buildBrowsePath(urn)))); + } + + @Nonnull + private DataJobDocument getDocumentToUpdateFromAspect(@Nonnull DataJobUrn urn, + @Nonnull DataJobInfo info) { + final DataJobDocument document = setUrnDerivedFields(urn); 
+ document.setName(info.getName()); + if (info.getDescription() != null) { + document.setDescription(info.getDescription()); + } + return document; + } + + @Nonnull + private DataJobDocument getDocumentToUpdateFromAspect(@Nonnull DataJobUrn urn, + @Nonnull Ownership ownership) { + return setUrnDerivedFields(urn) + .setOwners(BuilderUtils.getCorpUserOwners(ownership)); // TODO: should be optional? + } + + @Nonnull + private List getDocumentsToUpdateFromSnapshotType(@Nonnull DataJobSnapshot snapshot) { + DataJobUrn urn = snapshot.getUrn(); + return snapshot.getAspects().stream().map(aspect -> { + if (aspect.isDataJobInfo()) { + return getDocumentToUpdateFromAspect(urn, aspect.getDataJobInfo()); + } else if (aspect.isOwnership()) { + return getDocumentToUpdateFromAspect(urn, aspect.getOwnership()); + } + return null; + }).filter(Objects::nonNull).collect(Collectors.toList()); + } + + @Nonnull + @Override + public List getDocumentsToUpdate(@Nonnull RecordTemplate snapshot) { + if (snapshot instanceof DataJobSnapshot) { + return getDocumentsToUpdateFromSnapshotType((DataJobSnapshot) snapshot); + } + return Collections.emptyList(); + } + + @Nonnull + @Override + public Class getDocumentType() { + return DataJobDocument.class; + } +} diff --git a/metadata-builders/src/test/java/com/linkedin/metadata/builders/common/DataJobTestUtils.java b/metadata-builders/src/test/java/com/linkedin/metadata/builders/common/DataJobTestUtils.java new file mode 100644 index 00000000000000..0670154d992a7b --- /dev/null +++ b/metadata-builders/src/test/java/com/linkedin/metadata/builders/common/DataJobTestUtils.java @@ -0,0 +1,28 @@ +package com.linkedin.metadata.builders.common; + +import com.linkedin.datajob.DataJobInputOutput; +import com.linkedin.common.DatasetUrnArray; +import com.linkedin.common.urn.DatasetUrn; + +import static com.linkedin.metadata.testing.Owners.*; +import static com.linkedin.metadata.testing.Urns.*; +import static com.linkedin.metadata.utils.AuditStamps.*; + + 
+public class DataJobTestUtils { + + private DataJobTestUtils() { + // Util class should not have public constructor + } + + public static DataJobInputOutput makeDataJobInputOutput() { + DatasetUrn input1 = makeDatasetUrn("input1"); + DatasetUrn input2 = makeDatasetUrn("input2"); + DatasetUrn output1 = makeDatasetUrn("output1"); + DatasetUrn output2 = makeDatasetUrn("output2"); + + return new DataJobInputOutput() + .setInputDatasets(new DatasetUrnArray(input1, input2)) + .setOutputDatasets(new DatasetUrnArray(output1, output2)); + } +} \ No newline at end of file diff --git a/metadata-builders/src/test/java/com/linkedin/metadata/builders/graph/DataFlowGraphBuilderTest.java b/metadata-builders/src/test/java/com/linkedin/metadata/builders/graph/DataFlowGraphBuilderTest.java new file mode 100644 index 00000000000000..31d5ec21806270 --- /dev/null +++ b/metadata-builders/src/test/java/com/linkedin/metadata/builders/graph/DataFlowGraphBuilderTest.java @@ -0,0 +1,38 @@ +package com.linkedin.metadata.builders.graph; + +import com.linkedin.common.urn.DataFlowUrn; +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.metadata.aspect.DataFlowAspectArray; +import com.linkedin.metadata.entity.DataFlowEntity; +import com.linkedin.metadata.snapshot.DataFlowSnapshot; + +import java.util.List; +import org.testng.annotations.Test; + +import static com.linkedin.metadata.testing.Urns.makeDataFlowUrn; +import static org.testng.Assert.*; + + +public class DataFlowGraphBuilderTest { + + @Test + public void testBuildEntity() { + DataFlowUrn urn = makeDataFlowUrn("etl_pipeline"); + DataFlowSnapshot snapshot = new DataFlowSnapshot().setUrn(urn).setAspects(new DataFlowAspectArray()); + DataFlowEntity expected = new DataFlowEntity().setUrn(urn) + .setOrchestrator(urn.getOrchestratorEntity()) + .setFlowId(urn.getFlowIdEntity()) + .setCluster(urn.getClusterEntity()); + + List dataFlowEntities = new DataFlowGraphBuilder().buildEntities(snapshot); + + 
assertEquals(dataFlowEntities.size(), 1); + assertEquals(dataFlowEntities.get(0), expected); + } + + @Test + public void testBuilderRegistered() { + assertEquals(RegisteredGraphBuilders.getGraphBuilder(DataFlowSnapshot.class).get().getClass(), + DataFlowGraphBuilder.class); + } +} diff --git a/metadata-builders/src/test/java/com/linkedin/metadata/builders/graph/DataJobGraphBuilderTest.java b/metadata-builders/src/test/java/com/linkedin/metadata/builders/graph/DataJobGraphBuilderTest.java new file mode 100644 index 00000000000000..b657ed75d094c9 --- /dev/null +++ b/metadata-builders/src/test/java/com/linkedin/metadata/builders/graph/DataJobGraphBuilderTest.java @@ -0,0 +1,37 @@ +package com.linkedin.metadata.builders.graph; + +import com.linkedin.common.urn.DataJobUrn; +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.metadata.aspect.DataJobAspectArray; +import com.linkedin.metadata.entity.DataJobEntity; +import com.linkedin.metadata.snapshot.DataJobSnapshot; + +import java.util.List; +import org.testng.annotations.Test; + +import static com.linkedin.metadata.testing.Urns.makeDataJobUrn; +import static org.testng.Assert.*; + + +public class DataJobGraphBuilderTest { + + @Test + public void testBuildEntity() { + DataJobUrn urn = makeDataJobUrn("foo_job"); + DataJobSnapshot snapshot = new DataJobSnapshot().setUrn(urn).setAspects(new DataJobAspectArray()); + DataJobEntity expected = new DataJobEntity().setUrn(urn) + .setFlow(urn.getFlowEntity()) + .setJobId(urn.getJobIdEntity()); + + List dataJobEntities = new DataJobGraphBuilder().buildEntities(snapshot); + + assertEquals(dataJobEntities.size(), 1); + assertEquals(dataJobEntities.get(0), expected); + } + + @Test + public void testBuilderRegistered() { + assertEquals(RegisteredGraphBuilders.getGraphBuilder(DataJobSnapshot.class).get().getClass(), + DataJobGraphBuilder.class); + } +} diff --git 
a/metadata-builders/src/test/java/com/linkedin/metadata/builders/graph/relationship/RelationshipBuilderFromDataJobInputOutputTest.java b/metadata-builders/src/test/java/com/linkedin/metadata/builders/graph/relationship/RelationshipBuilderFromDataJobInputOutputTest.java new file mode 100644 index 00000000000000..ca538be6c91bbe --- /dev/null +++ b/metadata-builders/src/test/java/com/linkedin/metadata/builders/graph/relationship/RelationshipBuilderFromDataJobInputOutputTest.java @@ -0,0 +1,96 @@ +package com.linkedin.metadata.builders.graph.relationship; + +import com.linkedin.datajob.DataJobInputOutput; +import com.linkedin.common.urn.DataJobUrn; +import com.linkedin.common.urn.DatasetUrn; +import com.linkedin.metadata.builders.graph.GraphBuilder; +import com.linkedin.metadata.dao.internal.BaseGraphWriterDAO; +import com.linkedin.metadata.relationship.DownstreamOf; +import com.linkedin.metadata.relationship.Consumes; +import com.linkedin.metadata.relationship.Produces; + +import java.util.List; +import org.testng.annotations.Test; + +import static com.linkedin.metadata.builders.common.DataJobTestUtils.*; +import static com.linkedin.metadata.testing.Urns.*; +import static org.testng.Assert.*; + + +public class RelationshipBuilderFromDataJobInputOutputTest { + @Test + public void testBuildRelationships() { + DataJobUrn job = makeDataJobUrn("my_job"); + DataJobInputOutput inputOutput = makeDataJobInputOutput(); + + List operations = + new RelationshipBuilderFromDataJobInputOutput().buildRelationships(job, inputOutput); + + assertEquals(operations.size(), 3); + + assertEquals(operations.get(0).getRelationships().size(), 4); + assertEquals( + operations.get(0).getRelationships().get(0), + makeDownstreamOf( + makeDatasetUrn("output1"), + makeDatasetUrn("input1"))); + assertEquals( + operations.get(0).getRelationships().get(1), + makeDownstreamOf( + makeDatasetUrn("output2"), + makeDatasetUrn("input1"))); + assertEquals( + operations.get(0).getRelationships().get(2), + 
makeDownstreamOf( + makeDatasetUrn("output1"), + makeDatasetUrn("input2"))); + assertEquals( + operations.get(0).getRelationships().get(3), + makeDownstreamOf( + makeDatasetUrn("output2"), + makeDatasetUrn("input2"))); + assertEquals(operations.get(0).getPreUpdateOperation(), + BaseGraphWriterDAO.RemovalOption.REMOVE_ALL_EDGES_FROM_SOURCE); + + assertEquals(operations.get(1).getRelationships().size(), 2); + assertEquals( + operations.get(1).getRelationships().get(0), + makeConsumes(job, makeDatasetUrn("input1"))); + assertEquals( + operations.get(1).getRelationships().get(1), + makeConsumes(job, makeDatasetUrn("input2"))); + assertEquals(operations.get(1).getPreUpdateOperation(), + BaseGraphWriterDAO.RemovalOption.REMOVE_ALL_EDGES_FROM_SOURCE); + + assertEquals(operations.get(2).getRelationships().size(), 2); + assertEquals( + operations.get(2).getRelationships().get(0), + makeProduces(job, makeDatasetUrn("output1"))); + assertEquals( + operations.get(2).getRelationships().get(1), + makeProduces(job, makeDatasetUrn("output2"))); + assertEquals(operations.get(2).getPreUpdateOperation(), + BaseGraphWriterDAO.RemovalOption.REMOVE_ALL_EDGES_FROM_SOURCE); + + } + + private DownstreamOf makeDownstreamOf(DatasetUrn source, DatasetUrn destination) { + return new DownstreamOf() + .setSource(source) + .setDestination(destination); + } + + private Consumes makeConsumes(DataJobUrn source, DatasetUrn destination) { + return new Consumes() + .setSource(source) + .setDestination(destination); + } + + private Produces makeProduces(DataJobUrn source, DatasetUrn destination) { + return new Produces() + .setSource(source) + .setDestination(destination); + } + +} + diff --git a/metadata-dao-impl/restli-dao/src/main/java/com/linkedin/metadata/dao/DataFlowActionRequestBuilder.java b/metadata-dao-impl/restli-dao/src/main/java/com/linkedin/metadata/dao/DataFlowActionRequestBuilder.java new file mode 100644 index 00000000000000..37f2de2114e8a7 --- /dev/null +++ 
b/metadata-dao-impl/restli-dao/src/main/java/com/linkedin/metadata/dao/DataFlowActionRequestBuilder.java @@ -0,0 +1,17 @@ +package com.linkedin.metadata.dao; + +import com.linkedin.common.urn.DataFlowUrn; +import com.linkedin.metadata.snapshot.DataFlowSnapshot; + + +/** + * An action request builder for dataflow entities. + */ +public class DataFlowActionRequestBuilder extends BaseActionRequestBuilder { + + private static final String BASE_URI_TEMPLATE = "dataflows"; + + public DataFlowActionRequestBuilder() { + super(DataFlowSnapshot.class, DataFlowUrn.class, BASE_URI_TEMPLATE); + } +} diff --git a/metadata-dao-impl/restli-dao/src/main/java/com/linkedin/metadata/dao/DataJobActionRequestBuilder.java b/metadata-dao-impl/restli-dao/src/main/java/com/linkedin/metadata/dao/DataJobActionRequestBuilder.java new file mode 100644 index 00000000000000..b11f4d34172e8d --- /dev/null +++ b/metadata-dao-impl/restli-dao/src/main/java/com/linkedin/metadata/dao/DataJobActionRequestBuilder.java @@ -0,0 +1,17 @@ +package com.linkedin.metadata.dao; + +import com.linkedin.common.urn.DataJobUrn; +import com.linkedin.metadata.snapshot.DataJobSnapshot; + + +/** + * An action request builder for datajob entities. 
+ */ +public class DataJobActionRequestBuilder extends BaseActionRequestBuilder { + + private static final String BASE_URI_TEMPLATE = "datajobs"; + + public DataJobActionRequestBuilder() { + super(DataJobSnapshot.class, DataJobUrn.class, BASE_URI_TEMPLATE); + } +} diff --git a/metadata-dao-impl/restli-dao/src/main/java/com/linkedin/metadata/dao/RequestBuilders.java b/metadata-dao-impl/restli-dao/src/main/java/com/linkedin/metadata/dao/RequestBuilders.java index 10a1d6b2e6069a..42e8ea71c02ca7 100644 --- a/metadata-dao-impl/restli-dao/src/main/java/com/linkedin/metadata/dao/RequestBuilders.java +++ b/metadata-dao-impl/restli-dao/src/main/java/com/linkedin/metadata/dao/RequestBuilders.java @@ -22,6 +22,8 @@ public class RequestBuilders { add(new CorpGroupActionRequestBuilder()); add(new CorpUserActionRequestBuilder()); add(new DashboardActionRequestBuilder()); + add(new DataFlowActionRequestBuilder()); + add(new DataJobActionRequestBuilder()); add(new DataPlatformActionRequestBuilder()); add(new DataProcessActionRequestBuilder()); add(new DatasetActionRequestBuilder()); diff --git a/metadata-models/src/main/pegasus/com/linkedin/metadata/entity/DataFlowEntity.pdl b/metadata-models/src/main/pegasus/com/linkedin/metadata/entity/DataFlowEntity.pdl new file mode 100644 index 00000000000000..1c9e777b52307b --- /dev/null +++ b/metadata-models/src/main/pegasus/com/linkedin/metadata/entity/DataFlowEntity.pdl @@ -0,0 +1,30 @@ +namespace com.linkedin.metadata.entity + +import com.linkedin.common.DataFlowUrn + +/** + * Data model for a DataFlow entity + */ +record DataFlowEntity includes BaseEntity { + + /** + * Urn for the DataFlow + */ + urn: DataFlowUrn + + /** + * Workflow orchestrator ex: Azkaban, Airflow + */ + orchestrator: optional string + + /** + * Id of the flow + */ + flowId: optional string + + /** + * Cluster of the flow + */ + cluster: optional string + +} diff --git a/metadata-models/src/main/pegasus/com/linkedin/metadata/entity/DataJobEntity.pdl 
b/metadata-models/src/main/pegasus/com/linkedin/metadata/entity/DataJobEntity.pdl new file mode 100644 index 00000000000000..91dfdeacecbf75 --- /dev/null +++ b/metadata-models/src/main/pegasus/com/linkedin/metadata/entity/DataJobEntity.pdl @@ -0,0 +1,27 @@ +namespace com.linkedin.metadata.entity + +import com.linkedin.common.DataJobUrn +import com.linkedin.common.DataFlowUrn + + +/** + * Data model for a DataJob entity + */ +record DataJobEntity includes BaseEntity { + + /** + * Urn for the DataJob + */ + urn: DataJobUrn + + /** + * Urn of the associated DataFlow + */ + flow: optional DataFlowUrn + + /** + * Id of the job + */ + jobId: optional string + +} diff --git a/metadata-models/src/main/pegasus/com/linkedin/metadata/entity/Entity.pdl b/metadata-models/src/main/pegasus/com/linkedin/metadata/entity/Entity.pdl index 82fdd71e2e4751..6ba41ac73940cf 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/metadata/entity/Entity.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/metadata/entity/Entity.pdl @@ -3,4 +3,4 @@ namespace com.linkedin.metadata.entity /** * A union of all supported entity types. 
*/ -typeref Entity = union[CorpUserEntity, DatasetEntity, DataProcessEntity, MLModelEntity] \ No newline at end of file +typeref Entity = union[CorpUserEntity, DatasetEntity, DataProcessEntity, MLModelEntity, DataFlowEntity, DataJobEntity] \ No newline at end of file diff --git a/metadata-models/src/main/pegasus/com/linkedin/metadata/relationship/Consumes.pdl b/metadata-models/src/main/pegasus/com/linkedin/metadata/relationship/Consumes.pdl new file mode 100644 index 00000000000000..1386f0a42c3d5b --- /dev/null +++ b/metadata-models/src/main/pegasus/com/linkedin/metadata/relationship/Consumes.pdl @@ -0,0 +1,11 @@ +namespace com.linkedin.metadata.relationship + +/** + * A generic model for the Consumes relationship + */ +@pairings = [ { + "destination" : "com.linkedin.common.urn.DatasetUrn", + "source" : "com.linkedin.common.urn.DataJobUrn" +} ] +record Consumes includes BaseRelationship { +} diff --git a/metadata-models/src/main/pegasus/com/linkedin/metadata/relationship/OwnedBy.pdl b/metadata-models/src/main/pegasus/com/linkedin/metadata/relationship/OwnedBy.pdl index 711914186c3447..8ebcc506e50c5e 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/metadata/relationship/OwnedBy.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/metadata/relationship/OwnedBy.pdl @@ -14,6 +14,12 @@ import com.linkedin.common.OwnershipType }, { "destination" : "com.linkedin.common.urn.CorpuserUrn", "source" : "com.linkedin.common.urn.MLModelUrn" + }, { + "destination" : "com.linkedin.common.urn.CorpuserUrn", + "source" : "com.linkedin.common.urn.DataJobUrn" + }, { + "destination" : "com.linkedin.common.urn.CorpuserUrn", + "source" : "com.linkedin.common.urn.DataFlowUrn" } ] record OwnedBy includes BaseRelationship { diff --git a/metadata-models/src/main/pegasus/com/linkedin/metadata/relationship/Produces.pdl b/metadata-models/src/main/pegasus/com/linkedin/metadata/relationship/Produces.pdl new file mode 100644 index 00000000000000..7f7001e3e6c432 --- /dev/null +++ 
b/metadata-models/src/main/pegasus/com/linkedin/metadata/relationship/Produces.pdl @@ -0,0 +1,11 @@ +namespace com.linkedin.metadata.relationship + +/** + * A generic model for the Produces relationship + */ +@pairings = [ { + "destination" : "com.linkedin.common.urn.DatasetUrn", + "source" : "com.linkedin.common.urn.DataJobUrn" +} ] +record Produces includes BaseRelationship { +} diff --git a/metadata-models/src/main/pegasus/com/linkedin/metadata/relationship/Relationship.pdl b/metadata-models/src/main/pegasus/com/linkedin/metadata/relationship/Relationship.pdl index 1e693a5b92ae69..982c3e888e9b91 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/metadata/relationship/Relationship.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/metadata/relationship/Relationship.pdl @@ -3,4 +3,4 @@ namespace com.linkedin.metadata.relationship /** * A union of all supported relationship types. */ -typeref Relationship = union[Contains, IsPartOf, OwnedBy] \ No newline at end of file +typeref Relationship = union[Contains, IsPartOf, OwnedBy, Consumes, Produces] \ No newline at end of file diff --git a/metadata-models/src/main/pegasus/com/linkedin/metadata/search/DataFlowDocument.pdl b/metadata-models/src/main/pegasus/com/linkedin/metadata/search/DataFlowDocument.pdl new file mode 100644 index 00000000000000..2ee86f01d85d78 --- /dev/null +++ b/metadata-models/src/main/pegasus/com/linkedin/metadata/search/DataFlowDocument.pdl @@ -0,0 +1,51 @@ +namespace com.linkedin.metadata.search + +import com.linkedin.common.AccessLevel +import com.linkedin.common.DataFlowUrn + +/** + * Data model for DataFlow entity search + */ +record DataFlowDocument includes BaseDocument { + + /** + * Urn for the DataFlow + */ + urn: DataFlowUrn + + /** + * Id of the flow + */ + flowId: optional string + + /** + * Name of the flow + */ + name: optional string + + /** + * Description of the flow + */ + description: optional string + + /** + * Workflow orchestrator ex: Azkaban, Airflow + */ + orchestrator: optional 
string + + /** + * Cluster of the flow + */ + cluster: optional string + + /** + * Project of the flow + */ + project: optional string + + /** + * LDAP usernames of corp users who are the owners of this flow + */ + owners: optional array[string] + +} diff --git a/metadata-models/src/main/pegasus/com/linkedin/metadata/search/DataJobDocument.pdl b/metadata-models/src/main/pegasus/com/linkedin/metadata/search/DataJobDocument.pdl new file mode 100644 index 00000000000000..d5cabb9633f148 --- /dev/null +++ b/metadata-models/src/main/pegasus/com/linkedin/metadata/search/DataJobDocument.pdl @@ -0,0 +1,41 @@ +namespace com.linkedin.metadata.search + +import com.linkedin.common.AccessLevel +import com.linkedin.common.DataJobUrn + +/** + * Data model for DataJob entity search + */ +record DataJobDocument includes BaseDocument { + + /** + * Urn for the DataJob + */ + urn: DataJobUrn + + /** + * Optional description of the job + */ + description: optional string + + /** + * Optional name of the job + */ + name: optional string + + /** + * Name of the associated data flow + */ + dataFlow: optional string + + /** + * Id of the job + */ + jobId: optional string + + /** + * LDAP usernames of corp users who are the owners of this job + */ + owners: optional array[string] + +} diff --git a/metadata-models/src/main/pegasus/com/linkedin/metadata/snapshot/Snapshot.pdl b/metadata-models/src/main/pegasus/com/linkedin/metadata/snapshot/Snapshot.pdl index 58b80697d3f1df..0e542c87887a50 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/metadata/snapshot/Snapshot.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/metadata/snapshot/Snapshot.pdl @@ -8,6 +8,8 @@ typeref Snapshot = union[ CorpGroupSnapshot, CorpUserSnapshot, DashboardSnapshot, + DataFlowSnapshot, + DataJobSnapshot, DatasetSnapshot, DataProcessSnapshot, MLModelSnapshot, diff --git a/metadata-testing/metadata-models-test-utils/src/main/java/com/linkedin/metadata/testing/Urns.java 
b/metadata-testing/metadata-models-test-utils/src/main/java/com/linkedin/metadata/testing/Urns.java index 0a61e72ac7d802..995961be43f530 100644 --- a/metadata-testing/metadata-models-test-utils/src/main/java/com/linkedin/metadata/testing/Urns.java +++ b/metadata-testing/metadata-models-test-utils/src/main/java/com/linkedin/metadata/testing/Urns.java @@ -5,6 +5,8 @@ import com.linkedin.common.urn.CorpGroupUrn; import com.linkedin.common.urn.CorpuserUrn; import com.linkedin.common.urn.DashboardUrn; +import com.linkedin.common.urn.DataFlowUrn; +import com.linkedin.common.urn.DataJobUrn; import com.linkedin.common.urn.DataPlatformUrn; import com.linkedin.common.urn.DataProcessUrn; import com.linkedin.common.urn.DatasetUrn; @@ -59,4 +61,16 @@ public static DashboardUrn makeDashboardUrn(@Nonnull String dashboardId) { public static MLModelUrn makeMLModelUrn(@Nonnull String name) { return new MLModelUrn(new DataPlatformUrn("mysql"), name, FabricType.DEV); } + + @Nonnull + public static DataFlowUrn makeDataFlowUrn(@Nonnull String name) { + return new DataFlowUrn("airflow", name, "production_cluster"); + } + + + @Nonnull + public static DataJobUrn makeDataJobUrn(@Nonnull String jobId) { + return new DataJobUrn(new DataFlowUrn("airflow", "my_flow", "production_cluster"), jobId); + } + }