From 761cd60c10ea7910dbe5a4c9e07cb4a04d1ac955 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 20 Dec 2022 00:54:46 -0500 Subject: [PATCH 1/2] chore(ingest): remove inferred args to MCPW --- metadata-ingestion/as-a-library.md | 10 +- .../examples/library/dashboard_usage.py | 19 ---- .../library/data_quality_mcpw_rest.py | 13 --- .../library/dataset_add_column_term.py | 4 - .../examples/library/dataset_schema.py | 4 - .../library/dataset_schema_with_tags_terms.py | 4 - .../examples/library/dataset_set_tag.py | 9 +- .../examples/library/dataset_set_term.py | 4 - .../library/lineage_chart_dashboard.py | 5 +- .../examples/library/lineage_dataset_chart.py | 5 +- .../library/lineage_dataset_job_dataset.py | 4 - .../lineage_emitter_datajob_finegrained.py | 5 +- .../lineage_emitter_dataset_finegrained.py | 4 - .../library/lineage_emitter_mcpw_rest.py | 4 - .../examples/library/lineage_job_dataflow.py | 8 +- metadata-ingestion/scripts/modeldocgen.py | 4 - .../datahub/api/entities/corpuser/corpuser.py | 12 +-- .../datahub/api/entities/datajob/dataflow.py | 10 -- .../datahub/api/entities/datajob/datajob.py | 16 --- .../dataprocess/dataprocess_instance.py | 22 ----- .../src/datahub/cli/delete_cli.py | 17 +--- metadata-ingestion/src/datahub/cli/migrate.py | 14 +-- .../src/datahub/cli/migration_utils.py | 4 - .../src/datahub/emitter/mcp_builder.py | 4 - .../src/datahub/emitter/mcp_patch_builder.py | 1 + .../src/datahub/ingestion/source/aws/glue.py | 12 --- .../ingestion/source/dbt/dbt_common.py | 12 +-- .../ingestion/source/delta_lake/source.py | 4 - .../ingestion/source/elastic_search.py | 19 ---- .../ingestion/source/iceberg/iceberg.py | 4 - .../source/iceberg/iceberg_profiler.py | 5 - .../src/datahub/ingestion/source/kafka.py | 4 - .../datahub/ingestion/source/kafka_connect.py | 15 --- .../src/datahub/ingestion/source/nifi.py | 16 --- .../src/datahub/ingestion/source/pulsar.py | 97 ++++++------------- .../source/snowflake/snowflake_utils.py | 4 - .../integrations/great_expectations/action.py | 10 -- .../tests/unit/test_allow_deny.py | 6 ++ 38 files changed, 51 insertions(+), 363 deletions(-) diff --git a/metadata-ingestion/as-a-library.md b/metadata-ingestion/as-a-library.md index b469b2470bf56..9bd0c871235e4 100644 --- a/metadata-ingestion/as-a-library.md +++ b/metadata-ingestion/as-a-library.md @@ -21,7 +21,7 @@ pip install -U `acryl-datahub[datahub-rest]` ```python import datahub.emitter.mce_builder as builder from datahub.emitter.mcp import MetadataChangeProposalWrapper -from datahub.metadata.schema_classes import ChangeTypeClass, DatasetPropertiesClass +from datahub.metadata.schema_classes import DatasetPropertiesClass from datahub.emitter.rest_emitter import DatahubRestEmitter @@ -39,10 +39,7 @@ dataset_properties = DatasetPropertiesClass(description="This table stored the c # Construct a MetadataChangeProposalWrapper object. metadata_event = MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, entityUrn=builder.make_dataset_urn("bigquery", "my-project.my-dataset.user-table"), - aspectName="datasetProperties", aspect=dataset_properties, ) @@ -75,7 +72,7 @@ pip install -U `acryl-datahub[datahub-kafka]` ```python import datahub.emitter.mce_builder as builder from datahub.emitter.mcp import MetadataChangeProposalWrapper -from datahub.metadata.schema_classes import ChangeTypeClass, DatasetPropertiesClass +from datahub.metadata.schema_classes import DatasetPropertiesClass from datahub.emitter.kafka_emitter import DatahubKafkaEmitter, KafkaEmitterConfig # Create an emitter to Kafka @@ -100,10 +97,7 @@ dataset_properties = DatasetPropertiesClass(description="This table stored the c # Construct a MetadataChangeProposalWrapper object. metadata_event = MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, entityUrn=builder.make_dataset_urn("bigquery", "my-project.my-dataset.user-table"), - aspectName="datasetProperties", aspect=dataset_properties, ) diff --git a/metadata-ingestion/examples/library/dashboard_usage.py b/metadata-ingestion/examples/library/dashboard_usage.py index ad3bd7c687b16..10edd72a9ea41 100644 --- a/metadata-ingestion/examples/library/dashboard_usage.py +++ b/metadata-ingestion/examples/library/dashboard_usage.py @@ -9,7 +9,6 @@ # Imports for metadata model classes from datahub.metadata.schema_classes import ( CalendarIntervalClass, - ChangeTypeClass, DashboardUsageStatisticsClass, DashboardUserUsageCountsClass, TimeWindowSizeClass, @@ -28,10 +27,7 @@ ] usage_day_1: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper( - entityType="dashboard", - changeType=ChangeTypeClass.UPSERT, entityUrn=make_dashboard_urn("looker", "dashboards.999999"), - aspectName="dashboardUsageStatistics", aspect=DashboardUsageStatisticsClass( timestampMillis=round( datetime.strptime("2022-02-09", "%Y-%m-%d").timestamp() * 1000 @@ -45,10 +41,7 @@ absolute_usage_as_of_day_1: MetadataChangeProposalWrapper = ( MetadataChangeProposalWrapper( - entityType="dashboard", - changeType=ChangeTypeClass.UPSERT, entityUrn=make_dashboard_urn("looker", "dashboards.999999"), - aspectName="dashboardUsageStatistics", aspect=DashboardUsageStatisticsClass( timestampMillis=round( datetime.strptime("2022-02-09", "%Y-%m-%d").timestamp() * 1000 @@ -77,10 +70,7 @@ ), ] usage_day_2: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper( - entityType="dashboard", - changeType=ChangeTypeClass.UPSERT, entityUrn=make_dashboard_urn("looker", "dashboards.999999"), - aspectName="dashboardUsageStatistics", aspect=DashboardUsageStatisticsClass( timestampMillis=round( datetime.strptime("2022-02-10", "%Y-%m-%d").timestamp() * 1000 @@ -94,10 +84,7 @@ absolute_usage_as_of_day_2: MetadataChangeProposalWrapper = ( MetadataChangeProposalWrapper( - entityType="dashboard", - changeType=ChangeTypeClass.UPSERT, entityUrn=make_dashboard_urn("looker", "dashboards.999999"), - aspectName="dashboardUsageStatistics", aspect=DashboardUsageStatisticsClass( timestampMillis=round( datetime.strptime("2022-02-10", "%Y-%m-%d").timestamp() * 1000 @@ -123,10 +110,7 @@ ), ] usage_day_3: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper( - entityType="dashboard", - changeType=ChangeTypeClass.UPSERT, entityUrn=make_dashboard_urn("looker", "dashboards.999999"), - aspectName="dashboardUsageStatistics", aspect=DashboardUsageStatisticsClass( timestampMillis=round( datetime.strptime("2022-02-11", "%Y-%m-%d").timestamp() * 1000 @@ -140,10 +124,7 @@ absolute_usage_as_of_day_3: MetadataChangeProposalWrapper = ( MetadataChangeProposalWrapper( - entityType="dashboard", - changeType=ChangeTypeClass.UPSERT, entityUrn=make_dashboard_urn("looker", "dashboards.999999"), - aspectName="dashboardUsageStatistics", aspect=DashboardUsageStatisticsClass( timestampMillis=round( datetime.strptime("2022-02-11", "%Y-%m-%d").timestamp() * 1000 diff --git a/metadata-ingestion/examples/library/data_quality_mcpw_rest.py b/metadata-ingestion/examples/library/data_quality_mcpw_rest.py index 1b980e7640f0b..35192a7ae07d6 100644 --- a/metadata-ingestion/examples/library/data_quality_mcpw_rest.py +++ b/metadata-ingestion/examples/library/data_quality_mcpw_rest.py @@ -21,7 +21,6 @@ ) from datahub.metadata.com.linkedin.pegasus2avro.common import DataPlatformInstance from datahub.metadata.com.linkedin.pegasus2avro.dataset import DatasetProperties -from datahub.metadata.com.linkedin.pegasus2avro.events.metadata import ChangeType from datahub.metadata.com.linkedin.pegasus2avro.timeseries import PartitionSpec @@ -39,10 +38,7 @@ def assertionUrn(info: AssertionInfo) -> str: def emitAssertionResult(assertionResult: AssertionRunEvent) -> None: dataset_assertionRunEvent_mcp = MetadataChangeProposalWrapper( - entityType="assertion", - changeType=ChangeType.UPSERT, entityUrn=assertionResult.assertionUrn, - aspectName="assertionRunEvent", aspect=assertionResult, ) @@ -58,10 +54,7 @@ def emitAssertionResult(assertionResult: AssertionRunEvent) -> None: ) # Construct a MetadataChangeProposalWrapper object for dataset dataset_mcp = MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeType.UPSERT, entityUrn=datasetUrn("bazTable"), - aspectName="datasetProperties", aspect=datasetProperties, ) @@ -93,10 +86,7 @@ def emitAssertionResult(assertionResult: AssertionRunEvent) -> None: # Construct a MetadataChangeProposalWrapper object. assertion_maxVal_mcp = MetadataChangeProposalWrapper( - entityType="assertion", - changeType=ChangeType.UPSERT, entityUrn=assertionUrn(assertion_maxVal), - aspectName="assertionInfo", aspect=assertion_maxVal, ) @@ -110,10 +100,7 @@ def emitAssertionResult(assertionResult: AssertionRunEvent) -> None: # Construct a MetadataChangeProposalWrapper object for assertion platform assertion_dataPlatformInstance_mcp = MetadataChangeProposalWrapper( - entityType="assertion", - changeType=ChangeType.UPSERT, entityUrn=assertionUrn(assertion_maxVal), - aspectName="dataPlatformInstance", aspect=assertion_dataPlatformInstance, ) # Emit Assertion entity platform aspect! diff --git a/metadata-ingestion/examples/library/dataset_add_column_term.py b/metadata-ingestion/examples/library/dataset_add_column_term.py index efb37228d846c..ea17938924ccc 100644 --- a/metadata-ingestion/examples/library/dataset_add_column_term.py +++ b/metadata-ingestion/examples/library/dataset_add_column_term.py @@ -10,7 +10,6 @@ # Imports for metadata model classes from datahub.metadata.schema_classes import ( AuditStampClass, - ChangeTypeClass, EditableSchemaFieldInfoClass, EditableSchemaMetadataClass, GlossaryTermAssociationClass, @@ -94,10 +93,7 @@ def get_simple_field_path_from_v2_field_path(field_path: str) -> str: if need_write: event: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, entityUrn=dataset_urn, - aspectName="editableSchemaMetadata", aspect=current_editable_schema_metadata, ) graph.emit(event) diff --git a/metadata-ingestion/examples/library/dataset_schema.py b/metadata-ingestion/examples/library/dataset_schema.py index fec39427d4928..ed77df9ddd184 100644 --- a/metadata-ingestion/examples/library/dataset_schema.py +++ b/metadata-ingestion/examples/library/dataset_schema.py @@ -6,7 +6,6 @@ # Imports for metadata model classes from datahub.metadata.schema_classes import ( AuditStampClass, - ChangeTypeClass, DateTypeClass, OtherSchemaClass, SchemaFieldClass, @@ -16,10 +15,7 @@ ) event: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, entityUrn=make_dataset_urn(platform="hive", name="realestate_db.sales", env="PROD"), - aspectName="schemaMetadata", aspect=SchemaMetadataClass( schemaName="customer", # not used platform=make_data_platform_urn("hive"), # important <- platform must be an urn diff --git a/metadata-ingestion/examples/library/dataset_schema_with_tags_terms.py b/metadata-ingestion/examples/library/dataset_schema_with_tags_terms.py index cb8f930cc879f..eb9088844f04e 100644 --- a/metadata-ingestion/examples/library/dataset_schema_with_tags_terms.py +++ b/metadata-ingestion/examples/library/dataset_schema_with_tags_terms.py @@ -12,7 +12,6 @@ # Imports for metadata model classes from datahub.metadata.schema_classes import ( AuditStampClass, - ChangeTypeClass, GlobalTagsClass, GlossaryTermAssociationClass, GlossaryTermsClass, @@ -25,10 +24,7 @@ ) event: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, entityUrn=make_dataset_urn(platform="hive", name="foodb.barTable", env="PROD"), - aspectName="schemaMetadata", aspect=SchemaMetadataClass( schemaName="customer", # not used platform=make_data_platform_urn("hive"), # important <- platform must be an urn diff --git a/metadata-ingestion/examples/library/dataset_set_tag.py b/metadata-ingestion/examples/library/dataset_set_tag.py index e1b73e2521092..9072f25323f9d 100644 --- a/metadata-ingestion/examples/library/dataset_set_tag.py +++ b/metadata-ingestion/examples/library/dataset_set_tag.py @@ -6,11 +6,7 @@ from datahub.emitter.rest_emitter import DatahubRestEmitter # Imports for metadata model classes -from datahub.metadata.schema_classes import ( - ChangeTypeClass, - GlobalTagsClass, - TagAssociationClass, -) +from datahub.metadata.schema_classes import GlobalTagsClass, TagAssociationClass log = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO) @@ -18,10 +14,7 @@ dataset_urn = make_dataset_urn(platform="hive", name="realestate_db.sales", env="PROD") tag_urn = make_tag_urn("purchase") event: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, entityUrn=dataset_urn, - aspectName="globalTags", aspect=GlobalTagsClass(tags=[TagAssociationClass(tag=tag_urn)]), ) diff --git a/metadata-ingestion/examples/library/dataset_set_term.py b/metadata-ingestion/examples/library/dataset_set_term.py index 11c1bdcddf38a..3572f93e17ad9 100644 --- a/metadata-ingestion/examples/library/dataset_set_term.py +++ b/metadata-ingestion/examples/library/dataset_set_term.py @@ -7,7 +7,6 @@ # Imports for metadata model classes from datahub.metadata.schema_classes import ( AuditStampClass, - ChangeTypeClass, GlossaryTermAssociationClass, GlossaryTermsClass, ) @@ -34,10 +33,7 @@ ) event: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, entityUrn=dataset_urn, - aspectName="glossaryTerms", aspect=terms_aspect, ) rest_emitter.emit(event) diff --git a/metadata-ingestion/examples/library/lineage_chart_dashboard.py b/metadata-ingestion/examples/library/lineage_chart_dashboard.py index 10813fa9871c6..c029f2ded3991 100644 --- a/metadata-ingestion/examples/library/lineage_chart_dashboard.py +++ b/metadata-ingestion/examples/library/lineage_chart_dashboard.py @@ -4,7 +4,7 @@ from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.rest_emitter import DatahubRestEmitter from datahub.metadata.com.linkedin.pegasus2avro.dashboard import DashboardInfoClass -from datahub.metadata.schema_classes import ChangeAuditStampsClass, ChangeTypeClass +from datahub.metadata.schema_classes import ChangeAuditStampsClass # Construct the DashboardInfo aspect with the charts -> dashboard lineage. charts_in_dashboard: List[str] = [ @@ -25,10 +25,7 @@ # Construct a MetadataChangeProposalWrapper object with the DashboardInfo aspect. # NOTE: This will overwrite all of the existing dashboard aspect information associated with this dashboard. chart_info_mcp = MetadataChangeProposalWrapper( - entityType="dashboard", - changeType=ChangeTypeClass.UPSERT, entityUrn=builder.make_dashboard_urn(platform="looker", name="my_dashboard_1"), - aspectName="dashboardInfo", aspect=dashboard_info, ) diff --git a/metadata-ingestion/examples/library/lineage_dataset_chart.py b/metadata-ingestion/examples/library/lineage_dataset_chart.py index 2c648055322da..cfbc92118d650 100644 --- a/metadata-ingestion/examples/library/lineage_dataset_chart.py +++ b/metadata-ingestion/examples/library/lineage_dataset_chart.py @@ -4,7 +4,7 @@ from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.rest_emitter import DatahubRestEmitter from datahub.metadata.com.linkedin.pegasus2avro.chart import ChartInfoClass -from datahub.metadata.schema_classes import ChangeAuditStampsClass, ChangeTypeClass +from datahub.metadata.schema_classes import ChangeAuditStampsClass # Construct the ChartInfo aspect with the input_datasets lineage. input_datasets: List[str] = [ @@ -24,10 +24,7 @@ # Construct a MetadataChangeProposalWrapper object with the ChartInfo aspect. # NOTE: This will overwrite all of the existing chartInfo aspect information associated with this chart. chart_info_mcp = MetadataChangeProposalWrapper( - entityType="chart", - changeType=ChangeTypeClass.UPSERT, entityUrn=builder.make_chart_urn(platform="looker", name="my_chart_1"), - aspectName="chartInfo", aspect=chart_info, ) diff --git a/metadata-ingestion/examples/library/lineage_dataset_job_dataset.py b/metadata-ingestion/examples/library/lineage_dataset_job_dataset.py index 4a570d22d1847..0c625c8e4a168 100644 --- a/metadata-ingestion/examples/library/lineage_dataset_job_dataset.py +++ b/metadata-ingestion/examples/library/lineage_dataset_job_dataset.py @@ -4,7 +4,6 @@ from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.rest_emitter import DatahubRestEmitter from datahub.metadata.com.linkedin.pegasus2avro.datajob import DataJobInputOutputClass -from datahub.metadata.schema_classes import ChangeTypeClass # Construct the DataJobInputOutput aspect. input_datasets: List[str] = [ @@ -33,12 +32,9 @@ # Construct a MetadataChangeProposalWrapper object. # NOTE: This will overwrite all of the existing lineage information associated with this job. datajob_input_output_mcp = MetadataChangeProposalWrapper( - entityType="dataJob", - changeType=ChangeTypeClass.UPSERT, entityUrn=builder.make_data_job_urn( orchestrator="airflow", flow_id="flow1", job_id="job1", cluster="PROD" ), - aspectName="dataJobInputOutput", aspect=datajob_input_output, ) diff --git a/metadata-ingestion/examples/library/lineage_emitter_datajob_finegrained.py b/metadata-ingestion/examples/library/lineage_emitter_datajob_finegrained.py index d47fef4569e4c..8b29e0529425c 100644 --- a/metadata-ingestion/examples/library/lineage_emitter_datajob_finegrained.py +++ b/metadata-ingestion/examples/library/lineage_emitter_datajob_finegrained.py @@ -6,7 +6,7 @@ FineGrainedLineageDownstreamType, FineGrainedLineageUpstreamType, ) -from datahub.metadata.schema_classes import ChangeTypeClass, DataJobInputOutputClass +from datahub.metadata.schema_classes import DataJobInputOutputClass def datasetUrn(tbl): @@ -100,10 +100,7 @@ def fldUrn(tbl, fld): ) dataJobLineageMcp = MetadataChangeProposalWrapper( - entityType="dataJob", - changeType=ChangeTypeClass.UPSERT, entityUrn=builder.make_data_job_urn("spark", "Flow1", "Task1"), - aspectName="dataJobInputOutput", aspect=dataJobInputOutput, ) diff --git a/metadata-ingestion/examples/library/lineage_emitter_dataset_finegrained.py b/metadata-ingestion/examples/library/lineage_emitter_dataset_finegrained.py index 062c6b4e7a054..2d417c1646a4f 100644 --- a/metadata-ingestion/examples/library/lineage_emitter_dataset_finegrained.py +++ b/metadata-ingestion/examples/library/lineage_emitter_dataset_finegrained.py @@ -9,7 +9,6 @@ Upstream, UpstreamLineage, ) -from datahub.metadata.schema_classes import ChangeTypeClass def datasetUrn(tbl): @@ -75,10 +74,7 @@ def fldUrn(tbl, fld): ) lineageMcp = MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, entityUrn=datasetUrn("bar"), - aspectName="upstreamLineage", aspect=fieldLineages, ) diff --git a/metadata-ingestion/examples/library/lineage_emitter_mcpw_rest.py b/metadata-ingestion/examples/library/lineage_emitter_mcpw_rest.py index d1c934cba4040..1d769fea687db 100644 --- a/metadata-ingestion/examples/library/lineage_emitter_mcpw_rest.py +++ b/metadata-ingestion/examples/library/lineage_emitter_mcpw_rest.py @@ -8,7 +8,6 @@ UpstreamClass, UpstreamLineage, ) -from datahub.metadata.schema_classes import ChangeTypeClass upstream_table_1 = UpstreamClass( dataset=builder.make_dataset_urn("bigquery", "upstream_table_1", "PROD"), @@ -26,10 +25,7 @@ # Construct a MetadataChangeProposalWrapper object. lineage_mcp = MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, entityUrn=builder.make_dataset_urn("bigquery", "downstream"), - aspectName="upstreamLineage", aspect=upstream_lineage, ) diff --git a/metadata-ingestion/examples/library/lineage_job_dataflow.py b/metadata-ingestion/examples/library/lineage_job_dataflow.py index a912c96abd9b2..6ab631491668f 100644 --- a/metadata-ingestion/examples/library/lineage_job_dataflow.py +++ b/metadata-ingestion/examples/library/lineage_job_dataflow.py @@ -2,7 +2,7 @@ from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.rest_emitter import DatahubRestEmitter from datahub.metadata.com.linkedin.pegasus2avro.datajob import DataJobInfoClass -from datahub.metadata.schema_classes import ChangeTypeClass, DataFlowInfoClass +from datahub.metadata.schema_classes import DataFlowInfoClass # Construct the DataJobInfo aspect with the job -> flow lineage. dataflow_urn = builder.make_data_flow_urn( @@ -12,10 +12,7 @@ dataflow_info = DataFlowInfoClass(name="LowLevelApiFlow") dataflow_info_mcp = MetadataChangeProposalWrapper( - entityType="dataflow", - changeType=ChangeTypeClass.UPSERT, entityUrn=dataflow_urn, - aspectName="dataFlowInfo", aspect=dataflow_info, ) @@ -24,12 +21,9 @@ # Construct a MetadataChangeProposalWrapper object with the DataJobInfo aspect. # NOTE: This will overwrite all of the existing dataJobInfo aspect information associated with this job. datajob_info_mcp = MetadataChangeProposalWrapper( - entityType="dataJob", - changeType=ChangeTypeClass.UPSERT, entityUrn=builder.make_data_job_urn( orchestrator="airflow", flow_id="flow_old_api", job_id="job1", cluster="prod" ), - aspectName="dataJobInfo", aspect=datajob_info, ) diff --git a/metadata-ingestion/scripts/modeldocgen.py b/metadata-ingestion/scripts/modeldocgen.py index 44c01e6de5416..5621c8e3b203d 100644 --- a/metadata-ingestion/scripts/modeldocgen.py +++ b/metadata-ingestion/scripts/modeldocgen.py @@ -21,7 +21,6 @@ from datahub.ingestion.sink.file import FileSink, FileSinkConfig from datahub.metadata.schema_classes import ( BrowsePathsClass, - ChangeTypeClass, DatasetPropertiesClass, DatasetSnapshotClass, ForeignKeyConstraintClass, @@ -502,10 +501,7 @@ def strip_types(field_path: str) -> str: events.append(mce) mcp = MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, entityUrn=d.urn, - aspectName="subTypes", aspect=SubTypesClass(typeNames=["entity"]), ) events.append(mcp) diff --git a/metadata-ingestion/src/datahub/api/entities/corpuser/corpuser.py b/metadata-ingestion/src/datahub/api/entities/corpuser/corpuser.py index 1f4c1d6fdca17..064103449f5c8 100644 --- a/metadata-ingestion/src/datahub/api/entities/corpuser/corpuser.py +++ b/metadata-ingestion/src/datahub/api/entities/corpuser/corpuser.py @@ -7,11 +7,7 @@ from datahub.emitter.kafka_emitter import DatahubKafkaEmitter from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.rest_emitter import DatahubRestEmitter -from datahub.metadata.schema_classes import ( - ChangeTypeClass, - CorpUserInfoClass, - GroupMembershipClass, -) +from datahub.metadata.schema_classes import CorpUserInfoClass, GroupMembershipClass @dataclass @@ -58,9 +54,7 @@ def generate_group_membership_aspect(self) -> Iterable[GroupMembershipClass]: def generate_mcp(self) -> Iterable[MetadataChangeProposalWrapper]: mcp = MetadataChangeProposalWrapper( - entityType="corpuser", entityUrn=str(self.urn), - aspectName="corpUserInfo", aspect=CorpUserInfoClass( active=True, # Deprecated, use CorpUserStatus instead. displayName=self.display_name, @@ -74,17 +68,13 @@ def generate_mcp(self) -> Iterable[MetadataChangeProposalWrapper]: fullName=self.full_name, countryCode=self.country_code, ), - changeType=ChangeTypeClass.UPSERT, ) yield mcp for group_membership in self.generate_group_membership_aspect(): mcp = MetadataChangeProposalWrapper( - entityType="corpuser", entityUrn=str(self.urn), - aspectName="groupMembership", aspect=group_membership, - changeType=ChangeTypeClass.UPSERT, ) yield mcp diff --git a/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py b/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py index 8c4e2da92fe58..0dd16a6e21f94 100644 --- a/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py +++ b/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py @@ -5,7 +5,6 @@ from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.metadata.schema_classes import ( AuditStampClass, - ChangeTypeClass, DataFlowInfoClass, DataFlowSnapshotClass, GlobalTagsClass, @@ -91,36 +90,27 @@ def generate_mce(self) -> MetadataChangeEventClass: def generate_mcp(self) -> Iterable[MetadataChangeProposalWrapper]: mcp = MetadataChangeProposalWrapper( - entityType="dataflow", entityUrn=str(self.urn), - aspectName="dataFlowInfo", aspect=DataFlowInfoClass( name=self.name if self.name is not None else self.id, description=self.description, customProperties=self.properties, externalUrl=self.url, ), - changeType=ChangeTypeClass.UPSERT, ) yield mcp for owner in self.generate_ownership_aspect(): mcp = MetadataChangeProposalWrapper( - entityType="dataflow", entityUrn=str(self.urn), - aspectName="ownership", aspect=owner, - changeType=ChangeTypeClass.UPSERT, ) yield mcp for tag in self.generate_tags_aspect(): mcp = MetadataChangeProposalWrapper( - entityType="dataflow", entityUrn=str(self.urn), - aspectName="globalTags", aspect=tag, - changeType=ChangeTypeClass.UPSERT, ) yield mcp diff --git a/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py b/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py index 8335b0f66292e..7eb6fc8c8d1a9 100644 --- a/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py +++ b/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py @@ -6,7 +6,6 @@ from datahub.metadata.schema_classes import ( AuditStampClass, AzkabanJobTypeClass, - ChangeTypeClass, DataJobInfoClass, DataJobInputOutputClass, DataJobSnapshotClass, @@ -131,9 +130,7 @@ def generate_mce(self) -> MetadataChangeEventClass: def generate_mcp(self) -> Iterable[MetadataChangeProposalWrapper]: mcp = MetadataChangeProposalWrapper( - entityType="datajob", entityUrn=str(self.urn), - aspectName="dataJobInfo", aspect=DataJobInfoClass( name=self.name if self.name is not None else self.id, type=AzkabanJobTypeClass.COMMAND, @@ -141,7 +138,6 @@ def generate_mcp(self) -> Iterable[MetadataChangeProposalWrapper]: customProperties=self.properties, externalUrl=self.url, ), - changeType=ChangeTypeClass.UPSERT, ) yield mcp @@ -149,21 +145,15 @@ def generate_mcp(self) -> Iterable[MetadataChangeProposalWrapper]: for owner in self.generate_ownership_aspect(): mcp = MetadataChangeProposalWrapper( - entityType="datajob", entityUrn=str(self.urn), - aspectName="ownership", aspect=owner, - changeType=ChangeTypeClass.UPSERT, ) yield mcp for tag in self.generate_tags_aspect(): mcp = MetadataChangeProposalWrapper( - entityType="datajob", entityUrn=str(self.urn), - aspectName="globalTags", aspect=tag, - changeType=ChangeTypeClass.UPSERT, ) yield mcp @@ -184,26 +174,20 @@ def emit( def generate_data_input_output_mcp(self) -> Iterable[MetadataChangeProposalWrapper]: mcp = MetadataChangeProposalWrapper( - entityType="datajob", entityUrn=str(self.urn), - aspectName="dataJobInputOutput", aspect=DataJobInputOutputClass( inputDatasets=[str(urn) for urn in self.inlets], outputDatasets=[str(urn) for urn in self.outlets], inputDatajobs=[str(urn) for urn in self.upstream_urns], ), - changeType=ChangeTypeClass.UPSERT, ) yield mcp # Force entity materialization for iolet in self.inlets + self.outlets: mcp = MetadataChangeProposalWrapper( - entityType="dataset", entityUrn=str(iolet), - aspectName="status", aspect=StatusClass(removed=False), - changeType=ChangeTypeClass.UPSERT, ) yield mcp diff --git a/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py b/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py index 5dde5fd1ff06e..fb276684d3b99 100644 --- a/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py +++ b/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py @@ -15,7 +15,6 @@ ) from datahub.metadata.schema_classes import ( AuditStampClass, - ChangeTypeClass, DataProcessInstanceRunEventClass, DataProcessInstanceRunResultClass, DataProcessRunStatusClass, @@ -96,15 +95,12 @@ def start_event_mcp( :param attempt: (int) the number of attempt of the execution with the same execution id """ mcp = MetadataChangeProposalWrapper( - entityType="dataProcessInstance", entityUrn=str(self.urn), - aspectName="dataProcessInstanceRunEvent", aspect=DataProcessInstanceRunEventClass( status=DataProcessRunStatusClass.STARTED, timestampMillis=start_timestamp_millis, attempt=attempt, ), - changeType=ChangeTypeClass.UPSERT, ) yield mcp @@ -184,9 +180,7 @@ def end_event_mcp( :param attempt: (int) the attempt number of this execution """ mcp = MetadataChangeProposalWrapper( - entityType="dataProcessInstance", entityUrn=str(self.urn), - aspectName="dataProcessInstanceRunEvent", aspect=DataProcessInstanceRunEventClass( status=DataProcessRunStatusClass.COMPLETE, timestampMillis=end_timestamp_millis, @@ -198,7 +192,6 @@ def end_event_mcp( ), attempt=attempt, ), - changeType=ChangeTypeClass.UPSERT, ) yield mcp @@ -237,9 +230,7 @@ def generate_mcp( :rtype: Iterable[MetadataChangeProposalWrapper] """ mcp = MetadataChangeProposalWrapper( - entityType="dataProcessInstance", entityUrn=str(self.urn), - aspectName="dataProcessInstanceProperties", aspect=DataProcessInstanceProperties( name=self.id, created=AuditStampClass( @@ -250,14 +241,11 @@ def generate_mcp( customProperties=self.properties, externalUrl=self.url, ), - changeType=ChangeTypeClass.UPSERT, ) yield mcp mcp = MetadataChangeProposalWrapper( - entityType="dataProcessInstance", entityUrn=str(self.urn), - aspectName="dataProcessInstanceRelationships", aspect=DataProcessInstanceRelationships( upstreamInstances=[str(urn) for urn in self.upstream_urns], parentTemplate=str(self.template_urn) if self.template_urn else None, @@ -265,7 +253,6 @@ def generate_mcp( if self.parent_instance is not None else None, ), - changeType=ChangeTypeClass.UPSERT, ) yield mcp @@ -348,36 +335,27 @@ def from_dataflow(dataflow: DataFlow, id: str) -> "DataProcessInstance": def generate_inlet_outlet_mcp(self) -> Iterable[MetadataChangeProposalWrapper]: if self.inlets: mcp = MetadataChangeProposalWrapper( - entityType="dataProcessInstance", entityUrn=str(self.urn), - aspectName="dataProcessInstanceInput", aspect=DataProcessInstanceInput( inputs=[str(urn) for urn in self.inlets] ), - changeType=ChangeTypeClass.UPSERT, ) yield mcp if self.outlets: mcp = MetadataChangeProposalWrapper( - entityType="dataProcessInstance", entityUrn=str(self.urn), - aspectName="dataProcessInstanceOutput", aspect=DataProcessInstanceOutput( outputs=[str(urn) for urn in self.outlets] ), - changeType=ChangeTypeClass.UPSERT, ) yield mcp # Force entity materialization for iolet in self.inlets + self.outlets: mcp = MetadataChangeProposalWrapper( - entityType="dataset", entityUrn=str(iolet), - aspectName="status", aspect=StatusClass(removed=False), - changeType=ChangeTypeClass.UPSERT, ) yield mcp diff --git a/metadata-ingestion/src/datahub/cli/delete_cli.py b/metadata-ingestion/src/datahub/cli/delete_cli.py index 988e920107db8..30f5a525febd9 100644 --- a/metadata-ingestion/src/datahub/cli/delete_cli.py +++ b/metadata-ingestion/src/datahub/cli/delete_cli.py @@ -13,11 +13,7 @@ from datahub.cli import cli_utils from datahub.emitter import rest_emitter from datahub.emitter.mcp import MetadataChangeProposalWrapper -from datahub.metadata.schema_classes import ( - ChangeTypeClass, - StatusClass, - SystemMetadataClass, -) +from datahub.metadata.schema_classes import StatusClass, SystemMetadataClass from datahub.telemetry import telemetry from datahub.upgrade import upgrade from datahub.utilities.urns.urn import guess_entity_type @@ -341,7 +337,6 @@ def delete_with_filters( urn, soft=soft, aspect_name=aspect_name, - entity_type=entity_type, dry_run=dry_run, cached_session_host=(session, gms_host), cached_emitter=emitter, @@ -354,7 +349,6 @@ def delete_with_filters( one_result = _delete_one_urn( urn, soft=soft, - entity_type=entity_type, dry_run=dry_run, cached_session_host=(session, gms_host), cached_emitter=emitter, @@ -370,16 +364,16 @@ def _delete_one_urn( urn: str, soft: bool = False, dry_run: bool = False, - entity_type: str = "dataset", aspect_name: Optional[str] = None, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None, cached_session_host: Optional[Tuple[sessions.Session, str]] = None, cached_emitter: Optional[rest_emitter.DatahubRestEmitter] = None, run_id: str = "delete-run-id", - deletion_timestamp: int = _get_current_time(), + deletion_timestamp: Optional[int] = None, is_soft_deleted: Optional[bool] = None, ) -> DeletionResult: + deletion_timestamp = deletion_timestamp or _get_current_time() soft_delete_msg: str = "" if dry_run and is_soft_deleted: soft_delete_msg = "(soft-deleted)" @@ -403,10 +397,7 @@ def _delete_one_urn( if not dry_run: emitter.emit_mcp( MetadataChangeProposalWrapper( - entityType=entity_type, - changeType=ChangeTypeClass.UPSERT, entityUrn=urn, - aspectName="status", aspect=StatusClass(removed=True), systemMetadata=SystemMetadataClass( runId=run_id, lastObserved=deletion_timestamp @@ -448,7 +439,6 @@ def delete_one_urn_cmd( aspect_name: Optional[str] = None, soft: bool = False, dry_run: bool = False, - entity_type: str = "dataset", start_time: Optional[datetime] = None, end_time: Optional[datetime] = None, cached_session_host: Optional[Tuple[sessions.Session, str]] = None, @@ -464,7 +454,6 @@ def delete_one_urn_cmd( urn, soft, dry_run, - entity_type, aspect_name, start_time, end_time, diff --git a/metadata-ingestion/src/datahub/cli/migrate.py b/metadata-ingestion/src/datahub/cli/migrate.py index b8c7ac21c61af..b46c0e5b134f5 100644 --- a/metadata-ingestion/src/datahub/cli/migrate.py +++ b/metadata-ingestion/src/datahub/cli/migrate.py @@ -24,7 +24,6 @@ ) from datahub.emitter.rest_emitter import DatahubRestEmitter from datahub.metadata.schema_classes import ( - ChangeTypeClass, ContainerKeyClass, ContainerPropertiesClass, DataPlatformInstanceClass, @@ -222,10 +221,7 @@ def dataplatform2instance_func( if not dry_run: rest_emitter.emit_mcp( MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, entityUrn=new_urn, - aspectName="dataPlatformInstance", aspect=DataPlatformInstanceClass( platform=make_data_platform_urn(platform), instance=make_dataplatform_instance_urn(platform, instance), @@ -253,10 +249,7 @@ def dataplatform2instance_func( ) # use mcpw mcp = MetadataChangeProposalWrapper( - entityType=entity_type, - changeType=ChangeTypeClass.UPSERT, entityUrn=target_urn, - aspectName=aspect_name, aspect=aspect, ) if not dry_run: @@ -378,9 +371,7 @@ def migrate_containers( if not dry_run and not keep: log.info(f"will {'hard' if hard else 'soft'} delete {src_urn}") - delete_cli._delete_one_urn( - src_urn, soft=not hard, run_id=run_id, entity_type="container" - ) + delete_cli._delete_one_urn(src_urn, soft=not hard, run_id=run_id) migration_report.on_entity_migrated(src_urn, "status") # type: ignore print(f"{migration_report}") @@ -434,10 +425,7 @@ def process_container_relationships( ) # use mcpw mcp = MetadataChangeProposalWrapper( - entityType=entity_type, - changeType=ChangeTypeClass.UPSERT, entityUrn=target_urn, - aspectName=aspect_name, aspect=aspect, ) diff --git a/metadata-ingestion/src/datahub/cli/migration_utils.py b/metadata-ingestion/src/datahub/cli/migration_utils.py index a5bf835aeb299..074e03212c792 100644 --- a/metadata-ingestion/src/datahub/cli/migration_utils.py +++ b/metadata-ingestion/src/datahub/cli/migration_utils.py @@ -8,7 +8,6 @@ from datahub.emitter.mce_builder import Aspect from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.metadata.schema_classes import ( - ChangeTypeClass, ChartInfoClass, ContainerClass, DataJobInputOutputClass, @@ -250,9 +249,6 @@ def clone_aspect( assert isinstance(aspect_value, DictWrapper) new_mcp = MetadataChangeProposalWrapper( entityUrn=dst_urn, - entityType=entity_type, - changeType=ChangeTypeClass.UPSERT, - aspectName=a, aspect=aspect_value, systemMetadata=SystemMetadataClass( runId=run_id, diff --git a/metadata-ingestion/src/datahub/emitter/mcp_builder.py b/metadata-ingestion/src/datahub/emitter/mcp_builder.py index 3bcfcd9909148..9fb559e1606d4 100644 --- a/metadata-ingestion/src/datahub/emitter/mcp_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mcp_builder.py @@ -17,7 +17,6 @@ TimeStamp, ) from datahub.metadata.com.linkedin.pegasus2avro.container import ContainerProperties -from datahub.metadata.com.linkedin.pegasus2avro.events.metadata import ChangeType from datahub.metadata.schema_classes import ( ChangeTypeClass, ContainerClass, @@ -186,11 +185,8 @@ def wrap_aspect_as_workunit( wu = MetadataWorkUnit( id=f"{aspectName}-for-{entityUrn}", mcp=MetadataChangeProposalWrapper( - entityType=entityName, entityUrn=entityUrn, - aspectName=aspectName, aspect=aspect, - changeType=ChangeType.UPSERT, ), ) return wu diff --git a/metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py b/metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py index 53e11978d8a70..a7c0e1c9c9935 100644 --- a/metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py @@ -51,6 +51,7 @@ def __init__( audit_header: Optional[KafkaAuditHeaderClass] = None, ) -> None: self.urn = urn + # TODO: Remove the entity_type parameter, as MCPW can infer it from the URN. self.entity_type = entity_type self.system_metadata = system_metadata self.audit_header = audit_header diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py index a863a13da6e3b..382f5ed9f1909 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py @@ -75,7 +75,6 @@ SchemaMetadata, ) from datahub.metadata.schema_classes import ( - ChangeTypeClass, DataFlowInfoClass, DataFlowSnapshotClass, DataJobInfoClass, @@ -87,14 +86,12 @@ DatasetProfileClass, DatasetPropertiesClass, GlobalTagsClass, - JobStatusClass, MetadataChangeEventClass, OwnerClass, OwnershipClass, OwnershipTypeClass, PartitionSpecClass, PartitionTypeClass, - StatusClass, TagAssociationClass, UpstreamClass, UpstreamLineageClass, @@ -724,10 +721,7 @@ def get_lineage_if_enabled( ] ) mcp = MetadataChangeProposalWrapper( - entityType="dataset", entityUrn=mce.proposedSnapshot.urn, - changeType=ChangeTypeClass.UPSERT, - aspectName="upstreamLineage", aspect=upstream_lineage, ) return mcp @@ -742,10 +736,7 @@ def get_lineage_if_enabled( ] ) mcp = MetadataChangeProposalWrapper( - entityType="dataset", entityUrn=s3_dataset_urn, - changeType=ChangeTypeClass.UPSERT, - aspectName="upstreamLineage", aspect=upstream_lineage, ) return mcp @@ -827,10 +818,7 @@ def _create_profile_mcp( ) mcp = MetadataChangeProposalWrapper( - entityType="dataset", entityUrn=mce.proposedSnapshot.urn, - changeType=ChangeTypeClass.UPSERT, - aspectName="datasetProfile", aspect=dataset_profile, ) return mcp diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py index 44338c3d7d102..97aeeefc654f5 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py @@ -85,7 +85,6 @@ AssertionStdParametersClass, AssertionStdParameterTypeClass, AssertionTypeClass, - ChangeTypeClass, DataPlatformInstanceClass, DatasetAssertionInfoClass, DatasetAssertionScopeClass, @@ -1312,17 +1311,10 @@ def _create_subType_wu( subtypes = ["model", "view"] else: subtypes = [node.node_type] - subtype_mcp = MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, + subtype_wu = MetadataChangeProposalWrapper( entityUrn=node_datahub_urn, - aspectName="subTypes", aspect=SubTypesClass(typeNames=subtypes), - ) - subtype_wu = MetadataWorkUnit( - id=f"{subtype_mcp.entityUrn}-{subtype_mcp.aspectName}", - mcp=subtype_mcp, - ) + ).as_workunit() return subtype_wu def _create_lineage_aspect_for_dbt_node( diff --git a/metadata-ingestion/src/datahub/ingestion/source/delta_lake/source.py b/metadata-ingestion/src/datahub/ingestion/source/delta_lake/source.py index 934395ffe694b..b7b4fd0e3228e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/delta_lake/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/delta_lake/source.py @@ -45,7 +45,6 @@ SchemaMetadata, ) from datahub.metadata.schema_classes import ( - ChangeTypeClass, DatasetPropertiesClass, NullTypeClass, OperationClass, @@ -179,9 +178,6 @@ def _create_operation_aspect_wu( ) mcp = MetadataChangeProposalWrapper( - entityType="dataset", - aspectName="operation", - changeType=ChangeTypeClass.UPSERT, entityUrn=dataset_urn, aspect=operation_aspect, ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/elastic_search.py b/metadata-ingestion/src/datahub/ingestion/source/elastic_search.py index d7a6e267e2eed..9420704bea39d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/elastic_search.py +++ b/metadata-ingestion/src/datahub/ingestion/source/elastic_search.py @@ -39,7 +39,6 @@ ArrayTypeClass, BooleanTypeClass, BytesTypeClass, - ChangeTypeClass, DataPlatformInstanceClass, DatasetPropertiesClass, DateTypeClass, @@ -337,13 +336,10 @@ def _get_data_stream_index_count_mcps( platform_instance=self.source_config.platform_instance, ) yield MetadataChangeProposalWrapper( - entityType="dataset", entityUrn=dataset_urn, - aspectName="datasetProperties", aspect=DatasetPropertiesClass( customProperties={"numPartitions": str(count)} ), - changeType=ChangeTypeClass.UPSERT, ) def _extract_mcps( @@ -396,27 +392,19 @@ def _extract_mcps( env=self.source_config.env, ) yield MetadataChangeProposalWrapper( - entityType="dataset", entityUrn=dataset_urn, - aspectName="schemaMetadata", aspect=schema_metadata, - changeType=ChangeTypeClass.UPSERT, ) # 2. Construct and emit the status aspect. yield MetadataChangeProposalWrapper( - entityType="dataset", entityUrn=dataset_urn, - aspectName="status", aspect=StatusClass(removed=False), - changeType=ChangeTypeClass.UPSERT, ) # 3. Construct and emit subtype yield MetadataChangeProposalWrapper( - entityType="dataset", entityUrn=dataset_urn, - aspectName="subTypes", aspect=SubTypesClass( typeNames=[ "Index Template" @@ -426,7 +414,6 @@ def _extract_mcps( else "Datastream" ] ), - changeType=ChangeTypeClass.UPSERT, ) # 4. Construct and emit properties if needed. Will attempt to get the following properties @@ -453,26 +440,20 @@ def _extract_mcps( custom_properties["num_replicas"] = num_replicas yield MetadataChangeProposalWrapper( - entityType="dataset", entityUrn=dataset_urn, - aspectName="datasetProperties", aspect=DatasetPropertiesClass(customProperties=custom_properties), - changeType=ChangeTypeClass.UPSERT, ) # 5. Construct and emit platform instance aspect if self.source_config.platform_instance: yield MetadataChangeProposalWrapper( - entityType="dataset", entityUrn=dataset_urn, - aspectName="dataPlatformInstance", aspect=DataPlatformInstanceClass( platform=make_data_platform_urn(self.platform), instance=make_dataplatform_instance_urn( self.platform, self.source_config.platform_instance ), ), - changeType=ChangeTypeClass.UPSERT, ) def get_report(self): diff --git a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py index 3f017ad436c81..cd66d0b1e57b7 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py +++ b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py @@ -51,7 +51,6 @@ SchemaMetadata, ) from datahub.metadata.schema_classes import ( - ChangeTypeClass, DataPlatformInstanceClass, DatasetPropertiesClass, OwnerClass, @@ -249,10 +248,7 @@ def _get_dataplatform_instance_aspect( # If we are a platform instance based source, emit the instance aspect if self.config.platform_instance: mcp = MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, entityUrn=dataset_urn, - aspectName="dataPlatformInstance", aspect=DataPlatformInstanceClass( platform=make_data_platform_urn(self.platform), instance=make_dataplatform_instance_urn( diff --git a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_profiler.py index d1bbda01deb97..766477e3a1cd8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_profiler.py @@ -21,7 +21,6 @@ IcebergSourceReport, ) from datahub.metadata.schema_classes import ( - ChangeTypeClass, DatasetFieldProfileClass, DatasetProfileClass, ) @@ -177,12 +176,8 @@ def profile_table( ) dataset_profile.fieldProfiles.append(column_profile) - # https://github.com/linkedin/datahub/blob/599edd22aeb6b17c71e863587f606c73b87e3b58/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py#L829 mcp = MetadataChangeProposalWrapper( - entityType="dataset", entityUrn=dataset_urn, - changeType=ChangeTypeClass.UPSERT, - aspectName="datasetProfile", aspect=dataset_profile, ) wu = MetadataWorkUnit(id=f"profile-{dataset_name}", mcp=mcp) diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka.py b/metadata-ingestion/src/datahub/ingestion/source/kafka.py index bd39683f81e9d..32a18bc179603 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka.py @@ -42,7 +42,6 @@ from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent from datahub.metadata.schema_classes import ( BrowsePathsClass, - ChangeTypeClass, DataPlatformInstanceClass, SubTypesClass, ) @@ -236,10 +235,7 @@ def _extract_record(self, topic: str) -> Iterable[MetadataWorkUnit]: subtype_wu = MetadataWorkUnit( id=f"{topic}-subtype", mcp=MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, entityUrn=dataset_urn, - aspectName="subTypes", aspect=SubTypesClass(typeNames=["topic"]), ), ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py index 15f6bfdf6a55b..109006ed66a14 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py @@ -1063,10 +1063,7 @@ def construct_flow_workunit( ) mcp = MetadataChangeProposalWrapper( - entityType="dataFlow", entityUrn=flow_urn, - changeType=models.ChangeTypeClass.UPSERT, - aspectName="dataFlowInfo", aspect=models.DataFlowInfoClass( name=connector_name, description=f"{connector_type.capitalize()} connector using `{connector_class}` plugin.", @@ -1138,10 +1135,7 @@ def construct_job_workunits( ] mcp = MetadataChangeProposalWrapper( - entityType="dataJob", entityUrn=job_urn, - changeType=models.ChangeTypeClass.UPSERT, - aspectName="dataJobInfo", aspect=models.DataJobInfoClass( name=f"{connector_name}:{job_id}", type="COMMAND", @@ -1159,10 +1153,7 @@ def construct_job_workunits( yield wu mcp = MetadataChangeProposalWrapper( - entityType="dataJob", entityUrn=job_urn, - changeType=models.ChangeTypeClass.UPSERT, - aspectName="dataJobInputOutput", aspect=models.DataJobInputOutputClass( inputDatasets=inlets, outputDatasets=outlets, @@ -1198,15 +1189,12 @@ def construct_lineage_workunits( ) mcp = MetadataChangeProposalWrapper( - entityType="dataset", entityUrn=builder.make_dataset_urn_with_platform_instance( target_platform, target_dataset, platform_instance=target_platform_instance, env=self.config.env, ), - changeType=models.ChangeTypeClass.UPSERT, - aspectName="dataPlatformInstance", aspect=models.DataPlatformInstanceClass( platform=builder.make_data_platform_urn(target_platform), instance=builder.make_dataplatform_instance_urn( @@ -1222,15 +1210,12 @@ def construct_lineage_workunits( yield wu if source_dataset: mcp = MetadataChangeProposalWrapper( - entityType="dataset", entityUrn=builder.make_dataset_urn_with_platform_instance( source_platform, source_dataset, platform_instance=source_platform_instance, env=self.config.env, ), - changeType=models.ChangeTypeClass.UPSERT, - aspectName="dataPlatformInstance", aspect=models.DataPlatformInstanceClass( platform=builder.make_data_platform_urn(source_platform), instance=builder.make_dataplatform_instance_urn( diff --git a/metadata-ingestion/src/datahub/ingestion/source/nifi.py b/metadata-ingestion/src/datahub/ingestion/source/nifi.py index 9b628dd03e4ec..aa6d8feee3b26 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/nifi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/nifi.py @@ -28,7 +28,6 @@ from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.metadata.schema_classes import ( - ChangeTypeClass, DataFlowInfoClass, DataJobInfoClass, DataJobInputOutputClass, @@ -983,10 +982,7 @@ def construct_flow_workunits( flow_properties: Optional[Dict[str, str]] = None, ) -> Iterable[MetadataWorkUnit]: mcp = MetadataChangeProposalWrapper( - entityType="dataFlow", entityUrn=flow_urn, - changeType=ChangeTypeClass.UPSERT, - aspectName="dataFlowInfo", aspect=DataFlowInfoClass( name=flow_name, customProperties=flow_properties, @@ -1017,10 +1013,7 @@ def construct_job_workunits( job_properties = {k: v for k, v in job_properties.items() if v is not None} mcp = MetadataChangeProposalWrapper( - entityType="dataJob", entityUrn=job_urn, - changeType=ChangeTypeClass.UPSERT, - aspectName="dataJobInfo", aspect=DataJobInfoClass( name=job_name, type=job_type, @@ -1043,10 +1036,7 @@ def construct_job_workunits( inputJobs.sort() mcp = MetadataChangeProposalWrapper( - entityType="dataJob", entityUrn=job_urn, - changeType=ChangeTypeClass.UPSERT, - aspectName="dataJobInputOutput", aspect=DataJobInputOutputClass( inputDatasets=inlets, outputDatasets=outlets, inputDatajobs=inputJobs ), @@ -1073,10 +1063,7 @@ def construct_dataset_workunits( ) mcp = MetadataChangeProposalWrapper( - entityType="dataset", entityUrn=dataset_urn, - changeType=ChangeTypeClass.UPSERT, - aspectName="dataPlatformInstance", aspect=DataPlatformInstanceClass( platform=builder.make_data_platform_urn(dataset_platform) ), @@ -1092,10 +1079,7 @@ def construct_dataset_workunits( yield wu mcp = MetadataChangeProposalWrapper( - entityType="dataset", entityUrn=dataset_urn, - changeType=ChangeTypeClass.UPSERT, - aspectName="datasetProperties", aspect=DatasetPropertiesClass( externalUrl=external_url, customProperties=datasetProperties ), diff --git a/metadata-ingestion/src/datahub/ingestion/source/pulsar.py b/metadata-ingestion/src/datahub/ingestion/source/pulsar.py index 6efe86afac340..dc2f6550217dd 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/pulsar.py +++ b/metadata-ingestion/src/datahub/ingestion/source/pulsar.py @@ -44,7 +44,6 @@ ) from datahub.metadata.schema_classes import ( BrowsePathsClass, - ChangeTypeClass, DataPlatformInstanceClass, DatasetPropertiesClass, SubTypesClass, @@ -414,32 +413,20 @@ def _extract_record( env=self.config.env, ) - status_wu = MetadataWorkUnit( - id=f"{dataset_urn}-status", - mcp=MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, - entityUrn=dataset_urn, - aspectName="status", - aspect=StatusClass(removed=False), - ), - ) + status_wu = MetadataChangeProposalWrapper( + entityUrn=dataset_urn, + aspect=StatusClass(removed=False), + ).as_workunit() self.report.report_workunit(status_wu) yield status_wu # 2. Emit schemaMetadata aspect schema, schema_metadata = self._get_schema_metadata(pulsar_topic, platform_urn) if schema_metadata is not None: - schema_metadata_wu = MetadataWorkUnit( - id=f"{dataset_urn}-schemaMetadata", - mcp=MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, - entityUrn=dataset_urn, - aspectName="schemaMetadata", - aspect=schema_metadata, - ), - ) + schema_metadata_wu = MetadataChangeProposalWrapper( + entityUrn=dataset_urn, + aspect=schema_metadata, + ).as_workunit() self.report.report_workunit(schema_metadata_wu) yield schema_metadata_wu @@ -454,19 +441,13 @@ def _extract_record( # Add some static properties to the schema properties schema.properties.update(schema_properties) - dataset_properties_wu = MetadataWorkUnit( - id=f"{dataset_urn}-datasetProperties", - mcp=MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, - entityUrn=dataset_urn, - aspectName="datasetProperties", - aspect=DatasetPropertiesClass( - description=schema.schema_description, - customProperties=schema.properties, - ), + dataset_properties_wu = MetadataChangeProposalWrapper( + entityUrn=dataset_urn, + aspect=DatasetPropertiesClass( + description=schema.schema_description, + customProperties=schema.properties, ), - ) + ).as_workunit() self.report.report_workunit(dataset_properties_wu) yield dataset_properties_wu @@ -480,52 +461,34 @@ def _extract_record( else pulsar_path ) - browse_path_wu = MetadataWorkUnit( - id=f"{dataset_urn}-browsePaths", - mcp=MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, - entityUrn=dataset_urn, - aspectName="browsePaths", - aspect=BrowsePathsClass( - [f"/{self.config.env.lower()}/{self.platform}/{browse_path_suffix}"] - ), + browse_path_wu = MetadataChangeProposalWrapper( + entityUrn=dataset_urn, + aspect=BrowsePathsClass( + [f"/{self.config.env.lower()}/{self.platform}/{browse_path_suffix}"] ), - ) + ).as_workunit() self.report.report_workunit(browse_path_wu) yield browse_path_wu # 5. Emit dataPlatformInstance aspect. if self.config.platform_instance: - platform_instance_wu = MetadataWorkUnit( - id=f"{dataset_urn}-dataPlatformInstance", - mcp=MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, - entityUrn=dataset_urn, - aspectName="dataPlatformInstance", - aspect=DataPlatformInstanceClass( - platform=platform_urn, - instance=make_dataplatform_instance_urn( - self.platform, self.config.platform_instance - ), + platform_instance_wu = MetadataChangeProposalWrapper( + entityUrn=dataset_urn, + aspect=DataPlatformInstanceClass( + platform=platform_urn, + instance=make_dataplatform_instance_urn( + self.platform, self.config.platform_instance ), ), - ) + ).as_workunit() self.report.report_workunit(platform_instance_wu) yield platform_instance_wu # 6. Emit subtype aspect marking this as a "topic" - subtype_wu = MetadataWorkUnit( - id=f"{dataset_urn}-subTypes", - mcp=MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, - entityUrn=dataset_urn, - aspectName="subTypes", - aspect=SubTypesClass(typeNames=["topic"]), - ), - ) + subtype_wu = MetadataChangeProposalWrapper( + entityUrn=dataset_urn, + aspect=SubTypesClass(typeNames=["topic"]), + ).as_workunit() self.report.report_workunit(subtype_wu) yield subtype_wu diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py index 88a5057c6431c..a779f43159554 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py @@ -11,7 +11,6 @@ from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.snowflake.snowflake_config import SnowflakeV2Config from datahub.ingestion.source.snowflake.snowflake_report import SnowflakeV2Report -from datahub.metadata.com.linkedin.pegasus2avro.events.metadata import ChangeType from datahub.metadata.schema_classes import _Aspect logger: logging.Logger = logging.getLogger(__name__) @@ -190,11 +189,8 @@ def wrap_aspect_as_workunit( wu = MetadataWorkUnit( id=id, mcp=MetadataChangeProposalWrapper( - entityType=entityName, entityUrn=entityUrn, - aspectName=aspectName, aspect=aspect, - changeType=ChangeType.UPSERT, ), ) self.report.report_workunit(wu) diff --git a/metadata-ingestion/src/datahub/integrations/great_expectations/action.py b/metadata-ingestion/src/datahub/integrations/great_expectations/action.py index 4f45dede47fba..29ef2d6724591 100644 --- a/metadata-ingestion/src/datahub/integrations/great_expectations/action.py +++ b/metadata-ingestion/src/datahub/integrations/great_expectations/action.py @@ -53,7 +53,6 @@ DatasetAssertionScope, ) from datahub.metadata.com.linkedin.pegasus2avro.common import DataPlatformInstance -from datahub.metadata.com.linkedin.pegasus2avro.events.metadata import ChangeType from datahub.metadata.schema_classes import PartitionSpecClass, PartitionTypeClass from datahub.utilities.sql_parser import DefaultSQLParser @@ -170,30 +169,21 @@ def _run( # Construct a MetadataChangeProposalWrapper object. assertion_info_mcp = MetadataChangeProposalWrapper( - entityType="assertion", - changeType=ChangeType.UPSERT, entityUrn=assertion["assertionUrn"], - aspectName="assertionInfo", aspect=assertion["assertionInfo"], ) emitter.emit_mcp(assertion_info_mcp) # Construct a MetadataChangeProposalWrapper object. assertion_platform_mcp = MetadataChangeProposalWrapper( - entityType="assertion", - changeType=ChangeType.UPSERT, entityUrn=assertion["assertionUrn"], - aspectName="dataPlatformInstance", aspect=assertion["assertionPlatform"], ) emitter.emit_mcp(assertion_platform_mcp) for assertionResult in assertion["assertionResults"]: dataset_assertionResult_mcp = MetadataChangeProposalWrapper( - entityType="assertion", - changeType=ChangeType.UPSERT, entityUrn=assertionResult.assertionUrn, - aspectName="assertionRunEvent", aspect=assertionResult, ) diff --git a/metadata-ingestion/tests/unit/test_allow_deny.py b/metadata-ingestion/tests/unit/test_allow_deny.py index 6f3af9cba287f..13ec4a66fffe8 100644 --- a/metadata-ingestion/tests/unit/test_allow_deny.py +++ b/metadata-ingestion/tests/unit/test_allow_deny.py @@ -16,6 +16,12 @@ def test_single_table() -> None: assert pattern.allowed("foo.mytable") +def test_prefix_match(): + pattern = AllowDenyPattern(allow=["mytable"]) + assert pattern.allowed("mytable.foo") + assert not pattern.allowed("foo.mytable") + + def test_default_deny() -> None: pattern = AllowDenyPattern(allow=["foo.mytable"]) assert not pattern.allowed("foo.bar") From ffa415f2121861e970db1ae25c7bd68bb0f7689a Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 20 Dec 2022 01:03:16 -0500 Subject: [PATCH 2/2] a few more --- metadata-ingestion/src/datahub/cli/migrate.py | 2 -- metadata-ingestion/src/datahub/cli/migration_utils.py | 2 +- .../src/datahub/ingestion/source/usage/bigquery_usage.py | 9 +-------- .../src/datahub/ingestion/source/usage/redshift_usage.py | 9 +-------- .../src/datahub/ingestion/source/usage/usage_common.py | 4 ---- 5 files changed, 3 insertions(+), 23 deletions(-) diff --git a/metadata-ingestion/src/datahub/cli/migrate.py b/metadata-ingestion/src/datahub/cli/migrate.py index b46c0e5b134f5..17c2fdf927849 100644 --- a/metadata-ingestion/src/datahub/cli/migrate.py +++ b/metadata-ingestion/src/datahub/cli/migrate.py @@ -209,7 +209,6 @@ def dataplatform2instance_func( for mcp in migration_utils.clone_aspect( src_entity_urn, aspect_names=migration_utils.all_aspects, - entity_type="dataset", dst_urn=new_urn, dry_run=dry_run, run_id=run_id, @@ -336,7 +335,6 @@ def migrate_containers( for mcp in migration_utils.clone_aspect( src_urn, aspect_names=migration_utils.all_aspects, - entity_type="container", dst_urn=dst_urn, dry_run=dry_run, run_id=run_id, diff --git a/metadata-ingestion/src/datahub/cli/migration_utils.py b/metadata-ingestion/src/datahub/cli/migration_utils.py index 074e03212c792..09bf58bf4ec76 100644 --- a/metadata-ingestion/src/datahub/cli/migration_utils.py +++ b/metadata-ingestion/src/datahub/cli/migration_utils.py @@ -21,6 +21,7 @@ log = logging.getLogger(__name__) +# TODO: Make this dynamic based on the real aspect class map. all_aspects = [ "schemaMetadata", "datasetProperties", @@ -232,7 +233,6 @@ def modify_urn_list_for_aspect( def clone_aspect( src_urn: str, - entity_type: str, aspect_names: List[str], dst_urn: str, run_id: str = str(uuid.uuid4()), diff --git a/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py b/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py index 078a066b73b86..f4e11a935d864 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py @@ -35,11 +35,7 @@ from datahub.ingestion.source_report.usage.bigquery_usage import ( BigQueryUsageSourceReport, ) -from datahub.metadata.schema_classes import ( - ChangeTypeClass, - OperationClass, - OperationTypeClass, -) +from datahub.metadata.schema_classes import OperationClass, OperationTypeClass from datahub.utilities.delayed_iter import delayed_iter from datahub.utilities.parsing_util import ( get_first_missing_key, @@ -1128,9 +1124,6 @@ def _create_operation_aspect_work_unit( operation_aspect.numAffectedRows = event.query_event.numAffectedRows mcp = MetadataChangeProposalWrapper( - entityType="dataset", - aspectName="operation", - changeType=ChangeTypeClass.UPSERT, entityUrn=_table_ref_to_urn( destination_table, env=self.config.env, diff --git a/metadata-ingestion/src/datahub/ingestion/source/usage/redshift_usage.py b/metadata-ingestion/src/datahub/ingestion/source/usage/redshift_usage.py index 841217a1e60ff..1fb3fe3060083 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/usage/redshift_usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/usage/redshift_usage.py @@ -30,11 +30,7 @@ BaseUsageConfig, GenericAggregatedDataset, ) -from datahub.metadata.schema_classes import ( - ChangeTypeClass, - OperationClass, - OperationTypeClass, -) +from datahub.metadata.schema_classes import OperationClass, OperationTypeClass logger = logging.getLogger(__name__) @@ -346,9 +342,6 @@ def _gen_operation_aspect_workunits_from_access_events( ), ) mcp = MetadataChangeProposalWrapper( - entityType="dataset", - aspectName="operation", - changeType=ChangeTypeClass.UPSERT, entityUrn=builder.make_dataset_urn_with_platform_instance( "redshift", resource.lower(), diff --git a/metadata-ingestion/src/datahub/ingestion/source/usage/usage_common.py b/metadata-ingestion/src/datahub/ingestion/source/usage/usage_common.py index dba6610c6532d..915dce342a8ab 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/usage/usage_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/usage/usage_common.py @@ -16,7 +16,6 @@ from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.metadata.schema_classes import ( - ChangeTypeClass, DatasetFieldUsageCountsClass, DatasetUsageStatisticsClass, DatasetUserUsageCountsClass, @@ -123,9 +122,6 @@ def make_usage_workunit( ) mcp = MetadataChangeProposalWrapper( - entityType="dataset", - aspectName="datasetUsageStatistics", - changeType=ChangeTypeClass.UPSERT, entityUrn=urn_builder(self.resource), aspect=usageStats, )